"""
Everything involving asynchronicity in acquisition instruments is in this file.
In particular, this includes getting curves and continuously averaging curves.
Using the coroutine syntax introduced in python 3.4+ would make the code
more elegant, but it would not be compatible with python 2.7. Hence we have
chosen to implement all asynchronous methods such that a promise is returned (a
Future-object in python). The promise implements the following methods:
- await_result(): returns the acquisition result once it is ready.
- add_done_callback(func): the function func(value) is used as "done-callback"
All asynchronous methods also have a blocking equivalent that directly
returns the result once it is ready:
- curve_async <---> curve
- single_async <---> single
Finally, this implementation using standard python Futures makes it
possible to use transparently pyrpl asynchronous methods inside python 3.x
coroutines.
Example:

    This example shows a typical acquisition use case where a sequence of
    n acquisitions of simultaneous oscilloscope and network analyzer traces
    are launched
    ::

        from asyncio import ensure_future, get_event_loop

        async def my_acquisition_routine(n):
            for i in range(n):
                print("acquiring scope")
                fut = ensure_future(p.rp.scope.run_single())
                print("acquiring na")
                data2 = await p.networkanalyzer.run_single()
                # both acquisitions are carried out simultaneously
                data1 = await fut
                print("loop %i"%i, data1, data2)

        fut = ensure_future(my_acquisition_routine(10))
        get_event_loop().run_until_complete(fut)
"""
from copy import copy
from .module_attributes import *
from .async_utils import PyrplFuture, Future, MainThreadTimer, CancelledError
class AcquisitionError(ValueError):
    """
    Error raised when an acquisition fails.

    Subclasses ValueError, so handlers written for ValueError also catch it.
    """
    pass
class CurveFuture(PyrplFuture):
    """
    Future representing one pending curve of an acquisition instrument.

    The basic acquisition of instruments is an asynchronous process:
    For instance, when the scope acquisition has been launched, we know
    that the curve won't be ready before duration(), but if the scope is
    waiting for a trigger event, this could take much longer. Of course,
    we want the event loop to stay alive while waiting for a pending curve.
    That's the purpose of this future object.

    After its creation, it will perform the following actions:

     1. stay inactive for a time given by module._remaining_time()
        (or min_delay_ms, whichever is larger, when min_delay_ms != 0)
     2. after that, it will check every min_delay_ms if a new curve is
        ready with module._data_ready()
     3. when data is ready, its result will be set with the instrument
        data, as returned by module._get_curve()

    :param module: the acquisition module to poll. It must provide
        _remaining_time(), _data_ready(), _get_curve(),
        _start_acquisition(), _free_up_resources() and running_state.
    :param min_delay_ms: minimum delay (in ms) between two polls of the
        instrument. 0 means "as fast as possible".
    """

    def __init__(self, module, min_delay_ms=20):
        self._module = module
        self.min_delay_ms = min_delay_ms
        super(CurveFuture, self).__init__()
        # The timer is armed first, then the module is asked to start the
        # acquisition.
        self._init_timer()
        self._module._start_acquisition()

    def _init_timer(self):
        """
        Creates and starts the timer that triggers the first poll of the
        instrument.
        """
        remaining_ms = self._module._remaining_time() * 1000
        if self.min_delay_ms == 0:
            # make sure 1st instrument interrogation occurs before time
            delay = remaining_ms - 1
        else:
            # 1 ms loss due to timer inaccuracy is acceptable
            delay = max(self.min_delay_ms, remaining_ms)

        # clamp at 0 to avoid negative timer delays
        self._timer = MainThreadTimer(max(0, delay))
        self._timer.timeout.connect(self._set_data_as_result)
        self._timer.start()

    def _get_one_curve(self):
        """
        :return: the curve given by module._get_curve() if
            module._data_ready() is true, None otherwise.
        """
        if self._module._data_ready():
            return self._module._get_curve()
        return None

    def _set_data_as_result(self):
        """
        Timer callback: sets the curve as result of the future if it is
        available, otherwise re-arms the timer for another poll.
        """
        data = self._get_one_curve()
        if data is not None:
            self.set_result(data)
            # Resources are only released when no run is in progress.
            if self._module.running_state in ["paused", "stopped"]:
                self._module._free_up_resources()
        else:
            # Not ready yet: poll again after min_delay_ms.
            self._timer.setInterval(self.min_delay_ms)
            self._timer.start()

    def set_exception(self, exception):  # pragma: no cover
        """
        Stops polling the instrument before setting the exception.
        """
        self._timer.stop()
        super(CurveFuture, self).set_exception(exception)

    def cancel(self):
        """
        Stops polling the instrument before cancelling the future.
        """
        self._timer.stop()
        super(CurveFuture, self).cancel()
