Fork PyRPL on GitHub

Source code for pyrpl.software_modules.network_analyzer

from copy import copy

import numpy as np
from qtpy import QtWidgets

from ..async_utils import PyrplFuture, MainThreadTimer, CancelledError, sleep
from ..attributes import FloatProperty, SelectProperty, FrequencyProperty, \
                         IntProperty, BoolProperty, FilterProperty, SelectProperty, \
                         ProxyProperty
from ..hardware_modules import all_inputs, all_output_directs, InputSelectProperty
from ..modules import SignalModule
from ..acquisition_module import AcquisitionModule
from ..widgets.module_widgets import NaWidget
from ..hardware_modules.iq import Iq

# timeit.default_timer() is THE precise timer to use (microsecond precise vs
# milliseconds for time.time()). see
# http://stackoverflow.com/questions/85451/python-time-clock-vs-time-time-accuracy
import timeit


[docs]class NaAcBandwidth(FilterProperty):
[docs] def valid_frequencies(self, obj): return [-freq for freq in obj.iq.inputfilter_options if freq <= 0]
[docs] def get_value(self, obj): if obj is None: return self return -obj.iq.inputfilter
[docs] def set_value(self, obj, value): if isinstance(value, list): value = value[0] obj.iq.inputfilter = -value return value
[docs]class NaAmplitudeProperty(FloatProperty):
[docs] def validate_and_normalize(self, obj, value): return obj.iq.__class__.amplitude.validate_and_normalize(obj.iq, abs(value))
[docs]class RbwAttribute(FilterProperty):
[docs] def get_value(self, instance): if instance is None: return self return instance.iq.bandwidth[0]
[docs] def set_value(self, instance, val): try: val = list(val) except: val = [val, val] # preferentially choose second order filter instance.iq.bandwidth = val return val
[docs] def valid_frequencies(self, obj): return [freq for freq in obj.iq.bandwidth_options if freq > 0]
[docs]class LogScaleProperty(BoolProperty):
[docs] def set_value(self, module, val): super(LogScaleProperty, self).set_value(module, val) module._signal_launcher.x_log_toggled.emit()
[docs]class NaPointFuture(PyrplFuture): """ Future object for a NetworkAnalyzer point. """ def __init__(self, module, point_index, min_delay_ms=0): self._module = module self.point_index = point_index self._min_delay_ms = min_delay_ms super(NaPointFuture, self).__init__() self._init_timer() def _init_timer(self): self._module._start_point_acquisition(self.point_index) if self._min_delay_ms == 0: # make sure 1st instrument interrogation occurs before time delay = self._module._remaining_time() * 1000 - 1 else: # 1 ms loss due to timer inaccuracy is acceptable delay = max(self._min_delay_ms, self._module._remaining_time() * 1000) self._timer = MainThreadTimer(max(0, delay)) self._timer.timeout.connect(self._set_data_as_result) self._timer.start() def _set_data_as_result(self): if not self.done(): # if point was cancelled, leave the loop. point = self._module._get_point(self.point_index) if point is not None: self.set_result(point) else: self._timer.setInterval(self._min_delay_ms) self._timer.start()
[docs] def set_exception(self, exception): self._timer.stop() super(NaPointFuture, self).set_exception(exception)
[docs] def cancel(self): self._timer.stop() super(NaPointFuture, self).cancel()
[docs]class NaCurveFuture(PyrplFuture): N_POINT_BENCHMARK = 100 # update measured_time_per_point every 100 points def __init__(self, module, min_delay_ms, autostart=True): self._module = module self._min_delay_ms = min_delay_ms self.current_point = 0 self.current_avg = 0 self.n_points = self._module.points self._paused = True self._fut = None # placeholder for next point future self.never_started = True super(NaCurveFuture, self).__init__() self.data_x = copy(self._module._data_x) # In case of saving latter. self.data_avg = np.empty(self.n_points, dtype=np.complex) self.data_avg.fill(np.nan) self.data_amp = np.empty(self.n_points) self.data_amp.fill(np.nan) # self.start() self._reset_benchmark() self.measured_time_per_point = np.nan # measured over last scan if autostart: self.start()
[docs] def start(self): # self._module.iq.output_direct = self._module.output_direct self._time_first_point = timeit.default_timer() self._module._start_acquisition() self._module.iq.amplitude = self._module.amplitude if self.never_started: self._module._emit_signal_by_name("clear_curve") self.never_started = False self._paused = False self._setup_next_point()
def _setup_next_point(self): self._fut = self._module._new_point_future(self.current_point, self._min_delay_ms) self._fut.add_done_callback(self._new_point_arrived)
[docs] def pause(self): #self._module.iq.output_direct = 'off' # switch off iq when paused self._module.iq.amplitude = 0 self._paused = True if self._fut is not None: self._fut.cancel()
def _reset_benchmark(self): self._last_time_benchmark = timeit.default_timer() self._current_points_benchmark = 0 def _update_benchmark(self): self._current_points_benchmark += 1 if self._current_points_benchmark >= self.N_POINT_BENCHMARK: current_time = timeit.default_timer() self.measured_time_per_point = \ (current_time - self._last_time_benchmark)/self._current_points_benchmark self._reset_benchmark() def _new_point_arrived(self, point): if self._paused: return self._update_benchmark() try: point = point.result() except CancelledError: self._point_cancelled() return # exit the loop (could be restarted latter for RunFuture) self._add_point(point) # if zero span mode, data_x is time measured, not frequency if self._module.is_zero_span(): if self.current_avg==1: time_now = timeit.default_timer() - self._time_first_point self.data_x[self.current_point] = time_now self._module._data_x[self.current_point] = time_now self.current_point+=1 if self.current_point==self.n_points: self._scan_finished() else: self._setup_next_point()
[docs] def cancel(self): self.pause() super(NaCurveFuture, self).cancel()
# These methods behave differently in the derived class RunFuture: # ---------------------------------------------------------------- # 1. points are averaged and not overwritten. # 2. Each point sends an update signal to the gui. # 3. A point cancelled doesn't mean the Future is cancelled (can be # restarted if the state is paused.) # 4. When the scan is finished, start a new scan over. def _add_point(self, point): y, amp = point self.data_avg[self.current_point] = y self.data_amp[self.current_point] = amp def _point_cancelled(self): self.cancel() # if a point is cancelled during a curve, cancel the # curve def _scan_finished(self): self.set_result(self.data_avg) self.pause()
[docs]class NaRunFuture(NaCurveFuture): def __init__(self, module, min_delay_ms): super(NaRunFuture, self).__init__(module, min_delay_ms, autostart=False) self._run_continuous = False #def pause(self): # self._paused = True def _add_point(self, point): y, amp = point index = self.current_point avg_value = self.data_avg[index] if np.isnan(avg_value): # replace nan value by 0 avg_value = 0 self.data_avg[index] = (avg_value*self.current_avg + y)/\ (self.current_avg + 1) self.data_amp[index] = amp self._module._emit_signal_by_name("update_point", index) def _point_cancelled(self): # If a point is cancelled during the run, for instance the user # pressed pause, the acquisition can be restarted latter. pass def _scan_finished(self): self.current_avg = min(self.current_avg + 1, self._module.trace_average) # launch this signal before current_point goes back to 0... self._module._emit_signal_by_name("scan_finished") if self._run_continuous or self.current_avg<self._module.trace_average: self._module._start_acquisition() # restart scan from the beginning. self.current_point = 0 self.start() if not self._run_continuous and self.current_avg == \ self._module.trace_average: self.set_result(self.data_avg) # in case the user wants to move on with running_continuous mode self.current_point = 0 self._module.running_state = "paused" def _set_run_continuous(self): self._run_continuous = True self._min_delay_ms = self._module.MIN_DELAY_CONTINUOUS_MS
[docs]class NetworkAnalyzer(AcquisitionModule, SignalModule): """ Using an IQ module, the network analyzer can measure the complex coherent response between an output and any signal in the redpitaya. Three example ways on how to use the NetworkAnalyzer: - Example 1:: r = RedPitaya("1.1.1.1") na = NetworkAnalyzer(r) curve = na.curve(start=100, stop=1000, rbw=10...) - Example 2:: na.start = 100 na.stop = 1000 curve = na.curve(rbw=10) - Example 3:: na.setup(start=100, stop=1000, ...) for freq, response, amplitude in na.values(): print response """ _widget_class = NaWidget _gui_attributes = ["input", "output_direct", "acbandwidth", "start_freq", "stop_freq", "rbw", "avg_per_point", "points", "amplitude", "logscale", "infer_open_loop_tf"] _setup_attributes = _gui_attributes + ['running_state'] trace_average = IntProperty(doc="number of curves to average in single mode. In " "continuous mode, a decaying average with a " "characteristic memory of 'trace_average' " "curves is performed.", default=10, min=1) input = InputSelectProperty(default='networkanalyzer', call_setup=True, ignore_errors=True) #input = ProxyProperty('iq.input') output_direct = SelectProperty(options=all_output_directs, default='off', call_setup=True) start_freq = FrequencyProperty(default=1e3, call_setup=True, min=Iq.frequency.increment) stop_freq = FrequencyProperty(default=1e6, call_setup=True, min=Iq.frequency.increment) rbw = RbwAttribute(default=500.0, call_setup=True) avg_per_point = IntProperty(min=1, default=1, call_setup=True) amplitude = NaAmplitudeProperty(default=0.1, min=0, max=1, call_setup=True) points = IntProperty(min=1, max=1e8, default=1001, call_setup=True) logscale = LogScaleProperty(default=True, call_setup=True) infer_open_loop_tf = BoolProperty(default=False) acbandwidth = NaAcBandwidth( default=50.0, doc="Bandwidth of the input high-pass filter of the na.", call_setup=True) def __init__(self, parent, name=None): self._setup_attributes.remove('running_state') self._setup_attributes.append('running_state') self.sleeptimes = 0.5 self._time_last_point = None self._data_x = None super(NetworkAnalyzer, self).__init__(parent, name=name) def _load_setup_attributes(self): super(NetworkAnalyzer, self)._load_setup_attributes() if self.running_state in ["running_continuous", "running_single"]: self._logger.warning("Network analyzer is currently in the " "'running' state, i.e. it is performing a " "measurement. If this is not desired, " "please call network_analyzer.stop() or " "click the corresponding GUI button!") @property def iq(self): """ underlying iq module. """ if not hasattr(self, '_iq'): self._iq = self.pyrpl.iqs.pop(owner=self.name) # initialize iq options self.iq.bandwidth = [self.__class__.rbw.default, self.__class__.rbw.default] self.iq.inputfilter = -self.__class__.acbandwidth.default return self._iq @property def output_directs(self): return self.iq.output_directs @property def inputs(self): return self.iq.inputs def _time_per_point(self): return float(self.iq._na_sleepcycles + self.iq._na_averages) \ / (125e6 * self.iq._frequency_correction)
[docs] def signal(self): return self.iq.signal()
@property def current_freq(self): """ current frequency during the scan """ return self.iq.frequency # delay observed with measurements of the na transfer function # expected is something between 3 and 4, so it is okay _delay = 3.0
[docs] def transfer_function(self, frequencies, extradelay=0): """ Returns a complex np.array containing the transfer function of the current IQ module setting for the given frequency array. The given transfer function is only relevant if the module is used as a bandpass filter, i.e. with the setting (gain != 0). If extradelay = 0, only the default delay is taken into account, i.e. the propagation delay from input to output_signal. Parameters ---------- frequencies: np.array or float Frequencies to compute the transfer function for extradelay: float External delay to add to the transfer function (in s). If zero, only the delay for internal propagation from input to output_signal is used. If the module is fed to analog inputs and outputs, an extra delay of the order of 200 ns must be passed as an argument for the correct delay modelisation. Returns ------- tf: np.array(..., dtype=np.complex) The complex open loop transfer function of the module. """ module_delay = self._delay frequencies = np.array(np.array(frequencies, dtype=np.float), dtype=np.complex) tf = np.array(frequencies*0, dtype=np.complex) + 1.0 # input filter modelisation f = self.iq.inputfilter # no for loop here because only one filter # stage if f > 0: # lowpass tf /= (1.0 + 1j * frequencies / f) module_delay += 2 # two cycles extra delay per lowpass elif f < 0: # highpass tf /= (1.0 + 1j * f / frequencies) module_delay += 1 # one cycle extra delay per highpass # add delay delay = module_delay * 8e-9 / self.iq._frequency_correction + \ extradelay tf *= np.exp(-1j * delay * frequencies * 2 * np.pi) # add delay from phase (incorrect formula or missing effect...) return tf
[docs] def threshold_hook(self, current_val): # goes in the module... """ A convenience function to stop the run upon some condition (such as reaching of a threshold. current_val is the complex amplitude of the last data point). To be overwritten in derived class... Parameters ---------- current_val Returns ------- """ pass
# Concrete implementation of AcquisitionModule methods and attributes: # -------------------------------------------------------------------- MIN_DELAY_SINGLE_MS = 0 MIN_DELAY_CONTINUOUS_MS = 0 # na should be as fast as possible _curve_future_cls = NaCurveFuture _run_future_cls = NaRunFuture def _new_run_future_obsolete(self): assert self.running_state in ["running_single", "running_continuous"], \ "Run future cannot be created in " \ "state %s"%self.running_state self._run_future.cancel() if self.running_state == "running_continuous": self._run_future = NaRunFuture(self, min_delay_ms=self.MIN_DELAY_CONTINUOUS_MS) self._run_future._set_run_continuous() if self.running_state == "running_single": self._run_future = NaRunFuture(self, min_delay_ms=self.MIN_DELAY_SINGLE_MS) def _get_new_curve_future(self, min_delay_ms): return NaCurveFuture(self, min_delay_ms) def _new_point_future(self, index, min_delay_ms): if hasattr(self, "_point_future"): self._point_future.cancel() self._point_future = NaPointFuture(self, index, min_delay_ms) return self._point_future
[docs] def is_zero_span(self): """ Returns true if start_freq is the same as stop_freq. """ return self.start_freq==self.stop_freq
def _start_point_acquisition(self, index): if not self.is_zero_span(): # in zero span, data_x are time, # not frequency self.iq.frequency = self._data_x[index] else: self.iq.frequency = self.start_freq self._time_last_point = timeit.default_timer() def _get_point(self, index): # get the actual point's (discretized) # frequency if self._remaining_time()>0: return None # only one read operation per point y = self.iq._nadata_total / self._cached_na_averages x = self._data_x[index] tf = self._tf_values[index] amp = self.amplitude # get amplitude for normalization if amp == 0: # normalize immediately y *= self._rescale # avoid division by zero else: y *= self._rescale / amp # correct for network analyzer transfer function (AC-filter and # delay) y /= tf return y, amp
[docs] def take_ringdown(self, frequency, rbw=1000, points=1000, trace_average=1): self.start_freq = frequency self.stop_freq = frequency self.rbw = rbw self.points = points self.trace_average = trace_average curve = self.single_async() sleep(0.1) self.iq.output_direct = "off" self._time_first_point=timeit.default_timer() res = curve.await_result() x = self._run_future.data_x - self._time_first_point return [x, res]
def _start_acquisition(self): """ For the NA, resuming (from pause to start for instance... should not setup the instrument again, otherwise, this would restart at the beginning of the curve) Moreover, iq is disabled even when na is just paused. :return: """ # super(NAAcquisitionManager, self)._start_acquisition() x = self._data_x if not self.is_zero_span() else \ self.start_freq*np.ones(self.points) self.iq.setup(frequency=x[0], bandwidth=self.rbw, gain=0, phase=0, acbandwidth=self.acbandwidth, amplitude=self.amplitude, input=self.input, output_direct=self.output_direct, output_signal='output_direct') # setup averaging self.iq._na_averages = np.int(np.round(125e6 / self.rbw * self.avg_per_point)) self._cached_na_averages = self.iq._na_averages self.iq._na_sleepcycles = np.int( np.round(125e6 / self.rbw * self.sleeptimes)) # time_per_point is calculated at setup for speed reasons self.time_per_point = self._time_per_point() # compute rescaling factor of raw data # 4 is artefact of fpga code self._rescale = 2.0 ** (-self.iq._LPFBITS) * 4.0 # to avoid reading it at every single point self.iq.frequency = x[0] # this triggers the NA acquisition self._time_last_point = timeit.default_timer() # pre-calculate transfer_function values for speed self._tf_values = self.transfer_function(x) self.iq.on = True # Warn the user if time_per_point is too small: # < 1 ms measurement time will make acquisition inefficient. if self.time_per_point < 0.001: self._logger.info("Time between successive points is %.1f ms." " You should increase 'avg_per_point' to at " "least %i " "for efficient acquisition.", self.time_per_point * 1000, self.avg_per_point * 0.001 / self.time_per_point) def _stop_acquisition(self): """ Stop the iq. """ self.iq.output_direct = 'off' @property def data_x(self): """ x-data for the network analyzer are computed during setup() and cached in the variable _data_x. """ return self._data_x @property def frequencies(self): """ alias for data_x :return: frequency array """ return self.data_x def _update_data_x(self): if self.is_zero_span(): self._data_x = np.zeros(self.points) # data_x will be measured during first scan... return if self.logscale: raw_values = np.logspace( np.log10(self.start_freq), np.log10(self.stop_freq), self.points, endpoint=True) else: raw_values = np.linspace(self.start_freq, self.stop_freq, self.points, endpoint=True) values = np.zeros(len(raw_values)) for index, val in enumerate(raw_values): values[index] = self.iq.__class__.frequency. \ validate_and_normalize(self, val) # retrieve the real freqs... self._data_x = values def _remaining_time(self): """Remaining time in seconds until current point is ready""" # implement here the extra waiting at the beginning if self.current_point==0: time_per_point = 3*self.time_per_point else: time_per_point = self.time_per_point return time_per_point - (timeit.default_timer() - self._time_last_point) # Shortcut to the RunFuture data (for plotting): # ---------------------------------------------- @property def last_valid_point(self): if self.current_avg>=1: return self.points - 1 else: return self.current_point @property def current_point(self): return self._run_future.current_point @property def measured_time_per_point(self): return self._run_future.measured_time_per_point def _setup(self): self._update_data_x() # precalculate frequency values super(NetworkAnalyzer, self)._setup() # overwrite default behavior to return only valid points @property def data_avg(self): return self._run_future.data_avg @property def last_valid_point(self): return self._run_future.current_point if \ self._run_future.current_avg<=1 else self.points