From b4abe1eec147321c9ad9cffadfe4a380a3abfa4a Mon Sep 17 00:00:00 2001 From: Ali Date: Wed, 28 May 2025 06:00:56 +0200 Subject: [PATCH] [AN] (feat) Prepare for using configs --- workspace/src/osc_ros2/osc_ros2/osc_ros2.py | 461 ++++++++++---------- 1 file changed, 239 insertions(+), 222 deletions(-) diff --git a/workspace/src/osc_ros2/osc_ros2/osc_ros2.py b/workspace/src/osc_ros2/osc_ros2/osc_ros2.py index 0b28d5b..98da17b 100644 --- a/workspace/src/osc_ros2/osc_ros2/osc_ros2.py +++ b/workspace/src/osc_ros2/osc_ros2/osc_ros2.py @@ -1,3 +1,4 @@ +from typing import Any import rclpy from rclpy.node import Node from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint @@ -16,6 +17,7 @@ import re import socket class JointNameListener(Node): + joint_names: list[str]|None def __init__(self): super().__init__('joint_name_listener') self.subscription = self.create_subscription( @@ -69,202 +71,6 @@ class OSC_ROS2_interface(Node): self.new = False self.n_joints = len(joint_names) - if robot: - while True: - print('+-' * 50) - set_limits = input("Do you want to set limits for x, y and z? (y/n): ").strip().lower() - if set_limits == 'y': - while True: - try: - self.x_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()] - self.y_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()] - self.z_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()] - - if len(self.x_limits) != 2 or len(self.y_limits) != 2 or len(self.z_limits) != 2: - print("Invalid input. Please enter exactly two values (or leave blank) for each limit.") - continue - - if (self.x_limits[0] is not None and self.x_limits[1] is not None and self.x_limits[0] >= self.x_limits[1]) or \ - (self.y_limits[0] is not None and self.y_limits[1] is not None and self.y_limits[0] >= self.y_limits[1]) or \ - (self.z_limits[0] is not None and self.z_limits[1] is not None and self.z_limits[0] >= self.z_limits[1]): - print("Invalid input. Lower limit must be less than upper limit for each axis.") - continue - - print(f"Current limits:") - print(f"x: {self.x_limits}") - print(f"y: {self.y_limits}") - print(f"z: {self.z_limits}") - con = True - while con: - confirm = input("Do you want your TCP to move in this range? (y/n): ").strip().lower() - if confirm == 'y': - break - elif confirm == 'n': - print("Please re-enter the limits.") - con = False - else: - print("Invalid input. Please enter 'y' or 'n'.") - if con: break - except ValueError: - print("Invalid input. Please enter numeric values only.") - break - elif set_limits == 'n': - self.x_limits = [None, None] - self.y_limits = [None, None] - self.z_limits = [None, None] - break - print("Invalid input. Please enter 'y' or 'n'.") - - while True: - print('+-' * 50) - set_limits = input("Do you want to set workspace limits in x, y and z direction? (y/n): ").strip().lower() - if set_limits == 'y': - while True: - try: - self.x_limits_workspace = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()] - self.y_limits_workspace = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()] - self.z_limits_workspace = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()] - - if len(self.x_limits_workspace) != 2 or len(self.y_limits_workspace) != 2 or len(self.z_limits_workspace) != 2: - print("Invalid input. Please enter exactly two values (or leave blank) for each limit.") - continue - - if (self.x_limits_workspace[0] is not None and self.x_limits_workspace[1] is not None and self.x_limits_workspace[0] >= self.x_limits_workspace[1]) or \ - (self.y_limits_workspace[0] is not None and self.y_limits_workspace[1] is not None and self.y_limits_workspace[0] >= self.y_limits_workspace[1]) or \ - (self.z_limits_workspace[0] is not None and self.z_limits_workspace[1] is not None and self.z_limits_workspace[0] >= self.z_limits_workspace[1]): - print("Invalid input. Lower limit must be less than upper limit for each axis.") - continue - - print(f"Current limits:") - print(f"x: {self.x_limits_workspace}") - print(f"y: {self.y_limits_workspace}") - print(f"z: {self.z_limits_workspace}") - con = True - while con: - confirm = input("Do you want your robot to move in this range? (y/n): ").strip().lower() - if confirm == 'y': - break - elif confirm == 'n': - print("Please re-enter the limits.") - con = False - else: - print("Invalid input. Please enter 'y' or 'n'.") - if con: break - except ValueError: - print("Invalid input. Please enter numeric values only.") - break - elif set_limits == 'n': - self.x_limits_workspace = [None, None] - self.y_limits_workspace = [None, None] - self.z_limits_workspace = [None, None] - break - print("Invalid input. Please enter 'y' or 'n'.") - # Ask the user if they want to set new joint limits - - # Ask the user if they want to set new joint limits - while True: - print('+-'*50) - update_limits = input("Do you want to set new joint limits? (y/n): ").strip().lower() - if update_limits == 'y': - for i in range(len(self.joint_names)): - while True: - try: - lim = self.robot.qlim.copy() - # Find the link corresponding to the joint name - print("-" * 50) - print(f"Current position limits for joint '{self.joint_names[i]}': [{self.robot.qlim[0][i]} {self.robot.qlim[1][i]}] rad") - lower_limit = input(f"Enter the new lower limit for joint '{self.joint_names[i]}' (or press Enter to keep current): ").strip() - upper_limit = input(f"Enter the new upper limit for joint '{self.joint_names[i]}' (or press Enter to keep current): ").strip() - - if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit): - print("Invalid input. Lower limit must be less than upper limit.") - continue - lower_limit = float(lower_limit) if lower_limit!="" else None - upper_limit = float(upper_limit) if upper_limit!="" else None - if lower_limit!=None: - if lower_limitlim[1][i]: - while True: - sure = input(f"Are you sure you want to set the upper limit to {upper_limit} rad which is more than the default limit {lim[1][i]} (y/n)?: ").strip().lower() - if sure == 'y': - lim[1][i] = float(upper_limit) - break - elif sure == 'n': - print("Upper limit not changed.") - break - print("Invalid input. Please enter 'y' or 'n'.") - else: lim[1][i] = float(upper_limit) - self.robot.qlim = lim - print(f"New limits for joint '{self.joint_names[i]}': [{self.robot.qlim[0][i]} {self.robot.qlim[1][i]}] rad") - print("-" * 50) - break - except ValueError: - print("Invalid input. Please enter numeric values or leave blank to keep current limits.") - break - if update_limits == 'n': - break - print("Invalid input. Please enter 'y' or 'n'.") - ''' - use_link_mask = input("Do you want to use a link mask? (y/n): ").strip().lower() - if use_link_mask == 'y': - while True: - try: - ''' - else: - while True: - print('+-' * 50) - update_limits = input("Do you want to set joint limits? (y/n): ").strip().lower() - if update_limits == 'y': - self.joint_lim = [[None]*self.n_joints,[None]*self.n_joints] - for i, joint in enumerate(self.joint_names): - while True: - try: - print("-" * 50) - lower_limit = input(f"Enter the new lower limit for joint '{joint}' (or press Enter for None): ").strip() - upper_limit = input(f"Enter the new upper limit for joint '{joint}' (or press Enter for None): ").strip() - - if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit): - print('--' * 50) - print(" !!! Invalid input. Lower limit must be less than upper limit. !!! ") - continue - self.joint_lim[0][i] = float(lower_limit) if lower_limit!="" else None - self.joint_lim[1][i] = float(upper_limit) if upper_limit!="" else None - break - except ValueError: - print("Invalid input. Please enter numeric values or leave blank to keep current limits.") - print(f'New limits for joint:\n lower: {self.joint_lim[0]}\n upper: {self.joint_lim[1]}') - break - elif update_limits == 'n': - self.joint_lim = None - break - print("Invalid input. Please enter 'y' or 'n'.") - - - - while True: - try: - print('+-' * 50) - self.trajectory_topic_name = input("Enter the topic name to which the joint trajectory should be sent (press Enter for default: '/scaled_joint_trajectory_controller/joint_trajectory'): ").strip() - if self.trajectory_topic_name == "": - self.trajectory_topic_name = '/scaled_joint_trajectory_controller/joint_trajectory' - break - elif self.trajectory_topic_name.startswith("/"): - break - else: - print("Invalid topic name. A valid topic name should start with '/'.") - except Exception as e: - print(f"An error occurred: {e}") # ROS2 Publisher self.publisher = self.create_publisher( @@ -966,28 +772,27 @@ class OSC_ROS2_interface(Node): msg_log = oscbuildparse.OSCMessage(f"/log/{self.log_dict.get(msg.level, 'UNKNOWN')}", ',isss', [int(msg.level), str(msg.stamp.sec+msg.stamp.nanosec*1e-9) , str(msg.name), self.clean_log_string(msg.msg)]) osc_send(msg_log, "osc_log_client") - -def main(): - """Main function to get joint names and start the ROS 2 & OSC system.""" - rclpy.init() +def interactive_input() -> tuple[dict[str, Any], rtb.robot | None]: + config = {} + robot = None while True: - use_urdf = input("Do you have a URDF file you want to use? (y/n): ").strip().lower() - if use_urdf == 'y': + config["use_urdf"] = input("Do you have a URDF file you want to use? (y/n): ").strip().lower() + if config["use_urdf"] == 'y': while True: - robot_urdf = input("Enter the absolute path to the URDF file: ") - if os.path.isfile(robot_urdf): - if not robot_urdf.endswith('.urdf'): + config["robot_urdf"] = input("Enter the absolute path to the URDF file: ") + if os.path.isfile(config["robot_urdf"]): + if not config["robot_urdf"].endswith('.urdf'): print("The file is not a URDF file. Please enter a valid URDF file.") continue break else: print("Invalid path. Please enter a valid path to the URDF file.") - tree = ET.parse(robot_urdf) + tree = ET.parse(config["robot_urdf"]) root = tree.getroot() robot = rtb.ERobot.URDF(robot_urdf) - joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic'] + config["joint_names"] = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic'] print(robot) - joint_velocity_limits = {} + config["joint_velocity_limits"] = {} # Iterate over all joints in the URDF for joint in root.findall('.//joint'): @@ -1001,14 +806,15 @@ def main(): velocity_limit = limit.get('velocity') if velocity_limit is not None: - joint_velocity_limits[joint_name] = float(velocity_limit) + config["joint_velocity_limits"][joint_name] = float(velocity_limit) while True: try: print('-+'*50) print("The cost mask determines which coordinates are used for the IK.\nEach element of the cost mask corresponds to a Cartesian coordinate [x, y, z, roll, pitch, yaw].") print("The cost mask 111000 means that the IK will only consider translation and no rotaion.") - cost_mask = [int(i) for i in input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")] + config["cost_mask_string"] = input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ") + cost_mask = [int(i) for i in config["cost_mask_string"]] if sum(cost_mask) <= robot.n and len(cost_mask) == 6: break else: @@ -1029,21 +835,21 @@ def main(): counter+=1 time.sleep(0.1) rclpy.spin_once(node) - joint_names = node.joint_names + config["joint_names"] = node.joint_names node.destroy_node() - if joint_names: - correct_names = input("The following joint names were found:\n" + "\n".join(joint_names) + "\nAre these correct? (y/n): ").strip().lower() + if config["joint_names"]: + correct_names = input("The following joint names were found:\n" + "\n".join(config["joint_names"]) + "\nAre these correct? (y/n): ").strip().lower() while True: if correct_names == 'y': break elif correct_names == 'n': - joint_names = None + config["joint_names"] = None break correct_names = input("Invalid input. Please enter 'y' or 'n'.") - if not(joint_names): + if not(config["joint_names"]): print("Please enter the joint names manually.") while True: - joint_names = [] + config["joint_names"] = [] print('-+'*50) print("Enter the joint names manually one by one. Type 'done' when you are finished:") print("Attention: The order of the joints is important. It should be the same in which you want to send the joint positions.") @@ -1053,17 +859,17 @@ def main(): if joint_name.lower() == 'done': break if joint_name: - joint_names.append(joint_name) + config["joint_names"].append(joint_name) print('-+'*50) - correct_names = input(f"Are those the joint names as defined in your JointTrajectroy recieving Node?: {joint_names}. (y/n)?: ").strip() + correct_names = input(f"Are those the joint names as defined in your JointTrajectroy recieving Node?: {config["joint_names"]}. (y/n)?: ").strip() if correct_names.lower() == 'y': break else: print("Please re-enter the joint names.") while True: try: - joint_velocity_limits = {} - for name in joint_names: + config["joint_velocity_limits"] = {} + for name in config["joint_names"]: while True: try: print('--'*50) @@ -1071,7 +877,7 @@ def main(): if limit == "": continue else: - joint_velocity_limits[name] = float(limit) + config["joint_velocity_limits"][name] = float(limit) break except ValueError: print("Invalid input. Please enter a numeric value or press Enter to skip.") @@ -1082,9 +888,220 @@ def main(): cost_mask = None break print("Invalid input. Please enter 'y' or 'n'.") + if robot: + while True: + print('+-' * 50) + set_limits = input("Do you want to set limits for x, y and z? (y/n): ").strip().lower() + if set_limits == 'y': + while True: + try: + config["x_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()] + config["y_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()] + config["z_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()] + + if len(config["x_limits"]) != 2 or len(config["y_limits"]) != 2 or len(config["z_limits"]) != 2: + print("Invalid input. Please enter exactly two values (or leave blank) for each limit.") + continue + + if (config["x_limits"][0] is not None and config["x_limits"][1] is not None and config["x_limits"][0] >= config["x_limits"][1]) or \ + (config["y_limits"][0] is not None and config["y_limits"][1] is not None and config["y_limits"][0] >= config["y_limits"][1]) or \ + (config["z_limits"][0] is not None and config["z_limits"][1] is not None and config["z_limits"][0] >= config["z_limits"][1]): + print("Invalid input. Lower limit must be less than upper limit for each axis.") + continue + + print(f"Current limits:") + print(f"x: {config["x_limits"]}") + print(f"y: {config["y_limits"]}") + print(f"z: {config["z_limits"]}") + con = True + while con: + confirm = input("Do you want your TCP to move in this range? (y/n): ").strip().lower() + if confirm == 'y': + break + elif confirm == 'n': + print("Please re-enter the limits.") + con = False + else: + print("Invalid input. Please enter 'y' or 'n'.") + if con: break + except ValueError: + print("Invalid input. Please enter numeric values only.") + break + elif set_limits == 'n': + config["x_limits"] = [None, None] + config["y_limits"] = [None, None] + config["z_limits"] = [None, None] + break + print("Invalid input. Please enter 'y' or 'n'.") + + while True: + print('+-' * 50) + set_limits = input("Do you want to set workspace limits in x, y and z direction? (y/n): ").strip().lower() + if set_limits == 'y': + while True: + try: + config["x_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()] + config["y_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()] + config["z_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()] + + if len(config["x_limits_workspace"]) != 2 or len(config["y_limits_workspace"]) != 2 or len(config["z_limits_workspace"]) != 2: + print("Invalid input. Please enter exactly two values (or leave blank) for each limit.") + continue + + if (config["x_limits_workspace"][0] is not None and config["x_limits_workspace"][1] is not None and config["x_limits_workspace"][0] >= config["x_limits_workspace"][1]) or \ + (config["y_limits_workspace"][0] is not None and config["y_limits_workspace"][1] is not None and config["y_limits_workspace"][0] >= config["y_limits_workspace"][1]) or \ + (config["z_limits_workspace"][0] is not None and config["z_limits_workspace"][1] is not None and config["z_limits_workspace"][0] >= config["z_limits_workspace"][1]): + print("Invalid input. Lower limit must be less than upper limit for each axis.") + continue + + print(f"Current limits:") + print(f"x: {config["x_limits_workspace"]}") + print(f"y: {config["y_limits_workspace"]}") + print(f"z: {config["z_limits_workspace"]}") + con = True + while con: + confirm = input("Do you want your robot to move in this range? (y/n): ").strip().lower() + if confirm == 'y': + break + elif confirm == 'n': + print("Please re-enter the limits.") + con = False + else: + print("Invalid input. Please enter 'y' or 'n'.") + if con: break + except ValueError: + print("Invalid input. Please enter numeric values only.") + break + elif set_limits == 'n': + config["x_limits_workspace"] = [None, None] + config["y_limits_workspace"] = [None, None] + config["z_limits_workspace"] = [None, None] + break + print("Invalid input. Please enter 'y' or 'n'.") + # Ask the user if they want to set new joint limits + while True: + print('+-'*50) + update_limits = input("Do you want to set new joint limits? (y/n): ").strip().lower() + if update_limits == 'y': + for i in range(len(config["joint_names"])): + while True: + try: + lim = robot.qlim.copy() + # Find the link corresponding to the joint name + print("-" * 50) + print(f"Current position limits for joint '{config["joint_names"][i]}': [{robot.qlim[0][i]} {robot.qlim[1][i]}] rad") + lower_limit = input(f"Enter the new lower limit for joint '{config["joint_names"][i]}' (or press Enter to keep current): ").strip() + upper_limit = input(f"Enter the new upper limit for joint '{config["joint_names"][i]}' (or press Enter to keep current): ").strip() + + if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit): + print("Invalid input. Lower limit must be less than upper limit.") + continue + lower_limit = float(lower_limit) if lower_limit!="" else None + upper_limit = float(upper_limit) if upper_limit!="" else None + if lower_limit!=None: + if lower_limitlim[1][i]: + while True: + sure = input(f"Are you sure you want to set the upper limit to {upper_limit} rad which is more than the default limit {lim[1][i]} (y/n)?: ").strip().lower() + if sure == 'y': + lim[1][i] = float(upper_limit) + break + elif sure == 'n': + print("Upper limit not changed.") + break + print("Invalid input. Please enter 'y' or 'n'.") + else: lim[1][i] = float(upper_limit) + robot.qlim = lim + print(f"New limits for joint '{config["joint_names"][i]}': [{robot.qlim[0][i]} {robot.qlim[1][i]}] rad") + print("-" * 50) + break + except ValueError: + print("Invalid input. Please enter numeric values or leave blank to keep current limits.") + break + if update_limits == 'n': + break + print("Invalid input. Please enter 'y' or 'n'.") + ''' + use_link_mask = input("Do you want to use a link mask? (y/n): ").strip().lower() + if use_link_mask == 'y': + while True: + try: + ''' + else: + while True: + print('+-' * 50) + update_limits = input("Do you want to set joint limits? (y/n): ").strip().lower() + n_joints = len(config["joint_names"]) + if update_limits == 'y': + config["joint_lim"] = [[]*n_joints,[]*n_joints] + for i, joint in enumerate(config["joint_names"]): + while True: + try: + print("-" * 50) + lower_limit = input(f"Enter the new lower limit for joint '{joint}' (or press Enter for None): ").strip() + upper_limit = input(f"Enter the new upper limit for joint '{joint}' (or press Enter for None): ").strip() + + if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit): + print('--' * 50) + print(" !!! Invalid input. Lower limit must be less than upper limit. !!! ") + continue + config["joint_lim"][0][i] = float(lower_limit) if lower_limit!="" else None + config["joint_lim"][1][i] = float(upper_limit) if upper_limit!="" else None + break + except ValueError: + print("Invalid input. Please enter numeric values or leave blank to keep current limits.") + print(f'New limits for joint:\n lower: {config["joint_lim"][0]}\n upper: {config["joint_lim"][1]}') + break + elif update_limits == 'n': + config["joint_lim"] = None + break + print("Invalid input. Please enter 'y' or 'n'.") + while True: + try: + print('+-' * 50) + config["trajectory_topic_name"] = input("Enter the topic name to which the joint trajectory should be sent (press Enter for default: '/scaled_joint_trajectory_controller/joint_trajectory'): ").strip() + if config["trajectory_topic_name"] == "": + config["trajectory_topic_name"] = '/scaled_joint_trajectory_controller/joint_trajectory' + break + elif config["trajectory_topic_name"].startswith("/"): + break + else: + print("Invalid topic name. A valid topic name should start with '/'.") + except Exception as e: + print(f"An error occurred: {e}") + while True: + save_config = input("Do you want to save this interface? (y/n): ").strip().lower() + if save_config == "y": + config_path = input("Where do you want to save the config?: ").strip() + if not os.path.isdir(config_path): + print("Warning: Path does not exist") + print("Making Directory...") + os.mkdir(config_path) + print(f"[TODO] osc_ros2.py:1091:0: Write config to {config_path}/config.toml") + break + elif save_config == "n": + print("Will not be saving config") + print("Invalid input. Please enter 'y' or 'n'.") + return config, robot - node = OSC_ROS2_interface(joint_names, joint_velocity_limits, robot, cost_mask) +def main(): + """Main function to get joint names and start the ROS 2 & OSC system.""" + rclpy.init() + # TODO: if no config use interactive + config, robot = interactive_input() + node = OSC_ROS2_interface(config["joint_names"], config["joint_velocity_limits"], robot, config["cost_mask"]) # Run ROS 2 spin, and osc_process will be handled by the timer try: