Fork PyRPL on GitHub

Source code for pyrpl.attributes

"""
The parameters of any module are controlled by descriptors deriving from
:obj:`BaseAttribute`.

An attribute is a field that can be set or get by several means:

- programmatically: module.attribute = value
- graphically: attribute.create_widget(module) returns a widget to
  manipulate the value
- via loading the value in a config file for permanent value preservation

Of course, the GUI, the parameter file and the actual values have to stay
"in sync" each time the attribute value is changed. The necessary mechanisms
work behind the scenes and are implemented in this file.
"""

from __future__ import division
from functools import partial
from .pyrpl_utils import recursive_getattr, recursive_setattr
from .widgets.attribute_widgets import BoolAttributeWidget, \
                                       FloatAttributeWidget, \
                                       FilterAttributeWidget, \
                                       IntAttributeWidget, \
                                       SelectAttributeWidget, \
                                       StringAttributeWidget, \
                                       BoolIgnoreAttributeWidget, \
                                       TextAttributeWidget, \
                                       CurveAttributeWidget, \
                                       DataAttributeWidget, \
                                       CurveSelectAttributeWidget, \
                                       LedAttributeWidget, \
                                       PlotAttributeWidget, \
                                       BasePropertyListPropertyWidget, \
                                       ComplexAttributeWidget

from .curvedb import CurveDB
from collections import OrderedDict
import logging
import sys
import numpy as np
import numbers

logger = logging.getLogger(name=__name__)

#way to represent the smallest positive value
#needed to set floats to minimum count above zero
epsilon = sys.float_info.epsilon


class BaseAttribute(object):
    """
    Common ancestor of all attribute descriptors.

    The class carries no behavior of its own and only serves as a
    placeholder type.
    """
class BaseProperty(BaseAttribute):
    """
    A Property is a special type of attribute that is not mapping a fpga
    value, but rather an attribute _name of the module. This is used mainly
    in SoftwareModules.

    An attribute is a field that can be set or get by several means:

    * programmatically: module.attribute = value
    * graphically: attribute.create_widget(module) returns a widget to
      manipulate the value
    * via loading the value in a config file for permanence

    The concrete derived class need to have certain attributes properly
    defined:

    * widget_class: the class of the widget to use for the gui (see
      attribute_widgets.py)
    * a function set_value(instance, value) that effectively sets the value
      (on redpitaya or elsewhere)
    * a function get_value(instance) that reads the value from wherever it
      is stored internally

    NOTE(review): several methods read ``self.name``, which is never assigned
    in this class; it is presumably set by the owning module class - confirm.
    """
    _widget_class = None
    widget = None
    default = None

    def __init__(self,
                 default=None,
                 doc="",
                 ignore_errors=False,
                 call_setup=False):
        """
        default: if provided (not None), overrides the class-level default
        doc: text stored as the descriptor's ``__doc__``
        ignore_errors: stored on the descriptor (not used in this class)
        call_setup: if True, ``module.setup()`` is called after each change
        """
        if default is not None:
            self.default = default
        self.call_setup = call_setup
        self.ignore_errors = ignore_errors
        self.__doc__ = doc

    def __set__(self, obj, value):
        """
        This function is called for any BaseAttribute, such that all the gui
        updating, and saving to disk is done automatically. The real work is
        delegated to self.set_value.
        """
        value = self.validate_and_normalize(obj, value)
        self.set_value(obj, value)
        # save new value in config, launch signal and possibly call setup()
        self.value_updated(obj, value)

    def validate_and_normalize(self, obj, value):
        """
        This function should raise an exception if the value is incorrect.
        Normalization can be:

        - returning value.name if attribute "name" exists
        - rounding to nearest multiple of step for float_registers
        - rounding elements to nearest valid_frequencies for FilterAttributes
        """
        return value  # by default any value is valid

    def value_updated(self, module, value=None, appendix=None):
        """
        Once the value has been changed internally, this function is called
        to perform the following actions:

        - launch the signal module._signal_launcher.attribute_changed (this
          is used in particular for gui update)
        - saves the new value in the config file (if flag
          module._autosave_active is True).
        - calls the callback function if the attribute is in module.callback

        If ``value`` is None, the current value is read with ``get_value``.
        ``appendix`` is an optional list of extra items appended to the
        emitted signal payload. Returns the value that was propagated.

        Note for developers: We might consider moving the 2 last points in a
        connection behind the signal "attribute_changed".
        """
        if value is None:
            value = self.get_value(module)
        # always hand a real list to launch_signal so that overriding
        # implementations that concatenate lists keep working
        if appendix is None:
            appendix = []
        self.launch_signal(module, value, appendix=appendix)
        if module._autosave_active:
            # (for module, when module is slaved, don't save attributes)
            if self.name in module._setup_attributes:
                self.save_attribute(module, value)
        if self.call_setup and not module._setup_ongoing:
            # call setup unless a bunch of attributes are being changed
            # together.
            module._logger.info('Calling setup() for %s.%s ...',
                                module.name,
                                self.name)
            module.setup()
        return value

    def __get__(self, instance, owner):
        """
        Returns the descriptor itself for class-level access, otherwise the
        value read through ``get_value``.

        The instance is deliberately not stored on the descriptor: there is
        one descriptor per class but potentially many instances.
        """
        if instance is None:
            return self
        return self.get_value(instance)

    def launch_signal(self, module, new_value, appendix=None):
        """
        Updates the widget and other subscribers with the module's value.

        Emits ``update_attribute_by_name`` with the attribute name and the
        list ``[new_value] + appendix``. An AttributeError raised on the way
        is logged on the module's logger instead of being propagated.
        """
        try:
            if appendix is None:
                appendix = []
            module._signal_launcher.update_attribute_by_name.emit(
                self.name,
                [new_value] + appendix)
        except AttributeError as e:
            # occurs if nothing is connected (TODO: remove this)
            module._logger.error("Erro in launch_signal of %s: %s",
                                 module.name, e)

    def save_attribute(self, module, value):
        """
        Saves the module's value in the config file.
        """
        module.c[self.name] = value

    def _create_widget(self, module, widget_name=None):
        """
        Creates a widget to graphically manipulate the attribute.

        Returns None (after logging a warning) when no ``_widget_class`` is
        defined.
        """
        if self._widget_class is None:
            logger.warning("Module %s of type %s is trying to create a widget "
                           "for %s, but no _widget_class is defined!",
                           str(module), type(module), self.name)
            return None
        widget = self._widget_class(module, self.name, widget_name=widget_name)
        return widget

    def get_value(self, obj):
        """
        Returns the value stored on ``obj`` under ``'_' + name``, storing the
        default there first if nothing has been stored yet.
        """
        if not hasattr(obj, '_' + self.name):
            setattr(obj, '_' + self.name, self.default)
        return getattr(obj, '_' + self.name)

    def set_value(self, obj, val):
        """
        Stores ``val`` on ``obj`` under ``'_' + name``.
        """
        setattr(obj, '_' + self.name, val)
class BaseRegister(BaseProperty):
    """
    Register-backed property: the value is read from / written to the
    address ``address`` through the owning object's ``_read``/``_write``.

    Derived classes must provide ``to_python(obj, raw)`` and
    ``from_python(obj, value)`` to convert between the raw register content
    and the python representation.
    """
    default = None

    def __init__(self, address, bitmask=None, **kwargs):
        """
        address: register address handed to ``obj._read``/``obj._write``
        bitmask: optional mask restricting which bits belong to this register
        kwargs: forwarded to BaseProperty.__init__
        """
        self.address = address
        self.bitmask = bitmask
        BaseProperty.__init__(self, **kwargs)

    # thin forwarding helpers to the owning object's I/O primitives
    def _writes(self, obj, addr, v):
        return obj._writes(addr, v)

    def _reads(self, obj, addr, l):
        return obj._reads(addr, l)

    def _write(self, obj, addr, v):
        return obj._write(addr, v)

    def _read(self, obj, addr):
        return obj._read(addr)

    def get_value(self, obj):
        """
        Reads the register through ``obj._read``, applies the bitmask (if
        any) and converts the result with ``to_python``.
        """
        raw = obj._read(self.address)
        if self.bitmask is not None:
            raw = raw & self.bitmask
        return self.to_python(obj, raw)

    def set_value(self, obj, val):
        """
        Converts ``val`` with ``from_python`` and writes it to the register.
        With a bitmask, only the masked bits are replaced (read-modify-write).
        """
        if self.bitmask is None:
            obj._write(self.address, self.from_python(obj, val))
            return
        current = obj._read(self.address)
        encoded = int(self.from_python(obj, val))
        merged = (current & ~self.bitmask) | (encoded & self.bitmask)
        obj._write(self.address, merged)

    def __set__(self, obj, value):
        """
        Same sequence as the parent's __set__: validate, write, then call
        value_updated with the normalized value (rather than a value read
        back with get_value), which saves one read operation.
        """
        normalized = self.validate_and_normalize(obj, value)
        self.set_value(obj, normalized)
        # save new value in config, launch signal and possibly call setup()
        self.value_updated(obj, normalized)
class BoolProperty(BaseProperty):
    """
    Property holding a boolean value.
    """
    _widget_class = BoolAttributeWidget
    default = False

    def validate_and_normalize(self, obj, value):
        """
        Returns the truth value of ``value`` as a bool.
        """
        normalized = bool(value)
        return normalized
class LedProperty(BoolProperty):
    """
    Boolean property that calls ``true_function(obj)`` or
    ``false_function(obj)`` when it is set, depending on the new value.
    """
    _widget_class = LedAttributeWidget

    def __init__(self, true_function=None, false_function=None, **kwargs):
        """
        true_function / false_function: callables invoked with the owning
        object; when not given (or falsy), the attribute of the same name
        already available on ``self`` is used.
        kwargs: forwarded to BoolProperty.__init__
        """
        self.true_function = true_function or self.true_function
        self.false_function = false_function or self.false_function
        super(LedProperty, self).__init__(**kwargs)

    def set_value(self, obj, val):
        """
        Calls the function matching ``val`` and stores the value only if that
        call did not raise a TypeError; a TypeError is logged at debug level.
        """
        try:
            if val:
                self.true_function(obj)
            else:
                self.false_function(obj)
        except TypeError as e:
            failed = 'true_function' if val else 'false_function'
            obj._logger.debug('Cannot call %s of %s.%s: %s',
                              failed,
                              obj.name,
                              self.name,
                              e)
            return
        super(LedProperty, self).set_value(obj, val)
class BoolRegister(BaseRegister, BoolProperty):
    """
    Interface for boolean values stored in a single bit of a register:
    1 is True, 0 is False. invert=True inverts the mapping.
    """

    def __init__(self, address, bit=0, bitmask=None, invert=False, **kwargs):
        """
        address: register address
        bit: index of the bit holding the value
        bitmask: forwarded to BaseRegister.__init__
        invert: must be a bool; if True, the bit meaning is inverted
        kwargs: forwarded to BoolProperty.__init__
        """
        self.bit = bit
        assert isinstance(invert, bool)
        self.invert = invert
        BaseRegister.__init__(self, address=address, bitmask=bitmask)
        BoolProperty.__init__(self, **kwargs)

    def to_python(self, obj, value):
        """
        Extracts bit ``self.bit`` from the raw register value as a bool,
        negated when ``invert`` is set.
        """
        flag = bool((value >> self.bit) & 1)
        return (not flag) if self.invert else flag

    def from_python(self, obj, val):
        """
        Returns the current register content (read from the device) with bit
        ``self.bit`` set or cleared according to ``val`` and ``invert``.
        """
        if self.invert:
            val = not val
        bit_mask = 1 << self.bit
        current = obj._read(self.address)
        if val:
            return current | bit_mask
        return current & (~bit_mask)
class BoolIgnoreProperty(BoolProperty):
    """
    Boolean attribute with a third state 'ignore'.
    """
    _widget_class = BoolIgnoreAttributeWidget
    default = False

    def validate_and_normalize(self, obj, value):
        """
        Strings are mapped case-insensitively: 'true' -> True,
        'false' -> False, anything else -> 'ignore'. Non-string values are
        converted with bool().
        """
        if not isinstance(value, str):  # used to be basestring
            return bool(value)
        lowered = value.lower()
        if lowered == 'true':
            return True
        if lowered == 'false':
            return False
        return 'ignore'
class IORegister(BoolRegister):
    """
    Interface for digital outputs.

    If argument outputmode is True, output mode is set, else input mode.
    In output mode the value lives at ``write_address``, in input mode at
    ``read_address``; the pin direction bit lives at ``direction_address``.
    """

    def __init__(self, read_address, write_address, direction_address,
                 outputmode=True, **kwargs):
        """
        read_address: register address used when outputmode is False
        write_address: register address used when outputmode is True
        direction_address: register address holding the direction bits
        outputmode: selects the data address and the default direction
        kwargs: forwarded to BoolRegister.__init__ (e.g. ``bit``)
        """
        if outputmode:
            address = write_address
        else:
            address = read_address
        self.direction_address = direction_address
        self.outputmode = outputmode  # set output direction
        BoolRegister.__init__(self, address=address, **kwargs)

    def direction(self, obj, v=None):
        """
        Sets the direction (inputmode/outputmode) for the Register.

        ``v`` defaults to ``self.outputmode``. The bit ``self.bit`` of the
        direction register is set when ``v`` is truthy and cleared otherwise;
        all other direction bits are preserved.
        """
        if v is None:
            v = self.outputmode
        # read-modify-write on the direction register itself: the data
        # register content must never leak into the direction bits
        current = obj._read(self.direction_address)
        if v:
            new = current | (1 << self.bit)
        else:
            new = current & (~(1 << self.bit))
        obj._write(self.direction_address, new)

    def get_value(self, obj):
        """
        Applies the configured direction, then reads the bit.
        """
        self.direction(obj)
        return BoolRegister.get_value(self, obj)

    def set_value(self, obj, val):
        """
        Applies the configured direction, then writes the bit.
        """
        self.direction(obj)
        return BoolRegister.set_value(self, obj, val)
class NumberProperty(BaseProperty):
    """
    Abstract base for numeric properties (ints and floats) with bounds.
    """
    _widget_class = IntAttributeWidget
    default = 0

    def __init__(self,
                 min=-np.inf,
                 max=np.inf,
                 increment=0,
                 log_increment=False,  # if True, the widget has log increment
                 **kwargs):
        """
        min / max: bounds used to saturate the value
        increment: stored on the descriptor
        log_increment: stored on the descriptor
        kwargs: forwarded to BaseProperty.__init__
        """
        self.min = min
        self.max = max
        self.increment = increment
        self.log_increment = log_increment
        BaseProperty.__init__(self, **kwargs)

    def _create_widget(self, module, widget_name=None):
        """
        Delegates widget creation to BaseProperty._create_widget.
        """
        return BaseProperty._create_widget(self, module,
                                           widget_name=widget_name)

    def validate_and_normalize(self, obj, value):
        """
        Saturates value with min and max. A value of None is replaced by the
        current value beforehand.
        """
        if value is None:
            # setting a number to None essentially calls setup()
            value = self.get_value(obj)
        capped = min(value, self.max)
        return max(capped, self.min)
