75 lines
2.3 KiB
Python
75 lines
2.3 KiB
Python
import rclpy
|
|
from rclpy.node import Node
|
|
from sensor_msgs.msg import JointState
|
|
from pythonosc.udp_client import SimpleUDPClient
|
|
|
|
class JointStateOSC(Node):
|
|
def __init__(self):
|
|
super().__init__('joint_states_osc')
|
|
|
|
# Create a ROS 2 subscriber to /joint_states topic
|
|
self.subscription = self.create_subscription(
|
|
JointState,
|
|
'/joint_states',
|
|
self.joint_states_callback,
|
|
1 # Queue size
|
|
)
|
|
|
|
# Open Sound Control (OSC) Client settings
|
|
self.osc_ip = "127.0.0.1" # Replace with the target IP
|
|
self.osc_port = 8000 # Replace with the target port
|
|
self.osc_client = SimpleUDPClient(self.osc_ip, self.osc_port)
|
|
|
|
def joint_states_callback(self, msg):
|
|
"""Callback function to handle incoming joint states."""
|
|
header = msg.header
|
|
joint_names = msg.name
|
|
joint_positions = msg.position
|
|
joint_velocity = msg.velocity
|
|
joint_effort = msg.effort
|
|
|
|
joint_names_str = "\n- ".join(joint_names)
|
|
joint_positions_str = "\n- ".join(map(str, joint_positions))
|
|
joint_velocity_str = "\n- ".join(map(str, joint_velocity))
|
|
joint_effort_str = "\n- ".join(map(str, joint_effort))
|
|
|
|
info = f"""
|
|
---
|
|
header:
|
|
stamp:
|
|
sec: {header.stamp.sec}
|
|
nanosec: {header.stamp.nanosec}
|
|
name:
|
|
- {joint_names_str}
|
|
position:
|
|
- {joint_positions_str}
|
|
velocity:
|
|
- {joint_velocity_str}
|
|
effort:
|
|
- {joint_effort_str}
|
|
---"""
|
|
|
|
self.osc_client.send_message(f"/joint_states", info)
|
|
|
|
# Send each joint state as an OSC message
|
|
for i, name in enumerate(joint_names):
|
|
|
|
self.osc_client.send_message(f"/joint_states/header/sec", header.stamp.sec) # Publish Time (sec)
|
|
self.osc_client.send_message(f"/joint_states/header/nanosec", header.stamp.nanosec) # Publish Time (nanosec)
|
|
|
|
self.osc_client.send_message(f"/joint_states/{name}/position", joint_positions[i]) # Publish position
|
|
|
|
self.osc_client.send_message(f"/joint_states/{name}/velocity", joint_velocity[i]) # Publish velocity
|
|
|
|
self.osc_client.send_message(f"/joint_states/{name}/effort", joint_effort[i]) # Publish effort
|
|
|
|
def main():
|
|
rclpy.init()
|
|
node = JointStateOSC()
|
|
rclpy.spin(node)
|
|
node.destroy_node()
|
|
rclpy.shutdown()
|
|
|
|
if __name__ == '__main__':
|
|
main()
|