Source code for pyrpl.sshshell
###############################################################################
# pyrpl - DSP servo controller for quantum optics with the RedPitaya
# Copyright (C) 2014-2016 Leonhard Neuhaus (neuhaus@spectro.jussieu.fr)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
###############################################################################
import paramiko
from time import sleep
from scp import SCPClient
import logging
[docs]class SshShell(object):
""" This is a wrapper around paramiko.SSHClient and scp.SCPClient
I provides a ssh connection with the ability to transfer files over it"""
def __init__(
self,
hostname='localhost',
user='root',
password='root',
delay=0.05,
timeout=3,
sshport=22,
shell=True):
self._logger = logging.getLogger(name=__name__)
self.delay = delay
self.apprunning = False
self.hostname = hostname
self.sshport=sshport
self.user = user
self.password = password
self.timeout= timeout
self.ssh = paramiko.SSHClient()
self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.ssh.connect(
hostname,
username=self.user,
password=self.password,
port=self.sshport,
timeout=timeout)
if shell:
self.channel = self.ssh.invoke_shell()
self.startscp()
[docs] def startscp(self):
self.scp = SCPClient(self.ssh.get_transport())
[docs] def write(self, text):
if self.channel.send_ready() and not text == "":
return self.channel.send(text)
else:
return -1
[docs] def read_nbytes(self, nbytes):
if self.channel.recv_ready():
return self.channel.recv(nbytes)
else:
return b""
[docs] def read(self):
sumstring = ""
while True:
string = self.read_nbytes(1024).decode('utf-8')
sumstring += string
if not string:
break
self._logger.debug(sumstring)
return sumstring
[docs] def askraw(self, question=""):
self.write(question)
sleep(self.delay)
return self.read()
[docs] def ask(self, question=""):
return self.askraw(question + '\n')
def __del__(self):
self.endapp()
try:
self.channel.close()
except AttributeError:
pass # already broken
self.ssh.close()
[docs] def reboot(self):
self.endapp()
self.ask("shutdown -r now")
self.__del__()
[docs] def shutdown(self):
self.endapp()
self.ask("shutdown now")
self.__del__()
[docs] def get_mac_addresses(self):
"""
returns all MAC addresses of the SSH device.
"""
self.ask() # empty the shell before asking something
macs = list()
nextgood = False
for token in self.ask('ifconfig | grep HWaddr').split():
if nextgood and len(token.split(':'))==6:
macs.append(token)
if token == 'HWaddr':
nextgood = True
else:
nextgood = False
if macs == []: # problem on more recent redpitaya os
nextgood = False
for token in self.ask('ip address').split():
if nextgood and len(token.split(':'))==6:
macs.append(token)
if token == 'link/ether':
nextgood = True
else:
nextgood = False
return macs