[AN] (feat) Prepare for using configs

This commit is contained in:
Ali 2025-05-28 06:00:56 +02:00
parent a783bbf2a2
commit b4abe1eec1
No known key found for this signature in database
GPG Key ID: C9239381777823C2

View File

@ -1,3 +1,4 @@
from typing import Any
import rclpy
from rclpy.node import Node
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
@ -16,6 +17,7 @@ import re
import socket
class JointNameListener(Node):
joint_names: list[str]|None
def __init__(self):
super().__init__('joint_name_listener')
self.subscription = self.create_subscription(
@ -69,202 +71,6 @@ class OSC_ROS2_interface(Node):
self.new = False
self.n_joints = len(joint_names)
if robot:
while True:
print('+-' * 50)
set_limits = input("Do you want to set limits for x, y and z? (y/n): ").strip().lower()
if set_limits == 'y':
while True:
try:
self.x_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()]
self.y_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()]
self.z_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()]
if len(self.x_limits) != 2 or len(self.y_limits) != 2 or len(self.z_limits) != 2:
print("Invalid input. Please enter exactly two values (or leave blank) for each limit.")
continue
if (self.x_limits[0] is not None and self.x_limits[1] is not None and self.x_limits[0] >= self.x_limits[1]) or \
(self.y_limits[0] is not None and self.y_limits[1] is not None and self.y_limits[0] >= self.y_limits[1]) or \
(self.z_limits[0] is not None and self.z_limits[1] is not None and self.z_limits[0] >= self.z_limits[1]):
print("Invalid input. Lower limit must be less than upper limit for each axis.")
continue
print(f"Current limits:")
print(f"x: {self.x_limits}")
print(f"y: {self.y_limits}")
print(f"z: {self.z_limits}")
con = True
while con:
confirm = input("Do you want your TCP to move in this range? (y/n): ").strip().lower()
if confirm == 'y':
break
elif confirm == 'n':
print("Please re-enter the limits.")
con = False
else:
print("Invalid input. Please enter 'y' or 'n'.")
if con: break
except ValueError:
print("Invalid input. Please enter numeric values only.")
break
elif set_limits == 'n':
self.x_limits = [None, None]
self.y_limits = [None, None]
self.z_limits = [None, None]
break
print("Invalid input. Please enter 'y' or 'n'.")
while True:
print('+-' * 50)
set_limits = input("Do you want to set workspace limits in x, y and z direction? (y/n): ").strip().lower()
if set_limits == 'y':
while True:
try:
self.x_limits_workspace = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()]
self.y_limits_workspace = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()]
self.z_limits_workspace = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()]
if len(self.x_limits_workspace) != 2 or len(self.y_limits_workspace) != 2 or len(self.z_limits_workspace) != 2:
print("Invalid input. Please enter exactly two values (or leave blank) for each limit.")
continue
if (self.x_limits_workspace[0] is not None and self.x_limits_workspace[1] is not None and self.x_limits_workspace[0] >= self.x_limits_workspace[1]) or \
(self.y_limits_workspace[0] is not None and self.y_limits_workspace[1] is not None and self.y_limits_workspace[0] >= self.y_limits_workspace[1]) or \
(self.z_limits_workspace[0] is not None and self.z_limits_workspace[1] is not None and self.z_limits_workspace[0] >= self.z_limits_workspace[1]):
print("Invalid input. Lower limit must be less than upper limit for each axis.")
continue
print(f"Current limits:")
print(f"x: {self.x_limits_workspace}")
print(f"y: {self.y_limits_workspace}")
print(f"z: {self.z_limits_workspace}")
con = True
while con:
confirm = input("Do you want your robot to move in this range? (y/n): ").strip().lower()
if confirm == 'y':
break
elif confirm == 'n':
print("Please re-enter the limits.")
con = False
else:
print("Invalid input. Please enter 'y' or 'n'.")
if con: break
except ValueError:
print("Invalid input. Please enter numeric values only.")
break
elif set_limits == 'n':
self.x_limits_workspace = [None, None]
self.y_limits_workspace = [None, None]
self.z_limits_workspace = [None, None]
break
print("Invalid input. Please enter 'y' or 'n'.")
# Ask the user if they want to set new joint limits
# Ask the user if they want to set new joint limits
while True:
print('+-'*50)
update_limits = input("Do you want to set new joint limits? (y/n): ").strip().lower()
if update_limits == 'y':
for i in range(len(self.joint_names)):
while True:
try:
lim = self.robot.qlim.copy()
# Find the link corresponding to the joint name
print("-" * 50)
print(f"Current position limits for joint '{self.joint_names[i]}': [{self.robot.qlim[0][i]} {self.robot.qlim[1][i]}] rad")
lower_limit = input(f"Enter the new lower limit for joint '{self.joint_names[i]}' (or press Enter to keep current): ").strip()
upper_limit = input(f"Enter the new upper limit for joint '{self.joint_names[i]}' (or press Enter to keep current): ").strip()
if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit):
print("Invalid input. Lower limit must be less than upper limit.")
continue
lower_limit = float(lower_limit) if lower_limit!="" else None
upper_limit = float(upper_limit) if upper_limit!="" else None
if lower_limit!=None:
if lower_limit<lim[0][i]:
while True:
sure = input(f"Are you sure you want to set the lower limit to {lower_limit} rad which is less than the default limit {lim[0][i]} (y/n)?: ").strip().lower()
if sure == 'y':
lim[0][i] = float(lower_limit)
break
elif sure == 'n':
print("Lower limit not changed.")
break
print("Invalid input. Please enter 'y' or 'n'.")
else: lim[0][i] = float(lower_limit)
if upper_limit!=None:
if upper_limit>lim[1][i]:
while True:
sure = input(f"Are you sure you want to set the upper limit to {upper_limit} rad which is more than the default limit {lim[1][i]} (y/n)?: ").strip().lower()
if sure == 'y':
lim[1][i] = float(upper_limit)
break
elif sure == 'n':
print("Upper limit not changed.")
break
print("Invalid input. Please enter 'y' or 'n'.")
else: lim[1][i] = float(upper_limit)
self.robot.qlim = lim
print(f"New limits for joint '{self.joint_names[i]}': [{self.robot.qlim[0][i]} {self.robot.qlim[1][i]}] rad")
print("-" * 50)
break
except ValueError:
print("Invalid input. Please enter numeric values or leave blank to keep current limits.")
break
if update_limits == 'n':
break
print("Invalid input. Please enter 'y' or 'n'.")
'''
use_link_mask = input("Do you want to use a link mask? (y/n): ").strip().lower()
if use_link_mask == 'y':
while True:
try:
'''
else:
while True:
print('+-' * 50)
update_limits = input("Do you want to set joint limits? (y/n): ").strip().lower()
if update_limits == 'y':
self.joint_lim = [[None]*self.n_joints,[None]*self.n_joints]
for i, joint in enumerate(self.joint_names):
while True:
try:
print("-" * 50)
lower_limit = input(f"Enter the new lower limit for joint '{joint}' (or press Enter for None): ").strip()
upper_limit = input(f"Enter the new upper limit for joint '{joint}' (or press Enter for None): ").strip()
if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit):
print('--' * 50)
print(" !!! Invalid input. Lower limit must be less than upper limit. !!! ")
continue
self.joint_lim[0][i] = float(lower_limit) if lower_limit!="" else None
self.joint_lim[1][i] = float(upper_limit) if upper_limit!="" else None
break
except ValueError:
print("Invalid input. Please enter numeric values or leave blank to keep current limits.")
print(f'New limits for joint:\n lower: {self.joint_lim[0]}\n upper: {self.joint_lim[1]}')
break
elif update_limits == 'n':
self.joint_lim = None
break
print("Invalid input. Please enter 'y' or 'n'.")
while True:
try:
print('+-' * 50)
self.trajectory_topic_name = input("Enter the topic name to which the joint trajectory should be sent (press Enter for default: '/scaled_joint_trajectory_controller/joint_trajectory'): ").strip()
if self.trajectory_topic_name == "":
self.trajectory_topic_name = '/scaled_joint_trajectory_controller/joint_trajectory'
break
elif self.trajectory_topic_name.startswith("/"):
break
else:
print("Invalid topic name. A valid topic name should start with '/'.")
except Exception as e:
print(f"An error occurred: {e}")
# ROS2 Publisher
self.publisher = self.create_publisher(
@ -966,28 +772,27 @@ class OSC_ROS2_interface(Node):
msg_log = oscbuildparse.OSCMessage(f"/log/{self.log_dict.get(msg.level, 'UNKNOWN')}", ',isss', [int(msg.level), str(msg.stamp.sec+msg.stamp.nanosec*1e-9) , str(msg.name), self.clean_log_string(msg.msg)])
osc_send(msg_log, "osc_log_client")
def main():
"""Main function to get joint names and start the ROS 2 & OSC system."""
rclpy.init()
def interactive_input() -> tuple[dict[str, Any], rtb.robot | None]:
config = {}
robot = None
while True:
use_urdf = input("Do you have a URDF file you want to use? (y/n): ").strip().lower()
if use_urdf == 'y':
config["use_urdf"] = input("Do you have a URDF file you want to use? (y/n): ").strip().lower()
if config["use_urdf"] == 'y':
while True:
robot_urdf = input("Enter the absolute path to the URDF file: ")
if os.path.isfile(robot_urdf):
if not robot_urdf.endswith('.urdf'):
config["robot_urdf"] = input("Enter the absolute path to the URDF file: ")
if os.path.isfile(config["robot_urdf"]):
if not config["robot_urdf"].endswith('.urdf'):
print("The file is not a URDF file. Please enter a valid URDF file.")
continue
break
else:
print("Invalid path. Please enter a valid path to the URDF file.")
tree = ET.parse(robot_urdf)
tree = ET.parse(config["robot_urdf"])
root = tree.getroot()
robot = rtb.ERobot.URDF(robot_urdf)
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
config["joint_names"] = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
print(robot)
joint_velocity_limits = {}
config["joint_velocity_limits"] = {}
# Iterate over all joints in the URDF
for joint in root.findall('.//joint'):
@ -1001,14 +806,15 @@ def main():
velocity_limit = limit.get('velocity')
if velocity_limit is not None:
joint_velocity_limits[joint_name] = float(velocity_limit)
config["joint_velocity_limits"][joint_name] = float(velocity_limit)
while True:
try:
print('-+'*50)
print("The cost mask determines which coordinates are used for the IK.\nEach element of the cost mask corresponds to a Cartesian coordinate [x, y, z, roll, pitch, yaw].")
print("The cost mask 111000 means that the IK will only consider translation and no rotaion.")
cost_mask = [int(i) for i in input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")]
config["cost_mask_string"] = input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")
cost_mask = [int(i) for i in config["cost_mask_string"]]
if sum(cost_mask) <= robot.n and len(cost_mask) == 6:
break
else:
@ -1029,21 +835,21 @@ def main():
counter+=1
time.sleep(0.1)
rclpy.spin_once(node)
joint_names = node.joint_names
config["joint_names"] = node.joint_names
node.destroy_node()
if joint_names:
correct_names = input("The following joint names were found:\n" + "\n".join(joint_names) + "\nAre these correct? (y/n): ").strip().lower()
if config["joint_names"]:
correct_names = input("The following joint names were found:\n" + "\n".join(config["joint_names"]) + "\nAre these correct? (y/n): ").strip().lower()
while True:
if correct_names == 'y':
break
elif correct_names == 'n':
joint_names = None
config["joint_names"] = None
break
correct_names = input("Invalid input. Please enter 'y' or 'n'.")
if not(joint_names):
if not(config["joint_names"]):
print("Please enter the joint names manually.")
while True:
joint_names = []
config["joint_names"] = []
print('-+'*50)
print("Enter the joint names manually one by one. Type 'done' when you are finished:")
print("Attention: The order of the joints is important. It should be the same in which you want to send the joint positions.")
@ -1053,17 +859,17 @@ def main():
if joint_name.lower() == 'done':
break
if joint_name:
joint_names.append(joint_name)
config["joint_names"].append(joint_name)
print('-+'*50)
correct_names = input(f"Are those the joint names as defined in your JointTrajectroy recieving Node?: {joint_names}. (y/n)?: ").strip()
correct_names = input(f"Are those the joint names as defined in your JointTrajectroy recieving Node?: {config["joint_names"]}. (y/n)?: ").strip()
if correct_names.lower() == 'y':
break
else:
print("Please re-enter the joint names.")
while True:
try:
joint_velocity_limits = {}
for name in joint_names:
config["joint_velocity_limits"] = {}
for name in config["joint_names"]:
while True:
try:
print('--'*50)
@ -1071,7 +877,7 @@ def main():
if limit == "":
continue
else:
joint_velocity_limits[name] = float(limit)
config["joint_velocity_limits"][name] = float(limit)
break
except ValueError:
print("Invalid input. Please enter a numeric value or press Enter to skip.")
@ -1082,9 +888,220 @@ def main():
cost_mask = None
break
print("Invalid input. Please enter 'y' or 'n'.")
if robot:
while True:
print('+-' * 50)
set_limits = input("Do you want to set limits for x, y and z? (y/n): ").strip().lower()
if set_limits == 'y':
while True:
try:
config["x_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()]
config["y_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()]
config["z_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()]
if len(config["x_limits"]) != 2 or len(config["y_limits"]) != 2 or len(config["z_limits"]) != 2:
print("Invalid input. Please enter exactly two values (or leave blank) for each limit.")
continue
if (config["x_limits"][0] is not None and config["x_limits"][1] is not None and config["x_limits"][0] >= config["x_limits"][1]) or \
(config["y_limits"][0] is not None and config["y_limits"][1] is not None and config["y_limits"][0] >= config["y_limits"][1]) or \
(config["z_limits"][0] is not None and config["z_limits"][1] is not None and config["z_limits"][0] >= config["z_limits"][1]):
print("Invalid input. Lower limit must be less than upper limit for each axis.")
continue
print(f"Current limits:")
print(f"x: {config["x_limits"]}")
print(f"y: {config["y_limits"]}")
print(f"z: {config["z_limits"]}")
con = True
while con:
confirm = input("Do you want your TCP to move in this range? (y/n): ").strip().lower()
if confirm == 'y':
break
elif confirm == 'n':
print("Please re-enter the limits.")
con = False
else:
print("Invalid input. Please enter 'y' or 'n'.")
if con: break
except ValueError:
print("Invalid input. Please enter numeric values only.")
break
elif set_limits == 'n':
config["x_limits"] = [None, None]
config["y_limits"] = [None, None]
config["z_limits"] = [None, None]
break
print("Invalid input. Please enter 'y' or 'n'.")
while True:
print('+-' * 50)
set_limits = input("Do you want to set workspace limits in x, y and z direction? (y/n): ").strip().lower()
if set_limits == 'y':
while True:
try:
config["x_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()]
config["y_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()]
config["z_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()]
if len(config["x_limits_workspace"]) != 2 or len(config["y_limits_workspace"]) != 2 or len(config["z_limits_workspace"]) != 2:
print("Invalid input. Please enter exactly two values (or leave blank) for each limit.")
continue
if (config["x_limits_workspace"][0] is not None and config["x_limits_workspace"][1] is not None and config["x_limits_workspace"][0] >= config["x_limits_workspace"][1]) or \
(config["y_limits_workspace"][0] is not None and config["y_limits_workspace"][1] is not None and config["y_limits_workspace"][0] >= config["y_limits_workspace"][1]) or \
(config["z_limits_workspace"][0] is not None and config["z_limits_workspace"][1] is not None and config["z_limits_workspace"][0] >= config["z_limits_workspace"][1]):
print("Invalid input. Lower limit must be less than upper limit for each axis.")
continue
print(f"Current limits:")
print(f"x: {config["x_limits_workspace"]}")
print(f"y: {config["y_limits_workspace"]}")
print(f"z: {config["z_limits_workspace"]}")
con = True
while con:
confirm = input("Do you want your robot to move in this range? (y/n): ").strip().lower()
if confirm == 'y':
break
elif confirm == 'n':
print("Please re-enter the limits.")
con = False
else:
print("Invalid input. Please enter 'y' or 'n'.")
if con: break
except ValueError:
print("Invalid input. Please enter numeric values only.")
break
elif set_limits == 'n':
config["x_limits_workspace"] = [None, None]
config["y_limits_workspace"] = [None, None]
config["z_limits_workspace"] = [None, None]
break
print("Invalid input. Please enter 'y' or 'n'.")
# Ask the user if they want to set new joint limits
while True:
print('+-'*50)
update_limits = input("Do you want to set new joint limits? (y/n): ").strip().lower()
if update_limits == 'y':
for i in range(len(config["joint_names"])):
while True:
try:
lim = robot.qlim.copy()
# Find the link corresponding to the joint name
print("-" * 50)
print(f"Current position limits for joint '{config["joint_names"][i]}': [{robot.qlim[0][i]} {robot.qlim[1][i]}] rad")
lower_limit = input(f"Enter the new lower limit for joint '{config["joint_names"][i]}' (or press Enter to keep current): ").strip()
upper_limit = input(f"Enter the new upper limit for joint '{config["joint_names"][i]}' (or press Enter to keep current): ").strip()
if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit):
print("Invalid input. Lower limit must be less than upper limit.")
continue
lower_limit = float(lower_limit) if lower_limit!="" else None
upper_limit = float(upper_limit) if upper_limit!="" else None
if lower_limit!=None:
if lower_limit<lim[0][i]:
while True:
sure = input(f"Are you sure you want to set the lower limit to {lower_limit} rad which is less than the default limit {lim[0][i]} (y/n)?: ").strip().lower()
if sure == 'y':
lim[0][i] = float(lower_limit)
break
elif sure == 'n':
print("Lower limit not changed.")
break
print("Invalid input. Please enter 'y' or 'n'.")
else: lim[0][i] = float(lower_limit)
if upper_limit!=None:
if upper_limit>lim[1][i]:
while True:
sure = input(f"Are you sure you want to set the upper limit to {upper_limit} rad which is more than the default limit {lim[1][i]} (y/n)?: ").strip().lower()
if sure == 'y':
lim[1][i] = float(upper_limit)
break
elif sure == 'n':
print("Upper limit not changed.")
break
print("Invalid input. Please enter 'y' or 'n'.")
else: lim[1][i] = float(upper_limit)
robot.qlim = lim
print(f"New limits for joint '{config["joint_names"][i]}': [{robot.qlim[0][i]} {robot.qlim[1][i]}] rad")
print("-" * 50)
break
except ValueError:
print("Invalid input. Please enter numeric values or leave blank to keep current limits.")
break
if update_limits == 'n':
break
print("Invalid input. Please enter 'y' or 'n'.")
'''
use_link_mask = input("Do you want to use a link mask? (y/n): ").strip().lower()
if use_link_mask == 'y':
while True:
try:
'''
else:
while True:
print('+-' * 50)
update_limits = input("Do you want to set joint limits? (y/n): ").strip().lower()
n_joints = len(config["joint_names"])
if update_limits == 'y':
config["joint_lim"] = [[]*n_joints,[]*n_joints]
for i, joint in enumerate(config["joint_names"]):
while True:
try:
print("-" * 50)
lower_limit = input(f"Enter the new lower limit for joint '{joint}' (or press Enter for None): ").strip()
upper_limit = input(f"Enter the new upper limit for joint '{joint}' (or press Enter for None): ").strip()
if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit):
print('--' * 50)
print(" !!! Invalid input. Lower limit must be less than upper limit. !!! ")
continue
config["joint_lim"][0][i] = float(lower_limit) if lower_limit!="" else None
config["joint_lim"][1][i] = float(upper_limit) if upper_limit!="" else None
break
except ValueError:
print("Invalid input. Please enter numeric values or leave blank to keep current limits.")
print(f'New limits for joint:\n lower: {config["joint_lim"][0]}\n upper: {config["joint_lim"][1]}')
break
elif update_limits == 'n':
config["joint_lim"] = None
break
print("Invalid input. Please enter 'y' or 'n'.")
while True:
try:
print('+-' * 50)
config["trajectory_topic_name"] = input("Enter the topic name to which the joint trajectory should be sent (press Enter for default: '/scaled_joint_trajectory_controller/joint_trajectory'): ").strip()
if config["trajectory_topic_name"] == "":
config["trajectory_topic_name"] = '/scaled_joint_trajectory_controller/joint_trajectory'
break
elif config["trajectory_topic_name"].startswith("/"):
break
else:
print("Invalid topic name. A valid topic name should start with '/'.")
except Exception as e:
print(f"An error occurred: {e}")
while True:
save_config = input("Do you want to save this interface? (y/n): ").strip().lower()
if save_config == "y":
config_path = input("Where do you want to save the config?: ").strip()
if not os.path.isdir(config_path):
print("Warning: Path does not exist")
print("Making Directory...")
os.mkdir(config_path)
print(f"[TODO] osc_ros2.py:1091:0: Write config to {config_path}/config.toml")
break
elif save_config == "n":
print("Will not be saving config")
print("Invalid input. Please enter 'y' or 'n'.")
return config, robot
node = OSC_ROS2_interface(joint_names, joint_velocity_limits, robot, cost_mask)
def main():
"""Main function to get joint names and start the ROS 2 & OSC system."""
rclpy.init()
# TODO: if no config use interactive
config, robot = interactive_input()
node = OSC_ROS2_interface(config["joint_names"], config["joint_velocity_limits"], robot, config["cost_mask"])
# Run ROS 2 spin, and osc_process will be handled by the timer
try: