from __future__ import division
from collections import OrderedDict
from qtpy import QtCore
import logging
from ...modules import SignalLauncher
from ...module_attributes import ModuleListProperty
from .input import *
from .output import *
from ...widgets.module_widgets import LockboxWidget
from ...pyrpl_utils import all_subclasses
from ...async_utils import sleep
from .stage import Stage
from . import LockboxModule, LockboxModuleDictProperty
from . import LockboxLoop, LockboxPlotLoop
from ...widgets.module_widgets.lockbox_widget import LockboxSequenceWidget
def all_classnames():
    """
    Maps class names to classes for Lockbox and all of its subclasses.

    The Lockbox base class comes first, followed by the classes returned by
    all_subclasses(Lockbox) in the order in which they are returned.
    """
    registry = OrderedDict()
    for candidate in [Lockbox] + all_subclasses(Lockbox):
        registry[candidate.__name__] = candidate
    return registry
class ClassnameProperty(SelectProperty):
    """
    Property holding the name of the Lockbox class in use.

    Lots of lockbox attributes need to be updated when the model is changed,
    therefore setting this property ends with a call to
    obj._classname_changed().
    """

    def set_value(self, obj, val):
        """
        Stores val, saves it if autosave is active, notifies obj of the
        change and returns val.
        """
        super(ClassnameProperty, self).set_value(obj, val)
        if not obj._autosave_active:
            obj._logger.debug("Autosave of classname attribute of Lockbox is "
                              "inactive. This may have severe impact "
                              "on proper functionality.")
        else:
            # the attribute must be saved immediately in order to guarantee
            # that _make_Lockbox works
            self.save_attribute(obj, val)
        obj._logger.debug("Lockbox classname changed to %s", val)
        # this call results in replacing the lockbox object by a new one
        obj._classname_changed()
        return val
class AutoLockProperty(BoolProperty):
    """ Boolean property that is true if autolock is enabled. """

    def set_value(self, obj, val):
        """ Stores val, then calls obj.relock(test_auto_lock=True). """
        base_property = super(AutoLockProperty, self)
        base_property.set_value(obj=obj, val=val)
        obj.relock(test_auto_lock=True)
class AutoLockIntervalProperty(FloatProperty):
    """ Float property holding the interval of the autolock loop. """

    def set_value(self, obj, val):
        """ Stores val and copies it to the interval of obj._auto_lock_loop. """
        super(AutoLockIntervalProperty, self).set_value(obj=obj, val=val)
        auto_lock_loop = obj._auto_lock_loop
        auto_lock_loop.interval = val
class LockstatusIntervalProperty(FloatProperty):
    """ Float property holding the interval of the lockstatus loop. """

    def set_value(self, obj, val):
        """ Stores val and copies it to the interval of obj._lockstatus_loop. """
        super(LockstatusIntervalProperty, self).set_value(obj=obj, val=val)
        lockstatus_loop = obj._lockstatus_loop
        lockstatus_loop.interval = val
class StateSelectProperty(SelectProperty):
    """
    Select property for the lockbox state that records the time of each
    change and announces the new state through the signal launcher.
    """

    def set_value(self, obj, val):
        """ Stores val, timestamps the change and emits state_changed. """
        super(StateSelectProperty, self).set_value(obj, val)
        # save the last time of change of state
        changed_at = time()
        obj._state_change_time = changed_at
        launcher = obj._signal_launcher
        launcher.state_changed.emit([val])
