AS: creating final node
This commit is contained in:
Binary file not shown.
9
test/ros_osc/joint_control/joint_angles_client.py
Normal file
9
test/ros_osc/joint_control/joint_angles_client.py
Normal file
@@ -0,0 +1,9 @@
|
||||
from pythonosc.udp_client import SimpleUDPClient
|
||||
|
||||
ip = "127.0.0.1"
|
||||
port = 8000
|
||||
|
||||
client = SimpleUDPClient(ip, port)
|
||||
|
||||
# Send joint updates
|
||||
client.send_message("/joint_angles", [0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
|
||||
117
test/ros_osc/joint_control/joint_angles_server.py
Normal file
117
test/ros_osc/joint_control/joint_angles_server.py
Normal file
@@ -0,0 +1,117 @@
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.executors import SingleThreadedExecutor
|
||||
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
|
||||
from sensor_msgs.msg import JointState
|
||||
from pythonosc.dispatcher import Dispatcher
|
||||
from pythonosc.osc_server import BlockingOSCUDPServer
|
||||
import time
|
||||
|
||||
class JointStateReader(Node):
|
||||
"""Node to read joint names from the /joint_states topic."""
|
||||
def __init__(self):
|
||||
super().__init__('JointStateReader')
|
||||
self.sub = self.create_subscription(JointState, 'joint_states', self.sub_callback, 1)
|
||||
self.joint_names = []
|
||||
self.got_msg = False
|
||||
|
||||
def sub_callback(self, msg):
|
||||
"""Callback to read joint names from the first received message."""
|
||||
if not self.joint_names:
|
||||
self.got_msg = True
|
||||
self.joint_names = list(msg.name)
|
||||
|
||||
def get_joint_names():
|
||||
"""get joint names using an existing ROS2 context and ensure proper cleanup."""
|
||||
#rclpy.init()
|
||||
node = JointStateReader()
|
||||
executor = SingleThreadedExecutor()
|
||||
executor.add_node(node)
|
||||
|
||||
while not node.got_msg:
|
||||
executor.spin_once(timeout_sec=0)
|
||||
|
||||
joint_list = node.joint_names
|
||||
|
||||
node.destroy_node() # destroy the node
|
||||
#rclpy.shutdown() # Shutdown
|
||||
|
||||
return joint_list # Return the joint names
|
||||
|
||||
class ScaledJointTrajectoryPublisher(Node):
|
||||
"""Node to publish joint trajectories based on OSC messages."""
|
||||
def __init__(self, joint_names):
|
||||
super().__init__('scaled_joint_trajectory_publisher')
|
||||
|
||||
# ROS2 Publisher
|
||||
self.publisher = self.create_publisher(
|
||||
JointTrajectory,
|
||||
'/scaled_joint_trajectory_controller/joint_trajectory',
|
||||
10
|
||||
)
|
||||
|
||||
# Use dynamically retrieved joint names
|
||||
self.joint_names = joint_names
|
||||
|
||||
# Store received joint positions
|
||||
self.joint_positions = [0.0] * len(self.joint_names)
|
||||
|
||||
# OSC Server Setup
|
||||
self.osc_ip = "0.0.0.0"
|
||||
self.osc_port = 8000
|
||||
self.dispatcher = Dispatcher()
|
||||
|
||||
# Register OSC handlers dynamically
|
||||
for joint in self.joint_names:
|
||||
self.dispatcher.map(f"/joint/{joint}", self.osc_joint_handler)
|
||||
|
||||
self.get_logger().info("Scaled Joint Trajectory Publisher Ready")
|
||||
|
||||
def osc_joint_handler(self, address, *args):
|
||||
"""Handles incoming OSC messages for joint positions."""
|
||||
joint_name = address.split("/")[-1] # Extract joint name
|
||||
if joint_name in self.joint_names:
|
||||
index = self.joint_names.index(joint_name)
|
||||
self.joint_positions[index] = args[0] # Update joint position
|
||||
self.get_logger().info(f"Updated {joint_name} to {args[0]}")
|
||||
|
||||
# Publish the trajectory whenever a joint update is received
|
||||
self.send_trajectory(self.joint_positions)
|
||||
|
||||
def send_trajectory(self, joint_positions, duration=2.0):
|
||||
"""Publish a joint trajectory command to move the UR10e."""
|
||||
msg = JointTrajectory()
|
||||
msg.joint_names = self.joint_names
|
||||
|
||||
point = JointTrajectoryPoint()
|
||||
point.positions = joint_positions # Updated joint positions
|
||||
point.time_from_start.sec = int(duration) # Duration in seconds
|
||||
#point.time_from_start.nanosec = int((duration - int(duration)) * 1e9) # Nanoseconds
|
||||
|
||||
msg.points.append(point)
|
||||
self.publisher.publish(msg)
|
||||
self.get_logger().info(f"Published Joint Trajectory: {joint_positions}")
|
||||
|
||||
def main():
|
||||
"""Main function to get joint names and start the ROS 2 & OSC system."""
|
||||
rclpy.init()
|
||||
joints = get_joint_names()
|
||||
#print(joints)
|
||||
node = ScaledJointTrajectoryPublisher(joints)
|
||||
|
||||
# Setup OSC Server
|
||||
server = BlockingOSCUDPServer((node.osc_ip, node.osc_port), node.dispatcher)
|
||||
|
||||
# Run both ROS 2 and OSC Server together
|
||||
try:
|
||||
while rclpy.ok():
|
||||
server.handle_request() # Handle one OSC request at a time
|
||||
rclpy.spin_once(node, timeout_sec=0.1) # Process ROS callbacks
|
||||
except KeyboardInterrupt:
|
||||
print("")
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user