300 lines
13 KiB
Python
300 lines
13 KiB
Python
import time
|
|
import sys
|
|
import openvr
|
|
import math
|
|
import json
|
|
|
|
from functools import lru_cache
|
|
|
|
# Print text in place: the carriage return moves the cursor back to the start
# of the current line, so each call overwrites the previous output instead of
# starting a new line.
def update_text(txt):
    """Write ``txt`` to stdout prefixed with a carriage return, then flush.

    Args:
        txt: The string to display; it is concatenated to ``'\\r'`` as-is.
    """
    stream = sys.stdout
    line = '\r' + txt
    stream.write(line)
    # Flush explicitly: no newline is written, so line buffering would
    # otherwise hold the text back.
    stream.flush()
|
|
|
|
# Convert the standard 3x4 position/rotation matrix to an x, y, z location
# and the corresponding Euler angles (in degrees).
def convert_to_euler(pose_mat):
    """Return ``[x, y, z, yaw, pitch, roll]`` for a 3x4 pose matrix.

    The rotation part (columns 0-2) is decomposed as
    ``R = Rz(yaw) * Ry(pitch) * Rx(roll)``; the translation is column 3.

    Args:
        pose_mat: Indexable 3x4 matrix (``pose_mat[row][col]``).

    Returns:
        A list ``[x, y, z, yaw, pitch, roll]`` with the angles in degrees.
    """
    yaw = math.degrees(math.atan2(pose_mat[1][0], pose_mat[0][0]))
    # For R = Rz*Ry*Rx, m20 = -sin(pitch) and hypot(m21, m22) = |cos(pitch)|.
    # The previous atan2(m20, m00) mixed yaw into the result (m00 is
    # cos(yaw)*cos(pitch)) and had the opposite sign.
    pitch = math.degrees(
        math.atan2(-pose_mat[2][0], math.hypot(pose_mat[2][1], pose_mat[2][2]))
    )
    roll = math.degrees(math.atan2(pose_mat[2][1], pose_mat[2][2]))
    x = pose_mat[0][3]
    y = pose_mat[1][3]
    z = pose_mat[2][3]
    return [x, y, z, yaw, pitch, roll]
|
|
|
|
# Convert the standard 3x4 position/rotation matrix to an x, y, z location
# and the corresponding rotation quaternion.
def convert_to_quaternion(pose_mat):
    """Return ``[x, y, z, r_w, r_x, r_y, r_z]`` for a 3x4 pose matrix.

    The conversion pivots on the largest of ``r_w, r_x, r_y, r_z`` so the
    divisor never approaches zero. The single-formula version divided by
    ``4 * r_w``, which is zero for any 180 degree rotation.

    Args:
        pose_mat: Indexable 3x4 matrix (``pose_mat[row][col]``).

    Returns:
        A list ``[x, y, z, r_w, r_x, r_y, r_z]``; the quaternion sign is
        chosen so that ``r_w >= 0``, matching the previous behaviour.
    """
    m00, m01, m02 = pose_mat[0][0], pose_mat[0][1], pose_mat[0][2]
    m10, m11, m12 = pose_mat[1][0], pose_mat[1][1], pose_mat[1][2]
    m20, m21, m22 = pose_mat[2][0], pose_mat[2][1], pose_mat[2][2]
    trace = m00 + m11 + m22

    if trace > 0:
        # Same formula as before; well conditioned because r_w > 0.5 here.
        r_w = math.sqrt(1 + trace) / 2
        r_x = (m21 - m12) / (4 * r_w)
        r_y = (m02 - m20) / (4 * r_w)
        r_z = (m10 - m01) / (4 * r_w)
    elif m00 >= m11 and m00 >= m22:
        s = 2 * math.sqrt(1 + m00 - m11 - m22)  # s == 4 * r_x
        r_w = (m21 - m12) / s
        r_x = s / 4
        r_y = (m01 + m10) / s
        r_z = (m02 + m20) / s
    elif m11 >= m22:
        s = 2 * math.sqrt(1 + m11 - m00 - m22)  # s == 4 * r_y
        r_w = (m02 - m20) / s
        r_x = (m01 + m10) / s
        r_y = s / 4
        r_z = (m12 + m21) / s
    else:
        s = 2 * math.sqrt(1 + m22 - m00 - m11)  # s == 4 * r_z
        r_w = (m10 - m01) / s
        r_x = (m02 + m20) / s
        r_y = (m12 + m21) / s
        r_z = s / 4

    # q and -q describe the same rotation; keep the r_w >= 0 convention.
    if r_w < 0:
        r_w, r_x, r_y, r_z = -r_w, -r_x, -r_y, -r_z

    x = pose_mat[0][3]
    y = pose_mat[1][3]
    z = pose_mat[2][3]
    return [x, y, z, r_w, r_x, r_y, r_z]