class IntProperty(NumberProperty):
    """
    Numeric property whose value is rounded to an integer.
    """

    def __init__(self,
                 min=-np.inf,
                 max=np.inf,
                 increment=1,
                 log_increment=False,  # if True, the widget has log increment
                 **kwargs):
        """
        Same arguments as NumberProperty, with a default increment of 1.
        """
        super(IntProperty, self).__init__(min=min,
                                          max=max,
                                          increment=increment,
                                          log_increment=log_increment,
                                          **kwargs)

    def validate_and_normalize(self, obj, value):
        """
        Accepts float, but rounds to integer before saturating with min and
        max. A value of None is replaced by the current value beforehand.
        """
        if value is None:
            # setting a number to None essentially calls setup()
            value = self.get_value(obj)
        as_int = int(round(value))
        return NumberProperty.validate_and_normalize(self, obj, as_int)
class IntRegister(BaseRegister, IntProperty):
    """
    Register for integer values encoded on ``bits`` bits.
    """

    def __init__(self, address, bits=32, bitmask=None, **kwargs):
        """
        address: register address
        bits: number of bits of the value; also determines ``size``, the
            number of 32-bit words needed to hold it
        bitmask: forwarded to BaseRegister.__init__
        kwargs: forwarded to IntProperty.__init__; 'min' defaults to 0 and
            'max' to 2**bits - 1
        """
        self.bits = bits
        self.size = int(np.ceil(float(self.bits) / 32))
        BaseRegister.__init__(self, address=address, bitmask=bitmask)
        kwargs.setdefault('min', 0)
        kwargs.setdefault('max', 2**self.bits-1)
        IntProperty.__init__(self, **kwargs)

    def to_python(self, obj, value):
        """Converts the raw register value to a python int."""
        return int(value)

    def from_python(self, obj, value):
        """Converts the python value to the int written to the register."""
        return int(value)
class ConstantIntRegister(IntRegister):
    """
    Int register that is read from the device only once; later reads return
    the value cached on the owning object under ``'_' + name``.
    """

    def get_value(self, obj):
        """
        Returns the cached value if present, otherwise reads the register,
        caches the result on ``obj`` and returns it.
        """
        cache_name = '_' + self.name
        try:
            cached = getattr(obj, cache_name)
        except AttributeError:
            cached = super(ConstantIntRegister, self).get_value(obj)
            setattr(obj, cache_name, cached)
        return cached
class LongRegister(IntRegister):
    """
    Interface for register of python type int/long with arbitrary length
    'bits' (effectively unsigned), spread over ``size`` consecutive 32-bit
    words with the least significant word first.
    """

    def get_value(self, obj):
        """
        Reads ``size`` words and assembles them into a single integer, then
        applies ``to_python`` and the bitmask (if any).
        """
        words = obj._reads(self.address, self.size)
        assembled = int(0)
        for word_index in range(self.size):
            assembled += int(words[word_index]) << (32 * word_index)
        result = self.to_python(obj, assembled)
        if self.bitmask is None:
            return result
        return result & self.bitmask

    def set_value(self, obj, val):
        """
        Splits the converted value into 32-bit words and writes them. With a
        bitmask, the current words are read first and only the masked bits
        are replaced.
        """
        val = self.from_python(obj, val)
        words = np.zeros(self.size, dtype=np.uint32)
        if self.bitmask is None:
            for word_index in range(self.size):
                words[word_index] = (val >> (32 * word_index)) & 0xFFFFFFFF
        else:
            current = obj._reads(self.address, self.size)
            for word_index in range(self.size):
                # part of the bitmask that falls into this 32-bit word
                word_mask = (self.bitmask >> 32 * word_index) & 0xFFFFFFFF
                words[word_index] = \
                    ((val >> (32 * word_index)) & word_mask) | \
                    (int(current[word_index]) & (~word_mask))
        obj._writes(self.address, words)
class FloatProperty(NumberProperty):
    """
    Property holding a float value.
    """
    _widget_class = FloatAttributeWidget
    default = 0.0

    def validate_and_normalize(self, obj, value):
        """
        Converts value to float, then saturates it with min and max.
        """
        as_float = float(value)
        return NumberProperty.validate_and_normalize(self, obj, as_float)
class ComplexProperty(FloatProperty):
    """
    Property holding a complex value; real and imaginary parts are
    validated independently like floats.
    """
    _widget_class = ComplexAttributeWidget

    def validate_and_normalize(self, obj, val):
        """
        Converts val to complex and applies the float validation to the real
        part and then to the imaginary part.
        """
        val = complex(val)
        validate = super(ComplexProperty, self).validate_and_normalize
        real_part = validate(obj, val.real)
        imag_part = validate(obj, val.imag)
        return complex(real_part, imag_part)
class FloatRegister(IntRegister, FloatProperty):
    """
    Implements a fixed point register, seen like a (signed) float from
    python.
    """

    def __init__(self, address,
                 bits=14,  # total number of bits to represent on fpga
                 bitmask=None,
                 norm=1.0,  # fpga value corresponding to 1 in python
                 signed=True,  # otherwise unsigned
                 invert=False,  # if False: FPGA=norm*python,
                                # if True: FPGA=norm/python
                 **kwargs):
        """
        kwargs are forwarded to FloatProperty.__init__; 'increment', 'max'
        and 'min' are derived from bits/norm/signed when not given.
        """
        IntRegister.__init__(self, address=address, bits=bits, bitmask=bitmask)
        self.invert = invert
        self.signed = signed
        self.norm = float(norm)
        if 'increment' not in kwargs:
            kwargs['increment'] = 1.0/self.norm
        if 'max' not in kwargs:
            largest = 2 ** (self.bits - int(self.signed)) - 1
            kwargs['max'] = (float(largest) / self.norm)
        if 'min' not in kwargs:
            if not self.signed:
                kwargs['min'] = 0
            else:
                kwargs['min'] = \
                    - float(2 ** (self.bits - int(self.signed))) / self.norm
        FloatProperty.__init__(self, **kwargs)

    def to_python(self, obj, value):
        """
        Converts the raw register value to a float: 2's complement decoding
        for signed registers, then normalization by ``norm`` (reciprocal
        when ``invert`` is set; a raw 0 maps to 0.0 in that case).
        """
        # 2's complement
        if self.signed and value >= 2 ** (self.bits - 1):
            value -= 2 ** self.bits
        # normalization
        if not self.invert:
            return float(value) / self.norm
        if value == 0:
            return float(0)
        return 1.0 / float(value) / self.norm

    def from_python(self, obj, value):
        """
        Converts a float to the integer written to the register: rounding
        and normalization, saturation to the representable range and 2's
        complement encoding for signed registers; unsigned registers store
        the absolute value.
        """
        # round and normalize
        if not self.invert:
            v = int(round(float(value) * self.norm))
        elif value == 0:
            v = 0
        else:
            v = int(round(1.0 / float(value) * self.norm))
        # make sure small float values are not rounded to zero
        if v == 0 and value > 0:
            v = 1
        elif v == 0 and value < 0:
            v = -1
        if self.signed:
            # saturation
            if v >= 2 ** (self.bits - 1):
                v = 2 ** (self.bits - 1) - 1
            elif v < -2 ** (self.bits - 1):
                v = -2 ** (self.bits - 1)
            # 2's complement
            if v < 0:
                v += 2 ** self.bits
        else:
            v = abs(v)  # take absolute value
            # unsigned saturation
            if v >= 2 ** self.bits:
                v = 2 ** self.bits - 1
        return v

    def validate_and_normalize(self, obj, value):
        """
        For unsigned registers, takes the absolute value of the given value.
        Rounds to the nearest value authorized by the register granularity,
        then does the same as FloatProperty (==NumberProperty).
        """
        if not self.signed:
            value = abs(value)
        on_grid = round(value/self.increment)*self.increment
        return FloatProperty.validate_and_normalize(self, obj, on_grid)
