Fork PyRPL on GitHub

Source code for pyrpl.curvedb

###############################################################################
#    pyrpl - DSP servo controller for quantum optics with the RedPitaya
#    Copyright (C) 2014-2016  Leonhard Neuhaus  (neuhaus@spectro.jussieu.fr)
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
###############################################################################


# Dummy version of CurveDB class
#
# The original version of this code uses a package called pyinstruments, which
# is a database for experimental measurement data.
# This class simulates the functionality of pyinstruments to guarantee
# standalone functionality of the rplockbox package.
# In case you
# For full performance, download pyinstruments to replace this class
# otherwise you can custimize here what is to be done to your data
#
import numpy as np
import pandas as pd
import os
import logging
import pickle as file_backend
#import json as file_backend  # currently unable to store pandas


# optional override of CurveDB class with custom module, as defined in
# ./pyrpl/config/global_config.yml
try:
    from . import global_config
    CurveDB = __import__(global_config.general.curvedb).CurveDB
except:
    from . import user_curve_dir
[docs] class CurveDB(object): _dirname = user_curve_dir file_extension = '.dat' if not os.path.exists(_dirname): # if _dirname doesn't exist, some unexpected errors will occur. os.mkdir(_dirname) def __init__(self, name="some_curve"): """ A CurveDB object has - name = string to give the curve a name - pk = integer to uniquely identify the curve (the database primary key) - data = pandas.Series() object to hold any data - params = dict() with all kinds of parameters """ self.logger = logging.getLogger(name=__name__) self.params = dict() x, y = np.array([], dtype=np.float), np.array([], dtype=np.float) self.data = (x, y) self.name = name @property def name(self): return self.params["name"] @name.setter def name(self, val): self.params["name"] = val return val @classmethod
[docs] def create(cls, *args, **kwds): """ Creates a new curve, first arguments should be either Series(y, index=x) or x, y. kwds will be passed to self.params """ if len(args) == 0: ser = (np.array([], dtype=np.float), np.array([], dtype=np.float)) if len(args) == 1: if isinstance(args[0], pd.Series): x, y = args[0].index.values, args[0].values ser = (x, y) elif isinstance(args[0], (np.array, list, tuple)): ser = args[0] else: raise ValueError("cannot recognize argument %s as numpy.array or pandas.Series.", args[0]) elif len(args) == 2: x = np.array(args[0]) y = np.array(args[1]) ser = (x, y) else: raise ValueError("first arguments should be either x or x, y") obj = cls() obj.data = ser obj.params = kwds if not 'name' in obj.params: obj.params['name'] = 'new_curve' pk = obj.pk # make a pk if "childs" not in obj.params: obj.params["childs"] = None if ("autosave" not in kwds) or (kwds["autosave"]): obj.save() return obj
[docs] def plot(self): self.data.plot()
# Implement the following methods if you want to save curves permanently @classmethod
[docs] def get(cls, curve): if isinstance(curve, CurveDB): return curve elif isinstance(curve, list): return [CurveDB.get(c) for c in curve] else: with open(os.path.join(CurveDB._dirname, str(curve) + cls.file_extension), 'rb' if file_backend.__name__ == 'pickle' else 'r')\ as f: # rb is for compatibility with python 3 # see http://stackoverflow.com/questions/5512811/builtins-typeerror-must-be-str-not-bytes curve = CurveDB() curve._pk, curve.params, data = file_backend.load(f) curve.data = tuple([np.asarray(a) for a in data]) if isinstance(curve.data, pd.Series): # for backwards compatibility x, y = curve.data.index.values, curve.data.values curve.data = (x, y) return curve
[docs] def save(self): with open(os.path.join(self._dirname, str(self.pk) + self.file_extension), 'wb' if file_backend.__name__ == 'pickle' else 'w')\ as f: # wb is for compatibility with python 3 # see http://stackoverflow.com/questions/5512811/builtins-typeerror-must-be-str-not-bytes data = [a.tolist() for a in self.data] file_backend.dump([self.pk, self.params, data], f, )
[docs] def delete(self): # remove the file delpk = self.pk parent = self.parent childs = self.childs if isinstance(childs, list) and len(childs)> 0: self.logger.debug("Deleting all childs of curve %d"%delpk) for child in childs: child.delete() self.logger.debug("Deleting curve %d" % delpk) try: filename = os.path.join(self._dirname, str(self.pk) + self.file_extension) os.remove(filename) except OSError: self.logger.warning("Could not find and remove the file %s. ", filename) if parent: parentchilds = parent.childs parentchilds.remove(delpk) parent.childs = parentchilds parent.save()
# Implement the following methods if you want to use a hierarchical # structure for curves @property def childs(self): try: childs = self.params["childs"] except KeyError: return [] if childs is None: return [] else: try: return CurveDB.get(childs) except KeyError: return [] @property def parent(self): try: parentid = self.params["parent"] except KeyError: self.logger.debug("No parent found.") return None else: return CurveDB.get(parentid)
[docs] def add_child(self, child_curve): child = CurveDB.get(child_curve) child.params["parent"] = self.pk child.save() childs = self.params["childs"] or [] self.params["childs"] = list(childs+[child.pk]) self.save()
@classmethod
[docs] def all_pks(cls): """ Returns: list of int: A list of the primary keys of all CurveDB objects on the computer. """ pks = [int(f.split('.dat')[0]) for f in os.listdir(cls._dirname) if f.endswith('.dat')] return sorted(pks, reverse=True)
@classmethod
[docs] def all(cls): """ Returns: list of CurveDB: A list of all CurveDB objects on the computer. """ return [cls.get(pk) for pk in cls.all_pks()]
@property def pk(self): """ (int): The primary Key of the """ if hasattr(self, "_pk"): return self._pk else: pks = self.all_pks() if len(pks) == 0: self._pk = 1 else: self._pk = max(pks) + 1 # create the file to make this pk choice persistent with open(os.path.join(self._dirname, str(self._pk) + ".dat"), 'w') as f: f.close() return self._pk return -1 # a proper implementation will assign the database primary key for pk # the primary key is used to load a curve from the storage into memory
[docs] def sort(self): """numerically sorts the data series so that indexing can be used""" X, Y = self.data xs = np.array([x for (x, y) in sorted(zip(X, Y))], dtype=np.float64) ys = np.array([y for (x, y) in sorted(zip(X, Y))], dtype=np.float64) self.data = (xs, ys)
[docs] def fit(self): """ prototype for fitting a curve """ self.logger.warning("Not implemented") pass
[docs] def get_child(self, name): """ Returns the child of the curve with name 'name' Arguments: name (str): Name of the child curve to be retrieved. If several childs have the same name, the first one is returned. Returns: CurveDB: the child curve """ for c in self.childs: if c.name == name: return c