[docs]class SignalLauncherLockbox(SignalLauncher):
"""
A SignalLauncher for the lockbox.

Declares the Qt signals through which the lockbox announces changes.
Signals declared with ``QtCore.Signal(list)`` are emitted with one list
argument, signals declared with ``QtCore.Signal()`` without argument.
"""
# signals concerning outputs (not emitted by the code in this module;
# presumably emitted where outputs are created/deleted/renamed - TODO confirm)
output_created = QtCore.Signal(list)
output_deleted = QtCore.Signal(list)
output_renamed = QtCore.Signal()
# signals concerning stages of the lock sequence (not emitted by the code in
# this module; presumably emitted by the sequence/stage code - TODO confirm)
stage_created = QtCore.Signal(list)
stage_deleted = QtCore.Signal(list)
stage_renamed = QtCore.Signal()
# emitted by Lockbox._classname_changed before the old lockbox is cleared
delete_widget = QtCore.Signal()
# emitted with [new_state] by StateSelectProperty.set_value
state_changed = QtCore.Signal(list)
# signals concerning inputs (not emitted by the code in this module;
# presumably emitted by the input code - TODO confirm)
add_input = QtCore.Signal(list)
input_calibrated = QtCore.Signal(list)
remove_input = QtCore.Signal(list)
update_transfer_function = QtCore.Signal(list)
# emitted with [None] by Lockbox._lockstatus to request a lockstatus update
update_lockstatus = QtCore.Signal(list)
[docs]class Lockbox(LockboxModule):
"""
A module that performs feedback on systems that are well described by a
physical model.
"""
# widget class associated with this module
_widget_class = LockboxWidget
# signal launcher class declaring the lockbox-specific Qt signals
_signal_launcher = SignalLauncherLockbox
# names of the attributes listed as gui attributes
_gui_attributes = ["classname",
"default_sweep_output",
"auto_lock",
"is_locked_threshold",
"setpoint_unit"]
# setup attributes: all gui attributes plus the two loop intervals and the
# autolock timeout
_setup_attributes = _gui_attributes + ["auto_lock_interval",
"lockstatus_interval",
"_auto_lock_timeout"]
# name of the Lockbox (sub)class in use; the selectable options are the
# class names returned by all_classnames()
classname = ClassnameProperty(options=lambda: list(all_classnames().keys()))
def __init__(self, parent, name=None):
    """ Initializes the module and marks the startup condition. """
    super(Lockbox, self).__init__(parent=parent, name=name)
    # a negative state change time indicates the startup condition, i.e.
    # that the state has not been changed yet
    startup_marker = -1
    self._state_change_time = startup_marker
###################
# unit management #
###################
# setpoint_unit is mandatory: it specifies the unit in which the setpoint is
# given
setpoint_unit = SelectProperty(options=['V'], default='V', ignore_errors=True)
# output gain comes in units of '_output_unit'/V of analog redpitaya output
_output_units = ['V', 'mV']
# each _output_unit must come with a means of conversion from output_unit to
# setpoint_unit: _unit1_in_unit2 looks up attributes that are named
# '_<unit1>_in_<unit2>'
def _unit1_in_unit2(self, unit1, unit2, try_prefix=True):
""" helper function to convert unit2 to unit 1"""
if unit1 == unit2:
return 1.0
try:
return getattr(self, '_'+unit1+'_in_'+unit2)
except AttributeError:
try:
return 1.0 / getattr(self, '_' + unit2 + '_in_' + unit1)
except AttributeError:
if not try_prefix:
raise
# did not find the unit. Try scaling of unit1
_unit_prefixes = OrderedDict([('', 1.0,),
('m', 1e-3),
('u', 1e-6),
('n', 1e-9),
('p', 1e-12),
('k', 1e3),
('M', 1e6),
('G', 1e9),
('T', 1e12)])
for prefix2 in _unit_prefixes:
if unit2.startswith(prefix2) and len(unit2)>len(prefix2):
for prefix1 in _unit_prefixes:
if unit1.startswith(prefix1) and len(unit1)>len(prefix1):
try:
return self._unit1_in_unit2(unit1[len(prefix1):],
unit2[len(prefix2):],
try_prefix=False)\
* _unit_prefixes[prefix1]\
/ _unit_prefixes[prefix2]
except AttributeError:
pass
raise AttributeError("Could not find attribute %s in Lockbox class. "
%(unit1+'_in_'+unit2))
def _unit_in_setpoint_unit(self, unit):
# helper function to convert setpoint_unit into unit
return self._unit1_in_unit2(unit, self.setpoint_unit)
def _setpoint_unit_in_unit(self, unit):
# helper function to convert setpoint_unit into unit
return self._unit1_in_unit2(self.setpoint_unit, unit)
# default_sweep_output would throw an error if the saved state corresponded
# to a nonexisting output, hence ignore_errors=True
default_sweep_output = SelectProperty(options=lambda lb: lb.outputs.keys(),
ignore_errors=True)
# threshold for considering the system locked, in units of setpoint_unit
is_locked_threshold = FloatProperty(default=1.0, min=-1e10, max=1e10,
doc="Setpoint interval size to consider "
"system in locked state")
auto_lock = AutoLockProperty(default=False, doc="Turns on the autolock "
"of the module.")
# try to relock every auto_lock_interval (s) if autolock is on
auto_lock_interval = AutoLockIntervalProperty(default=1.0, min=1e-3,
max=1e10)
# loop that periodically calls relock(test_auto_lock=True); its interval is
# updated whenever auto_lock_interval is set
_auto_lock_loop = ModuleProperty(LockboxLoop,
interval=1.0,
autostart=True,
loop_function=lambda obj: obj.relock(
test_auto_lock=True))
# used by is_locking(): once a stage has lasted longer than this, the lock
# acquisition is no longer considered to be in progress
_auto_lock_timeout = FloatProperty(min=0,
default=1000,
doc="maximum time that a locking stage is "
"allowed to take before a lock is "
"considered as stalled. ")
# logical inputs and outputs of the lockbox are accessible as
# lockbox.outputs.output1
inputs = LockboxModuleDictProperty(input_from_output=InputFromOutput)
outputs = LockboxModuleDictProperty(output1=OutputSignal,
output2=OutputSignal)
# Sequence is a list of stage modules. By default the first stage is created
sequence = ModuleListProperty(Stage, default=[{}])
sequence._widget_class = LockboxSequenceWidget
# current state of the lockbox: 'unlock', 'sweep', 'final_stage' or the index
# of a stage in sequence
current_state = StateSelectProperty(options=
(lambda inst:
['unlock', 'sweep', 'final_stage']
+ list(range(len(inst.sequence)))),
default='unlock')
@property
def final_stage(self):
    """ temporary storage of the final lock stage """
    if hasattr(self, '_final_stage'):
        return self._final_stage
    # first access: create the stage and configure it with defaults
    self._final_stage = Stage(self, name='final_stage')
    self.final_stage = {}
    return self._final_stage

