import time
import sys
import openvr
import math
import json
from functools import lru_cache


# Function to print out text but instead of starting a new line it will overwrite the existing line
def update_text(txt):
    """Write ``txt`` to stdout preceded by a carriage return, then flush."""
    sys.stdout.write('\r'+txt)
    sys.stdout.flush()


def _to_text(value):
    """Return ``value`` unchanged unless it is ``bytes``, which is decoded as UTF-8.

    The string-property calls in this module were written with an
    unconditional ``.decode('utf-8')``; that fails when the binding already
    returns ``str``, so decoding is made conditional here.
    """
    if isinstance(value, bytes):
        return value.decode('utf-8')
    return value


def _atan_ratio_degrees(numerator, denominator):
    """Return ``atan(numerator / denominator)`` in degrees.

    A zero denominator returns the limit value (+/-90, or 0.0 when the
    numerator is zero as well) instead of raising ZeroDivisionError.
    """
    if denominator == 0:
        if numerator == 0:
            return 0.0
        return math.copysign(90.0, numerator)
    return 180 / math.pi * math.atan(numerator / denominator)


def _rotation_to_quaternion(pose_mat):
    """Return ``(r_w, r_x, r_y, r_z)`` for the 3x3 rotation part of ``pose_mat``.

    When the trace is positive this is the direct trace formula. Otherwise
    ``r_w`` is zero or close to it and dividing by it is unusable, so the
    component belonging to the largest diagonal element is computed first
    and the others are derived from it. The result is sign-normalised so
    that ``r_w >= 0`` in every branch.
    """
    one_plus_trace = 1 + pose_mat[0][0] + pose_mat[1][1] + pose_mat[2][2]
    if one_plus_trace > 1:
        r_w = math.sqrt(one_plus_trace) / 2
        r_x = (pose_mat[2][1] - pose_mat[1][2]) / (4 * r_w)
        r_y = (pose_mat[0][2] - pose_mat[2][0]) / (4 * r_w)
        r_z = (pose_mat[1][0] - pose_mat[0][1]) / (4 * r_w)
        return r_w, r_x, r_y, r_z

    m00, m11, m22 = pose_mat[0][0], pose_mat[1][1], pose_mat[2][2]
    # With trace <= 0 the largest diagonal entry d satisfies
    # 1 + 2*d - trace >= 1, so ``s`` below is always >= 2 (never zero).
    if m00 >= m11 and m00 >= m22:
        s = 2 * math.sqrt(1 + m00 - m11 - m22)
        r_w = (pose_mat[2][1] - pose_mat[1][2]) / s
        r_x = s / 4
        r_y = (pose_mat[0][1] + pose_mat[1][0]) / s
        r_z = (pose_mat[0][2] + pose_mat[2][0]) / s
    elif m11 >= m22:
        s = 2 * math.sqrt(1 + m11 - m00 - m22)
        r_w = (pose_mat[0][2] - pose_mat[2][0]) / s
        r_x = (pose_mat[0][1] + pose_mat[1][0]) / s
        r_y = s / 4
        r_z = (pose_mat[1][2] + pose_mat[2][1]) / s
    else:
        s = 2 * math.sqrt(1 + m22 - m00 - m11)
        r_w = (pose_mat[1][0] - pose_mat[0][1]) / s
        r_x = (pose_mat[0][2] + pose_mat[2][0]) / s
        r_y = (pose_mat[1][2] + pose_mat[2][1]) / s
        r_z = s / 4
    if r_w < 0:
        # q and -q describe the same rotation; keep r_w non-negative.
        r_w, r_x, r_y, r_z = -r_w, -r_x, -r_y, -r_z
    return r_w, r_x, r_y, r_z