class GainRegister(FloatRegister):
    """
    A register used mainly for gains, that replaces round-off to zero by
    round-off to the lowest-possible value.
    """
    avoid_round_off_to_zero = True

    def validate_and_normalize(self, obj, value):
        """
        Same as FloatRegister, with two additions: a nonzero value that
        would be rounded to zero is replaced by one increment of the same
        sign (if ``avoid_round_off_to_zero`` is set), and warnings are
        logged for round-off to zero and for out-of-bounds requests.
        """
        rounded_value = FloatRegister.validate_and_normalize(self, obj, value)
        rounded_to_zero = rounded_value == 0 and value != 0
        if rounded_to_zero and self.avoid_round_off_to_zero:
            smallest = np.abs(self.increment)*np.sign(value)
            rounded_value = FloatRegister.validate_and_normalize(
                self, obj, smallest)
            obj._logger.warning("Avoided rounding value %.1e of the "
                                "gain register %s to zero. Setting it to %.1e "
                                "instead. ", value, self.name, rounded_value)
        elif rounded_to_zero:
            obj._logger.warning("Rounding value %.1e of the "
                                "gain register %s to zero. ",
                                value, self.name)
        if value > self.max or value < self.min:
            obj._logger.warning("Requested gain for %s.%s is outside the "
                                "bounds allowed by the hardware. Desired "
                                "gain of %.1e is capped to %.1e. ",
                                obj.name, self.name, value, rounded_value)
        return rounded_value
class FrequencyProperty(FloatProperty):
    """
    An attribute for frequency values.

    Same as FloatProperty, except that 'min' defaults to 0 so that the value
    cannot become negative.
    """

    def __init__(self, **kwargs):
        """
        kwargs: forwarded to FloatProperty.__init__ ('min' defaults to 0)
        """
        kwargs.setdefault('min', 0)
        FloatProperty.__init__(self, **kwargs)
class FrequencyRegister(FloatRegister, FrequencyProperty):
    """
    Registers that contain a frequency as a float in units of Hz.

    The register holds the frequency as a fraction of ``CLOCK_FREQUENCY``
    scaled to ``2**bits`` counts.
    """
    # attention: no bitmask can be defined for frequencyregisters
    CLOCK_FREQUENCY = 125e6

    def __init__(self, address, **kwargs):
        """
        address: register address
        kwargs: forwarded to FloatRegister.__init__; min, max and increment
            are overwritten afterwards from CLOCK_FREQUENCY and bits
        """
        FloatRegister.__init__(self, address, **kwargs)
        self.min = 0
        self.max = self.CLOCK_FREQUENCY / 2.0
        self.increment = self.CLOCK_FREQUENCY / 2 ** self.bits

    def from_python(self, obj, value):
        """
        Converts a frequency in Hz to register counts, after dividing by
        ``obj._frequency_correction`` and taking the absolute value.
        """
        # make sure small float values are not rounded to zero
        value = abs(float(value) / obj._frequency_correction)
        if (value == epsilon):
            value = 1
        else:
            # round and normalize
            value = int(round(
                value / self.CLOCK_FREQUENCY * 2 ** self.bits))
            # Seems correct (should not be 2**bits -1): 125 MHz
            # out of reach because 2**bits is out of reach
        return value

    def to_python(self, obj, value):
        """
        Converts register counts to a frequency in Hz, using the same
        CLOCK_FREQUENCY as from_python so both conversions stay consistent.
        """
        return self.CLOCK_FREQUENCY / 2 ** self.bits * float(
            value) * obj._frequency_correction

    def validate_and_normalize(self, obj, value):
        """
        Same as FloatRegister, except the value should be positive.
        """
        on_grid = FloatRegister.validate_and_normalize(self, obj, value)
        return FrequencyProperty.validate_and_normalize(self, obj, on_grid)
class PhaseProperty(FloatProperty):
    """
    An attribute to represent a phase in degrees.
    """

    def validate_and_normalize(self, obj, value):
        """
        Takes the value modulo 360, then validates it like a FloatProperty.
        """
        wrapped = value % 360.
        return FloatProperty.validate_and_normalize(self, obj, wrapped)
class PhaseRegister(FloatRegister, PhaseProperty):
    """
    Registers that contain a phase as a float in units of degrees; a full
    turn of 360 degrees corresponds to ``2**bits`` counts.
    """

    def __init__(self, address, bits=32, bitmask=None, invert=False, **kwargs):
        """
        invert: if True, the sign of the phase is flipped in both conversion
            directions
        kwargs: forwarded to PhaseProperty.__init__, together with an
            increment of one count (360 / 2**bits degrees)
        """
        FloatRegister.__init__(self,
                               address=address,
                               bits=bits,
                               bitmask=bitmask,
                               invert=invert)
        PhaseProperty.__init__(self, increment=360. / 2 ** bits, **kwargs)

    def from_python(self, obj, value):
        """
        Converts a phase in degrees to register counts modulo 2**bits.
        """
        if self.invert:
            value = float(value) * (-1)
        counts = round(float(value) / 360 * 2 ** self.bits)
        return int(counts % 2 ** self.bits)

    def to_python(self, obj, value):
        """
        Converts register counts to a phase in degrees within [0, 360).
        """
        phase = float(value) / 2 ** self.bits * 360
        if self.invert:
            phase *= -1
        return phase % 360.0

    def validate_and_normalize(self, obj, value):
        """
        Rounds to nearest authorized register value and take modulo 360
        """
        counts = int(round(float(value) / 360 * 2 ** self.bits))
        degrees = (counts / 2 ** self.bits) * 360.
        return degrees % 360.0
class FilterProperty(BaseProperty):
    """
    An attribute for a list of bandwidth. Each bandwidth has to be chosen in
    a list given by self.valid_frequencies(module) (evaluated at runtime).
    If floats are provided, they are normalized to the nearest values in the
    list. Individual floats are also normalized to a singleton. The number
    of elements in the list are also defined at runtime.

    A property for a list of float values to be chosen in
    valid_frequencies(module).
    """
    _widget_class = FilterAttributeWidget

    def validate_and_normalize(self, obj, value):
        """
        Replaces every element by the closest entry of
        self.valid_frequencies(obj). A non-iterable value is treated as a
        one-element list; a one-element result is returned as a scalar.
        """
        if not np.iterable(value):
            value = [value]

        def closest(target):
            options = list(self.valid_frequencies(obj))
            return min(options, key=lambda option: abs(option - target))

        value = [closest(val) for val in value]
        if len(value) == 1:
            return value[0]
        return value

    def get_value(self, obj):
        """
        Returns the value stored on obj under '_' + name; if nothing is
        stored yet, the first valid frequency is stored and returned.
        """
        storage_name = '_' + self.name
        if not hasattr(obj, storage_name):
            # choose any value in the options as default.
            setattr(obj, storage_name, self.valid_frequencies(obj)[0])
        return getattr(obj, storage_name)

    def set_value(self, obj, value):
        """Stores the value exactly like BaseProperty.set_value."""
        return BaseProperty.set_value(self, obj, value)

    def valid_frequencies(self, module):
        """Must be implemented by derived classes."""
        raise NotImplementedError("this is a baseclass, your derived class "
                                  "must implement the following function")

    def refresh_options(self, module):
        """Emits refresh_filter_options with this attribute's name."""
        module._signal_launcher.refresh_filter_options.emit(self.name)
