diff --git a/pyren/serial/__init__.py b/pyren/serial/__init__.py index 4cd3a25..caa4de1 100755 --- a/pyren/serial/__init__.py +++ b/pyren/serial/__init__.py @@ -3,17 +3,19 @@ # This is a wrapper module for different platform implementations # # This file is part of pySerial. https://github.com/pyserial/pyserial -# (C) 2001-2016 Chris Liechti +# (C) 2001-2020 Chris Liechti # # SPDX-License-Identifier: BSD-3-Clause +from __future__ import absolute_import + import sys import importlib from serial.serialutil import * #~ SerialBase, SerialException, to_bytes, iterbytes -__version__ = '3.2.1' +__version__ = '3.5' VERSION = __version__ diff --git a/pyren/serial/rfc2217.py b/pyren/serial/rfc2217.py index dee5c2b..2ae188e 100755 --- a/pyren/serial/rfc2217.py +++ b/pyren/serial/rfc2217.py @@ -58,6 +58,8 @@ # RFC). # the order of the options is not relevant +from __future__ import absolute_import + import logging import socket import struct @@ -74,7 +76,7 @@ except ImportError: import serial from serial.serialutil import SerialBase, SerialException, to_bytes, \ - iterbytes, portNotOpenError, Timeout + iterbytes, PortNotOpenError, Timeout # port string is expected to be something like this: # rfc2217://host:port @@ -380,7 +382,6 @@ class Serial(SerialBase): 9600, 19200, 38400, 57600, 115200) def __init__(self, *args, **kwargs): - super(Serial, self).__init__(*args, **kwargs) self._thread = None self._socket = None self._linestate = 0 @@ -396,6 +397,7 @@ class Serial(SerialBase): self._rfc2217_port_settings = None self._rfc2217_options = None self._read_buffer = None + super(Serial, self).__init__(*args, **kwargs) # must be last call in case of auto-open def open(self): """\ @@ -481,7 +483,7 @@ class Serial(SerialBase): if self.logger: self.logger.info("Negotiated options: {}".format(self._telnet_options)) - # fine, go on, set RFC 2271 specific things + # fine, go on, set RFC 2217 specific things self._reconfigure_port() # all things set up get, now a clean start if not self._dsrdtr: @@ -596,7 +598,7 @@ class Serial(SerialBase): def in_waiting(self): """Return the number of bytes currently in the input buffer.""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() return self._read_buffer.qsize() def read(self, size=1): @@ -606,13 +608,19 @@ class Serial(SerialBase): until the requested number of bytes is read. """ if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() data = bytearray() try: + timeout = Timeout(self._timeout) while len(data) < size: - if self._thread is None: + if self._thread is None or not self._thread.is_alive(): raise SerialException('connection failed (reader thread died)') - data += self._read_buffer.get(True, self._timeout) + buf = self._read_buffer.get(True, timeout.time_left()) + if buf is None: + return bytes(data) + data += buf + if timeout.expired(): + break except Queue.Empty: # -> timeout pass return bytes(data) @@ -624,7 +632,7 @@ class Serial(SerialBase): closed. """ if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() with self._write_lock: try: self._socket.sendall(to_bytes(data).replace(IAC, IAC_DOUBLED)) @@ -635,7 +643,7 @@ class Serial(SerialBase): def reset_input_buffer(self): """Clear input buffer, discarding all that is in the buffer.""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() self.rfc2217_send_purge(PURGE_RECEIVE_BUFFER) # empty read buffer while self._read_buffer.qsize(): @@ -647,7 +655,7 @@ class Serial(SerialBase): discarding all that is in the buffer. """ if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() self.rfc2217_send_purge(PURGE_TRANSMIT_BUFFER) def _update_break_state(self): @@ -656,7 +664,7 @@ class Serial(SerialBase): possible. """ if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() if self.logger: self.logger.info('set BREAK to {}'.format('active' if self._break_state else 'inactive')) if self._break_state: @@ -667,7 +675,7 @@ class Serial(SerialBase): def _update_rts_state(self): """Set terminal status line: Request To Send.""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() if self.logger: self.logger.info('set RTS to {}'.format('active' if self._rts_state else 'inactive')) if self._rts_state: @@ -678,7 +686,7 @@ class Serial(SerialBase): def _update_dtr_state(self): """Set terminal status line: Data Terminal Ready.""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() if self.logger: self.logger.info('set DTR to {}'.format('active' if self._dtr_state else 'inactive')) if self._dtr_state: @@ -690,28 +698,28 @@ class Serial(SerialBase): def cts(self): """Read terminal status line: Clear To Send.""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() return bool(self.get_modem_state() & MODEMSTATE_MASK_CTS) @property def dsr(self): """Read terminal status line: Data Set Ready.""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() return bool(self.get_modem_state() & MODEMSTATE_MASK_DSR) @property def ri(self): """Read terminal status line: Ring Indicator.""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() return bool(self.get_modem_state() & MODEMSTATE_MASK_RI) @property def cd(self): """Read terminal status line: Carrier Detect.""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() return bool(self.get_modem_state() & MODEMSTATE_MASK_CD) # - - - platform specific - - - @@ -735,8 +743,10 @@ class Serial(SerialBase): # connection fails -> terminate loop if self.logger: self.logger.debug("socket error in reader thread: {}".format(e)) + self._read_buffer.put(None) break if not data: + self._read_buffer.put(None) break # lost connection for byte in iterbytes(data): if mode == M_NORMAL: @@ -780,7 +790,6 @@ class Serial(SerialBase): self._telnet_negotiate_option(telnet_command, byte) mode = M_NORMAL finally: - self._thread = None if self.logger: self.logger.debug("read thread terminated") @@ -850,12 +859,12 @@ class Serial(SerialBase): def telnet_send_option(self, action, option): """Send DO, DONT, WILL, WONT.""" - self._internal_raw_write(to_bytes([IAC, action, option])) + self._internal_raw_write(IAC + action + option) def rfc2217_send_subnegotiation(self, option, value=b''): """Subnegotiation of RFC2217 parameters.""" value = value.replace(IAC, IAC_DOUBLED) - self._internal_raw_write(to_bytes([IAC, SB, COM_PORT_OPTION, option] + list(value) + [IAC, SE])) + self._internal_raw_write(IAC + SB + COM_PORT_OPTION + option + value + IAC + SE) def rfc2217_send_purge(self, value): """\ @@ -890,7 +899,7 @@ class Serial(SerialBase): """\ get last modem state (cached value. If value is "old", request a new one. This cache helps that we don't issue to many requests when e.g. all - status lines, one after the other is queried by the user (getCTS, getDSR + status lines, one after the other is queried by the user (CTS, DSR etc.) """ # active modem state polling enabled? is the value fresh enough? @@ -989,12 +998,12 @@ class PortManager(object): def telnet_send_option(self, action, option): """Send DO, DONT, WILL, WONT.""" - self.connection.write(to_bytes([IAC, action, option])) + self.connection.write(IAC + action + option) def rfc2217_send_subnegotiation(self, option, value=b''): """Subnegotiation of RFC 2217 parameters.""" value = value.replace(IAC, IAC_DOUBLED) - self.connection.write(to_bytes([IAC, SB, COM_PORT_OPTION, option] + list(value) + [IAC, SE])) + self.connection.write(IAC + SB + COM_PORT_OPTION + option + value + IAC + SE) # - check modem lines, needs to be called periodically from user to # establish polling @@ -1005,10 +1014,10 @@ class PortManager(object): send updates on changes. """ modemstate = ( - (self.serial.getCTS() and MODEMSTATE_MASK_CTS) | - (self.serial.getDSR() and MODEMSTATE_MASK_DSR) | - (self.serial.getRI() and MODEMSTATE_MASK_RI) | - (self.serial.getCD() and MODEMSTATE_MASK_CD)) + (self.serial.cts and MODEMSTATE_MASK_CTS) | + (self.serial.dsr and MODEMSTATE_MASK_DSR) | + (self.serial.ri and MODEMSTATE_MASK_RI) | + (self.serial.cd and MODEMSTATE_MASK_CD)) # check what has changed deltas = modemstate ^ (self.last_modemstate or 0) # when last is None -> 0 if deltas & MODEMSTATE_MASK_CTS: @@ -1230,12 +1239,12 @@ class PortManager(object): self.logger.warning("requested break state - not implemented") pass # XXX needs cached value elif suboption[2:3] == SET_CONTROL_BREAK_ON: - self.serial.setBreak(True) + self.serial.break_condition = True if self.logger: self.logger.info("changed BREAK to active") self.rfc2217_send_subnegotiation(SERVER_SET_CONTROL, SET_CONTROL_BREAK_ON) elif suboption[2:3] == SET_CONTROL_BREAK_OFF: - self.serial.setBreak(False) + self.serial.break_condition = False if self.logger: self.logger.info("changed BREAK to inactive") self.rfc2217_send_subnegotiation(SERVER_SET_CONTROL, SET_CONTROL_BREAK_OFF) @@ -1244,12 +1253,12 @@ class PortManager(object): self.logger.warning("requested DTR state - not implemented") pass # XXX needs cached value elif suboption[2:3] == SET_CONTROL_DTR_ON: - self.serial.setDTR(True) + self.serial.dtr = True if self.logger: self.logger.info("changed DTR to active") self.rfc2217_send_subnegotiation(SERVER_SET_CONTROL, SET_CONTROL_DTR_ON) elif suboption[2:3] == SET_CONTROL_DTR_OFF: - self.serial.setDTR(False) + self.serial.dtr = False if self.logger: self.logger.info("changed DTR to inactive") self.rfc2217_send_subnegotiation(SERVER_SET_CONTROL, SET_CONTROL_DTR_OFF) @@ -1259,12 +1268,12 @@ class PortManager(object): pass # XXX needs cached value #~ self.rfc2217_send_subnegotiation(SERVER_SET_CONTROL, SET_CONTROL_RTS_ON) elif suboption[2:3] == SET_CONTROL_RTS_ON: - self.serial.setRTS(True) + self.serial.rts = True if self.logger: self.logger.info("changed RTS to active") self.rfc2217_send_subnegotiation(SERVER_SET_CONTROL, SET_CONTROL_RTS_ON) elif suboption[2:3] == SET_CONTROL_RTS_OFF: - self.serial.setRTS(False) + self.serial.rts = False if self.logger: self.logger.info("changed RTS to inactive") self.rfc2217_send_subnegotiation(SERVER_SET_CONTROL, SET_CONTROL_RTS_OFF) diff --git a/pyren/serial/rs485.py b/pyren/serial/rs485.py index 2939350..d7aff6f 100755 --- a/pyren/serial/rs485.py +++ b/pyren/serial/rs485.py @@ -13,6 +13,8 @@ serial ports (where supported). NOTE: Some implementations may only support a subset of the settings. """ +from __future__ import absolute_import + import time import serial diff --git a/pyren/serial/serialcli.py b/pyren/serial/serialcli.py index 0727a52..4614736 100755 --- a/pyren/serial/serialcli.py +++ b/pyren/serial/serialcli.py @@ -7,6 +7,8 @@ # # SPDX-License-Identifier: BSD-3-Clause +from __future__ import absolute_import + import System import System.IO.Ports from serial.serialutil import * @@ -146,7 +148,7 @@ class Serial(SerialBase): def in_waiting(self): """Return the number of characters currently in the input buffer.""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() return self._port_handle.BytesToRead def read(self, size=1): @@ -156,7 +158,7 @@ class Serial(SerialBase): until the requested number of bytes is read. """ if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() # must use single byte reads as this is the only way to read # without applying encodings data = bytearray() @@ -172,7 +174,7 @@ class Serial(SerialBase): def write(self, data): """Output the given string over the serial port.""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() #~ if not isinstance(data, (bytes, bytearray)): #~ raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data))) try: @@ -180,13 +182,13 @@ class Serial(SerialBase): # as this is the only one not applying encodings self._port_handle.Write(as_byte_array(data), 0, len(data)) except System.TimeoutException: - raise writeTimeoutError + raise SerialTimeoutException('Write timeout') return len(data) def reset_input_buffer(self): """Clear input buffer, discarding all that is in the buffer.""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() self._port_handle.DiscardInBuffer() def reset_output_buffer(self): @@ -195,7 +197,7 @@ class Serial(SerialBase): discarding all that is in the buffer. """ if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() self._port_handle.DiscardOutBuffer() def _update_break_state(self): @@ -203,40 +205,40 @@ class Serial(SerialBase): Set break: Controls TXD. When active, to transmitting is possible. """ if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() self._port_handle.BreakState = bool(self._break_state) def _update_rts_state(self): """Set terminal status line: Request To Send""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() self._port_handle.RtsEnable = bool(self._rts_state) def _update_dtr_state(self): """Set terminal status line: Data Terminal Ready""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() self._port_handle.DtrEnable = bool(self._dtr_state) @property def cts(self): """Read terminal status line: Clear To Send""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() return self._port_handle.CtsHolding @property def dsr(self): """Read terminal status line: Data Set Ready""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() return self._port_handle.DsrHolding @property def ri(self): """Read terminal status line: Ring Indicator""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() #~ return self._port_handle.XXX return False # XXX an error would be better @@ -244,7 +246,7 @@ class Serial(SerialBase): def cd(self): """Read terminal status line: Carrier Detect""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() return self._port_handle.CDHolding # - - platform specific - - - - diff --git a/pyren/serial/serialjava.py b/pyren/serial/serialjava.py index 7bd5b3e..0789a78 100755 --- a/pyren/serial/serialjava.py +++ b/pyren/serial/serialjava.py @@ -7,6 +7,8 @@ # # SPDX-License-Identifier: BSD-3-Clause +from __future__ import absolute_import + from serial.serialutil import * @@ -150,7 +152,7 @@ class Serial(SerialBase): def in_waiting(self): """Return the number of characters currently in the input buffer.""" if not self.sPort: - raise portNotOpenError + raise PortNotOpenError() return self._instream.available() def read(self, size=1): @@ -160,7 +162,7 @@ class Serial(SerialBase): until the requested number of bytes is read. """ if not self.sPort: - raise portNotOpenError + raise PortNotOpenError() read = bytearray() if size > 0: while len(read) < size: @@ -175,7 +177,7 @@ class Serial(SerialBase): def write(self, data): """Output the given string over the serial port.""" if not self.sPort: - raise portNotOpenError + raise PortNotOpenError() if not isinstance(data, (bytes, bytearray)): raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data))) self._outstream.write(data) @@ -184,7 +186,7 @@ class Serial(SerialBase): def reset_input_buffer(self): """Clear input buffer, discarding all that is in the buffer.""" if not self.sPort: - raise portNotOpenError + raise PortNotOpenError() self._instream.skip(self._instream.available()) def reset_output_buffer(self): @@ -193,57 +195,57 @@ class Serial(SerialBase): discarding all that is in the buffer. """ if not self.sPort: - raise portNotOpenError + raise PortNotOpenError() self._outstream.flush() def send_break(self, duration=0.25): """Send break condition. Timed, returns to idle state after given duration.""" if not self.sPort: - raise portNotOpenError + raise PortNotOpenError() self.sPort.sendBreak(duration*1000.0) def _update_break_state(self): """Set break: Controls TXD. When active, to transmitting is possible.""" if self.fd is None: - raise portNotOpenError + raise PortNotOpenError() raise SerialException("The _update_break_state function is not implemented in java.") def _update_rts_state(self): """Set terminal status line: Request To Send""" if not self.sPort: - raise portNotOpenError + raise PortNotOpenError() self.sPort.setRTS(self._rts_state) def _update_dtr_state(self): """Set terminal status line: Data Terminal Ready""" if not self.sPort: - raise portNotOpenError + raise PortNotOpenError() self.sPort.setDTR(self._dtr_state) @property def cts(self): """Read terminal status line: Clear To Send""" if not self.sPort: - raise portNotOpenError + raise PortNotOpenError() self.sPort.isCTS() @property def dsr(self): """Read terminal status line: Data Set Ready""" if not self.sPort: - raise portNotOpenError + raise PortNotOpenError() self.sPort.isDSR() @property def ri(self): """Read terminal status line: Ring Indicator""" if not self.sPort: - raise portNotOpenError + raise PortNotOpenError() self.sPort.isRI() @property def cd(self): """Read terminal status line: Carrier Detect""" if not self.sPort: - raise portNotOpenError + raise PortNotOpenError() self.sPort.isCD() diff --git a/pyren/serial/serialposix.py b/pyren/serial/serialposix.py index b7e2b7d..7aceb76 100755 --- a/pyren/serial/serialposix.py +++ b/pyren/serial/serialposix.py @@ -3,7 +3,7 @@ # backend for serial IO for POSIX compatible systems, like Linux, OSX # # This file is part of pySerial. https://github.com/pyserial/pyserial -# (C) 2001-2016 Chris Liechti +# (C) 2001-2020 Chris Liechti # # SPDX-License-Identifier: BSD-3-Clause # @@ -26,6 +26,8 @@ # - aix (AIX) /dev/tty%d +from __future__ import absolute_import + # pylint: disable=abstract-method import errno import fcntl @@ -37,7 +39,7 @@ import termios import serial from serial.serialutil import SerialBase, SerialException, to_bytes, \ - portNotOpenError, writeTimeoutError, Timeout + PortNotOpenError, SerialTimeoutException, Timeout class PlatformSpecificBase(object): @@ -49,6 +51,18 @@ class PlatformSpecificBase(object): def _set_rs485_mode(self, rs485_settings): raise NotImplementedError('RS485 not supported on this platform') + def set_low_latency_mode(self, low_latency_settings): + raise NotImplementedError('Low latency not supported on this platform') + + def _update_break_state(self): + """\ + Set break: Controls TXD. When active, no transmitting is possible. + """ + if self._break_state: + fcntl.ioctl(self.fd, TIOCSBRK) + else: + fcntl.ioctl(self.fd, TIOCCBRK) + # some systems support an extra flag to enable the two in POSIX unsupported # paritiy settings for MARK and SPACE @@ -113,6 +127,24 @@ if plat[:5] == 'linux': # Linux (confirmed) # noqa 4000000: 0o010017 } + def set_low_latency_mode(self, low_latency_settings): + buf = array.array('i', [0] * 32) + + try: + # get serial_struct + fcntl.ioctl(self.fd, termios.TIOCGSERIAL, buf) + + # set or unset ASYNC_LOW_LATENCY flag + if low_latency_settings: + buf[4] |= 0x2000 + else: + buf[4] &= ~0x2000 + + # set serial_struct + fcntl.ioctl(self.fd, termios.TIOCSSERIAL, buf) + except IOError as e: + raise ValueError('Failed to update ASYNC_LOW_LATENCY flag to {}: {}'.format(low_latency_settings, e)) + def _set_special_baudrate(self, baudrate): # right size is 44 on x86_64, allow for some growth buf = array.array('i', [0] * 64) @@ -182,6 +214,9 @@ elif plat[:6] == 'darwin': # OS X class PlatformSpecific(PlatformSpecificBase): osx_version = os.uname()[2].split('.') + TIOCSBRK = 0x2000747B # _IO('t', 123) + TIOCCBRK = 0x2000747A # _IO('t', 122) + # Tiger or above can support arbitrary serial speeds if int(osx_version[0]) >= 8: def _set_special_baudrate(self, baudrate): @@ -189,6 +224,15 @@ elif plat[:6] == 'darwin': # OS X buf = array.array('i', [baudrate]) fcntl.ioctl(self.fd, IOSSIOSPEED, buf, 1) + def _update_break_state(self): + """\ + Set break: Controls TXD. When active, no transmitting is possible. + """ + if self._break_state: + fcntl.ioctl(self.fd, PlatformSpecific.TIOCSBRK) + else: + fcntl.ioctl(self.fd, PlatformSpecific.TIOCCBRK) + elif plat[:3] == 'bsd' or \ plat[:7] == 'freebsd' or \ plat[:6] == 'netbsd' or \ @@ -204,6 +248,19 @@ elif plat[:3] == 'bsd' or \ # a literal value. BAUDRATE_CONSTANTS = ReturnBaudrate() + TIOCSBRK = 0x2000747B # _IO('t', 123) + TIOCCBRK = 0x2000747A # _IO('t', 122) + + + def _update_break_state(self): + """\ + Set break: Controls TXD. When active, no transmitting is possible. + """ + if self._break_state: + fcntl.ioctl(self.fd, PlatformSpecific.TIOCSBRK) + else: + fcntl.ioctl(self.fd, PlatformSpecific.TIOCCBRK) + else: class PlatformSpecific(PlatformSpecificBase): pass @@ -265,43 +322,72 @@ class Serial(SerialBase, PlatformSpecific): self.fd = os.open(self.portstr, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK) except OSError as msg: self.fd = None - raise SerialException(msg.errno, "could not open port {0}: {1}".format(self._port, msg)) + raise SerialException(msg.errno, "could not open port {}: {}".format(self._port, msg)) #~ fcntl.fcntl(self.fd, fcntl.F_SETFL, 0) # set blocking + self.pipe_abort_read_r, self.pipe_abort_read_w = None, None + self.pipe_abort_write_r, self.pipe_abort_write_w = None, None + try: self._reconfigure_port(force_update=True) - except: + + try: + if not self._dsrdtr: + self._update_dtr_state() + if not self._rtscts: + self._update_rts_state() + except IOError as e: + # ignore Invalid argument and Inappropriate ioctl + if e.errno not in (errno.EINVAL, errno.ENOTTY): + raise + + self._reset_input_buffer() + + self.pipe_abort_read_r, self.pipe_abort_read_w = os.pipe() + self.pipe_abort_write_r, self.pipe_abort_write_w = os.pipe() + fcntl.fcntl(self.pipe_abort_read_r, fcntl.F_SETFL, os.O_NONBLOCK) + fcntl.fcntl(self.pipe_abort_write_r, fcntl.F_SETFL, os.O_NONBLOCK) + except BaseException: try: os.close(self.fd) - except: + except Exception: # ignore any exception when closing the port # also to keep original exception that happened when setting up pass self.fd = None + + if self.pipe_abort_read_w is not None: + os.close(self.pipe_abort_read_w) + self.pipe_abort_read_w = None + if self.pipe_abort_read_r is not None: + os.close(self.pipe_abort_read_r) + self.pipe_abort_read_r = None + if self.pipe_abort_write_w is not None: + os.close(self.pipe_abort_write_w) + self.pipe_abort_write_w = None + if self.pipe_abort_write_r is not None: + os.close(self.pipe_abort_write_r) + self.pipe_abort_write_r = None + raise - else: - self.is_open = True - try: - if not self._dsrdtr: - self._update_dtr_state() - if not self._rtscts: - self._update_rts_state() - except IOError as e: - if e.errno in (errno.EINVAL, errno.ENOTTY): - # ignore Invalid argument and Inappropriate ioctl - pass - else: - raise - self.reset_input_buffer() - self.pipe_abort_read_r, self.pipe_abort_read_w = os.pipe() - self.pipe_abort_write_r, self.pipe_abort_write_w = os.pipe() - fcntl.fcntl(self.pipe_abort_read_r, fcntl.F_SETFL, os.O_NONBLOCK) - fcntl.fcntl(self.pipe_abort_write_r, fcntl.F_SETFL, os.O_NONBLOCK) + + self.is_open = True def _reconfigure_port(self, force_update=False): """Set communication parameters on opened port.""" if self.fd is None: raise SerialException("Can only operate on a valid file descriptor") + + # if exclusive lock is requested, create it before we modify anything else + if self._exclusive is not None: + if self._exclusive: + try: + fcntl.flock(self.fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + except IOError as msg: + raise SerialException(msg.errno, "Could not exclusively lock port {}: {}".format(self._port, msg)) + else: + fcntl.flock(self.fd, fcntl.LOCK_UN) + custom_baud = None vmin = vtime = 0 # timeout is done via select @@ -331,14 +417,21 @@ class Serial(SerialBase, PlatformSpecific): # setup baud rate try: - ispeed = ospeed = getattr(termios, 'B{0}'.format(self._baudrate)) + ispeed = ospeed = getattr(termios, 'B{}'.format(self._baudrate)) except AttributeError: try: ispeed = ospeed = self.BAUDRATE_CONSTANTS[self._baudrate] except KeyError: #~ raise ValueError('Invalid baud rate: %r' % self._baudrate) - # may need custom baud rate, it isn't in our list. - ispeed = ospeed = getattr(termios, 'B38400') + + # See if BOTHER is defined for this platform; if it is, use + # this for a speed not defined in the baudrate constants list. + try: + ispeed = ospeed = BOTHER + except NameError: + # may need custom baud rate, it isn't in our list. + ispeed = ospeed = getattr(termios, 'B38400') + try: custom_baud = int(self._baudrate) # store for later except ValueError: @@ -464,7 +557,7 @@ class Serial(SerialBase, PlatformSpecific): until the requested number of bytes is read. """ if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() read = bytearray() timeout = Timeout(self._timeout) while len(read) < size: @@ -480,6 +573,19 @@ class Serial(SerialBase, PlatformSpecific): if not ready: break # timeout buf = os.read(self.fd, size - len(read)) + except OSError as e: + # this is for Python 3.x where select.error is a subclass of + # OSError ignore BlockingIOErrors and EINTR. other errors are shown + # https://www.python.org/dev/peps/pep-0475. + if e.errno not in (errno.EAGAIN, errno.EALREADY, errno.EWOULDBLOCK, errno.EINPROGRESS, errno.EINTR): + raise SerialException('read failed: {}'.format(e)) + except select.error as e: + # this is for Python 2.x + # ignore BlockingIOErrors and EINTR. all errors are shown + # see also http://www.python.org/dev/peps/pep-3151/#select + if e[0] not in (errno.EAGAIN, errno.EALREADY, errno.EWOULDBLOCK, errno.EINPROGRESS, errno.EINTR): + raise SerialException('read failed: {}'.format(e)) + else: # read should always return some data as select reported it was # ready to read when we get to this point. if not buf: @@ -490,33 +596,25 @@ class Serial(SerialBase, PlatformSpecific): 'device reports readiness to read but returned no data ' '(device disconnected or multiple access on port?)') read.extend(buf) - except OSError as e: - # this is for Python 3.x where select.error is a subclass of - # OSError ignore EAGAIN errors. all other errors are shown - if e.errno != errno.EAGAIN and e.errno != errno.EINTR: - raise SerialException('read failed: {}'.format(e)) - except select.error as e: - # this is for Python 2.x - # ignore EAGAIN errors. all other errors are shown - # see also http://www.python.org/dev/peps/pep-3151/#select - if e[0] != errno.EAGAIN: - raise SerialException('read failed: {}'.format(e)) + if timeout.expired(): break return bytes(read) def cancel_read(self): - os.write(self.pipe_abort_read_w, b"x") + if self.is_open: + os.write(self.pipe_abort_read_w, b"x") def cancel_write(self): - os.write(self.pipe_abort_write_w, b"x") + if self.is_open: + os.write(self.pipe_abort_write_w, b"x") def write(self, data): """Output the given byte string over the serial port.""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() d = to_bytes(data) - tx_len = len(d) + tx_len = length = len(d) timeout = Timeout(self._write_timeout) while tx_len > 0: try: @@ -529,13 +627,13 @@ class Serial(SerialBase, PlatformSpecific): # when timeout is set, use select to wait for being ready # with the time left as timeout if timeout.expired(): - raise writeTimeoutError + raise SerialTimeoutException('Write timeout') abort, ready, _ = select.select([self.pipe_abort_write_r], [self.fd], [], timeout.time_left()) if abort: os.read(self.pipe_abort_write_r, 1000) break if not ready: - raise writeTimeoutError + raise SerialTimeoutException('Write timeout') else: assert timeout.time_left() is None # wait for write operation @@ -549,13 +647,21 @@ class Serial(SerialBase, PlatformSpecific): tx_len -= n except SerialException: raise - except OSError as v: - if v.errno != errno.EAGAIN: - raise SerialException('write failed: {}'.format(v)) - # still calculate and check timeout - if timeout.expired(): - raise writeTimeoutError - return len(data) + except OSError as e: + # this is for Python 3.x where select.error is a subclass of + # OSError ignore BlockingIOErrors and EINTR. other errors are shown + # https://www.python.org/dev/peps/pep-0475. + if e.errno not in (errno.EAGAIN, errno.EALREADY, errno.EWOULDBLOCK, errno.EINPROGRESS, errno.EINTR): + raise SerialException('write failed: {}'.format(e)) + except select.error as e: + # this is for Python 2.x + # ignore BlockingIOErrors and EINTR. all errors are shown + # see also http://www.python.org/dev/peps/pep-3151/#select + if e[0] not in (errno.EAGAIN, errno.EALREADY, errno.EWOULDBLOCK, errno.EINPROGRESS, errno.EINTR): + raise SerialException('write failed: {}'.format(e)) + if not timeout.is_non_blocking and timeout.expired(): + raise SerialTimeoutException('Write timeout') + return length - len(d) def flush(self): """\ @@ -563,14 +669,18 @@ class Serial(SerialBase, PlatformSpecific): is written. """ if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() termios.tcdrain(self.fd) + def _reset_input_buffer(self): + """Clear input buffer, discarding all that is in the buffer.""" + termios.tcflush(self.fd, termios.TCIFLUSH) + def reset_input_buffer(self): """Clear input buffer, discarding all that is in the buffer.""" if not self.is_open: - raise portNotOpenError - termios.tcflush(self.fd, termios.TCIFLUSH) + raise PortNotOpenError() + self._reset_input_buffer() def reset_output_buffer(self): """\ @@ -578,7 +688,7 @@ class Serial(SerialBase, PlatformSpecific): that is in the buffer. """ if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() termios.tcflush(self.fd, termios.TCOFLUSH) def send_break(self, duration=0.25): @@ -587,18 +697,9 @@ class Serial(SerialBase, PlatformSpecific): duration. """ if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() termios.tcsendbreak(self.fd, int(duration / 0.25)) - def _update_break_state(self): - """\ - Set break: Controls TXD. When active, no transmitting is possible. - """ - if self._break_state: - fcntl.ioctl(self.fd, TIOCSBRK) - else: - fcntl.ioctl(self.fd, TIOCCBRK) - def _update_rts_state(self): """Set terminal status line: Request To Send""" if self._rts_state: @@ -617,7 +718,7 @@ class Serial(SerialBase, PlatformSpecific): def cts(self): """Read terminal status line: Clear To Send""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str) return struct.unpack('I', s)[0] & TIOCM_CTS != 0 @@ -625,7 +726,7 @@ class Serial(SerialBase, PlatformSpecific): def dsr(self): """Read terminal status line: Data Set Ready""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str) return struct.unpack('I', s)[0] & TIOCM_DSR != 0 @@ -633,7 +734,7 @@ class Serial(SerialBase, PlatformSpecific): def ri(self): """Read terminal status line: Ring Indicator""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str) return struct.unpack('I', s)[0] & TIOCM_RI != 0 @@ -641,7 +742,7 @@ class Serial(SerialBase, PlatformSpecific): def cd(self): """Read terminal status line: Carrier Detect""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str) return struct.unpack('I', s)[0] & TIOCM_CD != 0 @@ -660,7 +761,7 @@ class Serial(SerialBase, PlatformSpecific): WARNING: this function is not portable to different platforms! """ if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() return self.fd def set_input_flow_control(self, enable=True): @@ -670,7 +771,7 @@ class Serial(SerialBase, PlatformSpecific): WARNING: this function is not portable to different platforms! """ if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() if enable: termios.tcflow(self.fd, termios.TCION) else: @@ -683,7 +784,7 @@ class Serial(SerialBase, PlatformSpecific): WARNING: this function is not portable to different platforms! """ if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() if enable: termios.tcflow(self.fd, termios.TCOON) else: @@ -709,23 +810,30 @@ class PosixPollSerial(Serial): until the requested number of bytes is read. """ if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() read = bytearray() + timeout = Timeout(self._timeout) poll = select.poll() poll.register(self.fd, select.POLLIN | select.POLLERR | select.POLLHUP | select.POLLNVAL) + poll.register(self.pipe_abort_read_r, select.POLLIN | select.POLLERR | select.POLLHUP | select.POLLNVAL) if size > 0: while len(read) < size: # print "\tread(): size",size, "have", len(read) #debug # wait until device becomes ready to read (or something fails) - for fd, event in poll.poll(self._timeout * 1000): + for fd, event in poll.poll(None if timeout.is_infinite else (timeout.time_left() * 1000)): + if fd == self.pipe_abort_read_r: + break if event & (select.POLLERR | select.POLLHUP | select.POLLNVAL): raise SerialException('device reports error (poll)') # we don't care if it is select.POLLIN or timeout, that's # handled below + if fd == self.pipe_abort_read_r: + os.read(self.pipe_abort_read_r, 1000) + break buf = os.read(self.fd, size - len(read)) read.extend(buf) - if ((self._timeout is not None and self._timeout >= 0) or - (self._inter_byte_timeout is not None and self._inter_byte_timeout > 0)) and not buf: + if timeout.expired() \ + or (self._inter_byte_timeout is not None and self._inter_byte_timeout > 0) and not buf: break # early abort on timeout return bytes(read) @@ -737,6 +845,9 @@ class VTIMESerial(Serial): the error handling is degraded. Overall timeout is disabled when inter-character timeout is used. + + Note that this implementation does NOT support cancel_read(), it will + just ignore that. """ def _reconfigure_port(self, force_update=True): @@ -776,7 +887,7 @@ class VTIMESerial(Serial): until the requested number of bytes is read. """ if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() read = bytearray() while len(read) < size: buf = os.read(self.fd, size - len(read)) diff --git a/pyren/serial/serialutil.py b/pyren/serial/serialutil.py index 636a10c..789219e 100755 --- a/pyren/serial/serialutil.py +++ b/pyren/serial/serialutil.py @@ -3,10 +3,12 @@ # Base class and support functions used by various backends. # # This file is part of pySerial. https://github.com/pyserial/pyserial -# (C) 2001-2016 Chris Liechti +# (C) 2001-2020 Chris Liechti # # SPDX-License-Identifier: BSD-3-Clause +from __future__ import absolute_import + import io import time @@ -95,8 +97,10 @@ class SerialTimeoutException(SerialException): """Write timeouts give an exception""" -writeTimeoutError = SerialTimeoutException('Write timeout') -portNotOpenError = SerialException('Attempting to use a port that is not open') +class PortNotOpenError(SerialException): + """Port is not open""" + def __init__(self): + super(PortNotOpenError, self).__init__('Attempting to use a port that is not open') class Timeout(object): @@ -185,6 +189,7 @@ class SerialBase(io.RawIOBase): write_timeout=None, dsrdtr=False, inter_byte_timeout=None, + exclusive=None, **kwargs): """\ Initialize comm port object. If a "port" is given, then the port will be @@ -211,6 +216,7 @@ class SerialBase(io.RawIOBase): self._rts_state = True self._dtr_state = True self._break_state = False + self._exclusive = None # assign values using get/set methods using the properties feature self.port = port @@ -224,6 +230,8 @@ class SerialBase(io.RawIOBase): self.rtscts = rtscts self.dsrdtr = dsrdtr self.inter_byte_timeout = inter_byte_timeout + self.exclusive = exclusive + # watch for backward compatible kwargs if 'writeTimeout' in kwargs: self.write_timeout = kwargs.pop('writeTimeout') @@ -304,6 +312,18 @@ class SerialBase(io.RawIOBase): if self.is_open: self._reconfigure_port() + @property + def exclusive(self): + """Get the current exclusive access setting.""" + return self._exclusive + + @exclusive.setter + def exclusive(self, exclusive): + """Change the exclusive access setting.""" + self._exclusive = exclusive + if self.is_open: + self._reconfigure_port() + @property def parity(self): """Get the current parity setting.""" @@ -541,6 +561,8 @@ class SerialBase(io.RawIOBase): # context manager def __enter__(self): + if self._port is not None and not self.is_open: + self.open() return self def __exit__(self, *args, **kwargs): @@ -554,7 +576,7 @@ class SerialBase(io.RawIOBase): duration. """ if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() self.break_condition = True time.sleep(duration) self.break_condition = False @@ -629,23 +651,26 @@ class SerialBase(io.RawIOBase): """ return self.read(self.in_waiting) - def read_until(self, terminator=LF, size=None): + def read_until(self, expected=LF, size=None): """\ - Read until a termination sequence is found ('\n' by default), the size + Read until an expected sequence is found ('\n' by default), the size is exceeded or until timeout occurs. """ - lenterm = len(terminator) + lenterm = len(expected) line = bytearray() + timeout = Timeout(self._timeout) while True: c = self.read(1) if c: line += c - if line[-lenterm:] == terminator: + if line[-lenterm:] == expected: break if size is not None and len(line) >= size: break else: break + if timeout.expired(): + break return bytes(line) def iread_until(self, *args, **kwargs): diff --git a/pyren/serial/serialwin32.py b/pyren/serial/serialwin32.py index b275ea3..e7da929 100755 --- a/pyren/serial/serialwin32.py +++ b/pyren/serial/serialwin32.py @@ -2,20 +2,22 @@ # # backend for Windows ("win32" incl. 32/64 bit support) # -# (C) 2001-2015 Chris Liechti +# (C) 2001-2020 Chris Liechti # # This file is part of pySerial. https://github.com/pyserial/pyserial # SPDX-License-Identifier: BSD-3-Clause # # Initial patch to use ctypes by Giovanni Bajo +from __future__ import absolute_import + # pylint: disable=invalid-name,too-few-public-methods import ctypes import time from serial import win32 import serial -from serial.serialutil import SerialBase, SerialException, to_bytes, portNotOpenError, writeTimeoutError +from serial.serialutil import SerialBase, SerialException, to_bytes, PortNotOpenError, SerialTimeoutException class Serial(SerialBase): @@ -182,23 +184,23 @@ class Serial(SerialBase): # XXX verify if platform really does not have a setting for those if not self._rs485_mode.rts_level_for_tx: raise ValueError( - 'Unsupported value for RS485Settings.rts_level_for_tx: {!r}'.format( + 'Unsupported value for RS485Settings.rts_level_for_tx: {!r} (only True is allowed)'.format( self._rs485_mode.rts_level_for_tx,)) if self._rs485_mode.rts_level_for_rx: raise ValueError( - 'Unsupported value for RS485Settings.rts_level_for_rx: {!r}'.format( + 'Unsupported value for RS485Settings.rts_level_for_rx: {!r} (only False is allowed)'.format( self._rs485_mode.rts_level_for_rx,)) if self._rs485_mode.delay_before_tx is not None: raise ValueError( - 'Unsupported value for RS485Settings.delay_before_tx: {!r}'.format( + 'Unsupported value for RS485Settings.delay_before_tx: {!r} (only None is allowed)'.format( self._rs485_mode.delay_before_tx,)) if self._rs485_mode.delay_before_rx is not None: raise ValueError( - 'Unsupported value for RS485Settings.delay_before_rx: {!r}'.format( + 'Unsupported value for RS485Settings.delay_before_rx: {!r} (only None is allowed)'.format( self._rs485_mode.delay_before_rx,)) if self._rs485_mode.loopback: raise ValueError( - 'Unsupported value for RS485Settings.loopback: {!r}'.format( + 'Unsupported value for RS485Settings.loopback: {!r} (only False is allowed)'.format( self._rs485_mode.loopback,)) comDCB.fRtsControl = win32.RTS_CONTROL_TOGGLE comDCB.fOutxCtsFlow = 0 @@ -254,7 +256,7 @@ class Serial(SerialBase): flags = win32.DWORD() comstat = win32.COMSTAT() if not win32.ClearCommError(self._port_handle, ctypes.byref(flags), ctypes.byref(comstat)): - raise SerialException('call to ClearCommError failed') + raise SerialException("ClearCommError failed ({!r})".format(ctypes.WinError())) return comstat.cbInQue def read(self, size=1): @@ -264,7 +266,7 @@ class Serial(SerialBase): until the requested number of bytes is read. """ if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() if size > 0: win32.ResetEvent(self._overlapped_read.hEvent) flags = win32.DWORD() @@ -301,7 +303,7 @@ class Serial(SerialBase): def write(self, data): """Output the given byte string over the serial port.""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() #~ if not isinstance(data, (bytes, bytearray)): #~ raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data))) # convert data (needed in case of memoryview instance: Py 3.1 io lib), ctypes doesn't like memoryview @@ -311,7 +313,7 @@ class Serial(SerialBase): n = win32.DWORD() success = win32.WriteFile(self._port_handle, data, len(data), ctypes.byref(n), self._overlapped_write) if self._write_timeout != 0: # if blocking (None) or w/ write timeout (>0) - if not success and win32.GetLastError() != win32.ERROR_IO_PENDING: + if not success and win32.GetLastError() not in (win32.ERROR_SUCCESS, win32.ERROR_IO_PENDING): raise SerialException("WriteFile failed ({!r})".format(ctypes.WinError())) # Wait for the write to complete. @@ -320,7 +322,7 @@ class Serial(SerialBase): if win32.GetLastError() == win32.ERROR_OPERATION_ABORTED: return n.value # canceled IO is no error if n.value != len(data): - raise writeTimeoutError + raise SerialTimeoutException('Write timeout') return n.value else: errorcode = win32.ERROR_SUCCESS if success else win32.GetLastError() @@ -349,7 +351,7 @@ class Serial(SerialBase): def reset_input_buffer(self): """Clear input buffer, discarding all that is in the buffer.""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() win32.PurgeComm(self._port_handle, win32.PURGE_RXCLEAR | win32.PURGE_RXABORT) def reset_output_buffer(self): @@ -358,13 +360,13 @@ class Serial(SerialBase): that is in the buffer. """ if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() win32.PurgeComm(self._port_handle, win32.PURGE_TXCLEAR | win32.PURGE_TXABORT) def _update_break_state(self): """Set break: Controls TXD. When active, to transmitting is possible.""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() if self._break_state: win32.SetCommBreak(self._port_handle) else: @@ -386,7 +388,7 @@ class Serial(SerialBase): def _GetCommModemStatus(self): if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() stat = win32.DWORD() win32.GetCommModemStatus(self._port_handle, ctypes.byref(stat)) return stat.value @@ -416,7 +418,7 @@ class Serial(SerialBase): def set_buffer_size(self, rx_size=4096, tx_size=None): """\ Recommend a buffer size to the driver (device driver can ignore this - value). Must be called before the port is opened. + value). Must be called after the port is opened. """ if tx_size is None: tx_size = rx_size @@ -430,7 +432,7 @@ class Serial(SerialBase): WARNING: this function is not portable to different platforms! """ if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() if enable: win32.EscapeCommFunction(self._port_handle, win32.SETXON) else: @@ -442,7 +444,7 @@ class Serial(SerialBase): flags = win32.DWORD() comstat = win32.COMSTAT() if not win32.ClearCommError(self._port_handle, ctypes.byref(flags), ctypes.byref(comstat)): - raise SerialException('call to ClearCommError failed') + raise SerialException("ClearCommError failed ({!r})".format(ctypes.WinError())) return comstat.cbOutQue def _cancel_overlapped_io(self, overlapped): @@ -465,3 +467,11 @@ class Serial(SerialBase): def cancel_write(self): """Cancel a blocking write operation, may be called from other thread""" self._cancel_overlapped_io(self._overlapped_write) + + @SerialBase.exclusive.setter + def exclusive(self, exclusive): + """Change the exclusive access setting.""" + if exclusive is not None and not exclusive: + raise ValueError('win32 only supports exclusive access (not: {})'.format(exclusive)) + else: + serial.SerialBase.exclusive.__set__(self, exclusive) diff --git a/pyren/serial/threaded/__init__.py b/pyren/serial/threaded/__init__.py index 74b6924..b8940b6 100755 --- a/pyren/serial/threaded/__init__.py +++ b/pyren/serial/threaded/__init__.py @@ -9,6 +9,8 @@ """\ Support threading with serial ports. """ +from __future__ import absolute_import + import serial import threading @@ -201,7 +203,7 @@ class ReaderThread(threading.Thread): break else: if data: - # make a separated try-except for called used code + # make a separated try-except for called user code try: self.protocol.data_received(data) except Exception as e: @@ -214,7 +216,7 @@ class ReaderThread(threading.Thread): def write(self, data): """Thread safe writing (uses lock)""" with self._lock: - self.serial.write(data) + return self.serial.write(data) def close(self): """Close the serial port and exit reader thread (uses lock)""" diff --git a/pyren/serial/tools/hexlify_codec.py b/pyren/serial/tools/hexlify_codec.py index 1371da2..bd8f6b0 100755 --- a/pyren/serial/tools/hexlify_codec.py +++ b/pyren/serial/tools/hexlify_codec.py @@ -18,6 +18,8 @@ Therefore decoding is binary to text and thus converting binary data to hex dump """ +from __future__ import absolute_import + import codecs import serial diff --git a/pyren/serial/tools/list_ports.py b/pyren/serial/tools/list_ports.py index 2271dd1..0d7e3d4 100755 --- a/pyren/serial/tools/list_ports.py +++ b/pyren/serial/tools/list_ports.py @@ -16,6 +16,8 @@ Additionally a grep function is supplied that can be used to search for ports based on their descriptions or hardware ID. """ +from __future__ import absolute_import + import sys import os import re @@ -34,14 +36,14 @@ else: # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -def grep(regexp): +def grep(regexp, include_links=False): """\ Search for ports using a regular expression. Port name, description and hardware ID are searched. The function returns an iterable that returns the same tuples as comport() would do. """ r = re.compile(regexp, re.I) - for info in comports(): + for info in comports(include_links): port, desc, hwid = info if r.search(port) or r.search(desc) or r.search(hwid): yield info @@ -73,6 +75,11 @@ def main(): type=int, help='only output the N-th entry') + parser.add_argument( + '-s', '--include-links', + action='store_true', + help='include entries that are symlinks to real devices') + args = parser.parse_args() hits = 0 @@ -80,9 +87,9 @@ def main(): if args.regexp: if not args.quiet: sys.stderr.write("Filtered list with regexp: {!r}\n".format(args.regexp)) - iterator = sorted(grep(args.regexp)) + iterator = sorted(grep(args.regexp, include_links=args.include_links)) else: - iterator = sorted(comports()) + iterator = sorted(comports(include_links=args.include_links)) # list them for n, (port, desc, hwid) in enumerate(iterator, 1): if args.n is None or args.n == n: diff --git a/pyren/serial/tools/list_ports_common.py b/pyren/serial/tools/list_ports_common.py index 4c369f8..617f3dc 100755 --- a/pyren/serial/tools/list_ports_common.py +++ b/pyren/serial/tools/list_ports_common.py @@ -7,7 +7,13 @@ # (C) 2015 Chris Liechti # # SPDX-License-Identifier: BSD-3-Clause + +from __future__ import absolute_import + import re +import glob +import os +import os.path def numsplit(text): @@ -29,9 +35,9 @@ def numsplit(text): class ListPortInfo(object): """Info collection base class for serial ports""" - def __init__(self, device=None): + def __init__(self, device, skip_link_detection=False): self.device = device - self.name = None + self.name = os.path.basename(device) self.description = 'n/a' self.hwid = 'n/a' # USB specific data @@ -42,11 +48,14 @@ class ListPortInfo(object): self.manufacturer = None self.product = None self.interface = None + # special handling for links + if not skip_link_detection and device is not None and os.path.islink(device): + self.hwid = 'LINK={}'.format(os.path.realpath(device)) def usb_description(self): """return a short string to name the port based on USB info""" if self.interface is not None: - return '{0} - {1}'.format(self.product, self.interface) + return '{} - {}'.format(self.product, self.interface) elif self.product is not None: return self.product else: @@ -54,11 +63,11 @@ class ListPortInfo(object): def usb_info(self): """return a string with USB related information about device""" - return 'USB VID:PID={0:04X}:{1:04X}{2}{3}'.format( + return 'USB VID:PID={:04X}:{:04X}{}{}'.format( self.vid or 0, self.pid or 0, - ' SER={0}'.format(self.serial_number) if self.serial_number is not None else '', - ' LOCATION={0}'.format(self.location) if self.location is not None else '') + ' SER={}'.format(self.serial_number) if self.serial_number is not None else '', + ' LOCATION={}'.format(self.location) if self.location is not None else '') def apply_usb_info(self): """update description and hwid from USB data""" @@ -66,9 +75,16 @@ class ListPortInfo(object): self.hwid = self.usb_info() def __eq__(self, other): - return self.device == other.device + return isinstance(other, ListPortInfo) and self.device == other.device + + def __hash__(self): + return hash(self.device) def __lt__(self, other): + if not isinstance(other, ListPortInfo): + raise TypeError('unorderable types: {}() and {}()'.format( + type(self).__name__, + type(other).__name__)) return numsplit(self.device) < numsplit(other.device) def __str__(self): @@ -83,7 +99,21 @@ class ListPortInfo(object): elif index == 2: return self.hwid else: - raise IndexError('{0} > 2'.format(index)) + raise IndexError('{} > 2'.format(index)) + + +# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +def list_links(devices): + """\ + search all /dev devices and look for symlinks to known ports already + listed in devices. + """ + links = [] + for device in glob.glob('/dev/*'): + if os.path.islink(device) and os.path.realpath(device) in devices: + links.append(device) + return links + # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - # test diff --git a/pyren/serial/tools/list_ports_linux.py b/pyren/serial/tools/list_ports_linux.py index 224cef5..c8c1cfc 100755 --- a/pyren/serial/tools/list_ports_linux.py +++ b/pyren/serial/tools/list_ports_linux.py @@ -8,6 +8,8 @@ # # SPDX-License-Identifier: BSD-3-Clause +from __future__ import absolute_import + import glob import os from serial.tools import list_ports_common @@ -18,30 +20,46 @@ class SysFS(list_ports_common.ListPortInfo): def __init__(self, device): super(SysFS, self).__init__(device) - self.name = os.path.basename(device) + # special handling for links + if device is not None and os.path.islink(device): + device = os.path.realpath(device) + is_link = True + else: + is_link = False self.usb_device_path = None - if os.path.exists('/sys/class/tty/{0}/device'.format(self.name)): - self.device_path = os.path.realpath('/sys/class/tty/{0}/device'.format(self.name)) + if os.path.exists('/sys/class/tty/{}/device'.format(self.name)): + self.device_path = os.path.realpath('/sys/class/tty/{}/device'.format(self.name)) self.subsystem = os.path.basename(os.path.realpath(os.path.join(self.device_path, 'subsystem'))) else: self.device_path = None self.subsystem = None # check device type if self.subsystem == 'usb-serial': - self.usb_device_path = os.path.dirname(os.path.dirname(self.device_path)) + self.usb_interface_path = os.path.dirname(self.device_path) elif self.subsystem == 'usb': - self.usb_device_path = os.path.dirname(self.device_path) + self.usb_interface_path = self.device_path else: - self.usb_device_path = None + self.usb_interface_path = None # fill-in info for USB devices - if self.usb_device_path is not None: + if self.usb_interface_path is not None: + self.usb_device_path = os.path.dirname(self.usb_interface_path) + + try: + num_if = int(self.read_line(self.usb_device_path, 'bNumInterfaces')) + except ValueError: + num_if = 1 + self.vid = int(self.read_line(self.usb_device_path, 'idVendor'), 16) self.pid = int(self.read_line(self.usb_device_path, 'idProduct'), 16) self.serial_number = self.read_line(self.usb_device_path, 'serial') - self.location = os.path.basename(self.usb_device_path) + if num_if > 1: # multi interface devices like FT4232 + self.location = os.path.basename(self.usb_interface_path) + else: + self.location = os.path.basename(self.usb_device_path) + self.manufacturer = self.read_line(self.usb_device_path, 'manufacturer') self.product = self.read_line(self.usb_device_path, 'product') - self.interface = self.read_line(self.device_path, 'interface') + self.interface = self.read_line(self.usb_interface_path, 'interface') if self.subsystem in ('usb', 'usb-serial'): self.apply_usb_info() @@ -53,6 +71,9 @@ class SysFS(list_ports_common.ListPortInfo): self.description = self.name self.hwid = os.path.basename(self.device_path) + if is_link: + self.hwid += ' LINK={}'.format(device) + def read_line(self, *args): """\ Helper function to read a single line from a file. @@ -67,12 +88,16 @@ class SysFS(list_ports_common.ListPortInfo): return None -def comports(): +def comports(include_links=False): devices = glob.glob('/dev/ttyS*') # built-in serial ports devices.extend(glob.glob('/dev/ttyUSB*')) # usb-serial with own driver + devices.extend(glob.glob('/dev/ttyXRUSB*')) # xr-usb-serial port exar (DELL Edge 3001) devices.extend(glob.glob('/dev/ttyACM*')) # usb-serial with CDC-ACM profile devices.extend(glob.glob('/dev/ttyAMA*')) # ARM internal port (raspi) devices.extend(glob.glob('/dev/rfcomm*')) # BT serial devices + devices.extend(glob.glob('/dev/ttyAP*')) # Advantech multi-port serial controllers + if include_links: + devices.extend(list_ports_common.list_links(devices)) return [info for info in [SysFS(d) for d in devices] if info.subsystem != "platform"] # hide non-present internal serial ports @@ -80,5 +105,5 @@ def comports(): # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - # test if __name__ == '__main__': - for port, desc, hwid in sorted(comports()): - print("{}: {} [{}]".format(port, desc, hwid)) + for info in sorted(comports()): + print("{0}: {0.subsystem}".format(info)) diff --git a/pyren/serial/tools/list_ports_osx.py b/pyren/serial/tools/list_ports_osx.py index 1d57b96..51b4e8c 100755 --- a/pyren/serial/tools/list_ports_osx.py +++ b/pyren/serial/tools/list_ports_osx.py @@ -7,7 +7,7 @@ # and modifications by cliechti, hoihu, hardkrash # # This file is part of pySerial. https://github.com/pyserial/pyserial -# (C) 2013-2015 +# (C) 2013-2020 # # SPDX-License-Identifier: BSD-3-Clause @@ -21,37 +21,54 @@ # Also see the 'IORegistryExplorer' for an idea of what we are actually searching +from __future__ import absolute_import + import ctypes -import ctypes.util from serial.tools import list_ports_common -iokit = ctypes.cdll.LoadLibrary(ctypes.util.find_library('IOKit')) -cf = ctypes.cdll.LoadLibrary(ctypes.util.find_library('CoreFoundation')) +iokit = ctypes.cdll.LoadLibrary('/System/Library/Frameworks/IOKit.framework/IOKit') +cf = ctypes.cdll.LoadLibrary('/System/Library/Frameworks/CoreFoundation.framework/CoreFoundation') -kIOMasterPortDefault = ctypes.c_void_p.in_dll(iokit, "kIOMasterPortDefault") +# kIOMasterPortDefault is no longer exported in BigSur but no biggie, using NULL works just the same +kIOMasterPortDefault = 0 # WAS: ctypes.c_void_p.in_dll(iokit, "kIOMasterPortDefault") kCFAllocatorDefault = ctypes.c_void_p.in_dll(cf, "kCFAllocatorDefault") kCFStringEncodingMacRoman = 0 +kCFStringEncodingUTF8 = 0x08000100 + +# defined in `IOKit/usb/USBSpec.h` +kUSBVendorString = 'USB Vendor Name' +kUSBSerialNumberString = 'USB Serial Number' + +# `io_name_t` defined as `typedef char io_name_t[128];` +# in `device/device_types.h` +io_name_size = 128 + +# defined in `mach/kern_return.h` +KERN_SUCCESS = 0 +# kern_return_t defined as `typedef int kern_return_t;` in `mach/i386/kern_return.h` +kern_return_t = ctypes.c_int iokit.IOServiceMatching.restype = ctypes.c_void_p iokit.IOServiceGetMatchingServices.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p] -iokit.IOServiceGetMatchingServices.restype = ctypes.c_void_p +iokit.IOServiceGetMatchingServices.restype = kern_return_t iokit.IORegistryEntryGetParentEntry.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p] +iokit.IOServiceGetMatchingServices.restype = kern_return_t iokit.IORegistryEntryCreateCFProperty.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_uint32] iokit.IORegistryEntryCreateCFProperty.restype = ctypes.c_void_p iokit.IORegistryEntryGetPath.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p] -iokit.IORegistryEntryGetPath.restype = ctypes.c_void_p +iokit.IORegistryEntryGetPath.restype = kern_return_t iokit.IORegistryEntryGetName.argtypes = [ctypes.c_void_p, ctypes.c_void_p] -iokit.IORegistryEntryGetName.restype = ctypes.c_void_p +iokit.IORegistryEntryGetName.restype = kern_return_t iokit.IOObjectGetClass.argtypes = [ctypes.c_void_p, ctypes.c_void_p] -iokit.IOObjectGetClass.restype = ctypes.c_void_p +iokit.IOObjectGetClass.restype = kern_return_t iokit.IOObjectRelease.argtypes = [ctypes.c_void_p] @@ -62,6 +79,9 @@ cf.CFStringCreateWithCString.restype = ctypes.c_void_p cf.CFStringGetCStringPtr.argtypes = [ctypes.c_void_p, ctypes.c_uint32] cf.CFStringGetCStringPtr.restype = ctypes.c_char_p +cf.CFStringGetCString.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_long, ctypes.c_uint32] +cf.CFStringGetCString.restype = ctypes.c_bool + cf.CFNumberGetValue.argtypes = [ctypes.c_void_p, ctypes.c_uint32, ctypes.c_void_p] cf.CFNumberGetValue.restype = ctypes.c_void_p @@ -86,8 +106,8 @@ def get_string_property(device_type, property): """ key = cf.CFStringCreateWithCString( kCFAllocatorDefault, - property.encode("mac_roman"), - kCFStringEncodingMacRoman) + property.encode("utf-8"), + kCFStringEncodingUTF8) CFContainer = iokit.IORegistryEntryCreateCFProperty( device_type, @@ -99,7 +119,12 @@ def get_string_property(device_type, property): if CFContainer: output = cf.CFStringGetCStringPtr(CFContainer, 0) if output is not None: - output = output.decode('mac_roman') + output = output.decode('utf-8') + else: + buffer = ctypes.create_string_buffer(io_name_size); + success = cf.CFStringGetCString(CFContainer, ctypes.byref(buffer), io_name_size, kCFStringEncodingUTF8) + if success: + output = buffer.value.decode('utf-8') cf.CFRelease(CFContainer) return output @@ -116,8 +141,8 @@ def get_int_property(device_type, property, cf_number_type): """ key = cf.CFStringCreateWithCString( kCFAllocatorDefault, - property.encode("mac_roman"), - kCFStringEncodingMacRoman) + property.encode("utf-8"), + kCFStringEncodingUTF8) CFContainer = iokit.IORegistryEntryCreateCFProperty( device_type, @@ -135,12 +160,19 @@ def get_int_property(device_type, property, cf_number_type): return number.value return None - def IORegistryEntryGetName(device): - pathname = ctypes.create_string_buffer(100) # TODO: Is this ok? - iokit.IOObjectGetClass(device, ctypes.byref(pathname)) - return pathname.value + devicename = ctypes.create_string_buffer(io_name_size); + res = iokit.IORegistryEntryGetName(device, ctypes.byref(devicename)) + if res != KERN_SUCCESS: + return None + # this works in python2 but may not be valid. Also I don't know if + # this encoding is guaranteed. It may be dependent on system locale. + return devicename.value.decode('utf-8') +def IOObjectGetClass(device): + classname = ctypes.create_string_buffer(io_name_size) + iokit.IOObjectGetClass(device, ctypes.byref(classname)) + return classname.value def GetParentDeviceByType(device, parent_type): """ Find the first parent of a device that implements the parent_type @@ -148,15 +180,15 @@ def GetParentDeviceByType(device, parent_type): @return Pointer to the parent type, or None if it was not found. """ # First, try to walk up the IOService tree to find a parent of this device that is a IOUSBDevice. - parent_type = parent_type.encode('mac_roman') - while IORegistryEntryGetName(device) != parent_type: + parent_type = parent_type.encode('utf-8') + while IOObjectGetClass(device) != parent_type: parent = ctypes.c_void_p() response = iokit.IORegistryEntryGetParentEntry( device, - "IOService".encode("mac_roman"), + "IOService".encode("utf-8"), ctypes.byref(parent)) # If we weren't able to find a parent for the device, we're done. - if response != 0: + if response != KERN_SUCCESS: return None device = parent return device @@ -170,7 +202,7 @@ def GetIOServicesByType(service_type): iokit.IOServiceGetMatchingServices( kIOMasterPortDefault, - iokit.IOServiceMatching(service_type.encode('mac_roman')), + iokit.IOServiceMatching(service_type.encode('utf-8')), ctypes.byref(serial_port_iterator)) services = [] @@ -227,7 +259,8 @@ def search_for_locationID_in_interfaces(serial_interfaces, locationID): return None -def comports(): +def comports(include_links=False): + # XXX include_links is currently ignored. are links in /dev even supported here? # Scan for all iokit serial ports services = GetIOServicesByType('IOSerialBSDClient') ports = [] @@ -238,14 +271,21 @@ def comports(): if device: info = list_ports_common.ListPortInfo(device) # If the serial port is implemented by IOUSBDevice - usb_device = GetParentDeviceByType(service, "IOUSBDevice") + # NOTE IOUSBDevice was deprecated as of 10.11 and finally on Apple Silicon + # devices has been completely removed. Thanks to @oskay for this patch. + usb_device = GetParentDeviceByType(service, "IOUSBHostDevice") + if not usb_device: + usb_device = GetParentDeviceByType(service, "IOUSBDevice") if usb_device: # fetch some useful informations from properties info.vid = get_int_property(usb_device, "idVendor", kCFNumberSInt16Type) info.pid = get_int_property(usb_device, "idProduct", kCFNumberSInt16Type) - info.serial_number = get_string_property(usb_device, "USB Serial Number") - info.product = get_string_property(usb_device, "USB Product Name") or 'n/a' - info.manufacturer = get_string_property(usb_device, "USB Vendor Name") + info.serial_number = get_string_property(usb_device, kUSBSerialNumberString) + # We know this is a usb device, so the + # IORegistryEntryName should always be aliased to the + # usb product name string descriptor. + info.product = IORegistryEntryGetName(usb_device) or 'n/a' + info.manufacturer = get_string_property(usb_device, kUSBVendorString) locationID = get_int_property(usb_device, "locationID", kCFNumberSInt32Type) info.location = location_to_string(locationID) info.interface = search_for_locationID_in_interfaces(serial_interfaces, locationID) diff --git a/pyren/serial/tools/list_ports_posix.py b/pyren/serial/tools/list_ports_posix.py index 6ea4db9..79bc8ed 100755 --- a/pyren/serial/tools/list_ports_posix.py +++ b/pyren/serial/tools/list_ports_posix.py @@ -16,6 +16,8 @@ As currently no method is known to get the second two strings easily, they are currently just identical to the port name. """ +from __future__ import absolute_import + import glob import sys import os @@ -34,48 +36,64 @@ elif plat == 'cygwin': # cygwin/win32 # cygwin accepts /dev/com* in many contexts # (such as 'open' call, explicit 'ls'), but 'glob.glob' # and bare 'ls' do not; so use /dev/ttyS* instead - def comports(): + def comports(include_links=False): devices = glob.glob('/dev/ttyS*') + if include_links: + devices.extend(list_ports_common.list_links(devices)) return [list_ports_common.ListPortInfo(d) for d in devices] elif plat[:7] == 'openbsd': # OpenBSD - def comports(): + def comports(include_links=False): devices = glob.glob('/dev/cua*') + if include_links: + devices.extend(list_ports_common.list_links(devices)) return [list_ports_common.ListPortInfo(d) for d in devices] elif plat[:3] == 'bsd' or plat[:7] == 'freebsd': - def comports(): + def comports(include_links=False): devices = glob.glob('/dev/cua*[!.init][!.lock]') + if include_links: + devices.extend(list_ports_common.list_links(devices)) return [list_ports_common.ListPortInfo(d) for d in devices] elif plat[:6] == 'netbsd': # NetBSD - def comports(): + def comports(include_links=False): """scan for available ports. return a list of device names.""" devices = glob.glob('/dev/dty*') + if include_links: + devices.extend(list_ports_common.list_links(devices)) return [list_ports_common.ListPortInfo(d) for d in devices] elif plat[:4] == 'irix': # IRIX - def comports(): + def comports(include_links=False): """scan for available ports. return a list of device names.""" devices = glob.glob('/dev/ttyf*') + if include_links: + devices.extend(list_ports_common.list_links(devices)) return [list_ports_common.ListPortInfo(d) for d in devices] elif plat[:2] == 'hp': # HP-UX (not tested) - def comports(): + def comports(include_links=False): """scan for available ports. return a list of device names.""" devices = glob.glob('/dev/tty*p0') + if include_links: + devices.extend(list_ports_common.list_links(devices)) return [list_ports_common.ListPortInfo(d) for d in devices] elif plat[:5] == 'sunos': # Solaris/SunOS - def comports(): + def comports(include_links=False): """scan for available ports. return a list of device names.""" devices = glob.glob('/dev/tty*c') + if include_links: + devices.extend(list_ports_common.list_links(devices)) return [list_ports_common.ListPortInfo(d) for d in devices] elif plat[:3] == 'aix': # AIX - def comports(): + def comports(include_links=False): """scan for available ports. return a list of device names.""" devices = glob.glob('/dev/tty*') + if include_links: + devices.extend(list_ports_common.list_links(devices)) return [list_ports_common.ListPortInfo(d) for d in devices] else: diff --git a/pyren/serial/tools/list_ports_windows.py b/pyren/serial/tools/list_ports_windows.py index 93fa128..0b4a5b1 100755 --- a/pyren/serial/tools/list_ports_windows.py +++ b/pyren/serial/tools/list_ports_windows.py @@ -8,6 +8,8 @@ # # SPDX-License-Identifier: BSD-3-Clause +from __future__ import absolute_import + # pylint: disable=invalid-name,too-few-public-methods import re import ctypes @@ -113,14 +115,28 @@ RegCloseKey.argtypes = [HKEY] RegCloseKey.restype = LONG RegQueryValueEx = advapi32.RegQueryValueExW -RegQueryValueEx.argtypes = [HKEY, LPCTSTR , LPDWORD, LPDWORD, LPBYTE, LPDWORD] +RegQueryValueEx.argtypes = [HKEY, LPCTSTR, LPDWORD, LPDWORD, LPBYTE, LPDWORD] RegQueryValueEx.restype = LONG +cfgmgr32 = ctypes.windll.LoadLibrary("Cfgmgr32") +CM_Get_Parent = cfgmgr32.CM_Get_Parent +CM_Get_Parent.argtypes = [PDWORD, DWORD, ULONG] +CM_Get_Parent.restype = LONG + +CM_Get_Device_IDW = cfgmgr32.CM_Get_Device_IDW +CM_Get_Device_IDW.argtypes = [DWORD, PTSTR, ULONG, ULONG] +CM_Get_Device_IDW.restype = LONG + +CM_MapCrToWin32Err = cfgmgr32.CM_MapCrToWin32Err +CM_MapCrToWin32Err.argtypes = [DWORD, DWORD] +CM_MapCrToWin32Err.restype = DWORD + DIGCF_PRESENT = 2 DIGCF_DEVICEINTERFACE = 16 INVALID_HANDLE_VALUE = 0 ERROR_INSUFFICIENT_BUFFER = 122 +ERROR_NOT_FOUND = 1168 SPDRP_HARDWAREID = 1 SPDRP_FRIENDLYNAME = 12 SPDRP_LOCATION_PATHS = 35 @@ -130,19 +146,120 @@ DIREG_DEV = 0x00000001 KEY_READ = 0x20019 +MAX_USB_DEVICE_TREE_TRAVERSAL_DEPTH = 5 + + +def get_parent_serial_number(child_devinst, child_vid, child_pid, depth=0, last_serial_number=None): + """ Get the serial number of the parent of a device. + + Args: + child_devinst: The device instance handle to get the parent serial number of. + child_vid: The vendor ID of the child device. + child_pid: The product ID of the child device. + depth: The current iteration depth of the USB device tree. + """ + + # If the traversal depth is beyond the max, abandon attempting to find the serial number. + if depth > MAX_USB_DEVICE_TREE_TRAVERSAL_DEPTH: + return '' if not last_serial_number else last_serial_number + + # Get the parent device instance. + devinst = DWORD() + ret = CM_Get_Parent(ctypes.byref(devinst), child_devinst, 0) + + if ret: + win_error = CM_MapCrToWin32Err(DWORD(ret), DWORD(0)) + + # If there is no parent available, the child was the root device. We cannot traverse + # further. + if win_error == ERROR_NOT_FOUND: + return '' if not last_serial_number else last_serial_number + + raise ctypes.WinError(win_error) + + # Get the ID of the parent device and parse it for vendor ID, product ID, and serial number. + parentHardwareID = ctypes.create_unicode_buffer(250) + + ret = CM_Get_Device_IDW( + devinst, + parentHardwareID, + ctypes.sizeof(parentHardwareID) - 1, + 0) + + if ret: + raise ctypes.WinError(CM_MapCrToWin32Err(DWORD(ret), DWORD(0))) + + parentHardwareID_str = parentHardwareID.value + m = re.search(r'VID_([0-9a-f]{4})(&PID_([0-9a-f]{4}))?(&MI_(\d{2}))?(\\(.*))?', + parentHardwareID_str, + re.I) + + # return early if we have no matches (likely malformed serial, traversed too far) + if not m: + return '' if not last_serial_number else last_serial_number + + vid = None + pid = None + serial_number = None + if m.group(1): + vid = int(m.group(1), 16) + if m.group(3): + pid = int(m.group(3), 16) + if m.group(7): + serial_number = m.group(7) + + # store what we found as a fallback for malformed serial values up the chain + found_serial_number = serial_number + + # Check that the USB serial number only contains alpha-numeric characters. It may be a windows + # device ID (ephemeral ID). + if serial_number and not re.match(r'^\w+$', serial_number): + serial_number = None + + if not vid or not pid: + # If pid and vid are not available at this device level, continue to the parent. + return get_parent_serial_number(devinst, child_vid, child_pid, depth + 1, found_serial_number) + + if pid != child_pid or vid != child_vid: + # If the VID or PID has changed, we are no longer looking at the same physical device. The + # serial number is unknown. + return '' if not last_serial_number else last_serial_number + + # In this case, the vid and pid of the parent device are identical to the child. However, if + # there still isn't a serial number available, continue to the next parent. + if not serial_number: + return get_parent_serial_number(devinst, child_vid, child_pid, depth + 1, found_serial_number) + + # Finally, the VID and PID are identical to the child and a serial number is present, so return + # it. + return serial_number + + def iterate_comports(): """Return a generator that yields descriptions for serial ports""" - GUIDs = (GUID * 8)() # so far only seen one used, so hope 8 are enough... - guids_size = DWORD() + PortsGUIDs = (GUID * 8)() # so far only seen one used, so hope 8 are enough... + ports_guids_size = DWORD() if not SetupDiClassGuidsFromName( "Ports", - GUIDs, - ctypes.sizeof(GUIDs), - ctypes.byref(guids_size)): + PortsGUIDs, + ctypes.sizeof(PortsGUIDs), + ctypes.byref(ports_guids_size)): raise ctypes.WinError() + ModemsGUIDs = (GUID * 8)() # so far only seen one used, so hope 8 are enough... + modems_guids_size = DWORD() + if not SetupDiClassGuidsFromName( + "Modem", + ModemsGUIDs, + ctypes.sizeof(ModemsGUIDs), + ctypes.byref(modems_guids_size)): + raise ctypes.WinError() + + GUIDs = PortsGUIDs[:ports_guids_size.value] + ModemsGUIDs[:modems_guids_size.value] + # repeat for all possible GUIDs - for index in range(guids_size.value): + for index in range(len(GUIDs)): + bInterfaceNumber = None g_hdi = SetupDiGetClassDevs( ctypes.byref(GUIDs[index]), None, @@ -205,18 +322,26 @@ def iterate_comports(): # stringify szHardwareID_str = szHardwareID.value - info = list_ports_common.ListPortInfo(port_name_buffer.value) + info = list_ports_common.ListPortInfo(port_name_buffer.value, skip_link_detection=True) # in case of USB, make a more readable string, similar to that form # that we also generate on other platforms if szHardwareID_str.startswith('USB'): - m = re.search(r'VID_([0-9a-f]{4})(&PID_([0-9a-f]{4}))?(\\(\w+))?', szHardwareID_str, re.I) + m = re.search(r'VID_([0-9a-f]{4})(&PID_([0-9a-f]{4}))?(&MI_(\d{2}))?(\\(.*))?', szHardwareID_str, re.I) if m: info.vid = int(m.group(1), 16) if m.group(3): info.pid = int(m.group(3), 16) if m.group(5): - info.serial_number = m.group(5) + bInterfaceNumber = int(m.group(5)) + + # Check that the USB serial number only contains alpha-numeric characters. It + # may be a windows device ID (ephemeral ID) for composite devices. + if m.group(7) and re.match(r'^\w+$', m.group(7)): + info.serial_number = m.group(7) + else: + info.serial_number = get_parent_serial_number(devinfo.DevInst, info.vid, info.pid) + # calculate a location string loc_path_str = ctypes.create_unicode_buffer(250) if SetupDiGetDeviceRegistryProperty( @@ -238,6 +363,10 @@ def iterate_comports(): else: location.append('-') location.append(g.group(2)) + if bInterfaceNumber is not None: + location.append(':{}.{}'.format( + 'x', # XXX how to determine correct bConfigurationValue? + bInterfaceNumber)) if location: info.location = ''.join(location) info.hwid = info.usb_info() @@ -287,7 +416,7 @@ def iterate_comports(): SetupDiDestroyDeviceInfoList(g_hdi) -def comports(): +def comports(include_links=False): """Return a list of info objects about serial ports""" return list(iterate_comports()) diff --git a/pyren/serial/tools/miniterm.py b/pyren/serial/tools/miniterm.py index 7c68e9d..2cceff6 100755 --- a/pyren/serial/tools/miniterm.py +++ b/pyren/serial/tools/miniterm.py @@ -3,10 +3,12 @@ # Very simple serial terminal # # This file is part of pySerial. https://github.com/pyserial/pyserial -# (C)2002-2015 Chris Liechti +# (C)2002-2020 Chris Liechti # # SPDX-License-Identifier: BSD-3-Clause +from __future__ import absolute_import + import codecs import os import sys @@ -86,6 +88,7 @@ class ConsoleBase(object): if os.name == 'nt': # noqa import msvcrt import ctypes + import platform class Out(object): """file-like wrapper that uses os.write""" @@ -100,12 +103,52 @@ if os.name == 'nt': # noqa os.write(self.fd, s) class Console(ConsoleBase): + fncodes = { + ';': '\1bOP', # F1 + '<': '\1bOQ', # F2 + '=': '\1bOR', # F3 + '>': '\1bOS', # F4 + '?': '\1b[15~', # F5 + '@': '\1b[17~', # F6 + 'A': '\1b[18~', # F7 + 'B': '\1b[19~', # F8 + 'C': '\1b[20~', # F9 + 'D': '\1b[21~', # F10 + } + navcodes = { + 'H': '\x1b[A', # UP + 'P': '\x1b[B', # DOWN + 'K': '\x1b[D', # LEFT + 'M': '\x1b[C', # RIGHT + 'G': '\x1b[H', # HOME + 'O': '\x1b[F', # END + 'R': '\x1b[2~', # INSERT + 'S': '\x1b[3~', # DELETE + 'I': '\x1b[5~', # PGUP + 'Q': '\x1b[6~', # PGDN + } + def __init__(self): super(Console, self).__init__() self._saved_ocp = ctypes.windll.kernel32.GetConsoleOutputCP() self._saved_icp = ctypes.windll.kernel32.GetConsoleCP() ctypes.windll.kernel32.SetConsoleOutputCP(65001) ctypes.windll.kernel32.SetConsoleCP(65001) + # ANSI handling available through SetConsoleMode since Windows 10 v1511 + # https://en.wikipedia.org/wiki/ANSI_escape_code#cite_note-win10th2-1 + if platform.release() == '10' and int(platform.version().split('.')[2]) > 10586: + ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004 + import ctypes.wintypes as wintypes + if not hasattr(wintypes, 'LPDWORD'): # PY2 + wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD) + SetConsoleMode = ctypes.windll.kernel32.SetConsoleMode + GetConsoleMode = ctypes.windll.kernel32.GetConsoleMode + GetStdHandle = ctypes.windll.kernel32.GetStdHandle + mode = wintypes.DWORD() + GetConsoleMode(GetStdHandle(-11), ctypes.byref(mode)) + if (mode.value & ENABLE_VIRTUAL_TERMINAL_PROCESSING) == 0: + SetConsoleMode(GetStdHandle(-11), mode.value | ENABLE_VIRTUAL_TERMINAL_PROCESSING) + self._saved_cm = mode self.output = codecs.getwriter('UTF-8')(Out(sys.stdout.fileno()), 'replace') # the change of the code page is not propagated to Python, manually fix it sys.stderr = codecs.getwriter('UTF-8')(Out(sys.stderr.fileno()), 'replace') @@ -115,14 +158,25 @@ if os.name == 'nt': # noqa def __del__(self): ctypes.windll.kernel32.SetConsoleOutputCP(self._saved_ocp) ctypes.windll.kernel32.SetConsoleCP(self._saved_icp) + try: + ctypes.windll.kernel32.SetConsoleMode(ctypes.windll.kernel32.GetStdHandle(-11), self._saved_cm) + except AttributeError: # in case no _saved_cm + pass def getkey(self): while True: z = msvcrt.getwch() if z == unichr(13): return unichr(10) - elif z in (unichr(0), unichr(0x0e)): # functions keys, ignore - msvcrt.getwch() + elif z is unichr(0) or z is unichr(0xe0): + try: + code = msvcrt.getwch() + if z is unichr(0): + return self.fncodes[code] + else: + return self.navcodes[code] + except KeyError: + pass else: return z @@ -135,15 +189,12 @@ if os.name == 'nt': # noqa elif os.name == 'posix': import atexit import termios - import select + import fcntl class Console(ConsoleBase): def __init__(self): super(Console, self).__init__() self.fd = sys.stdin.fileno() - # an additional pipe is used in getkey, so that the cancel method - # can abort the waiting getkey method - self.pipe_r, self.pipe_w = os.pipe() self.old = termios.tcgetattr(self.fd) atexit.register(self.cleanup) if sys.version_info < (3, 0): @@ -159,17 +210,13 @@ elif os.name == 'posix': termios.tcsetattr(self.fd, termios.TCSANOW, new) def getkey(self): - ready, _, _ = select.select([self.enc_stdin, self.pipe_r], [], [], None) - if self.pipe_r in ready: - os.read(self.pipe_r, 1) - return c = self.enc_stdin.read(1) if c == unichr(0x7f): c = unichr(8) # map the BS key (which yields DEL) to backspace return c def cancel(self): - os.write(self.pipe_w, b"x") + fcntl.ioctl(self.fd, termios.TIOCSTI, b'\0') def cleanup(self): termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old) @@ -282,12 +329,12 @@ class DebugIO(Transform): """Print what is sent and received""" def rx(self, text): - sys.stderr.write(' [RX:{}] '.format(repr(text))) + sys.stderr.write(' [RX:{!r}] '.format(text)) sys.stderr.flush() return text def tx(self, text): - sys.stderr.write(' [TX:{}] '.format(repr(text))) + sys.stderr.write(' [TX:{!r}] '.format(text)) sys.stderr.flush() return text @@ -322,7 +369,7 @@ def ask_for_port(): sys.stderr.write('\n--- Available ports:\n') ports = [] for n, (port, desc, hwid) in enumerate(sorted(comports()), 1): - sys.stderr.write('--- {:2}: {:20} {}\n'.format(n, port, desc)) + sys.stderr.write('--- {:2}: {:20} {!r}\n'.format(n, port, desc)) ports.append(port) while True: port = raw_input('--- Enter port index or full name: ') @@ -354,8 +401,8 @@ class Miniterm(object): self.eol = eol self.filters = filters self.update_transformations() - self.exit_character = 0x1d # GS/CTRL+] - self.menu_character = 0x14 # Menu: CTRL+T + self.exit_character = unichr(0x1d) # GS/CTRL+] + self.menu_character = unichr(0x14) # Menu: CTRL+T self.alive = None self._reader_alive = None self.receiver_thread = None @@ -509,25 +556,7 @@ class Miniterm(object): if self.echo: self.console.write(c) elif c == '\x15': # CTRL+U -> upload file - sys.stderr.write('\n--- File to upload: ') - sys.stderr.flush() - with self.console: - filename = sys.stdin.readline().rstrip('\r\n') - if filename: - try: - with open(filename, 'rb') as f: - sys.stderr.write('--- Sending file {} ---\n'.format(filename)) - while True: - block = f.read(1024) - if not block: - break - self.serial.write(block) - # Wait for output buffer to drain. - self.serial.flush() - sys.stderr.write('.') # Progress indicator. - sys.stderr.write('\n--- File {} sent ---\n'.format(filename)) - except IOError as e: - sys.stderr.write('--- ERROR opening file {}: {} ---\n'.format(filename, e)) + self.upload_file() elif c in '\x08hH?': # CTRL+H, h, H, ? -> Show help sys.stderr.write(self.get_help_text()) elif c == '\x12': # CTRL+R -> Toggle RTS @@ -543,24 +572,9 @@ class Miniterm(object): self.echo = not self.echo sys.stderr.write('--- local echo {} ---\n'.format('active' if self.echo else 'inactive')) elif c == '\x06': # CTRL+F -> edit filters - sys.stderr.write('\n--- Available Filters:\n') - sys.stderr.write('\n'.join( - '--- {:<10} = {.__doc__}'.format(k, v) - for k, v in sorted(TRANSFORMATIONS.items()))) - sys.stderr.write('\n--- Enter new filter name(s) [{}]: '.format(' '.join(self.filters))) - with self.console: - new_filters = sys.stdin.readline().lower().split() - if new_filters: - for f in new_filters: - if f not in TRANSFORMATIONS: - sys.stderr.write('--- unknown filter: {}\n'.format(repr(f))) - break - else: - self.filters = new_filters - self.update_transformations() - sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters))) + self.change_filter() elif c == '\x0c': # CTRL+L -> EOL mode - modes = list(EOL_TRANSFORMATIONS) # keys + modes = list(EOL_TRANSFORMATIONS) # keys eol = modes.index(self.eol) + 1 if eol >= len(modes): eol = 0 @@ -568,63 +582,17 @@ class Miniterm(object): sys.stderr.write('--- EOL: {} ---\n'.format(self.eol.upper())) self.update_transformations() elif c == '\x01': # CTRL+A -> set encoding - sys.stderr.write('\n--- Enter new encoding name [{}]: '.format(self.input_encoding)) - with self.console: - new_encoding = sys.stdin.readline().strip() - if new_encoding: - try: - codecs.lookup(new_encoding) - except LookupError: - sys.stderr.write('--- invalid encoding name: {}\n'.format(new_encoding)) - else: - self.set_rx_encoding(new_encoding) - self.set_tx_encoding(new_encoding) - sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding)) - sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding)) + self.change_encoding() elif c == '\x09': # CTRL+I -> info self.dump_port_settings() #~ elif c == '\x01': # CTRL+A -> cycle escape mode #~ elif c == '\x0c': # CTRL+L -> cycle linefeed mode elif c in 'pP': # P -> change port - with self.console: - try: - port = ask_for_port() - except KeyboardInterrupt: - port = None - if port and port != self.serial.port: - # reader thread needs to be shut down - self._stop_reader() - # save settings - settings = self.serial.getSettingsDict() - try: - new_serial = serial.serial_for_url(port, do_not_open=True) - # restore settings and open - new_serial.applySettingsDict(settings) - new_serial.rts = self.serial.rts - new_serial.dtr = self.serial.dtr - new_serial.open() - new_serial.break_condition = self.serial.break_condition - except Exception as e: - sys.stderr.write('--- ERROR opening new port: {} ---\n'.format(e)) - new_serial.close() - else: - self.serial.close() - self.serial = new_serial - sys.stderr.write('--- Port changed to: {} ---\n'.format(self.serial.port)) - # and restart the reader thread - self._start_reader() + self.change_port() + elif c in 'zZ': # S -> suspend / open port temporarily + self.suspend_port() elif c in 'bB': # B -> change baudrate - sys.stderr.write('\n--- Baudrate: ') - sys.stderr.flush() - with self.console: - backup = self.serial.baudrate - try: - self.serial.baudrate = int(sys.stdin.readline().strip()) - except ValueError as e: - sys.stderr.write('--- ERROR setting baudrate: {} ---\n'.format(e)) - self.serial.baudrate = backup - else: - self.dump_port_settings() + self.change_baudrate() elif c == '8': # 8 -> change to 8 bits self.serial.bytesize = serial.EIGHTBITS self.dump_port_settings() @@ -661,16 +629,150 @@ class Miniterm(object): elif c in 'rR': # R -> change hardware flow control self.serial.rtscts = (c == 'R') self.dump_port_settings() + elif c in 'qQ': + self.stop() # Q -> exit app else: sys.stderr.write('--- unknown menu character {} --\n'.format(key_description(c))) + def upload_file(self): + """Ask user for filenname and send its contents""" + sys.stderr.write('\n--- File to upload: ') + sys.stderr.flush() + with self.console: + filename = sys.stdin.readline().rstrip('\r\n') + if filename: + try: + with open(filename, 'rb') as f: + sys.stderr.write('--- Sending file {} ---\n'.format(filename)) + while True: + block = f.read(1024) + if not block: + break + self.serial.write(block) + # Wait for output buffer to drain. + self.serial.flush() + sys.stderr.write('.') # Progress indicator. + sys.stderr.write('\n--- File {} sent ---\n'.format(filename)) + except IOError as e: + sys.stderr.write('--- ERROR opening file {}: {} ---\n'.format(filename, e)) + + def change_filter(self): + """change the i/o transformations""" + sys.stderr.write('\n--- Available Filters:\n') + sys.stderr.write('\n'.join( + '--- {:<10} = {.__doc__}'.format(k, v) + for k, v in sorted(TRANSFORMATIONS.items()))) + sys.stderr.write('\n--- Enter new filter name(s) [{}]: '.format(' '.join(self.filters))) + with self.console: + new_filters = sys.stdin.readline().lower().split() + if new_filters: + for f in new_filters: + if f not in TRANSFORMATIONS: + sys.stderr.write('--- unknown filter: {!r}\n'.format(f)) + break + else: + self.filters = new_filters + self.update_transformations() + sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters))) + + def change_encoding(self): + """change encoding on the serial port""" + sys.stderr.write('\n--- Enter new encoding name [{}]: '.format(self.input_encoding)) + with self.console: + new_encoding = sys.stdin.readline().strip() + if new_encoding: + try: + codecs.lookup(new_encoding) + except LookupError: + sys.stderr.write('--- invalid encoding name: {}\n'.format(new_encoding)) + else: + self.set_rx_encoding(new_encoding) + self.set_tx_encoding(new_encoding) + sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding)) + sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding)) + + def change_baudrate(self): + """change the baudrate""" + sys.stderr.write('\n--- Baudrate: ') + sys.stderr.flush() + with self.console: + backup = self.serial.baudrate + try: + self.serial.baudrate = int(sys.stdin.readline().strip()) + except ValueError as e: + sys.stderr.write('--- ERROR setting baudrate: {} ---\n'.format(e)) + self.serial.baudrate = backup + else: + self.dump_port_settings() + + def change_port(self): + """Have a conversation with the user to change the serial port""" + with self.console: + try: + port = ask_for_port() + except KeyboardInterrupt: + port = None + if port and port != self.serial.port: + # reader thread needs to be shut down + self._stop_reader() + # save settings + settings = self.serial.getSettingsDict() + try: + new_serial = serial.serial_for_url(port, do_not_open=True) + # restore settings and open + new_serial.applySettingsDict(settings) + new_serial.rts = self.serial.rts + new_serial.dtr = self.serial.dtr + new_serial.open() + new_serial.break_condition = self.serial.break_condition + except Exception as e: + sys.stderr.write('--- ERROR opening new port: {} ---\n'.format(e)) + new_serial.close() + else: + self.serial.close() + self.serial = new_serial + sys.stderr.write('--- Port changed to: {} ---\n'.format(self.serial.port)) + # and restart the reader thread + self._start_reader() + + def suspend_port(self): + """\ + open port temporarily, allow reconnect, exit and port change to get + out of the loop + """ + # reader thread needs to be shut down + self._stop_reader() + self.serial.close() + sys.stderr.write('\n--- Port closed: {} ---\n'.format(self.serial.port)) + do_change_port = False + while not self.serial.is_open: + sys.stderr.write('--- Quit: {exit} | p: port change | any other key to reconnect ---\n'.format( + exit=key_description(self.exit_character))) + k = self.console.getkey() + if k == self.exit_character: + self.stop() # exit app + break + elif k in 'pP': + do_change_port = True + break + try: + self.serial.open() + except Exception as e: + sys.stderr.write('--- ERROR opening port: {} ---\n'.format(e)) + if do_change_port: + self.change_port() + else: + # and restart the reader thread + self._start_reader() + sys.stderr.write('--- Port opened: {} ---\n'.format(self.serial.port)) + def get_help_text(self): """return the help text""" # help text, starts with blank line! return """ --- pySerial ({version}) - miniterm - help --- ---- {exit:8} Exit program +--- {exit:8} Exit program (alias {menu} Q) --- {menu:8} Menu escape key, followed by: --- Menu keys: --- {menu:7} Send the menu character itself to remote @@ -714,123 +816,130 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr import argparse parser = argparse.ArgumentParser( - description="Miniterm - A simple terminal program for the serial port.") + description='Miniterm - A simple terminal program for the serial port.') parser.add_argument( - "port", + 'port', nargs='?', - help="serial port name ('-' to show port list)", + help='serial port name ("-" to show port list)', default=default_port) parser.add_argument( - "baudrate", + 'baudrate', nargs='?', type=int, - help="set baud rate, default: %(default)s", + help='set baud rate, default: %(default)s', default=default_baudrate) - group = parser.add_argument_group("port settings") + group = parser.add_argument_group('port settings') group.add_argument( - "--parity", + '--parity', choices=['N', 'E', 'O', 'S', 'M'], type=lambda c: c.upper(), - help="set parity, one of {N E O S M}, default: N", + help='set parity, one of {N E O S M}, default: N', default='N') group.add_argument( - "--rtscts", - action="store_true", - help="enable RTS/CTS flow control (default off)", + '--rtscts', + action='store_true', + help='enable RTS/CTS flow control (default off)', default=False) group.add_argument( - "--xonxoff", - action="store_true", - help="enable software flow control (default off)", + '--xonxoff', + action='store_true', + help='enable software flow control (default off)', default=False) group.add_argument( - "--rts", + '--rts', type=int, - help="set initial RTS line state (possible values: 0, 1)", + help='set initial RTS line state (possible values: 0, 1)', default=default_rts) group.add_argument( - "--dtr", + '--dtr', type=int, - help="set initial DTR line state (possible values: 0, 1)", + help='set initial DTR line state (possible values: 0, 1)', default=default_dtr) group.add_argument( - "--ask", - action="store_true", - help="ask again for port when open fails", - default=False) - - group = parser.add_argument_group("data handling") + '--non-exclusive', + dest='exclusive', + action='store_false', + help='disable locking for native ports', + default=True) group.add_argument( - "-e", "--echo", - action="store_true", - help="enable local echo (default off)", + '--ask', + action='store_true', + help='ask again for port when open fails', + default=False) + + group = parser.add_argument_group('data handling') + + group.add_argument( + '-e', '--echo', + action='store_true', + help='enable local echo (default off)', default=False) group.add_argument( - "--encoding", - dest="serial_port_encoding", - metavar="CODEC", - help="set the encoding for the serial port (e.g. hexlify, Latin1, UTF-8), default: %(default)s", + '--encoding', + dest='serial_port_encoding', + metavar='CODEC', + help='set the encoding for the serial port (e.g. hexlify, Latin1, UTF-8), default: %(default)s', default='UTF-8') group.add_argument( - "-f", "--filter", - action="append", - metavar="NAME", - help="add text transformation", + '-f', '--filter', + action='append', + metavar='NAME', + help='add text transformation', default=[]) group.add_argument( - "--eol", + '--eol', choices=['CR', 'LF', 'CRLF'], type=lambda c: c.upper(), - help="end of line mode", + help='end of line mode', default='CRLF') group.add_argument( - "--raw", - action="store_true", - help="Do no apply any encodings/transformations", + '--raw', + action='store_true', + help='Do no apply any encodings/transformations', default=False) - group = parser.add_argument_group("hotkeys") + group = parser.add_argument_group('hotkeys') group.add_argument( - "--exit-char", + '--exit-char', type=int, metavar='NUM', - help="Unicode of special character that is used to exit the application, default: %(default)s", + help='Unicode of special character that is used to exit the application, default: %(default)s', default=0x1d) # GS/CTRL+] group.add_argument( - "--menu-char", + '--menu-char', type=int, metavar='NUM', - help="Unicode code of special character that is used to control miniterm (menu), default: %(default)s", + help='Unicode code of special character that is used to control miniterm (menu), default: %(default)s', default=0x14) # Menu: CTRL+T - group = parser.add_argument_group("diagnostics") + group = parser.add_argument_group('diagnostics') group.add_argument( - "-q", "--quiet", - action="store_true", - help="suppress non-error messages", + '-q', '--quiet', + action='store_true', + help='suppress non-error messages', default=False) group.add_argument( - "--develop", - action="store_true", - help="show Python traceback on error", + '--develop', + action='store_true', + help='show Python traceback on error', default=False) args = parser.parse_args() @@ -883,9 +992,12 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr sys.stderr.write('--- forcing RTS {}\n'.format('active' if args.rts else 'inactive')) serial_instance.rts = args.rts + if isinstance(serial_instance, serial.Serial): + serial_instance.exclusive = args.exclusive + serial_instance.open() except serial.SerialException as e: - sys.stderr.write('could not open port {}: {}\n'.format(repr(args.port), e)) + sys.stderr.write('could not open port {!r}: {}\n'.format(args.port, e)) if args.develop: raise if not args.ask: @@ -921,7 +1033,7 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr except KeyboardInterrupt: pass if not args.quiet: - sys.stderr.write("\n--- exit ---\n") + sys.stderr.write('\n--- exit ---\n') miniterm.join() miniterm.close() diff --git a/pyren/serial/urlhandler/protocol_alt.py b/pyren/serial/urlhandler/protocol_alt.py index c14a87e..2e666ca 100755 --- a/pyren/serial/urlhandler/protocol_alt.py +++ b/pyren/serial/urlhandler/protocol_alt.py @@ -16,6 +16,8 @@ # use poll based implementation on Posix (Linux): # python -m serial.tools.miniterm alt:///dev/ttyUSB0?class=PosixPollSerial +from __future__ import absolute_import + try: import urlparse except ImportError: diff --git a/pyren/serial/urlhandler/protocol_hwgrep.py b/pyren/serial/urlhandler/protocol_hwgrep.py index 49bbebe..1a288c9 100755 --- a/pyren/serial/urlhandler/protocol_hwgrep.py +++ b/pyren/serial/urlhandler/protocol_hwgrep.py @@ -20,6 +20,8 @@ # n= pick the N'th entry instead of the first one (numbering starts at 1) # skip_busy tries to open port to check if it is busy, fails on posix as ports are not locked! +from __future__ import absolute_import + import serial import serial.tools.list_ports diff --git a/pyren/serial/urlhandler/protocol_loop.py b/pyren/serial/urlhandler/protocol_loop.py index 7bf6cf9..2aeebfc 100755 --- a/pyren/serial/urlhandler/protocol_loop.py +++ b/pyren/serial/urlhandler/protocol_loop.py @@ -6,13 +6,15 @@ # and it was so easy to implement ;-) # # This file is part of pySerial. https://github.com/pyserial/pyserial -# (C) 2001-2015 Chris Liechti +# (C) 2001-2020 Chris Liechti # # SPDX-License-Identifier: BSD-3-Clause # # URL format: loop://[option[/option...]] # options: # - "debug" print diagnostic messages +from __future__ import absolute_import + import logging import numbers import time @@ -25,7 +27,7 @@ try: except ImportError: import Queue as queue -from serial.serialutil import SerialBase, SerialException, to_bytes, iterbytes, writeTimeoutError, portNotOpenError +from serial.serialutil import SerialBase, SerialException, to_bytes, iterbytes, SerialTimeoutException, PortNotOpenError # map log level names to constants. used in from_url() LOGGER_LEVELS = { @@ -125,7 +127,7 @@ class Serial(SerialBase): def in_waiting(self): """Return the number of bytes currently in the input buffer.""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() if self.logger: # attention the logged value can differ from return value in # threaded environments... @@ -139,7 +141,7 @@ class Serial(SerialBase): until the requested number of bytes is read. """ if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() if self._timeout is not None and self._timeout != 0: timeout = time.time() + self._timeout else: @@ -179,7 +181,7 @@ class Serial(SerialBase): """ self._cancel_write = False if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() data = to_bytes(data) # calculate aprox time that would be used to send the data time_used_to_send = 10.0 * len(data) / self._baudrate @@ -193,7 +195,7 @@ class Serial(SerialBase): time_left -= 0.5 if self._cancel_write: return 0 # XXX - raise writeTimeoutError + raise SerialTimeoutException('Write timeout') for byte in iterbytes(data): self.queue.put(byte, timeout=self._write_timeout) return len(data) @@ -201,7 +203,7 @@ class Serial(SerialBase): def reset_input_buffer(self): """Clear input buffer, discarding all that is in the buffer.""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() if self.logger: self.logger.info('reset_input_buffer()') try: @@ -216,7 +218,7 @@ class Serial(SerialBase): discarding all that is in the buffer. """ if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() if self.logger: self.logger.info('reset_output_buffer()') try: @@ -225,6 +227,17 @@ class Serial(SerialBase): except queue.Empty: pass + @property + def out_waiting(self): + """Return how many bytes the in the outgoing buffer""" + if not self.is_open: + raise PortNotOpenError() + if self.logger: + # attention the logged value can differ from return value in + # threaded environments... + self.logger.debug('out_waiting -> {:d}'.format(self.queue.qsize())) + return self.queue.qsize() + def _update_break_state(self): """\ Set break: Controls TXD. When active, to transmitting is @@ -247,7 +260,7 @@ class Serial(SerialBase): def cts(self): """Read terminal status line: Clear To Send""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() if self.logger: self.logger.info('CTS -> state of RTS ({!r})'.format(self._rts_state)) return self._rts_state @@ -263,7 +276,7 @@ class Serial(SerialBase): def ri(self): """Read terminal status line: Ring Indicator""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() if self.logger: self.logger.info('returning dummy for RI') return False @@ -272,7 +285,7 @@ class Serial(SerialBase): def cd(self): """Read terminal status line: Carrier Detect""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() if self.logger: self.logger.info('returning dummy for CD') return True diff --git a/pyren/serial/urlhandler/protocol_rfc2217.py b/pyren/serial/urlhandler/protocol_rfc2217.py index 1898803..ebeec3a 100755 --- a/pyren/serial/urlhandler/protocol_rfc2217.py +++ b/pyren/serial/urlhandler/protocol_rfc2217.py @@ -1,10 +1,12 @@ #! python # -# This is a thin wrapper to load the rfc2271 implementation. +# This is a thin wrapper to load the rfc2217 implementation. # # This file is part of pySerial. https://github.com/pyserial/pyserial # (C) 2011 Chris Liechti # # SPDX-License-Identifier: BSD-3-Clause +from __future__ import absolute_import + from serial.rfc2217 import Serial # noqa diff --git a/pyren/serial/urlhandler/protocol_socket.py b/pyren/serial/urlhandler/protocol_socket.py index a017ee3..2888467 100755 --- a/pyren/serial/urlhandler/protocol_socket.py +++ b/pyren/serial/urlhandler/protocol_socket.py @@ -16,6 +16,8 @@ # options: # - "debug" print diagnostic messages +from __future__ import absolute_import + import errno import logging import select @@ -26,7 +28,8 @@ try: except ImportError: import urllib.parse as urlparse -from serial.serialutil import SerialBase, SerialException, portNotOpenError, to_bytes +from serial.serialutil import SerialBase, SerialException, to_bytes, \ + PortNotOpenError, SerialTimeoutException, Timeout # map log level names to constants. used in from_url() LOGGER_LEVELS = { @@ -61,6 +64,8 @@ class Serial(SerialBase): except Exception as msg: self._socket = None raise SerialException("Could not open port {}: {}".format(self.portstr, msg)) + # after connecting, switch to non-blocking, we're using select + self._socket.setblocking(False) # not that there is anything to configure... self._reconfigure_port() @@ -131,7 +136,7 @@ class Serial(SerialBase): def in_waiting(self): """Return the number of bytes currently in the input buffer.""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() # Poll the socket to see if it is ready for reading. # If ready, at least one byte will be to read. lr, lw, lx = select.select([self._socket], [], [], 0) @@ -147,13 +152,12 @@ class Serial(SerialBase): until the requested number of bytes is read. """ if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() read = bytearray() - timeout = self._timeout + timeout = Timeout(self._timeout) while len(read) < size: try: - start_time = time.time() - ready, _, _ = select.select([self._socket], [], [], timeout) + ready, _, _ = select.select([self._socket], [], [], timeout.time_left()) # If select was used with a timeout, and the timeout occurs, it # returns with empty lists -> thus abort read operation. # For timeout == 0 (non-blocking operation) also abort when @@ -166,27 +170,20 @@ class Serial(SerialBase): if not buf: raise SerialException('socket disconnected') read.extend(buf) - if timeout is not None: - timeout -= time.time() - start_time - if timeout <= 0: - break - except socket.timeout: - # timeout is used for write support, just go reading again - pass - except socket.error as e: - # connection fails -> terminate loop - raise SerialException('connection failed ({})'.format(e)) except OSError as e: # this is for Python 3.x where select.error is a subclass of - # OSError ignore EAGAIN errors. all other errors are shown - if e.errno != errno.EAGAIN: + # OSError ignore BlockingIOErrors and EINTR. other errors are shown + # https://www.python.org/dev/peps/pep-0475. + if e.errno not in (errno.EAGAIN, errno.EALREADY, errno.EWOULDBLOCK, errno.EINPROGRESS, errno.EINTR): raise SerialException('read failed: {}'.format(e)) - except select.error as e: + except (select.error, socket.error) as e: # this is for Python 2.x - # ignore EAGAIN errors. all other errors are shown + # ignore BlockingIOErrors and EINTR. all errors are shown # see also http://www.python.org/dev/peps/pep-3151/#select - if e[0] != errno.EAGAIN: + if e[0] not in (errno.EAGAIN, errno.EALREADY, errno.EWOULDBLOCK, errno.EINPROGRESS, errno.EINTR): raise SerialException('read failed: {}'.format(e)) + if timeout.expired(): + break return bytes(read) def write(self, data): @@ -196,20 +193,76 @@ class Serial(SerialBase): closed. """ if not self.is_open: - raise portNotOpenError - try: - self._socket.sendall(to_bytes(data)) - except socket.error as e: - # XXX what exception if socket connection fails - raise SerialException("socket connection failed: {}".format(e)) - return len(data) + raise PortNotOpenError() + + d = to_bytes(data) + tx_len = length = len(d) + timeout = Timeout(self._write_timeout) + while tx_len > 0: + try: + n = self._socket.send(d) + if timeout.is_non_blocking: + # Zero timeout indicates non-blocking - simply return the + # number of bytes of data actually written + return n + elif not timeout.is_infinite: + # when timeout is set, use select to wait for being ready + # with the time left as timeout + if timeout.expired(): + raise SerialTimeoutException('Write timeout') + _, ready, _ = select.select([], [self._socket], [], timeout.time_left()) + if not ready: + raise SerialTimeoutException('Write timeout') + else: + assert timeout.time_left() is None + # wait for write operation + _, ready, _ = select.select([], [self._socket], [], None) + if not ready: + raise SerialException('write failed (select)') + d = d[n:] + tx_len -= n + except SerialException: + raise + except OSError as e: + # this is for Python 3.x where select.error is a subclass of + # OSError ignore BlockingIOErrors and EINTR. other errors are shown + # https://www.python.org/dev/peps/pep-0475. + if e.errno not in (errno.EAGAIN, errno.EALREADY, errno.EWOULDBLOCK, errno.EINPROGRESS, errno.EINTR): + raise SerialException('write failed: {}'.format(e)) + except select.error as e: + # this is for Python 2.x + # ignore BlockingIOErrors and EINTR. all errors are shown + # see also http://www.python.org/dev/peps/pep-3151/#select + if e[0] not in (errno.EAGAIN, errno.EALREADY, errno.EWOULDBLOCK, errno.EINPROGRESS, errno.EINTR): + raise SerialException('write failed: {}'.format(e)) + if not timeout.is_non_blocking and timeout.expired(): + raise SerialTimeoutException('Write timeout') + return length - len(d) def reset_input_buffer(self): """Clear input buffer, discarding all that is in the buffer.""" if not self.is_open: - raise portNotOpenError - if self.logger: - self.logger.info('ignored reset_input_buffer') + raise PortNotOpenError() + + # just use recv to remove input, while there is some + ready = True + while ready: + ready, _, _ = select.select([self._socket], [], [], 0) + try: + if ready: + ready = self._socket.recv(4096) + except OSError as e: + # this is for Python 3.x where select.error is a subclass of + # OSError ignore BlockingIOErrors and EINTR. other errors are shown + # https://www.python.org/dev/peps/pep-0475. + if e.errno not in (errno.EAGAIN, errno.EALREADY, errno.EWOULDBLOCK, errno.EINPROGRESS, errno.EINTR): + raise SerialException('read failed: {}'.format(e)) + except (select.error, socket.error) as e: + # this is for Python 2.x + # ignore BlockingIOErrors and EINTR. all errors are shown + # see also http://www.python.org/dev/peps/pep-3151/#select + if e[0] not in (errno.EAGAIN, errno.EALREADY, errno.EWOULDBLOCK, errno.EINPROGRESS, errno.EINTR): + raise SerialException('read failed: {}'.format(e)) def reset_output_buffer(self): """\ @@ -217,7 +270,7 @@ class Serial(SerialBase): discarding all that is in the buffer. """ if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() if self.logger: self.logger.info('ignored reset_output_buffer') @@ -227,7 +280,7 @@ class Serial(SerialBase): duration. """ if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() if self.logger: self.logger.info('ignored send_break({!r})'.format(duration)) @@ -251,7 +304,7 @@ class Serial(SerialBase): def cts(self): """Read terminal status line: Clear To Send""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() if self.logger: self.logger.info('returning dummy for cts') return True @@ -260,7 +313,7 @@ class Serial(SerialBase): def dsr(self): """Read terminal status line: Data Set Ready""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() if self.logger: self.logger.info('returning dummy for dsr') return True @@ -269,7 +322,7 @@ class Serial(SerialBase): def ri(self): """Read terminal status line: Ring Indicator""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() if self.logger: self.logger.info('returning dummy for ri') return False @@ -278,7 +331,7 @@ class Serial(SerialBase): def cd(self): """Read terminal status line: Carrier Detect""" if not self.is_open: - raise portNotOpenError + raise PortNotOpenError() if self.logger: self.logger.info('returning dummy for cd)') return True diff --git a/pyren/serial/urlhandler/protocol_spy.py b/pyren/serial/urlhandler/protocol_spy.py index 3479010..67c700b 100755 --- a/pyren/serial/urlhandler/protocol_spy.py +++ b/pyren/serial/urlhandler/protocol_spy.py @@ -20,10 +20,13 @@ # redirect output to an other terminal window on Posix (Linux): # python -m serial.tools.miniterm spy:///dev/ttyUSB0?dev=/dev/pts/14\&color +from __future__ import absolute_import + import sys import time import serial +from serial.serialutil import to_bytes try: import urlparse @@ -198,6 +201,7 @@ class Serial(serial.Serial): return ''.join([parts.netloc, parts.path]) def write(self, tx): + tx = to_bytes(tx) self.formatter.tx(tx) return super(Serial, self).write(tx) diff --git a/pyren/serial/win32.py b/pyren/serial/win32.py index 905ce0f..157f470 100755 --- a/pyren/serial/win32.py +++ b/pyren/serial/win32.py @@ -9,6 +9,8 @@ # pylint: disable=invalid-name,too-few-public-methods,protected-access,too-many-instance-attributes +from __future__ import absolute_import + from ctypes import c_ulong, c_void_p, c_int64, c_char, \ WinDLL, sizeof, Structure, Union, POINTER from ctypes.wintypes import HANDLE @@ -179,6 +181,10 @@ WaitForSingleObject = _stdcall_libraries['kernel32'].WaitForSingleObject WaitForSingleObject.restype = DWORD WaitForSingleObject.argtypes = [HANDLE, DWORD] +WaitCommEvent = _stdcall_libraries['kernel32'].WaitCommEvent +WaitCommEvent.restype = BOOL +WaitCommEvent.argtypes = [HANDLE, LPDWORD, LPOVERLAPPED] + CancelIoEx = _stdcall_libraries['kernel32'].CancelIoEx CancelIoEx.restype = BOOL CancelIoEx.argtypes = [HANDLE, LPOVERLAPPED] @@ -245,6 +251,12 @@ EV_BREAK = 64 # Variable c_int PURGE_RXCLEAR = 8 # Variable c_int INFINITE = 0xFFFFFFFF +CE_RXOVER = 0x0001 +CE_OVERRUN = 0x0002 +CE_RXPARITY = 0x0004 +CE_FRAME = 0x0008 +CE_BREAK = 0x0010 + class N11_OVERLAPPED4DOLLAR_48E(Union): pass