PySerial 3.5 update

This commit is contained in:
Marianpol 2021-01-08 09:36:28 +01:00
parent 33050e2203
commit 938f332261
24 changed files with 1062 additions and 446 deletions

View File

@ -3,17 +3,19 @@
# This is a wrapper module for different platform implementations
#
# This file is part of pySerial. https://github.com/pyserial/pyserial
# (C) 2001-2016 Chris Liechti <cliechti@gmx.net>
# (C) 2001-2020 Chris Liechti <cliechti@gmx.net>
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import absolute_import
import sys
import importlib
from serial.serialutil import *
#~ SerialBase, SerialException, to_bytes, iterbytes
__version__ = '3.2.1'
__version__ = '3.5'
VERSION = __version__

View File

@ -58,6 +58,8 @@
# RFC).
# the order of the options is not relevant
from __future__ import absolute_import
import logging
import socket
import struct
@ -74,7 +76,7 @@ except ImportError:
import serial
from serial.serialutil import SerialBase, SerialException, to_bytes, \
iterbytes, portNotOpenError, Timeout
iterbytes, PortNotOpenError, Timeout
# port string is expected to be something like this:
# rfc2217://host:port
@ -380,7 +382,6 @@ class Serial(SerialBase):
9600, 19200, 38400, 57600, 115200)
def __init__(self, *args, **kwargs):
super(Serial, self).__init__(*args, **kwargs)
self._thread = None
self._socket = None
self._linestate = 0
@ -396,6 +397,7 @@ class Serial(SerialBase):
self._rfc2217_port_settings = None
self._rfc2217_options = None
self._read_buffer = None
super(Serial, self).__init__(*args, **kwargs) # must be last call in case of auto-open
def open(self):
"""\
@ -481,7 +483,7 @@ class Serial(SerialBase):
if self.logger:
self.logger.info("Negotiated options: {}".format(self._telnet_options))
# fine, go on, set RFC 2271 specific things
# fine, go on, set RFC 2217 specific things
self._reconfigure_port()
# all things set up get, now a clean start
if not self._dsrdtr:
@ -596,7 +598,7 @@ class Serial(SerialBase):
def in_waiting(self):
"""Return the number of bytes currently in the input buffer."""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
return self._read_buffer.qsize()
def read(self, size=1):
@ -606,13 +608,19 @@ class Serial(SerialBase):
until the requested number of bytes is read.
"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
data = bytearray()
try:
timeout = Timeout(self._timeout)
while len(data) < size:
if self._thread is None:
if self._thread is None or not self._thread.is_alive():
raise SerialException('connection failed (reader thread died)')
data += self._read_buffer.get(True, self._timeout)
buf = self._read_buffer.get(True, timeout.time_left())
if buf is None:
return bytes(data)
data += buf
if timeout.expired():
break
except Queue.Empty: # -> timeout
pass
return bytes(data)
@ -624,7 +632,7 @@ class Serial(SerialBase):
closed.
"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
with self._write_lock:
try:
self._socket.sendall(to_bytes(data).replace(IAC, IAC_DOUBLED))
@ -635,7 +643,7 @@ class Serial(SerialBase):
def reset_input_buffer(self):
"""Clear input buffer, discarding all that is in the buffer."""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
self.rfc2217_send_purge(PURGE_RECEIVE_BUFFER)
# empty read buffer
while self._read_buffer.qsize():
@ -647,7 +655,7 @@ class Serial(SerialBase):
discarding all that is in the buffer.
"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
self.rfc2217_send_purge(PURGE_TRANSMIT_BUFFER)
def _update_break_state(self):
@ -656,7 +664,7 @@ class Serial(SerialBase):
possible.
"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
if self.logger:
self.logger.info('set BREAK to {}'.format('active' if self._break_state else 'inactive'))
if self._break_state:
@ -667,7 +675,7 @@ class Serial(SerialBase):
def _update_rts_state(self):
"""Set terminal status line: Request To Send."""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
if self.logger:
self.logger.info('set RTS to {}'.format('active' if self._rts_state else 'inactive'))
if self._rts_state:
@ -678,7 +686,7 @@ class Serial(SerialBase):
def _update_dtr_state(self):
"""Set terminal status line: Data Terminal Ready."""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
if self.logger:
self.logger.info('set DTR to {}'.format('active' if self._dtr_state else 'inactive'))
if self._dtr_state:
@ -690,28 +698,28 @@ class Serial(SerialBase):
def cts(self):
"""Read terminal status line: Clear To Send."""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
return bool(self.get_modem_state() & MODEMSTATE_MASK_CTS)
@property
def dsr(self):
"""Read terminal status line: Data Set Ready."""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
return bool(self.get_modem_state() & MODEMSTATE_MASK_DSR)
@property
def ri(self):
"""Read terminal status line: Ring Indicator."""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
return bool(self.get_modem_state() & MODEMSTATE_MASK_RI)
@property
def cd(self):
"""Read terminal status line: Carrier Detect."""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
return bool(self.get_modem_state() & MODEMSTATE_MASK_CD)
# - - - platform specific - - -
@ -735,8 +743,10 @@ class Serial(SerialBase):
# connection fails -> terminate loop
if self.logger:
self.logger.debug("socket error in reader thread: {}".format(e))
self._read_buffer.put(None)
break
if not data:
self._read_buffer.put(None)
break # lost connection
for byte in iterbytes(data):
if mode == M_NORMAL:
@ -780,7 +790,6 @@ class Serial(SerialBase):
self._telnet_negotiate_option(telnet_command, byte)
mode = M_NORMAL
finally:
self._thread = None
if self.logger:
self.logger.debug("read thread terminated")
@ -850,12 +859,12 @@ class Serial(SerialBase):
def telnet_send_option(self, action, option):
"""Send DO, DONT, WILL, WONT."""
self._internal_raw_write(to_bytes([IAC, action, option]))
self._internal_raw_write(IAC + action + option)
def rfc2217_send_subnegotiation(self, option, value=b''):
"""Subnegotiation of RFC2217 parameters."""
value = value.replace(IAC, IAC_DOUBLED)
self._internal_raw_write(to_bytes([IAC, SB, COM_PORT_OPTION, option] + list(value) + [IAC, SE]))
self._internal_raw_write(IAC + SB + COM_PORT_OPTION + option + value + IAC + SE)
def rfc2217_send_purge(self, value):
"""\
@ -890,7 +899,7 @@ class Serial(SerialBase):
"""\
get last modem state (cached value. If value is "old", request a new
one. This cache helps that we don't issue to many requests when e.g. all
status lines, one after the other is queried by the user (getCTS, getDSR
status lines, one after the other is queried by the user (CTS, DSR
etc.)
"""
# active modem state polling enabled? is the value fresh enough?
@ -989,12 +998,12 @@ class PortManager(object):
def telnet_send_option(self, action, option):
"""Send DO, DONT, WILL, WONT."""
self.connection.write(to_bytes([IAC, action, option]))
self.connection.write(IAC + action + option)
def rfc2217_send_subnegotiation(self, option, value=b''):
"""Subnegotiation of RFC 2217 parameters."""
value = value.replace(IAC, IAC_DOUBLED)
self.connection.write(to_bytes([IAC, SB, COM_PORT_OPTION, option] + list(value) + [IAC, SE]))
self.connection.write(IAC + SB + COM_PORT_OPTION + option + value + IAC + SE)
# - check modem lines, needs to be called periodically from user to
# establish polling
@ -1005,10 +1014,10 @@ class PortManager(object):
send updates on changes.
"""
modemstate = (
(self.serial.getCTS() and MODEMSTATE_MASK_CTS) |
(self.serial.getDSR() and MODEMSTATE_MASK_DSR) |
(self.serial.getRI() and MODEMSTATE_MASK_RI) |
(self.serial.getCD() and MODEMSTATE_MASK_CD))
(self.serial.cts and MODEMSTATE_MASK_CTS) |
(self.serial.dsr and MODEMSTATE_MASK_DSR) |
(self.serial.ri and MODEMSTATE_MASK_RI) |
(self.serial.cd and MODEMSTATE_MASK_CD))
# check what has changed
deltas = modemstate ^ (self.last_modemstate or 0) # when last is None -> 0
if deltas & MODEMSTATE_MASK_CTS:
@ -1230,12 +1239,12 @@ class PortManager(object):
self.logger.warning("requested break state - not implemented")
pass # XXX needs cached value
elif suboption[2:3] == SET_CONTROL_BREAK_ON:
self.serial.setBreak(True)
self.serial.break_condition = True
if self.logger:
self.logger.info("changed BREAK to active")
self.rfc2217_send_subnegotiation(SERVER_SET_CONTROL, SET_CONTROL_BREAK_ON)
elif suboption[2:3] == SET_CONTROL_BREAK_OFF:
self.serial.setBreak(False)
self.serial.break_condition = False
if self.logger:
self.logger.info("changed BREAK to inactive")
self.rfc2217_send_subnegotiation(SERVER_SET_CONTROL, SET_CONTROL_BREAK_OFF)
@ -1244,12 +1253,12 @@ class PortManager(object):
self.logger.warning("requested DTR state - not implemented")
pass # XXX needs cached value
elif suboption[2:3] == SET_CONTROL_DTR_ON:
self.serial.setDTR(True)
self.serial.dtr = True
if self.logger:
self.logger.info("changed DTR to active")
self.rfc2217_send_subnegotiation(SERVER_SET_CONTROL, SET_CONTROL_DTR_ON)
elif suboption[2:3] == SET_CONTROL_DTR_OFF:
self.serial.setDTR(False)
self.serial.dtr = False
if self.logger:
self.logger.info("changed DTR to inactive")
self.rfc2217_send_subnegotiation(SERVER_SET_CONTROL, SET_CONTROL_DTR_OFF)
@ -1259,12 +1268,12 @@ class PortManager(object):
pass # XXX needs cached value
#~ self.rfc2217_send_subnegotiation(SERVER_SET_CONTROL, SET_CONTROL_RTS_ON)
elif suboption[2:3] == SET_CONTROL_RTS_ON:
self.serial.setRTS(True)
self.serial.rts = True
if self.logger:
self.logger.info("changed RTS to active")
self.rfc2217_send_subnegotiation(SERVER_SET_CONTROL, SET_CONTROL_RTS_ON)
elif suboption[2:3] == SET_CONTROL_RTS_OFF:
self.serial.setRTS(False)
self.serial.rts = False
if self.logger:
self.logger.info("changed RTS to inactive")
self.rfc2217_send_subnegotiation(SERVER_SET_CONTROL, SET_CONTROL_RTS_OFF)

View File

@ -13,6 +13,8 @@ serial ports (where supported).
NOTE: Some implementations may only support a subset of the settings.
"""
from __future__ import absolute_import
import time
import serial

View File

@ -7,6 +7,8 @@
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import absolute_import
import System
import System.IO.Ports
from serial.serialutil import *
@ -146,7 +148,7 @@ class Serial(SerialBase):
def in_waiting(self):
"""Return the number of characters currently in the input buffer."""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
return self._port_handle.BytesToRead
def read(self, size=1):
@ -156,7 +158,7 @@ class Serial(SerialBase):
until the requested number of bytes is read.
"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
# must use single byte reads as this is the only way to read
# without applying encodings
data = bytearray()
@ -172,7 +174,7 @@ class Serial(SerialBase):
def write(self, data):
"""Output the given string over the serial port."""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
#~ if not isinstance(data, (bytes, bytearray)):
#~ raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data)))
try:
@ -180,13 +182,13 @@ class Serial(SerialBase):
# as this is the only one not applying encodings
self._port_handle.Write(as_byte_array(data), 0, len(data))
except System.TimeoutException:
raise writeTimeoutError
raise SerialTimeoutException('Write timeout')
return len(data)
def reset_input_buffer(self):
"""Clear input buffer, discarding all that is in the buffer."""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
self._port_handle.DiscardInBuffer()
def reset_output_buffer(self):
@ -195,7 +197,7 @@ class Serial(SerialBase):
discarding all that is in the buffer.
"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
self._port_handle.DiscardOutBuffer()
def _update_break_state(self):
@ -203,40 +205,40 @@ class Serial(SerialBase):
Set break: Controls TXD. When active, to transmitting is possible.
"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
self._port_handle.BreakState = bool(self._break_state)
def _update_rts_state(self):
"""Set terminal status line: Request To Send"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
self._port_handle.RtsEnable = bool(self._rts_state)
def _update_dtr_state(self):
"""Set terminal status line: Data Terminal Ready"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
self._port_handle.DtrEnable = bool(self._dtr_state)
@property
def cts(self):
"""Read terminal status line: Clear To Send"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
return self._port_handle.CtsHolding
@property
def dsr(self):
"""Read terminal status line: Data Set Ready"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
return self._port_handle.DsrHolding
@property
def ri(self):
"""Read terminal status line: Ring Indicator"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
#~ return self._port_handle.XXX
return False # XXX an error would be better
@ -244,7 +246,7 @@ class Serial(SerialBase):
def cd(self):
"""Read terminal status line: Carrier Detect"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
return self._port_handle.CDHolding
# - - platform specific - - - -

View File

