199 lines
6.5 KiB
Python
199 lines
6.5 KiB
Python
# https://tlcdcemu.sourceforge.net/python.html
|
|
# https://sourceforge.net/projects/tlcdcemu/files/tlcdcpy/
|
|
|
|
from serial import Serial, EIGHTBITS, PARITY_EVEN, STOPBITS_ONE
|
|
import requests
|
|
import time
|
|
|
|
def handle_button_press(btn):
|
|
requests.get("http://127.0.0.1:5959/btn", params={
|
|
"btn": btn
|
|
})
|
|
|
|
NOISE = 30
|
|
ERROR = 20
|
|
|
|
def bcd(number):
|
|
dec = number // 10
|
|
unit = number % 10
|
|
return bytes([dec * 16 + unit])
|
|
|
|
class cdc(Serial):
|
|
_commands = {
|
|
b'\x17\x01': ('NEXT', 0, 1),
|
|
b'\x22\x01\x02': ('PREV', 0, -1),
|
|
b'\x20\x0a': ('FAST_FORW', 0, 0),
|
|
b'\x13': ('PLAY', 0, 0),
|
|
b'\x86': ('RESUME', 0, 0),
|
|
b'\x19': ('STOP', 0, 0),
|
|
b'\x1c': ('PAUSE', 0, 0),
|
|
b'\x93': ('HU_ON', 0, 0),
|
|
b'\x21\x0a': ('FAST_BACK', 0, 0),
|
|
b'\x26\x01': ('CD_1', 1, 0),
|
|
b'\x26\x02': ('CD_2', 2, 0),
|
|
b'\x26\x03': ('CD_3', 3, 0),
|
|
b'\x26\x04': ('CD_4', 4, 0),
|
|
b'\x26\x05': ('CD_5', 5, 0),
|
|
b'\x26\x06': ('CD_6', 6, 0),
|
|
b'\x24': ('STALK', 0, 0),
|
|
b'\x27\x07\x0a': ('RAND_ON', 0, 0),
|
|
b'\x27\x03\x0a': ('RAND_OFF', 0, 0),
|
|
}
|
|
|
|
def __init__(self, port, debuglevel=0):
|
|
self._sendsequence = 0
|
|
self._recsequence = 0
|
|
self._debuglevel = debuglevel
|
|
self._cd = 1
|
|
self._try = 0
|
|
self._track = 1
|
|
self.errors = 0
|
|
super().__init__(port, baudrate=9600, bytesize=EIGHTBITS, parity=PARITY_EVEN,
|
|
stopbits=STOPBITS_ONE, timeout=0.2)
|
|
self.setRTS(True)
|
|
self.setDTR(False)
|
|
time.sleep(0.5)
|
|
self.reset_input_buffer()
|
|
|
|
def checksum(self, packet):
|
|
result = 0
|
|
for c in packet:
|
|
result ^= c
|
|
return bytes([result])
|
|
|
|
def _debug(self, level, message, packet=None):
|
|
if self._debuglevel >= level:
|
|
s = ''
|
|
if packet:
|
|
s = ', '.join([hex(c) for c in packet])
|
|
print(f"{message} {s}")
|
|
|
|
def sendPacket(self, packet, maxtries=5):
|
|
retries = 0
|
|
while retries < maxtries:
|
|
pkt = b'\x3d' + bytes([self._sendsequence, len(packet)]) + packet
|
|
pkt += self.checksum(pkt)
|
|
self._debug(NOISE, "Sending", pkt)
|
|
self.write(pkt)
|
|
reply = self.read(1)
|
|
if reply:
|
|
self._debug(NOISE, "Reply", reply)
|
|
else:
|
|
self._debug(ERROR, "No reply")
|
|
while reply == b'\x3d':
|
|
self._debug(ERROR, "New packet instead of ACK")
|
|
self.receivePacket(start_received=True)
|
|
reply = self.read(1)
|
|
if reply == b'\xc5':
|
|
self._sendsequence = (self._sendsequence + 1) % 256
|
|
self.errors = 0
|
|
if retries > 0:
|
|
self._debug(ERROR, f"Needed {retries} retries to send")
|
|
return True
|
|
retries += 1
|
|
self.errors += 1
|
|
self._debug(ERROR, "Not acknowledged")
|
|
return False
|
|
|
|
def receivePacket(self, start_received=False):
|
|
if not start_received:
|
|
startchar = self.read(1)
|
|
if startchar != b'\x3d':
|
|
if startchar:
|
|
self._debug(ERROR, "Bad startchar", startchar)
|
|
return
|
|
else:
|
|
startchar = b'\x3d'
|
|
seqno = self.read(1)
|
|
if not seqno:
|
|
self._debug(ERROR, "No sequence")
|
|
return
|
|
packetlen = self.read(1)
|
|
if not packetlen:
|
|
self._debug(ERROR, "No packetlen")
|
|
return
|
|
plen = packetlen[0]
|
|
packet = self.read(plen)
|
|
if len(packet) != plen:
|
|
self._debug(ERROR, f"Expected len {plen}, got {len(packet)}", packet)
|
|
return
|
|
chk = self.read(1)
|
|
if not chk:
|
|
self._debug(ERROR, "No checksum")
|
|
return
|
|
full_packet = startchar + seqno + packetlen + packet
|
|
if self.checksum(full_packet) != chk:
|
|
self._debug(ERROR, "Bad checksum")
|
|
return
|
|
self.write(b'\xc5')
|
|
self._debug(NOISE, "Received", packet)
|
|
if seqno != bytes([self._recsequence]):
|
|
self._recsequence = seqno[0]
|
|
decoded = self.decodePacket(packet)
|
|
handle_button_press(decoded)
|
|
print(decoded)
|
|
if decoded == 'Stalk pull':
|
|
print(f"Sending test command {self._try}")
|
|
if self.sendPacket(bytes([self._try])):
|
|
print("Acknowledged")
|
|
self._try += 1
|
|
else:
|
|
self._debug(ERROR, "Same sequence as previous packet")
|
|
|
|
def decodePacket(self, packet):
|
|
if packet in self._commands:
|
|
text, newcd, newtrack = self._commands[packet]
|
|
if newcd:
|
|
self._cd = newcd
|
|
self._track = 1
|
|
self.playingCD(self._cd)
|
|
else:
|
|
self._track += newtrack
|
|
if self._track < 1:
|
|
self._track = 99
|
|
elif self._track > 99:
|
|
self._track = 1
|
|
return text
|
|
return 'Unknown %s' % ', '.join([hex(c) for c in packet])
|
|
|
|
def playingCD(self, number):
|
|
return self.sendPacket(b'\x20\x01\x03\x09\x05' + bytes([number]))
|
|
|
|
def startPacket(self):
|
|
time.sleep(0.5)
|
|
self.reset_input_buffer()
|
|
self._sendsequence = 0
|
|
self._recsequence = 0
|
|
if self.sendPacket(b'\x11\x60\x06', 1):
|
|
self.sendPacket(b'\x25\x03') # random off
|
|
if self.playingCD(self._cd):
|
|
return True
|
|
return False
|
|
|
|
def fakePlaying(self):
|
|
return self.sendPacket(b'\x47' + bcd(self._track) + b'\x01\x00\x00\x01\x00\x00\x00\x00\x00')
|
|
|
|
def loop(self):
|
|
while True:
|
|
print("Trying to send start sequence")
|
|
if self.startPacket():
|
|
print("Start sequence sent, entering fake playing loop")
|
|
delay = time.time()
|
|
while self.errors < 3:
|
|
self.receivePacket()
|
|
now = time.time()
|
|
if now - delay >= 1:
|
|
self.fakePlaying()
|
|
delay = now
|
|
print("Too many errors, probably HU off")
|
|
else:
|
|
time.sleep(0.5)
|
|
|
|
if __name__ == '__main__':
|
|
# port = "/dev/ttyS0"
|
|
port = "/dev/serial0"
|
|
debuglevel = 99
|
|
|
|
p = cdc(port, debuglevel)
|
|
p.loop()
|