From b45371d39b1da3f6613ceeacad115408ac7c7389 Mon Sep 17 00:00:00 2001 From: Ali Date: Sun, 8 Jun 2025 10:41:50 +0200 Subject: [PATCH] [AN] (feat) Moves the interactive input to another file --- workspace/.gitignore | 3 + .../osc_ros2/osc_ros2/interactive_input.py | 351 ++++++++++++++++ workspace/src/osc_ros2/osc_ros2/osc_ros2.py | 376 +----------------- 3 files changed, 375 insertions(+), 355 deletions(-) create mode 100644 workspace/.gitignore create mode 100644 workspace/src/osc_ros2/osc_ros2/interactive_input.py diff --git a/workspace/.gitignore b/workspace/.gitignore new file mode 100644 index 0000000..3026820 --- /dev/null +++ b/workspace/.gitignore @@ -0,0 +1,3 @@ +build/** +install/** +log/** diff --git a/workspace/src/osc_ros2/osc_ros2/interactive_input.py b/workspace/src/osc_ros2/osc_ros2/interactive_input.py new file mode 100644 index 0000000..ca0b421 --- /dev/null +++ b/workspace/src/osc_ros2/osc_ros2/interactive_input.py @@ -0,0 +1,351 @@ +import os +import time +import xml.etree.ElementTree as ET +from typing import Any + +import rclpy +import roboticstoolbox as rtb +from rclpy import Node +from sensor_msgs.msg import JointState + + +class JointNameListener(Node): + joint_names: list[str]|None + def __init__(self): + super().__init__('joint_name_listener') + self.subscription = self.create_subscription( + JointState, + '/joint_states', + self.joint_state_callback, + 1 + ) + self.joint_names = None + + def joint_state_callback(self, msg: JointState): + self.joint_names = list(msg.name) + +def interactive_input() -> tuple[dict[str, Any], rtb.robot | None]: + config = {} + robot = None + while True: + use_urdf = input("Do you have a URDF file you want to use? (y/n): ").strip().lower() + if use_urdf == 'y': + while True: + config["urdf"] = input("Enter the absolute path to the URDF file: ") + if os.path.isfile(config["urdf"]): + if not config["urdf"].endswith('.urdf'): + print("The file is not a URDF file. Please enter a valid URDF file.") + continue + break + else: + print("Invalid path. Please enter a valid path to the URDF file.") + tree = ET.parse(config["robot_urdf"]) + root = tree.getroot() + robot = rtb.ERobot.URDF(config["urdf"]) + config["joint_names"] = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic'] + print(robot) + config["joint_velocity_limits"] = {} + # Iterate over all joints in the URDF + for joint in root.findall('.//joint'): + joint_name = joint.get('name') # Get the name of the joint + + # Look for the tag under each joint + limit = joint.find('limit') + + if limit is not None: + # Extract the velocity limit (if it exists) + velocity_limit = limit.get('velocity') + + if velocity_limit is not None: + config["joint_velocity_limits"][joint_name] = float(velocity_limit) + + while True: + try: + print('-+'*50) + print("The cost mask determines which coordinates are used for the IK.\nEach element of the cost mask corresponds to a Cartesian coordinate [x, y, z, roll, pitch, yaw].") + print("The cost mask 111000 means that the IK will only consider translation and no rotaion.") + config["cost_mask_string"] = input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ") + cost_mask = [int(i) for i in config["cost_mask_string"]] + if sum(cost_mask) <= robot.n and len(cost_mask) == 6: + break + else: + print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.") + except ValueError: + print("Invalid input. Please enter integers only.") + print(f"The following coordinates will be used for the IK: {[j for i,j in enumerate(['x','y','z','roll','pitch','yaw']) if cost_mask[i]==1]}") + break + elif use_urdf == 'n': + node = JointNameListener() + print("Wainting 10 sec for JointState messages to extract joint names...") + rclpy.spin_once(node) + counter = 0 + while not(node.joint_names): + if counter > 100: + config["joint_names"] = None + break + counter+=1 + time.sleep(0.1) + rclpy.spin_once(node) + config["joint_names"] = node.joint_names + node.destroy_node() + if config["joint_names"]: + correct_names = input("The following joint names were found:\n" + "\n".join(config["joint_names"]) + "\nAre these correct? (y/n): ").strip().lower() + while True: + if correct_names == 'y': + break + elif correct_names == 'n': + config["joint_names"] = None + break + correct_names = input("Invalid input. Please enter 'y' or 'n'.") + if not(config["joint_names"]): + print("Please enter the joint names manually.") + while True: + config["joint_names"] = [] + print('-+'*50) + print("Enter the joint names manually one by one. Type 'done' when you are finished:") + print("Attention: The order of the joints is important. It should be the same in which you want to send the joint positions.") + while True: + print('-'*50) + joint_name = input("Enter joint name (or 'done' to finish): ").strip() + if joint_name.lower() == 'done': + break + if joint_name: + config["joint_names"].append(joint_name) + print('-+'*50) + correct_names = input(f"Are those the joint names as defined in your JointTrajectroy recieving Node?: {config['joint_names']}. (y/n)?: ").strip() + if correct_names.lower() == 'y': + break + else: + print("Please re-enter the joint names.") + while True: + try: + config["joint_velocity_limits"] = {} + for name in config["joint_names"]: + while True: + try: + print('--'*50) + limit = input(f"Enter the velocity limit for joint '{name}': ").strip() + if limit == "": + continue + else: + config["joint_velocity_limits"][name] = float(limit) + break + except ValueError: + print("Invalid input. Please enter a numeric value or press Enter to skip.") + break + except ValueError: + print("Invalid input. Please enter numeric values or leave blank to skip.") + robot = None + cost_mask = None + break + print("Invalid input. Please enter 'y' or 'n'.") + if robot: + while True: + print('+-' * 50) + set_limits = input("Do you want to set limits for x, y and z? (y/n): ").strip().lower() + if set_limits == 'y': + while True: + try: + config["x_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()] + config["y_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()] + config["z_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()] + + if len(config["x_limits"]) != 2 or len(config["y_limits"]) != 2 or len(config["z_limits"]) != 2: + print("Invalid input. Please enter exactly two values (or leave blank) for each limit.") + continue + + if (config["x_limits"][0] is not None and config["x_limits"][1] is not None and config["x_limits"][0] >= config["x_limits"][1]) or \ + (config["y_limits"][0] is not None and config["y_limits"][1] is not None and config["y_limits"][0] >= config["y_limits"][1]) or \ + (config["z_limits"][0] is not None and config["z_limits"][1] is not None and config["z_limits"][0] >= config["z_limits"][1]): + print("Invalid input. Lower limit must be less than upper limit for each axis.") + continue + + print("Current limits:") + print(f"x: {config['x_limits']}") + print(f"y: {config['y_limits']}") + print(f"z: {config['z_limits']}") + con = True + while con: + confirm = input("Do you want your TCP to move in this range? (y/n): ").strip().lower() + if confirm == 'y': + break + elif confirm == 'n': + print("Please re-enter the limits.") + con = False + else: + print("Invalid input. Please enter 'y' or 'n'.") + if con: + break + except ValueError: + print("Invalid input. Please enter numeric values only.") + break + elif set_limits == 'n': + config["x_limits"] = [None, None] + config["y_limits"] = [None, None] + config["z_limits"] = [None, None] + break + print("Invalid input. Please enter 'y' or 'n'.") + + while True: + print('+-' * 50) + set_limits = input("Do you want to set workspace limits in x, y and z direction? (y/n): ").strip().lower() + if set_limits == 'y': + while True: + try: + config["x_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()] + config["y_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()] + config["z_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()] + + if len(config["x_limits_workspace"]) != 2 or len(config["y_limits_workspace"]) != 2 or len(config["z_limits_workspace"]) != 2: + print("Invalid input. Please enter exactly two values (or leave blank) for each limit.") + continue + + if (config["x_limits_workspace"][0] is not None and config["x_limits_workspace"][1] is not None and config["x_limits_workspace"][0] >= config["x_limits_workspace"][1]) or \ + (config["y_limits_workspace"][0] is not None and config["y_limits_workspace"][1] is not None and config["y_limits_workspace"][0] >= config["y_limits_workspace"][1]) or \ + (config["z_limits_workspace"][0] is not None and config["z_limits_workspace"][1] is not None and config["z_limits_workspace"][0] >= config["z_limits_workspace"][1]): + print("Invalid input. Lower limit must be less than upper limit for each axis.") + continue + + print("Current limits:") + print(f"x: {config['x_limits_workspace']}") + print(f"y: {config['y_limits_workspace']}") + print(f"z: {config['z_limits_workspace']}") + con = True + while con: + confirm = input("Do you want your robot to move in this range? (y/n): ").strip().lower() + if confirm == 'y': + break + elif confirm == 'n': + print("Please re-enter the limits.") + con = False + else: + print("Invalid input. Please enter 'y' or 'n'.") + if con: + break + except ValueError: + print("Invalid input. Please enter numeric values only.") + break + elif set_limits == 'n': + config["x_limits_workspace"] = [None, None] + config["y_limits_workspace"] = [None, None] + config["z_limits_workspace"] = [None, None] + break + print("Invalid input. Please enter 'y' or 'n'.") + # Ask the user if they want to set new joint limits + while True: + print('+-'*50) + update_limits = input("Do you want to set new joint limits? (y/n): ").strip().lower() + if update_limits == 'y': + for i in range(len(config["joint_names"])): + while True: + try: + lim = robot.qlim.copy() + # Find the link corresponding to the joint name + print("-" * 50) + print(f"Current position limits for joint '{config['joint_names'][i]}': [{robot.qlim[0][i]} {robot.qlim[1][i]}] rad") + lower_limit = input(f"Enter the new lower limit for joint '{config['joint_names'][i]}' (or press Enter to keep current): ").strip() + upper_limit = input(f"Enter the new upper limit for joint '{config['joint_names'][i]}' (or press Enter to keep current): ").strip() + + if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit): + print("Invalid input. Lower limit must be less than upper limit.") + continue + lower_limit = float(lower_limit) if lower_limit!="" else None + upper_limit = float(upper_limit) if upper_limit!="" else None + if lower_limit is not None: + if lower_limitlim[1][i]: + while True: + sure = input(f"Are you sure you want to set the upper limit to {upper_limit} rad which is more than the default limit {lim[1][i]} (y/n)?: ").strip().lower() + if sure == 'y': + lim[1][i] = float(upper_limit) + break + elif sure == 'n': + print("Upper limit not changed.") + break + print("Invalid input. Please enter 'y' or 'n'.") + else: + lim[1][i] = float(upper_limit) + robot.qlim = lim + print(f"New limits for joint '{config['joint_names'][i]}': [{robot.qlim[0][i]} {robot.qlim[1][i]}] rad") + print("-" * 50) + break + except ValueError: + print("Invalid input. Please enter numeric values or leave blank to keep current limits.") + break + if update_limits == 'n': + break + print("Invalid input. Please enter 'y' or 'n'.") + ''' + use_link_mask = input("Do you want to use a link mask? (y/n): ").strip().lower() + if use_link_mask == 'y': + while True: + try: + ''' + else: + while True: + print('+-' * 50) + update_limits = input("Do you want to set joint limits? (y/n): ").strip().lower() + n_joints = len(config["joint_names"]) + if update_limits == 'y': + config["joint_lim"] = [[]*n_joints,[]*n_joints] + for i, joint in enumerate(config["joint_names"]): + while True: + try: + print("-" * 50) + lower_limit = input(f"Enter the new lower limit for joint '{joint}' (or press Enter for None): ").strip() + upper_limit = input(f"Enter the new upper limit for joint '{joint}' (or press Enter for None): ").strip() + + if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit): + print('--' * 50) + print(" !!! Invalid input. Lower limit must be less than upper limit. !!! ") + continue + config["joint_lim"][0][i] = float(lower_limit) if lower_limit!="" else None + config["joint_lim"][1][i] = float(upper_limit) if upper_limit!="" else None + break + except ValueError: + print("Invalid input. Please enter numeric values or leave blank to keep current limits.") + print(f'New limits for joint:\n lower: {config["joint_lim"][0]}\n upper: {config["joint_lim"][1]}') + break + elif update_limits == 'n': + config["joint_lim"] = None + break + print("Invalid input. Please enter 'y' or 'n'.") + while True: + try: + print('+-' * 50) + config["trajectory_topic_name"] = input("Enter the topic name to which the joint trajectory should be sent (press Enter for default: '/scaled_joint_trajectory_controller/joint_trajectory'): ").strip() + if config["trajectory_topic_name"] == "": + config["trajectory_topic_name"] = '/scaled_joint_trajectory_controller/joint_trajectory' + break + elif config["trajectory_topic_name"].startswith("/"): + break + else: + print("Invalid topic name. A valid topic name should start with '/'.") + except Exception as e: + print(f"An error occurred: {e}") + while True: + save_config = input("Do you want to save this interface? (y/n): ").strip().lower() + if save_config == "y": + config_path = input("Where do you want to save the config?: ").strip() + if not os.path.isdir(config_path): + print("Warning: Path does not exist") + print("Making Directory...") + os.mkdir(config_path) + print(f"[TODO] osc_ros2.py:1091:0: Write config to {config_path}/config.toml") + break + elif save_config == "n": + print("Will not be saving config") + print("Invalid input. Please enter 'y' or 'n'.") + return config, robot diff --git a/workspace/src/osc_ros2/osc_ros2/osc_ros2.py b/workspace/src/osc_ros2/osc_ros2/osc_ros2.py index 98da17b..5d64fdf 100644 --- a/workspace/src/osc_ros2/osc_ros2/osc_ros2.py +++ b/workspace/src/osc_ros2/osc_ros2/osc_ros2.py @@ -1,4 +1,4 @@ -from typing import Any +#ruff: noqa: F403, F405 import rclpy from rclpy.node import Node from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint @@ -6,35 +6,19 @@ from sensor_msgs.msg import JointState from rcl_interfaces.msg import Log from osc4py3.as_allthreads import * from osc4py3 import oscmethod as osm -import xml.etree.ElementTree as ET import numpy as np import spatialmath as sm import roboticstoolbox as rtb from osc4py3 import oscbuildparse import time -import os import re import socket -class JointNameListener(Node): - joint_names: list[str]|None - def __init__(self): - super().__init__('joint_name_listener') - self.subscription = self.create_subscription( - JointState, - '/joint_states', - self.joint_state_callback, - 1 - ) - self.joint_names = None - - def joint_state_callback(self, msg: JointState): - self.joint_names = list(msg.name) class OSC_ROS2_interface(Node): """Node to publish joint trajectories based on OSC messages.""" - def __init__(self, joint_names, joint_velocity_limits, robot, cost_mask): + def __init__(self, robot, config): super().__init__('scaled_joint_trajectory_publisher') @@ -203,7 +187,7 @@ class OSC_ROS2_interface(Node): points = [[float(j) for j in i] for i in args] elif len(args[0]) >= 7: points = [[float(j) for j in i[:6]] for i in args] - self.get_logger().warn(f"joint_trajectory_handler: Duration is not supported for joint trajectory yet. Ignoring duration.") + self.get_logger().warn("joint_trajectory_handler: Duration is not supported for joint trajectory yet. Ignoring duration.") else: self.get_logger().warn(f"joint_trajectory_handler: Invalid number of arguments for joint trajectory. Expected {self.n_joints} ([q0, q1, q2, ..., q{self.n_joints}]) or {self.n_joints+1} ([q0, q1, q2, ..., q{self.n_joints}, duration]), but got {len(args[0])}.") return @@ -294,9 +278,9 @@ class OSC_ROS2_interface(Node): points = [[float(j) for j in i] for i in args] elif len(args[0]) >= 7: points = [[float(j) for j in i[:6]] for i in args] - self.get_logger().warn(f"cartesian_trajectory_handler: Duration is not supported for cartesian trajectory yet. Ignoring duration.") + self.get_logger().warn("cartesian_trajectory_handler: Duration is not supported for cartesian trajectory yet. Ignoring duration.") else: - self.get_logger().warn(f"cartesian_trajectory_handler: Invalid number of arguments for cartesian trajectory. Expected 6 ([x, y, z, roll, pitch, yaw]) or 7 ([x, y, z, roll, pitch, yaw, duration]), but got {len(args[0])}.") + self.get_logger().warn("cartesian_trajectory_handler: Invalid number of arguments for cartesian trajectory. Expected 6 ([x, y, z, roll, pitch, yaw]) or 7 ([x, y, z, roll, pitch, yaw, duration]), but got {len(args[0])}.") return self.desired = ["cartesian_trajectory"] + points @@ -397,9 +381,10 @@ class OSC_ROS2_interface(Node): def joint_states_callback(self, msg: JointState): """Callback function to handle incoming joint states.""" try: - msg_time = oscbuildparse.OSCMessage(f"/time", ',s', [str(time.time())]) + msg_time = oscbuildparse.OSCMessage("/time", ',s', [str(time.time())]) osc_send(msg_time, "osc_client") - if not(self.joint_names): self.joint_names = msg.name + if not(self.joint_names): + self.joint_names = msg.name joint_position_dict = dict(zip(msg.name, msg.position)) self.current_joint_positions = [float(joint_position_dict[name]) for name in self.joint_names] joint_position_dict = dict(zip(msg.name, msg.velocity)) @@ -409,7 +394,7 @@ class OSC_ROS2_interface(Node): tcp_position = self.robot.fkine(self.current_joint_positions).t tcp_orientation = self.robot.fkine(self.current_joint_positions).rpy() - msg_tcp = oscbuildparse.OSCMessage(f"/tcp_coordinates", ',ffffff', [tcp_position[0], tcp_position[1], tcp_position[2], tcp_orientation[0], tcp_orientation[1], tcp_orientation[2]]) + msg_tcp = oscbuildparse.OSCMessage("/tcp_coordinates", ',ffffff', [tcp_position[0], tcp_position[1], tcp_position[2], tcp_orientation[0], tcp_orientation[1], tcp_orientation[2]]) #msg_x = oscbuildparse.OSCMessage(f"/tcp_coordinates/x", ',f', [tcp_position[0]]) #msg_y = oscbuildparse.OSCMessage(f"/tcp_coordinates/y", ',f', [tcp_position[1]]) #msg_z = oscbuildparse.OSCMessage(f"/tcp_coordinates/z", ',f', [tcp_position[2]]) @@ -420,10 +405,10 @@ class OSC_ROS2_interface(Node): #osc_send(bun, "osc_client") osc_send(msg_tcp, "osc_client") - msg_position = oscbuildparse.OSCMessage(f"/joint_state/position", f',{"f"*self.n_joints}', [i for i in msg.position]) - msg_velocity = oscbuildparse.OSCMessage(f"/joint_state/velocity", f',{"f"*self.n_joints}', [i for i in msg.velocity]) - msg_effort = oscbuildparse.OSCMessage(f"/joint_state/effort", f',{"f"*self.n_joints}', [i for i in msg.effort]) - msg_name = oscbuildparse.OSCMessage(f"/joint_state/name", f',{"s"*self.n_joints}', [i for i in msg.name]) + msg_position = oscbuildparse.OSCMessage("/joint_state/position", f',{"f"*self.n_joints}', [i for i in msg.position]) + msg_velocity = oscbuildparse.OSCMessage("/joint_state/velocity", f',{"f"*self.n_joints}', [i for i in msg.velocity]) + msg_effort = oscbuildparse.OSCMessage("/joint_state/effort", f',{"f"*self.n_joints}', [i for i in msg.effort]) + msg_name = oscbuildparse.OSCMessage("/joint_state/name", f',{"s"*self.n_joints}', [i for i in msg.name]) bun = oscbuildparse.OSCBundle(oscbuildparse.OSC_IMMEDIATELY, [msg_name, msg_position, msg_velocity, msg_effort]) osc_send(bun, "osc_client") @@ -500,7 +485,7 @@ class OSC_ROS2_interface(Node): msg = JointTrajectory() msg.joint_names = self.joint_names steps_per_m = 100 - if self.previous_desired == None: + if self.previous_desired is None: [x,y,z] = self.robot.fkine(self.current_joint_positions).t [roll, pitch, yaw] = self.robot.fkine(self.current_joint_positions).rpy() else: @@ -509,7 +494,8 @@ class OSC_ROS2_interface(Node): x1, y1, z1, roll1, pitch1, yaw1 = self.desired[1:7] self.previous_desired = self.desired steps = int(np.linalg.norm(np.array([x1, y1, z1])- self.robot.fkine(self.current_joint_positions).t) * steps_per_m) - if steps < 2: steps = 2 + if steps < 2: + steps = 2 cart_traj = [sm.SE3([x+(x1-x)/(steps-1)*i, y+(y1-y)/(steps-1)*i, z+(z1-z)/(steps-1)*i]) * sm.SE3.RPY([roll+(roll1-roll)/(steps-1)*i, pitch+(pitch1-pitch)/(steps-1)*i, yaw+(yaw1-yaw)/(steps-1)*i]) for i in range(steps)] '''if self.previous_desired: [x,y,z] = self.previous_desired[1:4] @@ -561,7 +547,7 @@ class OSC_ROS2_interface(Node): sol = self.robot.ik_LM(cart_traj[j], q0=self.current_joint_positions, mask = self.cost_mask, joint_limits = True, method = 'chan') if j == 0 else self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True, method = 'chan') if sol[1] == 1: fowards = self.robot.fkine_all(sol[0]) - out_of_bounds = (fowards.t[1:,0] > self.x_limits_workspace[1] if self.x_limits_workspace[1] != None else False) | (fowards.t[1:,0] < self.x_limits_workspace[0] if self.x_limits_workspace[0] != None else False) | (fowards.t[1:,1] > self.y_limits_workspace[1] if self.y_limits_workspace[1] != None else False) | (fowards.t[1:,1] < self.y_limits_workspace[0] if self.y_limits_workspace[0] != None else False) | (fowards.t[1:,2] > self.z_limits_workspace[1] if self.z_limits_workspace[1] != None else False) | (fowards.t[1:,2] < self.z_limits_workspace[0] if self.z_limits_workspace[0] != None else False) + out_of_bounds = (fowards.t[1:,0] > self.x_limits_workspace[1] if self.x_limits_workspace[1] is not None else False) | (fowards.t[1:,0] < self.x_limits_workspace[0] if self.x_limits_workspace[0] is not None else False) | (fowards.t[1:,1] > self.y_limits_workspace[1] if self.y_limits_workspace[1] is not None else False) | (fowards.t[1:,1] < self.y_limits_workspace[0] if self.y_limits_workspace[0] is not None else False) | (fowards.t[1:,2] > self.z_limits_workspace[1] if self.z_limits_workspace[1] is not None else False) | (fowards.t[1:,2] < self.z_limits_workspace[0] if self.z_limits_workspace[0] is not None else False) if np.any(out_of_bounds): #print(fowards.t) #indices = np.where(out_of_bounds)[0] @@ -613,7 +599,7 @@ class OSC_ROS2_interface(Node): sol = self.robot.ik_LM(cart_traj[j], q0=self.current_joint_positions, mask = self.cost_mask, joint_limits = True, method = 'chan') if j == 0 else self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True, method = 'chan') if sol[1] == 1: fowards = self.robot.fkine_all(sol[0]) - out_of_bounds = (fowards.t[1:,0] > self.x_limits_workspace[1] if self.x_limits_workspace[1] != None else False) | (fowards.t[1:,0] < self.x_limits_workspace[0] if self.x_limits_workspace[0] != None else False) | (fowards.t[1:,1] > self.y_limits_workspace[1] if self.y_limits_workspace[1] != None else False) | (fowards.t[1:,1] < self.y_limits_workspace[0] if self.y_limits_workspace[0] != None else False) | (fowards.t[1:,2] > self.z_limits_workspace[1] if self.z_limits_workspace[1] != None else False) | (fowards.t[1:,2] < self.z_limits_workspace[0] if self.z_limits_workspace[0] != None else False) + out_of_bounds = (fowards.t[1:,0] > self.x_limits_workspace[1] if self.x_limits_workspace[1] is not None else False) | (fowards.t[1:,0] < self.x_limits_workspace[0] if self.x_limits_workspace[0] is not None else False) | (fowards.t[1:,1] > self.y_limits_workspace[1] if self.y_limits_workspace[1] is not None else False) | (fowards.t[1:,1] < self.y_limits_workspace[0] if self.y_limits_workspace[0] is not None else False) | (fowards.t[1:,2] > self.z_limits_workspace[1] if self.z_limits_workspace[1] is not None else False) | (fowards.t[1:,2] < self.z_limits_workspace[0] if self.z_limits_workspace[0] is not None else False) if np.any(out_of_bounds): #print(fowards.t) #indices = np.where(out_of_bounds)[0] @@ -704,7 +690,8 @@ class OSC_ROS2_interface(Node): point.time_from_start.nanosec = int((traj.t[i] - int(traj.t[i])) * 1e9) msg.points.append(point) prev_sol = list(sol[0]) - else: self.get_logger().warn(f"send_cartesian_trajectory: IK could not find a solution for (x,y,z) = {traj.q[i][:3]} and (r,p,y) = {traj.q[i][3:]}!") + else: + self.get_logger().warn(f"send_cartesian_trajectory: IK could not find a solution for (x,y,z) = {traj.q[i][:3]} and (r,p,y) = {traj.q[i][3:]}!") msg.header.stamp = self.get_clock().now().to_msg() msg.points = msg.points[::n] self.publisher.publish(msg) @@ -772,328 +759,7 @@ class OSC_ROS2_interface(Node): msg_log = oscbuildparse.OSCMessage(f"/log/{self.log_dict.get(msg.level, 'UNKNOWN')}", ',isss', [int(msg.level), str(msg.stamp.sec+msg.stamp.nanosec*1e-9) , str(msg.name), self.clean_log_string(msg.msg)]) osc_send(msg_log, "osc_log_client") -def interactive_input() -> tuple[dict[str, Any], rtb.robot | None]: - config = {} - robot = None - while True: - config["use_urdf"] = input("Do you have a URDF file you want to use? (y/n): ").strip().lower() - if config["use_urdf"] == 'y': - while True: - config["robot_urdf"] = input("Enter the absolute path to the URDF file: ") - if os.path.isfile(config["robot_urdf"]): - if not config["robot_urdf"].endswith('.urdf'): - print("The file is not a URDF file. Please enter a valid URDF file.") - continue - break - else: - print("Invalid path. Please enter a valid path to the URDF file.") - tree = ET.parse(config["robot_urdf"]) - root = tree.getroot() - robot = rtb.ERobot.URDF(robot_urdf) - config["joint_names"] = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic'] - print(robot) - config["joint_velocity_limits"] = {} - # Iterate over all joints in the URDF - for joint in root.findall('.//joint'): - joint_name = joint.get('name') # Get the name of the joint - - # Look for the tag under each joint - limit = joint.find('limit') - - if limit is not None: - # Extract the velocity limit (if it exists) - velocity_limit = limit.get('velocity') - - if velocity_limit is not None: - config["joint_velocity_limits"][joint_name] = float(velocity_limit) - - while True: - try: - print('-+'*50) - print("The cost mask determines which coordinates are used for the IK.\nEach element of the cost mask corresponds to a Cartesian coordinate [x, y, z, roll, pitch, yaw].") - print("The cost mask 111000 means that the IK will only consider translation and no rotaion.") - config["cost_mask_string"] = input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ") - cost_mask = [int(i) for i in config["cost_mask_string"]] - if sum(cost_mask) <= robot.n and len(cost_mask) == 6: - break - else: - print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.") - except ValueError: - print("Invalid input. Please enter integers only.") - print(f"The following coordinates will be used for the IK: {[j for i,j in enumerate(['x','y','z','roll','pitch','yaw']) if cost_mask[i]==1]}") - break - elif use_urdf == 'n': - node = JointNameListener() - print("Wainting 10 sec for JointState messages to extract joint names...") - rclpy.spin_once(node) - counter = 0 - while not(node.joint_names): - if counter > 100: - joint_names = None - break - counter+=1 - time.sleep(0.1) - rclpy.spin_once(node) - config["joint_names"] = node.joint_names - node.destroy_node() - if config["joint_names"]: - correct_names = input("The following joint names were found:\n" + "\n".join(config["joint_names"]) + "\nAre these correct? (y/n): ").strip().lower() - while True: - if correct_names == 'y': - break - elif correct_names == 'n': - config["joint_names"] = None - break - correct_names = input("Invalid input. Please enter 'y' or 'n'.") - if not(config["joint_names"]): - print("Please enter the joint names manually.") - while True: - config["joint_names"] = [] - print('-+'*50) - print("Enter the joint names manually one by one. Type 'done' when you are finished:") - print("Attention: The order of the joints is important. It should be the same in which you want to send the joint positions.") - while True: - print('-'*50) - joint_name = input("Enter joint name (or 'done' to finish): ").strip() - if joint_name.lower() == 'done': - break - if joint_name: - config["joint_names"].append(joint_name) - print('-+'*50) - correct_names = input(f"Are those the joint names as defined in your JointTrajectroy recieving Node?: {config["joint_names"]}. (y/n)?: ").strip() - if correct_names.lower() == 'y': - break - else: - print("Please re-enter the joint names.") - while True: - try: - config["joint_velocity_limits"] = {} - for name in config["joint_names"]: - while True: - try: - print('--'*50) - limit = input(f"Enter the velocity limit for joint '{name}': ").strip() - if limit == "": - continue - else: - config["joint_velocity_limits"][name] = float(limit) - break - except ValueError: - print("Invalid input. Please enter a numeric value or press Enter to skip.") - break - except ValueError: - print("Invalid input. Please enter numeric values or leave blank to skip.") - robot = None - cost_mask = None - break - print("Invalid input. Please enter 'y' or 'n'.") - if robot: - while True: - print('+-' * 50) - set_limits = input("Do you want to set limits for x, y and z? (y/n): ").strip().lower() - if set_limits == 'y': - while True: - try: - config["x_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()] - config["y_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()] - config["z_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()] - - if len(config["x_limits"]) != 2 or len(config["y_limits"]) != 2 or len(config["z_limits"]) != 2: - print("Invalid input. Please enter exactly two values (or leave blank) for each limit.") - continue - - if (config["x_limits"][0] is not None and config["x_limits"][1] is not None and config["x_limits"][0] >= config["x_limits"][1]) or \ - (config["y_limits"][0] is not None and config["y_limits"][1] is not None and config["y_limits"][0] >= config["y_limits"][1]) or \ - (config["z_limits"][0] is not None and config["z_limits"][1] is not None and config["z_limits"][0] >= config["z_limits"][1]): - print("Invalid input. Lower limit must be less than upper limit for each axis.") - continue - - print(f"Current limits:") - print(f"x: {config["x_limits"]}") - print(f"y: {config["y_limits"]}") - print(f"z: {config["z_limits"]}") - con = True - while con: - confirm = input("Do you want your TCP to move in this range? (y/n): ").strip().lower() - if confirm == 'y': - break - elif confirm == 'n': - print("Please re-enter the limits.") - con = False - else: - print("Invalid input. Please enter 'y' or 'n'.") - if con: break - except ValueError: - print("Invalid input. Please enter numeric values only.") - break - elif set_limits == 'n': - config["x_limits"] = [None, None] - config["y_limits"] = [None, None] - config["z_limits"] = [None, None] - break - print("Invalid input. Please enter 'y' or 'n'.") - - while True: - print('+-' * 50) - set_limits = input("Do you want to set workspace limits in x, y and z direction? (y/n): ").strip().lower() - if set_limits == 'y': - while True: - try: - config["x_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()] - config["y_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()] - config["z_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()] - - if len(config["x_limits_workspace"]) != 2 or len(config["y_limits_workspace"]) != 2 or len(config["z_limits_workspace"]) != 2: - print("Invalid input. Please enter exactly two values (or leave blank) for each limit.") - continue - - if (config["x_limits_workspace"][0] is not None and config["x_limits_workspace"][1] is not None and config["x_limits_workspace"][0] >= config["x_limits_workspace"][1]) or \ - (config["y_limits_workspace"][0] is not None and config["y_limits_workspace"][1] is not None and config["y_limits_workspace"][0] >= config["y_limits_workspace"][1]) or \ - (config["z_limits_workspace"][0] is not None and config["z_limits_workspace"][1] is not None and config["z_limits_workspace"][0] >= config["z_limits_workspace"][1]): - print("Invalid input. Lower limit must be less than upper limit for each axis.") - continue - - print(f"Current limits:") - print(f"x: {config["x_limits_workspace"]}") - print(f"y: {config["y_limits_workspace"]}") - print(f"z: {config["z_limits_workspace"]}") - con = True - while con: - confirm = input("Do you want your robot to move in this range? (y/n): ").strip().lower() - if confirm == 'y': - break - elif confirm == 'n': - print("Please re-enter the limits.") - con = False - else: - print("Invalid input. Please enter 'y' or 'n'.") - if con: break - except ValueError: - print("Invalid input. Please enter numeric values only.") - break - elif set_limits == 'n': - config["x_limits_workspace"] = [None, None] - config["y_limits_workspace"] = [None, None] - config["z_limits_workspace"] = [None, None] - break - print("Invalid input. Please enter 'y' or 'n'.") - # Ask the user if they want to set new joint limits - while True: - print('+-'*50) - update_limits = input("Do you want to set new joint limits? (y/n): ").strip().lower() - if update_limits == 'y': - for i in range(len(config["joint_names"])): - while True: - try: - lim = robot.qlim.copy() - # Find the link corresponding to the joint name - print("-" * 50) - print(f"Current position limits for joint '{config["joint_names"][i]}': [{robot.qlim[0][i]} {robot.qlim[1][i]}] rad") - lower_limit = input(f"Enter the new lower limit for joint '{config["joint_names"][i]}' (or press Enter to keep current): ").strip() - upper_limit = input(f"Enter the new upper limit for joint '{config["joint_names"][i]}' (or press Enter to keep current): ").strip() - - if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit): - print("Invalid input. Lower limit must be less than upper limit.") - continue - lower_limit = float(lower_limit) if lower_limit!="" else None - upper_limit = float(upper_limit) if upper_limit!="" else None - if lower_limit!=None: - if lower_limitlim[1][i]: - while True: - sure = input(f"Are you sure you want to set the upper limit to {upper_limit} rad which is more than the default limit {lim[1][i]} (y/n)?: ").strip().lower() - if sure == 'y': - lim[1][i] = float(upper_limit) - break - elif sure == 'n': - print("Upper limit not changed.") - break - print("Invalid input. Please enter 'y' or 'n'.") - else: lim[1][i] = float(upper_limit) - robot.qlim = lim - print(f"New limits for joint '{config["joint_names"][i]}': [{robot.qlim[0][i]} {robot.qlim[1][i]}] rad") - print("-" * 50) - break - except ValueError: - print("Invalid input. Please enter numeric values or leave blank to keep current limits.") - break - if update_limits == 'n': - break - print("Invalid input. Please enter 'y' or 'n'.") - ''' - use_link_mask = input("Do you want to use a link mask? (y/n): ").strip().lower() - if use_link_mask == 'y': - while True: - try: - ''' - else: - while True: - print('+-' * 50) - update_limits = input("Do you want to set joint limits? (y/n): ").strip().lower() - n_joints = len(config["joint_names"]) - if update_limits == 'y': - config["joint_lim"] = [[]*n_joints,[]*n_joints] - for i, joint in enumerate(config["joint_names"]): - while True: - try: - print("-" * 50) - lower_limit = input(f"Enter the new lower limit for joint '{joint}' (or press Enter for None): ").strip() - upper_limit = input(f"Enter the new upper limit for joint '{joint}' (or press Enter for None): ").strip() - - if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit): - print('--' * 50) - print(" !!! Invalid input. Lower limit must be less than upper limit. !!! ") - continue - config["joint_lim"][0][i] = float(lower_limit) if lower_limit!="" else None - config["joint_lim"][1][i] = float(upper_limit) if upper_limit!="" else None - break - except ValueError: - print("Invalid input. Please enter numeric values or leave blank to keep current limits.") - print(f'New limits for joint:\n lower: {config["joint_lim"][0]}\n upper: {config["joint_lim"][1]}') - break - elif update_limits == 'n': - config["joint_lim"] = None - break - print("Invalid input. Please enter 'y' or 'n'.") - while True: - try: - print('+-' * 50) - config["trajectory_topic_name"] = input("Enter the topic name to which the joint trajectory should be sent (press Enter for default: '/scaled_joint_trajectory_controller/joint_trajectory'): ").strip() - if config["trajectory_topic_name"] == "": - config["trajectory_topic_name"] = '/scaled_joint_trajectory_controller/joint_trajectory' - break - elif config["trajectory_topic_name"].startswith("/"): - break - else: - print("Invalid topic name. A valid topic name should start with '/'.") - except Exception as e: - print(f"An error occurred: {e}") - while True: - save_config = input("Do you want to save this interface? (y/n): ").strip().lower() - if save_config == "y": - config_path = input("Where do you want to save the config?: ").strip() - if not os.path.isdir(config_path): - print("Warning: Path does not exist") - print("Making Directory...") - os.mkdir(config_path) - print(f"[TODO] osc_ros2.py:1091:0: Write config to {config_path}/config.toml") - break - elif save_config == "n": - print("Will not be saving config") - print("Invalid input. Please enter 'y' or 'n'.") - return config, robot def main(): @@ -1101,7 +767,7 @@ def main(): rclpy.init() # TODO: if no config use interactive config, robot = interactive_input() - node = OSC_ROS2_interface(config["joint_names"], config["joint_velocity_limits"], robot, config["cost_mask"]) + node = OSC_ROS2_interface(robot, config) # Run ROS 2 spin, and osc_process will be handled by the timer try: