[AN] (feat) Moves the interactive input to another file

This commit is contained in:
Ali 2025-06-08 10:41:50 +02:00
parent b4abe1eec1
commit b45371d39b
No known key found for this signature in database
GPG Key ID: C9239381777823C2
3 changed files with 375 additions and 355 deletions

3
workspace/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
build/**
install/**
log/**

View File

@ -0,0 +1,351 @@
import os
import time
import xml.etree.ElementTree as ET
from typing import Any
import rclpy
import roboticstoolbox as rtb
from rclpy import Node
from sensor_msgs.msg import JointState
class JointNameListener(Node):
joint_names: list[str]|None
def __init__(self):
super().__init__('joint_name_listener')
self.subscription = self.create_subscription(
JointState,
'/joint_states',
self.joint_state_callback,
1
)
self.joint_names = None
def joint_state_callback(self, msg: JointState):
self.joint_names = list(msg.name)
def interactive_input() -> tuple[dict[str, Any], rtb.robot | None]:
config = {}
robot = None
while True:
use_urdf = input("Do you have a URDF file you want to use? (y/n): ").strip().lower()
if use_urdf == 'y':
while True:
config["urdf"] = input("Enter the absolute path to the URDF file: ")
if os.path.isfile(config["urdf"]):
if not config["urdf"].endswith('.urdf'):
print("The file is not a URDF file. Please enter a valid URDF file.")
continue
break
else:
print("Invalid path. Please enter a valid path to the URDF file.")
tree = ET.parse(config["robot_urdf"])
root = tree.getroot()
robot = rtb.ERobot.URDF(config["urdf"])
config["joint_names"] = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
print(robot)
config["joint_velocity_limits"] = {}
# Iterate over all joints in the URDF
for joint in root.findall('.//joint'):
joint_name = joint.get('name') # Get the name of the joint
# Look for the <limit> tag under each joint
limit = joint.find('limit')
if limit is not None:
# Extract the velocity limit (if it exists)
velocity_limit = limit.get('velocity')
if velocity_limit is not None:
config["joint_velocity_limits"][joint_name] = float(velocity_limit)
while True:
try:
print('-+'*50)
print("The cost mask determines which coordinates are used for the IK.\nEach element of the cost mask corresponds to a Cartesian coordinate [x, y, z, roll, pitch, yaw].")
print("The cost mask 111000 means that the IK will only consider translation and no rotaion.")
config["cost_mask_string"] = input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")
cost_mask = [int(i) for i in config["cost_mask_string"]]
if sum(cost_mask) <= robot.n and len(cost_mask) == 6:
break
else:
print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.")
except ValueError:
print("Invalid input. Please enter integers only.")
print(f"The following coordinates will be used for the IK: {[j for i,j in enumerate(['x','y','z','roll','pitch','yaw']) if cost_mask[i]==1]}")
break
elif use_urdf == 'n':
node = JointNameListener()
print("Wainting 10 sec for JointState messages to extract joint names...")
rclpy.spin_once(node)
counter = 0
while not(node.joint_names):
if counter > 100:
config["joint_names"] = None
break
counter+=1
time.sleep(0.1)
rclpy.spin_once(node)
config["joint_names"] = node.joint_names
node.destroy_node()
if config["joint_names"]:
correct_names = input("The following joint names were found:\n" + "\n".join(config["joint_names"]) + "\nAre these correct? (y/n): ").strip().lower()
while True:
if correct_names == 'y':
break
elif correct_names == 'n':
config["joint_names"] = None
break
correct_names = input("Invalid input. Please enter 'y' or 'n'.")
if not(config["joint_names"]):
print("Please enter the joint names manually.")
while True:
config["joint_names"] = []
print('-+'*50)
print("Enter the joint names manually one by one. Type 'done' when you are finished:")
print("Attention: The order of the joints is important. It should be the same in which you want to send the joint positions.")
while True:
print('-'*50)
joint_name = input("Enter joint name (or 'done' to finish): ").strip()
if joint_name.lower() == 'done':
break
if joint_name:
config["joint_names"].append(joint_name)
print('-+'*50)
correct_names = input(f"Are those the joint names as defined in your JointTrajectroy recieving Node?: {config['joint_names']}. (y/n)?: ").strip()
if correct_names.lower() == 'y':
break
else:
print("Please re-enter the joint names.")
while True:
try:
config["joint_velocity_limits"] = {}
for name in config["joint_names"]:
while True:
try:
print('--'*50)
limit = input(f"Enter the velocity limit for joint '{name}': ").strip()
if limit == "":
continue
else:
config["joint_velocity_limits"][name] = float(limit)
break
except ValueError:
print("Invalid input. Please enter a numeric value or press Enter to skip.")
break
except ValueError:
print("Invalid input. Please enter numeric values or leave blank to skip.")
robot = None
cost_mask = None
break
print("Invalid input. Please enter 'y' or 'n'.")
if robot:
while True:
print('+-' * 50)
set_limits = input("Do you want to set limits for x, y and z? (y/n): ").strip().lower()
if set_limits == 'y':
while True:
try:
config["x_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()]
config["y_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()]
config["z_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()]
if len(config["x_limits"]) != 2 or len(config["y_limits"]) != 2 or len(config["z_limits"]) != 2:
print("Invalid input. Please enter exactly two values (or leave blank) for each limit.")
continue
if (config["x_limits"][0] is not None and config["x_limits"][1] is not None and config["x_limits"][0] >= config["x_limits"][1]) or \
(config["y_limits"][0] is not None and config["y_limits"][1] is not None and config["y_limits"][0] >= config["y_limits"][1]) or \
(config["z_limits"][0] is not None and config["z_limits"][1] is not None and config["z_limits"][0] >= config["z_limits"][1]):
print("Invalid input. Lower limit must be less than upper limit for each axis.")
continue
print("Current limits:")
print(f"x: {config['x_limits']}")
print(f"y: {config['y_limits']}")
print(f"z: {config['z_limits']}")
con = True
while con:
confirm = input("Do you want your TCP to move in this range? (y/n): ").strip().lower()
if confirm == 'y':
break
elif confirm == 'n':
print("Please re-enter the limits.")
con = False
else:
print("Invalid input. Please enter 'y' or 'n'.")
if con:
break
except ValueError:
print("Invalid input. Please enter numeric values only.")
break
elif set_limits == 'n':
config["x_limits"] = [None, None]
config["y_limits"] = [None, None]
config["z_limits"] = [None, None]
break
print("Invalid input. Please enter 'y' or 'n'.")
while True:
print('+-' * 50)
set_limits = input("Do you want to set workspace limits in x, y and z direction? (y/n): ").strip().lower()
if set_limits == 'y':
while True:
try:
config["x_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()]
config["y_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()]
config["z_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()]
if len(config["x_limits_workspace"]) != 2 or len(config["y_limits_workspace"]) != 2 or len(config["z_limits_workspace"]) != 2:
print("Invalid input. Please enter exactly two values (or leave blank) for each limit.")
continue
if (config["x_limits_workspace"][0] is not None and config["x_limits_workspace"][1] is not None and config["x_limits_workspace"][0] >= config["x_limits_workspace"][1]) or \
(config["y_limits_workspace"][0] is not None and config["y_limits_workspace"][1] is not None and config["y_limits_workspace"][0] >= config["y_limits_workspace"][1]) or \
(config["z_limits_workspace"][0] is not None and config["z_limits_workspace"][1] is not None and config["z_limits_workspace"][0] >= config["z_limits_workspace"][1]):
print("Invalid input. Lower limit must be less than upper limit for each axis.")
continue
print("Current limits:")
print(f"x: {config['x_limits_workspace']}")
print(f"y: {config['y_limits_workspace']}")
print(f"z: {config['z_limits_workspace']}")
con = True
while con:
confirm = input("Do you want your robot to move in this range? (y/n): ").strip().lower()
if confirm == 'y':
break
elif confirm == 'n':
print("Please re-enter the limits.")
con = False
else:
print("Invalid input. Please enter 'y' or 'n'.")
if con:
break
except ValueError:
print("Invalid input. Please enter numeric values only.")
break
elif set_limits == 'n':
config["x_limits_workspace"] = [None, None]
config["y_limits_workspace"] = [None, None]
config["z_limits_workspace"] = [None, None]
break
print("Invalid input. Please enter 'y' or 'n'.")
# Ask the user if they want to set new joint limits
while True:
print('+-'*50)
update_limits = input("Do you want to set new joint limits? (y/n): ").strip().lower()
if update_limits == 'y':
for i in range(len(config["joint_names"])):
while True:
try:
lim = robot.qlim.copy()
# Find the link corresponding to the joint name
print("-" * 50)
print(f"Current position limits for joint '{config['joint_names'][i]}': [{robot.qlim[0][i]} {robot.qlim[1][i]}] rad")
lower_limit = input(f"Enter the new lower limit for joint '{config['joint_names'][i]}' (or press Enter to keep current): ").strip()
upper_limit = input(f"Enter the new upper limit for joint '{config['joint_names'][i]}' (or press Enter to keep current): ").strip()
if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit):
print("Invalid input. Lower limit must be less than upper limit.")
continue
lower_limit = float(lower_limit) if lower_limit!="" else None
upper_limit = float(upper_limit) if upper_limit!="" else None
if lower_limit is not None:
if lower_limit<lim[0][i]:
while True:
sure = input(f"Are you sure you want to set the lower limit to {lower_limit} rad which is less than the default limit {lim[0][i]} (y/n)?: ").strip().lower()
if sure == 'y':
lim[0][i] = float(lower_limit)
break
elif sure == 'n':
print("Lower limit not changed.")
break
print("Invalid input. Please enter 'y' or 'n'.")
else:
lim[0][i] = float(lower_limit)
if upper_limit is not None:
if upper_limit>lim[1][i]:
while True:
sure = input(f"Are you sure you want to set the upper limit to {upper_limit} rad which is more than the default limit {lim[1][i]} (y/n)?: ").strip().lower()
if sure == 'y':
lim[1][i] = float(upper_limit)
break
elif sure == 'n':
print("Upper limit not changed.")
break
print("Invalid input. Please enter 'y' or 'n'.")
else:
lim[1][i] = float(upper_limit)
robot.qlim = lim
print(f"New limits for joint '{config['joint_names'][i]}': [{robot.qlim[0][i]} {robot.qlim[1][i]}] rad")
print("-" * 50)
break
except ValueError:
print("Invalid input. Please enter numeric values or leave blank to keep current limits.")
break
if update_limits == 'n':
break
print("Invalid input. Please enter 'y' or 'n'.")
'''
use_link_mask = input("Do you want to use a link mask? (y/n): ").strip().lower()
if use_link_mask == 'y':
while True:
try:
'''
else:
while True:
print('+-' * 50)
update_limits = input("Do you want to set joint limits? (y/n): ").strip().lower()
n_joints = len(config["joint_names"])
if update_limits == 'y':
config["joint_lim"] = [[]*n_joints,[]*n_joints]
for i, joint in enumerate(config["joint_names"]):
while True:
try:
print("-" * 50)
lower_limit = input(f"Enter the new lower limit for joint '{joint}' (or press Enter for None): ").strip()
upper_limit = input(f"Enter the new upper limit for joint '{joint}' (or press Enter for None): ").strip()
if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit):
print('--' * 50)
print(" !!! Invalid input. Lower limit must be less than upper limit. !!! ")
continue
config["joint_lim"][0][i] = float(lower_limit) if lower_limit!="" else None
config["joint_lim"][1][i] = float(upper_limit) if upper_limit!="" else None
break
except ValueError:
print("Invalid input. Please enter numeric values or leave blank to keep current limits.")
print(f'New limits for joint:\n lower: {config["joint_lim"][0]}\n upper: {config["joint_lim"][1]}')
break
elif update_limits == 'n':
config["joint_lim"] = None
break
print("Invalid input. Please enter 'y' or 'n'.")
while True:
try:
print('+-' * 50)
config["trajectory_topic_name"] = input("Enter the topic name to which the joint trajectory should be sent (press Enter for default: '/scaled_joint_trajectory_controller/joint_trajectory'): ").strip()
if config["trajectory_topic_name"] == "":
config["trajectory_topic_name"] = '/scaled_joint_trajectory_controller/joint_trajectory'
break
elif config["trajectory_topic_name"].startswith("/"):
break
else:
print("Invalid topic name. A valid topic name should start with '/'.")
except Exception as e:
print(f"An error occurred: {e}")
while True:
save_config = input("Do you want to save this interface? (y/n): ").strip().lower()
if save_config == "y":
config_path = input("Where do you want to save the config?: ").strip()
if not os.path.isdir(config_path):
print("Warning: Path does not exist")
print("Making Directory...")
os.mkdir(config_path)
print(f"[TODO] osc_ros2.py:1091:0: Write config to {config_path}/config.toml")
break
elif save_config == "n":
print("Will not be saving config")
print("Invalid input. Please enter 'y' or 'n'.")
return config, robot

View File

@ -1,4 +1,4 @@
from typing import Any #ruff: noqa: F403, F405
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
@ -6,35 +6,19 @@ from sensor_msgs.msg import JointState
from rcl_interfaces.msg import Log from rcl_interfaces.msg import Log
from osc4py3.as_allthreads import * from osc4py3.as_allthreads import *
from osc4py3 import oscmethod as osm from osc4py3 import oscmethod as osm
import xml.etree.ElementTree as ET
import numpy as np import numpy as np
import spatialmath as sm import spatialmath as sm
import roboticstoolbox as rtb import roboticstoolbox as rtb
from osc4py3 import oscbuildparse from osc4py3 import oscbuildparse
import time import time
import os
import re import re
import socket import socket
class JointNameListener(Node):
joint_names: list[str]|None
def __init__(self):
super().__init__('joint_name_listener')
self.subscription = self.create_subscription(
JointState,
'/joint_states',
self.joint_state_callback,
1
)
self.joint_names = None
def joint_state_callback(self, msg: JointState):
self.joint_names = list(msg.name)
class OSC_ROS2_interface(Node): class OSC_ROS2_interface(Node):
"""Node to publish joint trajectories based on OSC messages.""" """Node to publish joint trajectories based on OSC messages."""
def __init__(self, joint_names, joint_velocity_limits, robot, cost_mask): def __init__(self, robot, config):
super().__init__('scaled_joint_trajectory_publisher') super().__init__('scaled_joint_trajectory_publisher')
@ -203,7 +187,7 @@ class OSC_ROS2_interface(Node):
points = [[float(j) for j in i] for i in args] points = [[float(j) for j in i] for i in args]
elif len(args[0]) >= 7: elif len(args[0]) >= 7:
points = [[float(j) for j in i[:6]] for i in args] points = [[float(j) for j in i[:6]] for i in args]
self.get_logger().warn(f"joint_trajectory_handler: Duration is not supported for joint trajectory yet. Ignoring duration.") self.get_logger().warn("joint_trajectory_handler: Duration is not supported for joint trajectory yet. Ignoring duration.")
else: else:
self.get_logger().warn(f"joint_trajectory_handler: Invalid number of arguments for joint trajectory. Expected {self.n_joints} ([q0, q1, q2, ..., q{self.n_joints}]) or {self.n_joints+1} ([q0, q1, q2, ..., q{self.n_joints}, duration]), but got {len(args[0])}.") self.get_logger().warn(f"joint_trajectory_handler: Invalid number of arguments for joint trajectory. Expected {self.n_joints} ([q0, q1, q2, ..., q{self.n_joints}]) or {self.n_joints+1} ([q0, q1, q2, ..., q{self.n_joints}, duration]), but got {len(args[0])}.")
return return
@ -294,9 +278,9 @@ class OSC_ROS2_interface(Node):
points = [[float(j) for j in i] for i in args] points = [[float(j) for j in i] for i in args]
elif len(args[0]) >= 7: elif len(args[0]) >= 7:
points = [[float(j) for j in i[:6]] for i in args] points = [[float(j) for j in i[:6]] for i in args]
self.get_logger().warn(f"cartesian_trajectory_handler: Duration is not supported for cartesian trajectory yet. Ignoring duration.") self.get_logger().warn("cartesian_trajectory_handler: Duration is not supported for cartesian trajectory yet. Ignoring duration.")
else: else:
self.get_logger().warn(f"cartesian_trajectory_handler: Invalid number of arguments for cartesian trajectory. Expected 6 ([x, y, z, roll, pitch, yaw]) or 7 ([x, y, z, roll, pitch, yaw, duration]), but got {len(args[0])}.") self.get_logger().warn("cartesian_trajectory_handler: Invalid number of arguments for cartesian trajectory. Expected 6 ([x, y, z, roll, pitch, yaw]) or 7 ([x, y, z, roll, pitch, yaw, duration]), but got {len(args[0])}.")
return return
self.desired = ["cartesian_trajectory"] + points self.desired = ["cartesian_trajectory"] + points
@ -397,9 +381,10 @@ class OSC_ROS2_interface(Node):
def joint_states_callback(self, msg: JointState): def joint_states_callback(self, msg: JointState):
"""Callback function to handle incoming joint states.""" """Callback function to handle incoming joint states."""
try: try:
msg_time = oscbuildparse.OSCMessage(f"/time", ',s', [str(time.time())]) msg_time = oscbuildparse.OSCMessage("/time", ',s', [str(time.time())])
osc_send(msg_time, "osc_client") osc_send(msg_time, "osc_client")
if not(self.joint_names): self.joint_names = msg.name if not(self.joint_names):
self.joint_names = msg.name
joint_position_dict = dict(zip(msg.name, msg.position)) joint_position_dict = dict(zip(msg.name, msg.position))
self.current_joint_positions = [float(joint_position_dict[name]) for name in self.joint_names] self.current_joint_positions = [float(joint_position_dict[name]) for name in self.joint_names]
joint_position_dict = dict(zip(msg.name, msg.velocity)) joint_position_dict = dict(zip(msg.name, msg.velocity))
@ -409,7 +394,7 @@ class OSC_ROS2_interface(Node):
tcp_position = self.robot.fkine(self.current_joint_positions).t tcp_position = self.robot.fkine(self.current_joint_positions).t
tcp_orientation = self.robot.fkine(self.current_joint_positions).rpy() tcp_orientation = self.robot.fkine(self.current_joint_positions).rpy()
msg_tcp = oscbuildparse.OSCMessage(f"/tcp_coordinates", ',ffffff', [tcp_position[0], tcp_position[1], tcp_position[2], tcp_orientation[0], tcp_orientation[1], tcp_orientation[2]]) msg_tcp = oscbuildparse.OSCMessage("/tcp_coordinates", ',ffffff', [tcp_position[0], tcp_position[1], tcp_position[2], tcp_orientation[0], tcp_orientation[1], tcp_orientation[2]])
#msg_x = oscbuildparse.OSCMessage(f"/tcp_coordinates/x", ',f', [tcp_position[0]]) #msg_x = oscbuildparse.OSCMessage(f"/tcp_coordinates/x", ',f', [tcp_position[0]])
#msg_y = oscbuildparse.OSCMessage(f"/tcp_coordinates/y", ',f', [tcp_position[1]]) #msg_y = oscbuildparse.OSCMessage(f"/tcp_coordinates/y", ',f', [tcp_position[1]])
#msg_z = oscbuildparse.OSCMessage(f"/tcp_coordinates/z", ',f', [tcp_position[2]]) #msg_z = oscbuildparse.OSCMessage(f"/tcp_coordinates/z", ',f', [tcp_position[2]])
@ -420,10 +405,10 @@ class OSC_ROS2_interface(Node):
#osc_send(bun, "osc_client") #osc_send(bun, "osc_client")
osc_send(msg_tcp, "osc_client") osc_send(msg_tcp, "osc_client")
msg_position = oscbuildparse.OSCMessage(f"/joint_state/position", f',{"f"*self.n_joints}', [i for i in msg.position]) msg_position = oscbuildparse.OSCMessage("/joint_state/position", f',{"f"*self.n_joints}', [i for i in msg.position])
msg_velocity = oscbuildparse.OSCMessage(f"/joint_state/velocity", f',{"f"*self.n_joints}', [i for i in msg.velocity]) msg_velocity = oscbuildparse.OSCMessage("/joint_state/velocity", f',{"f"*self.n_joints}', [i for i in msg.velocity])
msg_effort = oscbuildparse.OSCMessage(f"/joint_state/effort", f',{"f"*self.n_joints}', [i for i in msg.effort]) msg_effort = oscbuildparse.OSCMessage("/joint_state/effort", f',{"f"*self.n_joints}', [i for i in msg.effort])
msg_name = oscbuildparse.OSCMessage(f"/joint_state/name", f',{"s"*self.n_joints}', [i for i in msg.name]) msg_name = oscbuildparse.OSCMessage("/joint_state/name", f',{"s"*self.n_joints}', [i for i in msg.name])
bun = oscbuildparse.OSCBundle(oscbuildparse.OSC_IMMEDIATELY, [msg_name, msg_position, msg_velocity, msg_effort]) bun = oscbuildparse.OSCBundle(oscbuildparse.OSC_IMMEDIATELY, [msg_name, msg_position, msg_velocity, msg_effort])
osc_send(bun, "osc_client") osc_send(bun, "osc_client")
@ -500,7 +485,7 @@ class OSC_ROS2_interface(Node):
msg = JointTrajectory() msg = JointTrajectory()
msg.joint_names = self.joint_names msg.joint_names = self.joint_names
steps_per_m = 100 steps_per_m = 100
if self.previous_desired == None: if self.previous_desired is None:
[x,y,z] = self.robot.fkine(self.current_joint_positions).t [x,y,z] = self.robot.fkine(self.current_joint_positions).t
[roll, pitch, yaw] = self.robot.fkine(self.current_joint_positions).rpy() [roll, pitch, yaw] = self.robot.fkine(self.current_joint_positions).rpy()
else: else:
@ -509,7 +494,8 @@ class OSC_ROS2_interface(Node):
x1, y1, z1, roll1, pitch1, yaw1 = self.desired[1:7] x1, y1, z1, roll1, pitch1, yaw1 = self.desired[1:7]
self.previous_desired = self.desired self.previous_desired = self.desired
steps = int(np.linalg.norm(np.array([x1, y1, z1])- self.robot.fkine(self.current_joint_positions).t) * steps_per_m) steps = int(np.linalg.norm(np.array([x1, y1, z1])- self.robot.fkine(self.current_joint_positions).t) * steps_per_m)
if steps < 2: steps = 2 if steps < 2:
steps = 2
cart_traj = [sm.SE3([x+(x1-x)/(steps-1)*i, y+(y1-y)/(steps-1)*i, z+(z1-z)/(steps-1)*i]) * sm.SE3.RPY([roll+(roll1-roll)/(steps-1)*i, pitch+(pitch1-pitch)/(steps-1)*i, yaw+(yaw1-yaw)/(steps-1)*i]) for i in range(steps)] cart_traj = [sm.SE3([x+(x1-x)/(steps-1)*i, y+(y1-y)/(steps-1)*i, z+(z1-z)/(steps-1)*i]) * sm.SE3.RPY([roll+(roll1-roll)/(steps-1)*i, pitch+(pitch1-pitch)/(steps-1)*i, yaw+(yaw1-yaw)/(steps-1)*i]) for i in range(steps)]
'''if self.previous_desired: '''if self.previous_desired:
[x,y,z] = self.previous_desired[1:4] [x,y,z] = self.previous_desired[1:4]
@ -561,7 +547,7 @@ class OSC_ROS2_interface(Node):
sol = self.robot.ik_LM(cart_traj[j], q0=self.current_joint_positions, mask = self.cost_mask, joint_limits = True, method = 'chan') if j == 0 else self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True, method = 'chan') sol = self.robot.ik_LM(cart_traj[j], q0=self.current_joint_positions, mask = self.cost_mask, joint_limits = True, method = 'chan') if j == 0 else self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True, method = 'chan')
if sol[1] == 1: if sol[1] == 1:
fowards = self.robot.fkine_all(sol[0]) fowards = self.robot.fkine_all(sol[0])
out_of_bounds = (fowards.t[1:,0] > self.x_limits_workspace[1] if self.x_limits_workspace[1] != None else False) | (fowards.t[1:,0] < self.x_limits_workspace[0] if self.x_limits_workspace[0] != None else False) | (fowards.t[1:,1] > self.y_limits_workspace[1] if self.y_limits_workspace[1] != None else False) | (fowards.t[1:,1] < self.y_limits_workspace[0] if self.y_limits_workspace[0] != None else False) | (fowards.t[1:,2] > self.z_limits_workspace[1] if self.z_limits_workspace[1] != None else False) | (fowards.t[1:,2] < self.z_limits_workspace[0] if self.z_limits_workspace[0] != None else False) out_of_bounds = (fowards.t[1:,0] > self.x_limits_workspace[1] if self.x_limits_workspace[1] is not None else False) | (fowards.t[1:,0] < self.x_limits_workspace[0] if self.x_limits_workspace[0] is not None else False) | (fowards.t[1:,1] > self.y_limits_workspace[1] if self.y_limits_workspace[1] is not None else False) | (fowards.t[1:,1] < self.y_limits_workspace[0] if self.y_limits_workspace[0] is not None else False) | (fowards.t[1:,2] > self.z_limits_workspace[1] if self.z_limits_workspace[1] is not None else False) | (fowards.t[1:,2] < self.z_limits_workspace[0] if self.z_limits_workspace[0] is not None else False)
if np.any(out_of_bounds): if np.any(out_of_bounds):
#print(fowards.t) #print(fowards.t)
#indices = np.where(out_of_bounds)[0] #indices = np.where(out_of_bounds)[0]
@ -613,7 +599,7 @@ class OSC_ROS2_interface(Node):
sol = self.robot.ik_LM(cart_traj[j], q0=self.current_joint_positions, mask = self.cost_mask, joint_limits = True, method = 'chan') if j == 0 else self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True, method = 'chan') sol = self.robot.ik_LM(cart_traj[j], q0=self.current_joint_positions, mask = self.cost_mask, joint_limits = True, method = 'chan') if j == 0 else self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True, method = 'chan')
if sol[1] == 1: if sol[1] == 1:
fowards = self.robot.fkine_all(sol[0]) fowards = self.robot.fkine_all(sol[0])
out_of_bounds = (fowards.t[1:,0] > self.x_limits_workspace[1] if self.x_limits_workspace[1] != None else False) | (fowards.t[1:,0] < self.x_limits_workspace[0] if self.x_limits_workspace[0] != None else False) | (fowards.t[1:,1] > self.y_limits_workspace[1] if self.y_limits_workspace[1] != None else False) | (fowards.t[1:,1] < self.y_limits_workspace[0] if self.y_limits_workspace[0] != None else False) | (fowards.t[1:,2] > self.z_limits_workspace[1] if self.z_limits_workspace[1] != None else False) | (fowards.t[1:,2] < self.z_limits_workspace[0] if self.z_limits_workspace[0] != None else False) out_of_bounds = (fowards.t[1:,0] > self.x_limits_workspace[1] if self.x_limits_workspace[1] is not None else False) | (fowards.t[1:,0] < self.x_limits_workspace[0] if self.x_limits_workspace[0] is not None else False) | (fowards.t[1:,1] > self.y_limits_workspace[1] if self.y_limits_workspace[1] is not None else False) | (fowards.t[1:,1] < self.y_limits_workspace[0] if self.y_limits_workspace[0] is not None else False) | (fowards.t[1:,2] > self.z_limits_workspace[1] if self.z_limits_workspace[1] is not None else False) | (fowards.t[1:,2] < self.z_limits_workspace[0] if self.z_limits_workspace[0] is not None else False)
if np.any(out_of_bounds): if np.any(out_of_bounds):
#print(fowards.t) #print(fowards.t)
#indices = np.where(out_of_bounds)[0] #indices = np.where(out_of_bounds)[0]
@ -704,7 +690,8 @@ class OSC_ROS2_interface(Node):
point.time_from_start.nanosec = int((traj.t[i] - int(traj.t[i])) * 1e9) point.time_from_start.nanosec = int((traj.t[i] - int(traj.t[i])) * 1e9)
msg.points.append(point) msg.points.append(point)
prev_sol = list(sol[0]) prev_sol = list(sol[0])
else: self.get_logger().warn(f"send_cartesian_trajectory: IK could not find a solution for (x,y,z) = {traj.q[i][:3]} and (r,p,y) = {traj.q[i][3:]}!") else:
self.get_logger().warn(f"send_cartesian_trajectory: IK could not find a solution for (x,y,z) = {traj.q[i][:3]} and (r,p,y) = {traj.q[i][3:]}!")
msg.header.stamp = self.get_clock().now().to_msg() msg.header.stamp = self.get_clock().now().to_msg()
msg.points = msg.points[::n] msg.points = msg.points[::n]
self.publisher.publish(msg) self.publisher.publish(msg)
@ -772,328 +759,7 @@ class OSC_ROS2_interface(Node):
msg_log = oscbuildparse.OSCMessage(f"/log/{self.log_dict.get(msg.level, 'UNKNOWN')}", ',isss', [int(msg.level), str(msg.stamp.sec+msg.stamp.nanosec*1e-9) , str(msg.name), self.clean_log_string(msg.msg)]) msg_log = oscbuildparse.OSCMessage(f"/log/{self.log_dict.get(msg.level, 'UNKNOWN')}", ',isss', [int(msg.level), str(msg.stamp.sec+msg.stamp.nanosec*1e-9) , str(msg.name), self.clean_log_string(msg.msg)])
osc_send(msg_log, "osc_log_client") osc_send(msg_log, "osc_log_client")
def interactive_input() -> tuple[dict[str, Any], rtb.robot | None]:
config = {}
robot = None
while True:
config["use_urdf"] = input("Do you have a URDF file you want to use? (y/n): ").strip().lower()
if config["use_urdf"] == 'y':
while True:
config["robot_urdf"] = input("Enter the absolute path to the URDF file: ")
if os.path.isfile(config["robot_urdf"]):
if not config["robot_urdf"].endswith('.urdf'):
print("The file is not a URDF file. Please enter a valid URDF file.")
continue
break
else:
print("Invalid path. Please enter a valid path to the URDF file.")
tree = ET.parse(config["robot_urdf"])
root = tree.getroot()
robot = rtb.ERobot.URDF(robot_urdf)
config["joint_names"] = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
print(robot)
config["joint_velocity_limits"] = {}
# Iterate over all joints in the URDF
for joint in root.findall('.//joint'):
joint_name = joint.get('name') # Get the name of the joint
# Look for the <limit> tag under each joint
limit = joint.find('limit')
if limit is not None:
# Extract the velocity limit (if it exists)
velocity_limit = limit.get('velocity')
if velocity_limit is not None:
config["joint_velocity_limits"][joint_name] = float(velocity_limit)
while True:
try:
print('-+'*50)
print("The cost mask determines which coordinates are used for the IK.\nEach element of the cost mask corresponds to a Cartesian coordinate [x, y, z, roll, pitch, yaw].")
print("The cost mask 111000 means that the IK will only consider translation and no rotaion.")
config["cost_mask_string"] = input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")
cost_mask = [int(i) for i in config["cost_mask_string"]]
if sum(cost_mask) <= robot.n and len(cost_mask) == 6:
break
else:
print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.")
except ValueError:
print("Invalid input. Please enter integers only.")
print(f"The following coordinates will be used for the IK: {[j for i,j in enumerate(['x','y','z','roll','pitch','yaw']) if cost_mask[i]==1]}")
break
elif use_urdf == 'n':
node = JointNameListener()
print("Wainting 10 sec for JointState messages to extract joint names...")
rclpy.spin_once(node)
counter = 0
while not(node.joint_names):
if counter > 100:
joint_names = None
break
counter+=1
time.sleep(0.1)
rclpy.spin_once(node)
config["joint_names"] = node.joint_names
node.destroy_node()
if config["joint_names"]:
correct_names = input("The following joint names were found:\n" + "\n".join(config["joint_names"]) + "\nAre these correct? (y/n): ").strip().lower()
while True:
if correct_names == 'y':
break
elif correct_names == 'n':
config["joint_names"] = None
break
correct_names = input("Invalid input. Please enter 'y' or 'n'.")
if not(config["joint_names"]):
print("Please enter the joint names manually.")
while True:
config["joint_names"] = []
print('-+'*50)
print("Enter the joint names manually one by one. Type 'done' when you are finished:")
print("Attention: The order of the joints is important. It should be the same in which you want to send the joint positions.")
while True:
print('-'*50)
joint_name = input("Enter joint name (or 'done' to finish): ").strip()
if joint_name.lower() == 'done':
break
if joint_name:
config["joint_names"].append(joint_name)
print('-+'*50)
correct_names = input(f"Are those the joint names as defined in your JointTrajectroy recieving Node?: {config["joint_names"]}. (y/n)?: ").strip()
if correct_names.lower() == 'y':
break
else:
print("Please re-enter the joint names.")
while True:
try:
config["joint_velocity_limits"] = {}
for name in config["joint_names"]:
while True:
try:
print('--'*50)
limit = input(f"Enter the velocity limit for joint '{name}': ").strip()
if limit == "":
continue
else:
config["joint_velocity_limits"][name] = float(limit)
break
except ValueError:
print("Invalid input. Please enter a numeric value or press Enter to skip.")
break
except ValueError:
print("Invalid input. Please enter numeric values or leave blank to skip.")
robot = None
cost_mask = None
break
print("Invalid input. Please enter 'y' or 'n'.")
if robot:
while True:
print('+-' * 50)
set_limits = input("Do you want to set limits for x, y and z? (y/n): ").strip().lower()
if set_limits == 'y':
while True:
try:
config["x_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()]
config["y_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()]
config["z_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()]
if len(config["x_limits"]) != 2 or len(config["y_limits"]) != 2 or len(config["z_limits"]) != 2:
print("Invalid input. Please enter exactly two values (or leave blank) for each limit.")
continue
if (config["x_limits"][0] is not None and config["x_limits"][1] is not None and config["x_limits"][0] >= config["x_limits"][1]) or \
(config["y_limits"][0] is not None and config["y_limits"][1] is not None and config["y_limits"][0] >= config["y_limits"][1]) or \
(config["z_limits"][0] is not None and config["z_limits"][1] is not None and config["z_limits"][0] >= config["z_limits"][1]):
print("Invalid input. Lower limit must be less than upper limit for each axis.")
continue
print(f"Current limits:")
print(f"x: {config["x_limits"]}")
print(f"y: {config["y_limits"]}")
print(f"z: {config["z_limits"]}")
con = True
while con:
confirm = input("Do you want your TCP to move in this range? (y/n): ").strip().lower()
if confirm == 'y':
break
elif confirm == 'n':
print("Please re-enter the limits.")
con = False
else:
print("Invalid input. Please enter 'y' or 'n'.")
if con: break
except ValueError:
print("Invalid input. Please enter numeric values only.")
break
elif set_limits == 'n':
config["x_limits"] = [None, None]
config["y_limits"] = [None, None]
config["z_limits"] = [None, None]
break
print("Invalid input. Please enter 'y' or 'n'.")
while True:
print('+-' * 50)
set_limits = input("Do you want to set workspace limits in x, y and z direction? (y/n): ").strip().lower()
if set_limits == 'y':
while True:
try:
config["x_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()]
config["y_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()]
config["z_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()]
if len(config["x_limits_workspace"]) != 2 or len(config["y_limits_workspace"]) != 2 or len(config["z_limits_workspace"]) != 2:
print("Invalid input. Please enter exactly two values (or leave blank) for each limit.")
continue
if (config["x_limits_workspace"][0] is not None and config["x_limits_workspace"][1] is not None and config["x_limits_workspace"][0] >= config["x_limits_workspace"][1]) or \
(config["y_limits_workspace"][0] is not None and config["y_limits_workspace"][1] is not None and config["y_limits_workspace"][0] >= config["y_limits_workspace"][1]) or \
(config["z_limits_workspace"][0] is not None and config["z_limits_workspace"][1] is not None and config["z_limits_workspace"][0] >= config["z_limits_workspace"][1]):
print("Invalid input. Lower limit must be less than upper limit for each axis.")
continue
print(f"Current limits:")
print(f"x: {config["x_limits_workspace"]}")
print(f"y: {config["y_limits_workspace"]}")
print(f"z: {config["z_limits_workspace"]}")
con = True
while con:
confirm = input("Do you want your robot to move in this range? (y/n): ").strip().lower()
if confirm == 'y':
break
elif confirm == 'n':
print("Please re-enter the limits.")
con = False
else:
print("Invalid input. Please enter 'y' or 'n'.")
if con: break
except ValueError:
print("Invalid input. Please enter numeric values only.")
break
elif set_limits == 'n':
config["x_limits_workspace"] = [None, None]
config["y_limits_workspace"] = [None, None]
config["z_limits_workspace"] = [None, None]
break
print("Invalid input. Please enter 'y' or 'n'.")
# Ask the user if they want to set new joint limits
while True:
print('+-'*50)
update_limits = input("Do you want to set new joint limits? (y/n): ").strip().lower()
if update_limits == 'y':
for i in range(len(config["joint_names"])):
while True:
try:
lim = robot.qlim.copy()
# Find the link corresponding to the joint name
print("-" * 50)
print(f"Current position limits for joint '{config["joint_names"][i]}': [{robot.qlim[0][i]} {robot.qlim[1][i]}] rad")
lower_limit = input(f"Enter the new lower limit for joint '{config["joint_names"][i]}' (or press Enter to keep current): ").strip()
upper_limit = input(f"Enter the new upper limit for joint '{config["joint_names"][i]}' (or press Enter to keep current): ").strip()
if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit):
print("Invalid input. Lower limit must be less than upper limit.")
continue
lower_limit = float(lower_limit) if lower_limit!="" else None
upper_limit = float(upper_limit) if upper_limit!="" else None
if lower_limit!=None:
if lower_limit<lim[0][i]:
while True:
sure = input(f"Are you sure you want to set the lower limit to {lower_limit} rad which is less than the default limit {lim[0][i]} (y/n)?: ").strip().lower()
if sure == 'y':
lim[0][i] = float(lower_limit)
break
elif sure == 'n':
print("Lower limit not changed.")
break
print("Invalid input. Please enter 'y' or 'n'.")
else: lim[0][i] = float(lower_limit)
if upper_limit!=None:
if upper_limit>lim[1][i]:
while True:
sure = input(f"Are you sure you want to set the upper limit to {upper_limit} rad which is more than the default limit {lim[1][i]} (y/n)?: ").strip().lower()
if sure == 'y':
lim[1][i] = float(upper_limit)
break
elif sure == 'n':
print("Upper limit not changed.")
break
print("Invalid input. Please enter 'y' or 'n'.")
else: lim[1][i] = float(upper_limit)
robot.qlim = lim
print(f"New limits for joint '{config["joint_names"][i]}': [{robot.qlim[0][i]} {robot.qlim[1][i]}] rad")
print("-" * 50)
break
except ValueError:
print("Invalid input. Please enter numeric values or leave blank to keep current limits.")
break
if update_limits == 'n':
break
print("Invalid input. Please enter 'y' or 'n'.")
'''
use_link_mask = input("Do you want to use a link mask? (y/n): ").strip().lower()
if use_link_mask == 'y':
while True:
try:
'''
else:
while True:
print('+-' * 50)
update_limits = input("Do you want to set joint limits? (y/n): ").strip().lower()
n_joints = len(config["joint_names"])
if update_limits == 'y':
config["joint_lim"] = [[]*n_joints,[]*n_joints]
for i, joint in enumerate(config["joint_names"]):
while True:
try:
print("-" * 50)
lower_limit = input(f"Enter the new lower limit for joint '{joint}' (or press Enter for None): ").strip()
upper_limit = input(f"Enter the new upper limit for joint '{joint}' (or press Enter for None): ").strip()
if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit):
print('--' * 50)
print(" !!! Invalid input. Lower limit must be less than upper limit. !!! ")
continue
config["joint_lim"][0][i] = float(lower_limit) if lower_limit!="" else None
config["joint_lim"][1][i] = float(upper_limit) if upper_limit!="" else None
break
except ValueError:
print("Invalid input. Please enter numeric values or leave blank to keep current limits.")
print(f'New limits for joint:\n lower: {config["joint_lim"][0]}\n upper: {config["joint_lim"][1]}')
break
elif update_limits == 'n':
config["joint_lim"] = None
break
print("Invalid input. Please enter 'y' or 'n'.")
while True:
try:
print('+-' * 50)
config["trajectory_topic_name"] = input("Enter the topic name to which the joint trajectory should be sent (press Enter for default: '/scaled_joint_trajectory_controller/joint_trajectory'): ").strip()
if config["trajectory_topic_name"] == "":
config["trajectory_topic_name"] = '/scaled_joint_trajectory_controller/joint_trajectory'
break
elif config["trajectory_topic_name"].startswith("/"):
break
else:
print("Invalid topic name. A valid topic name should start with '/'.")
except Exception as e:
print(f"An error occurred: {e}")
while True:
save_config = input("Do you want to save this interface? (y/n): ").strip().lower()
if save_config == "y":
config_path = input("Where do you want to save the config?: ").strip()
if not os.path.isdir(config_path):
print("Warning: Path does not exist")
print("Making Directory...")
os.mkdir(config_path)
print(f"[TODO] osc_ros2.py:1091:0: Write config to {config_path}/config.toml")
break
elif save_config == "n":
print("Will not be saving config")
print("Invalid input. Please enter 'y' or 'n'.")
return config, robot
def main(): def main():
@ -1101,7 +767,7 @@ def main():
rclpy.init() rclpy.init()
# TODO: if no config use interactive # TODO: if no config use interactive
config, robot = interactive_input() config, robot = interactive_input()
node = OSC_ROS2_interface(config["joint_names"], config["joint_velocity_limits"], robot, config["cost_mask"]) node = OSC_ROS2_interface(robot, config)
# Run ROS 2 spin, and osc_process will be handled by the timer # Run ROS 2 spin, and osc_process will be handled by the timer
try: try: