pyren/pyren3/mod_ecu_mnemonic.py

300 lines
9.0 KiB
Python
Executable File

#!/usr/bin/env python3
from mod_ecu_service import *
from mod_utils import Choice
from xml.dom.minidom import parse
from xml.dom.minidom import parseString
import xml.dom.minidom
import string
def get_mnemonicDTC( m, resp ):
'''This function uses synthetic response generated from real one '''
bytes = 1
bits = int(m.bitsLength)
if bits>7:
bytes = int(bits//8)
hexval = "00"
sb = int(m.startByte)-1
if (sb*3+bytes*3-1)>(len(resp)):
return hexval
hexval = resp[sb*3:(sb+bytes)*3-1]
hexval = hexval.replace(" ","")
if bits<8:
val = int(hexval,16)
val = (val<<int(m.startBit)>>(8-bits))&(2**bits-1)
hexval = str(val)
return hexval
def get_mnemonic( m, se, elm, raw = False ):
if not m.serviceID and mod_globals.ext_cur_DTC != "000000":
for sid in list(se.keys()):
startReq = se[sid].startReq
if startReq.startswith("12") and startReq.endswith(mod_globals.ext_cur_DTC[:4]):
m.startByte = se[sid].responces[list(se[sid].responces.keys())[0]].mnemolocations[m.name].startByte
m.startBit = se[sid].responces[list(se[sid].responces.keys())[0]].mnemolocations[m.name].startBit
m.request = se[sid].startReq
m.positive = se[sid].simpleRsp
m.delay = '100' #don't know how much it should be
#get responce
if len(m.sids)>0:
for sid in m.sids:
service = se[sid]
resp = executeService( service, elm, [], "", True )
else:
resp = elm.request( m.request, m.positive, True, m.delay )
#format responce
resp = resp.strip().replace(' ','')
if not all(c in string.hexdigits for c in resp): resp = ''
resp = ' '.join(a+b for a,b in zip(resp[::2], resp[1::2]))
if len(m.startByte)==0: m.startByte = '01'
hexval = getHexVal(m, m.startByte, m.startBit, resp, raw)
return hexval
def get_SnapShotMnemonic(m, se, elm, dataids):
snapshotService = ""
byteLength = 3 #number of chars in one byte string (two chars with one whitespace)
dataIdByteLength = 2
for sid in se:
if len(se[sid].params) > 1:
if se[sid].params[1]['type'] == 'Snapshot':
snapshotService = se[sid]
break
elif se[sid].startReq.startswith("1904"):
snapshotService = se[sid]
break
if not snapshotService:
return "00"
resp = executeService( snapshotService, elm, [], "", True )
if ((mod_globals.opt_demo and not resp) or not resp.startswith(snapshotService.simpleRsp) or len(resp)//2 == 6):
return "00"
resp = resp.strip().replace(' ','')
if not all(c in string.hexdigits for c in resp): resp = ''
resp = ' '.join(a+b for a,b in zip(resp[::2], resp[1::2]))
numberOfIdentifiers = int("0x" + resp[7*byteLength:8*byteLength-1],16)
resp = resp[8*byteLength:]
didDict = {} #dataIds with their corresponding data
numberOfPossibleDataId = 0 #number of unknown dataIds
posInResp = 0
dataIdLength = dataIdByteLength * byteLength
for idNum in range(numberOfIdentifiers):
#break if unknown dataids number plus current number dataid number is larger than overall number of dataids in the response
if (idNum + numberOfPossibleDataId > numberOfIdentifiers):
break
dataId = resp[posInResp:posInResp + dataIdLength].replace(" ", "")
posInResp += dataIdLength
if dataId not in list(dataids.keys()): #unknown dataId in the snapShot response
bytePos = 1
restOfResp = resp[posInResp:]
while True: #try to find a next dataId that is defined in the module file
posInRestResp = bytePos*byteLength
#using '2' as possible start of dataId is not the best approach, to fix in the future
if len(restOfResp) > posInRestResp + dataIdLength and restOfResp[posInRestResp] == '2':
numberOfPossibleDataId = numberOfPossibleDataId + 1
possibleDataId = restOfResp[posInRestResp:posInRestResp + dataIdLength].replace(" ", "")
if possibleDataId in list(dataids.keys()):
posInResp += posInRestResp
break
else:
bytePos += 1
else:
bytePos += 1
continue
didDataLength = int(dataids[dataId].dataBitLength)//8
didData = resp[posInResp: posInResp + didDataLength*byteLength]
posInResp += didDataLength*byteLength
didDict[dataId] = didData
startByte = "1"
startBit = "0"
dataId = ""
for did in list(dataids.keys()):
for mn in list(dataids[did].mnemolocations.keys()):
if mn == m.name:
dataId = did
startByte = dataids[dataId].mnemolocations[m.name].startByte
startBit = dataids[dataId].mnemolocations[m.name].startBit
if dataId not in didDict:
didDict[dataId] = '0' * (int(dataids[dataId].dataBitLength)//8)
hexval = getHexVal(m, startByte, startBit, didDict[dataId])
return hexval
def getHexVal(m, startByte, startBit, resp, raw = False):
#prepare local variables
sb = int(startByte) - 1
bits = int(m.bitsLength)
sbit = int(startBit)
bytes = int((bits+sbit-1)//8+1)
rshift = ((bytes+1)*8 - (bits+sbit))%8
#check length of responce
if (sb*3+bytes*3-1)>(len(resp)):
return '00'
#extract hex
hexval = resp[sb*3:(sb+bytes)*3-1]
hexval = hexval.replace(" ","")
if raw:
if resp.startswith(m.positive):
return hexval
else:
return 'ERROR'
#shift and mask
val = (int(hexval,16)>>rshift)&(2**bits-1)
#format result
hexval = hex(val)[2:]
#remove 'L'
if hexval[-1:].upper()=='L':
hexval = hexval[:-1]
#add left zero if need
if len(hexval)%2:
hexval = '0'+hexval
if (len(hexval)//2)%bytes:
hexval = '00' * (bytes - len(hexval)//2) + hexval
#revert byte order if little endian
if m.littleEndian == '1':
a = hexval
b = ''
if not len(a) % 2:
for i in range(0,len(a),2):
b = a[i:i+2]+b
hexval = b
return hexval
class ecu_mnemonic:
name = ""
littleEndian = ""
type = ""
bitsLength = ""
serviceID = ""
startByte = ""
startBit = ""
request = ""
delay = ""
nextDelay = ""
rOffset = ""
positive = ""
sids = []
def __str__(self):
out = '''
name = %s
littleEndian = %s
type = %s
bitsLength = %s
serviceID = %s
startByte = %s
startBit = %s
request = %s
delay = %s
nextDelay = %s
rOffset = %s
positive = %s
sids = %s
''' % (self.name, self.littleEndian, self.type, self.bitsLength, self.serviceID,
self.startByte, self.startBit, self.request, self.delay, self.nextDelay,
self.rOffset, self.positive, self.sids)
return pyren_encode(out)
def __init__(self, sv, opt ):
self.name = sv.getAttribute("name")
#print '*'*60
#print sv.toprettyxml()
MnemoDatas = sv.getElementsByTagName("MnemoDatas").item(0)
if MnemoDatas:
self.littleEndian = MnemoDatas.getAttribute("littleEndian")
self.type = MnemoDatas.getAttribute("type")
self.bitsLength = MnemoDatas.getAttribute("bitsLength")
#print "type=",self.type
self.sids = []
ServiceID = sv.getElementsByTagName("ServiceID").item(0)
ServiceIDs = sv.getElementsByTagName("ServiceID")
if ServiceIDs:
for ServiceID in ServiceIDs:
self.serviceID = ServiceID.getAttribute("name")
self.sids.append(self.serviceID)
srvxmlstr = opt["Service\\"+self.serviceID]
sdom = xml.dom.minidom.parseString( srvxmlstr.encode( "utf-8" ) )
sdoc = sdom.documentElement
self.delay = sdoc.getAttribute("delay")
if len(self.delay)==0: self.delay = "0"
Start = sdoc.getElementsByTagName("Start").item(0)
if Start:
Request = sdoc.getElementsByTagName("Request").item(0)
if Request:
self.request = Request.getAttribute("val")
self.nextDelay = Request.getAttribute("nextDelay")
MnemoLocation = sdoc.getElementsByTagName("MnemoLocation")
for m in MnemoLocation:
if m.getAttribute("name") == self.name:
self.startByte = m.getAttribute("startByte")
self.startBit = m.getAttribute("startBit")
self.rOffset = m.getAttribute("rOffset")
self.positive = ""
Simple = sdoc.getElementsByTagName("Simple").item(0)
if Simple:
self.positive = Simple.getAttribute("val")
#print "positive", self.positive
#return
continue
RepeatInProgress = sdoc.getElementsByTagName("RepeatInProgress").item(0)
if RepeatInProgress:
self.positive = RepeatInProgress.getAttribute("val")
#print "positive", self.positive
#return
continue
class ecu_mnemonics:
def __init__(self, mnemonic_list, mdoc, opt, tran ):
for k in list(opt.keys()):
if "Mnemonic" in k:
xmlstr = opt[k]
odom = xml.dom.minidom.parseString( xmlstr.encode( "utf-8" ) )
odoc = odom.documentElement
mnemonic = ecu_mnemonic( odoc, opt )
mnemonic_list[mnemonic.name] = mnemonic