GeekIMU/4.Software/VisualizeTools 1.0/core/serial_parser.py

72 lines
2.5 KiB
Python
Raw Normal View History

# core/serial_parser.py
import serial
import threading
import time
from collections import deque
class SerialParser:
    """Read IMU telemetry lines from a serial port on a background thread.

    Each line is expected to be CRLF-terminated text made of
    whitespace-separated ``key:value`` fields, for example
    ``roll:1.0 pitch:2.0 yaw:3.0 accx:0.1``. Parsed samples are stored in a
    bounded queue (the oldest entries are dropped once 1000 are held) and
    the newest one is available through :meth:`get_latest_data`.
    """

    def __init__(self, port='COM3', baud=115200):
        """Open the serial port and start the reader thread.

        Args:
            port: Serial device name passed to ``serial.Serial``.
            baud: Baud rate passed to ``serial.Serial``.
        """
        # The short timeout bounds how long a blocking read can delay stop().
        self.ser = serial.Serial(port, baud, timeout=0.1)
        self.data_queue = deque(maxlen=1000)
        self.running = True
        # Daemon thread: a caller that forgets stop() must not keep the
        # interpreter alive forever at exit.
        self.thread = threading.Thread(target=self._read_loop, daemon=True)
        self.thread.start()

    def _parse_line(self, line):
        """Convert one raw line into a sample dict.

        Args:
            line: Bytes of a single line, without its terminator.

        Returns:
            A dict with ``attitude`` (roll/pitch/yaw), ``acc`` (x/y/z),
            ``gyro`` (x/y/z) and ``timestamp`` (``time.time()`` at parse
            time). Fields missing from the line default to ``0.0``.
            ``None`` is returned when the line is blank, contains no
            ``key:value`` field at all, or cannot be decoded or converted
            to floats.
        """
        try:
            # Keys are matched case-insensitively.
            decoded = line.decode().strip().lower()
            # Split on the first colon only; tokens without a colon are
            # ignored. A repeated key keeps its last value.
            fields = dict(
                pair.split(':', 1) for pair in decoded.split() if ':' in pair
            )
            if not fields:
                # Blank lines and free text (e.g. boot banners) carry no
                # measurement; do not fabricate an all-zero sample.
                return None

            def axis(key):
                return float(fields.get(key, 0))

            return {
                'attitude': {
                    'roll': axis('roll'),
                    'pitch': axis('pitch'),
                    'yaw': axis('yaw'),
                },
                'acc': {
                    'x': axis('accx'),
                    'y': axis('accy'),
                    'z': axis('accz'),
                },
                'gyro': {
                    'x': axis('gyrox'),
                    'y': axis('gyroy'),
                    'z': axis('gyroz'),
                },
                'timestamp': time.time(),
            }
        except ValueError as e:
            # Covers non-numeric values and undecodable bytes
            # (UnicodeDecodeError is a ValueError subclass).
            print(f"解析异常: {str(e)}")
            return None

    def _read_loop(self):
        """Thread body: read bytes, split on CRLF and queue parsed samples.

        Runs until ``self.running`` becomes false.
        """
        buffer = bytearray()
        while self.running:
            # Take everything already received, or wait (up to the port
            # timeout) for one byte. A non-blocking read here would spin
            # the CPU while the port is idle.
            buffer += self.ser.read(self.ser.in_waiting or 1)
            # All but the last piece are complete lines; the last piece is
            # the unterminated tail and stays buffered for the next pass.
            *lines, buffer = buffer.split(b'\r\n')
            for line in lines:
                parsed = self._parse_line(line)
                if parsed is not None:
                    self.data_queue.append(parsed)

    def get_latest_data(self):
        """Return the most recently parsed sample, or ``None`` if none yet."""
        return self.data_queue[-1] if self.data_queue else None

    def stop(self):
        """Stop the reader thread, wait for it to finish, then close the port."""
        self.running = False
        self.thread.join()
        self.ser.close()