diff --git a/.DS_Store b/.DS_Store index 402510b..8aa99a7 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/osc4py3_test/pub.py b/osc4py3_test/pub.py new file mode 100644 index 0000000..143a766 --- /dev/null +++ b/osc4py3_test/pub.py @@ -0,0 +1,26 @@ +from osc4py3.as_eventloop import * +from osc4py3 import oscbuildparse +import time + +# Start OSC +osc_startup() +osc_udp_client("127.0.0.1", 8000, "osc_client") + +# Generate a future timetag (current time + 2 seconds) +timetag = oscbuildparse.unixtime2timetag(time.time() + 2) + +# Create messages with proper addresses +msg1 = oscbuildparse.OSCMessage("/joint_states/joint1/position", None, [42]) +msg2 = oscbuildparse.OSCMessage("/joint_states/joint2/velocity", None, [3.14]) + +# Create an OSC bundle with the correct timetag +bundle = oscbuildparse.OSCBundle(timetag, [msg1, msg2]) + +# Send the bundle +osc_send(bundle, "osc_client") +osc_process() + +print(f"📤 Sent OSC bundle with timetag: {timetag}") + +# Terminate OSC +osc_terminate() diff --git a/osc4py3_test/sub.py b/osc4py3_test/sub.py new file mode 100644 index 0000000..e96693e --- /dev/null +++ b/osc4py3_test/sub.py @@ -0,0 +1,37 @@ +from osc4py3.as_eventloop import * +from osc4py3 import oscbuildparse + +# Define an OSC bundle handler +def bundle_handler(bundle): + """Handles received OSC bundles, extracts the timetag and messages.""" + print(f"📩 Received an OSC Bundle!") + + # Extract timetag from the bundle metadata + timetag = bundle.timetag + unix_time = oscbuildparse.timetag2unixtime(timetag) + print(f"⏳ Timetag (NTP format): {timetag}") + print(f"⏳ Converted Unix Time: {unix_time}") + + # Process all elements inside the bundle + for element in bundle.elements: + if isinstance(element, oscbuildparse.OSCMessage): + print(f"📨 Message received at {element.address} with args {element.params}") + +# Start OSC +osc_startup() + +# Create an OSC UDP server to receive bundles +osc_udp_server("127.0.0.1", 8000, "osc_server") + +# Register the handler for OSC Bundles (Without `oscdispatch`) +osc_method('/*', bundle_handler, argscheme="B") # 'B' for bundles + +print("🚀 Listening for OSC bundles on 127.0.0.1:8000...\nPress Ctrl+C to exit.") + +# Keep processing incoming OSC messages +try: + while True: + osc_process() +except KeyboardInterrupt: + print("\nShutting down OSC receiver...") + osc_terminate() diff --git a/osc4py3_test/test.py b/osc4py3_test/test.py new file mode 100644 index 0000000..c56aef7 --- /dev/null +++ b/osc4py3_test/test.py @@ -0,0 +1,31 @@ +from osc4py3.as_eventloop import * +#from osc4py3 import oscdispatch +import time +from osc4py3 import oscmethod as osm + +# Define the OSC message handler +def joint_states_handler(address): + time.sleep(1) # Simulate processing time + print(f"Received OSC message at {address}") + +# Start the OSC system +osc_startup() + +# Create an OSC UDP server +osc_udp_server("127.0.0.1", 8000, "osc_server") + +# Register handlers for different joint state messages +osc_method("/joint_states", joint_states_handler, argscheme=osm.OSCARG_ADDRESS) +osc_method("/joint_states/*/position", joint_states_handler,argscheme=osm.OSCARG_ADDRESS) +osc_method("/joint_states/*/velocity", joint_states_handler,argscheme=osm.OSCARG_ADDRESS) +osc_method("/joint_states/*/effort", joint_states_handler,argscheme=osm.OSCARG_ADDRESS) + +print("OSC Receiver is listening on 127.0.0.1:8000...\nPress Ctrl+C to exit.") + +# Keep the server running +try: + while True: + osc_process() +except KeyboardInterrupt: + print("\nShutting down OSC receiver...") + osc_terminate() diff --git a/robot.urdf b/robot.urdf new file mode 100644 index 0000000..f431eeb --- /dev/null +++ b/robot.urdf @@ -0,0 +1,65 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/ros2_ws/src/joint_states_control/__pycache__/cart_to_angles.cpython-310.pyc b/ros2_ws/src/joint_states_control/__pycache__/cart_to_angles.cpython-310.pyc index beafd63..074434b 100644 Binary files a/ros2_ws/src/joint_states_control/__pycache__/cart_to_angles.cpython-310.pyc and b/ros2_ws/src/joint_states_control/__pycache__/cart_to_angles.cpython-310.pyc differ diff --git a/ros2_ws/src/joint_states_control/cart_to_angles.py b/ros2_ws/src/joint_states_control/cart_to_angles.py index a67ee55..057a993 100644 --- a/ros2_ws/src/joint_states_control/cart_to_angles.py +++ b/ros2_ws/src/joint_states_control/cart_to_angles.py @@ -4,8 +4,15 @@ import roboticstoolbox as rtb def ik_sol(robot, pose): ets = robot.ets() - Tep = ets.fkine(pose) + #Tep = ets.fkine(pose) + + #sol = ets.ik_GN(Tep) + + solver = rtb.IK_GN() + Tep = ets.fkine(pose) + sol = solver.solve(ets, Tep).q.tolist() + + return sol - sol = ets.ik_LM(Tep) if sol[1]: return sol[0].tolist() return "no sol" diff --git a/ros2_ws/src/joint_states_control/get_urdf.py b/ros2_ws/src/joint_states_control/get_urdf.py new file mode 100644 index 0000000..cb6887a --- /dev/null +++ b/ros2_ws/src/joint_states_control/get_urdf.py @@ -0,0 +1,55 @@ +import rclpy +from rclpy.node import Node +from rcl_interfaces.srv import GetParameters +import roboticstoolbox as rtb + +class URDFRetriever(Node): + def __init__(self): + super().__init__('urdf_retriever') + + # Create a parameter client to request the parameter from /robot_state_publisher + self.client = self.create_client(GetParameters, '/robot_state_publisher/get_parameters') + + while not self.client.wait_for_service(timeout_sec=3.0): + self.get_logger().warn("⏳ Waiting for /robot_state_publisher parameter service...") + + # Create and send a request to get 'robot_description' + self.request = GetParameters.Request() + self.request.names = ["robot_description"] + + self.future = self.client.call_async(self.request) + rclpy.spin_until_future_complete(self, self.future) + + # Extract the URDF + if self.future.result(): + self.urdf_string = self.future.result().values[0].string_value + if self.urdf_string: + self.get_logger().info("Successfully retrieved URDF from /robot_state_publisher!") + else: + self.get_logger().error("URDF is empty") + else: + self.get_logger().error("Failed to get URDF from /robot_state_publisher.") + +def main(): + rclpy.init() + node = URDFRetriever() + + urdf_string = node.urdf_string + + if urdf_string: + # Save to a file + urdf_path = "/tmp/robot.urdf" + with open(urdf_path, "w") as file: + file.write(urdf_string) + + print(f"✅ URDF saved to {urdf_path}") + + # Load into Robotic Toolbox + robot = rtb.ERobot.URDF(urdf_path) + print(robot) + + node.destroy_node() + rclpy.shutdown() + +if __name__ == "__main__": + main() diff --git a/ros2_ws/src/joint_states_control/move_scaled_joint_cart copy.py b/ros2_ws/src/joint_states_control/move_scaled_joint_cart copy.py new file mode 100644 index 0000000..ed9951b --- /dev/null +++ b/ros2_ws/src/joint_states_control/move_scaled_joint_cart copy.py @@ -0,0 +1,95 @@ +import rclpy +from rclpy.node import Node +from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint +from rcl_interfaces.srv import GetParameters +import time +from cart_to_angles import ik_sol +import roboticstoolbox as rtb +import xacro +from spatialmath import SE3 + +class ScaledJointTrajectoryPublisher(Node): + def __init__(self): + super().__init__('scaled_joint_trajectory_publisher') + + # Create a parameter client to request the parameter from /robot_state_publisher + self.client = self.create_client(GetParameters, '/robot_state_publisher/get_parameters') + + while not self.client.wait_for_service(timeout_sec=3.0): + self.get_logger().warn("⏳ Waiting for /robot_state_publisher parameter service...") + + # Create and send a request to get 'robot_description' + self.request = GetParameters.Request() + self.request.names = ["robot_description"] + + self.future = self.client.call_async(self.request) + rclpy.spin_until_future_complete(self, self.future) + + # Extract the URDF + if self.future.result(): + self.urdf_string = self.future.result().values[0].string_value + if self.urdf_string: + self.get_logger().info("Successfully retrieved URDF from /robot_state_publisher!") + else: + self.get_logger().error("URDF is empty") + else: + self.get_logger().error("Failed to get URDF from /robot_state_publisher.") + + + self.publisher = self.create_publisher( + JointTrajectory, + '/scaled_joint_trajectory_controller/joint_trajectory', + 10 + ) + time.sleep(1) # Allow publisher to initialize + self.get_logger().info("Scaled Joint Trajectory Publisher Ready") + + def send_trajectory(self, joint_positions, duration=5.0): + """Publish a joint trajectory command to move the UR10e.""" + msg = JointTrajectory() + msg.joint_names = [ + "shoulder_pan_joint", "shoulder_lift_joint", "elbow_joint", + "wrist_1_joint", "wrist_2_joint", "wrist_3_joint" + ] + + point = JointTrajectoryPoint() + point.positions = joint_positions # Target joint positions + point.time_from_start.sec = int(duration) # Duration in seconds + + msg.points.append(point) + + self.publisher.publish(msg) + + +def main(): + rclpy.init() + node = ScaledJointTrajectoryPublisher() + + urdf_string = node.urdf_string + + # Save to a file + urdf_path = "/tmp/robot.urdf" + with open(urdf_path, "w") as file: + file.write(urdf_string) + + print(f"URDF saved to {urdf_path}") + + #robot = rtb.ERobot.URDF(urdf_path) # Load into Robotic Toolbox + robot = rtb.ERobot.URDF('/BA/robot.urdf') # Load into Robotic Toolbox + + Tep = SE3.Trans(0.3, 0.3, 0.3) * SE3.OA([0, 1, 0], [0, 0, 1]) + sol = robot.ik_LM(Tep) + sol = sol[0].tolist() + + print("Angles:") + print(f"{[val * (180/3.141) for val in sol]} (deg)") + print(f"{sol} (rad)") + # Move to a target position + node.send_trajectory(sol) + + rclpy.spin(node) + node.destroy_node() + rclpy.shutdown() + +if __name__ == '__main__': + main() diff --git a/ros2_ws/src/joint_states_control/move_scaled_joint_cart.py b/ros2_ws/src/joint_states_control/move_scaled_joint_cart.py index 55d8f39..fac0a81 100644 --- a/ros2_ws/src/joint_states_control/move_scaled_joint_cart.py +++ b/ros2_ws/src/joint_states_control/move_scaled_joint_cart.py @@ -1,13 +1,40 @@ import rclpy from rclpy.node import Node from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint +from rcl_interfaces.srv import GetParameters import time from cart_to_angles import ik_sol import roboticstoolbox as rtb +import xacro class ScaledJointTrajectoryPublisher(Node): def __init__(self): super().__init__('scaled_joint_trajectory_publisher') + + # Create a parameter client to request the parameter from /robot_state_publisher + self.client = self.create_client(GetParameters, '/robot_state_publisher/get_parameters') + + while not self.client.wait_for_service(timeout_sec=3.0): + self.get_logger().warn("⏳ Waiting for /robot_state_publisher parameter service...") + + # Create and send a request to get 'robot_description' + self.request = GetParameters.Request() + self.request.names = ["robot_description"] + + self.future = self.client.call_async(self.request) + rclpy.spin_until_future_complete(self, self.future) + + # Extract the URDF + if self.future.result(): + self.urdf_string = self.future.result().values[0].string_value + if self.urdf_string: + self.get_logger().info("Successfully retrieved URDF from /robot_state_publisher!") + else: + self.get_logger().error("URDF is empty") + else: + self.get_logger().error("Failed to get URDF from /robot_state_publisher.") + + self.publisher = self.create_publisher( JointTrajectory, '/scaled_joint_trajectory_controller/joint_trajectory', @@ -32,15 +59,29 @@ class ScaledJointTrajectoryPublisher(Node): self.publisher.publish(msg) + def main(): rclpy.init() node = ScaledJointTrajectoryPublisher() - robot = rtb.models.URDF.UR10() - target_position = [0, 0, 300, 0, 0, 0] + urdf_string = node.urdf_string + # Save to a file + urdf_path = "/tmp/robot.urdf" + with open(urdf_path, "w") as file: + file.write(urdf_string) + + print(f"URDF saved to {urdf_path}") + + robot = rtb.ERobot.URDF(urdf_path) # Load into Robotic Toolbox + + target_position = [0.4, 0.4, 0.4, 0, 0, 0] + sol = ik_sol(robot,target_position) + print("Winkel:") + print([val * (180/3.141) for val in sol]) + print(sol) # Move to a target position - node.send_trajectory(ik_sol(robot,target_position)) + node.send_trajectory(sol) rclpy.spin(node) node.destroy_node() diff --git a/ros2_ws/src/joint_states_control/test.py b/ros2_ws/src/joint_states_control/test.py index dedf8db..6cd8f7a 100644 --- a/ros2_ws/src/joint_states_control/test.py +++ b/ros2_ws/src/joint_states_control/test.py @@ -1,8 +1,9 @@ -from cart_to_angles import ik_sol +import os import roboticstoolbox as rtb -robot = rtb.models.URDF.UR10() +# Get the URDF from ROS 2 +os.system("ros2 param get /robot_state_publisher robot_description > urdf.xml") -print(robot) - -print(ik_sol(robot, [0, 0, 200, 2,3, 1])) \ No newline at end of file +# Load the URDF using roboticstoolbox +robot = rtb.ERobot.URDF("urdf.xml") +print("Robot loaded:", robot) diff --git a/ros2_ws/src/joint_states_control/test_urdf.py b/ros2_ws/src/joint_states_control/test_urdf.py new file mode 100644 index 0000000..db2c581 --- /dev/null +++ b/ros2_ws/src/joint_states_control/test_urdf.py @@ -0,0 +1,48 @@ +import rclpy +from rclpy.node import Node +import roboticstoolbox as rtb + +class GET_URDF(Node): + def __init__(self): + super().__init__('GET_URDF') + + # Declare parameter and get it + self.urdf_value = self.get_parameter('robot_description') + + + if not self.urdf_value: + self.get_logger().error("URDF is empty! Check if robot_state_publisher is correctly publishing it.") + self.urdf_string = None + else: + #self.urdf_string = urdf_value + self.get_logger().info("Successfully got URDF from ROS 2.") + + def get_urdf(self): + return self.urdf_value + +def main(): + rclpy.init() + node = GET_URDF() + + print(node.get_urdf()) + """ + if urdf_string: + # Print the first 500 characters to check + print(f"URDF (first 500 chars):\n{urdf_string[:500]}") + + # Save to file and use in Robotic Toolbox + urdf_path = "/tmp/robot.urdf" + with open(urdf_path, "w") as file: + file.write(urdf_string) + + print(f"URDF saved to {urdf_path}") + + # Load into Robotic Toolbox + robot = rtb.ERobot.URDF(urdf_path) + print(robot) +""" + node.destroy_node() + rclpy.shutdown() + +if __name__ == "__main__": + main() diff --git a/ros_osc/joint_info/osc4py3/osc_joint_states_pub.py b/ros_osc/joint_info/osc4py3/osc_joint_states_pub.py new file mode 100644 index 0000000..487fc91 --- /dev/null +++ b/ros_osc/joint_info/osc4py3/osc_joint_states_pub.py @@ -0,0 +1,92 @@ +import rclpy +from rclpy.node import Node +from sensor_msgs.msg import JointState +from osc4py3.as_eventloop import * +from osc4py3 import oscbuildparse + +class JointStateOSC(Node): + def __init__(self): + super().__init__('joint_states_osc') + + # Create a ROS 2 subscriber to /joint_states topic + self.subscription = self.create_subscription( + JointState, + '/joint_states', + self.joint_states_callback, + 1 # Queue size + ) + + # Open Sound Control (OSC) Client settings + self.osc_ip = "127.0.0.1" # Replace with the target IP + self.osc_port = 8000 # Replace with the target port + + # Start the OSC system + osc_startup() + + # Make client channels to send packets + osc_udp_client(self.osc_ip, self.osc_port, "osc_client") + + def joint_states_callback(self, msg): + """Callback function to handle incoming joint states.""" + header = msg.header + joint_names = msg.name + joint_positions = msg.position + joint_velocity = msg.velocity + joint_effort = msg.effort + + joint_names_str = "\n- ".join(joint_names) + joint_positions_str = "\n- ".join(map(str, joint_positions)) + joint_velocity_str = "\n- ".join(map(str, joint_velocity)) + joint_effort_str = "\n- ".join(map(str, joint_effort)) + + info = f""" +--- +header: + stamp: + sec: {header.stamp.sec} + nanosec: {header.stamp.nanosec} +name: +- {joint_names_str} +position: +- {joint_positions_str} +velocity: +- {joint_velocity_str} +effort: +- {joint_effort_str} +---""" + + # Send the info message + msg = oscbuildparse.OSCMessage("/joint_states", None, [info]) + osc_send(msg, "osc_client") + osc_process() + #print(f"Publishing: {info}") + + + # Send each joint state as an OSC message + for i, name in enumerate(joint_names): + #msg_sec = oscbuildparse.OSCMessage(f"/joint_states/header/sec", None, [header.stamp.sec]) + #msg_nanosec = oscbuildparse.OSCMessage(f"/joint_states/header/nanosec", None, [header.stamp.nanosec]) + msg_position = oscbuildparse.OSCMessage(f"/joint_states/{name}/position", None, [joint_positions[i]]) + msg_velocity = oscbuildparse.OSCMessage(f"/joint_states/{name}/velocity", None, [joint_velocity[i]]) + msg_effort = oscbuildparse.OSCMessage(f"/joint_states/{name}/effort", None, [joint_effort[i]]) + + bun = oscbuildparse.OSCBundle(oscbuildparse.unixtime2timetag(header.stamp.sec + header.stamp.nanosec), [msg_position, msg_velocity, msg_effort]) + #bun = oscbuildparse.OSCBundle(oscbuildparse.OSC_IMMEDIATELY , [msg_position, msg_velocity, msg_effort]) + osc_send(bun, "osc_client") + osc_process() + #print(f"OSC bundle sent for joint {name}") + +def main(): + rclpy.init() + node = JointStateOSC() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.shutdown() + osc_terminate() + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/ros_osc/joint_info/osc4py3/osc_joint_states_sub.py b/ros_osc/joint_info/osc4py3/osc_joint_states_sub.py new file mode 100644 index 0000000..a98344e --- /dev/null +++ b/ros_osc/joint_info/osc4py3/osc_joint_states_sub.py @@ -0,0 +1,41 @@ +from osc4py3.as_eventloop import * +from osc4py3 import oscmethod as osm +from osc4py3 import oscbuildparse +import time + +def joint_states_handler(*args): + """Handler function to process incoming joint states.""" + print(f"Received message") + for arg in args: + print(arg) + +def main(): + ip = "0.0.0.0" # IP address to listen on + port = 8000 # Port to listen on + + # Start the OSC system + osc_startup() + + # Make server channels to receive packets + osc_udp_server(ip, port, "osc_server") + + # Associate Python functions with message address patterns + osc_method("/joint_states", joint_states_handler, argscheme=osm.OSCARG_MESSAGE) + + print(f"Listening for OSC messages on {ip}:{port}...") + + try: + # Run the event loop + while True: + osc_process() # Process OSC messages + time.sleep(0.01) # Sleep to avoid high CPU usage + + except KeyboardInterrupt: + print("") + + finally: + # Properly close the system + osc_terminate() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/ros_osc/joint_info/pythonosc/bundles/bundle_server_test.py b/ros_osc/joint_info/pythonosc/bundles/bundle_server_test.py new file mode 100644 index 0000000..4ab13d6 --- /dev/null +++ b/ros_osc/joint_info/pythonosc/bundles/bundle_server_test.py @@ -0,0 +1,26 @@ +from pythonosc import dispatcher +from pythonosc import osc_server +import time + +def joint_handler(address, *args): + print(args) + time.sleep(0.3) + +def main(): + ip = "0.0.0.0" # Listen on all network interfaces + port = 5005 # Must match the sender's port in `joint_state_osc.py` + + # Create dispatcher and register callback + disp = dispatcher.Dispatcher() + #disp.map("/joint_states", joint_handler) + disp.map("/SYNC", joint_handler) + + server = osc_server.ThreadingOSCUDPServer((ip, port), disp) # Start OSC server + + try: + server.serve_forever() # Keep server running + except KeyboardInterrupt: + print("") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/ros_osc/joint_info/pythonosc/bundles/bundle_timetag.py b/ros_osc/joint_info/pythonosc/bundles/bundle_timetag.py new file mode 100644 index 0000000..7686bbb --- /dev/null +++ b/ros_osc/joint_info/pythonosc/bundles/bundle_timetag.py @@ -0,0 +1,30 @@ +from pythonosc import dispatcher +from pythonosc import osc_server +from pythonosc.osc_bundle import OscBundle +from pythonosc.osc_message import OscMessage + +def joint_handler(address, *args): + if isinstance(args[0], OscBundle): + bundle = args[0] + print(f"Received OSC bundle with timestamp: {bundle.timestamp}") + for element in bundle.bundle_elements: + if isinstance(element, OscMessage): + print(f"Message: {element.address}, Args: {element.params}") + +def main(): + ip = "0.0.0.0" # Listen on all network interfaces + port = 5005 # Must match the sender's port in `joint_state_osc.py` + + # Create dispatcher and register callback + disp = dispatcher.Dispatcher() + disp.map("/joint_states/*", joint_handler) + + server = osc_server.ThreadingOSCUDPServer((ip, port), disp) # Start OSC server + + try: + server.serve_forever() # Keep server running + except KeyboardInterrupt: + print("") + +if __name__ == "__main__": + main() diff --git a/ros_osc/joint_info/pythonosc/bundles/different_lib.py b/ros_osc/joint_info/pythonosc/bundles/different_lib.py new file mode 100644 index 0000000..30e92b8 --- /dev/null +++ b/ros_osc/joint_info/pythonosc/bundles/different_lib.py @@ -0,0 +1,38 @@ +# Import needed modules from osc4py3 +import osc4py3.as_eventloop +from osc4py3 import oscmethod as osm + +def handlerfunction(*args): + for arg in args: + print(arg) + pass +""" +def handlerfunction2(address, s, x, y): + # Will receive message address, and message data flattened in s, x, y + print(f's: {s}') + print(f'x: {x}') + print(f'y: {y}') + pass +""" +# Start the system. +osc_startup() + +# Make server channels to receive packets. +osc_udp_server("0.0.0.0", 5005, "aservername") +osc_udp_server("0.0.0.0", 5005, "anotherserver") + +# Associate Python functions with message address patterns, using default +# argument scheme OSCARG_DATAUNPACK. +osc_method("SYNC", handlerfunction) +# Too, but request the message address pattern before in argscheme +#osc_method("SYNC", handlerfunction2, argscheme=osm.OSCARG_ADDRESS + osm.OSCARG_DATAUNPACK) + +# Periodically call osc4py3 processing method in your event loop. +finished = False +while not finished: + # … + osc_process() + # … + +# Properly close the system. +osc_terminate() \ No newline at end of file diff --git a/ros_osc/joint_info/pythonosc/bundles/joint_states_bundle.py b/ros_osc/joint_info/pythonosc/bundles/joint_states_bundle.py new file mode 100644 index 0000000..c500282 --- /dev/null +++ b/ros_osc/joint_info/pythonosc/bundles/joint_states_bundle.py @@ -0,0 +1,75 @@ +import rclpy +from rclpy.node import Node +from sensor_msgs.msg import JointState +from pythonosc.udp_client import SimpleUDPClient +from pythonosc import osc_bundle_builder +from pythonosc import osc_message_builder + +class JointStateOSC(Node): + def __init__(self): + super().__init__('joint_states_osc') + + # Create a ROS 2 subscriber to /joint_states topic + self.subscription = self.create_subscription( + JointState, + '/joint_states', + self.joint_states_callback, + 1 # Queue size + ) + + self.osc_ip = "127.0.0.1" # target IP + self.osc_port = 5005 # target port + self.osc_client = SimpleUDPClient(self.osc_ip, self.osc_port) + + def joint_states_callback(self, msg): + """Callback function to handle incoming joint states.""" + header = msg.header + #print(f"Received joint states at {header.stamp.sec}.{header.stamp.nanosec}") + joint_names = msg.name + joint_positions = msg.position + joint_velocity = msg.velocity + joint_effort = msg.effort + + + bundle = osc_bundle_builder.OscBundleBuilder(timestamp=header.stamp.sec + header.stamp.nanosec * 1e-9) + #bundle = osc_bundle_builder.OscBundleBuilder(osc_bundle_builder.IMMEDIATELY) + + names = osc_message_builder.OscMessageBuilder(address="/joint_states/names") + names.add_arg([str(name) for name in joint_names]) + positions = osc_message_builder.OscMessageBuilder(address="/joint_states/positions") + positions.add_arg([float(pos) for pos in joint_positions]) + velocity = osc_message_builder.OscMessageBuilder(address="/joint_states/velocity") + velocity.add_arg([float(vel) for vel in joint_velocity]) + effort = osc_message_builder.OscMessageBuilder(address="/joint_states/effort") + effort.add_arg([float(eff) for eff in joint_effort]) + + bundle.add_content(names.build()) + bundle.add_content(positions.build()) + bundle.add_content(velocity.build()) + bundle.add_content(effort.build()) + + info = osc_message_builder.OscMessageBuilder(address="/joint_states") + info.add_arg("joint names:") + info.add_arg([str(name) for name in joint_names]) + info.add_arg("joint positions:") + info.add_arg([float(pos) for pos in joint_positions]) + info.add_arg("joint velocity:") + info.add_arg([float(vel) for vel in joint_velocity]) + info.add_arg("joint effort:") + info.add_arg([float(eff) for eff in joint_effort]) + + bundle.add_content(info.build()) + + self.osc_client.send(bundle.build()) + +def main(): + try: + rclpy.init() + node = JointStateOSC() + rclpy.spin(node) + except KeyboardInterrupt: print("") + #node.destroy_node() + #rclpy.shutdown() + +if __name__ == '__main__': + main() diff --git a/ros_osc/joint_info/pythonosc/bundles/read_bundle.py b/ros_osc/joint_info/pythonosc/bundles/read_bundle.py new file mode 100644 index 0000000..f1fd567 --- /dev/null +++ b/ros_osc/joint_info/pythonosc/bundles/read_bundle.py @@ -0,0 +1,28 @@ +import argparse +from pythonosc import dispatcher, osc_server +from pythonosc.osc_bundle import OscBundle +from pythonosc.osc_message import OscMessage + +def bundle_handler(address, *args): + for arg in args: + if isinstance(arg, OscBundle): + print(f"Received OSC bundle with timestamp: {arg.timestamp}") + for element in arg.bundle_elements: + if isinstance(element, OscMessage): + print(f"Message: {element.address}, Args: {element.params}") + +def main(): + parser = argparse.ArgumentParser(description="OSC Bundle Receiver") + parser.add_argument("--ip", default="127.0.0.1", help="The IP address to listen on") + parser.add_argument("--port", type=int, default=5005, help="The port to listen on") + args = parser.parse_args() + + disp = dispatcher.Dispatcher() + disp.set_default_handler(bundle_handler) + + server = osc_server.ThreadingOSCUDPServer((args.ip, args.port), disp) + print(f"Listening for OSC bundles on {args.ip}:{args.port}...") + server.serve_forever() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/ros_osc/joint_info/pythonosc/bundles/send_bundle_test.py b/ros_osc/joint_info/pythonosc/bundles/send_bundle_test.py new file mode 100644 index 0000000..87a2ad5 --- /dev/null +++ b/ros_osc/joint_info/pythonosc/bundles/send_bundle_test.py @@ -0,0 +1,36 @@ +from pythonosc import osc_bundle_builder +from pythonosc import osc_message_builder +import argparse +import time +from pythonosc import udp_client + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--ip", default="127.0.0.1", + help="The ip of the OSC server") + parser.add_argument("--port", type=int, default=5005, + help="The port the OSC server is listening on") + args = parser.parse_args() + + client = udp_client.SimpleUDPClient(args.ip, args.port) + +bundle = osc_bundle_builder.OscBundleBuilder(timestamp=time.time() + 10) +msg = osc_message_builder.OscMessageBuilder(address="/SYNC") +msg.add_arg(4.0) +# Add 4 messages in the bundle, each with more arguments. +bundle.add_content(msg.build()) +msg.add_arg(2) +bundle.add_content(msg.build()) +msg.add_arg("value") +bundle.add_content(msg.build()) + +sub_bundle = bundle.build() +# Now add the same bundle inside itself. +bundle.add_content(sub_bundle) +# The bundle has 5 elements in total now. + +bundle = bundle.build() +while True:# You can now send it via a client with the `.send()` method: + client.send(bundle) + time.sleep(1) \ No newline at end of file diff --git a/ros_osc/joint_info/joint_names_reader.py b/ros_osc/joint_info/pythonosc/joint_names_reader.py similarity index 100% rename from ros_osc/joint_info/joint_names_reader.py rename to ros_osc/joint_info/pythonosc/joint_names_reader.py diff --git a/ros_osc/joint_info/joint_state_overview.py b/ros_osc/joint_info/pythonosc/joint_state_overview.py similarity index 100% rename from ros_osc/joint_info/joint_state_overview.py rename to ros_osc/joint_info/pythonosc/joint_state_overview.py diff --git a/ros_osc/joint_info/pythonosc/joint_state_overview_osc4py3.py b/ros_osc/joint_info/pythonosc/joint_state_overview_osc4py3.py new file mode 100644 index 0000000..7f63db1 --- /dev/null +++ b/ros_osc/joint_info/pythonosc/joint_state_overview_osc4py3.py @@ -0,0 +1,43 @@ +from osc4py3.as_eventloop import * +from osc4py3.oscmethod import * +import time + +last_received = time.time() + +def joint_handler(address, info): + global last_received + now = time.time() + + if now - last_received > 0.1 and address == "/joint_states": # Limit updates to every 100ms + last_received = now + if info: + print(info) + +def main(): + ip = "127.0.0.1" # "0.0.0.0" Listen on all network interfaces + port = 8000 # Must match the sender's port in `joint_state_osc.py` + + # Start the system + osc_startup() + + # Make server channels to receive packets + osc_udp_server(ip, port, "joint_state_server") + + # Associate Python functions with message address patterns + osc_method("/joint_states", joint_handler, argscheme=OSCARG_ADDRESS + OSCARG_DATAUNPACK) + + try: + # Run the event loop + while True: + osc_process() # Process OSC messages + time.sleep(0.01) # Sleep to avoid high CPU usage + + except KeyboardInterrupt: + print("") + + finally: + # Properly close the system + osc_terminate() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/ros_osc/joint_info/joint_state_reader.py b/ros_osc/joint_info/pythonosc/joint_state_reader.py similarity index 100% rename from ros_osc/joint_info/joint_state_reader.py rename to ros_osc/joint_info/pythonosc/joint_state_reader.py diff --git a/ros_osc/joint_info/osc_joint_states.py b/ros_osc/joint_info/pythonosc/osc_joint_states_pub.py similarity index 100% rename from ros_osc/joint_info/osc_joint_states.py rename to ros_osc/joint_info/pythonosc/osc_joint_states_pub.py diff --git a/ros_osc/joint_info/test.py b/ros_osc/joint_info/test.py deleted file mode 100644 index 6cd8f7a..0000000 --- a/ros_osc/joint_info/test.py +++ /dev/null @@ -1,9 +0,0 @@ -import os -import roboticstoolbox as rtb - -# Get the URDF from ROS 2 -os.system("ros2 param get /robot_state_publisher robot_description > urdf.xml") - -# Load the URDF using roboticstoolbox -robot = rtb.ERobot.URDF("urdf.xml") -print("Robot loaded:", robot)