class RunFuture(PyrplFuture):
    """
    Uses several CurveFuture to perform an average.

    2 extra functions are provided to control the acquisition:

    pause(): stalls the acquisition

    start(): (re-)starts the acquisition (needs to be called at the beginning)

    The format for curves are:
       - Scope:
          - data_x  : self.times
          - data_avg: np.array((ch1, ch2))
       - Specan or NA:
          - data_x  : frequencies
          - data_avg: np.array(y_complex)

    :param module: the acquisition module the curves are requested from.
    :param min_delay_ms: delay forwarded to module._curve_async() for each
        curve of the run.
    """

    # running_state values for which incoming curves are averaged
    _RUNNING_STATES = ("running_continuous", "running_single")

    def __init__(self, module, min_delay_ms):
        self._run_continuous = False
        self._module = module
        self._min_delay_ms = min_delay_ms
        super(RunFuture, self).__init__()
        self.data_avg = None
        self.data_x = copy(self._module.data_x)  # in case it is saved later
        self._fut = None  # future of the curve currently being acquired
        self.current_avg = 0
        self._paused = True  # nothing is acquired until start() is called

    def _new_curve_arrived(self, curve):
        """
        Done-callback of the pending curve future: merges the new curve
        into the running average and either finishes the run or requests
        the next curve.

        :param curve: the finished (or cancelled) curve future.
        """
        try:
            result = curve.result()
        except (AcquisitionError, CancelledError):
            if self._module.running_state not in self._RUNNING_STATES:
                self.cancel()
            # No curve is available in either case: returning here avoids
            # touching the unbound name 'result' below.
            return

        if self._module.running_state not in self._RUNNING_STATES:
            return

        # current_avg saturates at trace_average, so the weight of a new
        # curve never drops below 1 / trace_average.
        self.current_avg = min(self.current_avg + 1,
                               self._module.trace_average)

        if self.data_avg is None:
            self.data_avg = result
        else:
            self.data_avg = (self.data_avg * (self.current_avg - 1) +
                             result) / self.current_avg

        self._module._emit_signal_by_name('display_curve',
                                          [self._module.data_x,
                                           self.data_avg])

        if self._is_run_over():
            if not self.done():
                self.set_result(self.data_avg)
            self._module.running_state = "stopped"  # should be 'paused'
            # if we want to average over the single run, but for
            # scope and specan, it is more convenient to restart
            # averaging (basically saves the button stop in the GUI)
        elif not self._paused:
            self.start()

    def _is_run_over(self):
        """
        :return: False for a continuous run, otherwise True once
            current_avg has reached module.trace_average.
        """
        if self._run_continuous:
            return False
        return self.current_avg >= self._module.trace_average

    def cancel(self):
        """
        Pauses the acquisition, then cancels the future.
        """
        self.pause()
        super(RunFuture, self).cancel()

    def pause(self):
        """
        Stalls the acquisition: frees the module resources and cancels the
        pending curve future, if any.
        """
        self._paused = True
        self._module._free_up_resources()
        if self._fut is not None:
            self._fut.cancel()

    def start(self):
        """
        (Re-)starts the acquisition: any pending curve future is cancelled
        and a new curve is requested from the module.
        """
        self._paused = False
        if self._fut is not None:
            self._fut.cancel()
        self._fut = self._module._curve_async(self._min_delay_ms)
        self._fut.add_done_callback(self._new_curve_arrived)

    def _set_run_continuous(self):
        """
        Makes the RunFuture continuous (used when setting "running_continuous")
        """
        self._run_continuous = True
        self._min_delay_ms = self._module.MIN_DELAY_CONTINUOUS_MS
class RunningStateProperty(SelectProperty):
    """
    A property to indicate whether the instrument is currently running or not.

    Changing the running_state performs the necessary actions to enable the
    selected state. The state can be one of the following:

    - 'running_single': takes a single acquisition (trace_average averages).
      Acquisitions are automatically restarted until the desired number of
      averages is acquired.
    - 'running_continuous': continuously takes acquisitions, eternally
      averages and restarts automatically.
    - 'paused': acquisition interrupted, but no need to restart averaging at
      next call of running_continuous.
    - 'stopped': acquisition interrupted, averaging will restart at next
      call of running_continuous.
    """

    def __init__(self, options=None, **kwargs):
        """
        :param options: the allowed states. Defaults to
            ["running_single", "running_continuous", "paused", "stopped"].
        :param kwargs: forwarded to SelectProperty.__init__.
        """
        if options is None:
            # A fresh list per instance: a list literal used as default
            # argument would be shared between all instances.
            options = ["running_single",
                       "running_continuous",
                       "paused",
                       "stopped"]
        super(RunningStateProperty, self).__init__(options=options, **kwargs)

    # Changing running_state is handled here instead of inside _setup()
    # (with a call_setup=True option) because the precise actions to be
    # taken depend on the previous state of running_state. Such a behavior
    # would not be straightforward to implement in _setup()
    def set_value(self, obj, val):
        """
        This is the master property: changing this value triggers all the logic
        to change the acquisition mode

        :param obj: the module owning the property.
        :param val: the new running state.
        """
        # touching the running_state cancels the pending curve_future object
        # (no effect if future is already done)
        obj._curve_future.cancel()
        previous_state = obj.running_state
        SelectProperty.set_value(self, obj, val)
        if val == "running_single":
            # acquire as fast as possible trace_average curves
            obj.setup()
        elif val == "running_continuous":
            if previous_state == 'stopped':
                # restart averaging...
                obj.setup()
            else:
                # if previous run was "running_single" keep averaging in
                # the same run, simply make it continuous
                obj._run_future._set_run_continuous()
                obj._run_future.start()
        elif val in ["paused", "stopped"]:
            if hasattr(obj, '_run_future'):
                # single cannot be resumed
                # on the other hand, continuous can still be started again
                # even though it is cancelled. Basically, the result will
                # never be set, but the acquisition can still be going on
                # indefinitely.
                obj._run_future.cancel()
class SignalLauncherAcquisitionModule(SignalLauncher):
    """
    Class that takes care of emitting signals to update all possible
    displays.
    """
    # This signal is emitted when curves need to be displayed. The argument
    # is [array(times), array(curve1), array(curve2)] or
    # [times, None, array(curve2)]
    display_curve = QtCore.Signal(list)
    autoscale_x = QtCore.Signal()

    # For now, the following signals are only implemented with NA.
    update_point = QtCore.Signal(int)  # used in NA only
    scan_finished = QtCore.Signal()  # used in NA only
    clear_curve = QtCore.Signal()  # NA only

    x_log_toggled = QtCore.Signal()  # logscale changed

    # Following signal only implemented in spec an
    unit_changed = QtCore.Signal()
