# Fork PyRPL on GitHub
#
# Source code for pyrpl.memory

###############################################################################
#    pyrpl - DSP servo controller for quantum optics with the RedPitaya
#    Copyright (C) 2014-2016  Leonhard Neuhaus  (neuhaus@spectro.jussieu.fr)
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
###############################################################################

import os
from collections import OrderedDict
from shutil import copyfile
import numpy as np
import time
from qtpy import QtCore
from . import default_config_dir, user_config_dir
from .pyrpl_utils import time

import logging
logger = logging.getLogger(name=__name__)


class UnexpectedSaveError(RuntimeError):
    """Raised by MemoryTree._save when the _ERROR_ON_SAVE flag is set."""
# the config file is read through a yaml interface. The preferred one is
# ruamel.yaml, since it allows to preserve comments and whitespace in the
# config file through roundtrips (the config file is rewritten every time a
# parameter is changed). If ruamel.yaml is not available, the program will
# log a debug message and use pyyaml (=yaml= instead). Comments are lost in
# this case.
try:
    # ruamel support is deliberately disabled: raise an explicit exception
    # (instead of a bare ``raise`` without an active exception) so that the
    # fallback branch below is always taken. Remove this line to re-enable
    # the ruamel code path that follows.
    raise ImportError("ruamel.yaml support is disabled")
    import ruamel.yaml
    #ruamel.yaml.add_implicit_resolver()
    ruamel.yaml.RoundTripDumper.add_representer(
        np.float64,
        lambda dumper, data: dumper.represent_float(float(data)))
    ruamel.yaml.RoundTripDumper.add_representer(
        complex,
        lambda dumper, data: dumper.represent_str(str(data)))
    ruamel.yaml.RoundTripDumper.add_representer(
        np.complex128,
        lambda dumper, data: dumper.represent_str(str(data)))
    ruamel.yaml.RoundTripDumper.add_representer(
        np.ndarray,
        lambda dumper, data: dumper.represent_list(list(data)))

    #http://stackoverflow.com/questions/13518819/avoid-references-in-pyyaml
    #ruamel.yaml.RoundTripDumper.ignore_aliases = lambda *args: True
    def load(f):
        """Load yaml data from the stream *f* with the round-trip loader."""
        return ruamel.yaml.load(f, ruamel.yaml.RoundTripLoader)

    def save(data, stream=None):
        """Dump *data* as yaml to *stream* with the round-trip dumper."""
        return ruamel.yaml.dump(data,
                                stream=stream,
                                Dumper=ruamel.yaml.RoundTripDumper,
                                default_flow_style=False)
except Exception:
    # ``Exception`` rather than a bare ``except`` so that KeyboardInterrupt
    # and SystemExit raised during import are not silently swallowed.
    logger.debug("ruamel.yaml could not be imported. Using yaml instead. "
                 "Comments in config files will be lost.")
    import yaml

    # see http://stackoverflow.com/questions/13518819/avoid-references-in-pyyaml
    #yaml.Dumper.ignore_aliases = lambda *args: True # NEVER TESTED

    # ordered load and dump for yaml files. From
    # http://stackoverflow.com/questions/5121931/in-python-how-can-you-load-yaml-mappings-as-ordereddicts
    def load(stream, Loader=yaml.SafeLoader, object_pairs_hook=OrderedDict):
        """
        Load yaml data from *stream*, building every mapping with
        *object_pairs_hook* (OrderedDict by default) so key order is kept.
        """
        class OrderedLoader(Loader):
            pass

        def construct_mapping(loader, node):
            loader.flatten_mapping(node)
            return object_pairs_hook(loader.construct_pairs(node))

        OrderedLoader.add_constructor(
            yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
            construct_mapping)
        return yaml.load(stream, OrderedLoader)

    def save(data, stream=None, Dumper=yaml.SafeDumper,
             default_flow_style=False,
             encoding='utf-8',
             **kwds):
        """
        Dump *data* as yaml to *stream* and return the result of yaml.dump.

        OrderedDict instances are written as plain mappings in their own
        order; numpy floats, complex numbers and numpy arrays are converted
        to float, str and list respectively before being represented.
        """
        class OrderedDumper(Dumper):
            pass

        def _dict_representer(dumper, data):
            return dumper.represent_mapping(
                yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
                data.items())

        OrderedDumper.add_representer(OrderedDict, _dict_representer)
        OrderedDumper.add_representer(
            np.float64,
            lambda dumper, data: dumper.represent_float(float(data)))
        OrderedDumper.add_representer(
            complex,
            lambda dumper, data: dumper.represent_str(str(data)))
        OrderedDumper.add_representer(
            np.complex128,
            lambda dumper, data: dumper.represent_str(str(data)))
        OrderedDumper.add_representer(
            np.ndarray,
            lambda dumper, data: dumper.represent_list(list(data)))
        # I added the following two lines to make pyrpl compatible with
        # pyinstruments. In principle they can be erased
        if isinstance(data, dict) and not isinstance(data, OrderedDict):
            data = OrderedDict(data)
        return yaml.dump(data,
                         stream=stream,
                         Dumper=OrderedDumper,
                         default_flow_style=default_flow_style,
                         encoding=encoding,
                         **kwds)

    # usage example:
    # load(stream, yaml.SafeLoader)
    # save(data, stream=f, Dumper=yaml.SafeDumper)