class FilterRegister(BaseRegister, FilterProperty):
    """
    Interface for up to 4 low-/highpass filters in series (filter_block.v)
    """
    _widget_class = FilterAttributeWidget

    def __init__(self, address, filterstages, shiftbits, minbw, **kwargs):
        """
        address: address of the register holding the filter settings
        filterstages, shiftbits, minbw: addresses that are read (once per
            object, see read_and_save) to obtain the filter constants
        kwargs: forwarded to FilterProperty.__init__
        """
        self.filterstages = filterstages
        self.shiftbits = shiftbits
        self.minbw = minbw
        BaseRegister.__init__(self, address=address)
        FilterProperty.__init__(self, **kwargs)

    def read_and_save(self, obj, attr_name):
        """
        Returns the content of the register whose address is stored in
        self.<attr_name>. The value is read only on first use and then
        cached on obj as '_<name>_<attr_name>'.
        """
        cache_name = "_" + self.name + "_" + attr_name
        if not hasattr(obj, cache_name):
            setattr(obj, cache_name, obj._read(getattr(self, attr_name)))
        return getattr(obj, cache_name)

    def _FILTERSTAGES(self, obj):
        return self.read_and_save(obj, "filterstages")

    def _SHIFTBITS(self, obj):
        return self.read_and_save(obj, "shiftbits")

    def _MINBW(self, obj):
        return self.read_and_save(obj, "minbw")

    def _MAXSHIFT(self, obj):
        def clog2(x):
            """ mirrors the function clog2 in verilog code """
            if x < 2:
                return 1
            if x > 2**32:
                return -1
            if x > 2**31:
                return 32
            return int(np.floor(np.log2(float(x))))+1
        return clog2(125000000.0/float(self._MINBW(obj)))

    def valid_frequencies(self, obj):
        """
        Returns a list of all valid filter cutoff frequencies: the negated
        (highpass) values, 0, and the positive (lowpass) values. The list is
        also stored on obj as '<name>_options' if neither obj nor its class
        has such an attribute yet.
        """
        # range(0, self._MAXSHIFT(obj)-1) would be possible as well; the
        # shorter range gives reasonable results (test_filter)
        valid_bits = range(0, self._MAXSHIFT(obj)-2)
        # bit 7 switches the filter stage on
        pos = [self.to_python(obj, b | 0x1 << 7) for b in valid_bits]
        pos = [val[0] if np.iterable(val) else val for val in pos]
        neg = [-val for val in reversed(pos)]
        valid_frequencies = neg + [0] + pos
        options_name = self.name+'_options'
        if obj is not None \
                and not hasattr(obj.__class__, options_name) \
                and not hasattr(obj, options_name):
            setattr(obj, options_name, valid_frequencies)
        return valid_frequencies

    # empirical correction factors for the cutoff frequencies in order to be
    # able to accurately model implemented bandwidth with an analog
    # butterworth filter. Works well up to 5 MHz. See unittest test_inputfilter
    correction_factors = {0.5: 0.7,
                          0.25: 1.65,
                          0.125: 1.17,
                          0.0625: 1.08,
                          0.03125: 1.04,
                          0.015625: 1.02,
                          0.0078125: 1.01,
                          0.001953125: 1.0,
                          0.00390625: 1.0}

    def to_python(self, obj, value):
        """
        Returns a list of bandwidths for the low-pass filter cascade before
        the module (a scalar if there is a single stage). A negative
        bandwidth stands for high-pass instead of lowpass, 0 bandwidth for
        bypassing the filter.

        Each stage occupies one byte of value: bit 7 switches the stage on,
        bit 6 selects highpass, the low bits hold the shift.
        """
        bandwidths = []
        for stage in range(self._FILTERSTAGES(obj)):
            stage_byte = (value >> (stage * 8)) & 0xFF
            shift = stage_byte & (2 ** self._SHIFTBITS(obj) - 1)
            filter_on = ((stage_byte >> 7) == 0x1)
            highpass = (((stage_byte >> 6) & 0x1) == 0x1)
            if not filter_on:
                bandwidths.append(0)
                continue
            # difference equation is
            # y[n] = (1-alpha)*y[n-1] + alpha*x[n]
            alpha = float(2 ** shift) / (2 ** self._MAXSHIFT(obj))
            # formula from Oppenheim-Schafer p. 70 (more correct than the
            # former alpha * 125e6 / 2 / np.pi)
            bandwidth = -np.log(1.0-alpha)/2.0/np.pi*125e6
            # here comes a nasty bugfix to make it work (see issue 242)
            if alpha in self.correction_factors:
                bandwidth *= self.correction_factors[alpha]
            if highpass:
                bandwidth *= -1.0
            bandwidths.append(bandwidth)
        if len(bandwidths) == 1:
            return bandwidths[0]
        return bandwidths

    def from_python(self, obj, value):
        """
        Encodes a bandwidth (or a sequence of bandwidths, one per filter
        stage) into the register value. Missing or zero bandwidths leave the
        corresponding stage switched off.
        """
        try:
            v = list(value)[:self._FILTERSTAGES(obj)]
        except TypeError:
            v = list([value])[:self._FILTERSTAGES(obj)]
        filter_shifts = 0
        for stage in range(self._FILTERSTAGES(obj)):
            bandwidth = 0 if len(v) <= stage else float(v[stage])
            if bandwidth == 0:
                continue
            # inverse of the formula used in to_python (the former one was
            # np.abs(bandwidth)*2*np.pi/125e6)
            alpha = 1.0 - np.exp(-np.abs(bandwidth)*2.0*np.pi/125e6)
            if alpha in self.correction_factors:
                bandwidth /= self.correction_factors[alpha]
                alpha = 1.0 - np.exp(-np.abs(bandwidth)*2.0*np.pi/125e6)
            shift = int(np.round(np.log2(alpha*(2**self._MAXSHIFT(obj)))))
            if shift < 0:
                shift = 0
            elif shift > (2**self._SHIFTBITS(obj) - 1):
                shift = (2**self._SHIFTBITS(obj) - 1)
            shift += 2**7  # turn this filter stage on
            if bandwidth < 0:
                shift += 2**6  # turn this filter into a highpass
            filter_shifts += shift * 2**(8*stage)
        return filter_shifts
