118 lines
4.1 KiB
Python
118 lines
4.1 KiB
Python
import rclpy
|
|
from rclpy.node import Node
|
|
from rclpy.executors import SingleThreadedExecutor
|
|
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
|
|
from sensor_msgs.msg import JointState
|
|
from pythonosc.dispatcher import Dispatcher
|
|
from pythonosc.osc_server import BlockingOSCUDPServer
|
|
import time
|
|
|
|
class JointStateReader(Node):
|
|
"""Node to read joint names from the /joint_states topic."""
|
|
def __init__(self):
|
|
super().__init__('JointStateReader')
|
|
self.sub = self.create_subscription(JointState, 'joint_states', self.sub_callback, 1)
|
|
self.joint_names = []
|
|
self.got_msg = False
|
|
|
|
def sub_callback(self, msg):
|
|
"""Callback to read joint names from the first received message."""
|
|
if not self.joint_names:
|
|
self.got_msg = True
|
|
self.joint_names = list(msg.name)
|
|
|
|
def get_joint_names():
|
|
"""get joint names using an existing ROS2 context and ensure proper cleanup."""
|
|
#rclpy.init()
|
|
node = JointStateReader()
|
|
executor = SingleThreadedExecutor()
|
|
executor.add_node(node)
|
|
|
|
while not node.got_msg:
|
|
executor.spin_once(timeout_sec=0)
|
|
|
|
joint_list = node.joint_names
|
|
|
|
node.destroy_node() # destroy the node
|
|
#rclpy.shutdown() # Shutdown
|
|
|
|
return joint_list # Return the joint names
|
|
|
|
class ScaledJointTrajectoryPublisher(Node):
|
|
"""Node to publish joint trajectories based on OSC messages."""
|
|
def __init__(self, joint_names):
|
|
super().__init__('scaled_joint_trajectory_publisher')
|
|
|
|
# ROS2 Publisher
|
|
self.publisher = self.create_publisher(
|
|
JointTrajectory,
|
|
'/scaled_joint_trajectory_controller/joint_trajectory',
|
|
10
|
|
)
|
|
|
|
# Use dynamically retrieved joint names
|
|
self.joint_names = joint_names
|
|
|
|
# Store received joint positions
|
|
self.joint_positions = [0.0] * len(self.joint_names)
|
|
|
|
# OSC Server Setup
|
|
self.osc_ip = "0.0.0.0"
|
|
self.osc_port = 8000
|
|
self.dispatcher = Dispatcher()
|
|
|
|
# Register OSC handlers dynamically
|
|
for joint in self.joint_names:
|
|
self.dispatcher.map(f"/joint/{joint}", self.osc_joint_handler)
|
|
|
|
self.get_logger().info("Scaled Joint Trajectory Publisher Ready")
|
|
|
|
def osc_joint_handler(self, address, *args):
|
|
"""Handles incoming OSC messages for joint positions."""
|
|
joint_name = address.split("/")[-1] # Extract joint name
|
|
if joint_name in self.joint_names:
|
|
index = self.joint_names.index(joint_name)
|
|
self.joint_positions[index] = args[0] # Update joint position
|
|
self.get_logger().info(f"Updated {joint_name} to {args[0]}")
|
|
|
|
# Publish the trajectory whenever a joint update is received
|
|
self.send_trajectory(self.joint_positions)
|
|
|
|
def send_trajectory(self, joint_positions, duration=2.0):
|
|
"""Publish a joint trajectory command to move the UR10e."""
|
|
msg = JointTrajectory()
|
|
msg.joint_names = self.joint_names
|
|
|
|
point = JointTrajectoryPoint()
|
|
point.positions = joint_positions # Updated joint positions
|
|
point.time_from_start.sec = int(duration) # Duration in seconds
|
|
#point.time_from_start.nanosec = int((duration - int(duration)) * 1e9) # Nanoseconds
|
|
|
|
msg.points.append(point)
|
|
self.publisher.publish(msg)
|
|
self.get_logger().info(f"Published Joint Trajectory: {joint_positions}")
|
|
|
|
def main():
|
|
"""Main function to get joint names and start the ROS 2 & OSC system."""
|
|
rclpy.init()
|
|
joints = get_joint_names()
|
|
#print(joints)
|
|
node = ScaledJointTrajectoryPublisher(joints)
|
|
|
|
# Setup OSC Server
|
|
server = BlockingOSCUDPServer((node.osc_ip, node.osc_port), node.dispatcher)
|
|
|
|
# Run both ROS 2 and OSC Server together
|
|
try:
|
|
while rclpy.ok():
|
|
server.handle_request() # Handle one OSC request at a time
|
|
rclpy.spin_once(node, timeout_sec=0.1) # Process ROS callbacks
|
|
except KeyboardInterrupt:
|
|
print("")
|
|
finally:
|
|
node.destroy_node()
|
|
rclpy.shutdown()
|
|
|
|
if __name__ == '__main__':
|
|
main()
|