OSC_ROS2/ros_osc/joint_info/osc4py3/tcp_cart_pos.py
2025-03-14 17:52:15 +01:00

62 lines
1.8 KiB
Python

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState
from osc4py3.as_eventloop import *
from osc4py3 import oscbuildparse
import roboticstoolbox as rtb
class JointStateOSC(Node):
def __init__(self):
super().__init__('joint_states_osc')
# Create a ROS 2 subscriber to /joint_states topic
self.subscription = self.create_subscription(
JointState,
'/joint_states',
self.joint_states_callback,
1 # Queue size
)
# Open Sound Control (OSC) Client settings
self.osc_ip = "127.0.0.1" # Replace with the target IP
self.osc_port = 8000 # Replace with the target port
# Start the OSC system
osc_startup()
# Make client channels to send packets
osc_udp_client(self.osc_ip, self.osc_port, "osc_client")
# Load the robot model
self.robot = rtb.ERobot.URDF('BA/robot.urdf')
def joint_states_callback(self, msg):
"""Callback function to handle incoming joint states."""
header = msg.header
joint_names = msg.name
joint_positions = msg.position
# Compute the forward kinematics to get the TCP position
tcp_pos = self.robot.fkine(joint_positions).t
# Prepare the OSC message with the TCP position
msg = oscbuildparse.OSCMessage("/tcp_position", None, tcp_pos.tolist())
osc_send(msg, "osc_client")
osc_process()
print(f"Published TCP position: {tcp_pos}")
def main():
rclpy.init()
node = JointStateOSC()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
osc_terminate()
if __name__ == '__main__':
main()