class AttributeList(list):
    """
    A list of attributes.

    This class is not an attribute/property by itself, but is the object
    returned by AttributeListProperty that correctly extends list methods to
    communicate a change in the list throughout pyrpl.

    When a list-specific operation is performed that alters the values,
    the AttributeListProperty object is informed about this and will ensure
    the correct propagation of the signal.

    ``parent`` must provide ``validate_and_normalize_element(module, value)``
    and ``list_changed(module, operation, index[, value])``; ``insert``
    without a value additionally uses ``default_element`` and
    ``element_cls.default``.
    """

    def __init__(self, parent, module, *args, **kwargs):
        """
        parent: the object that is notified about every change
        module: passed back to parent on every notification
        args, kwargs: forwarded to list.__init__ (initial content is stored
            without validation or notification)
        """
        self._parent = parent
        self._module = module
        super(AttributeList, self).__init__(*args, **kwargs)

    # insert, __setitem__, and __delitem__ completely describe the behavior

    def insert(self, index, new=None):
        """
        Inserts the validated element at index, notifies the parent and
        selects the new element. If new is None, the parent's default
        element is used.
        """
        if new is None:
            new = self._parent.default_element or \
                  self._parent.element_cls.default
        new = self._parent.validate_and_normalize_element(self._module, new)
        super(AttributeList, self).insert(index, new)
        self._parent.list_changed(self._module, "insert", index, new)
        self.selected = index

    def __setitem__(self, index, value):
        # rely on parent's validate_and_normalize function
        value = self._parent.validate_and_normalize_element(self._module,
                                                            value)
        # set value
        super(AttributeList, self).__setitem__(index, value)
        self._parent.list_changed(self._module, "setitem", index, value)
        self.selected = index

    def __delitem__(self, index=-1):
        # unselect if selected
        if self.selected == self._get_unique_index(index):
            self.selected = None
        # remove and send message
        super(AttributeList, self).pop(index)
        self._parent.list_changed(self._module, "delitem", index)

    @property
    def selected(self):
        """Index of the selected element, or None if nothing is selected."""
        if not hasattr(self, '_selected'):
            self._selected = None
        return self._selected

    @selected.setter
    def selected(self, index):
        self._selected = self._get_unique_index(index)
        self._parent.list_changed(self._module, 'select', self.selected)

    def _get_unique_index(self, index):
        """
        Maps index to the position of the first element equal to
        self[index]; returns None if index cannot be resolved (e.g. None or
        out of range).
        """
        try:
            return self.index(self[index])
        except Exception:
            # deliberately best-effort, but do not swallow KeyboardInterrupt
            # or SystemExit as a bare except would
            return None

    def select(self, value):
        """
        Selects the element with value, or None if it does not exist.
        """
        try:
            self.selected = self.index(value)
        except (ValueError, IndexError):
            # list.index signals a missing value with ValueError
            self.selected = None

    # other convenience functions that are based on above axioms

    def append(self, new=None):
        """Inserts new (or the default element) at the end of the list."""
        self.insert(self.__len__(), new)

    def extend(self, iterable=()):
        """Appends every element of iterable, one notification each."""
        for element in iterable:
            self.append(element)

    def pop(self, index=-1):
        """Removes and returns the element at index (default: last)."""
        item = self[index]
        self.__delitem__(index)
        return item

    def remove(self, value):
        """Removes the first element equal to value."""
        self.__delitem__(self.index(value))

    def clear(self):
        """Removes all elements one by one, starting from the end."""
        while len(self) > 0:
            self.__delitem__()

    def copy(self):
        """Returns a plain list with the same elements."""
        return list(self)

    def sort(self, key=None, reverse=False):
        """
        Sorts the list in place; every position is rewritten through
        __setitem__ so that the parent is notified.
        """
        ordered = sorted(self, key=key, reverse=reverse)
        for position, element in enumerate(ordered):
            self[position] = element

    def reverse(self):
        """
        Reverses the list in place; every position is rewritten through
        __setitem__ so that the parent is notified.
        """
        flipped = self.copy()
        flipped.reverse()
        for position, element in enumerate(flipped):
            self[position] = element
class BasePropertyListProperty(BaseProperty):
    """
    An arbitrary length list of items that behave like BaseProperty.

    A derived class FloatPropertyListProperty(BasePropertyListProperty)
    will behave as a list of FloatProperty-like items.
    """
    default = []
    _widget_class = BasePropertyListPropertyWidget

    def __init__(self, *args, **kwargs):
        """
        default is the default list
        default_element: default new element
        """
        # 'default_element' is consumed here and not passed to the base class
        self.default_element = kwargs.pop('default_element', None)
        super(BasePropertyListProperty, self).__init__(*args, **kwargs)

    @property
    def element_cls(self):
        """ the class of the elements of the list """
        # a super() proxy: attribute lookups continue after this class in
        # the MRO of the concrete property class
        return super(BasePropertyListProperty, self)

    def validate_and_normalize(self, obj, value):
        """
        Converts the value into a list.
        """
        return list(value)

    def validate_and_normalize_element(self, obj, val):
        """ validates a single element with the element class """
        return self.element_cls.validate_and_normalize(obj, val)

    def get_value(self, obj):
        """
        returns the AttributeList stored in obj, creating it from the
        default list on first access
        """
        storage_name = '_' + self.name
        if not hasattr(obj, storage_name):
            # make a new AttributeList, pass to it the instance of obj
            setattr(obj, storage_name, AttributeList(self, obj, self.default))
        return getattr(obj, storage_name)

    def set_value(self, obj, val):
        """
        makes the stored list equal to val by editing it in place
        """
        stored = self.get_value(obj)
        try:
            # block repetitive calls to setup
            call_setup, self.call_setup = self.call_setup, False
            # replace the already existing list elements and append new ones
            for position, element in enumerate(val):
                try:
                    stored[position] = element
                except IndexError:
                    stored.append(element)
            # remove the trailing items
            while len(stored) > len(val):
                stored.pop()
        finally:
            self.call_setup = call_setup

    def list_changed(self, module, operation, index, value=None):
        """
        called by the list whenever it is modified or its selection changes
        """
        # NOTE(review): AttributeList in this file reports selection changes
        # as 'select', which does not match 'selecti' - confirm whether this
        # branch is disabled on purpose.
        if operation != 'selecti':
            # launches signal and calls setup()
            self.value_updated(module, appendix=[operation, index, value])
            return
        # only launch signal in this case, do not call setup
        # value can be None in this case, as it is not used
        if value is None:
            value = self.get_value(module)
        self.launch_signal(module, value, appendix=[operation, index, value])
class FloatAttributeListProperty(BasePropertyListProperty, FloatProperty):
    """
    List property built from BasePropertyListProperty and FloatProperty.

    Adds no members of its own.
    """
class ComplexAttributeListProperty(BasePropertyListProperty, ComplexProperty):
    """
    List property built from BasePropertyListProperty and ComplexProperty.

    Adds no members of its own.
    """
class PWMRegister(FloatRegister):
    """
    FloatRegister that defines the PWM voltage similar to setting a float.

    The register word has ``CFG_BITS`` bits: the upper ``PWM_BITS`` bits
    hold the integer part of the duty value, and the remaining lower bits
    hold a bit pattern whose number of set bits encodes the fractional
    part.
    """
    # See FPGA code for a more detailed description on how the PWM works
    def __init__(self, address, CFG_BITS=24, PWM_BITS=8, **kwargs):
        self.CFG_BITS = int(CFG_BITS)
        self.PWM_BITS = int(PWM_BITS)
        FloatRegister.__init__(self, address=address, bits=14, norm=1, **kwargs)
        self.min = 0   # voltage of pwm outputs ranges from 0 to 1.8 volts
        self.max = 1.8
        # actual resolution is 14 bits (roughly 0.1 mV incr.)
        self.increment = (self.max-self.min)/2**(self.bits-1)

    def to_python(self, obj, value):
        """
        Convert the raw register word into a voltage.

        The integer part is read from the upper ``PWM_BITS`` bits, the
        fractional part is the share of set bits among the lower bits.
        """
        value = int(value)
        fraction_bits = self.CFG_BITS - self.PWM_BITS
        pwm = float((value >> fraction_bits) & (2 ** self.PWM_BITS - 1))
        mod = value & (2 ** fraction_bits - 1)
        postcomma = float(bin(mod).count('1')) / fraction_bits
        voltage = 1.8 * (pwm + postcomma) / 2 ** self.PWM_BITS
        if voltage > 1.8:
            # '%x' prints the raw word in hex ('%h' is not a valid
            # conversion and made the logging call itself fail)
            logger.error("Readout value from PWM (%x) yields wrong voltage %f",
                         value, voltage)
        return voltage

    def from_python(self, obj, value):
        """
        Convert a voltage into the raw register word.

        Negative voltages are written as 0. Voltages above the full-scale
        value saturate at the largest word that fits into ``CFG_BITS``
        bits.
        """
        # here we don't bother to minimize the PWM noise
        # room for improvement is in the low -> towrite conversion
        fraction_bits = self.CFG_BITS - self.PWM_BITS
        value = 0 if (value < 0) else float(value) / 1.8 * (2 ** self.PWM_BITS)
        high = np.floor(value)
        if (high >= 2 ** self.PWM_BITS):
            high = 2 ** self.PWM_BITS - 1
        low = int(np.round((value - high) * fraction_bits))
        # the fractional pattern must not spill into the integer part, which
        # would otherwise happen once 'high' has been clamped above
        low = min(low, fraction_bits)
        towrite = int(high) << fraction_bits
        towrite += ((1 << low) - 1) & ((1 << self.CFG_BITS) - 1)
        return towrite
