OSC_ROS2/ros_osc/joint_control/joint_angles_server.py
2025-02-06 10:19:51 +01:00

118 lines
4.1 KiB
Python

import rclpy
from rclpy.node import Node
from rclpy.executors import SingleThreadedExecutor
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from sensor_msgs.msg import JointState
from pythonosc.dispatcher import Dispatcher
from pythonosc.osc_server import BlockingOSCUDPServer
import time
class JointStateReader(Node):
"""Node to read joint names from the /joint_states topic."""
def __init__(self):
super().__init__('JointStateReader')
self.sub = self.create_subscription(JointState, 'joint_states', self.sub_callback, 1)
self.joint_names = []
self.got_msg = False
def sub_callback(self, msg):
"""Callback to read joint names from the first received message."""
if not self.joint_names:
self.got_msg = True
self.joint_names = list(msg.name)
def get_joint_names():
"""get joint names using an existing ROS2 context and ensure proper cleanup."""
#rclpy.init()
node = JointStateReader()
executor = SingleThreadedExecutor()
executor.add_node(node)
while not node.got_msg:
executor.spin_once(timeout_sec=0)
joint_list = node.joint_names
node.destroy_node() # destroy the node
#rclpy.shutdown() # Shutdown
return joint_list # Return the joint names
class ScaledJointTrajectoryPublisher(Node):
"""Node to publish joint trajectories based on OSC messages."""
def __init__(self, joint_names):
super().__init__('scaled_joint_trajectory_publisher')
# ROS2 Publisher
self.publisher = self.create_publisher(
JointTrajectory,
'/scaled_joint_trajectory_controller/joint_trajectory',
10
)
# Use dynamically retrieved joint names
self.joint_names = joint_names
# Store received joint positions
self.joint_positions = [0.0] * len(self.joint_names)
# OSC Server Setup
self.osc_ip = "0.0.0.0"
self.osc_port = 8000
self.dispatcher = Dispatcher()
# Register OSC handlers dynamically
for joint in self.joint_names:
self.dispatcher.map(f"/joint/{joint}", self.osc_joint_handler)
self.get_logger().info("Scaled Joint Trajectory Publisher Ready")
def osc_joint_handler(self, address, *args):
"""Handles incoming OSC messages for joint positions."""
joint_name = address.split("/")[-1] # Extract joint name
if joint_name in self.joint_names:
index = self.joint_names.index(joint_name)
self.joint_positions[index] = args[0] # Update joint position
self.get_logger().info(f"Updated {joint_name} to {args[0]}")
# Publish the trajectory whenever a joint update is received
self.send_trajectory(self.joint_positions)
def send_trajectory(self, joint_positions, duration=2.0):
"""Publish a joint trajectory command to move the UR10e."""
msg = JointTrajectory()
msg.joint_names = self.joint_names
point = JointTrajectoryPoint()
point.positions = joint_positions # Updated joint positions
point.time_from_start.sec = int(duration) # Duration in seconds
#point.time_from_start.nanosec = int((duration - int(duration)) * 1e9) # Nanoseconds
msg.points.append(point)
self.publisher.publish(msg)
self.get_logger().info(f"Published Joint Trajectory: {joint_positions}")
def main():
"""Main function to get joint names and start the ROS 2 & OSC system."""
rclpy.init()
joints = get_joint_names()
#print(joints)
node = ScaledJointTrajectoryPublisher(joints)
# Setup OSC Server
server = BlockingOSCUDPServer((node.osc_ip, node.osc_port), node.dispatcher)
# Run both ROS 2 and OSC Server together
try:
while rclpy.ok():
server.handle_request() # Handle one OSC request at a time
rclpy.spin_once(node, timeout_sec=0.1) # Process ROS callbacks
except KeyboardInterrupt:
print("")
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()