# Convert the standard 3x4 position/rotation matrix to a x,y,z location and the appropriate Euler angles (in degrees)
def convert_to_euler(pose_mat):
    """Return ``[x, y, z, yaw, pitch, roll]`` for a 3x4 pose matrix (angles in degrees).

    NOTE(review): pitch is computed here as atan2(m[2][0], m[0][0]), which is
    not the expression pose_sample_buffer.append uses for pitch. It is left
    unchanged so existing callers see the same values; confirm which
    convention is intended.
    """
    yaw = 180 / math.pi * math.atan2(pose_mat[1][0], pose_mat[0][0])
    pitch = 180 / math.pi * math.atan2(pose_mat[2][0], pose_mat[0][0])
    roll = 180 / math.pi * math.atan2(pose_mat[2][1], pose_mat[2][2])
    x = pose_mat[0][3]
    y = pose_mat[1][3]
    z = pose_mat[2][3]
    return [x,y,z,yaw,pitch,roll]


# Convert the standard 3x4 position/rotation matrix to a x,y,z location and the appropriate Quaternion
def convert_to_quaternion(pose_mat):
    """Return ``[x, y, z, r_w, r_x, r_y, r_z]`` for a 3x4 pose matrix.

    A rotation whose trace is -1 (r_w == 0) no longer raises
    ZeroDivisionError; see _rotation_to_quaternion.
    """
    r_w, r_x, r_y, r_z = _rotation_to_quaternion(pose_mat)
    x = pose_mat[0][3]
    y = pose_mat[1][3]
    z = pose_mat[2][3]
    return [x,y,z,r_w,r_x,r_y,r_z]


# Define a class to make it easy to append pose matrices and convert to both Euler and Quaternion for plotting
class pose_sample_buffer():
    """Parallel lists of timestamps, positions, Euler angles and quaternions."""

    def __init__(self):
        self.i = 0
        self.index = []
        self.time = []
        self.x = []
        self.y = []
        self.z = []
        self.yaw = []
        self.pitch = []
        self.roll = []
        self.r_w = []
        self.r_x = []
        self.r_y = []
        self.r_z = []

    def append(self,pose_mat,t):
        """Append one sample: timestamp ``t`` plus values derived from ``pose_mat``.

        Angles are stored in degrees. A zero denominator in any of the
        angle ratios yields the limit value rather than ZeroDivisionError.
        """
        self.time.append(t)
        self.x.append(pose_mat[0][3])
        self.y.append(pose_mat[1][3])
        self.z.append(pose_mat[2][3])
        self.yaw.append(_atan_ratio_degrees(pose_mat[1][0], pose_mat[0][0]))
        self.pitch.append(_atan_ratio_degrees(
            -1 * pose_mat[2][0],
            math.sqrt(pow(pose_mat[2][1], 2) + math.pow(pose_mat[2][2], 2))))
        self.roll.append(_atan_ratio_degrees(pose_mat[2][1], pose_mat[2][2]))
        r_w, r_x, r_y, r_z = _rotation_to_quaternion(pose_mat)
        self.r_w.append(r_w)
        self.r_x.append(r_x)
        self.r_y.append(r_y)
        self.r_z.append(r_z)


def get_pose(vr_obj):
    """Return ``vr_obj.getDeviceToAbsoluteTrackingPose`` for the standing universe."""
    return vr_obj.getDeviceToAbsoluteTrackingPose(openvr.TrackingUniverseStanding, 0, openvr.k_unMaxTrackedDeviceCount)


