Source code for pyrpl.widgets.module_widgets.na_widget

The network analyzer records the coherent response of the signal at the port
:code:`input` to a sinusoidal excitation of variable frequency sent to the
output selected in :code:`output_direct`.

.. note:: If :code:`output_direct='off'`, another module's input can be set
          to :code:`networkanalyzer` to test its response to a frequency sweep.

* :attr:`~pyrpl.software_modules.network_analyzer.NetworkAnalyzer.amplitude`
  sets the amplitude of the sinusoidal excitation in Volts.
* :attr:`~pyrpl.software_modules.network_analyzer.NetworkAnalyzer.start_freq`/:attr:`~pyrpl.software_modules.network_analyzer.NetworkAnalyzer.stop_freq`
  define the frequency range over which a transfer function is recorded.
  Swapping the values of :code:`start_freq` and :code:`stop_freq` reverses the
  direction of the frequency sweep. Setting :code:`stop_freq = start_freq`
  enables the "zero-span" mode, where the coherent response at a constant
  frequency is recorded as a function of time.
* :attr:`~pyrpl.software_modules.network_analyzer.NetworkAnalyzer.points`
  defines the number of frequency points in the recorded transfer function.
* :attr:`~pyrpl.software_modules.network_analyzer.NetworkAnalyzer.rbw` is
  the cutoff frequency of the low-pass filter after demodulation. Furthermore,
  the time :math:`\\tau` spent to record each point is
  :math:`\\tau=\\texttt{avg_per_point} / \\texttt{rbw}`.
* :attr:`~pyrpl.software_modules.network_analyzer.NetworkAnalyzer.avg_per_point`:
  Each point is averaged inside the FPGA before being retrieved by the
  client computer that runs PyRPL. You should increase this parameter or
  decrease :code:`rbw` if the communication time between the Red Pitaya and
  the client computer limits the acquisition speed.
* :attr:`~pyrpl.software_modules.network_analyzer.NetworkAnalyzer.acbandwidth`
  is the cutoff frequency of a high-pass filter applied to the input before
  demodulation. A setting of zero disables the high-pass filter.
* :attr:`~pyrpl.software_modules.network_analyzer.NetworkAnalyzer.logscale`
  enables the use of a logarithmic scale for the frequency axis, resulting in
  a logarithmic distribution of the frequency points as well.
* :attr:`~pyrpl.software_modules.network_analyzer.NetworkAnalyzer.infer_open_loop_tf`
  applies the transformation :math:`T \\rightarrow \\frac{T}{1+T}` to the displayed
  transfer function to correct for the effect of a closed feedback loop
  (not implemented at the moment).

from .base_module_widget import ModuleWidget
from.acquisition_module_widget import AcquisitionModuleWidget

from qtpy import QtCore, QtWidgets
import pyqtgraph as pg
from time import time
import numpy as np
import sys