@final_stage.setter
def final_stage(self, kwargs):
    """
    Sets up the final stage with the setup attributes of the last stage of
    the sequence, overridden by kwargs, and with a duration of 0.
    """
    stage_settings = self.sequence[-1].setup_attributes
    stage_settings.update(kwargs)
    stage_settings['duration'] = 0
    self.final_stage.setup(**stage_settings)
def _current_stage(self, state=None):
if state is None:
state = self.current_state
if isinstance(state, int):
return self.sequence[self.current_state]
elif state == 'final_stage':
return self.final_stage
else:
return state
@property
def current_stage(self):
    """ the stage that corresponds to the current state of the lockbox """
    stage = self._current_stage()
    return stage
@property
def signals(self):
    """ an OrderedDict of all logical signals of the lockbox """
    collected = []
    # inputs / outputs are only included once the corresponding private
    # attribute exists, to avoid recursive loops at startup
    for marker, container in (("_inputs", "inputs"),
                              ("_outputs", "outputs")):
        if hasattr(self, marker):
            collected.extend(getattr(self, container).items())
    return OrderedDict(collected)
@property
def asg(self):
    """ the asg being used for sweeps """
    if getattr(self, '_asg', None) is None:
        # no asg reserved yet: pop one from pyrpl.asgs under our own name
        self._asg = self.pyrpl.asgs.pop(self.name)
    return self._asg
def calibrate_all(self, autosave=False):
    """
    Calibrates successively all inputs.

    autosave is passed on to the calibrate function of each input.

    Returns the list of all values returned by the calibrate calls that are
    not None. The calibration is best-effort: an input whose calibration
    raises an Exception is logged with a warning and skipped, such that the
    remaining inputs are still calibrated. KeyboardInterrupt and SystemExit
    are not intercepted.
    """
    curves = []
    for signal in self.inputs:
        try:
            curve = signal.calibrate(autosave=autosave)
        except Exception as exc:
            self._logger.warning("Calibration of input %s failed: %s",
                                 getattr(signal, 'name', signal), exc)
            continue
        if curve is not None:
            curves.append(curve)
    return curves
def get_analog_offsets(self, duration=1.0):
    """
    Measures and saves the analog offset for all inputs.

    This function is designed to measure the analog offsets of the redpitaya
    inputs and possibly the sensors connected to these inputs. Only call this
    function if you are sure about what you are doing and if all signal
    sources (lasers etc.) are turned off.

    The parameter duration specifies the time during which to average the
    input offsets. It is passed on to get_analog_offset of each input.
    """
    for signal in self.inputs:
        signal.get_analog_offset(duration=duration)