|
|
|
|
# Buffer that accumulates pose matrices as parallel lists of position, Euler
# angles and quaternion components, which makes the samples easy to plot.
class pose_sample_buffer():
    """Column-wise store of sampled poses.

    Every call to :meth:`append` adds one entry to each of ``time``, ``x``,
    ``y``, ``z``, ``yaw``, ``pitch``, ``roll`` (degrees) and ``r_w``, ``r_x``,
    ``r_y``, ``r_z`` (quaternion components).
    """

    def __init__(self):
        # NOTE(review): `i` and `index` are initialised but never updated by
        # append(); presumably meant as a running sample index - confirm.
        self.i = 0
        self.index = []
        self.time = []
        self.x = []
        self.y = []
        self.z = []
        self.yaw = []
        self.pitch = []
        self.roll = []
        self.r_w = []
        self.r_x = []
        self.r_y = []
        self.r_z = []

    @staticmethod
    def _quaternion(m):
        """Return ``(r_w, r_x, r_y, r_z)`` for the rotation part of ``m``.

        Pivots on the largest quaternion component so the divisor is never
        zero (the single-formula version divides by ``4 * r_w``, which is 0
        for 180 degree rotations). The sign is normalised to ``r_w >= 0``.
        """
        m00, m11, m22 = m[0][0], m[1][1], m[2][2]
        trace = m00 + m11 + m22
        if trace > 0:
            r_w = math.sqrt(1 + trace) / 2
            r_x = (m[2][1] - m[1][2]) / (4 * r_w)
            r_y = (m[0][2] - m[2][0]) / (4 * r_w)
            r_z = (m[1][0] - m[0][1]) / (4 * r_w)
        elif m00 >= m11 and m00 >= m22:
            s = 2 * math.sqrt(1 + m00 - m11 - m22)  # s == 4 * r_x
            r_w = (m[2][1] - m[1][2]) / s
            r_x = s / 4
            r_y = (m[0][1] + m[1][0]) / s
            r_z = (m[0][2] + m[2][0]) / s
        elif m11 >= m22:
            s = 2 * math.sqrt(1 + m11 - m00 - m22)  # s == 4 * r_y
            r_w = (m[0][2] - m[2][0]) / s
            r_x = (m[0][1] + m[1][0]) / s
            r_y = s / 4
            r_z = (m[1][2] + m[2][1]) / s
        else:
            s = 2 * math.sqrt(1 + m22 - m00 - m11)  # s == 4 * r_z
            r_w = (m[1][0] - m[0][1]) / s
            r_x = (m[0][2] + m[2][0]) / s
            r_y = (m[1][2] + m[2][1]) / s
            r_z = s / 4
        if r_w < 0:
            r_w, r_x, r_y, r_z = -r_w, -r_x, -r_y, -r_z
        return r_w, r_x, r_y, r_z

    def append(self, pose_mat, t):
        """Append one pose sample.

        Args:
            pose_mat: Indexable 3x4 pose matrix (``pose_mat[row][col]``).
            t: Timestamp stored alongside the sample in ``self.time``.
        """
        self.time.append(t)
        self.x.append(pose_mat[0][3])
        self.y.append(pose_mat[1][3])
        self.z.append(pose_mat[2][3])
        # atan2 instead of atan(a / b): no ZeroDivisionError when the
        # denominator is 0 (e.g. a 90 degree yaw) and the full +/-180 degree
        # range is kept instead of being folded into +/-90.
        self.yaw.append(math.degrees(math.atan2(pose_mat[1][0], pose_mat[0][0])))
        self.pitch.append(math.degrees(
            math.atan2(-pose_mat[2][0], math.hypot(pose_mat[2][1], pose_mat[2][2]))
        ))
        self.roll.append(math.degrees(math.atan2(pose_mat[2][1], pose_mat[2][2])))
        r_w, r_x, r_y, r_z = self._quaternion(pose_mat)
        self.r_w.append(r_w)
        self.r_x.append(r_x)
        self.r_y.append(r_y)
        self.r_z.append(r_z)
