Loading pysen/hardware/__init__.py +5 −0 Original line number Diff line number Diff line """ here go NSE hardware modules """ try: from TacoDevice import TacoDevice # NOQA except ImportError: from .mocktaco import MockTacoDevice as TacoDevice # NOQA pysen/hardware/cryomech.py +15 −12 Original line number Diff line number Diff line Loading @@ -14,7 +14,7 @@ from builtins import bytes import struct #import device as dev from .device import TcpDevice, CommError, TimeoutError, STX, CR, BEL from .device import TcpDevice, CommError, STX, CR, BEL CMD_PROD_ID = 0x30 # product id (returned as decimal str) CMD_VERSION = 0x40 # software version string Loading Loading @@ -65,16 +65,19 @@ ESCAPE_LIST = [(STX, BEL+b'0'), (CR, BEL+b'1'), (BEL, BEL+b'2')] # Celsius to Fahrenheit C2F = lambda c : c*1.8+32.0 def c2f(c): """Celsius to Fahrenheit""" return c*1.8+32.0 def hexl(msg): """my way hexlify """ return " ".join([ ("0x%02x" % _c) for _c in bytes(msg) ]) #return " ".join([ ("0x%02x" % _c) for _c in bytes(msg) ]) return " ".join([ f"0x{_c:02x}" for _c in bytes(msg) ]) def mod256(msg): "SMDP checksum" cksum = sum([_c for _c in bytes(msg)]) % 0x100 #cksum = sum([_c for _c in bytes(msg)]) % 0x100 cksum = sum(bytes(msg)) % 0x100 cksum1 = (cksum & 0xF0) >> 4 cksum2 = cksum & 0x0F return cksum1+0x30, cksum2+0x30 Loading Loading @@ -119,21 +122,22 @@ class CryomechCP2800(TcpDevice): Cryomech communication object """ _saddr = 0x10 _c = 'c'.encode('ascii') # override generic read def _read(self, _maxcr=256): "Read binary data from STX to CR" while True: byte = self._sock.recv(1) if not len(byte): raise TimeoutError("Cryomech %s timeout" % self._name) if not byte: #not len(byte): raise TimeoutError(f"Cryomech {self._name} timeout") if byte == STX: break reply = byte while True: byte = self._sock.recv(1) if not len(byte): raise TimeoutError("Cryomech %s timeout" % self._name) if not byte: #not len(byte): raise TimeoutError(f"Cryomech {self._name} timeout") reply += byte if byte == CR: break Loading @@ -155,7 +159,7 @@ class CryomechCP2800(TcpDevice): def _read_data(self, opcode, index=0): "read data" value = 0 data = struct.pack('>cHB', 'c', opcode, index) data = struct.pack('>cHB', self._c, opcode, index) _cmd, response = self._command(CMD_CRYO, data) _dum, _opcode, _index, value = struct.unpack('>cHBl', response) return value Loading @@ -170,7 +174,7 @@ class CryomechCP2800(TcpDevice): "get version" _cmd, version = self._command(CMD_VERSION) _cmd, prod_id = self._command(CMD_PROD_ID) return "%s,%s" % (version, prod_id) return f"{version},{prod_id}" def info(self): "info about the device" Loading Loading @@ -215,4 +219,3 @@ class CryomechCP2800(TcpDevice): def high_pressure(self): "get high pressure" return self._read_data(PRES_TENTH_PSI, HIGH_PRESSURE)*PRES_SCALE pysen/hardware/device.py +10 −19 Original line number Diff line number Diff line Loading @@ -11,11 +11,6 @@ Author: Piotr Zolnierczuk, JCNS import time import socket try: from TacoDevice import TacoDevice # NOQA except ImportError: from .mocktaco import MockTacoDevice as TacoDevice # NOQA # ASCII stuff - could/should be taken from somewhere else STX = b'\x02' # start of text ETX = b'\x03' # end of text Loading @@ -28,15 +23,14 @@ NAK = b'\x15' # negative acknowledgement ESC = b'\x1B' # escape class TimeoutError(RuntimeError): "timeout error" pass #class TimeoutError(RuntimeError): # "timeout error" # pass class CommError(RuntimeError): "communication error" pass class TcpDevice(object): class TcpDevice: """ generic Serial over TCP device """ Loading @@ -59,7 +53,6 @@ class TcpDevice(object): dev = cls(name) if dev.connect(host, port): return dev else: return None def _recv(self, _maxcr=256): Loading @@ -67,9 +60,8 @@ class TcpDevice(object): reply = '' while True: byte = self._sock.recv(1) if not len(byte): raise TimeoutError("%s(%s) timeout" % (self.__class__, self.name)) if not byte: raise TimeoutError(f"{self.__class__}({self.name}) timeout") reply += byte if byte in (CR,): break Loading Loading @@ -99,7 +91,6 @@ class TcpDevice(object): def init(self, *args): "send init commands to device" pass def close(self): "close connection" Loading @@ -114,6 +105,6 @@ class TcpDevice(object): self._send(cmd) time.sleep(self._delay) reply = self._recv() if not len(reply): raise TimeoutError("Command '%s' failed: Timeout" % cmd) if not reply: raise TimeoutError("Command '{cmd}' failed: Timeout") return reply.strip() pysen/hardware/keithley.py +2 −2 Original line number Diff line number Diff line Loading @@ -17,9 +17,9 @@ class KeithleyMeter(TcpDevice): """ Keithley Meter class""" def init(self, *args): self.send("*RST") self._send("*RST") for arg in args: self.send(arg) self._send(arg) def version(self): "get version" Loading pysen/hardware/lakeshore_sc.py +8 −7 Original line number Diff line number Diff line Loading @@ -9,7 +9,7 @@ Author: Piotr Zolnierczuk, JCNS """ import time from io import StringIO from .device import TacoDevice from . import TacoDevice def run(filename, *devices): "read lakeshore temperatures" Loading @@ -23,18 +23,19 @@ def run(filename, *devices): try: while True: buff = StringIO() buff.write("Time %s \n" % time.ctime()) buff.write(f"Time {time.ctime()} \n") buff.write("Temperature controllers:\n") for i, tcc in enumerate(dev_list): resp = tcc.DevRead() buff.write('T SC%d Sensor\t' % i) buff.write(f"T SC{i} Sensor\t") for k in [0,]+list(range(2, 9)): if k: buff.write('\t\t') buff.write('%d\t%s\n' % (k+1, resp[k])) #buff.write('%d\t%s\n' % (k+1, resp[k])) buff.write(f"{k+1}\t{resp[k]}\n") out = buff.getvalue() print( out ) _fd = open(filename, 'w') with open(filename, 'w', encoding='ascii') as _fd: _fd.write(out) _fd.close() time.sleep(60) Loading Loading
pysen/hardware/__init__.py +5 −0 Original line number Diff line number Diff line """ here go NSE hardware modules """ try: from TacoDevice import TacoDevice # NOQA except ImportError: from .mocktaco import MockTacoDevice as TacoDevice # NOQA
pysen/hardware/cryomech.py +15 −12 Original line number Diff line number Diff line Loading @@ -14,7 +14,7 @@ from builtins import bytes import struct #import device as dev from .device import TcpDevice, CommError, TimeoutError, STX, CR, BEL from .device import TcpDevice, CommError, STX, CR, BEL CMD_PROD_ID = 0x30 # product id (returned as decimal str) CMD_VERSION = 0x40 # software version string Loading Loading @@ -65,16 +65,19 @@ ESCAPE_LIST = [(STX, BEL+b'0'), (CR, BEL+b'1'), (BEL, BEL+b'2')] # Celsius to Fahrenheit C2F = lambda c : c*1.8+32.0 def c2f(c): """Celsius to Fahrenheit""" return c*1.8+32.0 def hexl(msg): """my way hexlify """ return " ".join([ ("0x%02x" % _c) for _c in bytes(msg) ]) #return " ".join([ ("0x%02x" % _c) for _c in bytes(msg) ]) return " ".join([ f"0x{_c:02x}" for _c in bytes(msg) ]) def mod256(msg): "SMDP checksum" cksum = sum([_c for _c in bytes(msg)]) % 0x100 #cksum = sum([_c for _c in bytes(msg)]) % 0x100 cksum = sum(bytes(msg)) % 0x100 cksum1 = (cksum & 0xF0) >> 4 cksum2 = cksum & 0x0F return cksum1+0x30, cksum2+0x30 Loading Loading @@ -119,21 +122,22 @@ class CryomechCP2800(TcpDevice): Cryomech communication object """ _saddr = 0x10 _c = 'c'.encode('ascii') # override generic read def _read(self, _maxcr=256): "Read binary data from STX to CR" while True: byte = self._sock.recv(1) if not len(byte): raise TimeoutError("Cryomech %s timeout" % self._name) if not byte: #not len(byte): raise TimeoutError(f"Cryomech {self._name} timeout") if byte == STX: break reply = byte while True: byte = self._sock.recv(1) if not len(byte): raise TimeoutError("Cryomech %s timeout" % self._name) if not byte: #not len(byte): raise TimeoutError(f"Cryomech {self._name} timeout") reply += byte if byte == CR: break Loading @@ -155,7 +159,7 @@ class CryomechCP2800(TcpDevice): def _read_data(self, opcode, index=0): "read data" value = 0 data = struct.pack('>cHB', 'c', opcode, index) data = struct.pack('>cHB', self._c, opcode, index) _cmd, response = self._command(CMD_CRYO, data) _dum, _opcode, _index, value = struct.unpack('>cHBl', response) return value Loading @@ -170,7 +174,7 @@ class CryomechCP2800(TcpDevice): "get version" _cmd, version = self._command(CMD_VERSION) _cmd, prod_id = self._command(CMD_PROD_ID) return "%s,%s" % (version, prod_id) return f"{version},{prod_id}" def info(self): "info about the device" Loading Loading @@ -215,4 +219,3 @@ class CryomechCP2800(TcpDevice): def high_pressure(self): "get high pressure" return self._read_data(PRES_TENTH_PSI, HIGH_PRESSURE)*PRES_SCALE
pysen/hardware/device.py +10 −19 Original line number Diff line number Diff line Loading @@ -11,11 +11,6 @@ Author: Piotr Zolnierczuk, JCNS import time import socket try: from TacoDevice import TacoDevice # NOQA except ImportError: from .mocktaco import MockTacoDevice as TacoDevice # NOQA # ASCII stuff - could/should be taken from somewhere else STX = b'\x02' # start of text ETX = b'\x03' # end of text Loading @@ -28,15 +23,14 @@ NAK = b'\x15' # negative acknowledgement ESC = b'\x1B' # escape class TimeoutError(RuntimeError): "timeout error" pass #class TimeoutError(RuntimeError): # "timeout error" # pass class CommError(RuntimeError): "communication error" pass class TcpDevice(object): class TcpDevice: """ generic Serial over TCP device """ Loading @@ -59,7 +53,6 @@ class TcpDevice(object): dev = cls(name) if dev.connect(host, port): return dev else: return None def _recv(self, _maxcr=256): Loading @@ -67,9 +60,8 @@ class TcpDevice(object): reply = '' while True: byte = self._sock.recv(1) if not len(byte): raise TimeoutError("%s(%s) timeout" % (self.__class__, self.name)) if not byte: raise TimeoutError(f"{self.__class__}({self.name}) timeout") reply += byte if byte in (CR,): break Loading Loading @@ -99,7 +91,6 @@ class TcpDevice(object): def init(self, *args): "send init commands to device" pass def close(self): "close connection" Loading @@ -114,6 +105,6 @@ class TcpDevice(object): self._send(cmd) time.sleep(self._delay) reply = self._recv() if not len(reply): raise TimeoutError("Command '%s' failed: Timeout" % cmd) if not reply: raise TimeoutError("Command '{cmd}' failed: Timeout") return reply.strip()
pysen/hardware/keithley.py +2 −2 Original line number Diff line number Diff line Loading @@ -17,9 +17,9 @@ class KeithleyMeter(TcpDevice): """ Keithley Meter class""" def init(self, *args): self.send("*RST") self._send("*RST") for arg in args: self.send(arg) self._send(arg) def version(self): "get version" Loading
pysen/hardware/lakeshore_sc.py +8 −7 Original line number Diff line number Diff line Loading @@ -9,7 +9,7 @@ Author: Piotr Zolnierczuk, JCNS """ import time from io import StringIO from .device import TacoDevice from . import TacoDevice def run(filename, *devices): "read lakeshore temperatures" Loading @@ -23,18 +23,19 @@ def run(filename, *devices): try: while True: buff = StringIO() buff.write("Time %s \n" % time.ctime()) buff.write(f"Time {time.ctime()} \n") buff.write("Temperature controllers:\n") for i, tcc in enumerate(dev_list): resp = tcc.DevRead() buff.write('T SC%d Sensor\t' % i) buff.write(f"T SC{i} Sensor\t") for k in [0,]+list(range(2, 9)): if k: buff.write('\t\t') buff.write('%d\t%s\n' % (k+1, resp[k])) #buff.write('%d\t%s\n' % (k+1, resp[k])) buff.write(f"{k+1}\t{resp[k]}\n") out = buff.getvalue() print( out ) _fd = open(filename, 'w') with open(filename, 'w', encoding='ascii') as _fd: _fd.write(out) _fd.close() time.sleep(60) Loading