def unlock(self, reset_offset=True):
    """
    Unlocks all outputs.

    A running lock sequence is stopped first. reset_offset is passed on to
    the unlock function of each output. Finally, the state of the lockbox
    is set to 'unlock'.
    """
    running_loop = self._lock_loop
    if running_loop is not None:  # stop locking sequence
        running_loop._clear()
        self._lock_loop = None
    for out in self.outputs:
        out.unlock(reset_offset=reset_offset)
    self.current_state = 'unlock'
def _sweep(self):
"""
Performs a sweep of one of the output. No output default kwds to avoid
problems when use as a slot.
"""
self.unlock()
self.outputs[self.default_sweep_output].sweep()
self.current_state = "sweep"
def sweep(self):
    """
    Performs a sweep of one of the outputs by delegating to _sweep. Takes no
    keyword arguments to avoid problems when used as a slot.
    """
    outcome = self._sweep()
    return outcome
_lock_loop = None # class-level default (no lock sequence running); lock() stores the running lock loop here, unlock() resets it to None
def lock(self, **kwds):
    """
    Launches the full lock sequence, stage by stage until the end.

    The order is: unlock -> stages of sequence -> final_stage. Optional
    kwds are stage attributes that are set after iteration through the
    sequence, e.g. a modified setpoint.
    """
    self.unlock()
    # prepare final stage property as a modified copy of the last stage
    self.final_stage = kwds

    def lock_loop_function(lockbox, loop):
        """ enables stage number loop.n, or finishes with the final stage """
        stages = lockbox.sequence
        if loop.n >= len(stages):
            # sequence is over: enable the final stage and stop the loop
            lockbox.final_stage.enable()
            loop._clear()
            lockbox._lock_loop = None
            return
        stage = stages[loop.n]
        stage.enable()
        # wait for the duration of this stage before the next iteration
        loop.interval = stage.duration

    # the actual sequence is executed by calling the function above in a loop
    self._lock_loop = LockboxLoop(self,
                                  name="lock_loop",
                                  loop_function=lock_loop_function)
def relock(self, test_auto_lock=False, **kwargs):
    """
    Locks the cavity if it is not locked in the final stage.

    Returns None if test_auto_lock is set and autolock is off or a setup is
    ongoing, False if a lock acquisition is in progress, True if the lockbox
    is locked in the final stage, and the return value of lock() otherwise.
    """
    # if kwargs are given, try to set these in the final stage
    if kwargs:
        self.final_stage.setup(**kwargs)
    if test_auto_lock and not (self.auto_lock and not self._setup_ongoing):
        # skip if autolock is off and call from autolock_timer
        return None
    if self.is_locking():
        # lock acquisition not taking too long -> do not interrupt
        return False
    if self.is_locked_and_final(loglevel=0):
        # locked and in final stage, nothing to do
        return True
    # either unlocked in final stage or in an unlocked state: call lock()
    self._logger.info("Attempting to re-lock...")
    return self.lock(**kwargs)
def relock_until_locked(self, **kwargs):
    """ blocks the command line until cavity is locked with kwargs """

    def relock_function(lockbox, loop):
        """ stops the loop as soon as relock returns a true value """
        locked = lockbox.relock(**kwargs)
        if locked:
            loop._clear()

    self._relock_until_locked_loop = LockboxLoop(
        parent=self,
        name='relock_until_locked_loop',
        interval=1.0,
        autostart=True,
        loop_function=relock_function)
    # wait for locks to terminate
    while True:
        if self._relock_until_locked_loop._ended:
            break
        sleep(1.0)
def lock_until_locked(self, **kwargs):
    """
    Launches the lock sequence with kwargs, then returns the result of
    relock_until_locked called with the same kwargs.
    """
    self.lock(**kwargs)
    outcome = self.relock_until_locked(**kwargs)
    return outcome