|
|
|
|
def get_pose(vr_obj):
    """Query ``vr_obj`` for the current pose of every tracked-device slot.

    Calls ``getDeviceToAbsoluteTrackingPose`` in the standing tracking
    universe for ``openvr.k_unMaxTrackedDeviceCount`` devices.

    Args:
        vr_obj: Object exposing ``getDeviceToAbsoluteTrackingPose``.

    Returns:
        Whatever ``getDeviceToAbsoluteTrackingPose`` returns; callers in this
        file index it by device index.
    """
    universe = openvr.TrackingUniverseStanding
    # Second argument is passed as 0 - presumably the prediction horizon in
    # seconds (i.e. "no prediction"); confirm against the OpenVR API docs.
    prediction = 0
    slot_count = openvr.k_unMaxTrackedDeviceCount
    return vr_obj.getDeviceToAbsoluteTrackingPose(universe, prediction, slot_count)
|
|
|
|
|
|
class vr_tracked_device():
    """Convenience wrapper around one OpenVR tracked-device index.

    Attributes:
        device_class: Label given by the creator (e.g. ``"Controller"``).
        index: OpenVR tracked-device index this object refers to.
        vr: The OpenVR system object all queries are forwarded to.
    """

    def __init__(self, vr_obj, index, device_class):
        self.device_class = device_class
        self.index = index
        self.vr = vr_obj
        # Per-instance cache for get_serial(). A class-level lru_cache on the
        # method would key on `self` and keep every instance (and its vr
        # handle) alive for the life of the process.
        self._serial = None

    def get_serial(self):
        """Return the device serial number, querying OpenVR only once."""
        if self._serial is None:
            self._serial = self.vr.getStringTrackedDeviceProperty(
                self.index, openvr.Prop_SerialNumber_String)
        return self._serial

    def get_model(self):
        """Return the device model number string."""
        return self.vr.getStringTrackedDeviceProperty(self.index, openvr.Prop_ModelNumber_String)

    def get_battery_percent(self):
        """Return the ``Prop_DeviceBatteryPercentage_Float`` property."""
        return self.vr.getFloatTrackedDeviceProperty(self.index, openvr.Prop_DeviceBatteryPercentage_Float)

    def is_charging(self):
        """Return the ``Prop_DeviceIsCharging_Bool`` property."""
        return self.vr.getBoolTrackedDeviceProperty(self.index, openvr.Prop_DeviceIsCharging_Bool)

    def sample(self, num_samples, sample_rate):
        """Record ``num_samples`` poses at roughly ``sample_rate`` per second.

        Each pose is appended together with the seconds elapsed since
        sampling started. Pose validity is not checked here.

        Args:
            num_samples: Number of poses to record.
            sample_rate: Target samples per second; ``0`` raises
                ``ZeroDivisionError``.

        Returns:
            A ``pose_sample_buffer`` holding the recorded samples.
        """
        interval = 1/sample_rate
        rtn = pose_sample_buffer()
        sample_start = time.time()
        for _ in range(num_samples):
            start = time.time()
            pose = get_pose(self.vr)
            rtn.append(pose[self.index].mDeviceToAbsoluteTracking, time.time()-sample_start)
            # Sleep only for what is left of the interval after the query.
            sleep_time = interval - (time.time()-start)
            if sleep_time > 0:
                time.sleep(sleep_time)
        return rtn

    def _valid_pose(self, pose):
        """Return this device's entry of ``pose`` or ``None`` if not valid.

        ``pose`` is the sequence returned by ``get_pose``; when it is
        ``None`` a fresh one is fetched.
        """
        # Identity test: `== None` would invoke the container's __eq__.
        if pose is None:
            pose = get_pose(self.vr)
        device_pose = pose[self.index]
        return device_pose if device_pose.bPoseIsValid else None

    def get_pose_euler(self, pose=None):
        """Return ``[x, y, z, yaw, pitch, roll]`` or ``None`` if invalid."""
        device_pose = self._valid_pose(pose)
        if device_pose is None:
            return None
        return convert_to_euler(device_pose.mDeviceToAbsoluteTracking)

    def get_pose_matrix(self, pose=None):
        """Return the raw device-to-absolute matrix or ``None`` if invalid."""
        device_pose = self._valid_pose(pose)
        if device_pose is None:
            return None
        return device_pose.mDeviceToAbsoluteTracking

    def get_velocity(self, pose=None):
        """Return the pose's ``vVelocity`` or ``None`` if invalid."""
        device_pose = self._valid_pose(pose)
        if device_pose is None:
            return None
        return device_pose.vVelocity

    def get_angular_velocity(self, pose=None):
        """Return the pose's ``vAngularVelocity`` or ``None`` if invalid."""
        device_pose = self._valid_pose(pose)
        if device_pose is None:
            return None
        return device_pose.vAngularVelocity

    def get_pose_quaternion(self, pose=None):
        """Return ``[x, y, z, r_w, r_x, r_y, r_z]`` or ``None`` if invalid."""
        device_pose = self._valid_pose(pose)
        if device_pose is None:
            return None
        return convert_to_quaternion(device_pose.mDeviceToAbsoluteTracking)

    def controller_state_to_dict(self, pControllerState):
        """Flatten a controller state structure into a plain dict.

        Args:
            pControllerState: Object with ``unPacketNum``, ``rAxis``,
                ``ulButtonPressed`` and ``ulButtonTouched`` attributes.

        Returns:
            Dict with the raw packet number and button masks plus decoded
            ``trigger``, ``trackpad_x``/``trackpad_y`` and per-button flags.
        """
        # This function is graciously borrowed from https://gist.github.com/awesomebytes/75daab3adb62b331f21ecf3a03b3ab46
        # docs: https://github.com/ValveSoftware/openvr/wiki/IVRSystem::GetControllerState
        pressed = pControllerState.ulButtonPressed
        touched = pControllerState.ulButtonTouched
        trackpad = pControllerState.rAxis[0]
        # System button can't be read; if you press it the controllers stop
        # reporting. Axes 2-4 are published but always 0.0, so they are not
        # exported.
        return {
            'unPacketNum': pControllerState.unPacketNum,
            # Trigger: 0.0 is fully released; its .y is always 0.0 per docs.
            'trigger': pControllerState.rAxis[1].x,
            # Joystick/trackpad axes range from -1.0 to 1.0.
            'trackpad_x': trackpad.x,
            'trackpad_y': trackpad.y,
            'ulButtonPressed': pressed,
            'ulButtonTouched': touched,
            # Bit 1 marks the menu button.
            'menu_button': bool(pressed >> 1 & 1),
            # Bit 32 marks the trackpad.
            'trackpad_pressed': bool(pressed >> 32 & 1),
            'trackpad_touched': bool(touched >> 32 & 1),
            # Bit 2 marks the grip button.
            'grip_button': bool(pressed >> 2 & 1),
        }

    def get_controller_inputs(self):
        """Fetch the current controller state and return it as a dict."""
        # The first element of the returned pair is not used here.
        _, state = self.vr.getControllerState(self.index)
        return self.controller_state_to_dict(state)

    def trigger_haptic_pulse(self, duration_micros=1000, axis_id=0):
        """
        Causes devices with haptic feedback to vibrate for a short time.

        Args:
            duration_micros: Value forwarded as the pulse duration.
            axis_id: Value forwarded as the axis to pulse.
        """
        self.vr.triggerHapticPulse(self.index, axis_id, duration_micros)