@ -7,6 +7,8 @@
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import absolute_import
from serial.serialutil import *
@ -150,7 +152,7 @@ class Serial(SerialBase):
def in_waiting(self):
"""Return the number of characters currently in the input buffer."""
if not self.sPort:
raise portNotOpenError
raise PortNotOpenError()
return self._instream.available()
def read(self, size=1):
@ -160,7 +162,7 @@ class Serial(SerialBase):
until the requested number of bytes is read.
"""
if not self.sPort:
raise portNotOpenError
raise PortNotOpenError()
read = bytearray()
if size > 0:
while len(read) < size:
@ -175,7 +177,7 @@ class Serial(SerialBase):
def write(self, data):
"""Output the given string over the serial port."""
if not self.sPort:
raise portNotOpenError
raise PortNotOpenError()
if not isinstance(data, (bytes, bytearray)):
raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data)))
self._outstream.write(data)
@ -184,7 +186,7 @@ class Serial(SerialBase):
def reset_input_buffer(self):
"""Clear input buffer, discarding all that is in the buffer."""
if not self.sPort:
raise portNotOpenError
raise PortNotOpenError()
self._instream.skip(self._instream.available())
def reset_output_buffer(self):
@ -193,57 +195,57 @@ class Serial(SerialBase):
discarding all that is in the buffer.
"""
if not self.sPort:
raise portNotOpenError
raise PortNotOpenError()
self._outstream.flush()
def send_break(self, duration=0.25):
"""Send break condition. Timed, returns to idle state after given duration."""
if not self.sPort:
raise portNotOpenError
raise PortNotOpenError()
self.sPort.sendBreak(duration*1000.0)
def _update_break_state(self):
"""Set break: Controls TXD. When active, to transmitting is possible."""
if self.fd is None:
raise portNotOpenError
raise PortNotOpenError()
raise SerialException("The _update_break_state function is not implemented in java.")
def _update_rts_state(self):
"""Set terminal status line: Request To Send"""
if not self.sPort:
raise portNotOpenError
raise PortNotOpenError()
self.sPort.setRTS(self._rts_state)
def _update_dtr_state(self):
"""Set terminal status line: Data Terminal Ready"""
if not self.sPort:
raise portNotOpenError
raise PortNotOpenError()
self.sPort.setDTR(self._dtr_state)
@property
def cts(self):
"""Read terminal status line: Clear To Send"""
if not self.sPort:
raise portNotOpenError
raise PortNotOpenError()
self.sPort.isCTS()
@property
def dsr(self):
"""Read terminal status line: Data Set Ready"""
if not self.sPort:
raise portNotOpenError
raise PortNotOpenError()
self.sPort.isDSR()
@property
def ri(self):
"""Read terminal status line: Ring Indicator"""
if not self.sPort:
raise portNotOpenError
raise PortNotOpenError()
self.sPort.isRI()
@property
def cd(self):
"""Read terminal status line: Carrier Detect"""
if not self.sPort:
raise portNotOpenError
raise PortNotOpenError()
self.sPort.isCD()

View File

