Fork PyRPL on GitHub

Source code for pyrpl.software_modules.loop

"""
Defines a number of Loop modules that can be used to perform a task periodically
"""
import numpy as np
import pyqtgraph as pg
from ..modules import Module
from ..async_utils import MainThreadTimer
from ..pyrpl_utils import time


class Loop(Module):
    """
    Module that repeatedly calls a loop function, driven by a timer.

    The functions to run can either be supplied as constructor arguments
    (``setup_function``, ``loop_function``, ``teardown_function``) or be
    provided by overriding ``setup_loop``, ``loop`` and ``teardown_loop``
    in a subclass.

    After every iteration the timer is started again unless the loop has
    been paused or cleared, so ``interval`` is the delay between the end of
    one iteration and the timer-triggered start of the next.
    """

    def __init__(self, parent, name='loop', interval=1.0, autostart=True,
                 loop_function=None, setup_function=None,
                 teardown_function=None, **kwargs):
        """
        parent: the parent pyrpl module
        name: important for the right config file section name
        interval: delay between iterations, in seconds
        autostart: if True, the first iteration runs right away
        loop_function, setup_function, teardown_function: optional
            callables replacing ``loop``, ``setup_loop`` and
            ``teardown_loop`` on this instance
        kwargs: stored in ``self.kwargs`` for use by the custom functions
        """
        super(Loop, self).__init__(parent, name=name)
        self.kwargs = kwargs  # allows using kwargs in setup_loop
        if setup_function is not None:
            self.setup_loop = setup_function
        if loop_function is not None:
            self.loop = loop_function
        if teardown_function is not None:
            self.teardown_loop = teardown_function
        self._ended = False  # becomes True when loop is ended
        # the timer is created with a dummy interval; the real one is set
        # through the ``interval`` property on the next line
        self.timer = MainThreadTimer(interval=0)
        self.interval = interval
        self.timer.timeout.connect(self.main_loop)
        self.n = 0  # counter for the number of loops
        self.time  # first access initializes loop_start_time
        # call custom initialization
        try:
            self.setup_loop()
        except TypeError:
            # allows to pass instance functions of the parent module as
            # arguments as well
            self.setup_loop(self.parent, self)
        if autostart:
            self.main_loop()

    @property
    def time(self):
        """ time since start of the loop """
        try:
            return time() - self.loop_start_time
        except AttributeError:
            # first access: remember the start time and report zero
            self.loop_start_time = time()
            return 0

    @property
    def interval(self):
        """ delay between two iterations in seconds """
        # the timer reports its interval in milliseconds
        return float(self.timer.interval()) / 1000.0

    @interval.setter
    def interval(self, val):
        # The timer interval is a whole number of milliseconds (the getter
        # divides by 1000). Convert explicitly: Qt-style setInterval
        # expects an int and recent bindings reject floats. Rounding
        # avoids losing a millisecond to floating-point noise
        # (e.g. 0.29 * 1000.0 is slightly below or above 290).
        self.timer.setInterval(int(round(val * 1000.0)))

    def _clear(self):
        """ stops the loop, runs the teardown routine and clears the module """
        self._ended = True
        self.timer.stop()
        try:
            self.teardown_loop()
        except TypeError:
            # allows to pass instance functions of the parent module as
            # arguments as well
            self.teardown_loop(self.parent, self)
        super(Loop, self)._clear()

    def main_loop(self):
        """
        Runs one iteration of the loop function and schedules the next one.

        The loop function is tried with the signatures ``loop()``,
        ``loop(parent, self)`` and ``loop(parent)``, in that order, moving
        on to the next one whenever a TypeError is raised. Note that a
        TypeError raised inside the loop function itself also triggers
        these retries.
        """
        try:
            try:
                self.loop()
            except TypeError:
                # allows to pass instance functions of the parent module as
                # arguments as well
                try:
                    self.loop(self.parent, self)
                except TypeError:
                    self.loop(self.parent)
        except BaseException as e:
            # we do not want the loop to stop launching the next cycle (code
            # below this) if there is an exception raised by the loop
            # function
            self._logger.error("Error in main_loop of %s: %s", self.name, e)
        # increment counter
        self.n += 1
        if not self._ended:
            self.timer.start()

    def setup_loop(self):
        """ put your initialization routine here"""
        pass

    def pause_loop(self):
        """ pauses the loop until start_loop is called """
        self._ended = True
        # also cancel a pending timeout (as _clear does), otherwise one
        # more iteration would still run after the loop has been paused
        self.timer.stop()

    def start_loop(self):
        """ resumes the loop, running one iteration immediately """
        self._ended = False
        self.main_loop()

    def loop(self):
        """ insert your loop function here """
        pass

    def teardown_loop(self):
        """ put your destruction routine here"""
        pass

    # useful helpers for precise timing, e.g. of the trigger module
    @property
    def fpga_time(self):
        """ current FPGA time in s since startup """
        return 8e-9 * self.pyrpl.rp.trig.current_timestamp / \
               self.pyrpl.rp.frequency_correction \
               - self.loop_start_time

    @property
    def trigger_time(self):
        """ FPGA time in s when trigger event occurred
        (same frame of reference as self.time)"""
        return 8e-9 * self.pyrpl.rp.trig.trigger_timestamp / \
               self.pyrpl.rp.frequency_correction \
               - self.loop_start_time