def sleep_while_locked(self, time_to_sleep):
    """
    Sleeps for time_to_sleep as long as the lockbox stays locked in the
    final stage.

    Returns True if the lock was maintained during the whole time. Logs an
    error and returns False as soon as the lock is found to be lost.
    """
    start = time()
    deadline = start + time_to_sleep
    while time() < deadline:
        if not self.is_locked_and_final(loglevel=0):
            self._logger.error('Error during measurement - cavity unlocked. Aborting sleep...')
            return False
        sleep(0.1)
    return True
[docs] def is_locked(self, input=None, loglevel=logging.INFO):
""" Returns True if locked, else False. The reason for reporting a
missing lock is logged at loglevel.

input is the input signal (or its key in self.inputs) whose is_locked
function decides about the lock. If None, self._default_is_locked_input
is used when it exists and is not None, otherwise the input of the
current stage.

NOTE(review): the original docstring states that an internal dict with
information about the current error signals is updated as well. No such
update happens in this function itself; presumably it is done by the
is_locked function of the input - confirm. """
# a lock requires the current stage to be a stage of the sequence or the
# final stage
if not self.current_stage in (self.sequence+[self.final_stage]):
# not locked to any defined sequence state
self._logger.log(loglevel,
"Not locked: lockbox state '%s' does not "
"correspond to a lock stage.",
self.current_state)
return False
# test for output saturation
for o in self.outputs:
if o.is_saturated:
self._logger.log(loglevel, "Not locked: output %s "
"is saturated.", o.name)
return False
# select the input that decides about the lock
if input is None:
if hasattr(self, '_default_is_locked_input') and self._default_is_locked_input is not None:
input = self._default_is_locked_input
else:
input = self.current_stage.input
# anything that is not an InputSignal is used as a key into self.inputs
# (NOTE(review): presumably this applies to explicitly passed inputs as
# well - confirm the nesting of this check)
if not isinstance(input, InputSignal):
input = self.inputs[input]
# call is_locked of the input
try:
return input.is_locked(loglevel=loglevel)
except TypeError: # occurs if is_locked takes no argument loglevel
return input.is_locked()
def is_locked_and_final(self, loglevel=logging.INFO):
    """
    Returns False unless the current state is 'final_stage'; in the final
    stage, returns the result of is_locked called with loglevel.
    """
    if self.current_state != 'final_stage':
        return False
    return self.is_locked(loglevel=loglevel)
def is_locking(self):
    """
    Returns True if a lock acquisition is in progress: a lock loop exists,
    the current stage is a stage of the sequence, and the last state change
    happened less than _auto_lock_timeout ago.
    """
    if self._lock_loop is None:
        # no lock acquisition in place
        return False
    if self.current_stage not in self.sequence:
        # not locked at a stage
        return False
    return self._state_change_time + self._auto_lock_timeout > time()
def _lockstatus(self):
""" this function is a placeholder for periodic lockstatus
diagnostics, such as calls to is_locked, logging means and rms
values, and plotting measured setpoints etc."""
# ask GUI to update the lockstatus display (pass value of
# self.is_locked() instead of None if already available)
self._signal_launcher.update_lockstatus.emit([None])
# optionally, call logging functionality implemented derived classes here...
try: self.log_lockstatus()
except AttributeError: pass
# loop that periodically calls _lockstatus; its interval is updated whenever
# lockstatus_interval is set
_lockstatus_loop = ModuleProperty(LockboxLoop,
interval=1.0,
autostart=True,
# the function _lockstatus defined above in the
# class body is passed directly as loop function
loop_function=_lockstatus)
lockstatus_interval = LockstatusIntervalProperty(default=1.0, min=1e-3,
max=1e10)
@classmethod
def _make_Lockbox(cls, parent, name):
    """ returns a new Lockbox object of the type defined by the classname
    variable in the config file. If the config file has no such entry, the
    class on which this function is called is used. """
    # identify class name
    try:
        classname = parent.c[name]['classname']
    except KeyError:
        classname = cls.__name__
        parent.logger.debug("No config file entry for classname found. "
                            "Using class '%s'.", classname)
    parent.logger.debug("Making new Lockbox with class %s. ", classname)
    # look up the class by its name and return an instance of it
    lockbox_class = all_classnames()[classname]
    return lockbox_class(parent, name)
