62 lines
1.8 KiB
Python
62 lines
1.8 KiB
Python
import rclpy
|
|
from rclpy.node import Node
|
|
from sensor_msgs.msg import JointState
|
|
from osc4py3.as_eventloop import *
|
|
from osc4py3 import oscbuildparse
|
|
import roboticstoolbox as rtb
|
|
|
|
class JointStateOSC(Node):
|
|
def __init__(self):
|
|
super().__init__('joint_states_osc')
|
|
|
|
# Create a ROS 2 subscriber to /joint_states topic
|
|
self.subscription = self.create_subscription(
|
|
JointState,
|
|
'/joint_states',
|
|
self.joint_states_callback,
|
|
1 # Queue size
|
|
)
|
|
|
|
# Open Sound Control (OSC) Client settings
|
|
self.osc_ip = "127.0.0.1" # Replace with the target IP
|
|
self.osc_port = 8000 # Replace with the target port
|
|
|
|
# Start the OSC system
|
|
osc_startup()
|
|
|
|
# Make client channels to send packets
|
|
osc_udp_client(self.osc_ip, self.osc_port, "osc_client")
|
|
|
|
# Load the robot model
|
|
self.robot = rtb.ERobot.URDF('BA/robot.urdf')
|
|
|
|
def joint_states_callback(self, msg):
|
|
"""Callback function to handle incoming joint states."""
|
|
header = msg.header
|
|
joint_names = msg.name
|
|
joint_positions = msg.position
|
|
|
|
# Compute the forward kinematics to get the TCP position
|
|
tcp_pos = self.robot.fkine(joint_positions).t
|
|
|
|
# Prepare the OSC message with the TCP position
|
|
msg = oscbuildparse.OSCMessage("/tcp_position", None, tcp_pos.tolist())
|
|
osc_send(msg, "osc_client")
|
|
osc_process()
|
|
print(f"Published TCP position: {tcp_pos}")
|
|
|
|
def main():
|
|
rclpy.init()
|
|
node = JointStateOSC()
|
|
|
|
try:
|
|
rclpy.spin(node)
|
|
except KeyboardInterrupt:
|
|
pass
|
|
finally:
|
|
node.destroy_node()
|
|
rclpy.shutdown()
|
|
osc_terminate()
|
|
|
|
if __name__ == '__main__':
|
|
main() |