|
|
|
|
class vr_tracking_reference(vr_tracked_device):
    """Tracked device representing a tracking reference."""

    def get_mode(self):
        """Return the device's mode label as an upper-case ``str``.

        The string-property call is treated as returning ``bytes`` in some
        places of this file and ``str`` in others, so both are accepted here
        instead of calling ``.decode()`` unconditionally.
        """
        mode = self.vr.getStringTrackedDeviceProperty(self.index, openvr.Prop_ModeLabel_String)
        if isinstance(mode, bytes):
            mode = mode.decode('utf-8')
        return mode.upper()

    def sample(self, num_samples, sample_rate):
        """Print a warning instead of sampling; returns ``None``."""
        print("Warning: Tracking References do not move, sample isn't much use...")
|
|
|
|
class triad_openvr():
    """Registry of the OpenVR devices currently known to the application.

    Attributes:
        vr: Object returned by ``openvr.init``.
        vrsystem: Object returned by ``openvr.VRSystem()``; used for events.
        object_names: Maps a device type label to the list of device names.
        devices: Maps a device name to its ``vr_tracked_device`` wrapper.
        device_index_map: Maps an OpenVR device index to the device name, for
            devices registered through :meth:`add_tracked_device`.
    """

    def __init__(self, configfile_path=None):
        """Initialise OpenVR and register the connected devices.

        Args:
            configfile_path: Optional path to a JSON file with a ``devices``
                list of ``{"serial", "name", "type"}`` entries. When given,
                only connected devices whose serial matches an entry are
                registered, under the configured name. Otherwise every
                connected device of a known class gets a generated name.
        """
        # Set before openvr.init() so __del__ stays safe if init raises.
        self._vr_initialized = False
        self.vr = openvr.init(openvr.VRApplication_Other)
        self._vr_initialized = True
        self.vrsystem = openvr.VRSystem()

        # Initializing object to hold indexes for various tracked objects
        self.object_names = {"Tracking Reference":[],"HMD":[],"Controller":[],"Tracker":[]}
        self.devices = {}
        self.device_index_map = {}
        poses = self.vr.getDeviceToAbsoluteTrackingPose(openvr.TrackingUniverseStanding, 0,
                                                        openvr.k_unMaxTrackedDeviceCount)

        if configfile_path:
            # Loading config file
            try:
                with open(configfile_path, 'r') as json_data:
                    config = json.load(json_data)
            except EnvironmentError: # parent of IOError, OSError *and* WindowsError where available
                print('config.json not found.')
                # sys.exit rather than the `exit` builtin, which only exists
                # when the `site` module has been loaded.
                sys.exit(1)

            # Iterate through the pose list to find the active devices and
            # match them against the configured serial numbers.
            for i in range(openvr.k_unMaxTrackedDeviceCount):
                if not poses[i].bDeviceIsConnected:
                    continue
                device_serial = self._as_text(
                    self.vr.getStringTrackedDeviceProperty(i, openvr.Prop_SerialNumber_String))
                for device in config['devices']:
                    if device_serial == device['serial']:
                        self._register_configured_device(i, device['name'], device['type'])
        else:
            # Iterate through the pose list to find the active devices and determine their type
            for i in range(openvr.k_unMaxTrackedDeviceCount):
                if poses[i].bDeviceIsConnected:
                    self.add_tracked_device(i)

    def __del__(self):
        # Only shut down a session this object actually started; __del__ also
        # runs on objects whose __init__ failed part-way.
        if getattr(self, "_vr_initialized", False):
            openvr.shutdown()

    @staticmethod
    def _as_text(value):
        """Return ``value`` as ``str``, decoding UTF-8 ``bytes`` if needed."""
        if isinstance(value, bytes):
            return value.decode('utf-8')
        return value

    def _register_configured_device(self, index, device_name, device_type):
        """Register a config-file device under its configured name and type."""
        self.object_names[device_type].append(device_name)
        # Tracking references need the subclass so mode queries are available.
        if device_type == "Tracking Reference":
            wrapper = vr_tracking_reference
        else:
            wrapper = vr_tracked_device
        self.devices[device_name] = wrapper(self.vr, index, device_type)

    def get_pose(self):
        """Return the current poses of all device slots (see ``get_pose``)."""
        return get_pose(self.vr)

    def poll_vr_events(self):
        """
        Used to poll VR events and find any new tracked devices or ones that are no longer tracked.
        """
        event = openvr.VREvent_t()
        while self.vrsystem.pollNextEvent(event):
            if event.eventType == openvr.VREvent_TrackedDeviceActivated:
                self.add_tracked_device(event.trackedDeviceIndex)
            elif event.eventType == openvr.VREvent_TrackedDeviceDeactivated:
                #If we were already tracking this device, quit tracking it.
                if event.trackedDeviceIndex in self.device_index_map:
                    self.remove_tracked_device(event.trackedDeviceIndex)

    def add_tracked_device(self, tracked_device_index):
        """Register the device at ``tracked_device_index`` under a new name.

        Devices of an unrecognised class, and indexes that already have a
        wrapper in ``self.devices``, are ignored.
        """
        i = tracked_device_index
        # An activation event can arrive for a device that was already
        # registered; adding it again would create a second name for it.
        if any(known.index == i for known in self.devices.values()):
            return

        device_class = self.vr.getTrackedDeviceClass(i)
        known_classes = (
            (openvr.TrackedDeviceClass_Controller, "Controller", "controller_", vr_tracked_device),
            (openvr.TrackedDeviceClass_HMD, "HMD", "hmd_", vr_tracked_device),
            (openvr.TrackedDeviceClass_GenericTracker, "Tracker", "tracker_", vr_tracked_device),
            (openvr.TrackedDeviceClass_TrackingReference, "Tracking Reference",
             "tracking_reference_", vr_tracking_reference),
        )
        for class_id, device_type, prefix, wrapper in known_classes:
            if device_class != class_id:
                continue
            # Start at count+1 as before, but skip names already in use:
            # after a removal (or a rename) count+1 can collide with an
            # existing device and silently replace it.
            number = len(self.object_names[device_type]) + 1
            while prefix + str(number) in self.devices:
                number += 1
            device_name = prefix + str(number)
            self.object_names[device_type].append(device_name)
            self.devices[device_name] = wrapper(self.vr, i, device_type)
            self.device_index_map[i] = device_name
            return

    def remove_tracked_device(self, tracked_device_index):
        """Forget the device registered for ``tracked_device_index``.

        Raises:
            Exception: If the index is not in ``device_index_map``.
        """
        if tracked_device_index not in self.device_index_map:
            raise Exception("Tracked device index {} not valid. Not removing.".format(tracked_device_index))
        device_name = self.device_index_map[tracked_device_index]
        self.object_names[self.devices[device_name].device_class].remove(device_name)
        del self.device_index_map[tracked_device_index]
        del self.devices[device_name]

    def rename_device(self, old_device_name, new_device_name):
        """Rename a registered device everywhere it is referenced.

        Raises:
            KeyError: If ``old_device_name`` is not a registered device.
        """
        device = self.devices.pop(old_device_name)
        self.devices[new_device_name] = device
        names = self.object_names[device.device_class]
        for position, name in enumerate(names):
            if name == old_device_name:
                names[position] = new_device_name
        # Keep the index map in step; a stale name here makes a later
        # remove_tracked_device() fail with KeyError.
        for index, name in list(self.device_index_map.items()):
            if name == old_device_name:
                self.device_index_map[index] = new_device_name

    def print_discovered_objects(self):
        """Print a per-type summary of the registered devices."""
        for device_type, names in self.object_names.items():
            plural = device_type
            if len(names) != 1:
                plural += "s"
            print("Found "+str(len(names))+" "+plural)
            for device in names:
                tracked = self.devices[device]
                serial = self._as_text(tracked.get_serial())
                model = self._as_text(tracked.get_model())
                if device_type == "Tracking Reference":
                    # Previously the model was printed under the "Mode"
                    # label. The mode label is read directly so bytes/str
                    # normalisation goes through _as_text like the rest.
                    mode = self._as_text(self.vr.getStringTrackedDeviceProperty(
                        tracked.index, openvr.Prop_ModeLabel_String)).upper()
                    print(" "+device+" ("+serial+
                          ", Mode "+mode+
                          ", "+model+
                          ")")
                else:
                    print(" "+device+" ("+serial+
                          ", "+model+")")
|