import rclpy from rclpy.node import Node from sensor_msgs.msg import JointState from osc4py3.as_eventloop import * from osc4py3 import oscbuildparse import roboticstoolbox as rtb class JointStateOSC(Node): def __init__(self): super().__init__('joint_states_osc') # Create a ROS 2 subscriber to /joint_states topic self.subscription = self.create_subscription( JointState, '/joint_states', self.joint_states_callback, 1 # Queue size ) # Open Sound Control (OSC) Client settings self.osc_ip = "127.0.0.1" # Replace with the target IP self.osc_port = 8000 # Replace with the target port # Start the OSC system osc_startup() # Make client channels to send packets osc_udp_client(self.osc_ip, self.osc_port, "osc_client") # Load the robot model self.robot = rtb.ERobot.URDF('BA/robot.urdf') def joint_states_callback(self, msg): """Callback function to handle incoming joint states.""" header = msg.header joint_names = msg.name joint_positions = msg.position # Compute the forward kinematics to get the TCP position tcp_pos = self.robot.fkine(joint_positions).t # Prepare the OSC message with the TCP position msg = oscbuildparse.OSCMessage("/tcp_position", None, tcp_pos.tolist()) osc_send(msg, "osc_client") osc_process() print(f"Published TCP position: {tcp_pos}") def main(): rclpy.init() node = JointStateOSC() try: rclpy.spin(node) except KeyboardInterrupt: pass finally: node.destroy_node() rclpy.shutdown() osc_terminate() if __name__ == '__main__': main()