def isbranch(obj):
    """Tell whether *obj* is a dict or a list (including subclasses)."""
    return isinstance(obj, (dict, list))
# two functions to locate config files
def _get_filename(filename=None):
    """
    Resolve the path of a config file.

    A MemoryTree passed by mistake yields its own ``_filename``. Otherwise
    the ".yml" extension is appended when missing and the file is looked up
    in the directory given in *filename*, then in user_config_dir, then in
    default_config_dir. The first existing path is returned; when none
    exists, the path inside user_config_dir is returned.
    """
    # accidentally, we may pass a MemoryTree object instead of file
    if isinstance(filename, MemoryTree):
        return filename._filename
    # get extension right
    if not filename.endswith(".yml"):
        filename += ".yml"
    directory, basename = os.path.split(filename)
    # lazily walk the search directories in priority order
    candidates = (os.path.join(folder, basename)
                  for folder in (directory, user_config_dir, default_config_dir))
    existing = next((path for path in candidates if os.path.isfile(path)),
                    None)
    if existing is not None:
        return existing
    # file not existing, place it in user_config_dir
    return os.path.join(user_config_dir, basename)
def get_config_file(filename=None, source=None):
    """
    Return the path to an existing config file, creating it if necessary.

    ``None`` is returned unchanged (no persistent config file wanted). An
    existing file found in default_config_dir is first copied to
    user_config_dir and the copy is returned, so the original is never
    altered. A missing file is created as a copy of *source* when that file
    exists, and as an empty file otherwise.
    """
    # None means that no persistent config file is wanted
    if filename is None:
        return filename
    filename = _get_filename(filename)
    if not os.path.isfile(filename):
        # nothing on disk yet: try to seed the new file from source
        if source is not None:
            source = _get_filename(source)
            if os.path.isfile(source):
                logger.debug("File " + filename + " not found. New file created from source '%s'. "%source)
                copyfile(source, filename)
                return filename
        # no usable source -> create an empty file
        with open(filename, mode="w"):
            pass
        logger.debug("File " + filename + " not found. New file created. ")
        return filename
    # the file exists: only files in default_config_dir need special care
    directory, basename = os.path.split(filename)
    if directory != default_config_dir:
        return filename
    # work on a copy in user_config_dir in order to not alter original files
    user_copy = os.path.join(user_config_dir, basename)
    copyfile(filename, user_copy)
    return user_copy