class vr_tracked_device():
    """Wrapper around one tracked-device index of an OpenVR system object."""

    def __init__(self,vr_obj,index,device_class):
        self.device_class = device_class
        self.index = index
        self.vr = vr_obj
        # Per-instance cache for get_serial(); a functools.lru_cache on the
        # method would hold a reference to every instance in a global cache.
        self._serial = None

    def get_serial(self):
        """Return the device's serial-number property, querying it only once."""
        serial = getattr(self, '_serial', None)
        if serial is None:
            serial = self.vr.getStringTrackedDeviceProperty(self.index, openvr.Prop_SerialNumber_String)
            self._serial = serial
        return serial

    def get_model(self):
        """Return the device's model-number string property."""
        return self.vr.getStringTrackedDeviceProperty(self.index, openvr.Prop_ModelNumber_String)

    def get_battery_percent(self):
        """Return the device's battery-percentage float property."""
        return self.vr.getFloatTrackedDeviceProperty(self.index, openvr.Prop_DeviceBatteryPercentage_Float)

    def is_charging(self):
        """Return the device's is-charging bool property."""
        return self.vr.getBoolTrackedDeviceProperty(self.index, openvr.Prop_DeviceIsCharging_Bool)

    def sample(self,num_samples,sample_rate):
        """Collect ``num_samples`` poses at roughly ``sample_rate`` samples per second.

        Returns a pose_sample_buffer whose time values are seconds since
        sampling started. Pose validity is not checked while sampling.
        """
        interval = 1/sample_rate
        rtn = pose_sample_buffer()
        sample_start = time.time()
        for _ in range(num_samples):
            start = time.time()
            pose = get_pose(self.vr)
            rtn.append(pose[self.index].mDeviceToAbsoluteTracking,time.time()-sample_start)
            # Sleep only for what is left of this sample's time slot.
            sleep_time = interval - (time.time()-start)
            if sleep_time > 0:
                time.sleep(sleep_time)
        return rtn

    def _valid_pose(self, pose=None):
        """Return this device's pose entry, or None when its pose is not valid.

        ``pose`` is a pose array as returned by get_pose(); it is fetched
        when omitted.
        """
        if pose is None:
            pose = get_pose(self.vr)
        device_pose = pose[self.index]
        if device_pose.bPoseIsValid:
            return device_pose
        return None

    def get_pose_euler(self, pose=None):
        """Return ``[x, y, z, yaw, pitch, roll]``, or None if the pose is invalid."""
        device_pose = self._valid_pose(pose)
        if device_pose is None:
            return None
        return convert_to_euler(device_pose.mDeviceToAbsoluteTracking)

    def get_pose_matrix(self, pose=None):
        """Return the raw device-to-absolute matrix, or None if the pose is invalid."""
        device_pose = self._valid_pose(pose)
        if device_pose is None:
            return None
        return device_pose.mDeviceToAbsoluteTracking

    def get_velocity(self, pose=None):
        """Return the pose's ``vVelocity``, or None if the pose is invalid."""
        device_pose = self._valid_pose(pose)
        if device_pose is None:
            return None
        return device_pose.vVelocity

    def get_angular_velocity(self, pose=None):
        """Return the pose's ``vAngularVelocity``, or None if the pose is invalid."""
        device_pose = self._valid_pose(pose)
        if device_pose is None:
            return None
        return device_pose.vAngularVelocity

    def get_pose_quaternion(self, pose=None):
        """Return ``[x, y, z, r_w, r_x, r_y, r_z]``, or None if the pose is invalid."""
        device_pose = self._valid_pose(pose)
        if device_pose is None:
            return None
        return convert_to_quaternion(device_pose.mDeviceToAbsoluteTracking)

    def controller_state_to_dict(self, pControllerState):
        """Flatten a controller-state struct into a plain dict."""
        # This function is graciously borrowed from https://gist.github.com/awesomebytes/75daab3adb62b331f21ecf3a03b3ab46
        # docs: https://github.com/ValveSoftware/openvr/wiki/IVRSystem::GetControllerState
        d = {}
        d['unPacketNum'] = pControllerState.unPacketNum
        # on trigger .y is always 0.0 says the docs
        d['trigger'] = pControllerState.rAxis[1].x
        # 0.0 on trigger is fully released
        # -1.0 to 1.0 on joystick and trackpads
        d['trackpad_x'] = pControllerState.rAxis[0].x
        d['trackpad_y'] = pControllerState.rAxis[0].y
        # These are published and always 0.0
        # for i in range(2, 5):
        #     d['unknowns_' + str(i) + '_x'] = pControllerState.rAxis[i].x
        #     d['unknowns_' + str(i) + '_y'] = pControllerState.rAxis[i].y
        d['ulButtonPressed'] = pControllerState.ulButtonPressed
        d['ulButtonTouched'] = pControllerState.ulButtonTouched
        # To make easier to understand what is going on
        # Second bit marks menu button
        d['menu_button'] = bool(pControllerState.ulButtonPressed >> 1 & 1)
        # 32 bit marks trackpad
        d['trackpad_pressed'] = bool(pControllerState.ulButtonPressed >> 32 & 1)
        d['trackpad_touched'] = bool(pControllerState.ulButtonTouched >> 32 & 1)
        # third bit marks grip button
        d['grip_button'] = bool(pControllerState.ulButtonPressed >> 2 & 1)
        # System button can't be read, if you press it
        # the controllers stop reporting
        return d

    def get_controller_inputs(self):
        """Read the controller state for this device and return it as a dict."""
        result, state = self.vr.getControllerState(self.index)
        return self.controller_state_to_dict(state)

    def trigger_haptic_pulse(self, duration_micros=1000, axis_id=0):
        """
        Causes devices with haptic feedback to vibrate for a short time.
        """
        self.vr.triggerHapticPulse(self.index ,axis_id, duration_micros)