class StringProperty(BaseProperty):
    """
    An attribute for string (there is no corresponding StringRegister).
    """
    default = ""
    _widget_class = StringAttributeWidget

    def validate_and_normalize(self, obj, value):
        """
        Convert argument to string
        """
        as_string = str(value)
        return as_string
class TextProperty(StringProperty):
    """
    Same as StringProperty, but the gui displays it as multi-line text.

    The only difference in code is the widget class.
    """
    _widget_class = TextAttributeWidget
class SelectProperty(BaseProperty):
    """
    An attribute for a multiple choice value.

    The options can be specified at the object creation as a list or an
    (ordered) dict, or as a callable with one argument (which is None or the
    module that contains this attribute, depending on when the call is made).
    Options can be specified at attribute creation, but it can also be updated
    later on a per-module basis using change_options(new_options). If
    options are callable, they are evaluated every time they are needed.
    """
    _widget_class = SelectAttributeWidget
    default = None

    def __init__(self, options=[], **kwargs):
        # 'options' is only stored here, it is never modified in place
        self.default_options = options
        BaseProperty.__init__(self, **kwargs)

    @property
    def __doc__(self):
        # Append available options to docstring
        return self.doc + "\r\nOptions:\r\n" + str(list(self.options(None)))

    @__doc__.setter
    def __doc__(self, value):
        self.doc = value

    def get_default(self, instance):
        """
        returns the default value. default is pre-defined value
        if that is not a valid option. Otherwise the first valid option
        is taken, and if that is not possible (no options), None is taken.
        """
        chosen = self.default  # internal default
        # at startup, we cannot access the instance, so we must continue
        # without it
        if instance is not None:
            # make sure default is stored in the instance, such that it can
            # be easily modified
            storage_name = '_' + self.name + '_' + 'default'
            if not hasattr(instance, storage_name):
                setattr(instance, storage_name, chosen)
            chosen = getattr(instance, storage_name)
        # make sure default is a valid option
        available = self.options(instance)
        if not chosen in available:
            # if not valid, default default is the first options
            chosen = list(available)[0]
        # if no options are available, fall back to None
        if chosen is None:
            logger.warning("Default of SelectProperty %s "
                           "is None. ", self.name)
        return chosen

    def options(self, instance=None):
        """
        options are evaluated at run time. options may be callable with
        instance as optional argument.
        """
        options = self.default_options
        # at startup, we cannot access the instance, so we must continue
        # without it
        if instance is not None:
            # make sure options are stored in the instance, such that they
            # can be easily modified
            storage_name = '_' + self.name + '_' + 'options'
            if not hasattr(instance, storage_name):
                setattr(instance, storage_name, options)
            options = getattr(instance, storage_name)
        if callable(options):
            # first try with the instance as argument, then without
            # argument, and finally give up with empty options
            try:
                options = options(instance)
            except (TypeError, AttributeError):
                try:
                    options = options()
                except (TypeError, AttributeError):
                    options = OrderedDict()
        if not hasattr(options, "keys"):
            # plain sequences are turned into a mapping value -> value
            options = OrderedDict((v, v) for v in options)
        if len(options) == 0:
            logger.debug("SelectProperty %s of module %s has no options!",
                         self.name, instance)
            options = {None: None}
        if instance is None:
            return options
        # check whether options keys have changed w.r.t. last time and emit
        # a signal in that case. Also create a list of valid options in the
        # parent module called self.name+'_options'.
        lastoptions = getattr(instance, '_' + self.name + '_lastoptions', None)
        if options != lastoptions:
            setattr(instance, '_' + self.name + '_lastoptions', options)
            # save the keys for the user convenience
            setattr(instance, self.name + '_options', list(options.keys()))
            instance._signal_launcher.change_options.emit(self.name,
                                                          list(options))
        # return the actual options
        return options

    def change_options(self, instance, new_options):
        """
        Changes the possible options acceptable by the Attribute:

          - New validation takes effect immediately (otherwise a script
            involving 1. changing the options / 2. selecting one of the
            new options could not be executed at once)
          - Update of the ComboxBox is performed behind a signal-slot
            mechanism to be thread-safe
          - If the current value is not in the new_options, then value is
            changed to some available option
        """
        setattr(instance, '_' + self.name + '_' + 'options', new_options)
        # refresh default options in case options(None) is called (no
        # instance in argument); this also triggers the signal emission in
        # the method options()
        self.default_options = self.options(instance)

    def validate_and_normalize(self, obj, value):
        """
        returns value if it is a valid option. Otherwise either returns the
        default (if ignore_errors is set) or raises a ValueError.
        """
        options = self.options(obj)
        if value in options:
            return value
        msg = "Value '%s' is not an option for SelectAttribute %s of " \
              "module %s with options %s" \
              % (value, self.name, obj.name, options)
        if not self.ignore_errors:
            logger.error(msg)
            raise ValueError(msg)
        value = self.get_default(obj)
        logger.warning(msg + ". Picking an arbitrary value %s instead."
                       % str(value))
        return value

    def get_value(self, obj):
        """
        returns the stored value (initialized with the default at first
        access) after validating it against the current options
        """
        storage_name = '_' + self.name
        if not hasattr(obj, storage_name):
            setattr(obj, storage_name, self.get_default(obj))
        stored = getattr(obj, storage_name)
        # make sure the value is a valid option
        return self.validate_and_normalize(obj, stored)

    def set_value(self, obj, value):
        """ delegates to BaseProperty.set_value """
        BaseProperty.set_value(self, obj, value)
class SelectRegister(BaseRegister, SelectProperty):
    """
    Implements a selection register, such as for multiplexers.

    The options must be a dict, where the keys indicate the available
    options and the values indicate the corresponding fpga register values.
    If different keys point to the same register value, the keys are
    nevertheless distinguished (allows implementing aliases that may vary
    over time if options is a callable object).
    """
    def __init__(self, address, bitmask=None, options={}, **kwargs):
        # both bases are initialized explicitly, each with its own arguments
        BaseRegister.__init__(self, address=address, bitmask=bitmask)
        SelectProperty.__init__(self, options=options, **kwargs)

    def get_default(self, obj):
        """
        returns the default of SelectProperty; if that is None and a module
        is available, the option matching the current register value
        """
        default = SelectProperty.get_default(self, obj)
        if default is not None or obj is None:
            return default
        # retrieve default value from FPGA if nothing more reasonable is
        # available
        register_value = BaseRegister.get_value(self, obj)
        for option, option_value in self.options(obj).items():
            if option_value == register_value:
                return option
        return default

    def get_value(self, obj):
        """
        returns the selected option and rewrites the register if its
        content differs from the value belonging to that option
        """
        selected = SelectProperty.get_value(self, obj)
        # make sure the register value corresponds to the selected option
        expected_value = self.options(obj)[selected]
        raw_value = BaseRegister.get_value(self, obj)
        if raw_value == expected_value:
            return selected
        obj._logger.warning("Register %s of module %s has value %s, "
                            "which does not correspond to selected "
                            "option %s. Setting to '%s'. ",
                            self.name, obj.name,
                            raw_value, expected_value, selected)
        BaseRegister.set_value(self, obj, expected_value)
        return selected

    def set_value(self, obj, value):
        """
        stores the option and writes the register value belonging to it
        """
        SelectProperty.set_value(self, obj, value)
        register_value = self.options(obj)[value]
        BaseRegister.set_value(self, obj, register_value)

    def to_python(self, obj, value):
        """ register content as int """
        return int(value)

    def from_python(self, obj, value):
        """ option value as int """
        return int(value)