class MemoryBranch(object):
    """Represents a branch of a memoryTree

    All methods are preceded by an underscore to guarantee that tab
    expansion of a memory branch only displays the available subbranches or
    leaves. A memory tree is a hierarchical structure. Nested dicts (and
    lists) are interpreted as subbranches.

    Parameters
    ----------
    parent: MemoryBranch
        parent is the parent MemoryBranch
    branch: str
        branch is the name (key in the parent's data) of this branch

    Class members
    -----------
    all properties without preceeding underscore are config file entries

    _data:      the raw data underlying the branch, looked up in the parent's
                data at every access. Type depends on the loader and can
                be dict, OrderedDict, list or CommentedMap
    _keys:      the keys of _data (a range of indices for list branches)
    _update:    updates the branch with another dict
    _pop:       removes a value/subbranch from the branch
    _erase:     removes the branch from its parent
    _root:      the MemoryTree object (root) of the tree
    _parent:    the parent of the branch
    _branch:    the name of the branch
    _get_or_create: creates a new branch and returns it. Same as
                branch[newname]=dict(), but also supports nesting,
                e.g. newname="lev1.lev2.level3"
    _fullbranchname: returns the full path from root to the branch
    _rename:    renames the branch
    _reload:    attempts to reload the data from disc
    _save:      attempts to save the data to disc
    _get_yml / _set_yml: read / replace the branch content as yaml text

    If a subbranch or a value is requested but does not exist in the current
    MemoryTree, a KeyError is raised.
    """

    def __init__(self, parent, branch):
        self._parent = parent
        self._branch = branch
        self._update_instance_dict()

    def _update_instance_dict(self):
        """
        Synchronizes the instance __dict__ with the keys of a dict-like
        branch, so that auto-completion shows exactly the existing entries.
        """
        data = self._data
        if isinstance(data, dict):
            # iterate over a snapshot of the keys: popping from __dict__
            # while iterating over it raises a RuntimeError in Python 3
            for k in list(self.__dict__):
                if k not in data and not k.startswith('_'):
                    self.__dict__.pop(k)
            for k in data.keys():
                # write None since this is only a
                # placeholder (__getattribute__ is overwritten below)
                self.__dict__[k] = None

    @property
    def _data(self):
        """ The raw data (OrderedDict) or Mapping of the branch """
        return self._parent._data[self._branch]

    @_data.setter
    def _data(self, value):
        logger.warning("You are directly modifying the data of MemoryBranch"
                       " %s to %s.", self._fullbranchname, str(value))
        self._parent._data[self._branch] = value

    def _keys(self):
        """ returns the keys of the branch (indices for a list branch) """
        if isinstance(self._data, list):
            return range(self.__len__())
        else:
            return self._data.keys()

    def _update(self, new_dict):
        """ updates a dict-like branch with new_dict and requests a save """
        if isinstance(self._data, list):
            raise NotImplementedError
        self._data.update(new_dict)
        self._save()
        # keep auto_completion up to date
        for k in new_dict:
            self.__dict__[k] = None

    def __getattribute__(self, name):
        """ implements the dot notation.
        Example: self.subbranch.leaf returns the item 'leaf' of 'subbranch' """
        if name.startswith('_'):
            return super(MemoryBranch, self).__getattribute__(name)
        else:
            # convert dot notation into dict notation
            return self[name]

    def __getitem__(self, item):
        """
        __getitem__ bypasses the higher-level __getattribute__ function and
        provides direct low-level access to the underlying dictionary.
        This is much faster, as long as no changes have been made to the
        config file.
        """
        self._reload()
        # if a subbranch is requested, iterate through the hierarchy
        if isinstance(item, str) and '.' in item:
            item, subitem = item.split('.', 1)
            return self[item][subitem]
        else:  # otherwise just return what we can find
            attribute = self._data[item]  # read from the data dict
            if isbranch(attribute):
                # if the object can be expressed as a branch, do so
                return MemoryBranch(self, item)
            else:
                # otherwise return whatever we found in the data dict
                return attribute

    def __setattr__(self, name, value):
        if name.startswith('_'):
            super(MemoryBranch, self).__setattr__(name, value)
        else:  # implement dot notation
            self[name] = value

    def __setitem__(self, item, value):
        """
        creates a new entry, overriding the protection provided by dot notation
        if the value of this entry is of type dict, it becomes a MemoryBranch
        new values can be added to the branch in the same manner
        """
        # if the subbranch is set or replaced, do this in a specific way
        if isbranch(value):
            # naive way: self._data[item] = dict(value)
            # rather: replace values in their natural order (e.g. if value
            # is OrderedDict)
            if isinstance(value, list):
                # make an empty subbranch
                self._set_data(item, [])
                subbranch = self[item]
                # use standard setter to set the values 1 by 1 and possibly
                # as subbranch objects
                for k, v in enumerate(value):
                    subbranch[k] = v
            else:  # dict-like
                # makes an empty subbranch
                self._set_data(item, dict())
                subbranch = self[item]
                # use standard setter to set the values 1 by 1 and possibly
                # as subbranch objects
                for k, v in value.items():
                    subbranch[k] = v
        # otherwise just write to the data dictionary
        else:
            self._set_data(item, value)
        if self._root._WARNING_ON_SAVE or self._root._ERROR_ON_SAVE:
            logger.warning("Issuing call to MemoryTree._save after %s.%s=%s",
                           self._branch, item, value)
        self._save()
        # update the __dict__ for autocompletion
        self.__dict__[item] = None

    def _set_data(self, item, value):
        """
        helper function to manage setting list entries that do not exist
        """
        if isinstance(self._data, list) and item == len(self._data):
            self._data.append(value)
        else:
            # trivial case: _data is dict or item within list length
            # and we can simply set the entry
            self._data[item] = value

    def _pop(self, name):
        """
        remove an item from the branch
        """
        value = self._data.pop(name)
        self.__dict__.pop(name, None)
        self._save()
        return value

    def _rename(self, name):
        """
        renames the branch: its data is moved to the key `name` in the parent
        """
        self._parent[name] = self._parent._pop(self._branch)
        # follow the data to its new key, otherwise this object would keep
        # pointing to the key that was just removed from the parent
        self._branch = name
        self._save()

    def _get_or_create(self, name):
        """
        creates a new subbranch with name=name if it does not exist already
        and returns it. If name is a branch hierarchy such as
        "subbranch1.subbranch2.subbranch3", all three subbranch levels
        are created
        """
        if isinstance(name, int):
            if name == 0 and len(self) == 0:
                # replace the (empty) branch data by a new, empty list
                self._parent._data[self._branch] = []
            # if index <= len, creation is done automatically if needed
            # otherwise an error is raised
            if name >= len(self):
                self[name] = dict()
            return self[name]
        else:
            # dict-like subbranch, support several sublevels separated by '.'
            # chop name into parts and iterate through them
            currentbranch = self
            for subbranchname in name.split("."):
                # make new branch if applicable
                if subbranchname not in currentbranch._data.keys():
                    currentbranch[subbranchname] = dict()
                # move into new branch in case another subbranch will be
                # created
                currentbranch = currentbranch[subbranchname]
            return currentbranch

    def _erase(self):
        """
        Erases the current branch
        """
        self._parent._pop(self._branch)
        self._save()

    @property
    def _root(self):
        """
        returns the parent highest in hierarchy (the MemoryTree object)
        """
        parent = self
        # the root is the one object that is its own parent
        while parent is not parent._parent:
            parent = parent._parent
        return parent

    @property
    def _fullbranchname(self):
        """ returns the dot-separated path from the root to this branch """
        parent = self._parent
        branchname = self._branch
        while parent is not parent._parent:
            branchname = parent._branch + '.' + branchname
            parent = parent._parent
        return branchname

    def _reload(self):
        """ reload data from file"""
        self._parent._reload()

    def _save(self):
        """ write data to file"""
        self._parent._save()

    def _get_yml(self, data=None):
        """
        :return: returns the yml code for this branch
        """
        return save(self._data if data is None else data).decode('utf-8')

    def _set_yml(self, yml_content):
        """
        :param yml_content: sets the branch to yml_content
        :return: None
        """
        branch = load(yml_content)
        self._parent._data[self._branch] = branch
        self._save()

    def __len__(self):
        return len(self._data)

    def __contains__(self, item):
        return item in self._data

    def __repr__(self):
        return "MemoryBranch(" + str(self._keys()) + ")"

    def __add__(self, other):
        """
        makes it possible to add list-like memory tree to a list
        """
        if not isinstance(self._data, list):
            raise NotImplementedError
        return self._data + other

    def __radd__(self, other):
        """
        makes it possible to add list-like memory tree to a list
        """
        if not isinstance(self._data, list):
            raise NotImplementedError
        return other + self._data
