Compare commits

..

No commits in common. "7bb2ee4d5fb58b9a3bad87fdf397d388276c8501" and "afbfe9401f3f8580fed331a7f915b59c8f019fba" have entirely different histories.

1140 changed files with 26659 additions and 9209 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 108 KiB

View File

@ -0,0 +1,93 @@
# `PIBot_ROS`使用说明
> `pibot`是银星合作时购买的小车底盘。其包含直流电机控制、里程计、IMU和上位机通讯等功能。
## 1、远程连接`RK3588`板子
我们是通过`RK3588 & RK3566`等高性能的主板控制`PIBot`下位机(`STM32`控制板)。如果直接采用`HDMI`线和`USB`鼠标和键盘进行控制非常的不便; 采用`VNC`等远程桌面的方式对性能的损耗较大所以采用远程ssh连接一种较好的方式。这里**我们采用的`vscode`进行ssh远程连接**, 其具有远程图形显示功能以及其他强大的功能。
`ssh`连接需要知道目标机器的`ip`地址。理论上如果知道开发板的外网`IP`地址,那么就可以在任意位置连接开发板。由于外网固定`IP`难以获取,所以这里我们**采用局域网ssh连接**。由于`windows`平台的渲染窗口和`linux`不兼容,所以**windows平台的vscode是不支持ssh窗口显示的需要开启虚拟机或者`linux`主机下面的`vscode`才可以显示窗口**。我们在虚拟机`ubuntu`里面使用`vscode`远程`ssh`连接`RK3588`和虚拟机需要用同个网络,远程连接[教程](http://www.autolabor.com.cn/book/ROSTutorials/di-9-zhang-ji-qi-ren-dao-822a28-shi-4f5329/92-vscodeyuan-cheng-kai-fa.html)
1. `ip`查询方式终端: 在小车终端里输入 `ifconfig`查看小车的ip地址
2. 将教程里步骤三内容改为:
3. `ssh -X firefly@ip` -X 为远程显示图形界面需要)
4. 密码为:`firefly`
5. 连接上后选择需要打开的文件夹
## 2、小车驱动编译
#### 2.1 驱动代码位置
`PIBot`的ROS支持包的位置在[仓库]([RobotKernal-UESTC](http://logzhan.ticp.io:30000/logzhan/RobotKernal-UESTC))中,其路径为`Code\RK3588\PIbot_ROS`中。我们解压后可以把`PIBot_ROS`移动到`~/`位置,并重新命名为`pibot_ros`(命名可以按照个人习惯),下面的教程以`pibot_ros`为例。
#### 2.2 3588上编译问题主要是全覆盖部分
`PIBot_ROS/ros_ws/src/ipa_coverage_planning`(全覆盖部分的代码)
```shell
# 进入到ROS的工作空间
cd pibot_ros/ros_ws
# 在pibot_ros/ros_ws路径下执行catkin_make
# ~:/pibot_ros/ros_ws$ catkin_make
catkin_make
```
#### 2.3 根据出现的报错缺少的库问题解决
```shell
# noetic 是ubuntu20.04版本名,其他版本需要更换名字
sudo apt-get install ros-noetic-xxx
# 例如sudo apt-get install ros-noetic-opengm
sudo apt-get install ros-noetic-libdlib
sudo apt-get install ros-noetic-cob-navigation
sudo apt-get install coinor-*
```
可参考内容:
> [全覆盖规划算法Coverage Path Planning开源代码ipa_coverage_planning编译-CSDN博客](https://blog.csdn.net/ktigerhero3/article/details/121562049)
>
> [ROS全覆盖规划算法 Coverage Path Planning 采坑-CSDN博客](https://blog.csdn.net/weixin_42179076/article/details/121164350)
## 3、使用说明
#### 3.1 导航的启动
`PIBot_ROS`的文件解压或重新命名可以按照个人习惯即可,这里以解压后命名为`pibot_ros`为例,介绍命令启动导航。
```shell
# 进入到pibot_ros的工作空间
cd ~/pibot_ros/ros_ws
# 配置环境变量
source ./devel/setup.bash
# 启动导航文件
# 格式 roslaunch package_name launch_file_name
roslaunch pibot_navigation nav.launch
```
#### 3.2 更改导航地图
根据导航需求更改导航地图,地图路径在下图左下角位置`pgm`为地图格式 `yaml`为配置文件:
<img src="./Image/ROS_Using (3).png" alt="Untitled" style="zoom:80%;" />
#### 3.3 小车控制和可视化
小车的控制和可视化功能需要**开启两个新的终端,分别在终端输入命令**
```shell
# 小车的键盘控制,在任意路径执行:
pibot_control
```
```shell
# 导航可视化,在任意路径执行:
pibot_view (需要设置从主机 记不清 后面再补充)
```
在这上面可以在地图上给出目标点进行单点导航。
#### 3.4、全覆盖路径规划的使用

View File

@ -1,6 +1,6 @@
# PIBOT ROS Workspace v2.0
## install ros
## 安装ROS
```shell
cd ~/pibot_ros/
./pibot_install_ros.sh

View File

@ -1,17 +1,17 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
import pypibot.assistant
import pypibot.log as Logger
import pypibot.err
log=Logger.log
import sys
def createNamedLog(name):
return Logger.NamedLog(name)
class Object():
pass
isDebug="-d" in sys.argv
import platform
isWindows=False
if platform.system()=='Windows':
isWindows=True
#!/usr/bin/python
# -*- coding: utf-8 -*-
import pypibot.assistant
import pypibot.log as Logger
import pypibot.err
log=Logger.log
import sys
def createNamedLog(name):
return Logger.NamedLog(name)
class Object():
pass
isDebug="-d" in sys.argv
import platform
isWindows=False
if platform.system()=='Windows':
isWindows=True

View File

@ -1,234 +1,234 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
import os, sys, inspect
import datetime
import signal
import threading
import time
# function: get directory of current script, if script is built
# into an executable file, get directory of the excutable file
def current_file_directory():
path = os.path.realpath(sys.path[0]) # interpreter starter's path
if os.path.isfile(path): # starter is excutable file
path = os.path.dirname(path)
path = os.path.abspath(path) # return excutable file's directory
else: # starter is python script
caller_file = inspect.stack()[0][1] # function caller's filename
path = os.path.abspath(os.path.dirname(caller_file))# return function caller's file's directory
if path[-1]!=os.path.sep:path+=os.path.sep
return path
"""格式化字符串"""
def formatString(string,*argv):
string=string%argv
if string.find('$(scriptpath)')>=0:
string=string.replace('$(scriptpath)',current_file_directory())
if string.find('$(filenumber2)')>=0:
i=0
path=""
while True:
path=string.replace('$(scriptpath)',"%02d"%i)
if not os.path.lexists(path):break
i+=1
string=path
#8位日期20140404
if string.find('$(Date8)')>=0:
now=datetime.datetime.now()
string=string.replace('$(Date8)', now.strftime("%Y%m%d"))
#6位日期140404
if string.find('$(Date6)')>=0:
now=datetime.datetime.now()
string=string.replace('$(Date6)', now.strftime("%y%m%d"))
#6位时间121212
if string.find('$(Time6)')>=0:
now=datetime.datetime.now()
string=string.replace('$(Time6)', now.strftime("%H%M%S"))
#4位时间1212
if string.find('$(Time4)')>=0:
now=datetime.datetime.now()
string=string.replace('$(Time4)', now.strftime("%H%M"))
#文件编号2位必须在最后
if string.find('$(filenumber2)')>=0:
i=0
path=""
while True:
path=string.replace('$(filenumber2)',"%02d"%i)
if not os.path.lexists(path):break
i+=1
string=path
#文件编号3位必须在最后
if string.find('$(filenumber3)')>=0:
i=0
path=""
while True:
path=string.replace('$(filenumber3)',"%03d"%i)
if not os.path.lexists(path):break
i+=1
string=path
return string
"""
取得进程列表
格式(PID,cmd)
"""
def getProcessList():
processList = []
try:
for line in os.popen("ps xa"):
fields = line.split()
# spid = fields[0]
pid = 0
try:pid = int(fields[0])
except:None
cmd = line[27:-1]
# print "PS:%d,%s"%(npid,process)
if pid != 0 and len(cmd) > 0:
processList.append((pid, cmd))
except Exception as e:
print("getProcessList except:%s" % (e))
return processList
def killCommand(cmd):
try:
processList = getProcessList()
for p in processList:
if p[1].find(cmd) != -1:
pid = p[0]
os.kill(pid, signal.SIGKILL)
except Exception as e:
print("killCommand %s except:%s" % (cmd,e))
def check_pid(pid):
""" Check For the existence of a unix pid. """
if pid == 0:return False
try:
os.kill(pid, 0)
except OSError:
return False
else:
return True
SF=formatString
#全局异常捕获
def excepthook(excType, excValue, tb):
'''''write the unhandle exception to log'''
from log import log
import traceback
log.e('Unhandled Error: %s',''.join(traceback.format_exception(excType, excValue, tb)))
sys.exit(-1)
#sys.__excepthook__(type, value, trace)
#sys.__excepthook__(excType, excValue, tb)
_defaultGlobalExcept=sys.excepthook
def enableGlobalExcept(enable=True):
if enable:
sys.excepthook = excepthook
else:
sys.excepthook=_defaultGlobalExcept
# 默认启动全局异常处理
enableGlobalExcept()
#创建线程
def createThread(name,target,args=(),autoRun=True,daemon=True):
from log import log
def threadProc():
log.i("thread %s started!",name)
try:
target(*args)
log.i("thread %s ended!",name)
except Exception as e:
log.e("thread %s crash!err=%s",name,e)
thd=threading.Thread(name=name,target=threadProc)
thd.setDaemon(daemon)
if autoRun:thd.start()
return thd
#定时器
class Timer():
def __init__(self, timer_proc, args=(),first=0,period=0,name="Timer"):
self.name=name
self.first=first
self.period=period
self.args=args
self.timer_proc=timer_proc
self.thdWork=None
self.stopFlag=False
from log import NamedLog
self.log=NamedLog(name)
def run(self):
if self.first>0:
time.sleep(self.first)
try:
self.timer_proc(*self.args)
except Exception as e:
self.log.error("timer proc crash!err=%s",e)
while self.period>0 and not self.stopFlag:
time.sleep(self.period)
try:
self.timer_proc(*self.args)
except Exception as e:
self.log.error("timer proc crash!err=%s",e)
def start(self):
if self.isAlive():
self.log.d("already running!")
return True
self.stopFlag=False
self.thdWork=threading.Thread(name=self.name,target=self.run)
self.thdWork.setDaemon(True)
self.thdWork.start()
def stop(self,timeout=3):
if self.isAlive():
self.stopFlag=True
try:
self.thdWork.join(timeout)
except Exception as e:
self.log.e("stop timeout!")
def isAlive(self):
return self.thdWork and self.thdWork.isAlive()
#计时器
class Ticker():
def __init__(self):
self.reset()
# 片段,可以判断时长是否在一个特定毫秒段内
self.section=[]
def reset(self):
self.tick=time.time()
self.end=0
def stop(self):
self.end=time.time()
@property
def ticker(self):
if self.end==0:
return int((time.time()-self.tick)*1000)
else:
return int((self.end-self.tick)*1000)
def addSection(self,a,b):
a,b=int(a),int(b)
assert a<b
self.section.append((a,b))
def removeSection(self,a,b):
a,b=int(a),int(b)
self.section.remove((a,b))
def removeAllSectioin(self):
self.section=[]
def inSection(self):
tick=self.ticker
for (a,b) in self.section:
if tick>=a and tick<=b:
return True
return False
def __call__(self):
return self.ticker
def waitExit():
import log
log.log.i("start waiting to exit...")
try:
while(True):
time.sleep(1)
except Exception as e:
log.log.w("recv exit sign!")
def is_python3():
return sys.hexversion > 0x03000000
#!/usr/bin/python
# -*- coding: utf-8 -*-
import os, sys, inspect
import datetime
import signal
import threading
import time
# function: get directory of current script, if script is built
# into an executable file, get directory of the excutable file
def current_file_directory():
path = os.path.realpath(sys.path[0]) # interpreter starter's path
if os.path.isfile(path): # starter is excutable file
path = os.path.dirname(path)
path = os.path.abspath(path) # return excutable file's directory
else: # starter is python script
caller_file = inspect.stack()[0][1] # function caller's filename
path = os.path.abspath(os.path.dirname(caller_file))# return function caller's file's directory
if path[-1]!=os.path.sep:path+=os.path.sep
return path
"""格式化字符串"""
def formatString(string,*argv):
string=string%argv
if string.find('$(scriptpath)')>=0:
string=string.replace('$(scriptpath)',current_file_directory())
if string.find('$(filenumber2)')>=0:
i=0
path=""
while True:
path=string.replace('$(scriptpath)',"%02d"%i)
if not os.path.lexists(path):break
i+=1
string=path
#8位日期20140404
if string.find('$(Date8)')>=0:
now=datetime.datetime.now()
string=string.replace('$(Date8)', now.strftime("%Y%m%d"))
#6位日期140404
if string.find('$(Date6)')>=0:
now=datetime.datetime.now()
string=string.replace('$(Date6)', now.strftime("%y%m%d"))
#6位时间121212
if string.find('$(Time6)')>=0:
now=datetime.datetime.now()
string=string.replace('$(Time6)', now.strftime("%H%M%S"))
#4位时间1212
if string.find('$(Time4)')>=0:
now=datetime.datetime.now()
string=string.replace('$(Time4)', now.strftime("%H%M"))
#文件编号2位必须在最后
if string.find('$(filenumber2)')>=0:
i=0
path=""
while True:
path=string.replace('$(filenumber2)',"%02d"%i)
if not os.path.lexists(path):break
i+=1
string=path
#文件编号3位必须在最后
if string.find('$(filenumber3)')>=0:
i=0
path=""
while True:
path=string.replace('$(filenumber3)',"%03d"%i)
if not os.path.lexists(path):break
i+=1
string=path
return string
"""
取得进程列表
格式(PID,cmd)
"""
def getProcessList():
processList = []
try:
for line in os.popen("ps xa"):
fields = line.split()
# spid = fields[0]
pid = 0
try:pid = int(fields[0])
except:None
cmd = line[27:-1]
# print "PS:%d,%s"%(npid,process)
if pid != 0 and len(cmd) > 0:
processList.append((pid, cmd))
except Exception as e:
print("getProcessList except:%s" % (e))
return processList
def killCommand(cmd):
try:
processList = getProcessList()
for p in processList:
if p[1].find(cmd) != -1:
pid = p[0]
os.kill(pid, signal.SIGKILL)
except Exception as e:
print("killCommand %s except:%s" % (cmd,e))
def check_pid(pid):
""" Check For the existence of a unix pid. """
if pid == 0:return False
try:
os.kill(pid, 0)
except OSError:
return False
else:
return True
SF=formatString
#全局异常捕获
def excepthook(excType, excValue, tb):
'''''write the unhandle exception to log'''
from log import log
import traceback
log.e('Unhandled Error: %s',''.join(traceback.format_exception(excType, excValue, tb)))
sys.exit(-1)
#sys.__excepthook__(type, value, trace)
#sys.__excepthook__(excType, excValue, tb)
_defaultGlobalExcept=sys.excepthook
def enableGlobalExcept(enable=True):
if enable:
sys.excepthook = excepthook
else:
sys.excepthook=_defaultGlobalExcept
# 默认启动全局异常处理
enableGlobalExcept()
#创建线程
def createThread(name,target,args=(),autoRun=True,daemon=True):
from log import log
def threadProc():
log.i("thread %s started!",name)
try:
target(*args)
log.i("thread %s ended!",name)
except Exception as e:
log.e("thread %s crash!err=%s",name,e)
thd=threading.Thread(name=name,target=threadProc)
thd.setDaemon(daemon)
if autoRun:thd.start()
return thd
#定时器
class Timer():
def __init__(self, timer_proc, args=(),first=0,period=0,name="Timer"):
self.name=name
self.first=first
self.period=period
self.args=args
self.timer_proc=timer_proc
self.thdWork=None
self.stopFlag=False
from log import NamedLog
self.log=NamedLog(name)
def run(self):
if self.first>0:
time.sleep(self.first)
try:
self.timer_proc(*self.args)
except Exception as e:
self.log.error("timer proc crash!err=%s",e)
while self.period>0 and not self.stopFlag:
time.sleep(self.period)
try:
self.timer_proc(*self.args)
except Exception as e:
self.log.error("timer proc crash!err=%s",e)
def start(self):
if self.isAlive():
self.log.d("already running!")
return True
self.stopFlag=False
self.thdWork=threading.Thread(name=self.name,target=self.run)
self.thdWork.setDaemon(True)
self.thdWork.start()
def stop(self,timeout=3):
if self.isAlive():
self.stopFlag=True
try:
self.thdWork.join(timeout)
except Exception as e:
self.log.e("stop timeout!")
def isAlive(self):
return self.thdWork and self.thdWork.isAlive()
#计时器
class Ticker():
def __init__(self):
self.reset()
# 片段,可以判断时长是否在一个特定毫秒段内
self.section=[]
def reset(self):
self.tick=time.time()
self.end=0
def stop(self):
self.end=time.time()
@property
def ticker(self):
if self.end==0:
return int((time.time()-self.tick)*1000)
else:
return int((self.end-self.tick)*1000)
def addSection(self,a,b):
a,b=int(a),int(b)
assert a<b
self.section.append((a,b))
def removeSection(self,a,b):
a,b=int(a),int(b)
self.section.remove((a,b))
def removeAllSectioin(self):
self.section=[]
def inSection(self):
tick=self.ticker
for (a,b) in self.section:
if tick>=a and tick<=b:
return True
return False
def __call__(self):
return self.ticker
def waitExit():
import log
log.log.i("start waiting to exit...")
try:
while(True):
time.sleep(1)
except Exception as e:
log.log.w("recv exit sign!")
def is_python3():
return sys.hexversion > 0x03000000

View File

@ -1,56 +1,56 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
import ConfigParser
from log import PLOG
import os
def getdefaultfilename():
pass
def openconfiger(filename):
return configer(filename)
class configer:
def __init__(self,fullfilepath):
self._filepath=fullfilepath
if not os.path.isdir(os.path.dirname(fullfilepath)):
os.makedirs(os.path.dirname(fullfilepath))
self._conf=ConfigParser.ConfigParser()
if os.path.isfile(fullfilepath):
try:
self._conf.readfp(open(fullfilepath,"r"))
except Exception,e:
PLOG.error("配置文件'%s'打开失败,err=%s"%(self._filepath,e))
def save(self):
try:
self._conf.write(open(self._filepath,"w"))
except Exception,e:
PLOG.error("配置文件'%s'保存失败,err=%s"%(self._filepath,e))
def changeConfValue(self,section,option,value):
if self._conf.has_section(section):
self._conf.set(section,option,value)
else:
self._conf.add_section(section)
self._conf.set(section,option,value)
def _readvalue(self,fn,section,option,default):
result=default
if self._conf.has_section(section):
if self._conf.has_option(section,option):
result=fn(section,option)
PLOG.debug("Option[%s][%s]=%s"%(section,option,str(result)))
else:
self._conf.set(section,option,str(default))
result=default
else:
self._conf.add_section(section)
self._conf.set(section,option,str(default))
result=default
return result
def getstr(self,section,option,default=""):
return self._readvalue(self._conf.get, section, option, default)
def getint(self,section,option,default=0):
return self._readvalue(self._conf.getint, section, option, default)
def getfloat(self,section,option,default=0.0):
return self._readvalue(self._conf.getfloat, section, option, default)
def getboolean(self,section,option,default=False):
return self._readvalue(self._conf.getboolean, section, option, default)
#!/usr/bin/python
# -*- coding: utf-8 -*-
import ConfigParser
from log import PLOG
import os
def getdefaultfilename():
pass
def openconfiger(filename):
return configer(filename)
class configer:
def __init__(self,fullfilepath):
self._filepath=fullfilepath
if not os.path.isdir(os.path.dirname(fullfilepath)):
os.makedirs(os.path.dirname(fullfilepath))
self._conf=ConfigParser.ConfigParser()
if os.path.isfile(fullfilepath):
try:
self._conf.readfp(open(fullfilepath,"r"))
except Exception,e:
PLOG.error("配置文件'%s'打开失败,err=%s"%(self._filepath,e))
def save(self):
try:
self._conf.write(open(self._filepath,"w"))
except Exception,e:
PLOG.error("配置文件'%s'保存失败,err=%s"%(self._filepath,e))
def changeConfValue(self,section,option,value):
if self._conf.has_section(section):
self._conf.set(section,option,value)
else:
self._conf.add_section(section)
self._conf.set(section,option,value)
def _readvalue(self,fn,section,option,default):
result=default
if self._conf.has_section(section):
if self._conf.has_option(section,option):
result=fn(section,option)
PLOG.debug("Option[%s][%s]=%s"%(section,option,str(result)))
else:
self._conf.set(section,option,str(default))
result=default
else:
self._conf.add_section(section)
self._conf.set(section,option,str(default))
result=default
return result
def getstr(self,section,option,default=""):
return self._readvalue(self._conf.get, section, option, default)
def getint(self,section,option,default=0):
return self._readvalue(self._conf.getint, section, option, default)
def getfloat(self,section,option,default=0.0):
return self._readvalue(self._conf.getfloat, section, option, default)
def getboolean(self,section,option,default=False):
return self._readvalue(self._conf.getboolean, section, option, default)

View File

@ -1,140 +1,140 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys, os, time, atexit
from signal import SIGTERM
def check_pid(pid):
""" Check For the existence of a unix pid. """
if pid == 0:return False
try:
os.kill(pid, 0)
except OSError:
return False
else:
return True
class daemon:
"""
A generic daemon class.
Usage: subclass the Daemon class and override the run() method
"""
def __init__(self, pidfile, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
self.stdin = stdin
self.stdout = stdout
self.stderr = stderr
self.pidfile = pidfile
def daemonize(self):
"""
do the UNIX double-fork magic, see Stevens' "Advanced
Programming in the UNIX Environment" for details (ISBN 0201563177)
http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16
"""
try:
pid = os.fork()
if pid > 0:
# exit first parent
sys.exit(0)
except OSError, e:
sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
sys.exit(1)
# decouple from parent environment
os.chdir("/")
os.setsid()
os.umask(0)
# do second fork
try:
pid = os.fork()
if pid > 0:
# exit from second parent
sys.exit(0)
except OSError, e:
sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
sys.exit(1)
# redirect standard file descriptors
sys.stdout.flush()
sys.stderr.flush()
si = file(self.stdin, 'r')
so = file(self.stdout, 'a+')
se = file(self.stderr, 'a+', 0)
os.dup2(si.fileno(), sys.stdin.fileno())
os.dup2(so.fileno(), sys.stdout.fileno())
os.dup2(se.fileno(), sys.stderr.fileno())
# write pidfile
atexit.register(self.delpid)
pid = str(os.getpid())
file(self.pidfile, 'w+').write("%s\n" % pid)
def delpid(self):
os.remove(self.pidfile)
def start(self):
"""
Start the daemon
"""
# Check for a pidfile to see if the daemon already runs
try:
pf = file(self.pidfile, 'r')
pid = int(pf.read().strip())
pf.close()
except IOError:
pid = None
if pid and check_pid(pid):
message = "pidfile %s already exist. Daemon already running?\n"
sys.stderr.write(message % self.pidfile)
sys.exit(1)
print("daemon start...")
# Start the daemon
self.daemonize()
self.run()
def stop(self):
"""
Stop the daemon
"""
# Get the pid from the pidfile
try:
pf = file(self.pidfile, 'r')
pid = int(pf.read().strip())
pf.close()
except IOError:
pid = None
if not pid:
message = "pidfile %s does not exist. Daemon not running?\n"
sys.stderr.write(message % self.pidfile)
return # not an error in a restart
# Try killing the daemon process
try:
while 1:
os.kill(pid, SIGTERM)
time.sleep(0.1)
except OSError, err:
err = str(err)
if err.find("No such process") > 0:
if os.path.exists(self.pidfile):
os.remove(self.pidfile)
else:
print(str(err))
sys.exit(1)
def restart(self):
"""
Restart the daemon
"""
self.stop()
self.start()
def run(self):
"""
You should override this method when you subclass Daemon. It will be called after the process has been
daemonized by start() or restart().
"""
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys, os, time, atexit
from signal import SIGTERM
def check_pid(pid):
""" Check For the existence of a unix pid. """
if pid == 0:return False
try:
os.kill(pid, 0)
except OSError:
return False
else:
return True
class daemon:
"""
A generic daemon class.
Usage: subclass the Daemon class and override the run() method
"""
def __init__(self, pidfile, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
self.stdin = stdin
self.stdout = stdout
self.stderr = stderr
self.pidfile = pidfile
def daemonize(self):
"""
do the UNIX double-fork magic, see Stevens' "Advanced
Programming in the UNIX Environment" for details (ISBN 0201563177)
http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16
"""
try:
pid = os.fork()
if pid > 0:
# exit first parent
sys.exit(0)
except OSError, e:
sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
sys.exit(1)
# decouple from parent environment
os.chdir("/")
os.setsid()
os.umask(0)
# do second fork
try:
pid = os.fork()
if pid > 0:
# exit from second parent
sys.exit(0)
except OSError, e:
sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
sys.exit(1)
# redirect standard file descriptors
sys.stdout.flush()
sys.stderr.flush()
si = file(self.stdin, 'r')
so = file(self.stdout, 'a+')
se = file(self.stderr, 'a+', 0)
os.dup2(si.fileno(), sys.stdin.fileno())
os.dup2(so.fileno(), sys.stdout.fileno())
os.dup2(se.fileno(), sys.stderr.fileno())
# write pidfile
atexit.register(self.delpid)
pid = str(os.getpid())
file(self.pidfile, 'w+').write("%s\n" % pid)
def delpid(self):
os.remove(self.pidfile)
def start(self):
"""
Start the daemon
"""
# Check for a pidfile to see if the daemon already runs
try:
pf = file(self.pidfile, 'r')
pid = int(pf.read().strip())
pf.close()
except IOError:
pid = None
if pid and check_pid(pid):
message = "pidfile %s already exist. Daemon already running?\n"
sys.stderr.write(message % self.pidfile)
sys.exit(1)
print("daemon start...")
# Start the daemon
self.daemonize()
self.run()
def stop(self):
"""
Stop the daemon
"""
# Get the pid from the pidfile
try:
pf = file(self.pidfile, 'r')
pid = int(pf.read().strip())
pf.close()
except IOError:
pid = None
if not pid:
message = "pidfile %s does not exist. Daemon not running?\n"
sys.stderr.write(message % self.pidfile)
return # not an error in a restart
# Try killing the daemon process
try:
while 1:
os.kill(pid, SIGTERM)
time.sleep(0.1)
except OSError, err:
err = str(err)
if err.find("No such process") > 0:
if os.path.exists(self.pidfile):
os.remove(self.pidfile)
else:
print(str(err))
sys.exit(1)
def restart(self):
"""
Restart the daemon
"""
self.stop()
self.start()
def run(self):
"""
You should override this method when you subclass Daemon. It will be called after the process has been
daemonized by start() or restart().
"""

View File

@ -1,58 +1,58 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# 异常类
class PibotError(Exception):
def __init__(self, errcode, errmsg):
self.errcode = errcode
self.errmsg = errmsg
#Exception.__init__(self,self.__str__())
def msg(self, msg):
if not msg is None:return PibotError(self.errcode, msg)
return PibotError(8,"unknow error")
def __str__(self):
return "PibotError:%s(%d)"%(self.errmsg,self.errcode)
@property
def message(self):
return str(self)
# 声明
# 成功
success=PibotError(0,"null")
# 通用失败
fail=PibotError(1,"fail")
# 参数无效
invalidParameter=PibotError(2,"invalid parameter")
# 不支持
noSupport=PibotError(3,"no support")
# 不存在
noExist=PibotError(4,"no exist")
# 超时
timeout=PibotError(5,"timeout")
# 繁忙
busy=PibotError(6,"busy")
# 缺少参数
missParameter=PibotError(7,"miss parameter")
# 系统错误(通用错误)
systemError=PibotError(8,"system error")
# 密码错误
invalidPassword=PibotError(9,"invalid password")
# 编码失败
encodeFailed=PibotError(10,"encode failed")
# 数据库操作失败
dbOpertationFailed=PibotError(11,"db error")
# 已占用
occupied=PibotError(12,"occupied")
# session不存在
noSession = PibotError(13,'cannot find session')
#没有找到
noFound = PibotError(14, "no found")
#已经存在
existed = PibotError(15, "existed")
#已经锁定
locked = PibotError(16, "locked")
#已经过期
expired = PibotError(17, "is expired")
#无效的参数
invalidParameter = PibotError(18, "invalid parameter")
#!/usr/bin/python
# -*- coding: utf-8 -*-
# 异常类
class PibotError(Exception):
def __init__(self, errcode, errmsg):
self.errcode = errcode
self.errmsg = errmsg
#Exception.__init__(self,self.__str__())
def msg(self, msg):
if not msg is None:return PibotError(self.errcode, msg)
return PibotError(8,"unknow error")
def __str__(self):
return "PibotError:%s(%d)"%(self.errmsg,self.errcode)
@property
def message(self):
return str(self)
# 声明
# 成功
success=PibotError(0,"null")
# 通用失败
fail=PibotError(1,"fail")
# 参数无效
invalidParameter=PibotError(2,"invalid parameter")
# 不支持
noSupport=PibotError(3,"no support")
# 不存在
noExist=PibotError(4,"no exist")
# 超时
timeout=PibotError(5,"timeout")
# 繁忙
busy=PibotError(6,"busy")
# 缺少参数
missParameter=PibotError(7,"miss parameter")
# 系统错误(通用错误)
systemError=PibotError(8,"system error")
# 密码错误
invalidPassword=PibotError(9,"invalid password")
# 编码失败
encodeFailed=PibotError(10,"encode failed")
# 数据库操作失败
dbOpertationFailed=PibotError(11,"db error")
# 已占用
occupied=PibotError(12,"occupied")
# session不存在
noSession = PibotError(13,'cannot find session')
#没有找到
noFound = PibotError(14, "no found")
#已经存在
existed = PibotError(15, "existed")
#已经锁定
locked = PibotError(16, "locked")
#已经过期
expired = PibotError(17, "is expired")
#无效的参数
invalidParameter = PibotError(18, "invalid parameter")

View File

@ -1,259 +1,259 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
import sys,os
import datetime
import threading
import pypibot.assistant as assistant
import platform
if assistant.is_python3():
import _thread
else:
import thread
import traceback
"""
%a Locales abbreviated weekday name.
%A Locales full weekday name.
%b Locales abbreviated month name.
%B Locales full month name.
%c Locales appropriate date and time representation.
%d Day of the month as a decimal number [01,31].
%H Hour (24-hour clock) as a decimal number [00,23].
%I Hour (12-hour clock) as a decimal number [01,12].
%j Day of the year as a decimal number [001,366].
%m Month as a decimal number [01,12].
%M Minute as a decimal number [00,59].
%p Locales equivalent of either AM or PM. (1)
%S Second as a decimal number [00,61]. (2)
%U Week number of the year (Sunday as the first day of the week) as a decimal number [00,53]. All days in a new year preceding the first Sunday are considered to be in week 0. (3)
%w Weekday as a decimal number [0(Sunday),6].
%W Week number of the year (Monday as the first day of the week) as a decimal number [00,53]. All days in a new year preceding the first Monday are considered to be in week 0. (3)
%x Locales appropriate date representation.
%X Locales appropriate time representation.
%y Year without century as a decimal number [00,99].
%Y Year with century as a decimal number.
%Z Time zone name (no characters if no time zone exists).
%% A literal '%' character.
"""
isWindows=False
if platform.system()=='Windows':
isWindows=True
defaultEncodeing="utf8"
if "-utf8" in sys.argv:
defaultEncodeing="utf-8"
if "-gbk" in sys.argv:
defaultEncodeing="gbk"
TRACE=5
DEBUG=4
INFORMATION=3
WARNING=2
ERROR=1
NONE=0
MAX_MSG_SIZE = 4096
def getLevelFromString(level):
level=level.lower()
if level=='t' or level=='trace':return 5
elif level=='d' or level=='debug':return 4
elif level=='i' or level=='info':return 3
elif level=='w' or level=='wran':return 2
elif level=='e' or level=='error':return 1
else :return 0
def getLevelString(level):
if level==TRACE:return "T"
elif level==DEBUG:return "D"
elif level==INFORMATION:return "I"
elif level==WARNING:return "W"
elif level==ERROR:return "E"
else:return "N"
class PibotLog:
def __init__(self):
self.isEnableControlLog=True
self.fileTemple=None
self.filePath=""
self.level=5
self._lock=threading.RLock()
self.fd=None
self.fd_day=-1
def setLevel(self,level):
self.level=getLevelFromString(level)
def enableControllog(self,enable):
self.isEnableControlLog=enable
def enableFileLog(self,fileName):
self.fileTemple=fileName
self.updateFilelog()
def updateFilelog(self):
fn=assistant.SF(self.fileTemple)
if fn!=self.filePath:
self.i("new log file:%s",fn)
if self.fd:
self.fd.close()
self.fd=None
self.fd_day=-1
self.filePath=""
try:
path=os.path.dirname(fn)
if path!="" and not os.path.isdir(path):os.makedirs(path)
self.fd=open(fn,mode="w")
try:
linkfn = fn.split(".log.", 1)[0] + ".INFO"
if os.path.exists(linkfn):
os.remove(linkfn)
(filepath, tempfilename) = os.path.split(fn)
os.symlink(tempfilename, linkfn)
except:
pass
self.fd_day=datetime.datetime.now().day
self.filePath=fn
except Exception as e:
print("open file fail!file=%s,err=%s"%(fn,e))
def _output(self,level,msg,args):
if self.level<level:return
try:
if len(args)>0:
t=[]
for arg in args:
if isinstance(arg,Exception):
t.append(traceback.format_exc(arg).decode('utf-8'))
elif isinstance(arg,bytes) :
t.append(arg.decode('utf-8'))
else:
t.append(arg)
args=tuple(t)
try:
msg=msg%args
except:
try:
for arg in args:
msg=msg+str(arg)+" "
except Exception as e:
msg=msg+"==LOG ERROR==>%s"%(e)
if len(msg)>MAX_MSG_SIZE:msg=msg[0:MAX_MSG_SIZE]
now=datetime.datetime.now()
msg=msg+"\r\n"
if assistant.is_python3():
id = _thread.get_ident()
else:
id = thread.get_ident()
s="[%s] %04d-%02d-%02d %02d:%02d:%02d.%03d (0x%04X):%s"%(getLevelString(level),now.year,now.month,now.day,now.hour,now.minute,now.second,now.microsecond/1000,(id>>8)&0xffff,msg)
prefix=''
suffix=''
if not isWindows:
suffix='\033[0m'
if level==TRACE:
prefix='\033[0;37m'
elif level==DEBUG:
prefix='\033[1m'
elif level==INFORMATION:
prefix='\033[0;32m'
elif level==WARNING:
prefix='\033[0;33m'
elif level==ERROR:
prefix='\033[0;31m'
else:
pass
self._lock.acquire()
try:
if self.isEnableControlLog:
sys.stdout.write((prefix+s+suffix))
if self.fd:
if self.fd_day!=now.day:
self.updateFilelog()
if assistant.is_python3():
self.fd.write(s)
else:
self.fd.write(s.encode('utf-8'))
self.fd.flush()
finally:
self._lock.release()
except Exception as e:
if assistant.is_python3():
print(e)
else:
print("PibotLog._output crash!err=%s"%traceback.format_exc(e))
def trace(self,msg,*args):
self._output(TRACE,msg,args)
def t(self,msg,*args):
self._output(TRACE,msg,args)
def debug(self,msg,*args):
self._output(DEBUG, msg,args)
def d(self,msg,*args):
self._output(DEBUG, msg,args)
def info(self,msg,*args):
self._output(INFORMATION, msg,args)
def i(self,msg,*args):
self._output(INFORMATION, msg,args)
def warn(self,msg,*args):
self._output(WARNING, msg,args)
def w(self,msg,*args):
self._output(WARNING, msg,args)
def error(self,msg,*args):
self._output(ERROR, msg,args)
def e(self,msg,*args):
self._output(ERROR, msg,args)
def _log(self,level,msg,args):
self._output(level, msg,args)
def createNamedLog(self,name):
return NamedLog(name)
log=PibotLog()
class NamedLog:
def __init__(self,name=None):
global log
self.name=''
if name:
self.name='['+name+']'
self.log=log
def trace(self,msg,*args):
msg=self.name+msg
self.log._log(TRACE,msg,args)
def t(self,msg,*args):
msg=self.name+msg
self.log._log(TRACE,msg,args)
def debug(self,msg,*args):
msg=self.name+msg
self.log._log(DEBUG, msg,args)
def d(self,msg,*args):
msg=self.name+msg
self.log._log(DEBUG, msg,args)
def info(self,msg,*args):
msg=self.name+msg
self.log._log(INFORMATION, msg,args)
def i(self,msg,*args):
msg=self.name+msg
self.log._log(INFORMATION, msg,args)
def warn(self,msg,*args):
msg=self.name+msg
self.log._log(WARNING, msg,args)
def w(self,msg,*args):
msg=self.name+msg
self.log._log(WARNING, msg,args)
def error(self,msg,*args):
msg=self.name+msg
self.log._log(ERROR, msg,args)
def e(self,msg,*args):
msg=self.name+msg
self.log._log(ERROR, msg,args)
if __name__ == "__main__":
log.trace("1%s","hello")
log.debug("2%d",12)
try:
raise Exception("EXC")
except Exception as e:
log.info("3%s",e)
log.warn("1")
log.error("1")
#log.enableFileLog("$(scriptpath)test_$(Date8)_$(filenumber2).log")
log.trace("1")
log.debug("1")
log.info("1")
log.warn("1")
log.error("1")
log=NamedLog("test")
log.d("1")
log.i("1")
log.warn("1")
log.error("1=%d,%s",100,e)
#!/usr/bin/python
# -*- coding: utf-8 -*-
import sys,os
import datetime
import threading
import pypibot.assistant as assistant
import platform
if assistant.is_python3():
import _thread
else:
import thread
import traceback
"""
%a Locales abbreviated weekday name.
%A Locales full weekday name.
%b Locales abbreviated month name.
%B Locales full month name.
%c Locales appropriate date and time representation.
%d Day of the month as a decimal number [01,31].
%H Hour (24-hour clock) as a decimal number [00,23].
%I Hour (12-hour clock) as a decimal number [01,12].
%j Day of the year as a decimal number [001,366].
%m Month as a decimal number [01,12].
%M Minute as a decimal number [00,59].
%p Locales equivalent of either AM or PM. (1)
%S Second as a decimal number [00,61]. (2)
%U Week number of the year (Sunday as the first day of the week) as a decimal number [00,53]. All days in a new year preceding the first Sunday are considered to be in week 0. (3)
%w Weekday as a decimal number [0(Sunday),6].
%W Week number of the year (Monday as the first day of the week) as a decimal number [00,53]. All days in a new year preceding the first Monday are considered to be in week 0. (3)
%x Locales appropriate date representation.
%X Locales appropriate time representation.
%y Year without century as a decimal number [00,99].
%Y Year with century as a decimal number.
%Z Time zone name (no characters if no time zone exists).
%% A literal '%' character.
"""
isWindows=False
if platform.system()=='Windows':
isWindows=True
defaultEncodeing="utf8"
if "-utf8" in sys.argv:
defaultEncodeing="utf-8"
if "-gbk" in sys.argv:
defaultEncodeing="gbk"
TRACE=5
DEBUG=4
INFORMATION=3
WARNING=2
ERROR=1
NONE=0
MAX_MSG_SIZE = 4096
def getLevelFromString(level):
level=level.lower()
if level=='t' or level=='trace':return 5
elif level=='d' or level=='debug':return 4
elif level=='i' or level=='info':return 3
elif level=='w' or level=='wran':return 2
elif level=='e' or level=='error':return 1
else :return 0
def getLevelString(level):
if level==TRACE:return "T"
elif level==DEBUG:return "D"
elif level==INFORMATION:return "I"
elif level==WARNING:return "W"
elif level==ERROR:return "E"
else:return "N"
class PibotLog:
def __init__(self):
self.isEnableControlLog=True
self.fileTemple=None
self.filePath=""
self.level=5
self._lock=threading.RLock()
self.fd=None
self.fd_day=-1
def setLevel(self,level):
self.level=getLevelFromString(level)
def enableControllog(self,enable):
self.isEnableControlLog=enable
def enableFileLog(self,fileName):
self.fileTemple=fileName
self.updateFilelog()
def updateFilelog(self):
fn=assistant.SF(self.fileTemple)
if fn!=self.filePath:
self.i("new log file:%s",fn)
if self.fd:
self.fd.close()
self.fd=None
self.fd_day=-1
self.filePath=""
try:
path=os.path.dirname(fn)
if path!="" and not os.path.isdir(path):os.makedirs(path)
self.fd=open(fn,mode="w")
try:
linkfn = fn.split(".log.", 1)[0] + ".INFO"
if os.path.exists(linkfn):
os.remove(linkfn)
(filepath, tempfilename) = os.path.split(fn)
os.symlink(tempfilename, linkfn)
except:
pass
self.fd_day=datetime.datetime.now().day
self.filePath=fn
except Exception as e:
print("open file fail!file=%s,err=%s"%(fn,e))
def _output(self,level,msg,args):
if self.level<level:return
try:
if len(args)>0:
t=[]
for arg in args:
if isinstance(arg,Exception):
t.append(traceback.format_exc(arg).decode('utf-8'))
elif isinstance(arg,bytes) :
t.append(arg.decode('utf-8'))
else:
t.append(arg)
args=tuple(t)
try:
msg=msg%args
except:
try:
for arg in args:
msg=msg+str(arg)+" "
except Exception as e:
msg=msg+"==LOG ERROR==>%s"%(e)
if len(msg)>MAX_MSG_SIZE:msg=msg[0:MAX_MSG_SIZE]
now=datetime.datetime.now()
msg=msg+"\r\n"
if assistant.is_python3():
id = _thread.get_ident()
else:
id = thread.get_ident()
s="[%s] %04d-%02d-%02d %02d:%02d:%02d.%03d (0x%04X):%s"%(getLevelString(level),now.year,now.month,now.day,now.hour,now.minute,now.second,now.microsecond/1000,(id>>8)&0xffff,msg)
prefix=''
suffix=''
if not isWindows:
suffix='\033[0m'
if level==TRACE:
prefix='\033[0;37m'
elif level==DEBUG:
prefix='\033[1m'
elif level==INFORMATION:
prefix='\033[0;32m'
elif level==WARNING:
prefix='\033[0;33m'
elif level==ERROR:
prefix='\033[0;31m'
else:
pass
self._lock.acquire()
try:
if self.isEnableControlLog:
sys.stdout.write((prefix+s+suffix))
if self.fd:
if self.fd_day!=now.day:
self.updateFilelog()
if assistant.is_python3():
self.fd.write(s)
else:
self.fd.write(s.encode('utf-8'))
self.fd.flush()
finally:
self._lock.release()
except Exception as e:
if assistant.is_python3():
print(e)
else:
print("PibotLog._output crash!err=%s"%traceback.format_exc(e))
def trace(self,msg,*args):
self._output(TRACE,msg,args)
def t(self,msg,*args):
self._output(TRACE,msg,args)
def debug(self,msg,*args):
self._output(DEBUG, msg,args)
def d(self,msg,*args):
self._output(DEBUG, msg,args)
def info(self,msg,*args):
self._output(INFORMATION, msg,args)
def i(self,msg,*args):
self._output(INFORMATION, msg,args)
def warn(self,msg,*args):
self._output(WARNING, msg,args)
def w(self,msg,*args):
self._output(WARNING, msg,args)
def error(self,msg,*args):
self._output(ERROR, msg,args)
def e(self,msg,*args):
self._output(ERROR, msg,args)
def _log(self,level,msg,args):
self._output(level, msg,args)
def createNamedLog(self,name):
return NamedLog(name)
log=PibotLog()
class NamedLog:
def __init__(self,name=None):
global log
self.name=''
if name:
self.name='['+name+']'
self.log=log
def trace(self,msg,*args):
msg=self.name+msg
self.log._log(TRACE,msg,args)
def t(self,msg,*args):
msg=self.name+msg
self.log._log(TRACE,msg,args)
def debug(self,msg,*args):
msg=self.name+msg
self.log._log(DEBUG, msg,args)
def d(self,msg,*args):
msg=self.name+msg
self.log._log(DEBUG, msg,args)
def info(self,msg,*args):
msg=self.name+msg
self.log._log(INFORMATION, msg,args)
def i(self,msg,*args):
msg=self.name+msg
self.log._log(INFORMATION, msg,args)
def warn(self,msg,*args):
msg=self.name+msg
self.log._log(WARNING, msg,args)
def w(self,msg,*args):
msg=self.name+msg
self.log._log(WARNING, msg,args)
def error(self,msg,*args):
msg=self.name+msg
self.log._log(ERROR, msg,args)
def e(self,msg,*args):
msg=self.name+msg
self.log._log(ERROR, msg,args)
if __name__ == "__main__":
log.trace("1%s","hello")
log.debug("2%d",12)
try:
raise Exception("EXC")
except Exception as e:
log.info("3%s",e)
log.warn("1")
log.error("1")
#log.enableFileLog("$(scriptpath)test_$(Date8)_$(filenumber2).log")
log.trace("1")
log.debug("1")
log.info("1")
log.warn("1")
log.error("1")
log=NamedLog("test")
log.d("1")
log.i("1")
log.warn("1")
log.error("1=%d,%s",100,e)

View File

@ -1,240 +1,240 @@
import struct
params_size=29
# main board
class MessageID:
ID_GET_VERSION = 0
ID_SET_ROBOT_PARAMETER = 1
ID_GET_ROBOT_PARAMETER = 2
ID_INIT_ODOM = 3
ID_SET_VEL = 4
ID_GET_ODOM = 5
ID_GET_PID_DEBUG = 6
ID_GET_IMU = 7
ID_GET_ENCODER_COUNT = 8
ID_SET_MOTOR_PWM = 9
class RobotMessage:
def pack(self):
return b''
def unpack(self):
return True
class RobotFirmwareInfo(RobotMessage):
def __init__(self):
self.version = ''
self.build_time = ''
def unpack(self, data):
try:
upk = struct.unpack('16s16s', bytes(data))
except:
return False
[self.version, self.build_time] = upk
return True
class RobotImuType:
IMU_TYPE_GY65 = 49
IMU_TYPE_GY85 = 69
IMU_TYPE_GY87 = 71
class RobotModelType:
MODEL_TYPE_2WD_DIFF = 1
MODEL_TYPE_4WD_DIFF = 2
MODEL_TYPE_3WD_OMNI = 101
MODEL_TYPE_4WD_OMNI = 102
MODEL_TYPE_4WD_MECANUM = 201
class RobotParameters():
def __init__(self, wheel_diameter=0, \
wheel_track=0, \
encoder_resolution=0, \
do_pid_interval=0, \
kp=0, \
ki=0, \
kd=0, \
ko=0, \
cmd_last_time=0, \
max_v_liner_x=0, \
max_v_liner_y=0, \
max_v_angular_z=0, \
imu_type=0, \
motor_ratio=0, \
model_type=0, \
motor_nonexchange_flag=255, \
encoder_nonexchange_flag=255, \
):
self.wheel_diameter = wheel_diameter
self.wheel_track = wheel_track
self.encoder_resolution = encoder_resolution
self.do_pid_interval = do_pid_interval
self.kp = kp
self.ki = ki
self.kd = kd
self.ko = ko
self.cmd_last_time = cmd_last_time
self.max_v_liner_x = max_v_liner_x
self.max_v_liner_y = max_v_liner_y
self.max_v_angular_z = max_v_angular_z
self.imu_type = imu_type
self.motor_ratio = motor_ratio
self.model_type = model_type
self.motor_nonexchange_flag = motor_nonexchange_flag
self.encoder_nonexchange_flag = encoder_nonexchange_flag
reserve=b'\xff'
self.reserve=b''
for i in range(64-params_size):
self.reserve+=reserve
robotParam = RobotParameters()
class GetRobotParameters(RobotMessage):
def __init__(self):
self.param = robotParam
def unpack(self, data):
#print(bytes(data), len(bytes(data)))
upk = struct.unpack('<3H1B8H1B1H3B%ds'%(64-params_size), bytes(data))
[self.param.wheel_diameter,
self.param.wheel_track,
self.param.encoder_resolution,
self.param.do_pid_interval,
self.param.kp,
self.param.ki,
self.param.kd,
self.param.ko,
self.param.cmd_last_time,
self.param.max_v_liner_x,
self.param.max_v_liner_y,
self.param.max_v_angular_z,
self.param.imu_type,
self.param.motor_ratio,
self.param.model_type,
self.param.motor_nonexchange_flag,
self.param.encoder_nonexchange_flag,
self.param.reserve] = upk
return True
class SetRobotParameters(RobotMessage):
def __init__(self):
self.param = robotParam
def pack(self):
data = [self.param.wheel_diameter,
self.param.wheel_track,
self.param.encoder_resolution,
self.param.do_pid_interval,
self.param.kp,
self.param.ki,
self.param.kd,
self.param.ko,
self.param.cmd_last_time,
self.param.max_v_liner_x,
self.param.max_v_liner_y,
self.param.max_v_angular_z,
self.param.imu_type,
self.param.motor_ratio,
self.param.model_type,
self.param.motor_nonexchange_flag,
self.param.encoder_nonexchange_flag,
self.param.reserve]
print(data)
pk = struct.pack('<3H1B8H1B1H3B%ds'%(64-(3*2+1+8*2+1+2+3)), *data)
return pk
def unpack(self, data):
return True
class RobotVel(RobotMessage):
def __init__(self):
self.v_liner_x = 0
self.v_liner_y = 0
self.v_angular_z = 0
def pack(self):
data = [self.v_liner_x,
self.v_liner_y,
self.v_angular_z]
pk = struct.pack('3h', *data)
return pk
def unpack(self, data):
return True
#todo the rest of the message classes
class RobotOdom(RobotMessage):
def __init__(self):
self.v_liner_x = 0
self.v_liner_y = 0
self.v_angular_z = 0
self.x = 0
self.y = 0
self.yaw = 0
def unpack(self, data):
try:
upk = struct.unpack('<3H2l1H', bytes(data))
except:
return False
[self.v_liner_x,
self.v_liner_y,
self.v_angular_z,
self.x,
self.y,
self.yaw] = upk
return True
class RobotPIDData(RobotMessage):
pass
class RobotIMU(RobotMessage):
def __init__(self):
self.imu = [0]*9
def unpack(self, data):
try:
upk = struct.unpack('<9f', bytes(data))
except:
return False
self.imu = upk
return True
class RobotEncoderCount(RobotMessage):
def __init__(self):
self.encoder = [0]*4
def unpack(self, data):
try:
upk = struct.unpack('<4f', bytes(data))
except:
return False
self.encoder = upk
return True
class RobotMotorPWM(RobotMessage):
def __init__(self):
self.pwm = [0]*4
def pack(self):
pk = struct.pack('4h', *self.pwm)
return pk
def unpack(self, data):
return True
BoardDataDict = {MessageID.ID_GET_VERSION:RobotFirmwareInfo(),
MessageID.ID_GET_ROBOT_PARAMETER:GetRobotParameters(),
MessageID.ID_SET_ROBOT_PARAMETER:SetRobotParameters(),
MessageID.ID_SET_VEL:RobotVel(),
MessageID.ID_GET_ODOM:RobotOdom(),
MessageID.ID_GET_PID_DEBUG: RobotPIDData(),
MessageID.ID_GET_IMU: RobotIMU(),
MessageID.ID_GET_ENCODER_COUNT: RobotEncoderCount(),
MessageID.ID_SET_MOTOR_PWM: RobotMotorPWM(),
}
import struct
params_size=29
# main board
class MessageID:
ID_GET_VERSION = 0
ID_SET_ROBOT_PARAMETER = 1
ID_GET_ROBOT_PARAMETER = 2
ID_INIT_ODOM = 3
ID_SET_VEL = 4
ID_GET_ODOM = 5
ID_GET_PID_DEBUG = 6
ID_GET_IMU = 7
ID_GET_ENCODER_COUNT = 8
ID_SET_MOTOR_PWM = 9
class RobotMessage:
def pack(self):
return b''
def unpack(self):
return True
class RobotFirmwareInfo(RobotMessage):
def __init__(self):
self.version = ''
self.build_time = ''
def unpack(self, data):
try:
upk = struct.unpack('16s16s', bytes(data))
except:
return False
[self.version, self.build_time] = upk
return True
class RobotImuType:
IMU_TYPE_GY65 = 49
IMU_TYPE_GY85 = 69
IMU_TYPE_GY87 = 71
class RobotModelType:
MODEL_TYPE_2WD_DIFF = 1
MODEL_TYPE_4WD_DIFF = 2
MODEL_TYPE_3WD_OMNI = 101
MODEL_TYPE_4WD_OMNI = 102
MODEL_TYPE_4WD_MECANUM = 201
class RobotParameters():
def __init__(self, wheel_diameter=0, \
wheel_track=0, \
encoder_resolution=0, \
do_pid_interval=0, \
kp=0, \
ki=0, \
kd=0, \
ko=0, \
cmd_last_time=0, \
max_v_liner_x=0, \
max_v_liner_y=0, \
max_v_angular_z=0, \
imu_type=0, \
motor_ratio=0, \
model_type=0, \
motor_nonexchange_flag=255, \
encoder_nonexchange_flag=255, \
):
self.wheel_diameter = wheel_diameter
self.wheel_track = wheel_track
self.encoder_resolution = encoder_resolution
self.do_pid_interval = do_pid_interval
self.kp = kp
self.ki = ki
self.kd = kd
self.ko = ko
self.cmd_last_time = cmd_last_time
self.max_v_liner_x = max_v_liner_x
self.max_v_liner_y = max_v_liner_y
self.max_v_angular_z = max_v_angular_z
self.imu_type = imu_type
self.motor_ratio = motor_ratio
self.model_type = model_type
self.motor_nonexchange_flag = motor_nonexchange_flag
self.encoder_nonexchange_flag = encoder_nonexchange_flag
reserve=b'\xff'
self.reserve=b''
for i in range(64-params_size):
self.reserve+=reserve
robotParam = RobotParameters()
class GetRobotParameters(RobotMessage):
def __init__(self):
self.param = robotParam
def unpack(self, data):
#print(bytes(data), len(bytes(data)))
upk = struct.unpack('<3H1B8H1B1H3B%ds'%(64-params_size), bytes(data))
[self.param.wheel_diameter,
self.param.wheel_track,
self.param.encoder_resolution,
self.param.do_pid_interval,
self.param.kp,
self.param.ki,
self.param.kd,
self.param.ko,
self.param.cmd_last_time,
self.param.max_v_liner_x,
self.param.max_v_liner_y,
self.param.max_v_angular_z,
self.param.imu_type,
self.param.motor_ratio,
self.param.model_type,
self.param.motor_nonexchange_flag,
self.param.encoder_nonexchange_flag,
self.param.reserve] = upk
return True
class SetRobotParameters(RobotMessage):
def __init__(self):
self.param = robotParam
def pack(self):
data = [self.param.wheel_diameter,
self.param.wheel_track,
self.param.encoder_resolution,
self.param.do_pid_interval,
self.param.kp,
self.param.ki,
self.param.kd,
self.param.ko,
self.param.cmd_last_time,
self.param.max_v_liner_x,
self.param.max_v_liner_y,
self.param.max_v_angular_z,
self.param.imu_type,
self.param.motor_ratio,
self.param.model_type,
self.param.motor_nonexchange_flag,
self.param.encoder_nonexchange_flag,
self.param.reserve]
print(data)
pk = struct.pack('<3H1B8H1B1H3B%ds'%(64-(3*2+1+8*2+1+2+3)), *data)
return pk
def unpack(self, data):
return True
class RobotVel(RobotMessage):
def __init__(self):
self.v_liner_x = 0
self.v_liner_y = 0
self.v_angular_z = 0
def pack(self):
data = [self.v_liner_x,
self.v_liner_y,
self.v_angular_z]
pk = struct.pack('3h', *data)
return pk
def unpack(self, data):
return True
#todo the rest of the message classes
class RobotOdom(RobotMessage):
def __init__(self):
self.v_liner_x = 0
self.v_liner_y = 0
self.v_angular_z = 0
self.x = 0
self.y = 0
self.yaw = 0
def unpack(self, data):
try:
upk = struct.unpack('<3H2l1H', bytes(data))
except:
return False
[self.v_liner_x,
self.v_liner_y,
self.v_angular_z,
self.x,
self.y,
self.yaw] = upk
return True
class RobotPIDData(RobotMessage):
pass
class RobotIMU(RobotMessage):
def __init__(self):
self.imu = [0]*9
def unpack(self, data):
try:
upk = struct.unpack('<9f', bytes(data))
except:
return False
self.imu = upk
return True
class RobotEncoderCount(RobotMessage):
def __init__(self):
self.encoder = [0]*4
def unpack(self, data):
try:
upk = struct.unpack('<4f', bytes(data))
except:
return False
self.encoder = upk
return True
class RobotMotorPWM(RobotMessage):
def __init__(self):
self.pwm = [0]*4
def pack(self):
pk = struct.pack('4h', *self.pwm)
return pk
def unpack(self, data):
return True
BoardDataDict = {MessageID.ID_GET_VERSION:RobotFirmwareInfo(),
MessageID.ID_GET_ROBOT_PARAMETER:GetRobotParameters(),
MessageID.ID_SET_ROBOT_PARAMETER:SetRobotParameters(),
MessageID.ID_SET_VEL:RobotVel(),
MessageID.ID_GET_ODOM:RobotOdom(),
MessageID.ID_GET_PID_DEBUG: RobotPIDData(),
MessageID.ID_GET_IMU: RobotIMU(),
MessageID.ID_GET_ENCODER_COUNT: RobotEncoderCount(),
MessageID.ID_SET_MOTOR_PWM: RobotMotorPWM(),
}

View File

@ -1,115 +1,115 @@
import platform
import sys
sys.path.append("..")
import pypibot
from pypibot import log
from transport import Transport
from dataholder import MessageID
import params
import time
import signal
#for linux
port="/dev/pibot"
#for windows
#port="com3"
pypibot.assistant.enableGlobalExcept()
#log.enableFileLog(log_dir + "ros_$(Date8)_$(filenumber2).log")
log.setLevel("i")
run_flag = True
def exit(signum, frame):
global run_flag
run_flag = False
if __name__ == '__main__':
signal.signal(signal.SIGINT, exit)
mboard = Transport(port, params.pibotBaud)
if not mboard.start():
log.error("can not open %s"%port)
sys.exit()
DataHolder = mboard.getDataHolder()
for num in range(0,3):
log.info("****************get robot version*****************")
boardVersion = DataHolder[MessageID.ID_GET_VERSION]
p = mboard.request(MessageID.ID_GET_VERSION)
if p:
log.info("firmware version:%s buildtime:%s\r\n"%(boardVersion.version.decode(), boardVersion.build_time.decode()))
break
else:
log.error('read firmware version err\r\n')
import time
time.sleep(1)
if num == 2:
log.error('please check connection or baudrate\r\n')
sys.exit()
# get robot parameter
robotParam = DataHolder[MessageID.ID_GET_ROBOT_PARAMETER]
p = mboard.request(MessageID.ID_GET_ROBOT_PARAMETER)
if p:
log.info("model_type:%d wheel_diameter:%d wheel_track:%d encoder_resolution:%d" \
%(robotParam.param.model_type, \
robotParam.param.wheel_diameter, \
robotParam.param.wheel_track, \
robotParam.param.encoder_resolution
))
log.info("do_pid_interval:%d kp:%d ki:%d kd:%d ko:%d" \
%(robotParam.param.do_pid_interval, \
robotParam.param.kp, \
robotParam.param.ki, \
robotParam.param.kd, \
robotParam.param.ko))
log.info("cmd_last_time:%d imu_type:%d" \
%(robotParam.param.cmd_last_time,\
robotParam.param.imu_type
))
log.info("max_v:%d %d %d\r\n" \
%(robotParam.param.max_v_liner_x,\
robotParam.param.max_v_liner_y, \
robotParam.param.max_v_angular_z
))
log.info("motor flag:%d encoder flag: %d\r\n" \
%(robotParam.param.motor_nonexchange_flag,\
robotParam.param.encoder_nonexchange_flag
))
else:
log.error('get params err\r\n')
quit(1)
log.info("****************get odom&imu*****************")
while run_flag:
robotOdom = DataHolder[MessageID.ID_GET_ODOM]
p = mboard.request(MessageID.ID_GET_ODOM)
if p:
log.info('request get odom success, vx=%d vy=%d vangular=%d, x=%d y=%d yaw=%d'%(robotOdom.v_liner_x, \
robotOdom.v_liner_y, \
robotOdom.v_angular_z, \
robotOdom.x, \
robotOdom.y, \
robotOdom.yaw))
else:
log.error('get odom err')
quit(1)
robotIMU = DataHolder[MessageID.ID_GET_IMU].imu
p = mboard.request(MessageID.ID_GET_IMU)
if p:
log.info('get imu success, imu=[%f %f %f %f %f %f %f %f %f]'%(robotIMU[0], robotIMU[1], robotIMU[2], \
robotIMU[3], robotIMU[4], robotIMU[5], \
robotIMU[6], robotIMU[7], robotIMU[8]))
else:
log.error('get imu err')
quit(1)
time.sleep(0.1)
import platform
import sys
sys.path.append("..")
import pypibot
from pypibot import log
from transport import Transport
from dataholder import MessageID
import params
import time
import signal
#for linux
port="/dev/pibot"
#for windows
#port="com3"
pypibot.assistant.enableGlobalExcept()
#log.enableFileLog(log_dir + "ros_$(Date8)_$(filenumber2).log")
log.setLevel("i")
run_flag = True
def exit(signum, frame):
global run_flag
run_flag = False
if __name__ == '__main__':
signal.signal(signal.SIGINT, exit)
mboard = Transport(port, params.pibotBaud)
if not mboard.start():
log.error("can not open %s"%port)
sys.exit()
DataHolder = mboard.getDataHolder()
for num in range(0,3):
log.info("****************get robot version*****************")
boardVersion = DataHolder[MessageID.ID_GET_VERSION]
p = mboard.request(MessageID.ID_GET_VERSION)
if p:
log.info("firmware version:%s buildtime:%s\r\n"%(boardVersion.version.decode(), boardVersion.build_time.decode()))
break
else:
log.error('read firmware version err\r\n')
import time
time.sleep(1)
if num == 2:
log.error('please check connection or baudrate\r\n')
sys.exit()
# get robot parameter
robotParam = DataHolder[MessageID.ID_GET_ROBOT_PARAMETER]
p = mboard.request(MessageID.ID_GET_ROBOT_PARAMETER)
if p:
log.info("model_type:%d wheel_diameter:%d wheel_track:%d encoder_resolution:%d" \
%(robotParam.param.model_type, \
robotParam.param.wheel_diameter, \
robotParam.param.wheel_track, \
robotParam.param.encoder_resolution
))
log.info("do_pid_interval:%d kp:%d ki:%d kd:%d ko:%d" \
%(robotParam.param.do_pid_interval, \
robotParam.param.kp, \
robotParam.param.ki, \
robotParam.param.kd, \
robotParam.param.ko))
log.info("cmd_last_time:%d imu_type:%d" \
%(robotParam.param.cmd_last_time,\
robotParam.param.imu_type
))
log.info("max_v:%d %d %d\r\n" \
%(robotParam.param.max_v_liner_x,\
robotParam.param.max_v_liner_y, \
robotParam.param.max_v_angular_z
))
log.info("motor flag:%d encoder flag: %d\r\n" \
%(robotParam.param.motor_nonexchange_flag,\
robotParam.param.encoder_nonexchange_flag
))
else:
log.error('get params err\r\n')
quit(1)
log.info("****************get odom&imu*****************")
while run_flag:
robotOdom = DataHolder[MessageID.ID_GET_ODOM]
p = mboard.request(MessageID.ID_GET_ODOM)
if p:
log.info('request get odom success, vx=%d vy=%d vangular=%d, x=%d y=%d yaw=%d'%(robotOdom.v_liner_x, \
robotOdom.v_liner_y, \
robotOdom.v_angular_z, \
robotOdom.x, \
robotOdom.y, \
robotOdom.yaw))
else:
log.error('get odom err')
quit(1)
robotIMU = DataHolder[MessageID.ID_GET_IMU].imu
p = mboard.request(MessageID.ID_GET_IMU)
if p:
log.info('get imu success, imu=[%f %f %f %f %f %f %f %f %f]'%(robotIMU[0], robotIMU[1], robotIMU[2], \
robotIMU[3], robotIMU[4], robotIMU[5], \
robotIMU[6], robotIMU[7], robotIMU[8]))
else:
log.error('get imu err')
quit(1)
time.sleep(0.1)

View File

@ -1,75 +1,75 @@
import dataholder
import os
from dataholder import RobotImuType
from dataholder import RobotModelType
pibotModel = os.environ['PIBOT_MODEL']
boardType = os.environ['PIBOT_BOARD']
pibotBaud = os.environ['PIBOT_DRIVER_BAUDRATE']
print(pibotModel)
print(boardType)
print(pibotBaud)
pibotParam = dataholder.RobotParameters()
if pibotModel == "apollo" and boardType == "arduino":
pibotParam = dataholder.RobotParameters(65, 175, 44, 10, \
75, 2500, 0, 10, \
250, 40, 0, 200, \
RobotImuType.IMU_TYPE_GY85, 90, \
RobotModelType.MODEL_TYPE_2WD_DIFF)
elif pibotModel == "apollo" and boardType == "stm32f1":
pibotParam = dataholder.RobotParameters(65, 175, 44, 10, \
320, 2700, 0, 10, \
250, 50, 0, 200, \
RobotImuType.IMU_TYPE_GY87, 90, \
RobotModelType.MODEL_TYPE_2WD_DIFF)
elif pibotModel == "apollo" and boardType == "stm32f4":
pibotParam = dataholder.RobotParameters(65, 175, 44, 10, \
320, 2700, 0, 10, \
250, 40, 0, 200, \
RobotImuType.IMU_TYPE_GY87, 90, \
RobotModelType.MODEL_TYPE_2WD_DIFF)
elif pibotModel == "zeus" and boardType == "stm32f4":
pibotParam = dataholder.RobotParameters(58, 230, 44, 10, \
320, 2700, 0, 10, \
250, 50, 50, 250, \
RobotImuType.IMU_TYPE_GY87, 90, \
RobotModelType.MODEL_TYPE_3WD_OMNI)
elif pibotModel == "hades" and boardType == "stm32f4":
pibotParam = dataholder.RobotParameters(76, 470, 44, 10, \
320, 2700, 0, 10, \
250, 50, 50, 250, \
RobotImuType.IMU_TYPE_GY87, 90, \
RobotModelType.MODEL_TYPE_4WD_MECANUM)
elif pibotModel == "hadesX" and boardType == "stm32f4":
pibotParam = dataholder.RobotParameters(150, 565, 44, 10, \
250, 2750, 0, 10, \
250, 50, 50, 250, \
RobotImuType.IMU_TYPE_GY87, 72, \
RobotModelType.MODEL_TYPE_4WD_MECANUM)
elif pibotModel == "hera" and boardType == "stm32f4":
pibotParam = dataholder.RobotParameters(82, 338, 44, 10, \
320, 2700, 0, 10, \
250, 50, 50, 250, \
RobotImuType.IMU_TYPE_GY87, 90, \
RobotModelType.MODEL_TYPE_4WD_DIFF)
elif pibotModel == "apolloX" and boardType == "arduino":
pibotParam = dataholder.RobotParameters(96, 350, 68, 10, \
75, 2500, 0, 10, \
250, 40, 0, 200, \
RobotImuType.IMU_TYPE_GY85, 90, \
RobotModelType.MODEL_TYPE_2WD_DIFF)
elif pibotModel == "apolloX" and boardType == "stm32f1":
pibotParam = dataholder.RobotParameters(96, 350, 68, 10, \
250, 1200, 0, 10, \
250, 50, 0, 200, \
RobotImuType.IMU_TYPE_GY87, 90, \
RobotModelType.MODEL_TYPE_2WD_DIFF)
elif pibotModel == "apolloX" and boardType == "stm32f4":
pibotParam = dataholder.RobotParameters(96, 350, 68, 10, \
250, 1200, 0, 10, \
250, 50, 0, 200, \
RobotImuType.IMU_TYPE_GY87, 90, \
import dataholder
import os
from dataholder import RobotImuType
from dataholder import RobotModelType
pibotModel = os.environ['PIBOT_MODEL']
boardType = os.environ['PIBOT_BOARD']
pibotBaud = os.environ['PIBOT_DRIVER_BAUDRATE']
print(pibotModel)
print(boardType)
print(pibotBaud)
pibotParam = dataholder.RobotParameters()
if pibotModel == "apollo" and boardType == "arduino":
pibotParam = dataholder.RobotParameters(65, 175, 44, 10, \
75, 2500, 0, 10, \
250, 40, 0, 200, \
RobotImuType.IMU_TYPE_GY85, 90, \
RobotModelType.MODEL_TYPE_2WD_DIFF)
elif pibotModel == "apollo" and boardType == "stm32f1":
pibotParam = dataholder.RobotParameters(65, 175, 44, 10, \
320, 2700, 0, 10, \
250, 50, 0, 200, \
RobotImuType.IMU_TYPE_GY87, 90, \
RobotModelType.MODEL_TYPE_2WD_DIFF)
elif pibotModel == "apollo" and boardType == "stm32f4":
pibotParam = dataholder.RobotParameters(65, 175, 44, 10, \
320, 2700, 0, 10, \
250, 40, 0, 200, \
RobotImuType.IMU_TYPE_GY87, 90, \
RobotModelType.MODEL_TYPE_2WD_DIFF)
elif pibotModel == "zeus" and boardType == "stm32f4":
pibotParam = dataholder.RobotParameters(58, 230, 44, 10, \
320, 2700, 0, 10, \
250, 50, 50, 250, \
RobotImuType.IMU_TYPE_GY87, 90, \
RobotModelType.MODEL_TYPE_3WD_OMNI)
elif pibotModel == "hades" and boardType == "stm32f4":
pibotParam = dataholder.RobotParameters(76, 470, 44, 10, \
320, 2700, 0, 10, \
250, 50, 50, 250, \
RobotImuType.IMU_TYPE_GY87, 90, \
RobotModelType.MODEL_TYPE_4WD_MECANUM)
elif pibotModel == "hadesX" and boardType == "stm32f4":
pibotParam = dataholder.RobotParameters(150, 565, 44, 10, \
250, 2750, 0, 10, \
250, 50, 50, 250, \
RobotImuType.IMU_TYPE_GY87, 72, \
RobotModelType.MODEL_TYPE_4WD_MECANUM)
elif pibotModel == "hera" and boardType == "stm32f4":
pibotParam = dataholder.RobotParameters(82, 338, 44, 10, \
320, 2700, 0, 10, \
250, 50, 50, 250, \
RobotImuType.IMU_TYPE_GY87, 90, \
RobotModelType.MODEL_TYPE_4WD_DIFF)
elif pibotModel == "apolloX" and boardType == "arduino":
pibotParam = dataholder.RobotParameters(96, 350, 68, 10, \
75, 2500, 0, 10, \
250, 40, 0, 200, \
RobotImuType.IMU_TYPE_GY85, 90, \
RobotModelType.MODEL_TYPE_2WD_DIFF)
elif pibotModel == "apolloX" and boardType == "stm32f1":
pibotParam = dataholder.RobotParameters(96, 350, 68, 10, \
250, 1200, 0, 10, \
250, 50, 0, 200, \
RobotImuType.IMU_TYPE_GY87, 90, \
RobotModelType.MODEL_TYPE_2WD_DIFF)
elif pibotModel == "apolloX" and boardType == "stm32f4":
pibotParam = dataholder.RobotParameters(96, 350, 68, 10, \
250, 1200, 0, 10, \
250, 50, 0, 200, \
RobotImuType.IMU_TYPE_GY87, 90, \
RobotModelType.MODEL_TYPE_2WD_DIFF)

View File

@ -1,90 +1,90 @@
import platform
import sys
sys.path.append("..")
import pypibot
from pypibot import log
from transport import Transport
from dataholder import MessageID
import params
#for linux
port="/dev/pibot"
#for windows
#port="com3"
pypibot.assistant.enableGlobalExcept()
#log.enableFileLog(log_dir + "ros_$(Date8)_$(filenumber2).log")
log.setLevel("i")
if __name__ == '__main__':
mboard = Transport(port, params.pibotBaud)
if not mboard.start():
log.error("can not open %s"%port)
sys.exit()
DataHolder = mboard.getDataHolder()
for num in range(0,3):
log.info("****************get robot version*****************")
boardVersion = DataHolder[MessageID.ID_GET_VERSION]
p = mboard.request(MessageID.ID_GET_VERSION)
if p:
log.info("firmware version:%s buildtime:%s\r\n"%(boardVersion.version.decode(), boardVersion.build_time.decode()))
break
else:
log.error('read firmware version err\r\n')
import time
time.sleep(1)
if num == 2:
log.error('please check connection or baudrate\r\n')
sys.exit()
# set robot parameter
log.info("****************set robot parameter*****************")
DataHolder[MessageID.ID_SET_ROBOT_PARAMETER].param = params.pibotParam
p = mboard.request(MessageID.ID_SET_ROBOT_PARAMETER)
if p:
log.info('set parameter success')
else:
log.error('set parameter err')
quit(1)
# get robot parameter
robotParam = DataHolder[MessageID.ID_GET_ROBOT_PARAMETER]
p = mboard.request(MessageID.ID_GET_ROBOT_PARAMETER)
if p:
log.info("model_type:%d wheel_diameter:%d wheel_track:%d encoder_resolution:%d" \
%(robotParam.param.model_type, \
robotParam.param.wheel_diameter, \
robotParam.param.wheel_track, \
robotParam.param.encoder_resolution
))
log.info("do_pid_interval:%d kp:%d ki:%d kd:%d ko:%d" \
%(robotParam.param.do_pid_interval, \
robotParam.param.kp, \
robotParam.param.ki, \
robotParam.param.kd, \
robotParam.param.ko))
log.info("cmd_last_time:%d imu_type:%d" \
%(robotParam.param.cmd_last_time,\
robotParam.param.imu_type
))
log.info("max_v:%d %d %d\r\n" \
%(robotParam.param.max_v_liner_x,\
robotParam.param.max_v_liner_y, \
robotParam.param.max_v_angular_z
))
log.info("motor flag:%d encoder flag: %d\r\n" \
%(robotParam.param.motor_nonexchange_flag,\
robotParam.param.encoder_nonexchange_flag
))
else:
log.error('get param err\r\n')
import platform
import sys
sys.path.append("..")
import pypibot
from pypibot import log
from transport import Transport
from dataholder import MessageID
import params
#for linux
port="/dev/pibot"
#for windows
#port="com3"
pypibot.assistant.enableGlobalExcept()
#log.enableFileLog(log_dir + "ros_$(Date8)_$(filenumber2).log")
log.setLevel("i")
if __name__ == '__main__':
mboard = Transport(port, params.pibotBaud)
if not mboard.start():
log.error("can not open %s"%port)
sys.exit()
DataHolder = mboard.getDataHolder()
for num in range(0,3):
log.info("****************get robot version*****************")
boardVersion = DataHolder[MessageID.ID_GET_VERSION]
p = mboard.request(MessageID.ID_GET_VERSION)
if p:
log.info("firmware version:%s buildtime:%s\r\n"%(boardVersion.version.decode(), boardVersion.build_time.decode()))
break
else:
log.error('read firmware version err\r\n')
import time
time.sleep(1)
if num == 2:
log.error('please check connection or baudrate\r\n')
sys.exit()
# set robot parameter
log.info("****************set robot parameter*****************")
DataHolder[MessageID.ID_SET_ROBOT_PARAMETER].param = params.pibotParam
p = mboard.request(MessageID.ID_SET_ROBOT_PARAMETER)
if p:
log.info('set parameter success')
else:
log.error('set parameter err')
quit(1)
# get robot parameter
robotParam = DataHolder[MessageID.ID_GET_ROBOT_PARAMETER]
p = mboard.request(MessageID.ID_GET_ROBOT_PARAMETER)
if p:
log.info("model_type:%d wheel_diameter:%d wheel_track:%d encoder_resolution:%d" \
%(robotParam.param.model_type, \
robotParam.param.wheel_diameter, \
robotParam.param.wheel_track, \
robotParam.param.encoder_resolution
))
log.info("do_pid_interval:%d kp:%d ki:%d kd:%d ko:%d" \
%(robotParam.param.do_pid_interval, \
robotParam.param.kp, \
robotParam.param.ki, \
robotParam.param.kd, \
robotParam.param.ko))
log.info("cmd_last_time:%d imu_type:%d" \
%(robotParam.param.cmd_last_time,\
robotParam.param.imu_type
))
log.info("max_v:%d %d %d\r\n" \
%(robotParam.param.max_v_liner_x,\
robotParam.param.max_v_liner_y, \
robotParam.param.max_v_angular_z
))
log.info("motor flag:%d encoder flag: %d\r\n" \
%(robotParam.param.motor_nonexchange_flag,\
robotParam.param.encoder_nonexchange_flag
))
else:
log.error('get param err\r\n')
quit(1)

View File

@ -1,122 +1,122 @@
import platform
import sys
sys.path.append("..")
import pypibot
from pypibot import log
from transport import Transport
from dataholder import MessageID
import params
import signal
#for linux
port="/dev/pibot"
#for windows
#port="com3"
pypibot.assistant.enableGlobalExcept()
#log.enableFileLog(log_dir + "ros_$(Date8)_$(filenumber2).log")
log.setLevel("i")
run_flag = True
def exit(signum, frame):
global run_flag
run_flag = False
if __name__ == '__main__':
signal.signal(signal.SIGINT, exit)
mboard = Transport(port, params.pibotBaud)
if not mboard.start():
log.error("can not open %s"%port)
sys.exit()
pwm=[0]*4
for i in range(len(sys.argv)-1):
pwm[i] = int(sys.argv[i+1])
DataHolder = mboard.getDataHolder()
for num in range(0,3):
log.info("****************get robot version*****************")
boardVersion = DataHolder[MessageID.ID_GET_VERSION]
p = mboard.request(MessageID.ID_GET_VERSION)
if p:
log.info("firmware version:%s buildtime:%s\r\n"%(boardVersion.version.decode(), boardVersion.build_time.decode()))
break
else:
log.error('read firmware version err\r\n')
import time
time.sleep(1)
if num == 2:
log.error('please check connection or baudrate\r\n')
sys.exit()
# get robot parameter
robotParam = DataHolder[MessageID.ID_GET_ROBOT_PARAMETER]
p = mboard.request(MessageID.ID_GET_ROBOT_PARAMETER)
if p:
log.info("model_type:%d wheel_diameter:%d wheel_track:%d encoder_resolution:%d" \
%(robotParam.param.model_type, \
robotParam.param.wheel_diameter, \
robotParam.param.wheel_track, \
robotParam.param.encoder_resolution
))
log.info("do_pid_interval:%d kp:%d ki:%d kd:%d ko:%d" \
%(robotParam.param.do_pid_interval, \
robotParam.param.kp, \
robotParam.param.ki, \
robotParam.param.kd, \
robotParam.param.ko))
log.info("cmd_last_time:%d imu_type:%d" \
%(robotParam.param.cmd_last_time,\
robotParam.param.imu_type
))
log.info("max_v:%d %d %d\r\n" \
%(robotParam.param.max_v_liner_x,\
robotParam.param.max_v_liner_y, \
robotParam.param.max_v_angular_z
))
log.info("motor flag:%d encoder flag: %d\r\n" \
%(robotParam.param.motor_nonexchange_flag,\
robotParam.param.encoder_nonexchange_flag
))
else:
log.error('get params err\r\n')
quit(1)
DataHolder[MessageID.ID_SET_MOTOR_PWM].pwm = pwm
p = mboard.request(MessageID.ID_SET_MOTOR_PWM)
if p:
log.info('set pwm success')
else:
log.error('set pwm err')
quit(1)
log.info("****************get encoder count*****************")
while run_flag:
robotEncoder = DataHolder[MessageID.ID_GET_ENCODER_COUNT].encoder
p = mboard.request(MessageID.ID_GET_ENCODER_COUNT)
if p:
log.info('encoder count: %s'%(('\t\t').join([str(x) for x in robotEncoder])))
else:
log.error('get encoder count err')
quit(1)
import time
time.sleep(0.5)
DataHolder[MessageID.ID_SET_MOTOR_PWM].pwm = [0]*4
p = mboard.request(MessageID.ID_SET_MOTOR_PWM)
if p:
log.info('set pwm success')
else:
log.error('set pwm err')
import platform
import sys
sys.path.append("..")
import pypibot
from pypibot import log
from transport import Transport
from dataholder import MessageID
import params
import signal
#for linux
port="/dev/pibot"
#for windows
#port="com3"
pypibot.assistant.enableGlobalExcept()
#log.enableFileLog(log_dir + "ros_$(Date8)_$(filenumber2).log")
log.setLevel("i")
run_flag = True
def exit(signum, frame):
global run_flag
run_flag = False
if __name__ == '__main__':
signal.signal(signal.SIGINT, exit)
mboard = Transport(port, params.pibotBaud)
if not mboard.start():
log.error("can not open %s"%port)
sys.exit()
pwm=[0]*4
for i in range(len(sys.argv)-1):
pwm[i] = int(sys.argv[i+1])
DataHolder = mboard.getDataHolder()
for num in range(0,3):
log.info("****************get robot version*****************")
boardVersion = DataHolder[MessageID.ID_GET_VERSION]
p = mboard.request(MessageID.ID_GET_VERSION)
if p:
log.info("firmware version:%s buildtime:%s\r\n"%(boardVersion.version.decode(), boardVersion.build_time.decode()))
break
else:
log.error('read firmware version err\r\n')
import time
time.sleep(1)
if num == 2:
log.error('please check connection or baudrate\r\n')
sys.exit()
# get robot parameter
robotParam = DataHolder[MessageID.ID_GET_ROBOT_PARAMETER]
p = mboard.request(MessageID.ID_GET_ROBOT_PARAMETER)
if p:
log.info("model_type:%d wheel_diameter:%d wheel_track:%d encoder_resolution:%d" \
%(robotParam.param.model_type, \
robotParam.param.wheel_diameter, \
robotParam.param.wheel_track, \
robotParam.param.encoder_resolution
))
log.info("do_pid_interval:%d kp:%d ki:%d kd:%d ko:%d" \
%(robotParam.param.do_pid_interval, \
robotParam.param.kp, \
robotParam.param.ki, \
robotParam.param.kd, \
robotParam.param.ko))
log.info("cmd_last_time:%d imu_type:%d" \
%(robotParam.param.cmd_last_time,\
robotParam.param.imu_type
))
log.info("max_v:%d %d %d\r\n" \
%(robotParam.param.max_v_liner_x,\
robotParam.param.max_v_liner_y, \
robotParam.param.max_v_angular_z
))
log.info("motor flag:%d encoder flag: %d\r\n" \
%(robotParam.param.motor_nonexchange_flag,\
robotParam.param.encoder_nonexchange_flag
))
else:
log.error('get params err\r\n')
quit(1)
DataHolder[MessageID.ID_SET_MOTOR_PWM].pwm = pwm
p = mboard.request(MessageID.ID_SET_MOTOR_PWM)
if p:
log.info('set pwm success')
else:
log.error('set pwm err')
quit(1)
log.info("****************get encoder count*****************")
while run_flag:
robotEncoder = DataHolder[MessageID.ID_GET_ENCODER_COUNT].encoder
p = mboard.request(MessageID.ID_GET_ENCODER_COUNT)
if p:
log.info('encoder count: %s'%(('\t\t').join([str(x) for x in robotEncoder])))
else:
log.error('get encoder count err')
quit(1)
import time
time.sleep(0.5)
DataHolder[MessageID.ID_SET_MOTOR_PWM].pwm = [0]*4
p = mboard.request(MessageID.ID_SET_MOTOR_PWM)
if p:
log.info('set pwm success')
else:
log.error('set pwm err')
quit(1)

View File

@ -1,197 +1,197 @@
import sys
sys.path.append("..")
import pypibot
from pypibot import log
from pypibot import assistant
import serial
import threading
import struct
import time
from dataholder import MessageID, BoardDataDict
FIX_HEAD = 0x5a
class Recstate():
WAITING_HD = 0
WAITING_MSG_ID = 1
RECEIVE_LEN = 2
RECEIVE_PACKAGE = 3
RECEIVE_CHECK = 4
def checksum(d):
sum = 0
if assistant.is_python3():
for i in d:
sum += i
sum = sum&0xff
else:
for i in d:
sum += ord(i)
sum = sum&0xff
return sum
class Transport:
def __init__(self, port, baudrate=921600):
self._Port = port
self._Baudrate = baudrate
self._KeepRunning = False
self.receive_state = Recstate.WAITING_HD
self.rev_msg = []
self.rev_data = []
self.wait_event = threading.Event()
def getDataHolder(self):
return BoardDataDict
def start(self):
try:
self._Serial = serial.Serial(port=self._Port, baudrate=self._Baudrate, timeout=0.2)
self._KeepRunning = True
self._ReceiverThread = threading.Thread(target=self._Listen)
self._ReceiverThread.setDaemon(True)
self._ReceiverThread.start()
return True
except:
return False
def Stop(self):
self._KeepRunning = False
time.sleep(0.1)
self._Serial.close()
def _Listen(self):
while self._KeepRunning:
if self.receiveFiniteStates(self._Serial.read()):
self.packageAnalysis()
def receiveFiniteStates(self, s):
if len(s) == 0:
return False
val = s[0]
val_int = val
if not assistant.is_python3():
val_int = ord(val)
if self.receive_state == Recstate.WAITING_HD:
if val_int == FIX_HEAD:
log.trace('got head')
self.rev_msg = []
self.rev_data =[]
self.rev_msg.append(val)
self.receive_state = Recstate.WAITING_MSG_ID
elif self.receive_state == Recstate.WAITING_MSG_ID:
log.trace('got msg id')
self.rev_msg.append(val)
self.receive_state = Recstate.RECEIVE_LEN
elif self.receive_state == Recstate.RECEIVE_LEN:
log.trace('got len:%d', val_int)
self.rev_msg.append(val)
if val_int == 0:
self.receive_state = Recstate.RECEIVE_CHECK
else:
self.receive_state = Recstate.RECEIVE_PACKAGE
elif self.receive_state == Recstate.RECEIVE_PACKAGE:
# if assistant.is_python3():
# self.rev_data.append((chr(val)).encode('latin1'))
# else:
self.rev_data.append(val)
r = False
if assistant.is_python3():
r = len(self.rev_data) == int(self.rev_msg[-1])
else:
r = len(self.rev_data) == ord(self.rev_msg[-1])
if r:
self.rev_msg.extend(self.rev_data)
self.receive_state = Recstate.RECEIVE_CHECK
elif self.receive_state == Recstate.RECEIVE_CHECK:
log.trace('got check')
self.receive_state = Recstate.WAITING_HD
if val_int == checksum(self.rev_msg):
log.trace('got a complete message')
return True
else:
self.receive_state = Recstate.WAITING_HD
# continue receiving
return False
def packageAnalysis(self):
if assistant.is_python3():
in_msg_id = int(self.rev_msg[1])
else:
in_msg_id = ord(self.rev_msg[1])
if assistant.is_python3():
log.debug("recv body:" + " ".join("{:02x}".format(c) for c in self.rev_data))
r = BoardDataDict[in_msg_id].unpack(bytes(self.rev_data))
else:
log.debug("recv body:" + " ".join("{:02x}".format(ord(c)) for c in self.rev_data))
r = BoardDataDict[in_msg_id].unpack(''.join(self.rev_data))
if r:
self.res_id = in_msg_id
if in_msg_id<100:
self.set_response()
else:#notify
log.debug('msg %d'%self.rev_msg[4],'data incoming')
pass
else:
log.debug ('error unpacking pkg')
def request(self, id, timeout=0.5):
if not self.write(id):
log.debug ('Serial send error!')
return False
if self.wait_for_response(timeout):
if id == self.res_id:
log.trace ('OK')
else:
log.error ('Got unmatched response!')
else:
log.error ('Request got no response!')
return False
# clear response
self.res_id = None
return True
def write(self, id):
cmd = self.make_command(id)
if assistant.is_python3():
log.d("write:" + " ".join("{:02x}".format(c) for c in cmd))
else:
log.d("write:" + " ".join("{:02x}".format(ord(c)) for c in cmd))
self._Serial.write(cmd)
return True
def wait_for_response(self, timeout):
self.wait_event.clear()
return self.wait_event.wait(timeout)
def set_response(self):
self.wait_event.set()
def make_command(self, id):
#print(DataDict[id])
data = BoardDataDict[id].pack()
l = [FIX_HEAD, id, len(data)]
head = struct.pack("3B", *l)
body = head + data
if assistant.is_python3():
return body + chr(checksum(body)).encode('latin1')
else:
return body + chr(checksum(body))
if __name__ == '__main__':
mboard = Transport('com10')
if not mboard.start():
import sys
sys.exit()
p = mboard.request(MessageID.ID_GET_VERSION)
log.i("result=%s"%p)
print('Version =[',mboard.getDataHolder()[MessageID.ID_GET_VERSION].version.decode(), mboard.getDataHolder()[MessageID.ID_GET_VERSION].build_time.decode(),"]\r\n")
import sys
sys.path.append("..")
import pypibot
from pypibot import log
from pypibot import assistant
import serial
import threading
import struct
import time
from dataholder import MessageID, BoardDataDict
FIX_HEAD = 0x5a
class Recstate():
WAITING_HD = 0
WAITING_MSG_ID = 1
RECEIVE_LEN = 2
RECEIVE_PACKAGE = 3
RECEIVE_CHECK = 4
def checksum(d):
sum = 0
if assistant.is_python3():
for i in d:
sum += i
sum = sum&0xff
else:
for i in d:
sum += ord(i)
sum = sum&0xff
return sum
class Transport:
def __init__(self, port, baudrate=921600):
self._Port = port
self._Baudrate = baudrate
self._KeepRunning = False
self.receive_state = Recstate.WAITING_HD
self.rev_msg = []
self.rev_data = []
self.wait_event = threading.Event()
def getDataHolder(self):
return BoardDataDict
def start(self):
try:
self._Serial = serial.Serial(port=self._Port, baudrate=self._Baudrate, timeout=0.2)
self._KeepRunning = True
self._ReceiverThread = threading.Thread(target=self._Listen)
self._ReceiverThread.setDaemon(True)
self._ReceiverThread.start()
return True
except:
return False
def Stop(self):
self._KeepRunning = False
time.sleep(0.1)
self._Serial.close()
def _Listen(self):
while self._KeepRunning:
if self.receiveFiniteStates(self._Serial.read()):
self.packageAnalysis()
def receiveFiniteStates(self, s):
if len(s) == 0:
return False
val = s[0]
val_int = val
if not assistant.is_python3():
val_int = ord(val)
if self.receive_state == Recstate.WAITING_HD:
if val_int == FIX_HEAD:
log.trace('got head')
self.rev_msg = []
self.rev_data =[]
self.rev_msg.append(val)
self.receive_state = Recstate.WAITING_MSG_ID
elif self.receive_state == Recstate.WAITING_MSG_ID:
log.trace('got msg id')
self.rev_msg.append(val)
self.receive_state = Recstate.RECEIVE_LEN
elif self.receive_state == Recstate.RECEIVE_LEN:
log.trace('got len:%d', val_int)
self.rev_msg.append(val)
if val_int == 0:
self.receive_state = Recstate.RECEIVE_CHECK
else:
self.receive_state = Recstate.RECEIVE_PACKAGE
elif self.receive_state == Recstate.RECEIVE_PACKAGE:
# if assistant.is_python3():
# self.rev_data.append((chr(val)).encode('latin1'))
# else:
self.rev_data.append(val)
r = False
if assistant.is_python3():
r = len(self.rev_data) == int(self.rev_msg[-1])
else:
r = len(self.rev_data) == ord(self.rev_msg[-1])
if r:
self.rev_msg.extend(self.rev_data)
self.receive_state = Recstate.RECEIVE_CHECK
elif self.receive_state == Recstate.RECEIVE_CHECK:
log.trace('got check')
self.receive_state = Recstate.WAITING_HD
if val_int == checksum(self.rev_msg):
log.trace('got a complete message')
return True
else:
self.receive_state = Recstate.WAITING_HD
# continue receiving
return False
def packageAnalysis(self):
if assistant.is_python3():
in_msg_id = int(self.rev_msg[1])
else:
in_msg_id = ord(self.rev_msg[1])
if assistant.is_python3():
log.debug("recv body:" + " ".join("{:02x}".format(c) for c in self.rev_data))
r = BoardDataDict[in_msg_id].unpack(bytes(self.rev_data))
else:
log.debug("recv body:" + " ".join("{:02x}".format(ord(c)) for c in self.rev_data))
r = BoardDataDict[in_msg_id].unpack(''.join(self.rev_data))
if r:
self.res_id = in_msg_id
if in_msg_id<100:
self.set_response()
else:#notify
log.debug('msg %d'%self.rev_msg[4],'data incoming')
pass
else:
log.debug ('error unpacking pkg')
def request(self, id, timeout=0.5):
if not self.write(id):
log.debug ('Serial send error!')
return False
if self.wait_for_response(timeout):
if id == self.res_id:
log.trace ('OK')
else:
log.error ('Got unmatched response!')
else:
log.error ('Request got no response!')
return False
# clear response
self.res_id = None
return True
def write(self, id):
cmd = self.make_command(id)
if assistant.is_python3():
log.d("write:" + " ".join("{:02x}".format(c) for c in cmd))
else:
log.d("write:" + " ".join("{:02x}".format(ord(c)) for c in cmd))
self._Serial.write(cmd)
return True
def wait_for_response(self, timeout):
self.wait_event.clear()
return self.wait_event.wait(timeout)
def set_response(self):
self.wait_event.set()
def make_command(self, id):
#print(DataDict[id])
data = BoardDataDict[id].pack()
l = [FIX_HEAD, id, len(data)]
head = struct.pack("3B", *l)
body = head + data
if assistant.is_python3():
return body + chr(checksum(body)).encode('latin1')
else:
return body + chr(checksum(body))
if __name__ == '__main__':
mboard = Transport('com10')
if not mboard.start():
import sys
sys.exit()
p = mboard.request(MessageID.ID_GET_VERSION)
log.i("result=%s"%p)
print('Version =[',mboard.getDataHolder()[MessageID.ID_GET_VERSION].version.decode(), mboard.getDataHolder()[MessageID.ID_GET_VERSION].build_time.decode(),"]\r\n")

View File

@ -0,0 +1,6 @@
## ROS包功能说明:
> 更新时间2024/02/04@詹力
`ipa_coverage_planning` :新添加的全覆盖路径规划算法,这部分代码是原来`pibot`没有的。详细内容见包里面的`README.md`文档。

Some files were not shown because too many files have changed in this diff Show More