forked from logzhan/RobotHardware-UESTC
204 lines
6.2 KiB
Python
204 lines
6.2 KiB
Python
import sys
|
|
sys.path.append("..")
|
|
import pypibot
|
|
from pypibot import log
|
|
from pypibot import assistant
|
|
|
|
import serial
|
|
import threading
|
|
import struct
|
|
import time
|
|
from dataholder import MessageID, BoardDataDict
|
|
FIX_HEAD = 0x5a
|
|
|
|
class Recstate():
|
|
WAITING_HD = 0
|
|
WAITING_MSG_ID = 1
|
|
RECEIVE_LEN = 2
|
|
RECEIVE_PACKAGE = 3
|
|
RECEIVE_CHECK = 4
|
|
|
|
def checksum(d):
|
|
sum = 0
|
|
if assistant.is_python3():
|
|
for i in d:
|
|
sum += i
|
|
sum = sum&0xff
|
|
else:
|
|
for i in d:
|
|
sum += ord(i)
|
|
sum = sum&0xff
|
|
return sum
|
|
|
|
|
|
class Transport:
|
|
def __init__(self, port, baudrate=921600):
|
|
self._Port = port
|
|
self._Baudrate = baudrate
|
|
self._KeepRunning = False
|
|
self.receive_state = Recstate.WAITING_HD
|
|
self.rev_msg = []
|
|
self.rev_data = []
|
|
self.req_lock = threading.Lock()
|
|
self.wait_event = threading.Event()
|
|
|
|
def getDataHolder(self):
|
|
return BoardDataDict
|
|
|
|
def start(self):
|
|
try:
|
|
self._Serial = serial.Serial(port=self._Port, baudrate=self._Baudrate, timeout=0.2)
|
|
self._KeepRunning = True
|
|
self._ReceiverThread = threading.Thread(target=self._Listen)
|
|
self._ReceiverThread.setDaemon(True)
|
|
self._ReceiverThread.start()
|
|
return True
|
|
except:
|
|
print("Open Serial Error")
|
|
return False
|
|
|
|
def Stop(self):
|
|
self._KeepRunning = False
|
|
time.sleep(0.1)
|
|
self._Serial.close()
|
|
|
|
def _Listen(self):
|
|
while self._KeepRunning:
|
|
if self.receiveFiniteStates(self._Serial.read()):
|
|
self.packageAnalysis()
|
|
|
|
def receiveFiniteStates(self, s):
|
|
if len(s) == 0:
|
|
return False
|
|
val = s[0]
|
|
val_int = val
|
|
if not assistant.is_python3():
|
|
val_int = ord(val)
|
|
|
|
if self.receive_state == Recstate.WAITING_HD:
|
|
if val_int == FIX_HEAD:
|
|
log.trace('got head')
|
|
self.rev_msg = []
|
|
self.rev_data =[]
|
|
self.rev_msg.append(val)
|
|
self.receive_state = Recstate.WAITING_MSG_ID
|
|
elif self.receive_state == Recstate.WAITING_MSG_ID:
|
|
log.trace('got msg id')
|
|
self.rev_msg.append(val)
|
|
self.receive_state = Recstate.RECEIVE_LEN
|
|
elif self.receive_state == Recstate.RECEIVE_LEN:
|
|
log.trace('got len:%d', val_int)
|
|
self.rev_msg.append(val)
|
|
if val_int == 0:
|
|
self.receive_state = Recstate.RECEIVE_CHECK
|
|
else:
|
|
self.receive_state = Recstate.RECEIVE_PACKAGE
|
|
elif self.receive_state == Recstate.RECEIVE_PACKAGE:
|
|
# if assistant.is_python3():
|
|
# self.rev_data.append((chr(val)).encode('latin1'))
|
|
# else:
|
|
self.rev_data.append(val)
|
|
r = False
|
|
if assistant.is_python3():
|
|
r = len(self.rev_data) == int(self.rev_msg[-1])
|
|
else:
|
|
r = len(self.rev_data) == ord(self.rev_msg[-1])
|
|
|
|
if r:
|
|
self.rev_msg.extend(self.rev_data)
|
|
self.receive_state = Recstate.RECEIVE_CHECK
|
|
elif self.receive_state == Recstate.RECEIVE_CHECK:
|
|
log.trace('got check')
|
|
self.receive_state = Recstate.WAITING_HD
|
|
if val_int == checksum(self.rev_msg):
|
|
log.trace('got a complete message')
|
|
return True
|
|
else:
|
|
self.receive_state = Recstate.WAITING_HD
|
|
|
|
# continue receiving
|
|
return False
|
|
|
|
def packageAnalysis(self):
|
|
if assistant.is_python3():
|
|
in_msg_id = int(self.rev_msg[1])
|
|
else:
|
|
in_msg_id = ord(self.rev_msg[1])
|
|
if assistant.is_python3():
|
|
log.debug("recv body:" + " ".join("{:02x}".format(c) for c in self.rev_data))
|
|
r = BoardDataDict[in_msg_id].unpack(bytes(self.rev_data))
|
|
else:
|
|
log.debug("recv body:" + " ".join("{:02x}".format(ord(c)) for c in self.rev_data))
|
|
r = BoardDataDict[in_msg_id].unpack(''.join(self.rev_data))
|
|
if r:
|
|
self.res_id = in_msg_id
|
|
if in_msg_id<100:
|
|
self.set_response()
|
|
else:#notify
|
|
log.debug('msg %d'%self.rev_msg[4],'data incoming')
|
|
pass
|
|
else:
|
|
log.debug ('error unpacking pkg')
|
|
|
|
def request(self, id, timeout=0.5):
|
|
self.req_lock.acquire()
|
|
if not self.write(id):
|
|
log.debug ('Serial send error!')
|
|
self.req_lock.release()
|
|
return False
|
|
if self.wait_for_response(timeout):
|
|
if id == self.res_id:
|
|
log.trace ('OK')
|
|
else:
|
|
log.error ('Got unmatched response!')
|
|
else:
|
|
log.error ('Request got no response!')
|
|
self.req_lock.release()
|
|
return False
|
|
# clear response
|
|
self.req_lock.release()
|
|
self.res_id = None
|
|
return True
|
|
|
|
def write(self, id):
|
|
cmd = self.make_command(id)
|
|
if assistant.is_python3():
|
|
log.d("write:" + " ".join("{:02x}".format(c) for c in cmd))
|
|
else:
|
|
log.d("write:" + " ".join("{:02x}".format(ord(c)) for c in cmd))
|
|
self._Serial.write(cmd)
|
|
return True
|
|
|
|
def wait_for_response(self, timeout):
|
|
self.wait_event.clear()
|
|
return self.wait_event.wait(timeout)
|
|
|
|
def set_response(self):
|
|
self.wait_event.set()
|
|
|
|
def make_command(self, id):
|
|
#print(DataDict[id])
|
|
data = BoardDataDict[id].pack()
|
|
l = [FIX_HEAD, id, len(data)]
|
|
head = struct.pack("3B", *l)
|
|
body = head + data
|
|
|
|
if assistant.is_python3():
|
|
return body + chr(checksum(body)).encode('latin1')
|
|
else:
|
|
return body + chr(checksum(body))
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
mboard = Transport('com10')
|
|
if not mboard.start():
|
|
import sys
|
|
sys.exit()
|
|
|
|
p = mboard.request(MessageID.ID_GET_VERSION)
|
|
log.i("result=%s"%p)
|
|
print('Version =[',mboard.getDataHolder()[MessageID.ID_GET_VERSION].version.decode(), mboard.getDataHolder()[MessageID.ID_GET_VERSION].build_time.decode(),"]\r\n")
|
|
|
|
|