From f3f3113e6503180bac78892f7bd2e6349679be36 Mon Sep 17 00:00:00 2001 From: Alexander Schaefer Date: Wed, 30 Apr 2025 14:02:31 +0200 Subject: [PATCH] AS: adding sandbox example --- workspace/src/joint_angles.py | 12 +- .../joint_control/plugdata_cart_fix.py | 11 +- .../joint_control/joint_control/sandbox.py | 296 ++++++++++++++++++ workspace/src/joint_control/setup.py | 1 + 4 files changed, 312 insertions(+), 8 deletions(-) create mode 100644 workspace/src/joint_control/joint_control/sandbox.py diff --git a/workspace/src/joint_angles.py b/workspace/src/joint_angles.py index 40f7f71..271fa0a 100644 --- a/workspace/src/joint_angles.py +++ b/workspace/src/joint_angles.py @@ -11,13 +11,13 @@ def main(): osc_udp_client("192.168.1.24", 8000, "osc_client") # Example joint positions to send - joint_positions1 = [0.0,0.0, 0.0, 0.0, 0.0, 0.0] - joint_positions2 = [-0.4,-1, 0.4, 0.0,0.0, 0.0] - joint_positions3 = [-0.4,-1, 0.2, 0.0, 0.0, 0.0]#, 6.0] - joint_positions4 = [-0.4,1, 0.2, 0.0, 0.0, 0.0]#, 6.0] - joint_positions5 = [-0.4,1, 0.4, 0.0, 0.0, 0.0]#, 6.0] + joint_positions1 = [0.5,0.034, 0.839, 0.0, 0.0, 0.0] + joint_positions2 = [0.4,-0.4, 0.4, 0.0,0.0, 0.0] + joint_positions3 = [0.4,-0.4, 0.2, 0.0, 0.0, 0.0]#, 6.0] + joint_positions4 = [0.0,0.0, 0.6, 0.0, 0.0, 0.0]#, 6.0] + joint_positions5 = [-0.4,0.4, 0.2, 0.0, 0.0, 0.0]#, 6.0] - msg = oscbuildparse.OSCMessage("/joint_angles", None, [joint_positions1])#, joint_positions3])#, joint_positions4, joint_positions5]) + msg = oscbuildparse.OSCMessage("/joint_trajectory", None, [joint_positions1, joint_positions2])#, joint_positions3, joint_positions4, joint_positions5]) osc_send(msg, "osc_client") osc_process() print("Sending joint positions") diff --git a/workspace/src/joint_control/joint_control/plugdata_cart_fix.py b/workspace/src/joint_control/joint_control/plugdata_cart_fix.py index a045cce..7fc0c9e 100644 --- a/workspace/src/joint_control/joint_control/plugdata_cart_fix.py +++ b/workspace/src/joint_control/joint_control/plugdata_cart_fix.py @@ -109,6 +109,12 @@ class ScaledJointTrajectoryPublisher(Node): break except ValueError: print("Invalid input. Please enter numeric values or leave blank to keep current limits.") + ''' + use_link_mask = input("Do you want to use a link mask? (y/n): ").strip().lower() + if use_link_mask == 'y': + while True: + try: +''' self.hz = float(input("Enter the desired refresh frequency (Hz): ")) @@ -117,7 +123,8 @@ class ScaledJointTrajectoryPublisher(Node): def joint_angles_handler(self, *args): # Ensure the desired joint positions are within the specified limits - print("received joint angles") + #print("received joint angles") + x, y, z, r, p, yaw, *_ = [float(i) for i in list(args)] if self.x_limits[0] is not None: x = max(self.x_limits[0], x) @@ -195,7 +202,7 @@ class ScaledJointTrajectoryPublisher(Node): continue point = JointTrajectoryPoint() point.positions = list(sol[0]) - duration *= 2 + duration *= 5 duration += prev_duration prev_duration = duration point.time_from_start.sec = int(duration) diff --git a/workspace/src/joint_control/joint_control/sandbox.py b/workspace/src/joint_control/joint_control/sandbox.py new file mode 100644 index 0000000..fd0b115 --- /dev/null +++ b/workspace/src/joint_control/joint_control/sandbox.py @@ -0,0 +1,296 @@ +import rclpy +from rclpy.node import Node +from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint +from sensor_msgs.msg import JointState +from osc4py3.as_allthreads import * +from osc4py3 import oscmethod as osm +import xml.etree.ElementTree as ET +import numpy as np +import spatialmath as sm +import roboticstoolbox as rtb + +class ScaledJointTrajectoryPublisher(Node): + """Node to publish joint trajectories based on OSC messages.""" + + def __init__(self, joint_names, joint_velocity_limits, robot, cost_mask): + super().__init__('scaled_joint_trajectory_publisher') + + # ROS2 Publisher + self.publisher = self.create_publisher( + JointTrajectory, + '/scaled_joint_trajectory_controller/joint_trajectory', + 1 + ) + + self.subscription = self.create_subscription( + JointState, + '/joint_states', + self.joint_states_callback, + 1 # Increased queue size for joint states + ) + + # Store received joint positions + self.current_joint_positions = [0.0] * len(joint_names) + self.joint_names = joint_names + self.joint_velocity_limits = joint_velocity_limits + self.desired_joint_positions = [0.0] * len(joint_names) + self.previous_desired = [0.0] * len(joint_names) + self.robot = robot + self.cost_mask = cost_mask + self.prev_pose = None + + ip = "0.0.0.0" # Listen on all network interfaces + port = 8000 # Must match the sender's port in `joint_state_osc.py` + + osc_startup() + osc_udp_server(ip, port, "osc_server") + print("Server started on 0.0.0.0:8000 \n ready to receive messages in the following format: /joint_trajectroy [joint_positions] optional: duration as last argument") + # Register OSC handler + osc_method("/coordinates", self.coordinates_handler, argscheme=osm.OSCARG_DATAUNPACK) + osc_method("/joint_angles", self.joint_angles_handler, argscheme=osm.OSCARG_DATAUNPACK) + + def joint_angles_handler(self, *args): + """Handles incoming OSC messages for joint positions.""" + msg = JointTrajectory() + msg.joint_names = self.joint_names + n=2 + for arg in args: + if len(arg) == len(self.joint_names): + point = JointTrajectoryPoint() + point.positions = list(arg) + point.time_from_start.sec = n + n+=2 + point.time_from_start.nanosec = 0 + msg.points.append(point) + elif len(arg) == len(self.joint_names) + 1: + point = JointTrajectoryPoint() + point.positions = list(arg[:-1]) + point.time_from_start.sec = int(arg[-1]) + point.time_from_start.nanosec = int((arg[-1] - int(arg[-1])) * 1e9) + msg.points.append(point) + + self.publisher.publish(msg) + print("published joint positions") + + set_limits = input("Do you want to set limit for x, y and z? (y/n): ").strip().lower() + if set_limits == 'y': + while True: + try: + self.x_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()] + self.y_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()] + self.z_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()] + + if len(self.x_limits) != 2 or len(self.y_limits) != 2 or len(self.z_limits) != 2: + print("Invalid input. Please enter exactly two values (or leave blank) for each limit.") + continue + + if (self.x_limits[0] is not None and self.x_limits[1] is not None and self.x_limits[0] >= self.x_limits[1]) or \ + (self.y_limits[0] is not None and self.y_limits[1] is not None and self.y_limits[0] >= self.y_limits[1]) or \ + (self.z_limits[0] is not None and self.z_limits[1] is not None and self.z_limits[0] >= self.z_limits[1]): + print("Invalid input. Lower limit must be less than upper limit for each axis.") + continue + + print(f"Current limits:") + print(f"x: {self.x_limits}") + print(f"y: {self.y_limits}") + print(f"z: {self.z_limits}") + con = True + while con: + confirm = input("Do you want your robot to move in this range? (y/n): ").strip().lower() + if confirm == 'y': + break + elif confirm == 'n': + print("Please re-enter the limits.") + con = False + else: + print("Invalid input. Please enter 'y' or 'n'.") + if con: break + except ValueError: + print("Invalid input. Please enter numeric values only.") + + # Ask the user if they want to set new joint limits + update_limits = input("Do you want to set new joint limits? (y/n): ").strip().lower() + if update_limits == 'y': + for i in range(len(self.joint_names)): + while True: + try: + lim = self.robot.qlim.copy() + # Find the link corresponding to the joint name + print(f"Current position limits for joint '{self.joint_names[i]}': [{self.robot.qlim[0][i]} {self.robot.qlim[1][i]}] rad") + lower_limit = input(f"Enter the new lower limit for joint '{self.joint_names[i]}' (or press Enter to keep current): ").strip() + upper_limit = input(f"Enter the new upper limit for joint '{self.joint_names[i]}' (or press Enter to keep current): ").strip() + + if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit): + print("Invalid input. Lower limit must be less than upper limit.") + continue + + if lower_limit: + lim[0][i] = float(lower_limit) + if upper_limit: + lim[1][i] = float(upper_limit) + self.robot.qlim = lim + print(f"New limits for joint '{self.joint_names[i]}': [{self.robot.qlim[0][i]} {self.robot.qlim[1][i]}] rad") + print("-" * 50) + break + except ValueError: + print("Invalid input. Please enter numeric values or leave blank to keep current limits.") + ''' + use_link_mask = input("Do you want to use a link mask? (y/n): ").strip().lower() + if use_link_mask == 'y': + while True: + try: +''' + + + self.hz = float(input("Enter the desired refresh frequency (Hz): ")) + # Start the OSC server in a separate thread to avoid blocking the ROS 2 event loop + self.create_timer(1/self.hz, self.update_position) # Timer to call osc_process periodically + + def coordinates_handler(self, *args): + # Ensure the desired joint positions are within the specified limits + #print("received joint angles") + + x, y, z, r, p, yaw, *_ = [float(i) for i in list(args)] + if self.x_limits[0] is not None: + x = max(self.x_limits[0], x) + if self.x_limits[1] is not None: + x = min(self.x_limits[1], x) + if self.y_limits[0] is not None: + y = max(self.y_limits[0], y) + if self.y_limits[1] is not None: + y = min(self.y_limits[1], y) + if self.z_limits[0] is not None: + z = max(self.z_limits[0], z) + if self.z_limits[1] is not None: + z = min(self.z_limits[1], z) + + if x != args[0] or y != args[1] or z != args[2]: + self.get_logger().warn( + f"Desired joint positions adjusted to fit within limits: " + f"x={x}, y={y}, z={z} (original: x={args[0]}, y={args[1]}, z={args[2]})" + ) + + self.desired_joint_positions = [x, y, z, r, p, yaw] + + + def joint_states_callback(self, msg): + """Callback function to handle incoming joint states.""" + joint_position_dict = dict(zip(msg.name, msg.position)) + self.current_joint_positions = [joint_position_dict[name] for name in self.joint_names] + joint_position_dict = dict(zip(msg.name, msg.velocity)) + self.current_joint_velocities = [joint_position_dict[name] for name in self.joint_names] + + def update_position(self): + if self.desired_joint_positions == self.previous_desired: + return + msg = JointTrajectory() + msg.joint_names = self.joint_names + steps_per_m = 100 + if True: #len(args[0]) == len(self.joint_names): + prev_duration = 0 + if self.prev_pose == None: + [x,y,z] = self.robot.fkine(self.current_joint_positions).t + [roll, pitch, yaw] = self.robot.fkine(self.current_joint_positions).rpy() + else: + [x,y,z] = self.prev_pose[:3] + [roll, pitch, yaw] = self.prev_pose[3:] + x1, y1, z1, roll1, pitch1, yaw1 = self.desired_joint_positions + self.prev_pose = self.desired_joint_positions + steps = int(np.linalg.norm(np.array([x1, y1, z1])- self.robot.fkine(self.current_joint_positions).t) * steps_per_m) + if steps < 2: steps = 2 + cart_traj = [sm.SE3([x+(x1-x)/(steps-1)*i, y+(y1-y)/(steps-1)*i, z+(z1-z)/(steps-1)*i]) * sm.SE3.RPY([roll+(roll1-roll)/(steps-1)*i, pitch+(pitch1-pitch)/(steps-1)*i, yaw+(yaw1-yaw)/(steps-1)*i]) for i in range(steps)] + for j in range(steps): + sol = self.robot.ik_LM(cart_traj[j], q0=self.current_joint_positions, mask = self.cost_mask, joint_limits = True, method = 'chan') if j == 0 else self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True, method = 'chan') + if sol[1] == 1: + fowards = self.robot.fkine_all(sol[0]) + out_of_bounds = (fowards.t[1:,0] > self.x_limits[1] if self.x_limits[1] != None else False) | (fowards.t[1:,0] < self.x_limits[0] if self.x_limits[0] != None else False) | (fowards.t[1:,1] > self.y_limits[1] if self.y_limits[1] != None else False) | (fowards.t[1:,1] < self.y_limits[0] if self.y_limits[0] != None else False) | (fowards.t[1:,2] > self.z_limits[1] if self.z_limits[1] != None else False) | (fowards.t[1:,2] < self.z_limits[0] if self.z_limits[0] != None else False) + if np.any(out_of_bounds): + #print(fowards.t) + #indices = np.where(out_of_bounds)[0] + #print(f"indices: {indices}") + self.get_logger().warn("One or more links moved out of bounds!") + ''' + for i in indices: + try: + print(f"Joint {self.robot.links[i].name} is out of bounds: (x,y,z) = {fowards.t[i]}") + except IndexError: + print(f"index {i} is out of bounds, but no corresponding joint found.") + self.previous_desired = self.desired_joint_positions + ''' + break + duration = 0 + prev = self.current_joint_positions if j == 0 else prev_sol + for p1, p2, max_vel in zip(sol[0], prev, self.joint_velocity_limits.values()): + duration = max(duration, abs(p1 - p2) / max_vel)#, 1/self.hz) # as minimun + prev_sol = list(sol[0]) + if duration == 0: + continue + point = JointTrajectoryPoint() + point.positions = list(sol[0]) + duration *= 5 + duration += prev_duration + prev_duration = duration + point.time_from_start.sec = int(duration) + point.time_from_start.nanosec = int((duration - int(duration)) * 1e9) + msg.points.append(point) + else: + print(f'IK could not find a solution for (x,y,z) = {cart_traj[j].t} and (r,p,y) = {cart_traj[j].rpy()}!') + prev_sol = self.current_joint_positions + if len(msg.points) == 0: + return + msg.header.stamp = self.get_clock().now().to_msg() + self.publisher.publish(msg) + self.previous_desired = self.desired_joint_positions + +def main(): + """Main function to get joint names and start the ROS 2 & OSC system.""" + robot_urdf = input("Enter the path to the URDF file: ") + tree = ET.parse(robot_urdf) + root = tree.getroot() + joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic'] + robot = rtb.ERobot.URDF(robot_urdf) + joint_velocity_limits = {} + + # Iterate over all joints in the URDF + for joint in root.findall('.//joint'): + joint_name = joint.get('name') # Get the name of the joint + + # Look for the tag under each joint + limit = joint.find('limit') + + if limit is not None: + # Extract the velocity limit (if it exists) + velocity_limit = limit.get('velocity') + + if velocity_limit is not None: + joint_velocity_limits[joint_name] = float(velocity_limit) + + while True: + try: + print("The cost mask determines which coordinates are used for the IK. Each element of the cost mask corresponds to a catesian coordinate [x, y, z, Rx, Ry, Rz].") + print("The cost mask [1, 1, 1, 0, 0, 0] means that the IK will only consider translation and no rotaion.") + cost_mask = [int(i) for i in input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")] + if sum(cost_mask) <= robot.n and len(cost_mask) == 6: + break + else: + print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.") + except ValueError: + print("Invalid input. Please enter integers only.") + print(f"Cost mask: {cost_mask}") + + rclpy.init() + + node = ScaledJointTrajectoryPublisher(joint_names, joint_velocity_limits, robot, cost_mask) + + # Run ROS 2 spin, and osc_process will be handled by the timer + try: + rclpy.spin(node) + except KeyboardInterrupt: + print("") + finally: + node.destroy_node() + rclpy.shutdown() + osc_terminate() + +if __name__ == '__main__': + main() diff --git a/workspace/src/joint_control/setup.py b/workspace/src/joint_control/setup.py index 429803d..60cf2db 100644 --- a/workspace/src/joint_control/setup.py +++ b/workspace/src/joint_control/setup.py @@ -36,6 +36,7 @@ setup( 'plugdata_cart_fix = joint_control.plugdata_cart_fix:main', 'plugdata_cart_smooth = joint_control.plugdata_cart_smooth:main', 'test=joint_control.test:main', + 'sandbox=sandbox.sandbox:main', ], }, )