@ -3,7 +3,7 @@
# backend for serial IO for POSIX compatible systems, like Linux, OSX
#
# This file is part of pySerial. https://github.com/pyserial/pyserial
# (C) 2001-2016 Chris Liechti <cliechti@gmx.net>
# (C) 2001-2020 Chris Liechti <cliechti@gmx.net>
#
# SPDX-License-Identifier: BSD-3-Clause
#
@ -26,6 +26,8 @@
# - aix (AIX) /dev/tty%d
from __future__ import absolute_import
# pylint: disable=abstract-method
import errno
import fcntl
@ -37,7 +39,7 @@ import termios
import serial
from serial.serialutil import SerialBase, SerialException, to_bytes, \
portNotOpenError, writeTimeoutError, Timeout
PortNotOpenError, SerialTimeoutException, Timeout
class PlatformSpecificBase(object):
@ -49,6 +51,18 @@ class PlatformSpecificBase(object):
def _set_rs485_mode(self, rs485_settings):
raise NotImplementedError('RS485 not supported on this platform')
def set_low_latency_mode(self, low_latency_settings):
raise NotImplementedError('Low latency not supported on this platform')
def _update_break_state(self):
"""\
Set break: Controls TXD. When active, no transmitting is possible.
"""
if self._break_state:
fcntl.ioctl(self.fd, TIOCSBRK)
else:
fcntl.ioctl(self.fd, TIOCCBRK)
# some systems support an extra flag to enable the two in POSIX unsupported
# paritiy settings for MARK and SPACE
@ -113,6 +127,24 @@ if plat[:5] == 'linux': # Linux (confirmed) # noqa
4000000: 0o010017
}
def set_low_latency_mode(self, low_latency_settings):
buf = array.array('i', [0] * 32)
try:
# get serial_struct
fcntl.ioctl(self.fd, termios.TIOCGSERIAL, buf)
# set or unset ASYNC_LOW_LATENCY flag
if low_latency_settings:
buf[4] |= 0x2000
else:
buf[4] &= ~0x2000
# set serial_struct
fcntl.ioctl(self.fd, termios.TIOCSSERIAL, buf)
except IOError as e:
raise ValueError('Failed to update ASYNC_LOW_LATENCY flag to {}: {}'.format(low_latency_settings, e))
def _set_special_baudrate(self, baudrate):
# right size is 44 on x86_64, allow for some growth
buf = array.array('i', [0] * 64)
@ -182,6 +214,9 @@ elif plat[:6] == 'darwin': # OS X
class PlatformSpecific(PlatformSpecificBase):
osx_version = os.uname()[2].split('.')
TIOCSBRK = 0x2000747B # _IO('t', 123)
TIOCCBRK = 0x2000747A # _IO('t', 122)
# Tiger or above can support arbitrary serial speeds
if int(osx_version[0]) >= 8:
def _set_special_baudrate(self, baudrate):
@ -189,6 +224,15 @@ elif plat[:6] == 'darwin': # OS X
buf = array.array('i', [baudrate])
fcntl.ioctl(self.fd, IOSSIOSPEED, buf, 1)
def _update_break_state(self):
"""\
Set break: Controls TXD. When active, no transmitting is possible.
"""
if self._break_state:
fcntl.ioctl(self.fd, PlatformSpecific.TIOCSBRK)
else:
fcntl.ioctl(self.fd, PlatformSpecific.TIOCCBRK)
elif plat[:3] == 'bsd' or \
plat[:7] == 'freebsd' or \
plat[:6] == 'netbsd' or \
@ -204,6 +248,19 @@ elif plat[:3] == 'bsd' or \
# a literal value.
BAUDRATE_CONSTANTS = ReturnBaudrate()
TIOCSBRK = 0x2000747B # _IO('t', 123)
TIOCCBRK = 0x2000747A # _IO('t', 122)
def _update_break_state(self):
"""\
Set break: Controls TXD. When active, no transmitting is possible.
"""
if self._break_state:
fcntl.ioctl(self.fd, PlatformSpecific.TIOCSBRK)
else:
fcntl.ioctl(self.fd, PlatformSpecific.TIOCCBRK)
else:
class PlatformSpecific(PlatformSpecificBase):
pass
@ -265,43 +322,72 @@ class Serial(SerialBase, PlatformSpecific):
self.fd = os.open(self.portstr, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
except OSError as msg:
self.fd = None
raise SerialException(msg.errno, "could not open port {0}: {1}".format(self._port, msg))
raise SerialException(msg.errno, "could not open port {}: {}".format(self._port, msg))
#~ fcntl.fcntl(self.fd, fcntl.F_SETFL, 0) # set blocking
self.pipe_abort_read_r, self.pipe_abort_read_w = None, None
self.pipe_abort_write_r, self.pipe_abort_write_w = None, None
try:
self._reconfigure_port(force_update=True)
except:
try:
os.close(self.fd)
except:
# ignore any exception when closing the port
# also to keep original exception that happened when setting up
pass
self.fd = None
raise
else:
self.is_open = True
try:
if not self._dsrdtr:
self._update_dtr_state()
if not self._rtscts:
self._update_rts_state()
except IOError as e:
if e.errno in (errno.EINVAL, errno.ENOTTY):
# ignore Invalid argument and Inappropriate ioctl
pass
else:
if e.errno not in (errno.EINVAL, errno.ENOTTY):
raise
self.reset_input_buffer()
self._reset_input_buffer()
self.pipe_abort_read_r, self.pipe_abort_read_w = os.pipe()
self.pipe_abort_write_r, self.pipe_abort_write_w = os.pipe()
fcntl.fcntl(self.pipe_abort_read_r, fcntl.F_SETFL, os.O_NONBLOCK)
fcntl.fcntl(self.pipe_abort_write_r, fcntl.F_SETFL, os.O_NONBLOCK)
except BaseException:
try:
os.close(self.fd)
except Exception:
# ignore any exception when closing the port
# also to keep original exception that happened when setting up
pass
self.fd = None
if self.pipe_abort_read_w is not None:
os.close(self.pipe_abort_read_w)
self.pipe_abort_read_w = None
if self.pipe_abort_read_r is not None:
os.close(self.pipe_abort_read_r)
self.pipe_abort_read_r = None
if self.pipe_abort_write_w is not None:
os.close(self.pipe_abort_write_w)
self.pipe_abort_write_w = None
if self.pipe_abort_write_r is not None:
os.close(self.pipe_abort_write_r)
self.pipe_abort_write_r = None
raise
self.is_open = True
def _reconfigure_port(self, force_update=False):
"""Set communication parameters on opened port."""
if self.fd is None:
raise SerialException("Can only operate on a valid file descriptor")
# if exclusive lock is requested, create it before we modify anything else
if self._exclusive is not None:
if self._exclusive:
try:
fcntl.flock(self.fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError as msg:
raise SerialException(msg.errno, "Could not exclusively lock port {}: {}".format(self._port, msg))
else:
fcntl.flock(self.fd, fcntl.LOCK_UN)
custom_baud = None
vmin = vtime = 0 # timeout is done via select
@ -331,14 +417,21 @@ class Serial(SerialBase, PlatformSpecific):
# setup baud rate
try:
ispeed = ospeed = getattr(termios, 'B{0}'.format(self._baudrate))
ispeed = ospeed = getattr(termios, 'B{}'.format(self._baudrate))
except AttributeError:
try:
ispeed = ospeed = self.BAUDRATE_CONSTANTS[self._baudrate]
except KeyError:
#~ raise ValueError('Invalid baud rate: %r' % self._baudrate)
# See if BOTHER is defined for this platform; if it is, use
# this for a speed not defined in the baudrate constants list.
try:
ispeed = ospeed = BOTHER
except NameError:
# may need custom baud rate, it isn't in our list.
ispeed = ospeed = getattr(termios, 'B38400')
try:
custom_baud = int(self._baudrate) # store for later
except ValueError:
@ -464,7 +557,7 @@ class Serial(SerialBase, PlatformSpecific):
until the requested number of bytes is read.
"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
read = bytearray()
timeout = Timeout(self._timeout)
while len(read) < size:
@ -480,6 +573,19 @@ class Serial(SerialBase, PlatformSpecific):
if not ready:
break # timeout
buf = os.read(self.fd, size - len(read))
except OSError as e:
# this is for Python 3.x where select.error is a subclass of
# OSError ignore BlockingIOErrors and EINTR. other errors are shown
# https://www.python.org/dev/peps/pep-0475.
if e.errno not in (errno.EAGAIN, errno.EALREADY, errno.EWOULDBLOCK, errno.EINPROGRESS, errno.EINTR):
raise SerialException('read failed: {}'.format(e))
except select.error as e:
# this is for Python 2.x
# ignore BlockingIOErrors and EINTR. all errors are shown
# see also http://www.python.org/dev/peps/pep-3151/#select
if e[0] not in (errno.EAGAIN, errno.EALREADY, errno.EWOULDBLOCK, errno.EINPROGRESS, errno.EINTR):
raise SerialException('read failed: {}'.format(e))
else:
# read should always return some data as select reported it was
# ready to read when we get to this point.
if not buf:
@ -490,33 +596,25 @@ class Serial(SerialBase, PlatformSpecific):
'device reports readiness to read but returned no data '
'(device disconnected or multiple access on port?)')
read.extend(buf)
except OSError as e:
# this is for Python 3.x where select.error is a subclass of
# OSError ignore EAGAIN errors. all other errors are shown
if e.errno != errno.EAGAIN and e.errno != errno.EINTR:
raise SerialException('read failed: {}'.format(e))
except select.error as e:
# this is for Python 2.x
# ignore EAGAIN errors. all other errors are shown
# see also http://www.python.org/dev/peps/pep-3151/#select
if e[0] != errno.EAGAIN:
raise SerialException('read failed: {}'.format(e))
if timeout.expired():
break
return bytes(read)
def cancel_read(self):
if self.is_open:
os.write(self.pipe_abort_read_w, b"x")
def cancel_write(self):
if self.is_open:
os.write(self.pipe_abort_write_w, b"x")
def write(self, data):
"""Output the given byte string over the serial port."""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
d = to_bytes(data)
tx_len = len(d)
tx_len = length = len(d)
timeout = Timeout(self._write_timeout)
while tx_len > 0:
try:
@ -529,13 +627,13 @@ class Serial(SerialBase, PlatformSpecific):
# when timeout is set, use select to wait for being ready
# with the time left as timeout
if timeout.expired():
raise writeTimeoutError
raise SerialTimeoutException('Write timeout')
abort, ready, _ = select.select([self.pipe_abort_write_r], [self.fd], [], timeout.time_left())
if abort:
os.read(self.pipe_abort_write_r, 1000)
break
if not ready:
raise writeTimeoutError
raise SerialTimeoutException('Write timeout')
else:
assert timeout.time_left() is None
# wait for write operation
@ -549,13 +647,21 @@ class Serial(SerialBase, PlatformSpecific):
tx_len -= n
except SerialException:
raise
except OSError as v:
if v.errno != errno.EAGAIN:
raise SerialException('write failed: {}'.format(v))
# still calculate and check timeout
if timeout.expired():
raise writeTimeoutError
return len(data)
except OSError as e:
# this is for Python 3.x where select.error is a subclass of
# OSError ignore BlockingIOErrors and EINTR. other errors are shown
# https://www.python.org/dev/peps/pep-0475.
if e.errno not in (errno.EAGAIN, errno.EALREADY, errno.EWOULDBLOCK, errno.EINPROGRESS, errno.EINTR):
raise SerialException('write failed: {}'.format(e))
except select.error as e:
# this is for Python 2.x
# ignore BlockingIOErrors and EINTR. all errors are shown
# see also http://www.python.org/dev/peps/pep-3151/#select
if e[0] not in (errno.EAGAIN, errno.EALREADY, errno.EWOULDBLOCK, errno.EINPROGRESS, errno.EINTR):
raise SerialException('write failed: {}'.format(e))
if not timeout.is_non_blocking and timeout.expired():
raise SerialTimeoutException('Write timeout')
return length - len(d)
def flush(self):
"""\
@ -563,14 +669,18 @@ class Serial(SerialBase, PlatformSpecific):
is written.
"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
termios.tcdrain(self.fd)
def _reset_input_buffer(self):
"""Clear input buffer, discarding all that is in the buffer."""
termios.tcflush(self.fd, termios.TCIFLUSH)
def reset_input_buffer(self):
"""Clear input buffer, discarding all that is in the buffer."""
if not self.is_open:
raise portNotOpenError
termios.tcflush(self.fd, termios.TCIFLUSH)
raise PortNotOpenError()
self._reset_input_buffer()
def reset_output_buffer(self):
"""\
@ -578,7 +688,7 @@ class Serial(SerialBase, PlatformSpecific):
that is in the buffer.
"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
termios.tcflush(self.fd, termios.TCOFLUSH)
def send_break(self, duration=0.25):
@ -587,18 +697,9 @@ class Serial(SerialBase, PlatformSpecific):
duration.
"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
termios.tcsendbreak(self.fd, int(duration / 0.25))
def _update_break_state(self):
"""\
Set break: Controls TXD. When active, no transmitting is possible.
"""
if self._break_state:
fcntl.ioctl(self.fd, TIOCSBRK)
else:
fcntl.ioctl(self.fd, TIOCCBRK)
def _update_rts_state(self):
"""Set terminal status line: Request To Send"""
if self._rts_state:
@ -617,7 +718,7 @@ class Serial(SerialBase, PlatformSpecific):
def cts(self):
"""Read terminal status line: Clear To Send"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
return struct.unpack('I', s)[0] & TIOCM_CTS != 0
@ -625,7 +726,7 @@ class Serial(SerialBase, PlatformSpecific):
def dsr(self):
"""Read terminal status line: Data Set Ready"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
return struct.unpack('I', s)[0] & TIOCM_DSR != 0
@ -633,7 +734,7 @@ class Serial(SerialBase, PlatformSpecific):
def ri(self):
"""Read terminal status line: Ring Indicator"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
return struct.unpack('I', s)[0] & TIOCM_RI != 0
@ -641,7 +742,7 @@ class Serial(SerialBase, PlatformSpecific):
def cd(self):
"""Read terminal status line: Carrier Detect"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
return struct.unpack('I', s)[0] & TIOCM_CD != 0
@ -660,7 +761,7 @@ class Serial(SerialBase, PlatformSpecific):
WARNING: this function is not portable to different platforms!
"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
return self.fd
def set_input_flow_control(self, enable=True):
@ -670,7 +771,7 @@ class Serial(SerialBase, PlatformSpecific):
WARNING: this function is not portable to different platforms!
"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
if enable:
termios.tcflow(self.fd, termios.TCION)
else:
@ -683,7 +784,7 @@ class Serial(SerialBase, PlatformSpecific):
WARNING: this function is not portable to different platforms!
"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
if enable:
termios.tcflow(self.fd, termios.TCOON)
else:
@ -709,23 +810,30 @@ class PosixPollSerial(Serial):
until the requested number of bytes is read.
"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
read = bytearray()
timeout = Timeout(self._timeout)
poll = select.poll()
poll.register(self.fd, select.POLLIN | select.POLLERR | select.POLLHUP | select.POLLNVAL)
poll.register(self.pipe_abort_read_r, select.POLLIN | select.POLLERR | select.POLLHUP | select.POLLNVAL)
if size > 0:
while len(read) < size:
# print "\tread(): size",size, "have", len(read) #debug
# wait until device becomes ready to read (or something fails)
for fd, event in poll.poll(self._timeout * 1000):
for fd, event in poll.poll(None if timeout.is_infinite else (timeout.time_left() * 1000)):
if fd == self.pipe_abort_read_r:
break
if event & (select.POLLERR | select.POLLHUP | select.POLLNVAL):
raise SerialException('device reports error (poll)')
# we don't care if it is select.POLLIN or timeout, that's
# handled below
if fd == self.pipe_abort_read_r:
os.read(self.pipe_abort_read_r, 1000)
break
buf = os.read(self.fd, size - len(read))
read.extend(buf)
if ((self._timeout is not None and self._timeout >= 0) or
(self._inter_byte_timeout is not None and self._inter_byte_timeout > 0)) and not buf:
if timeout.expired() \
or (self._inter_byte_timeout is not None and self._inter_byte_timeout > 0) and not buf:
break # early abort on timeout
return bytes(read)
@ -737,6 +845,9 @@ class VTIMESerial(Serial):
the error handling is degraded.
Overall timeout is disabled when inter-character timeout is used.
Note that this implementation does NOT support cancel_read(), it will
just ignore that.
"""
def _reconfigure_port(self, force_update=True):
@ -776,7 +887,7 @@ class VTIMESerial(Serial):
until the requested number of bytes is read.
"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
read = bytearray()
while len(read) < size:
buf = os.read(self.fd, size - len(read))

View File

@ -3,10 +3,12 @@
# Base class and support functions used by various backends.
#
# This file is part of pySerial. https://github.com/pyserial/pyserial
# (C) 2001-2016 Chris Liechti <cliechti@gmx.net>
# (C) 2001-2020 Chris Liechti <cliechti@gmx.net>
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import absolute_import
import io
import time
@ -95,8 +97,10 @@ class SerialTimeoutException(SerialException):
"""Write timeouts give an exception"""
writeTimeoutError = SerialTimeoutException('Write timeout')
portNotOpenError = SerialException('Attempting to use a port that is not open')
class PortNotOpenError(SerialException):
"""Port is not open"""
def __init__(self):
super(PortNotOpenError, self).__init__('Attempting to use a port that is not open')
class Timeout(object):
@ -185,6 +189,7 @@ class SerialBase(io.RawIOBase):
write_timeout=None,
dsrdtr=False,
inter_byte_timeout=None,
exclusive=None,
**kwargs):
"""\
Initialize comm port object. If a "port" is given, then the port will be
@ -211,6 +216,7 @@ class SerialBase(io.RawIOBase):
self._rts_state = True
self._dtr_state = True
self._break_state = False
self._exclusive = None
# assign values using get/set methods using the properties feature
self.port = port
@ -224,6 +230,8 @@ class SerialBase(io.RawIOBase):
self.rtscts = rtscts
self.dsrdtr = dsrdtr
self.inter_byte_timeout = inter_byte_timeout
self.exclusive = exclusive
# watch for backward compatible kwargs
if 'writeTimeout' in kwargs:
self.write_timeout = kwargs.pop('writeTimeout')
@ -304,6 +312,18 @@ class SerialBase(io.RawIOBase):
if self.is_open:
self._reconfigure_port()
@property
def exclusive(self):
"""Get the current exclusive access setting."""
return self._exclusive
@exclusive.setter
def exclusive(self, exclusive):
"""Change the exclusive access setting."""
self._exclusive = exclusive
if self.is_open:
self._reconfigure_port()
@property
def parity(self):
"""Get the current parity setting."""
@ -541,6 +561,8 @@ class SerialBase(io.RawIOBase):
# context manager
def __enter__(self):
if self._port is not None and not self.is_open:
self.open()
return self
def __exit__(self, *args, **kwargs):
@ -554,7 +576,7 @@ class SerialBase(io.RawIOBase):
duration.
"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
self.break_condition = True
time.sleep(duration)
self.break_condition = False
@ -629,23 +651,26 @@ class SerialBase(io.RawIOBase):
"""
return self.read(self.in_waiting)
def read_until(self, terminator=LF, size=None):
def read_until(self, expected=LF, size=None):
"""\
Read until a termination sequence is found ('\n' by default), the size
Read until an expected sequence is found ('\n' by default), the size
is exceeded or until timeout occurs.
"""
lenterm = len(terminator)
lenterm = len(expected)
line = bytearray()
timeout = Timeout(self._timeout)
while True:
c = self.read(1)
if c:
line += c
if line[-lenterm:] == terminator:
if line[-lenterm:] == expected:
break
if size is not None and len(line) >= size:
break
else:
break
if timeout.expired():
break
return bytes(line)
def iread_until(self, *args, **kwargs):

View File

@ -2,20 +2,22 @@
#
# backend for Windows ("win32" incl. 32/64 bit support)
#
# (C) 2001-2015 Chris Liechti <cliechti@gmx.net>
# (C) 2001-2020 Chris Liechti <cliechti@gmx.net>
#
# This file is part of pySerial. https://github.com/pyserial/pyserial
# SPDX-License-Identifier: BSD-3-Clause
#
# Initial patch to use ctypes by Giovanni Bajo <rasky@develer.com>
from __future__ import absolute_import
# pylint: disable=invalid-name,too-few-public-methods
import ctypes
import time
from serial import win32
import serial
from serial.serialutil import SerialBase, SerialException, to_bytes, portNotOpenError, writeTimeoutError
from serial.serialutil import SerialBase, SerialException, to_bytes, PortNotOpenError, SerialTimeoutException
class Serial(SerialBase):
@ -182,23 +184,23 @@ class Serial(SerialBase):
# XXX verify if platform really does not have a setting for those
if not self._rs485_mode.rts_level_for_tx:
raise ValueError(
'Unsupported value for RS485Settings.rts_level_for_tx: {!r}'.format(
'Unsupported value for RS485Settings.rts_level_for_tx: {!r} (only True is allowed)'.format(
self._rs485_mode.rts_level_for_tx,))
if self._rs485_mode.rts_level_for_rx:
raise ValueError(
'Unsupported value for RS485Settings.rts_level_for_rx: {!r}'.format(
'Unsupported value for RS485Settings.rts_level_for_rx: {!r} (only False is allowed)'.format(
self._rs485_mode.rts_level_for_rx,))
if self._rs485_mode.delay_before_tx is not None:
raise ValueError(
'Unsupported value for RS485Settings.delay_before_tx: {!r}'.format(
'Unsupported value for RS485Settings.delay_before_tx: {!r} (only None is allowed)'.format(
self._rs485_mode.delay_before_tx,))
if self._rs485_mode.delay_before_rx is not None:
raise ValueError(
'Unsupported value for RS485Settings.delay_before_rx: {!r}'.format(
'Unsupported value for RS485Settings.delay_before_rx: {!r} (only None is allowed)'.format(
self._rs485_mode.delay_before_rx,))
if self._rs485_mode.loopback:
raise ValueError(
'Unsupported value for RS485Settings.loopback: {!r}'.format(
'Unsupported value for RS485Settings.loopback: {!r} (only False is allowed)'.format(
self._rs485_mode.loopback,))
comDCB.fRtsControl = win32.RTS_CONTROL_TOGGLE
comDCB.fOutxCtsFlow = 0
@ -254,7 +256,7 @@ class Serial(SerialBase):
flags = win32.DWORD()
comstat = win32.COMSTAT()
if not win32.ClearCommError(self._port_handle, ctypes.byref(flags), ctypes.byref(comstat)):
raise SerialException('call to ClearCommError failed')
raise SerialException("ClearCommError failed ({!r})".format(ctypes.WinError()))
return comstat.cbInQue
def read(self, size=1):
@ -264,7 +266,7 @@ class Serial(SerialBase):
until the requested number of bytes is read.
"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
if size > 0:
win32.ResetEvent(self._overlapped_read.hEvent)
flags = win32.DWORD()
@ -301,7 +303,7 @@ class Serial(SerialBase):
def write(self, data):
"""Output the given byte string over the serial port."""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
#~ if not isinstance(data, (bytes, bytearray)):
#~ raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data)))
# convert data (needed in case of memoryview instance: Py 3.1 io lib), ctypes doesn't like memoryview
@ -311,7 +313,7 @@ class Serial(SerialBase):
n = win32.DWORD()
success = win32.WriteFile(self._port_handle, data, len(data), ctypes.byref(n), self._overlapped_write)
if self._write_timeout != 0: # if blocking (None) or w/ write timeout (>0)
if not success and win32.GetLastError() != win32.ERROR_IO_PENDING:
if not success and win32.GetLastError() not in (win32.ERROR_SUCCESS, win32.ERROR_IO_PENDING):
raise SerialException("WriteFile failed ({!r})".format(ctypes.WinError()))
# Wait for the write to complete.
@ -320,7 +322,7 @@ class Serial(SerialBase):
if win32.GetLastError() == win32.ERROR_OPERATION_ABORTED:
return n.value # canceled IO is no error
if n.value != len(data):
raise writeTimeoutError
raise SerialTimeoutException('Write timeout')
return n.value
else:
errorcode = win32.ERROR_SUCCESS if success else win32.GetLastError()
@ -349,7 +351,7 @@ class Serial(SerialBase):
def reset_input_buffer(self):
"""Clear input buffer, discarding all that is in the buffer."""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
win32.PurgeComm(self._port_handle, win32.PURGE_RXCLEAR | win32.PURGE_RXABORT)
def reset_output_buffer(self):
@ -358,13 +360,13 @@ class Serial(SerialBase):
that is in the buffer.
"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
win32.PurgeComm(self._port_handle, win32.PURGE_TXCLEAR | win32.PURGE_TXABORT)
def _update_break_state(self):
"""Set break: Controls TXD. When active, to transmitting is possible."""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
if self._break_state:
win32.SetCommBreak(self._port_handle)
else:
@ -386,7 +388,7 @@ class Serial(SerialBase):
def _GetCommModemStatus(self):
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
stat = win32.DWORD()
win32.GetCommModemStatus(self._port_handle, ctypes.byref(stat))
return stat.value
@ -416,7 +418,7 @@ class Serial(SerialBase):
def set_buffer_size(self, rx_size=4096, tx_size=None):
"""\
Recommend a buffer size to the driver (device driver can ignore this
value). Must be called before the port is opened.
value). Must be called after the port is opened.
"""
if tx_size is None:
tx_size = rx_size
@ -430,7 +432,7 @@ class Serial(SerialBase):
WARNING: this function is not portable to different platforms!
"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
if enable:
win32.EscapeCommFunction(self._port_handle, win32.SETXON)
else:
@ -442,7 +444,7 @@ class Serial(SerialBase):
flags = win32.DWORD()
comstat = win32.COMSTAT()
if not win32.ClearCommError(self._port_handle, ctypes.byref(flags), ctypes.byref(comstat)):
raise SerialException('call to ClearCommError failed')
raise SerialException("ClearCommError failed ({!r})".format(ctypes.WinError()))
return comstat.cbOutQue
def _cancel_overlapped_io(self, overlapped):
@ -465,3 +467,11 @@ class Serial(SerialBase):
def cancel_write(self):
"""Cancel a blocking write operation, may be called from other thread"""
self._cancel_overlapped_io(self._overlapped_write)
@SerialBase.exclusive.setter
def exclusive(self, exclusive):
"""Change the exclusive access setting."""
if exclusive is not None and not exclusive:
raise ValueError('win32 only supports exclusive access (not: {})'.format(exclusive))
else:
serial.SerialBase.exclusive.__set__(self, exclusive)

View File

@ -9,6 +9,8 @@
"""\
Support threading with serial ports.
"""
from __future__ import absolute_import
import serial
import threading
@ -201,7 +203,7 @@ class ReaderThread(threading.Thread):
break
else:
if data:
# make a separated try-except for called used code
# make a separated try-except for called user code
try:
self.protocol.data_received(data)
except Exception as e:
@ -214,7 +216,7 @@ class ReaderThread(threading.Thread):
def write(self, data):
"""Thread safe writing (uses lock)"""
with self._lock:
self.serial.write(data)
return self.serial.write(data)
def close(self):
"""Close the serial port and exit reader thread (uses lock)"""

View File

@ -18,6 +18,8 @@ Therefore decoding is binary to text and thus converting binary data to hex dump
"""
from __future__ import absolute_import
import codecs
import serial