def _classname_changed(self):
    """
    Replaces this lockbox by a new object of the class named by classname,
    unless the present object already is of that class.
    """
    former = type(self).__name__
    # check whether a new object must be instantiated and return if not
    if self.classname == former:
        self._logger.debug("Lockbox classname not changed: - formerly: %s, "
                           "now: %s.",
                           former,
                           self.classname)
        return
    self._logger.debug("Lockbox classname changed - formerly: %s, now: %s.",
                       former,
                       self.classname)
    # save names such that lockbox object can be deleted
    pyrpl = self.pyrpl
    name = self.name
    # launch signal for widget deletion
    self._signal_launcher.delete_widget.emit()
    # delete former lockbox (free its resources)
    self._clear()
    # make a new object
    replacement = Lockbox._make_Lockbox(pyrpl, name)
    # update references, e.g. pyrpl.lockbox = replacement
    setattr(pyrpl, name, replacement)
    pyrpl.software_modules.append(replacement)
    # create new dock widget
    for widget in pyrpl.widgets:
        widget.reload_dock_widget(name)
def _clear(self):
""" Shuts down this lockbox object: stops a running lock sequence, calls
_clear of the parent class, sets the pyrpl attribute holding this lockbox
to None, removes the lockbox from the list of software modules and
finally assigns attribute forwarders to this object. """
# keep pyrpl and name in local variables; the forwarders defined below use
# them to find the lockbox that is stored in pyrpl under the same name
pyrpl, name = self.pyrpl, self.name
if self._lock_loop is not None: # stop any lock sequence in place
self._lock_loop._clear()
super(Lockbox, self)._clear()
setattr(pyrpl, name, None) # pyrpl.lockbox = None
try:
self.parent.software_modules.remove(self)
except ValueError:
# list.remove raises ValueError if this object is not in the list
self._logger.warning("Could not find old Lockbox %s in the list of "
"software modules. Duplicate lockbox objects "
"may coexist. It is recommended to restart "
"PyRPL. Existing software modules: \n%s",
self.name, str(self.parent.software_modules))
# redirect all attributes of the old lockbox to the new/future lockbox
# object
# NOTE(review): the forwarders are assigned to the instance. Python looks up
# special methods such as __getattribute__ and __setattr__ on the type for
# implicit attribute access, so these assignments presumably do not redirect
# ordinary attribute access - confirm that the forwarding works as intended
def getattribute_forwarder(obj, attribute):
lockbox = getattr(pyrpl, name)
return getattr(lockbox, attribute)
self.__getattribute__ = getattribute_forwarder
def setattribute_forwarder(obj, attribute, value):
lockbox = getattr(pyrpl, name)
return setattr(lockbox, attribute, value)
self.__setattr__ = setattribute_forwarder
@property
def _time(self):
""" retrieves 'local' time of the lockbox """
return time()
@property
def params(self):
    """
    returns a convenient dict with parameters that describe whether and
    with which settings the lockbox was properly locked.

    The name of the pyrpl instance is prepended to all keys, such that
    params from different Pyrpl lockboxes can be merged together without
    problems if the names of the config files differ.
    """
    summary = dict(config=self.c._root._filename)
    reported = ('is_locked', 'is_locked_and_final', 'current_state',
                '_state_change_time', '_time', 'final_stage.setpoint',
                'setpoint_unit', 'final_stage.gain_factor',
                'final_stage.input')
    for path in reported:
        value = recursive_getattr(self, path)
        # functions are called, all other attributes are reported as they are
        summary[path] = value() if callable(value) else value
    for signal in self.inputs:
        # update the statistics of the input before reporting them
        signal.stats(t=1.0)
        summary[signal.name + '_mean'] = signal.mean
        summary[signal.name + '_rms'] = signal.rms
        summary[signal.name + '_calibration_data_min'] = signal.calibration_data.min
        summary[signal.name + '_calibration_data_max'] = signal.calibration_data.max
        if hasattr(signal, 'quadrature_factor'):
            summary[signal.name + '_quadrature_factor'] = signal.quadrature_factor
    for out in self.outputs:
        summary[out.name + '_mean'] = out.mean
        summary[out.name + '_rms'] = out.rms
        for pid_attribute in ('setpoint', 'p', 'i', 'ival', 'input'):
            summary[out.name + '_pid_' + pid_attribute] = getattr(out.pid,
                                                                  pid_attribute)
    # prepend the name of the pyrpl instance to all keys
    return dict((self.pyrpl.name + '_' + key, summary[key])
                for key in summary)