class AcquisitionModule(Module):
    """
    Base class for instruments that acquire curves asynchronously.

    The averaged data of the current run are held by a RunFuture object
    (attribute _run_future). When an asynchronous acquisition is running,
    a 'display_curve' signal carrying the current averaged data is emitted
    each time a new curve is averaged.

    Methods:

        *(All methods except curve() and single() return immediately)*

        single_async(): performs an asynchronous acquisition of
            trace_average curves. The function returns a promise of the
            result: a future whose await_result(timeout) method blocks
            until data is ready.
        single(): blocking equivalent of single_async().
        curve_async(): asynchronous acquisition of one curve, returns a
            future.
        curve(): blocking equivalent of curve_async().
        continuous(): continuously acquires curves, and performs a
            moving average over the trace_average last ones.
        pause(): stops the current acquisition without restarting the
            averaging
        stop(): stops the current acquisition and restarts the averaging.
        save_curve(): saves the currently averaged curve (or curves for scope)

    Attributes:

        curve_name (str): name of the curve to create upon saving
        trace_average (int): number of averages in single (not to confuse with
            averaging per point)
        data_avg (array of numbers): array containing the current averaged curve
        current_avg (int): current number of averages
    """
    # The averaged data are stored in a RunFuture object _run_future
    #
    # _setup() recreates from scratch _run_future by calling _new_run_future()
    #
    # It is necessary to setup the AcquisitionModule on startup to start
    # with clean arrays
    #
    # Changing any attribute in callback_attribute (mostly every
    # setup_attribute except running_state) will force a restart of the
    # averaging by calling setup
    #
    # On the other hand, "running_state" has a customized behavior: it will
    # only call setup() when needed and perform customized actions otherwise:
    #  - paused/stopped -> running_single: start acquisition on new future
    #  - paused -> running_continuous: start acquisition on same future + set
    #  future to run_continuous (irreversible)
    #  - stopped -> running_continuous: start acquisition on new future +
    #  set future to run_continuous (irreversible) == call setup()
    #  - running_single/running_continuous -> pause/stop: pause acquisition

    _gui_attributes = ['trace_average', 'curve_name']

    # acquisition_modules need to be setup() once they are loaded
    _setup_on_load = True
    _signal_launcher = SignalLauncherAcquisitionModule
    _setup_attributes = ['running_state', 'trace_average', 'curve_name']
    _run_future_cls = RunFuture
    _curve_future_cls = CurveFuture

    # async acquisition should be as fast as possible
    MIN_DELAY_SINGLE_MS = 0
    # leave time for the event loop in continuous
    MIN_DELAY_CONTINUOUS_MS = 40

    running_state = RunningStateProperty(
        default='stopped',
        doc="Indicates whether the instrument is running acquisitions or not. "
            "See :class:`RunningStateProperty` for available options. ")

    trace_average = IntProperty(doc="number of curves to average in single mode. In "
                                    "continuous mode, a moving window average is "
                                    "performed.",
                                default=1,
                                min=1)
    curve_name = StringProperty(doc="name of the curve to save.")

    def __init__(self, parent, name=None):
        # The curve promise is initialized with a dummy Future, because
        # instantiating CurveFuture launches a curve acquisition
        self._curve_future = Future()
        super(AcquisitionModule, self).__init__(parent, name=name)
        self.curve_name = self.name + " curve"
        # On the other hand, RunFuture has a start method and is not started
        # at instantiation.
        self._run_future = self._run_future_cls(
            self, min_delay_ms=self.MIN_DELAY_SINGLE_MS)

    def _new_curve_future(self, min_delay_ms):
        """
        Cancels the pending curve future and replaces it by a new one.
        """
        self._curve_future.cancel()
        self._curve_future = self._curve_future_cls(
            self, min_delay_ms=min_delay_ms)

    def _new_run_future(self):
        """
        Cancels the current run future (if any) and replaces it by a new
        one matching the current running_state.
        """
        if hasattr(self, "_run_future"):
            self._run_future.cancel()
        if self.running_state == "running_continuous":
            self._run_future = self._run_future_cls(
                self, min_delay_ms=self.MIN_DELAY_CONTINUOUS_MS)
            self._run_future._set_run_continuous()
        else:
            self._run_future = self._run_future_cls(
                self, min_delay_ms=self.MIN_DELAY_SINGLE_MS)

    def _emit_signal_by_name(self, signal_name, *args, **kwds):
        """Lets the module's signal_launcher emit signal name"""
        self._signal_launcher.emit_signal_by_name(signal_name, *args, **kwds)

    def _curve_async(self, min_delay_ms):
        """
        Same as curve_async except this function can be used in any
        running_state.
        """
        self._start_acquisition()
        self._new_curve_future(min_delay_ms=min_delay_ms)
        return self._curve_future

    def curve_async(self):
        """
        Launches the acquisition for one curve with the current parameters.

        - If running_state is not "stopped", stops the current acquisition.
        - Immediately returns a future object representing the curve.
        - The curve can be retrieved by calling await_result(timeout) on
          the future object.
        - The future is cancelled if the instrument's state is changed
          before the end of the acquisition, or another call to
          curve_async() or curve() is made on the same instrument.
        """
        # Compare by value: an identity test ('is not') against a string
        # literal depends on string interning and is not reliable.
        if self.running_state != "stopped":
            self.stop()
        return self._curve_async(0)

    def curve(self, timeout=None):
        """
        Same as curve_async, except:

        - the function will not return until the curve is ready or timeout
          occurs.
        - the function directly returns an array with the curve instead of
          a future object
        """
        return self.curve_async().await_result(timeout)

    def single_async(self):
        """
        Performs an asynchronous acquisition of trace_average curves.

        - If running_state is not stop, stops the current acquisition.
        - Immediately returns a future object representing the curve.
        - The curve can be retrieved by calling await_result(timeout) on
          the future object.
        - The future is cancelled if the instrument's state is changed
          before the end of the acquisition.
        """
        self.running_state = 'running_single'
        return self._run_future

    def single(self, timeout=None):
        """
        Same as single_async, except:

        - the function will not return until the averaged curve is ready
          or timeout occurs.
        - the function directly returns an array with the curve instead of
          a future object.
        """
        return self.single_async().await_result(timeout)

    def continuous(self):
        """
        continuously acquires curves, and performs a moving
        average over the trace_average last ones.
        """
        self.running_state = 'running_continuous'

    def pause(self):
        """
        Stops the current acquisition without restarting the averaging
        """
        self.running_state = 'paused'

    def stop(self):
        """
        Stops the current acquisition and averaging will be restarted
        at next run.
        """
        self.running_state = 'stopped'

    def save_curve(self):
        """
        Saves the curve(s) that is (are) currently displayed in the gui in
        the db_system. Also, returns the list [curve_ch1, curve_ch2]...
        """
        params = self.setup_attributes
        params.update(name=self.curve_name)
        curve = self._save_curve(self._run_future.data_x,
                                 self._run_future.data_avg,
                                 **params)
        return curve

    def _clear(self):
        """
        Cancels the pending curve and run futures after clearing the
        parent class.
        """
        super(AcquisitionModule, self)._clear()
        self._curve_future.cancel()
        self._run_future.cancel()

    def _setup(self):
        # the _run_future is renewed to match the requested type of run (
        # rolling_mode or triggered)
        # This is how we make sure changing duration or rolling_mode won't
        # freeze the acquisition.
        self._new_run_future()
        if self.running_state in ["running_single", "running_continuous"]:
            self._run_future.start()
        self._emit_signal_by_name("autoscale_x")

    # Methods to implement in derived class:
    # --------------------------------------

    def _remaining_time(self):
        """
        remaining time (in seconds) until the data has a chance to be ready.
        In the case of scope, where trigger might delay the acquisition,
        this is the minimum time to wait in the "best case scenario" where
        the acquisition would have started immediately after setup().
        """
        raise NotImplementedError("To implement in derived class")  # pragma: no cover

    def _data_ready(self):
        """
        :return: True or False
        """
        raise NotImplementedError('To implement in derived class')  # pragma: no cover

    def _get_curve(self):
        """
        get the curve from the instrument.
          a 1D array for single channel instruments
          a 2*n array for the scope
        """
        raise NotImplementedError  # pragma: no cover

    @property
    def data_x(self):
        """
        x-axis of the curves to plot.

        Read by RunFuture.__init__, so it must be implemented before a
        run future is created.

        :return:
        """
        raise NotImplementedError("To implement in derived class")  # pragma: no cover

    def _start_acquisition(self):
        """
        If anything has to be communicated to the hardware (such as make
        trigger ready...) to start the acquisition, it should be done here.
        Within this file, this function is called by _curve_async() and by
        the init-function of CurveFuture.
        Only non-blocking operations are allowed.
        """
        pass  # pragma: no cover

    def _free_up_resources(self):
        """
        Hook called when an acquisition is paused or finished outside of a
        run. Does nothing by default.
        """
        pass  # pragma: no cover

    # Shortcut to the RunFuture data (for plotting):
    # ----------------------------------------------

    @property
    def data_avg(self):
        """The current averaged data of the run future."""
        return self._run_future.data_avg

    @property
    def current_avg(self):
        """The current number of averages of the run future."""
        return self._run_future.current_avg