Fork PyRPL on GitHub

Source code for pyrpl.redpitaya

#    pyrpl - DSP servo controller for quantum optics with the RedPitaya
#    Copyright (C) 2014-2016  Leonhard Neuhaus  (
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    GNU General Public License for more details.
#    You should have received a copy of the GNU General Public License
#    along with this program.  If not, see <>.

from . import redpitaya_client
from . import hardware_modules as rp
from .sshshell import SshShell
from .pyrpl_utils import get_unique_name_list_from_class_list, update_with_typeconversion
from .memory import MemoryTree
from .errors import ExpectedPyrplError
from .widgets.startup_widget import HostnameSelectorWidget

import logging
import os
import random
import socket
from time import sleep
import numpy as np

from paramiko import SSHException
from scp import SCPClient, SCPException
from collections import OrderedDict

# input is the wrong function in python 2
except NameError:  # Python 3
    raw_input = input

# default parameters for redpitaya object creation
defaultparameters = dict(
    hostname='', #'', # the ip or hostname of the board, '' triggers gui
    port=2222,  # port for PyRPL datacommunication
    sshport=22,  # port of ssh server - default 22
    delay=0.05,  # delay between ssh commands - console is too slow otherwise
    autostart=True,  # autostart the client?
    reloadserver=False,  # reinstall the server at startup if not necessary?
    reloadfpga=True,  # reload the fpga bitfile at startup?
    serverbinfilename='fpga.bin',  # name of the binfile on the server
    serverdirname = "//opt//pyrpl//",  # server directory for server app and bitfile
    leds_off=True,  # turn off all GPIO lets at startup (improves analog performance)
    frequency_correction=1.0,  # actual FPGA frequency is 125 MHz * frequency_correction
    timeout=1,  # timeout in seconds for ssh communication
    monitor_server_name='monitor_server',  # name of the server program on redpitaya
    silence_env=False,   # suppress all environment variables that may override the configuration?
    gui=True  # show graphical user interface or work on command-line only?

[docs]class RedPitaya(object): cls_modules = [rp.HK, rp.AMS, rp.Scope, rp.Sampler, rp.Asg0, rp.Asg1] + \ [rp.Pwm] * 2 + [rp.Iq] * 3 + [rp.Pid] * 3 + [rp.Trig] + [ rp.IIR] def __init__(self, config=None, # configfile is needed to store parameters. None simulates one **kwargs): """ this class provides the basic interface to the redpitaya board The constructor installs and starts the communication interface on the RedPitaya at 'hostname' that allows remote control and readout 'config' is the config file or MemoryTree of the config file. All keyword arguments may be specified in the branch 'redpitaya' of this config file. Alternatively, they can be overwritten by keyword arguments at the function call. 'config=None' specifies that no persistent config file is saved on the disc. Possible keyword arguments and their defaults are: hostname='', # the ip or hostname of the board port=2222, # port for PyRPL datacommunication sshport=22, # port of ssh server - default 22 user='root', password='root', delay=0.05, # delay between ssh commands - console is too slow otherwise autostart=True, # autostart the client? reloadserver=False, # reinstall the server at startup if not necessary? reloadfpga=True, # reload the fpga bitfile at startup? filename='fpga//red_pitaya.bin', # name of the bitfile for the fpga, None is default file serverbinfilename='fpga.bin', # name of the binfile on the server serverdirname = "//opt//pyrpl//", # server directory for server app and bitfile leds_off=True, # turn off all GPIO lets at startup (improves analog performance) frequency_correction=1.0, # actual FPGA frequency is 125 MHz * frequency_correction timeout=3, # timeout in seconds for ssh communication monitor_server_name='monitor_server', # name of the server program on redpitaya silence_env=False, # suppress all environment variables that may override the configuration? gui=True # show graphical user interface or work on command-line only? if you are experiencing problems, try to increase delay, or try logging.getLogger().setLevel(logging.DEBUG)""" self.logger = logging.getLogger(name=__name__) #self.license() # make or retrieve the config file if isinstance(config, MemoryTree): self.c = config else: self.c = MemoryTree(config) # get the parameters right (in order of increasing priority): # 1. defaults # 2. environment variables # 3. config file # 4. command line arguments # 5. (if missing information) request from GUI or command-line self.parameters = defaultparameters # get parameters from os.environment variables if not self.parameters['silence_env']: for k in self.parameters.keys(): if "REDPITAYA_"+k.upper() in os.environ: newvalue = os.environ["REDPITAYA_"+k.upper()] oldvalue = self.parameters[k] self.parameters[k] = type(oldvalue)(newvalue) if k == "password": # do not show the password on the screen oldvalue = "********" newvalue = "********" self.logger.debug("Variable %s with value %s overwritten " "by environment variable REDPITAYA_%s " "with value %s. Use argument " "'silence_env=True' if this is not " "desired!", k, oldvalue, k.upper(), newvalue) # settings from config file try: update_with_typeconversion(self.parameters, self.c._get_or_create('redpitaya')._data) except BaseException as e: self.logger.warning("An error occured during the loading of your " "Red Pitaya settings from the config file: %s", e) # settings from class initialisation / command line update_with_typeconversion(self.parameters, kwargs) # get missing connection settings from gui/command line if self.parameters['hostname'] is None or self.parameters['hostname']=='': gui = 'gui' not in self.c._keys() or self.c.gui if gui:"Please choose the hostname of " "your Red Pitaya in the hostname " "selector window!") startup_widget = HostnameSelectorWidget(config=self.parameters) hostname_kwds = startup_widget.get_kwds() else: hostname = raw_input('Enter hostname []: ') hostname = '' if hostname == '' else hostname hostname_kwds = dict(hostname=hostname) if not "sshport" in kwargs: sshport = raw_input('Enter sshport [22]: ') sshport = 22 if sshport == '' else int(sshport) hostname_kwds['sshport'] = sshport if not 'user' in kwargs: user = raw_input('Enter username [root]: ') user = 'root' if user == '' else user hostname_kwds['user'] = user if not 'password' in kwargs: password = raw_input('Enter password [root]: ') password = 'root' if password == '' else password hostname_kwds['password'] = password self.parameters.update(hostname_kwds) # optional: write configuration back to config file self.c["redpitaya"] = self.parameters # save default port definition for possible automatic port change self.parameters['defaultport'] = self.parameters['port'] # frequency_correction is accessed by child modules self.frequency_correction = self.parameters['frequency_correction'] # memorize whether server is running - nearly obsolete self._serverrunning = False self.client = None # client class self._slaves = [] # slave interfaces to same redpitaya self.modules = OrderedDict() # all submodules # provide option to simulate a RedPitaya if self.parameters['hostname'] in ['_FAKE_REDPITAYA_', '_FAKE_']: self.startdummyclient() self.logger.warning("Simulating RedPitaya because (hostname==" +self.parameters["hostname"]+"). Incomplete " "functionality possible. ") return elif self.parameters['hostname'] in ['_NONE_']: self.modules = [] self.logger.warning("No RedPitaya created (hostname==" + self.parameters["hostname"] + ")." " No hardware modules are available. ") return # connect to the redpitaya board self.start_ssh() # start other stuff if self.parameters['reloadfpga']: # flash fpga self.update_fpga() if self.parameters['reloadserver']: # reinstall server app self.installserver() if self.parameters['autostart']: # start client self.start()'Successfully connected to Redpitaya with hostname ' '%s.'%self.ssh.hostname) self.parent = self
[docs] def start_ssh(self, attempt=0): """ Extablishes an ssh connection to the RedPitaya board returns True if a successful connection has been established """ try: # close pre-existing connection if necessary self.end_ssh() except: pass if self.parameters['hostname'] == "_FAKE_REDPITAYA_": # simulation mode - start without connecting self.logger.warning("(Re-)starting client in dummy mode...") self.startdummyclient() return True else: # normal mode - establish ssh connection and try: # start ssh connection self.ssh = SshShell(hostname=self.parameters['hostname'], sshport=self.parameters['sshport'], user=self.parameters['user'], password=self.parameters['password'], delay=self.parameters['delay'], timeout=self.parameters['timeout']) # test ssh connection for exceptions self.ssh.ask() except BaseException as e: # connection problem if attempt < 3: # try to connect up to 3 times return self.start_ssh(attempt=attempt+1) else: # even multiple attempts did not work raise ExpectedPyrplError( "\nCould not connect to the Red Pitaya device with " "the following parameters: \n\n" "\thostname: %s\n" "\tssh port: %s\n" "\tusername: %s\n" "\tpassword: ****\n\n" "Please confirm that the device is reachable by typing " "its hostname/ip address into a web browser and " "checking that a page is displayed. \n\n" "Error message: %s" % (self.parameters["hostname"], self.parameters["sshport"], self.parameters["user"], e)) else: # everything went well, connection is established # also establish scp connection self.ssh.startscp() return True
[docs] def switch_led(self, gpiopin=0, state=False): self.ssh.ask("echo " + str(gpiopin) + " > /sys/class/gpio/export") sleep(self.parameters['delay']) self.ssh.ask( "echo out > /sys/class/gpio/gpio" + str(gpiopin) + "/direction") sleep(self.parameters['delay']) if state: state = "1" else: state = "0" self.ssh.ask("echo " + state + " > /sys/class/gpio/gpio" + str(gpiopin) + "/value") sleep(self.parameters['delay'])
[docs] def update_fpga(self, filename=None): if filename is None: try: source = self.parameters['filename'] except KeyError: source = None self.end() sleep(self.parameters['delay']) self.ssh.ask('rw') sleep(self.parameters['delay']) self.ssh.ask('mkdir ' + self.parameters['serverdirname']) sleep(self.parameters['delay']) if source is None or not os.path.isfile(source): if source is not None: self.logger.warning('Desired bitfile "%s" does not exist. Using default file.', source) source = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'fpga', 'red_pitaya.bin') if not os.path.isfile(source): raise IOError("Wrong filename", "The fpga bitfile was not found at the expected location. Try passing the arguments " "dirname=\"c://github//pyrpl//pyrpl//\" adapted to your installation directory of pyrpl " "and filename=\"red_pitaya.bin\"! Current dirname: " + self.parameters['dirname'] + " current filename: "+self.parameters['filename']) for i in range(3): try: self.ssh.scp.put(source, os.path.join(self.parameters['serverdirname'], self.parameters['serverbinfilename'])) except (SCPException, SSHException): # try again before failing self.start_ssh() sleep(self.parameters['delay']) else: break # kill all other servers to prevent reading while fpga is flashed self.end() self.ssh.ask('killall nginx') self.ssh.ask('systemctl stop redpitaya_nginx') # for 0.94 and higher self.ssh.ask('cat ' + os.path.join(self.parameters['serverdirname'], self.parameters['serverbinfilename']) + ' > //dev//xdevcfg') sleep(self.parameters['delay']) self.ssh.ask('rm -f '+ os.path.join(self.parameters['serverdirname'], self.parameters['serverbinfilename'])) self.ssh.ask("nginx -p //opt//www//") self.ssh.ask('systemctl start redpitaya_nginx') # for 0.94 and higher #needs test sleep(self.parameters['delay']) self.ssh.ask('ro')
[docs] def fpgarecentlyflashed(self): self.ssh.ask() result =self.ssh.ask("echo $(($(date +%s) - $(date +%s -r \"" + os.path.join(self.parameters['serverdirname'], self.parameters['serverbinfilename']) +"\")))") age = None for line in result.split('\n'): try: age = int(line.strip()) except: pass else: break if not age: self.logger.debug("Could not retrieve bitfile age from: %s", result) return False elif age > 10: self.logger.debug("Found expired bitfile. Age: %s", age) return False else: self.logger.debug("Found recent bitfile. Age: %s", age) return True
[docs] def installserver(self): self.endserver() sleep(self.parameters['delay']) self.ssh.ask('rw') sleep(self.parameters['delay']) self.ssh.ask('mkdir ' + self.parameters['serverdirname']) sleep(self.parameters['delay']) self.ssh.ask("cd " + self.parameters['serverdirname']) #try both versions for serverfile in ['monitor_server','monitor_server_0.95']: sleep(self.parameters['delay']) try: self.ssh.scp.put( os.path.join(os.path.abspath(os.path.dirname(__file__)), 'monitor_server', serverfile), self.parameters['serverdirname'] + self.parameters['monitor_server_name']) except (SCPException, SSHException): self.logger.exception("Upload error. Try again after rebooting your RedPitaya..") sleep(self.parameters['delay']) self.ssh.ask('chmod 755 ./'+self.parameters['monitor_server_name']) sleep(self.parameters['delay']) self.ssh.ask('ro') result = self.ssh.ask("./"+self.parameters['monitor_server_name']+" "+ str(self.parameters['port'])) sleep(self.parameters['delay']) result += self.ssh.ask() if not "sh" in result: self.logger.debug("Server application started on port %d", self.parameters['port']) return self.parameters['port'] else: # means we tried the wrong binary version. make sure server is not running and try again with next file self.endserver() #try once more on a different port if self.parameters['port'] == self.parameters['defaultport']: self.parameters['port'] = random.randint(self.parameters['defaultport'],50000) self.logger.warning("Problems to start the server application. Trying again with a different port number %d",self.parameters['port']) return self.installserver() self.logger.error("Server application could not be started. Try to recompile monitor_server on your RedPitaya (see manual). ") return None
[docs] def startserver(self): self.endserver() sleep(self.parameters['delay']) if self.fpgarecentlyflashed():"FPGA is being flashed. Please wait for 2 " "seconds.") sleep(2.0) result = self.ssh.ask(self.parameters['serverdirname']+"/"+self.parameters['monitor_server_name'] +" "+ str(self.parameters['port'])) if not "sh" in result: # sh in result means we tried the wrong binary version self.logger.debug("Server application started on port %d", self.parameters['port']) self._serverrunning = True return self.parameters['port'] #something went wrong return self.installserver()
[docs] def endserver(self): try: self.ssh.ask('\x03') #exit running server application except: self.logger.exception("Server not responding...") if 'pitaya' in self.ssh.ask(): self.logger.debug('>') # formerly 'console ready' sleep(self.parameters['delay']) # make sure no other monitor_server blocks the port self.ssh.ask('killall ' + self.parameters['monitor_server_name']) self._serverrunning = False
[docs] def endclient(self): del self.client self.client = None
[docs] def start(self): if self.parameters['leds_off']: self.switch_led(gpiopin=0, state=False) self.switch_led(gpiopin=7, state=False) self.startserver() sleep(self.parameters['delay']) self.startclient()
[docs] def end(self): self.endserver() self.endclient()
[docs] def end_ssh(self):
[docs] def end_all(self): self.end() self.end_ssh()
[docs] def restart(self): self.end() self.start()
[docs] def restartserver(self, port=None): """restart the server. usually executed when client encounters an error""" if port is not None: if port < 0: #code to try a random port self.parameters['port'] = random.randint(2223,50000) else: self.parameters['port'] = port return self.startserver()
[docs] def license(self):"""\r\n pyrpl Copyright (C) 2014-2017 Leonhard Neuhaus This program comes with ABSOLUTELY NO WARRANTY; for details read the file "LICENSE" in the source directory. This is free software, and you are welcome to redistribute it under certain conditions; read the file "LICENSE" in the source directory for details.\r\n""")
[docs] def startclient(self): self.client = redpitaya_client.MonitorClient( self.parameters['hostname'], self.parameters['port'], restartserver=self.restartserver) self.makemodules() self.logger.debug("Client started successfully. ")
[docs] def startdummyclient(self): self.client = redpitaya_client.DummyClient() self.makemodules()
[docs] def makemodule(self, name, cls): module = cls(self, name) setattr(self, name, module) self.modules[name] = module
[docs] def makemodules(self): """ Automatically generates modules from the list RedPitaya.cls_modules """ names = get_unique_name_list_from_class_list(self.cls_modules) for cls, name in zip(self.cls_modules, names): self.makemodule(name, cls)
[docs] def make_a_slave(self, port=None, monitor_server_name=None, gui=False): if port is None: port = self.parameters['port'] + len(self._slaves)*10 + 1 if monitor_server_name is None: monitor_server_name = self.parameters['monitor_server_name'] + str(port) slaveparameters = dict(self.parameters) slaveparameters.update(dict( port=port, autostart=True, reloadfpga=False, reloadserver=False, monitor_server_name=monitor_server_name, silence_env=True)) r = RedPitaya(**slaveparameters) #gui=gui) r._master = self self._slaves.append(r) return r