class PlotWindow(object):
    """
    makes a plot window where the x-axis is time since startup.

    append(color=value) adds new data to the plot for the given color,
    e.g. append(green=0.1, red=0.5).

    close() closes the plot
    """
    # single-letter color codes, in the order in which they are assigned to
    # positional values passed to append()
    _defaultcolors = ['g', 'r', 'b', 'y', 'c', 'm', 'o', 'w']

    def __init__(self, title="plotwindow"):
        """ opens the plot window with the given title """
        # GraphicsWindow is the class originally used here; fall back to
        # GraphicsLayoutWidget when the installed pyqtgraph does not
        # provide it (NOTE(review): believed to be removed in newer
        # pyqtgraph releases - confirm against the pyqtgraph changelog).
        window_class = getattr(pg, "GraphicsWindow", None)
        if window_class is None:
            window_class = pg.GraphicsLayoutWidget
        self.win = window_class(title=title)
        self.pw = self.win.addPlot()
        self.curves = {}  # maps color letter -> plotted curve
        self.win.show()
        self.plot_start_time = time()

    def append(self, *args, **kwargs):
        """
        Adds one data point per given color at the current time.

        usage:
            append(green=0.1, red=0.5, blue=0.21)
        # former, now almost deprecated version:
            append(0.5, 0.6)

        Keyword names are reduced to their first letter to obtain the
        color code. Positional values are assigned to the first default
        colors that are not already used by a keyword. Values that are
        None are skipped.
        """
        # Build a new dict instead of popping/re-inserting keys while
        # iterating over kwargs, which raises RuntimeError on Python 3.
        points = {key[0]: value for key, value in kwargs.items()}
        i = 0
        for value in args:
            # skip colors that are already taken
            while self._defaultcolors[i] in points:
                i += 1
            points[self._defaultcolors[i]] = value
        t = time() - self.plot_start_time
        for color, value in points.items():
            if value is None:
                continue
            if color not in self.curves:
                self.curves[color] = self.pw.plot(pen=color)
            curve = self.curves[color]
            x, y = curve.getData()
            if x is None or y is None:
                # first point of this curve
                x, y = np.array([t]), np.array([value])
            else:
                x, y = np.append(x, t), np.append(y, value)
            curve.setData(x, y)

    def close(self):
        """ closes the plot window """
        self.win.close()
class PlotLoop(Loop):
    """
    Loop that can additionally plot values through plotappend().

    Extra keyword arguments (removed before calling Loop.__init__):

    plot: if False, plotappend() does nothing (default True)
    plotter: optional name of an attribute of the parent module. If given,
        plotappend() assigns ``(args, kwargs)`` to that attribute instead
        of plotting into an own PlotWindow.
    """

    def __init__(self, *args, **kwargs):
        self.plot = kwargs.pop("plot", True)
        self.plotter = kwargs.pop("plotter", None)
        if self.plot and self.plotter is None:
            # The window must exist before Loop.__init__ runs, because that
            # call may already execute setup_loop and the first iteration.
            # self.name is only assigned by that call, so the title is
            # taken from the constructor arguments instead: keyword
            # 'name', second positional argument, or Loop's default.
            if "name" in kwargs:
                title = kwargs["name"]
            elif len(args) > 1:
                title = args[1]
            else:
                title = 'loop'
            # from here on self.plot holds the window instead of the flag
            self.plot = PlotWindow(title=title)
        super(PlotLoop, self).__init__(*args, **kwargs)

    def plotappend(self, *args, **kwargs):
        """
        Forwards the given values to the plotter attribute of the parent
        module, or appends them to the own plot window.
        """
        if self.plot:
            if self.plotter is not None:
                setattr(self.parent, self.plotter, (args, kwargs))
            else:
                try:
                    # forward positional values as well; PlotWindow.append
                    # accepts them, and they were previously dropped
                    self.plot.append(*args, **kwargs)
                except BaseException as e:
                    # plotting problems must not interrupt the loop
                    self._logger.error("Error occured during plotting in Loop %s: %s",
                                       self.name, e)

    def _clear(self):
        """ clears the loop and closes the plot window, if there is one """
        super(PlotLoop, self)._clear()
        # self.plot may be a plain bool, which has no close()
        if hasattr(self, 'plot') and hasattr(self.plot, 'close'):
            self.plot.close()