from __future__ import division
import numpy as np
from scipy import interpolate
from ...software_modules.lockbox.input import Signal
from ...attributes import BoolProperty, FloatProperty, SelectProperty, \
FilterProperty, FrequencyProperty, IntProperty
from ...curvedb import CurveDB
from ...hardware_modules.asg import Asg0, Asg1
from ...hardware_modules.pid import Pid
from ...widgets.module_widgets import OutputSignalWidget
[docs]class AdditionalFilterAttribute(FilterProperty):
# proxy to the pid inputfilter attribute that emits a signal when changed
[docs] def valid_frequencies(self, obj):
return obj.pid.__class__.inputfilter.valid_frequencies(obj.pid)
[docs] def get_value(self, obj):
return obj.pid.inputfilter
[docs] def set_value(self, obj, value):
obj.pid.inputfilter = value
obj.lockbox._signal_launcher.update_transfer_function.emit([obj])
[docs]class OutputSignal(Signal):
"""
As many output signals as desired can be added to the lockbox. Each
output defines:
- name: the name of the output.
- dc_gain: how much the model's variable is expected to change for 1 V
on the output (in *unit*)
- unit: see above, should be one of the units available in the model.
- sweep_amplitude/offset/frequency/waveform: what properties to use when
sweeping the output
- output_channel: what physical output is used.
- p/i: the gains to use in a loop: those values are to be understood as
full loop gains (p in [1], i in [Hz])
- additional_filter: a filter (4 cut-off frequencies) to add to the loop
(in sweep and lock mode)
- extra_module: extra module to add just before the output (usually iir).
- extra_module_state: name of the state to use for the extra_module.
- tf_curve: the index of the curve describing the analog transfer
function behind the output.
- tf_filter: alternatively, the analog transfer function can be specified
by a filter (4 cut-off frequencies).
- desired_unity_gain_frequency: desired value for unity gain frequency.
- tf_type: ["flat", "curve", "filter"], how is the analog transfer
function specified.
"""
_widget_class = OutputSignalWidget
_gui_attributes = ['unit',
'sweep_amplitude',
'sweep_offset',
'sweep_frequency',
'sweep_waveform',
'dc_gain',
'output_channel',
'p',
'i',
'additional_filter',
'analog_filter_cutoff',
'extra_module',
'extra_module_state',
'desired_unity_gain_frequency',
'max_voltage',
'min_voltage']
_setup_attributes = _gui_attributes + ['assisted_design', 'tf_curve',
'tf_type']
# main attributes
dc_gain = FloatProperty(default=1.0, min=-1e10, max=1e10, call_setup=True)
output_channel = SelectProperty(options=['out1', 'out2',
'pwm0', 'pwm1'])
unit = SelectProperty(default='V/V',
options=lambda inst:
[u+"/V" for u in inst.lockbox._output_units],
call_setup=True,
ignore_errors=True)
tf_type = SelectProperty(["flat", "filter", "curve"],
default="filter",
call_setup=True)
tf_curve = IntProperty(call_setup=True)
# sweep properties
sweep_amplitude = FloatProperty(default=1., min=-1, max=1, call_setup=True)
sweep_offset = FloatProperty(default=0.0, min=-1, max=1, call_setup=True)
sweep_frequency = FrequencyProperty(default=50.0, call_setup=True)
sweep_waveform = SelectProperty(options=Asg1.waveforms, default='ramp', call_setup=True)
# gain properties
assisted_design = BoolProperty(default=True, call_setup=True)
desired_unity_gain_frequency = FrequencyProperty(default=100.0, min=0, max=1e10, call_setup=True)
analog_filter_cutoff = FrequencyProperty(default=0, min=0, max=1e10, increment=0.1, call_setup=True)
p = FloatProperty(min=-1e10, max=1e10, call_setup=True)
i = FloatProperty(min=-1e10, max=1e10, call_setup=True)
# additional filter properties
additional_filter = AdditionalFilterAttribute() #call_setup=True)
extra_module = SelectProperty(['None', 'iir', 'pid', 'iq'], call_setup=True)
extra_module_state = SelectProperty(options=['None'], call_setup=True)
# internal state of the output
current_state = SelectProperty(options=['lock', 'unlock', 'sweep'],
default='unlock')
max_voltage = FloatProperty(default=1.0, min=-1.0, max=1.0,
call_setup=True,
doc="positive saturation voltage")
min_voltage = FloatProperty(default=-1.0,
min=-1.0, max=1.0,
call_setup=True,
doc="negative saturation voltage")
[docs] def signal(self):
return self.pid.name
@property
def pid(self):
if not hasattr(self, '_pid') or self._pid is None:
self._pid = self.pyrpl.pids.pop(self.name)
self._setup_pid_output()
return self._pid
@property
def is_saturated(self):
"""
Returns
-------
True: if the output has saturated
False: otherwise
"""
ival, max, min = self.pid.ival, self.max_voltage, \
self.min_voltage
sample = getattr(self.pyrpl.rp.sampler, self.pid.name)
# criterion for saturation: integrator value saturated
# and current value (including pid) as well
if (ival > max or ival < min) and (sample > max or sample < min):
return True
else:
return False
def _setup_pid_output(self):
self.pid.max_voltage = self.max_voltage
self.pid.min_voltage = self.min_voltage
if self.output_channel.startswith('out'):
self.pid.output_direct = self.output_channel
for pwm in [self.pyrpl.rp.pwm0, self.pyrpl.rp.pwm1]:
if pwm.input == self.pid.name:
pwm.input = 'off'
elif self.output_channel.startswith('pwm'):
self.pid.output_direct = 'off'
pwm = getattr(self.pyrpl.rp, self.output_channel)
pwm.input = self.pid
else:
raise NotImplementedError(
"Selected output_channel '%s' is not implemented"
% self.output_channel)
def _clear(self):
"""
Free up resources associated with the output
"""
self.pyrpl.pids.free(self.pid)
self._pid = None
super(OutputSignal, self)._clear()
[docs] def unlock(self, reset_offset=False):
self.pid.p = 0
self.pid.i = 0
if reset_offset:
self.pid.ival = 0
self.current_state = 'unlock'
# benefit from the occasion and do proper initialization
self._setup_pid_output()
[docs] def sweep(self):
self.unlock(reset_offset=True)
self.pid.input = self.lockbox.asg
self.lockbox.asg.setup(amplitude=self.sweep_amplitude,
offset=self.sweep_offset,
frequency=self.sweep_frequency,
waveform=self.sweep_waveform,
trigger_source='immediately',
cycles_per_burst=0)
self.pid.setpoint = 0.
self.pid.p = 1.
self.current_state = 'sweep'
[docs] def lock(self, input=None, setpoint=None, offset=None, gain_factor=None):
"""
Closes the lock loop, using the required p and i parameters.
"""
# store lock parameters in case an update is requested
self._lock_input = self._lock_input if input is None else input
self._lock_setpoint = self._lock_setpoint if setpoint is None else setpoint
self._lock_gain_factor = self._lock_gain_factor if gain_factor is None else gain_factor
# Parameter 'offset' is not internally stored because another call to 'lock()'
# shouldnt reset the offset by default as this would un-lock an existing lock
#self._setup_pid_output() # optional to ensure that pid output is properly set
self._setup_pid_lock(input=self._lock_input,
setpoint=self._lock_setpoint,
offset=offset,
gain_factor=self._lock_gain_factor)
self.current_state = 'lock'
def _setup_pid_lock(self, input, setpoint, offset=None, gain_factor=1.0):
"""
If current mode is "lock", updates the gains of the underlying pid module such that:
- input.gain * pid.p * output.dc_gain = output.p
- input.gain * pid.i * output.dc_gain = output.i
"""
if isinstance(input, str): # used to be basestring
input = self.lockbox.inputs[input]
# The total loop is composed of the pid and external components.
# The external parts are 1) the output with the predefined gain and 2)
# the input (error signal) with a setpoint-dependent slope.
# 1) model the output: dc_gain converted into units of setpoint_unit_per_V
output_unit = self.unit.split('/')[0]
external_loop_gain = self.dc_gain * self.lockbox._unit_in_setpoint_unit(output_unit)
# 2) model the input: slope comes in units of V_per_setpoint_unit,
# which cancels previous unit and we end up with a dimensionless ext. gain.
external_loop_gain *= input.expected_slope(setpoint)
# we should avoid setting gains to infinity
if external_loop_gain == 0:
self._logger.warning("External loop gain for output %s is zero. "
"Skipping pid lock for this step. ",
self.name)
if offset is not None:
self.pid.ival = offset
else: # write values to pid module
# set gains to zero before switching setpoint and input,
# to avoid huge gains while transiting
self.pid.p = 0
self.pid.i = 0
self.pid.setpoint = input.expected_signal(setpoint) + input.calibration_data._analog_offset
self.pid.input = input.signal()
# set offset if applicable
if offset is not None:
self.pid.ival = offset
# set gains
self.pid.p = self.p / external_loop_gain * gain_factor
self.pid.i = self.i / external_loop_gain * gain_factor
def _setup_offset(self, offset):
self.pid.ival = offset
def _setup(self):
# synchronize assisted_design parameters with p/i setting
self._setup_ongoing = True
if self.assisted_design:
self.i = self.desired_unity_gain_frequency
if self.analog_filter_cutoff == 0:
self.p = 0
else:
self.p = self.i / self.analog_filter_cutoff
else:
self.desired_unity_gain_frequency = self.i
if self.p == 0:
self.analog_filter_cutoff = 0
else:
self.analog_filter_cutoff = self.i / self.p
self._setup_ongoing = False
# re-enable lock/sweep/unlock with new parameters
if self.current_state == 'sweep':
self.sweep()
elif self.current_state == 'unlock':
self.unlock()
elif self.current_state == 'lock':
self.lock()
# plot current transfer function
self.lockbox._signal_launcher.update_transfer_function.emit([self])
##############################
# transfer function plotting #
##############################
[docs] def tf_freqs(self):
"""
Frequency values to plot the transfer function. Frequency (abcissa) of
the tf_curve if tf_type=="curve", else: logspace(0, 6, 20000)
"""
if self.tf_type == 'curve': # req axis should be that of the curve
try:
c = CurveDB.get(self.tf_curve)
except:
self._logger.warning("Cannot load specified transfer function %s",
self.tf_curve)
else:
return c.data.index
# by default
return np.logspace(0, 6, 2000)
[docs] def transfer_function(self, freqs):
"""
Returns the design transfer function for the output
"""
analog_tf = np.ones(len(freqs), dtype=complex)
if self.tf_type == 'filter':
# use logic implemented in PID to simulate analog filters
analog_tf = Pid._filter_transfer_function(freqs, self.analog_filter_cutoff)
if self.tf_type == 'curve':
curve = CurveDB.get(self.tf_curve)
x = curve.data.index
y = curve.data.values
# sample the curve transfer function at the requested frequencies
ampl = interpolate.interp1d(x, abs(y))(freqs)
phase = interpolate.interp1d(x, np.unwrap(np.angle(y)))(freqs)
analog_tf = ampl * np.exp(1j * phase)
# multiply by PID transfer function to get the loop transfer function
# same as Pid.transfer_function(freqs) but avoids reading registers form FPGA
result = analog_tf * Pid._transfer_function(
freqs, p=self.p, i=self.i,
frequency_correction=self.pid._frequency_correction,
filter_values=self.additional_filter)
return result
# TODO: re-implement this function for if an iir filter is set
# def setup_iir(self, **kwargs):
# """
# Inserts an iir filter before the output pid. For correct routing,
# the pid input must be set correctly, as the iir filter will reuse
# the pid input setting as its own input and send its output through
# the pid.
#
# Parameters
# ----------
# kwargs: dict
# Any kwargs that are accepted by IIR.setup(). By default,
# the output's iir section in the config file is used for these
# parameters.
#
# Returns
# -------
# None
# """
# # load data from config file
# try:
# iirconfig = self._config.iir._dict
# except KeyError:
# logger.debug("No iir filter was defined for output %s. ",
# self._name)
# return
# else:
# logger.debug("Setting up IIR filter for output %s. ", self._name)
# # overwrite defaults with kwargs
# iirconfig.update(kwargs)
# if 'curve' in iirconfig:
# iirconfig.update(bodefit.iirparams_from_curve(
# id=iirconfig.pop('curve')))
# else:
# # workaround for complex numbers from yaml
# iirconfig["zeros"] = [complex(n) for n in iirconfig.pop("zeros")]
# iirconfig["poles"] = [complex(n) for n in iirconfig.pop("poles")]
# # get module
# if not hasattr(self, "iir"):
# self.iir = self._rp.iirs.pop()
# logger.debug("IIR filter retrieved for output %s. ", self._name)
# # output_direct off, since iir goes through pid
# iirconfig["output_direct"] = "off"
# # input setting -> copy the pid input if it is not erroneously on iir
# pidinput = self.pid.input
# if pidinput != 'iir':
# iirconfig["input"] = pidinput
# # setup
# self.iir.setup(**iirconfig)
# # route iir output through pid
# self.pid.input = self.iir.name
[docs]class PiezoOutput(OutputSignal):
unit = SelectProperty(default='m/V',
options=lambda inst:
[u + "/V" for u in inst.lockbox._output_units],
call_setup=True)