class MemoryTree(MemoryBranch):
    """
    The highest level of a MemoryBranch construct. All attributes of this
    object that do not start with '_' are other MemoryBranch objects or
    Leaves, i.e. key - value pairs.

    Parameters
    ----------
    filename: str
        The filename of the .yml file defining the MemoryTree structure.
        If None, the data is only kept in memory and never written to disc.
    source: str
        Passed on to get_config_file together with filename.
    _loadsavedeadtime: float
        Minimum delay between two reloads from / writes to the config file.
        It is multiplied by 1000 to obtain the interval of the save timer.
    """
    ##### internal load logic:
    # 1. initially, call _load() to get the data from the file
    # 2. upon each inquiry of the config data, _reload() is called to
    # ensure data integrity
    # 3. _reload assumes a delay of _loadsavedeadtime between changing the
    # config file and Pyrpl requesting the new data. That means, _reload
    # will not attempt to touch the config file more often than every
    # _loadsavedeadtime. The last interaction time with the file system is
    # saved in the variable _lastreload. If this time is far enough in the
    # past, the modification time of the config file is compared to _mtime,
    # the internal memory of the last modification time by pyrpl. If the two
    # don't match, the file was altered outside the scope of pyrpl and _load
    # is called to reload it.

    ##### internal save logic:
    # _save() is called after every change. It writes to file immediately
    # if the last write is older than the deadtime, otherwise it starts the
    # single-shot _savetimer whose timeout calls _write_to_file.

    # this structure will hold the data. Must define it here as immutable
    # to overwrite the property _data of MemoryBranch
    _data = None

    # flag that is used to debug excessive calls to save
    _WARNING_ON_SAVE = False
    # Set this flag to true to raise Exceptions upon save
    _ERROR_ON_SAVE = False

    def __init__(self, filename=None, source=None, _loadsavedeadtime=3.0):
        # never reload or save more frequently than _loadsavedeadtime because
        # this is the principal cause of slowing down the code (typ. 30-200 ms)
        # for immediate saving, call _write_to_file, for immediate loading
        # call _load
        self._loadsavedeadtime = _loadsavedeadtime
        # first, make sure filename exists
        self._filename = get_config_file(filename, source)
        if filename is None:
            # to simulate a config file, only store data in memory
            self._filename = filename
            self._data = OrderedDict()
        self._lastsave = time()
        # create a timer to postpone too frequent savings
        self._savetimer = QtCore.QTimer()
        # QTimer.setInterval expects an integer number of milliseconds;
        # passing a float raises a TypeError with recent Qt bindings
        self._savetimer.setInterval(int(self._loadsavedeadtime * 1000))
        self._savetimer.setSingleShot(True)
        self._savetimer.timeout.connect(self._write_to_file)
        self._load()

        self._save_counter = 0  # cntr for unittest and debug purposes
        self._write_to_file_counter = 0  # cntr for unittest and debug purposes

        # root of the tree is also a MemoryBranch with parent self and
        # branch name ""
        super(MemoryTree, self).__init__(self, "")

    @property
    def _buffer_filename(self):
        """ makes a temporary file to ensure modification of config file is
        atomic (double-buffering like operation...)"""
        return self._filename + '.tmp'

    def _load(self):
        """ loads data from file """
        if self._filename is None:
            # if no file is used, just ignore this call
            return
        logger.debug("Loading config file %s", self._filename)
        # read file from disc
        with open(self._filename) as f:
            self._data = load(f)
        # store the modification time of this file version
        self._mtime = os.path.getmtime(self._filename)
        # make sure that reload timeout starts from this moment
        self._lastreload = time()
        # empty file gives _data=None
        if self._data is None:
            self._data = OrderedDict()
        # update dict of the MemoryTree object: remove all obsolete entries
        # (collected first, since __dict__ must not change while iterating)
        to_remove = [name for name in self.__dict__
                     if not name.startswith('_') and name not in self._data]
        for name in to_remove:
            self.__dict__.pop(name)
        # insert the branches into the object __dict__ for auto-completion
        self.__dict__.update(self._data)

    def _reload(self):
        """
        reloads data from file if file has changed recently
        """
        # without a file there is nothing to reload
        if self._filename is None:
            return
        # check whether reload timeout has expired (a reload is not attempted
        # more often than every _loadsavedeadtime for speed reasons)
        if time() > self._lastreload + self._loadsavedeadtime:
            # prepare next timeout
            self._lastreload = time()
            logger.debug("Checking change time of config file...")
            filetime = os.path.getmtime(self._filename)
            if self._mtime != filetime:
                # both placeholders of the message need a value
                logger.debug("Loading because mtime %s != filetime %s",
                             self._mtime, filetime)
                self._load()
            else:
                logger.debug("... no reloading required")

    def _write_to_file(self):
        """
        Immediately writes the content of the memory tree to file
        """
        # stop save timer
        if hasattr(self, '_savetimer') and self._savetimer.isActive():
            self._savetimer.stop()
        self._lastsave = time()
        self._write_to_file_counter += 1
        logger.debug("Saving config file %s", self._filename)
        if self._filename is None:
            # skip writing to file if no filename was selected
            return
        else:
            if self._mtime != os.path.getmtime(self._filename):
                logger.warning("Config file has recently been changed on your " +
                               "harddisk. These changes might have been " +
                               "overwritten now.")
            # we must be sure that overwriting config file never destroys
            # existing data.
            # security 1: backup with copyfile
            copyfile(self._filename,
                     self._filename + ".bak")  # maybe this line is obsolete (see below)
            # security 2: atomic writing such as shown in
            # http://stackoverflow.com/questions/2333872/atomic-writing-to-file-with-python:
            try:
                # the with-statement closes the buffer file even if dumping
                # fails, and before the file is moved into place
                with open(self._buffer_filename, mode='w') as f:
                    save(self._data, stream=f)
                    f.flush()
                    os.fsync(f.fileno())
                os.unlink(self._filename)
                os.rename(self._buffer_filename, self._filename)
            except BaseException:
                # restore the backup on any failure, then propagate the error
                copyfile(self._filename + ".bak", self._filename)
                logger.error("Error writing to file. Backup version was restored.")
                raise
            # save last modification time of the file
            self._mtime = os.path.getmtime(self._filename)

    def _save(self, deadtime=None):
        """
        A call to this function means that the state of the tree has changed
        and needs to be saved eventually. To reduce system load, the delay
        between two writes will be at least deadtime (defaults to
        self._loadsavedeadtime if None)

        Raises UnexpectedSaveError if the flag _ERROR_ON_SAVE is set.
        """
        if self._ERROR_ON_SAVE:
            raise UnexpectedSaveError("Save to config file should not "
                                      "happen now")
        self._save_counter += 1  # for unittest and debug purposes
        if self._WARNING_ON_SAVE:
            # logged after the increment so that the reported value is the
            # one the counter has actually been increased to
            logger.warning("Save counter has just been increased to %d.",
                           self._save_counter)
        if deadtime is None:
            deadtime = self._loadsavedeadtime
        # now write current tree structure and data to file
        if self._lastsave + deadtime < time():
            self._write_to_file()
        else:
            # make sure saving will eventually occur by launching a timer
            if not self._savetimer.isActive():
                self._savetimer.start()

    @property
    def _filename_stripped(self):
        """
        returns the file name without directory, cut at its first '.', or
        'default' if it cannot be determined (e.g. no file is used)
        """
        try:
            return os.path.split(self._filename)[1].split('.')[0]
        except Exception:
            return 'default'