class ProxyProperty(BaseProperty):
    """
    An attribute that is a proxy to another attribute.

    This attribute essentially behaves like the one that is reached by
    instance.path_to_target, always staying in synch.

    ``path_to_target`` is a dotted path relative to the hosting module;
    its last part is the name of the target attribute, everything before
    it is the path to the module that owns the target.
    """
    def __init__(self, path_to_target, **kwargs):
        self.path_to_target = path_to_target
        lastpart = path_to_target.split('.')[-1]
        self.target_attribute = lastpart
        # +1 for the dot
        self.path_to_target_module = path_to_target[:-(len(lastpart)+1)]
        # the descriptor lives in the class of the target module
        self.path_to_target_descriptor = self.path_to_target_module \
                                         + '.__class__.' \
                                         + lastpart
        BaseProperty.__init__(self, **kwargs)

    def _target_to_proxy(self, obj, target):
        """ override this function to implement conversion between target and proxy"""
        return target

    def _proxy_to_target(self, obj, proxy):
        """ override this function to implement conversion between target and proxy"""
        return proxy

    def __get__(self, instance, owner):
        if instance is None:
            return self
        # dangerous, but works because we only call __getattribute__
        # immediately after __set__ or __get__
        self.instance = instance
        self.connect_signals(instance)
        return self._target_to_proxy(instance,
                                     recursive_getattr(instance,
                                                       self.path_to_target))

    def __set__(self, obj, value):
        self.instance = obj
        self.connect_signals(obj)
        recursive_setattr(obj,
                          self.path_to_target,
                          self._proxy_to_target(obj, value))

    def __getattribute__(self, item):
        """
        Attributes that the proxy does not have itself are looked up in
        the descriptor of the target.
        """
        try:
            return BaseProperty.__getattribute__(self, item)
        except AttributeError:
            if item in ('instance', 'path_to_target_descriptor'):
                # the fallback below needs these two attributes itself;
                # looking them up through the target would recurse without
                # end when they are not set yet
                raise
            return recursive_getattr(self.instance,
                                     self.path_to_target_descriptor
                                     + '.' + item)

    # special functions for SelectProperties, which transform the argument
    # 'obj' from the hosting module to the target module to avoid redundant
    # saving of options

    def options(self, obj):
        """
        returns the options of the target, with the keys converted by
        _target_to_proxy
        """
        if obj is None:
            obj = self.instance
        module = recursive_getattr(obj, self.path_to_target_module)
        options = recursive_getattr(obj,
                                    self.path_to_target_descriptor
                                    + '.options')(module)
        return OrderedDict([(self._target_to_proxy(obj, k), v)
                            for k, v in options.items()])

    def change_options(self, obj, new_options):
        """ forwards the call to the descriptor of the target """
        if obj is None:
            obj = self.instance
        module = recursive_getattr(obj, self.path_to_target_module)
        return recursive_getattr(obj,
                                 self.path_to_target_descriptor
                                 + '.change_options')(module, new_options)

    def __repr__(self):
        try:
            targetdescr = " (target: " \
                          + recursive_getattr(self.instance,
                                self.path_to_target_descriptor).__repr__() \
                          + ")"
        except Exception:
            # best effort: target not reachable (yet)
            targetdescr = ""
        return super(ProxyProperty, self).__repr__() + targetdescr

    def connect_signals(self, instance):
        """ function that takes care of forwarding signals from target to
        signal_launcher of proxy module """
        if hasattr(instance, '_' + self.name + '_connected'):
            return  # skip if connection has already been set up
        module = recursive_getattr(instance, self.path_to_target_module)

        def forward_update_attribute_by_name(name, value):
            """ forward the signal, but change attribute name """
            if name == self.target_attribute:
                instance._signal_launcher.update_attribute_by_name.emit(
                    self.name,
                    [self._target_to_proxy(instance, value[0])])
                if self.call_setup:
                    instance.setup()
        module._signal_launcher.update_attribute_by_name.connect(
            forward_update_attribute_by_name)

        def forward_change_options(name, new_options):
            """ forward the signal, but change attribute name """
            if name == self.target_attribute:
                # update local list of options
                setattr(instance, self.name + '_options', new_options)
                # forward the signal
                instance._signal_launcher.change_options.emit(
                    self.name, new_options)
        module._signal_launcher.change_options.connect(
            forward_change_options)
        # remember that we are now connected
        setattr(instance, '_' + self.name + '_connected', True)

    def _create_widget(self, module, widget_name=None, **kwargs):
        """
        creates the widget with the _create_widget function of the target
        descriptor's class
        """
        target_module = recursive_getattr(module, self.path_to_target_module)
        target_descriptor = recursive_getattr(module,
                                              self.path_to_target_descriptor)
        if widget_name is None:
            widget_name = self.name
        # the proxy takes over the widget class of its target
        self._widget_class = recursive_getattr(
            module, self.path_to_target_descriptor + '._widget_class')
        try:
            # try to make a widget for proxy
            return recursive_getattr(
                module,
                self.path_to_target_descriptor + '.__class__._create_widget')(
                    self, module, **kwargs)
        except Exception:
            # make a renamed widget for target
            return recursive_getattr(
                module,
                self.path_to_target_descriptor + '.__class__._create_widget')(
                    target_descriptor, target_module,
                    widget_name=widget_name, **kwargs)
class ModuleAttribute(BaseProperty):
    """
    Base class for handling submodules of a module.

    This class adds nothing to BaseProperty. The actual implementation is
    found in module_attributes.ModuleProperty. This object is only used
    inside the Module class.
    """
class CurveSelectProperty(SelectProperty):
    """
    An attribute to select a curve from all available ones.

    The curve object is loaded to instance._name_object,
    where 'name' stands for the name of this attribute. The property can
    be set by either passing a CurveDB object, or a curve id.
    """
    def __init__(self,
                 no_curve_first=False,
                 show_childs=False,
                 **kwargs):
        self.no_curve_first = no_curve_first
        self.show_childs = show_childs
        # the options are computed by _default_options each time they are
        # requested
        SelectProperty.__init__(self, options=self._default_options, **kwargs)

    def _default_options(self):
        """
        returns the ids of all curves plus -1, which is placed first if
        no_curve_first is set and last otherwise
        """
        no_curve = [-1]
        curve_ids = [curve.pk for curve in CurveDB.all()]
        if self.no_curve_first:
            return no_curve + curve_ids
        return curve_ids + no_curve

    def validate_and_normalize(self, obj, value):
        """
        returns the id of the given curve (or the given id) as an int, or
        -1 if the conversion to int fails
        """
        candidate = value.pk if isinstance(value, CurveDB) else value
        try:
            return int(candidate)
        except:
            return -1

    def set_value(self, obj, pk):
        """
        stores the id and loads the curve to obj._name_object (None if the
        curve cannot be retrieved)
        """
        SelectProperty.set_value(self, obj, pk)
        try:
            loaded = CurveDB.get(pk)
        except:
            loaded = None
        setattr(obj, '_' + self.name + '_object', loaded)
class CurveProperty(CurveSelectProperty):
    """
    Property for a curve whose widget plots the corresponding curve.

    Unfortunately, the widget does not allow to select the curve,
    i.e. selection must be implemented with another CurveSelectProperty.
    In code, only the widget class differs from CurveSelectProperty.
    """
    _widget_class = CurveAttributeWidget
class CurveSelectListProperty(CurveSelectProperty):
    """
    Same as CurveSelectProperty, but the widget is a list to select from.
    """
    _widget_class = CurveSelectAttributeWidget
class Plotter(BaseProperty):
    """
    Property for plotting in the GUI window.

    Passing a value, list of values, or a dict of color-value pairs
    results in plotting the values as a function of time in the GUI.
    """
    _widget_class = PlotAttributeWidget

    def __init__(self, legend="value"):
        # the base class is initialized without any arguments
        self.legend = legend
        super(Plotter, self).__init__()
class DataProperty(BaseProperty):
    """
    Property for a dataset (real or complex), that can be plotted.

    Only the widget class is set here.
    """
    _widget_class = DataAttributeWidget