View File

@ -16,6 +16,8 @@ Additionally a grep function is supplied that can be used to search for ports
based on their descriptions or hardware ID.
"""
from __future__ import absolute_import
import sys
import os
import re
@ -34,14 +36,14 @@ else:
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def grep(regexp):
def grep(regexp, include_links=False):
"""\
Search for ports using a regular expression. Port name, description and
hardware ID are searched. The function returns an iterable that returns the
same tuples as comport() would do.
"""
r = re.compile(regexp, re.I)
for info in comports():
for info in comports(include_links):
port, desc, hwid = info
if r.search(port) or r.search(desc) or r.search(hwid):
yield info
@ -73,6 +75,11 @@ def main():
type=int,
help='only output the N-th entry')
parser.add_argument(
'-s', '--include-links',
action='store_true',
help='include entries that are symlinks to real devices')
args = parser.parse_args()
hits = 0
@ -80,9 +87,9 @@ def main():
if args.regexp:
if not args.quiet:
sys.stderr.write("Filtered list with regexp: {!r}\n".format(args.regexp))
iterator = sorted(grep(args.regexp))
iterator = sorted(grep(args.regexp, include_links=args.include_links))
else:
iterator = sorted(comports())
iterator = sorted(comports(include_links=args.include_links))
# list them
for n, (port, desc, hwid) in enumerate(iterator, 1):
if args.n is None or args.n == n:

View File

@ -7,7 +7,13 @@
# (C) 2015 Chris Liechti <cliechti@gmx.net>
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import absolute_import
import re
import glob
import os
import os.path
def numsplit(text):
@ -29,9 +35,9 @@ def numsplit(text):
class ListPortInfo(object):
"""Info collection base class for serial ports"""
def __init__(self, device=None):
def __init__(self, device, skip_link_detection=False):
self.device = device
self.name = None
self.name = os.path.basename(device)
self.description = 'n/a'
self.hwid = 'n/a'
# USB specific data
@ -42,11 +48,14 @@ class ListPortInfo(object):
self.manufacturer = None
self.product = None
self.interface = None
# special handling for links
if not skip_link_detection and device is not None and os.path.islink(device):
self.hwid = 'LINK={}'.format(os.path.realpath(device))
def usb_description(self):
"""return a short string to name the port based on USB info"""
if self.interface is not None:
return '{0} - {1}'.format(self.product, self.interface)
return '{} - {}'.format(self.product, self.interface)
elif self.product is not None:
return self.product
else:
@ -54,11 +63,11 @@ class ListPortInfo(object):
def usb_info(self):
"""return a string with USB related information about device"""
return 'USB VID:PID={0:04X}:{1:04X}{2}{3}'.format(
return 'USB VID:PID={:04X}:{:04X}{}{}'.format(
self.vid or 0,
self.pid or 0,
' SER={0}'.format(self.serial_number) if self.serial_number is not None else '',
' LOCATION={0}'.format(self.location) if self.location is not None else '')
' SER={}'.format(self.serial_number) if self.serial_number is not None else '',
' LOCATION={}'.format(self.location) if self.location is not None else '')
def apply_usb_info(self):
"""update description and hwid from USB data"""
@ -66,9 +75,16 @@ class ListPortInfo(object):
self.hwid = self.usb_info()
def __eq__(self, other):
return self.device == other.device
return isinstance(other, ListPortInfo) and self.device == other.device
def __hash__(self):
return hash(self.device)
def __lt__(self, other):
if not isinstance(other, ListPortInfo):
raise TypeError('unorderable types: {}() and {}()'.format(
type(self).__name__,
type(other).__name__))
return numsplit(self.device) < numsplit(other.device)
def __str__(self):
@ -83,7 +99,21 @@ class ListPortInfo(object):
elif index == 2:
return self.hwid
else:
raise IndexError('{0} > 2'.format(index))
raise IndexError('{} > 2'.format(index))
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def list_links(devices):
"""\
search all /dev devices and look for symlinks to known ports already
listed in devices.
"""
links = []
for device in glob.glob('/dev/*'):
if os.path.islink(device) and os.path.realpath(device) in devices:
links.append(device)
return links
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# test

View File

@ -8,6 +8,8 @@
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import absolute_import
import glob
import os
from serial.tools import list_ports_common
@ -18,30 +20,46 @@ class SysFS(list_ports_common.ListPortInfo):
def __init__(self, device):
super(SysFS, self).__init__(device)
self.name = os.path.basename(device)
# special handling for links
if device is not None and os.path.islink(device):
device = os.path.realpath(device)
is_link = True
else:
is_link = False
self.usb_device_path = None
if os.path.exists('/sys/class/tty/{0}/device'.format(self.name)):
self.device_path = os.path.realpath('/sys/class/tty/{0}/device'.format(self.name))
if os.path.exists('/sys/class/tty/{}/device'.format(self.name)):
self.device_path = os.path.realpath('/sys/class/tty/{}/device'.format(self.name))
self.subsystem = os.path.basename(os.path.realpath(os.path.join(self.device_path, 'subsystem')))
else:
self.device_path = None
self.subsystem = None
# check device type
if self.subsystem == 'usb-serial':
self.usb_device_path = os.path.dirname(os.path.dirname(self.device_path))
self.usb_interface_path = os.path.dirname(self.device_path)
elif self.subsystem == 'usb':
self.usb_device_path = os.path.dirname(self.device_path)
self.usb_interface_path = self.device_path
else:
self.usb_device_path = None
self.usb_interface_path = None
# fill-in info for USB devices
if self.usb_device_path is not None:
if self.usb_interface_path is not None:
self.usb_device_path = os.path.dirname(self.usb_interface_path)
try:
num_if = int(self.read_line(self.usb_device_path, 'bNumInterfaces'))
except ValueError:
num_if = 1
self.vid = int(self.read_line(self.usb_device_path, 'idVendor'), 16)
self.pid = int(self.read_line(self.usb_device_path, 'idProduct'), 16)
self.serial_number = self.read_line(self.usb_device_path, 'serial')
if num_if > 1: # multi interface devices like FT4232
self.location = os.path.basename(self.usb_interface_path)
else:
self.location = os.path.basename(self.usb_device_path)
self.manufacturer = self.read_line(self.usb_device_path, 'manufacturer')
self.product = self.read_line(self.usb_device_path, 'product')
self.interface = self.read_line(self.device_path, 'interface')
self.interface = self.read_line(self.usb_interface_path, 'interface')
if self.subsystem in ('usb', 'usb-serial'):
self.apply_usb_info()
@ -53,6 +71,9 @@ class SysFS(list_ports_common.ListPortInfo):
self.description = self.name
self.hwid = os.path.basename(self.device_path)
if is_link:
self.hwid += ' LINK={}'.format(device)
def read_line(self, *args):
"""\
Helper function to read a single line from a file.
@ -67,12 +88,16 @@ class SysFS(list_ports_common.ListPortInfo):
return None
def comports():
def comports(include_links=False):
devices = glob.glob('/dev/ttyS*') # built-in serial ports
devices.extend(glob.glob('/dev/ttyUSB*')) # usb-serial with own driver
devices.extend(glob.glob('/dev/ttyXRUSB*')) # xr-usb-serial port exar (DELL Edge 3001)
devices.extend(glob.glob('/dev/ttyACM*')) # usb-serial with CDC-ACM profile
devices.extend(glob.glob('/dev/ttyAMA*')) # ARM internal port (raspi)
devices.extend(glob.glob('/dev/rfcomm*')) # BT serial devices
devices.extend(glob.glob('/dev/ttyAP*')) # Advantech multi-port serial controllers
if include_links:
devices.extend(list_ports_common.list_links(devices))
return [info
for info in [SysFS(d) for d in devices]
if info.subsystem != "platform"] # hide non-present internal serial ports
@ -80,5 +105,5 @@ def comports():
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# test
if __name__ == '__main__':
for port, desc, hwid in sorted(comports()):
print("{}: {} [{}]".format(port, desc, hwid))
for info in sorted(comports()):
print("{0}: {0.subsystem}".format(info))

View File