[docs]class NaWidget(AcquisitionModuleWidget): """ Network Analyzer Tab. """ starting_update_rate = 0.2 # this would be a good idea to change this number dynamically when the curve becomes CHUNK_SIZE = 500 # more and more expensive to display.
[docs] def init_gui(self): """ Sets up the gui """ #self.main_layout = QtWidgets.QVBoxLayout() self.init_main_layout(orientation="vertical") self.init_attribute_layout() self.button_layout = QtWidgets.QHBoxLayout() #self.setLayout(self.main_layout) self.setWindowTitle("NA") = pg.GraphicsWindow(title="Magnitude") self.label_benchmark = pg.LabelItem(justify='right'), row=1,col=0) self._last_benchmark_value = np.nan self.win_phase = pg.GraphicsWindow(title="Phase") self.plot_item =, col=0, title="Magnitude (dB)") self.plot_item_phase = self.win_phase.addPlot(row=1, col=0, title="Phase (deg)") self.plot_item_phase.setXLink(self.plot_item) self.button_single = QtWidgets.QPushButton("Run single") self.button_single.my_label = "Single" self.button_continuous = QtWidgets.QPushButton("Run continuous") self.button_continuous.my_label = "Continuous" self.button_stop = QtWidgets.QPushButton('Stop') self.button_save = QtWidgets.QPushButton("Save curve") self.chunks = [] #self.plot_item.plot(pen='y') self.chunks_phase = [] self.main_layout.addWidget( self.main_layout.addWidget(self.win_phase) aws = self.attribute_widgets self.attribute_layout.removeWidget(aws["trace_average"]) self.attribute_layout.removeWidget(aws["curve_name"]) #self.button_layout.addWidget(aws["trace_average"]) #self.button_layout.addWidget(aws["curve_name"]) super(NaWidget, self).init_gui() #self.button_layout.addWidget(self.button_single) #self.button_layout.addWidget(self.button_continuous) #self.button_layout.addWidget(self.button_stop) #self.button_layout.addWidget(self.button_save) #self.main_layout.addLayout(self.button_layout) #self.button_single.clicked.connect(self.run_single_clicked) #self.button_continuous.clicked.connect(self.run_continuous_clicked) #self.button_stop.clicked.connect(self.button_stop_clicked) #self.button_save.clicked.connect(self.save_clicked) self.arrow = pg.ArrowItem() self.arrow.setVisible(False) self.arrow_phase = pg.ArrowItem() self.arrow_phase.setVisible(False) self.plot_item.addItem(self.arrow) self.plot_item_phase.addItem(self.arrow_phase) self.last_updated_point = 0 self.last_updated_time = 0 #self.display_state(self.module.running_state) self.update_running_buttons() self.update_period = self.starting_update_rate # also modified in clear_curve. # Not sure why the stretch factors in button_layout are not good by # default... #self.button_layout.setStretchFactor(self.button_single, 1) #self.button_layout.setStretchFactor(self.button_continuous, 1) #self.button_layout.setStretchFactor(self.button_stop, 1) #self.button_layout.setStretchFactor(self.button_save, 1) self.x_log_toggled() # Set the axis in logscale if it has to be
[docs] def autoscale(self): """ log_mode = self.module.logscale self.plot_item.setLogMode(x=log_mod, y=None) # this seems also needed self.plot_item_phase.setLogMode(x=log_mod, y=None) """ self.plot_item.setRange(xRange=[self.module.start_freq, self.module.stop_freq]) self.plot_item_phase.setRange(xRange=[self.module.start_freq, self.module.stop_freq])
[docs] def clear_curve(self): """ Clear all chunks """ self.update_period = self.starting_update_rate # let's assume update of curve takes 50 ms while(True): try: chunk = self.chunks.pop() chunk_phase = self.chunks_phase.pop() chunk.clear() chunk_phase.clear() except IndexError: break self.label_benchmark.setText("")
[docs] def x_log_toggled(self): """ change x_log of axis """ log_mod = self.module.logscale self.plot_item.setLogMode(x=log_mod, y=None) # this seems also needed self.plot_item_phase.setLogMode(x=log_mod, y=None) for chunk, chunk_phase in zip(self.chunks, self.chunks_phase): chunk.setLogMode(xMode=log_mod, yMode=None) chunk_phase.setLogMode(xMode=log_mod, yMode=None)
[docs] def scan_finished(self): """ if in run continuous, needs to redisplay the number of averages """ self.update_current_average() self.update_point(self.module.points-1, force=True) # make sure all points in the scan are updated
[docs] def set_benchmark_text(self, text): self.label_benchmark.setText(text)
[docs] def update_point(self, index, force=False): """ To speed things up, the curves are plotted by chunks of self.CHUNK_SIZE points. All points between last_updated_point and index will be redrawn. """ # APP.processEvents() # Give hand back to the gui since timer intervals might be very short last_chunk_index = self.last_updated_point//self.CHUNK_SIZE current_chunk_index = index//self.CHUNK_SIZE rate = self.module.measured_time_per_point if not np.isnan(rate) and self._last_benchmark_value != rate: theory = self.module.time_per_point self.set_benchmark_text("ms/pt: %.1f (theory: %.1f)"%( rate*1000, theory*1000)) if force or (time() - self.last_updated_time > self.update_period): # if last update time was a long time ago, # update plot, otherwise we would spend more time plotting things than acquiring data... for chunk_index in range(last_chunk_index, current_chunk_index+1): self.update_chunk(chunk_index) # eventually several chunks to redraw self.last_updated_point = index self.last_updated_time = time() # draw arrow cur = self.module.current_point - 1 visible = self.module.last_valid_point != cur + 1 logscale = self.module.logscale freq = self.module.data_x[cur] xpos = np.log10(freq) if logscale else freq if cur > 0: self.arrow.setPos(xpos, self._magnitude(self.module.data_avg[ cur])) self.arrow.setVisible(visible) self.arrow_phase.setPos(xpos, self._phase( self.module.data_avg[cur])) self.arrow_phase.setVisible(visible)
def _magnitude(self, data): return 20. * np.log10(np.abs(data)+sys.float_info.epsilon) def _phase(self, data): return np.angle(data, deg=True)
[docs] def update_attribute_by_name(self, name, new_value_list): super(NaWidget, self).update_attribute_by_name(name, new_value_list) if name == "running_state": #self.display_state(self.module.running_state) self.update_running_buttons()
[docs] def update_chunk(self, chunk_index): """ updates curve # chunk_index with the data from the module """ while len(self.chunks) <= chunk_index: # create as many chunks as needed to reach chunk_index (in principle only # one curve should be missing at most) chunk = self.plot_item.plot(pen='y') chunk_phase = self.plot_item_phase.plot(pen=None, symbol='o') self.chunks.append(chunk) self.chunks_phase.append(chunk_phase) log_mod = self.module.logscale chunk.setLogMode(xMode=log_mod, yMode=None) chunk_phase.setLogMode(xMode=log_mod, yMode=None) sl = slice(max(0, self.CHUNK_SIZE * chunk_index - 1), min(self.CHUNK_SIZE * (chunk_index + 1), self.module.last_valid_point), 1) # make sure there is an overlap between slices data = self.module.data_avg[sl] x = np.real(self.module.data_x[sl]) self.chunks[chunk_index].setData(x, self._magnitude(data)) self.chunks_phase[chunk_index].setData(x, self._phase(data))
#def run_continuous_clicked(self): # """ # launches a continuous run # """ # if str(self.button_continuous.text()).startswith("Pause"): # self.module.pause() # else: # self.module.continuous() #def run_single_clicked(self): # """ # launches a single acquisition # """ # if str(self.button_single.text()).startswith("Pause"): # self.module.pause() # else: # self.module.single_async() #def save_clicked(self): # """ # Save the current curve. # """ # self.module.save_curve()
[docs] def display_state(self, running_state): """ Displays one of the possible states "running_continuous", "running_single", "paused_continuous", "paused_single", "stopped" """ if not running_state in ["running_continuous", "running_single", "paused", "stopped"]: raise ValueError("Na running_state should be either " "running_continuous, " "running_single, " "paused or " "stopped") if running_state=="running_continuous": self.button_single.setEnabled(False) self.button_single.setText("Run single") self.button_continuous.setEnabled(True) self.button_continuous.setText("Pause") return if running_state== "running_single": self.button_single.setEnabled(True) self.button_single.setText("Pause") self.button_continuous.setEnabled(False) self.button_continuous.setText("Run continuous") return if running_state == "paused": self.button_continuous.setText("Resume continuous") self.button_single.setText("Run single") self.button_continuous.setEnabled(True) self.button_single.setEnabled(False) return if running_state == "stopped": self.button_continuous.setText("Run continuous") self.button_single.setText("Run single") self.button_continuous.setEnabled(True) self.button_single.setEnabled(True) return
#def button_stop_clicked(self): # """ # Going to stop will impose a setup_average before next run. # """ # self.module.stop() class MyGraphicsWindow(pg.GraphicsWindow): def __init__(self, title, parent_widget): super(MyGraphicsWindow, self).__init__(title) self.parent_widget = parent_widget self.setToolTip("IIR transfer function: \n" "----------------------\n" "CTRL + Left click: add one more pole. \n" "SHIFT + Left click: add one more zero\n" "Left Click: select pole (other possibility: click on the '+j' labels below the graph)\n" "Left/Right arrows: change imaginary part (frequency) of the current pole or zero\n" "Up/Down arrows; change the real part (width) of the current pole or zero. \n" "Poles are represented by 'X', zeros by 'O'") def mousePressEvent(self, *args, **kwds): event = args[0] try: modifier = int(event.modifiers()) it = self.getItem(0, 0) pos = it.mapToScene(event.pos()) # + it.vb.pos() point = it.vb.mapSceneToView(pos) x, y = point.x(), point.y() x = 10 ** x new_z = -100 - 1.j * x if modifier==QtCore.Qt.CTRL: self.parent_widget.module.poles += [new_z] self.parent_widget.attribute_widgets['poles'].set_selected(-1) if modifier == QtCore.Qt.SHIFT: self.parent_widget.module.zeros += [new_z] self.parent_widget.attribute_widgets['zeros'].set_selected(-1) except BaseException as e: self.parent_widget.module._logger.error(e) finally: return super(MyGraphicsWindow, self).mousePressEvent(*args, **kwds)