class vr_tracking_reference(vr_tracked_device):
    """Tracked device of type "Tracking Reference"."""

    def get_mode(self):
        """Return the device's mode-label string property, upper-cased."""
        mode = self.vr.getStringTrackedDeviceProperty(self.index,openvr.Prop_ModeLabel_String)
        return _to_text(mode).upper()

    def sample(self,num_samples,sample_rate):
        """Print a warning instead of sampling; returns None."""
        print("Warning: Tracking References do not move, sample isn't much use...")


def _make_tracked_device(vr_obj, index, device_type):
    """Build the wrapper class matching ``device_type`` (one of the object_names keys)."""
    if device_type == "Tracking Reference":
        return vr_tracking_reference(vr_obj, index, device_type)
    return vr_tracked_device(vr_obj, index, device_type)


class triad_openvr():
    """Discovers connected OpenVR devices and exposes them by name.

    Attributes:
        devices: device name -> vr_tracked_device.
        object_names: device type -> list of device names of that type.
        device_index_map: OpenVR device index -> device name, for devices
            registered through add_tracked_device.
    """

    def __init__(self, configfile_path=None):
        """Initialise OpenVR and register every connected device.

        With ``configfile_path`` set, the JSON file's ``devices`` entries
        (``serial``, ``name``, ``type``) decide which connected devices are
        registered and under what name. Without it every connected device of
        a known class gets an automatic name. If the config file cannot be
        opened, a message is printed and the process exits with status 1.
        """
        # Initialize OpenVR
        self.vr = openvr.init(openvr.VRApplication_Other)
        self.vrsystem = openvr.VRSystem()

        # Initializing object to hold indexes for various tracked objects
        self.object_names = {"Tracking Reference":[],"HMD":[],"Controller":[],"Tracker":[]}
        self.devices = {}
        self.device_index_map = {}
        poses = self.vr.getDeviceToAbsoluteTrackingPose(openvr.TrackingUniverseStanding, 0,
                                                       openvr.k_unMaxTrackedDeviceCount)

        # Loading config file
        if configfile_path:
            try:
                with open(configfile_path, 'r') as json_data:
                    config = json.load(json_data)
            except EnvironmentError: # parent of IOError, OSError *and* WindowsError where available
                print('config.json not found.')
                # sys.exit rather than the ``exit`` helper, which only exists
                # when the ``site`` module has been loaded.
                sys.exit(1)

            # Iterate through the pose list to find the active devices and determine their type
            # NOTE(review): devices registered here are not entered in
            # device_index_map, so poll_vr_events never removes them on
            # deactivation; confirm whether that is intended.
            for i in range(openvr.k_unMaxTrackedDeviceCount):
                if poses[i].bDeviceIsConnected:
                    device_serial = _to_text(
                        self.vr.getStringTrackedDeviceProperty(i,openvr.Prop_SerialNumber_String))
                    for device in config['devices']:
                        if device_serial == device['serial']:
                            device_name = device['name']
                            self.object_names[device['type']].append(device_name)
                            self.devices[device_name] = _make_tracked_device(self.vr,i,device['type'])
        else:
            # Iterate through the pose list to find the active devices and determine their type
            for i in range(openvr.k_unMaxTrackedDeviceCount):
                if poses[i].bDeviceIsConnected:
                    self.add_tracked_device(i)

    def __del__(self):
        openvr.shutdown()

    def get_pose(self):
        """Return the current pose array for all device indexes."""
        return get_pose(self.vr)

    def poll_vr_events(self):
        """
        Used to poll VR events and find any new tracked devices or ones that are no longer tracked.
        """
        event = openvr.VREvent_t()
        while self.vrsystem.pollNextEvent(event):
            if event.eventType == openvr.VREvent_TrackedDeviceActivated:
                self.add_tracked_device(event.trackedDeviceIndex)
            elif event.eventType == openvr.VREvent_TrackedDeviceDeactivated:
                # If we were already tracking this device, quit tracking it.
                if event.trackedDeviceIndex in self.device_index_map:
                    self.remove_tracked_device(event.trackedDeviceIndex)

    def add_tracked_device(self, tracked_device_index):
        """Register the device at ``tracked_device_index`` under an automatic name.

        Devices whose class is not controller, HMD, generic tracker or
        tracking reference are ignored, as is an index that is already in
        device_index_map (for example a repeated activation event).
        """
        i = tracked_device_index
        if i in self.device_index_map:
            return
        device_class = self.vr.getTrackedDeviceClass(i)
        known_classes = (
            (openvr.TrackedDeviceClass_Controller, "Controller", "controller_"),
            (openvr.TrackedDeviceClass_HMD, "HMD", "hmd_"),
            (openvr.TrackedDeviceClass_GenericTracker, "Tracker", "tracker_"),
            (openvr.TrackedDeviceClass_TrackingReference, "Tracking Reference", "tracking_reference_"),
        )
        for class_id, device_type, prefix in known_classes:
            if device_class != class_id:
                continue
            number = len(self.object_names[device_type]) + 1
            device_name = prefix + str(number)
            # After a removal, count+1 can equal a name that is still in use;
            # advance to the next free number instead of overwriting it.
            while device_name in self.devices:
                number += 1
                device_name = prefix + str(number)
            self.object_names[device_type].append(device_name)
            self.devices[device_name] = _make_tracked_device(self.vr, i, device_type)
            self.device_index_map[i] = device_name
            return

    def remove_tracked_device(self, tracked_device_index):
        """Forget the device registered at ``tracked_device_index``.

        Raises:
            Exception: if the index is not in device_index_map.
        """
        if tracked_device_index in self.device_index_map:
            device_name = self.device_index_map[tracked_device_index]
            self.object_names[self.devices[device_name].device_class].remove(device_name)
            del self.device_index_map[tracked_device_index]
            del self.devices[device_name]
        else:
            raise Exception("Tracked device index {} not valid. Not removing.".format(tracked_device_index))

    def rename_device(self,old_device_name,new_device_name):
        """Rename a device in devices, object_names and device_index_map.

        Raises:
            KeyError: if ``old_device_name`` is not a known device.
        """
        device = self.devices.pop(old_device_name)
        self.devices[new_device_name] = device
        names = self.object_names[device.device_class]
        for position, name in enumerate(names):
            if name == old_device_name:
                names[position] = new_device_name
        # Keep the index map in step, otherwise a later
        # remove_tracked_device() would look up the stale name.
        for device_index, name in self.device_index_map.items():
            if name == old_device_name:
                self.device_index_map[device_index] = new_device_name

    def print_discovered_objects(self):
        """Print a per-type summary of the registered devices."""
        for device_type in self.object_names:
            plural = device_type
            if len(self.object_names[device_type])!=1:
                plural+="s"
            print("Found "+str(len(self.object_names[device_type]))+" "+plural)
            for device in self.object_names[device_type]:
                if device_type == "Tracking Reference":
                    # The "Mode" field shows the mode label, not the model.
                    print(" "+device+" ("+self.devices[device].get_serial()+
                          ", Mode "+self.devices[device].get_mode()+
                          ", "+self.devices[device].get_model()+
                          ")")
                else:
                    print(" "+device+" ("+self.devices[device].get_serial()+
                          ", "+self.devices[device].get_model()+")")