@ -7,7 +7,7 @@
# and modifications by cliechti, hoihu, hardkrash
#
# This file is part of pySerial. https://github.com/pyserial/pyserial
# (C) 2013-2015
# (C) 2013-2020
#
# SPDX-License-Identifier: BSD-3-Clause
@ -21,37 +21,54 @@
# Also see the 'IORegistryExplorer' for an idea of what we are actually searching
from __future__ import absolute_import
import ctypes
import ctypes.util
from serial.tools import list_ports_common
iokit = ctypes.cdll.LoadLibrary(ctypes.util.find_library('IOKit'))
cf = ctypes.cdll.LoadLibrary(ctypes.util.find_library('CoreFoundation'))
iokit = ctypes.cdll.LoadLibrary('/System/Library/Frameworks/IOKit.framework/IOKit')
cf = ctypes.cdll.LoadLibrary('/System/Library/Frameworks/CoreFoundation.framework/CoreFoundation')
kIOMasterPortDefault = ctypes.c_void_p.in_dll(iokit, "kIOMasterPortDefault")
# kIOMasterPortDefault is no longer exported in BigSur but no biggie, using NULL works just the same
kIOMasterPortDefault = 0 # WAS: ctypes.c_void_p.in_dll(iokit, "kIOMasterPortDefault")
kCFAllocatorDefault = ctypes.c_void_p.in_dll(cf, "kCFAllocatorDefault")
kCFStringEncodingMacRoman = 0
kCFStringEncodingUTF8 = 0x08000100
# defined in `IOKit/usb/USBSpec.h`
kUSBVendorString = 'USB Vendor Name'
kUSBSerialNumberString = 'USB Serial Number'
# `io_name_t` defined as `typedef char io_name_t[128];`
# in `device/device_types.h`
io_name_size = 128
# defined in `mach/kern_return.h`
KERN_SUCCESS = 0
# kern_return_t defined as `typedef int kern_return_t;` in `mach/i386/kern_return.h`
kern_return_t = ctypes.c_int
iokit.IOServiceMatching.restype = ctypes.c_void_p
iokit.IOServiceGetMatchingServices.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p]
iokit.IOServiceGetMatchingServices.restype = ctypes.c_void_p
iokit.IOServiceGetMatchingServices.restype = kern_return_t
iokit.IORegistryEntryGetParentEntry.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p]
iokit.IOServiceGetMatchingServices.restype = kern_return_t
iokit.IORegistryEntryCreateCFProperty.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_uint32]
iokit.IORegistryEntryCreateCFProperty.restype = ctypes.c_void_p
iokit.IORegistryEntryGetPath.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p]
iokit.IORegistryEntryGetPath.restype = ctypes.c_void_p
iokit.IORegistryEntryGetPath.restype = kern_return_t
iokit.IORegistryEntryGetName.argtypes = [ctypes.c_void_p, ctypes.c_void_p]
iokit.IORegistryEntryGetName.restype = ctypes.c_void_p
iokit.IORegistryEntryGetName.restype = kern_return_t
iokit.IOObjectGetClass.argtypes = [ctypes.c_void_p, ctypes.c_void_p]
iokit.IOObjectGetClass.restype = ctypes.c_void_p
iokit.IOObjectGetClass.restype = kern_return_t
iokit.IOObjectRelease.argtypes = [ctypes.c_void_p]
@ -62,6 +79,9 @@ cf.CFStringCreateWithCString.restype = ctypes.c_void_p
cf.CFStringGetCStringPtr.argtypes = [ctypes.c_void_p, ctypes.c_uint32]
cf.CFStringGetCStringPtr.restype = ctypes.c_char_p
cf.CFStringGetCString.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_long, ctypes.c_uint32]
cf.CFStringGetCString.restype = ctypes.c_bool
cf.CFNumberGetValue.argtypes = [ctypes.c_void_p, ctypes.c_uint32, ctypes.c_void_p]
cf.CFNumberGetValue.restype = ctypes.c_void_p
@ -86,8 +106,8 @@ def get_string_property(device_type, property):
"""
key = cf.CFStringCreateWithCString(
kCFAllocatorDefault,
property.encode("mac_roman"),
kCFStringEncodingMacRoman)
property.encode("utf-8"),
kCFStringEncodingUTF8)
CFContainer = iokit.IORegistryEntryCreateCFProperty(
device_type,
@ -99,7 +119,12 @@ def get_string_property(device_type, property):
if CFContainer:
output = cf.CFStringGetCStringPtr(CFContainer, 0)
if output is not None:
output = output.decode('mac_roman')
output = output.decode('utf-8')
else:
buffer = ctypes.create_string_buffer(io_name_size);
success = cf.CFStringGetCString(CFContainer, ctypes.byref(buffer), io_name_size, kCFStringEncodingUTF8)
if success:
output = buffer.value.decode('utf-8')
cf.CFRelease(CFContainer)
return output
@ -116,8 +141,8 @@ def get_int_property(device_type, property, cf_number_type):
"""
key = cf.CFStringCreateWithCString(
kCFAllocatorDefault,
property.encode("mac_roman"),
kCFStringEncodingMacRoman)
property.encode("utf-8"),
kCFStringEncodingUTF8)
CFContainer = iokit.IORegistryEntryCreateCFProperty(
device_type,
@ -135,12 +160,19 @@ def get_int_property(device_type, property, cf_number_type):
return number.value
return None
def IORegistryEntryGetName(device):
pathname = ctypes.create_string_buffer(100) # TODO: Is this ok?
iokit.IOObjectGetClass(device, ctypes.byref(pathname))
return pathname.value
devicename = ctypes.create_string_buffer(io_name_size);
res = iokit.IORegistryEntryGetName(device, ctypes.byref(devicename))
if res != KERN_SUCCESS:
return None
# this works in python2 but may not be valid. Also I don't know if
# this encoding is guaranteed. It may be dependent on system locale.
return devicename.value.decode('utf-8')
def IOObjectGetClass(device):
classname = ctypes.create_string_buffer(io_name_size)
iokit.IOObjectGetClass(device, ctypes.byref(classname))
return classname.value
def GetParentDeviceByType(device, parent_type):
""" Find the first parent of a device that implements the parent_type
@ -148,15 +180,15 @@ def GetParentDeviceByType(device, parent_type):
@return Pointer to the parent type, or None if it was not found.
"""
# First, try to walk up the IOService tree to find a parent of this device that is a IOUSBDevice.
parent_type = parent_type.encode('mac_roman')
while IORegistryEntryGetName(device) != parent_type:
parent_type = parent_type.encode('utf-8')
while IOObjectGetClass(device) != parent_type:
parent = ctypes.c_void_p()
response = iokit.IORegistryEntryGetParentEntry(
device,
"IOService".encode("mac_roman"),
"IOService".encode("utf-8"),
ctypes.byref(parent))
# If we weren't able to find a parent for the device, we're done.
if response != 0:
if response != KERN_SUCCESS:
return None
device = parent
return device
@ -170,7 +202,7 @@ def GetIOServicesByType(service_type):
iokit.IOServiceGetMatchingServices(
kIOMasterPortDefault,
iokit.IOServiceMatching(service_type.encode('mac_roman')),
iokit.IOServiceMatching(service_type.encode('utf-8')),
ctypes.byref(serial_port_iterator))
services = []
@ -227,7 +259,8 @@ def search_for_locationID_in_interfaces(serial_interfaces, locationID):
return None
def comports():
def comports(include_links=False):
# XXX include_links is currently ignored. are links in /dev even supported here?
# Scan for all iokit serial ports
services = GetIOServicesByType('IOSerialBSDClient')
ports = []
@ -238,14 +271,21 @@ def comports():
if device:
info = list_ports_common.ListPortInfo(device)
# If the serial port is implemented by IOUSBDevice
# NOTE IOUSBDevice was deprecated as of 10.11 and finally on Apple Silicon
# devices has been completely removed. Thanks to @oskay for this patch.
usb_device = GetParentDeviceByType(service, "IOUSBHostDevice")
if not usb_device:
usb_device = GetParentDeviceByType(service, "IOUSBDevice")
if usb_device:
# fetch some useful informations from properties
info.vid = get_int_property(usb_device, "idVendor", kCFNumberSInt16Type)
info.pid = get_int_property(usb_device, "idProduct", kCFNumberSInt16Type)
info.serial_number = get_string_property(usb_device, "USB Serial Number")
info.product = get_string_property(usb_device, "USB Product Name") or 'n/a'
info.manufacturer = get_string_property(usb_device, "USB Vendor Name")
info.serial_number = get_string_property(usb_device, kUSBSerialNumberString)
# We know this is a usb device, so the
# IORegistryEntryName should always be aliased to the
# usb product name string descriptor.
info.product = IORegistryEntryGetName(usb_device) or 'n/a'
info.manufacturer = get_string_property(usb_device, kUSBVendorString)
locationID = get_int_property(usb_device, "locationID", kCFNumberSInt32Type)
info.location = location_to_string(locationID)
info.interface = search_for_locationID_in_interfaces(serial_interfaces, locationID)

View File

@ -16,6 +16,8 @@ As currently no method is known to get the second two strings easily, they are
currently just identical to the port name.
"""
from __future__ import absolute_import
import glob
import sys
import os
@ -34,48 +36,64 @@ elif plat == 'cygwin': # cygwin/win32
# cygwin accepts /dev/com* in many contexts
# (such as 'open' call, explicit 'ls'), but 'glob.glob'
# and bare 'ls' do not; so use /dev/ttyS* instead
def comports():
def comports(include_links=False):
devices = glob.glob('/dev/ttyS*')
if include_links:
devices.extend(list_ports_common.list_links(devices))
return [list_ports_common.ListPortInfo(d) for d in devices]
elif plat[:7] == 'openbsd': # OpenBSD
def comports():
def comports(include_links=False):
devices = glob.glob('/dev/cua*')
if include_links:
devices.extend(list_ports_common.list_links(devices))
return [list_ports_common.ListPortInfo(d) for d in devices]
elif plat[:3] == 'bsd' or plat[:7] == 'freebsd':
def comports():
def comports(include_links=False):
devices = glob.glob('/dev/cua*[!.init][!.lock]')
if include_links:
devices.extend(list_ports_common.list_links(devices))
return [list_ports_common.ListPortInfo(d) for d in devices]
elif plat[:6] == 'netbsd': # NetBSD
def comports():
def comports(include_links=False):
"""scan for available ports. return a list of device names."""
devices = glob.glob('/dev/dty*')
if include_links:
devices.extend(list_ports_common.list_links(devices))
return [list_ports_common.ListPortInfo(d) for d in devices]
elif plat[:4] == 'irix': # IRIX
def comports():
def comports(include_links=False):
"""scan for available ports. return a list of device names."""
devices = glob.glob('/dev/ttyf*')
if include_links:
devices.extend(list_ports_common.list_links(devices))
return [list_ports_common.ListPortInfo(d) for d in devices]
elif plat[:2] == 'hp': # HP-UX (not tested)
def comports():
def comports(include_links=False):
"""scan for available ports. return a list of device names."""
devices = glob.glob('/dev/tty*p0')
if include_links:
devices.extend(list_ports_common.list_links(devices))
return [list_ports_common.ListPortInfo(d) for d in devices]
elif plat[:5] == 'sunos': # Solaris/SunOS
def comports():
def comports(include_links=False):
"""scan for available ports. return a list of device names."""
devices = glob.glob('/dev/tty*c')
if include_links:
devices.extend(list_ports_common.list_links(devices))
return [list_ports_common.ListPortInfo(d) for d in devices]
elif plat[:3] == 'aix': # AIX
def comports():
def comports(include_links=False):
"""scan for available ports. return a list of device names."""
devices = glob.glob('/dev/tty*')
if include_links:
devices.extend(list_ports_common.list_links(devices))
return [list_ports_common.ListPortInfo(d) for d in devices]
else:

View File

@ -8,6 +8,8 @@
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import absolute_import
# pylint: disable=invalid-name,too-few-public-methods
import re
import ctypes
@ -116,11 +118,25 @@ RegQueryValueEx = advapi32.RegQueryValueExW
RegQueryValueEx.argtypes = [HKEY, LPCTSTR, LPDWORD, LPDWORD, LPBYTE, LPDWORD]
RegQueryValueEx.restype = LONG
cfgmgr32 = ctypes.windll.LoadLibrary("Cfgmgr32")
CM_Get_Parent = cfgmgr32.CM_Get_Parent
CM_Get_Parent.argtypes = [PDWORD, DWORD, ULONG]
CM_Get_Parent.restype = LONG
CM_Get_Device_IDW = cfgmgr32.CM_Get_Device_IDW
CM_Get_Device_IDW.argtypes = [DWORD, PTSTR, ULONG, ULONG]
CM_Get_Device_IDW.restype = LONG
CM_MapCrToWin32Err = cfgmgr32.CM_MapCrToWin32Err
CM_MapCrToWin32Err.argtypes = [DWORD, DWORD]
CM_MapCrToWin32Err.restype = DWORD
DIGCF_PRESENT = 2
DIGCF_DEVICEINTERFACE = 16
INVALID_HANDLE_VALUE = 0
ERROR_INSUFFICIENT_BUFFER = 122
ERROR_NOT_FOUND = 1168
SPDRP_HARDWAREID = 1
SPDRP_FRIENDLYNAME = 12
SPDRP_LOCATION_PATHS = 35
@ -130,19 +146,120 @@ DIREG_DEV = 0x00000001
KEY_READ = 0x20019
MAX_USB_DEVICE_TREE_TRAVERSAL_DEPTH = 5
def get_parent_serial_number(child_devinst, child_vid, child_pid, depth=0, last_serial_number=None):
""" Get the serial number of the parent of a device.
Args:
child_devinst: The device instance handle to get the parent serial number of.
child_vid: The vendor ID of the child device.
child_pid: The product ID of the child device.
depth: The current iteration depth of the USB device tree.
"""
# If the traversal depth is beyond the max, abandon attempting to find the serial number.
if depth > MAX_USB_DEVICE_TREE_TRAVERSAL_DEPTH:
return '' if not last_serial_number else last_serial_number
# Get the parent device instance.
devinst = DWORD()
ret = CM_Get_Parent(ctypes.byref(devinst), child_devinst, 0)
if ret:
win_error = CM_MapCrToWin32Err(DWORD(ret), DWORD(0))
# If there is no parent available, the child was the root device. We cannot traverse
# further.
if win_error == ERROR_NOT_FOUND:
return '' if not last_serial_number else last_serial_number
raise ctypes.WinError(win_error)
# Get the ID of the parent device and parse it for vendor ID, product ID, and serial number.
parentHardwareID = ctypes.create_unicode_buffer(250)
ret = CM_Get_Device_IDW(
devinst,
parentHardwareID,
ctypes.sizeof(parentHardwareID) - 1,
0)
if ret:
raise ctypes.WinError(CM_MapCrToWin32Err(DWORD(ret), DWORD(0)))
parentHardwareID_str = parentHardwareID.value
m = re.search(r'VID_([0-9a-f]{4})(&PID_([0-9a-f]{4}))?(&MI_(\d{2}))?(\\(.*))?',
parentHardwareID_str,
re.I)
# return early if we have no matches (likely malformed serial, traversed too far)
if not m:
return '' if not last_serial_number else last_serial_number
vid = None
pid = None
serial_number = None
if m.group(1):
vid = int(m.group(1), 16)
if m.group(3):
pid = int(m.group(3), 16)
if m.group(7):
serial_number = m.group(7)
# store what we found as a fallback for malformed serial values up the chain
found_serial_number = serial_number
# Check that the USB serial number only contains alpha-numeric characters. It may be a windows
# device ID (ephemeral ID).
if serial_number and not re.match(r'^\w+$', serial_number):
serial_number = None
if not vid or not pid:
# If pid and vid are not available at this device level, continue to the parent.
return get_parent_serial_number(devinst, child_vid, child_pid, depth + 1, found_serial_number)
if pid != child_pid or vid != child_vid:
# If the VID or PID has changed, we are no longer looking at the same physical device. The
# serial number is unknown.
return '' if not last_serial_number else last_serial_number
# In this case, the vid and pid of the parent device are identical to the child. However, if
# there still isn't a serial number available, continue to the next parent.
if not serial_number:
return get_parent_serial_number(devinst, child_vid, child_pid, depth + 1, found_serial_number)
# Finally, the VID and PID are identical to the child and a serial number is present, so return
# it.
return serial_number
def iterate_comports():
"""Return a generator that yields descriptions for serial ports"""
GUIDs = (GUID * 8)() # so far only seen one used, so hope 8 are enough...
guids_size = DWORD()
PortsGUIDs = (GUID * 8)() # so far only seen one used, so hope 8 are enough...
ports_guids_size = DWORD()
if not SetupDiClassGuidsFromName(
"Ports",
GUIDs,
ctypes.sizeof(GUIDs),
ctypes.byref(guids_size)):
PortsGUIDs,
ctypes.sizeof(PortsGUIDs),
ctypes.byref(ports_guids_size)):
raise ctypes.WinError()
ModemsGUIDs = (GUID * 8)() # so far only seen one used, so hope 8 are enough...
modems_guids_size = DWORD()
if not SetupDiClassGuidsFromName(
"Modem",
ModemsGUIDs,
ctypes.sizeof(ModemsGUIDs),
ctypes.byref(modems_guids_size)):
raise ctypes.WinError()
GUIDs = PortsGUIDs[:ports_guids_size.value] + ModemsGUIDs[:modems_guids_size.value]
# repeat for all possible GUIDs
for index in range(guids_size.value):
for index in range(len(GUIDs)):
bInterfaceNumber = None
g_hdi = SetupDiGetClassDevs(
ctypes.byref(GUIDs[index]),
None,
@ -205,18 +322,26 @@ def iterate_comports():
# stringify
szHardwareID_str = szHardwareID.value
info = list_ports_common.ListPortInfo(port_name_buffer.value)
info = list_ports_common.ListPortInfo(port_name_buffer.value, skip_link_detection=True)
# in case of USB, make a more readable string, similar to that form
# that we also generate on other platforms
if szHardwareID_str.startswith('USB'):
m = re.search(r'VID_([0-9a-f]{4})(&PID_([0-9a-f]{4}))?(\\(\w+))?', szHardwareID_str, re.I)
m = re.search(r'VID_([0-9a-f]{4})(&PID_([0-9a-f]{4}))?(&MI_(\d{2}))?(\\(.*))?', szHardwareID_str, re.I)
if m:
info.vid = int(m.group(1), 16)
if m.group(3):
info.pid = int(m.group(3), 16)
if m.group(5):
info.serial_number = m.group(5)
bInterfaceNumber = int(m.group(5))
# Check that the USB serial number only contains alpha-numeric characters. It
# may be a windows device ID (ephemeral ID) for composite devices.
if m.group(7) and re.match(r'^\w+$', m.group(7)):
info.serial_number = m.group(7)
else:
info.serial_number = get_parent_serial_number(devinfo.DevInst, info.vid, info.pid)
# calculate a location string
loc_path_str = ctypes.create_unicode_buffer(250)
if SetupDiGetDeviceRegistryProperty(
@ -238,6 +363,10 @@ def iterate_comports():
else:
location.append('-')
location.append(g.group(2))
if bInterfaceNumber is not None:
location.append(':{}.{}'.format(
'x', # XXX how to determine correct bConfigurationValue?
bInterfaceNumber))
if location:
info.location = ''.join(location)
info.hwid = info.usb_info()
@ -287,7 +416,7 @@ def iterate_comports():
SetupDiDestroyDeviceInfoList(g_hdi)
def comports():
def comports(include_links=False):
"""Return a list of info objects about serial ports"""
return list(iterate_comports())

View File

@ -3,10 +3,12 @@
# Very simple serial terminal
#
# This file is part of pySerial. https://github.com/pyserial/pyserial
# (C)2002-2015 Chris Liechti <cliechti@gmx.net>
# (C)2002-2020 Chris Liechti <cliechti@gmx.net>
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import absolute_import
import codecs
import os
import sys
@ -86,6 +88,7 @@ class ConsoleBase(object):
if os.name == 'nt': # noqa
import msvcrt
import ctypes
import platform
class Out(object):
"""file-like wrapper that uses os.write"""
@ -100,12 +103,52 @@ if os.name == 'nt': # noqa
os.write(self.fd, s)
class Console(ConsoleBase):
fncodes = {
';': '\1bOP', # F1
'<': '\1bOQ', # F2
'=': '\1bOR', # F3
'>': '\1bOS', # F4
'?': '\1b[15~', # F5
'@': '\1b[17~', # F6
'A': '\1b[18~', # F7
'B': '\1b[19~', # F8
'C': '\1b[20~', # F9
'D': '\1b[21~', # F10
}
navcodes = {
'H': '\x1b[A', # UP
'P': '\x1b[B', # DOWN
'K': '\x1b[D', # LEFT
'M': '\x1b[C', # RIGHT
'G': '\x1b[H', # HOME
'O': '\x1b[F', # END
'R': '\x1b[2~', # INSERT
'S': '\x1b[3~', # DELETE
'I': '\x1b[5~', # PGUP
'Q': '\x1b[6~', # PGDN
}
def __init__(self):
super(Console, self).__init__()
self._saved_ocp = ctypes.windll.kernel32.GetConsoleOutputCP()
self._saved_icp = ctypes.windll.kernel32.GetConsoleCP()
ctypes.windll.kernel32.SetConsoleOutputCP(65001)
ctypes.windll.kernel32.SetConsoleCP(65001)
# ANSI handling available through SetConsoleMode since Windows 10 v1511
# https://en.wikipedia.org/wiki/ANSI_escape_code#cite_note-win10th2-1
if platform.release() == '10' and int(platform.version().split('.')[2]) > 10586:
ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004
import ctypes.wintypes as wintypes
if not hasattr(wintypes, 'LPDWORD'): # PY2
wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD)
SetConsoleMode = ctypes.windll.kernel32.SetConsoleMode
GetConsoleMode = ctypes.windll.kernel32.GetConsoleMode
GetStdHandle = ctypes.windll.kernel32.GetStdHandle
mode = wintypes.DWORD()
GetConsoleMode(GetStdHandle(-11), ctypes.byref(mode))
if (mode.value & ENABLE_VIRTUAL_TERMINAL_PROCESSING) == 0:
SetConsoleMode(GetStdHandle(-11), mode.value | ENABLE_VIRTUAL_TERMINAL_PROCESSING)
self._saved_cm = mode
self.output = codecs.getwriter('UTF-8')(Out(sys.stdout.fileno()), 'replace')
# the change of the code page is not propagated to Python, manually fix it
sys.stderr = codecs.getwriter('UTF-8')(Out(sys.stderr.fileno()), 'replace')
@ -115,14 +158,25 @@ if os.name == 'nt': # noqa
def __del__(self):
ctypes.windll.kernel32.SetConsoleOutputCP(self._saved_ocp)
ctypes.windll.kernel32.SetConsoleCP(self._saved_icp)
try:
ctypes.windll.kernel32.SetConsoleMode(ctypes.windll.kernel32.GetStdHandle(-11), self._saved_cm)
except AttributeError: # in case no _saved_cm
pass
def getkey(self):
while True:
z = msvcrt.getwch()
if z == unichr(13):
return unichr(10)
elif z in (unichr(0), unichr(0x0e)): # functions keys, ignore
msvcrt.getwch()
elif z is unichr(0) or z is unichr(0xe0):
try:
code = msvcrt.getwch()
if z is unichr(0):
return self.fncodes[code]
else:
return self.navcodes[code]
except KeyError:
pass
else:
return z
@ -135,15 +189,12 @@ if os.name == 'nt': # noqa
elif os.name == 'posix':
import atexit
import termios
import select
import fcntl
class Console(ConsoleBase):
def __init__(self):
super(Console, self).__init__()
self.fd = sys.stdin.fileno()
# an additional pipe is used in getkey, so that the cancel method
# can abort the waiting getkey method
self.pipe_r, self.pipe_w = os.pipe()
self.old = termios.tcgetattr(self.fd)
atexit.register(self.cleanup)
if sys.version_info < (3, 0):
@ -159,17 +210,13 @@ elif os.name == 'posix':
termios.tcsetattr(self.fd, termios.TCSANOW, new)
def getkey(self):
ready, _, _ = select.select([self.enc_stdin, self.pipe_r], [], [], None)
if self.pipe_r in ready:
os.read(self.pipe_r, 1)
return
c = self.enc_stdin.read(1)
if c == unichr(0x7f):
c = unichr(8) # map the BS key (which yields DEL) to backspace
return c
def cancel(self):
os.write(self.pipe_w, b"x")
fcntl.ioctl(self.fd, termios.TIOCSTI, b'\0')
def cleanup(self):
termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old)
@ -282,12 +329,12 @@ class DebugIO(Transform):
"""Print what is sent and received"""
def rx(self, text):
sys.stderr.write(' [RX:{}] '.format(repr(text)))
sys.stderr.write(' [RX:{!r}] '.format(text))
sys.stderr.flush()
return text
def tx(self, text):
sys.stderr.write(' [TX:{}] '.format(repr(text)))
sys.stderr.write(' [TX:{!r}] '.format(text))
sys.stderr.flush()
return text
@ -322,7 +369,7 @@ def ask_for_port():
sys.stderr.write('\n--- Available ports:\n')
ports = []
for n, (port, desc, hwid) in enumerate(sorted(comports()), 1):
sys.stderr.write('--- {:2}: {:20} {}\n'.format(n, port, desc))
sys.stderr.write('--- {:2}: {:20} {!r}\n'.format(n, port, desc))
ports.append(port)
while True:
port = raw_input('--- Enter port index or full name: ')
@ -354,8 +401,8 @@ class Miniterm(object):
self.eol = eol
self.filters = filters
self.update_transformations()
self.exit_character = 0x1d # GS/CTRL+]
self.menu_character = 0x14 # Menu: CTRL+T
self.exit_character = unichr(0x1d) # GS/CTRL+]
self.menu_character = unichr(0x14) # Menu: CTRL+T
self.alive = None
self._reader_alive = None
self.receiver_thread = None
@ -509,25 +556,7 @@ class Miniterm(object):
if self.echo:
self.console.write(c)
elif c == '\x15': # CTRL+U -> upload file
sys.stderr.write('\n--- File to upload: ')
sys.stderr.flush()
with self.console:
filename = sys.stdin.readline().rstrip('\r\n')
if filename:
try:
with open(filename, 'rb') as f:
sys.stderr.write('--- Sending file {} ---\n'.format(filename))
while True:
block = f.read(1024)
if not block:
break
self.serial.write(block)
# Wait for output buffer to drain.
self.serial.flush()
sys.stderr.write('.') # Progress indicator.
sys.stderr.write('\n--- File {} sent ---\n'.format(filename))
except IOError as e:
sys.stderr.write('--- ERROR opening file {}: {} ---\n'.format(filename, e))
self.upload_file()
elif c in '\x08hH?': # CTRL+H, h, H, ? -> Show help
sys.stderr.write(self.get_help_text())
elif c == '\x12': # CTRL+R -> Toggle RTS
@ -543,22 +572,7 @@ class Miniterm(object):
self.echo = not self.echo
sys.stderr.write('--- local echo {} ---\n'.format('active' if self.echo else 'inactive'))
elif c == '\x06': # CTRL+F -> edit filters
sys.stderr.write('\n--- Available Filters:\n')
sys.stderr.write('\n'.join(
'--- {:<10} = {.__doc__}'.format(k, v)
for k, v in sorted(TRANSFORMATIONS.items())))
sys.stderr.write('\n--- Enter new filter name(s) [{}]: '.format(' '.join(self.filters)))
with self.console:
new_filters = sys.stdin.readline().lower().split()
if new_filters:
for f in new_filters:
if f not in TRANSFORMATIONS:
sys.stderr.write('--- unknown filter: {}\n'.format(repr(f)))
break
else:
self.filters = new_filters
self.update_transformations()
sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters)))
self.change_filter()
elif c == '\x0c': # CTRL+L -> EOL mode
modes = list(EOL_TRANSFORMATIONS) # keys
eol = modes.index(self.eol) + 1
@ -568,63 +582,17 @@ class Miniterm(object):
sys.stderr.write('--- EOL: {} ---\n'.format(self.eol.upper()))
self.update_transformations()
elif c == '\x01': # CTRL+A -> set encoding
sys.stderr.write('\n--- Enter new encoding name [{}]: '.format(self.input_encoding))
with self.console:
new_encoding = sys.stdin.readline().strip()
if new_encoding:
try:
codecs.lookup(new_encoding)
except LookupError:
sys.stderr.write('--- invalid encoding name: {}\n'.format(new_encoding))
else:
self.set_rx_encoding(new_encoding)
self.set_tx_encoding(new_encoding)
sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding))
sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding))
self.change_encoding()
elif c == '\x09': # CTRL+I -> info
self.dump_port_settings()
#~ elif c == '\x01': # CTRL+A -> cycle escape mode
#~ elif c == '\x0c': # CTRL+L -> cycle linefeed mode
elif c in 'pP': # P -> change port
with self.console:
try:
port = ask_for_port()
except KeyboardInterrupt:
port = None
if port and port != self.serial.port:
# reader thread needs to be shut down
self._stop_reader()
# save settings
settings = self.serial.getSettingsDict()
try:
new_serial = serial.serial_for_url(port, do_not_open=True)
# restore settings and open
new_serial.applySettingsDict(settings)
new_serial.rts = self.serial.rts
new_serial.dtr = self.serial.dtr
new_serial.open()
new_serial.break_condition = self.serial.break_condition
except Exception as e:
sys.stderr.write('--- ERROR opening new port: {} ---\n'.format(e))
new_serial.close()
else:
self.serial.close()
self.serial = new_serial
sys.stderr.write('--- Port changed to: {} ---\n'.format(self.serial.port))
# and restart the reader thread
self._start_reader()
self.change_port()
elif c in 'zZ': # S -> suspend / open port temporarily
self.suspend_port()
elif c in 'bB': # B -> change baudrate
sys.stderr.write('\n--- Baudrate: ')
sys.stderr.flush()
with self.console:
backup = self.serial.baudrate
try:
self.serial.baudrate = int(sys.stdin.readline().strip())
except ValueError as e:
sys.stderr.write('--- ERROR setting baudrate: {} ---\n'.format(e))
self.serial.baudrate = backup
else:
self.dump_port_settings()
self.change_baudrate()
elif c == '8': # 8 -> change to 8 bits
self.serial.bytesize = serial.EIGHTBITS
self.dump_port_settings()
@ -661,16 +629,150 @@ class Miniterm(object):
elif c in 'rR': # R -> change hardware flow control
self.serial.rtscts = (c == 'R')
self.dump_port_settings()
elif c in 'qQ':
self.stop() # Q -> exit app
else:
sys.stderr.write('--- unknown menu character {} --\n'.format(key_description(c)))
def upload_file(self):
"""Ask user for filenname and send its contents"""
sys.stderr.write('\n--- File to upload: ')
sys.stderr.flush()
with self.console:
filename = sys.stdin.readline().rstrip('\r\n')
if filename:
try:
with open(filename, 'rb') as f:
sys.stderr.write('--- Sending file {} ---\n'.format(filename))
while True:
block = f.read(1024)
if not block:
break
self.serial.write(block)
# Wait for output buffer to drain.
self.serial.flush()
sys.stderr.write('.') # Progress indicator.
sys.stderr.write('\n--- File {} sent ---\n'.format(filename))
except IOError as e:
sys.stderr.write('--- ERROR opening file {}: {} ---\n'.format(filename, e))
def change_filter(self):
"""change the i/o transformations"""
sys.stderr.write('\n--- Available Filters:\n')
sys.stderr.write('\n'.join(
'--- {:<10} = {.__doc__}'.format(k, v)
for k, v in sorted(TRANSFORMATIONS.items())))
sys.stderr.write('\n--- Enter new filter name(s) [{}]: '.format(' '.join(self.filters)))
with self.console:
new_filters = sys.stdin.readline().lower().split()
if new_filters:
for f in new_filters:
if f not in TRANSFORMATIONS:
sys.stderr.write('--- unknown filter: {!r}\n'.format(f))
break
else:
self.filters = new_filters
self.update_transformations()
sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters)))
def change_encoding(self):
"""change encoding on the serial port"""
sys.stderr.write('\n--- Enter new encoding name [{}]: '.format(self.input_encoding))
with self.console:
new_encoding = sys.stdin.readline().strip()
if new_encoding:
try:
codecs.lookup(new_encoding)
except LookupError:
sys.stderr.write('--- invalid encoding name: {}\n'.format(new_encoding))
else:
self.set_rx_encoding(new_encoding)
self.set_tx_encoding(new_encoding)
sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding))
sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding))
def change_baudrate(self):
"""change the baudrate"""
sys.stderr.write('\n--- Baudrate: ')
sys.stderr.flush()
with self.console:
backup = self.serial.baudrate
try:
self.serial.baudrate = int(sys.stdin.readline().strip())
except ValueError as e:
sys.stderr.write('--- ERROR setting baudrate: {} ---\n'.format(e))
self.serial.baudrate = backup
else:
self.dump_port_settings()
def change_port(self):
"""Have a conversation with the user to change the serial port"""
with self.console:
try:
port = ask_for_port()
except KeyboardInterrupt:
port = None
if port and port != self.serial.port:
# reader thread needs to be shut down
self._stop_reader()
# save settings
settings = self.serial.getSettingsDict()
try:
new_serial = serial.serial_for_url(port, do_not_open=True)
# restore settings and open
new_serial.applySettingsDict(settings)
new_serial.rts = self.serial.rts
new_serial.dtr = self.serial.dtr
new_serial.open()
new_serial.break_condition = self.serial.break_condition
except Exception as e:
sys.stderr.write('--- ERROR opening new port: {} ---\n'.format(e))
new_serial.close()
else:
self.serial.close()
self.serial = new_serial
sys.stderr.write('--- Port changed to: {} ---\n'.format(self.serial.port))
# and restart the reader thread
self._start_reader()
def suspend_port(self):
"""\
open port temporarily, allow reconnect, exit and port change to get
out of the loop
"""
# reader thread needs to be shut down
self._stop_reader()
self.serial.close()
sys.stderr.write('\n--- Port closed: {} ---\n'.format(self.serial.port))
do_change_port = False
while not self.serial.is_open:
sys.stderr.write('--- Quit: {exit} | p: port change | any other key to reconnect ---\n'.format(
exit=key_description(self.exit_character)))
k = self.console.getkey()
if k == self.exit_character:
self.stop() # exit app
break
elif k in 'pP':
do_change_port = True
break
try:
self.serial.open()
except Exception as e:
sys.stderr.write('--- ERROR opening port: {} ---\n'.format(e))
if do_change_port:
self.change_port()
else:
# and restart the reader thread
self._start_reader()
sys.stderr.write('--- Port opened: {} ---\n'.format(self.serial.port))
def get_help_text(self):
"""return the help text"""
# help text, starts with blank line!
return """
--- pySerial ({version}) - miniterm - help
---
--- {exit:8} Exit program
--- {exit:8} Exit program (alias {menu} Q)
--- {menu:8} Menu escape key, followed by:
--- Menu keys:
--- {menu:7} Send the menu character itself to remote
@ -714,123 +816,130 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr
import argparse
parser = argparse.ArgumentParser(
description="Miniterm - A simple terminal program for the serial port.")
description='Miniterm - A simple terminal program for the serial port.')
parser.add_argument(
"port",
'port',
nargs='?',
help="serial port name ('-' to show port list)",
help='serial port name ("-" to show port list)',
default=default_port)
parser.add_argument(
"baudrate",
'baudrate',
nargs='?',
type=int,
help="set baud rate, default: %(default)s",
help='set baud rate, default: %(default)s',
default=default_baudrate)
group = parser.add_argument_group("port settings")
group = parser.add_argument_group('port settings')
group.add_argument(
"--parity",
'--parity',
choices=['N', 'E', 'O', 'S', 'M'],
type=lambda c: c.upper(),
help="set parity, one of {N E O S M}, default: N",
help='set parity, one of {N E O S M}, default: N',
default='N')
group.add_argument(
"--rtscts",
action="store_true",
help="enable RTS/CTS flow control (default off)",
'--rtscts',
action='store_true',
help='enable RTS/CTS flow control (default off)',
default=False)
group.add_argument(
"--xonxoff",
action="store_true",
help="enable software flow control (default off)",
'--xonxoff',
action='store_true',
help='enable software flow control (default off)',
default=False)
group.add_argument(
"--rts",
'--rts',
type=int,
help="set initial RTS line state (possible values: 0, 1)",
help='set initial RTS line state (possible values: 0, 1)',
default=default_rts)
group.add_argument(
"--dtr",
'--dtr',
type=int,
help="set initial DTR line state (possible values: 0, 1)",
help='set initial DTR line state (possible values: 0, 1)',
default=default_dtr)
group.add_argument(
"--ask",
action="store_true",
help="ask again for port when open fails",
default=False)
group = parser.add_argument_group("data handling")
'--non-exclusive',
dest='exclusive',
action='store_false',
help='disable locking for native ports',
default=True)
group.add_argument(
"-e", "--echo",
action="store_true",
help="enable local echo (default off)",
'--ask',
action='store_true',
help='ask again for port when open fails',
default=False)
group = parser.add_argument_group('data handling')
group.add_argument(
'-e', '--echo',
action='store_true',
help='enable local echo (default off)',
default=False)
group.add_argument(
"--encoding",
dest="serial_port_encoding",
metavar="CODEC",
help="set the encoding for the serial port (e.g. hexlify, Latin1, UTF-8), default: %(default)s",
'--encoding',
dest='serial_port_encoding',
metavar='CODEC',
help='set the encoding for the serial port (e.g. hexlify, Latin1, UTF-8), default: %(default)s',
default='UTF-8')
group.add_argument(
"-f", "--filter",
action="append",
metavar="NAME",
help="add text transformation",
'-f', '--filter',
action='append',
metavar='NAME',
help='add text transformation',
default=[])
group.add_argument(
"--eol",
'--eol',
choices=['CR', 'LF', 'CRLF'],
type=lambda c: c.upper(),
help="end of line mode",
help='end of line mode',
default='CRLF')
group.add_argument(
"--raw",
action="store_true",
help="Do no apply any encodings/transformations",
'--raw',
action='store_true',
help='Do no apply any encodings/transformations',
default=False)
group = parser.add_argument_group("hotkeys")
group = parser.add_argument_group('hotkeys')
group.add_argument(
"--exit-char",
'--exit-char',
type=int,
metavar='NUM',
help="Unicode of special character that is used to exit the application, default: %(default)s",
help='Unicode of special character that is used to exit the application, default: %(default)s',
default=0x1d) # GS/CTRL+]
group.add_argument(
"--menu-char",
'--menu-char',
type=int,
metavar='NUM',
help="Unicode code of special character that is used to control miniterm (menu), default: %(default)s",
help='Unicode code of special character that is used to control miniterm (menu), default: %(default)s',
default=0x14) # Menu: CTRL+T
group = parser.add_argument_group("diagnostics")
group = parser.add_argument_group('diagnostics')
group.add_argument(
"-q", "--quiet",
action="store_true",
help="suppress non-error messages",
'-q', '--quiet',
action='store_true',
help='suppress non-error messages',
default=False)
group.add_argument(
"--develop",
action="store_true",
help="show Python traceback on error",
'--develop',
action='store_true',
help='show Python traceback on error',
default=False)
args = parser.parse_args()
@ -883,9 +992,12 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr
sys.stderr.write('--- forcing RTS {}\n'.format('active' if args.rts else 'inactive'))
serial_instance.rts = args.rts
if isinstance(serial_instance, serial.Serial):
serial_instance.exclusive = args.exclusive
serial_instance.open()
except serial.SerialException as e:
sys.stderr.write('could not open port {}: {}\n'.format(repr(args.port), e))
sys.stderr.write('could not open port {!r}: {}\n'.format(args.port, e))
if args.develop:
raise
if not args.ask:
@ -921,7 +1033,7 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr
except KeyboardInterrupt:
pass
if not args.quiet:
sys.stderr.write("\n--- exit ---\n")
sys.stderr.write('\n--- exit ---\n')
miniterm.join()
miniterm.close()

View File

@ -16,6 +16,8 @@
# use poll based implementation on Posix (Linux):
# python -m serial.tools.miniterm alt:///dev/ttyUSB0?class=PosixPollSerial
from __future__ import absolute_import
try:
import urlparse
except ImportError:

View File

@ -20,6 +20,8 @@
# n=<N> pick the N'th entry instead of the first one (numbering starts at 1)
# skip_busy tries to open port to check if it is busy, fails on posix as ports are not locked!
from __future__ import absolute_import
import serial
import serial.tools.list_ports

View File

@ -6,13 +6,15 @@
# and it was so easy to implement ;-)
#
# This file is part of pySerial. https://github.com/pyserial/pyserial
# (C) 2001-2015 Chris Liechti <cliechti@gmx.net>
# (C) 2001-2020 Chris Liechti <cliechti@gmx.net>
#
# SPDX-License-Identifier: BSD-3-Clause
#
# URL format: loop://[option[/option...]]
# options:
# - "debug" print diagnostic messages
from __future__ import absolute_import
import logging
import numbers
import time
@ -25,7 +27,7 @@ try:
except ImportError:
import Queue as queue
from serial.serialutil import SerialBase, SerialException, to_bytes, iterbytes, writeTimeoutError, portNotOpenError
from serial.serialutil import SerialBase, SerialException, to_bytes, iterbytes, SerialTimeoutException, PortNotOpenError
# map log level names to constants. used in from_url()
LOGGER_LEVELS = {
@ -125,7 +127,7 @@ class Serial(SerialBase):
def in_waiting(self):
"""Return the number of bytes currently in the input buffer."""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
if self.logger:
# attention the logged value can differ from return value in
# threaded environments...
@ -139,7 +141,7 @@ class Serial(SerialBase):
until the requested number of bytes is read.
"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
if self._timeout is not None and self._timeout != 0:
timeout = time.time() + self._timeout
else:
@ -179,7 +181,7 @@ class Serial(SerialBase):
"""
self._cancel_write = False
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
data = to_bytes(data)
# calculate aprox time that would be used to send the data
time_used_to_send = 10.0 * len(data) / self._baudrate
@ -193,7 +195,7 @@ class Serial(SerialBase):
time_left -= 0.5
if self._cancel_write:
return 0 # XXX
raise writeTimeoutError
raise SerialTimeoutException('Write timeout')
for byte in iterbytes(data):
self.queue.put(byte, timeout=self._write_timeout)
return len(data)
@ -201,7 +203,7 @@ class Serial(SerialBase):
def reset_input_buffer(self):
"""Clear input buffer, discarding all that is in the buffer."""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
if self.logger:
self.logger.info('reset_input_buffer()')
try:
@ -216,7 +218,7 @@ class Serial(SerialBase):
discarding all that is in the buffer.
"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
if self.logger:
self.logger.info('reset_output_buffer()')
try:
@ -225,6 +227,17 @@ class Serial(SerialBase):
except queue.Empty:
pass
@property
def out_waiting(self):
"""Return how many bytes the in the outgoing buffer"""
if not self.is_open:
raise PortNotOpenError()
if self.logger:
# attention the logged value can differ from return value in
# threaded environments...
self.logger.debug('out_waiting -> {:d}'.format(self.queue.qsize()))
return self.queue.qsize()
def _update_break_state(self):
"""\
Set break: Controls TXD. When active, to transmitting is
@ -247,7 +260,7 @@ class Serial(SerialBase):
def cts(self):
"""Read terminal status line: Clear To Send"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
if self.logger:
self.logger.info('CTS -> state of RTS ({!r})'.format(self._rts_state))
return self._rts_state
@ -263,7 +276,7 @@ class Serial(SerialBase):
def ri(self):
"""Read terminal status line: Ring Indicator"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
if self.logger:
self.logger.info('returning dummy for RI')
return False
@ -272,7 +285,7 @@ class Serial(SerialBase):
def cd(self):
"""Read terminal status line: Carrier Detect"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
if self.logger:
self.logger.info('returning dummy for CD')
return True

View File

@ -1,10 +1,12 @@
#! python
#
# This is a thin wrapper to load the rfc2271 implementation.
# This is a thin wrapper to load the rfc2217 implementation.
#
# This file is part of pySerial. https://github.com/pyserial/pyserial
# (C) 2011 Chris Liechti <cliechti@gmx.net>
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import absolute_import
from serial.rfc2217 import Serial # noqa

View File

@ -16,6 +16,8 @@
# options:
# - "debug" print diagnostic messages
from __future__ import absolute_import
import errno
import logging
import select
@ -26,7 +28,8 @@ try:
except ImportError:
import urllib.parse as urlparse
from serial.serialutil import SerialBase, SerialException, portNotOpenError, to_bytes
from serial.serialutil import SerialBase, SerialException, to_bytes, \
PortNotOpenError, SerialTimeoutException, Timeout
# map log level names to constants. used in from_url()
LOGGER_LEVELS = {
@ -61,6 +64,8 @@ class Serial(SerialBase):
except Exception as msg:
self._socket = None
raise SerialException("Could not open port {}: {}".format(self.portstr, msg))
# after connecting, switch to non-blocking, we're using select
self._socket.setblocking(False)
# not that there is anything to configure...
self._reconfigure_port()
@ -131,7 +136,7 @@ class Serial(SerialBase):
def in_waiting(self):
"""Return the number of bytes currently in the input buffer."""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
# Poll the socket to see if it is ready for reading.
# If ready, at least one byte will be to read.
lr, lw, lx = select.select([self._socket], [], [], 0)
@ -147,13 +152,12 @@ class Serial(SerialBase):
until the requested number of bytes is read.
"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
read = bytearray()
timeout = self._timeout
timeout = Timeout(self._timeout)
while len(read) < size:
try:
start_time = time.time()
ready, _, _ = select.select([self._socket], [], [], timeout)
ready, _, _ = select.select([self._socket], [], [], timeout.time_left())
# If select was used with a timeout, and the timeout occurs, it
# returns with empty lists -> thus abort read operation.
# For timeout == 0 (non-blocking operation) also abort when
@ -166,27 +170,20 @@ class Serial(SerialBase):
if not buf:
raise SerialException('socket disconnected')
read.extend(buf)
if timeout is not None:
timeout -= time.time() - start_time
if timeout <= 0:
break
except socket.timeout:
# timeout is used for write support, just go reading again
pass
except socket.error as e:
# connection fails -> terminate loop
raise SerialException('connection failed ({})'.format(e))
except OSError as e:
# this is for Python 3.x where select.error is a subclass of
# OSError ignore EAGAIN errors. all other errors are shown
if e.errno != errno.EAGAIN:
# OSError ignore BlockingIOErrors and EINTR. other errors are shown
# https://www.python.org/dev/peps/pep-0475.
if e.errno not in (errno.EAGAIN, errno.EALREADY, errno.EWOULDBLOCK, errno.EINPROGRESS, errno.EINTR):
raise SerialException('read failed: {}'.format(e))
except select.error as e:
except (select.error, socket.error) as e:
# this is for Python 2.x
# ignore EAGAIN errors. all other errors are shown
# ignore BlockingIOErrors and EINTR. all errors are shown
# see also http://www.python.org/dev/peps/pep-3151/#select
if e[0] != errno.EAGAIN:
if e[0] not in (errno.EAGAIN, errno.EALREADY, errno.EWOULDBLOCK, errno.EINPROGRESS, errno.EINTR):
raise SerialException('read failed: {}'.format(e))
if timeout.expired():
break
return bytes(read)
def write(self, data):
@ -196,20 +193,76 @@ class Serial(SerialBase):
closed.
"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
d = to_bytes(data)
tx_len = length = len(d)
timeout = Timeout(self._write_timeout)
while tx_len > 0:
try:
self._socket.sendall(to_bytes(data))
except socket.error as e:
# XXX what exception if socket connection fails
raise SerialException("socket connection failed: {}".format(e))
return len(data)
n = self._socket.send(d)
if timeout.is_non_blocking:
# Zero timeout indicates non-blocking - simply return the
# number of bytes of data actually written
return n
elif not timeout.is_infinite:
# when timeout is set, use select to wait for being ready
# with the time left as timeout
if timeout.expired():
raise SerialTimeoutException('Write timeout')
_, ready, _ = select.select([], [self._socket], [], timeout.time_left())
if not ready:
raise SerialTimeoutException('Write timeout')
else:
assert timeout.time_left() is None
# wait for write operation
_, ready, _ = select.select([], [self._socket], [], None)
if not ready:
raise SerialException('write failed (select)')
d = d[n:]
tx_len -= n
except SerialException:
raise
except OSError as e:
# this is for Python 3.x where select.error is a subclass of
# OSError ignore BlockingIOErrors and EINTR. other errors are shown
# https://www.python.org/dev/peps/pep-0475.
if e.errno not in (errno.EAGAIN, errno.EALREADY, errno.EWOULDBLOCK, errno.EINPROGRESS, errno.EINTR):
raise SerialException('write failed: {}'.format(e))
except select.error as e:
# this is for Python 2.x
# ignore BlockingIOErrors and EINTR. all errors are shown
# see also http://www.python.org/dev/peps/pep-3151/#select
if e[0] not in (errno.EAGAIN, errno.EALREADY, errno.EWOULDBLOCK, errno.EINPROGRESS, errno.EINTR):
raise SerialException('write failed: {}'.format(e))
if not timeout.is_non_blocking and timeout.expired():
raise SerialTimeoutException('Write timeout')
return length - len(d)
def reset_input_buffer(self):
"""Clear input buffer, discarding all that is in the buffer."""
if not self.is_open:
raise portNotOpenError
if self.logger:
self.logger.info('ignored reset_input_buffer')
raise PortNotOpenError()
# just use recv to remove input, while there is some
ready = True
while ready:
ready, _, _ = select.select([self._socket], [], [], 0)
try:
if ready:
ready = self._socket.recv(4096)
except OSError as e:
# this is for Python 3.x where select.error is a subclass of
# OSError ignore BlockingIOErrors and EINTR. other errors are shown
# https://www.python.org/dev/peps/pep-0475.
if e.errno not in (errno.EAGAIN, errno.EALREADY, errno.EWOULDBLOCK, errno.EINPROGRESS, errno.EINTR):
raise SerialException('read failed: {}'.format(e))
except (select.error, socket.error) as e:
# this is for Python 2.x
# ignore BlockingIOErrors and EINTR. all errors are shown
# see also http://www.python.org/dev/peps/pep-3151/#select
if e[0] not in (errno.EAGAIN, errno.EALREADY, errno.EWOULDBLOCK, errno.EINPROGRESS, errno.EINTR):
raise SerialException('read failed: {}'.format(e))
def reset_output_buffer(self):
"""\
@ -217,7 +270,7 @@ class Serial(SerialBase):
discarding all that is in the buffer.
"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
if self.logger:
self.logger.info('ignored reset_output_buffer')
@ -227,7 +280,7 @@ class Serial(SerialBase):
duration.
"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
if self.logger:
self.logger.info('ignored send_break({!r})'.format(duration))
@ -251,7 +304,7 @@ class Serial(SerialBase):
def cts(self):
"""Read terminal status line: Clear To Send"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
if self.logger:
self.logger.info('returning dummy for cts')
return True
@ -260,7 +313,7 @@ class Serial(SerialBase):
def dsr(self):
"""Read terminal status line: Data Set Ready"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
if self.logger:
self.logger.info('returning dummy for dsr')
return True
@ -269,7 +322,7 @@ class Serial(SerialBase):
def ri(self):
"""Read terminal status line: Ring Indicator"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
if self.logger:
self.logger.info('returning dummy for ri')
return False
@ -278,7 +331,7 @@ class Serial(SerialBase):
def cd(self):
"""Read terminal status line: Carrier Detect"""
if not self.is_open:
raise portNotOpenError
raise PortNotOpenError()
if self.logger:
self.logger.info('returning dummy for cd)')
return True

View File

@ -20,10 +20,13 @@
# redirect output to an other terminal window on Posix (Linux):
# python -m serial.tools.miniterm spy:///dev/ttyUSB0?dev=/dev/pts/14\&color
from __future__ import absolute_import
import sys
import time
import serial
from serial.serialutil import to_bytes
try:
import urlparse
@ -198,6 +201,7 @@ class Serial(serial.Serial):
return ''.join([parts.netloc, parts.path])
def write(self, tx):
tx = to_bytes(tx)
self.formatter.tx(tx)
return super(Serial, self).write(tx)

View File

@ -9,6 +9,8 @@
# pylint: disable=invalid-name,too-few-public-methods,protected-access,too-many-instance-attributes
from __future__ import absolute_import
from ctypes import c_ulong, c_void_p, c_int64, c_char, \
WinDLL, sizeof, Structure, Union, POINTER
from ctypes.wintypes import HANDLE
@ -179,6 +181,10 @@ WaitForSingleObject = _stdcall_libraries['kernel32'].WaitForSingleObject
WaitForSingleObject.restype = DWORD
WaitForSingleObject.argtypes = [HANDLE, DWORD]
WaitCommEvent = _stdcall_libraries['kernel32'].WaitCommEvent
WaitCommEvent.restype = BOOL
WaitCommEvent.argtypes = [HANDLE, LPDWORD, LPOVERLAPPED]
CancelIoEx = _stdcall_libraries['kernel32'].CancelIoEx
CancelIoEx.restype = BOOL
CancelIoEx.argtypes = [HANDLE, LPOVERLAPPED]
@ -245,6 +251,12 @@ EV_BREAK = 64 # Variable c_int
PURGE_RXCLEAR = 8 # Variable c_int
INFINITE = 0xFFFFFFFF
CE_RXOVER = 0x0001
CE_OVERRUN = 0x0002
CE_RXPARITY = 0x0004
CE_FRAME = 0x0008
CE_BREAK = 0x0010
class N11_OVERLAPPED4DOLLAR_48E(Union):
pass