AS: joint and cartesian limits

This commit is contained in:
Alexander Schaefer 2025-04-30 09:44:12 +02:00
parent 33f21d3096
commit d4240bc4de
12 changed files with 241 additions and 50 deletions

View File

@ -6,7 +6,7 @@ def handler(*args):
osc_startup()
osc_udp_server("0.0.0.0", 8000, "osc_server")
osc_method("/coordinates", handler, argscheme=osm.OSCARG_DATAUNPACK)
osc_method("/joint_angles", handler, argscheme=osm.OSCARG_MESSAGE)
while True:

View File

@ -56,8 +56,8 @@ class JointStateLogger(Node):
writer.writerow(["timestamp", "x", "y", "z", "roll", "pitch", "yaw"])
with open(file_path_cart, mode='a', newline='') as f:
writer = csv.writer(f)
[x,y,z] = self.robot.fkine(list([msg.position[-1]])+list(msg.position[:4])).t
[roll, pitch, yaw] = self.robot.fkine(list([msg.position[-1]])+list(msg.position[:4])).rpy()
[x,y,z] = self.robot.fkine(list([msg.position[-1]])+list(msg.position[:5])).t
[roll, pitch, yaw] = self.robot.fkine(list([msg.position[-1]])+list(msg.position[:5])).rpy()
writer.writerow([timestamp, x, y, z, roll, pitch, yaw])

View File

@ -59,9 +59,7 @@ class JointTrajectoryLogger(Node):
for point in msg.points:
timestamp = recive_time + point.time_from_start.sec + point.time_from_start.nanosec * 1e-9
T = self.robot.fkine(point.positions)
x = T.t[0]
y = T.t[1]
z = T.t[2]
[x,y,z] = T.t
roll, pitch, yaw = T.rpy()
row = [timestamp, x, y, z, roll, pitch, yaw]
writer.writerow(row)

View File

@ -10,15 +10,14 @@ def main():
# Make client channels to send packets
osc_udp_client("172.18.0.3", 8000, "osc_client")
robot = rtb.Robot.URDF("/BA/ur10e.urdf")
# Example joint positions to send
joint_positions1 = [-0.4,0.6, 0.4, 0.0, 0.0, 0.0]
joint_positions2 = [-0.4,-0.6, 0.4, 0.0,0.0, 0.0]
joint_positions3 = [-0.4,-0.6, 0.6, 0.0, 0.0, 0.0]
joint_positions4 = [-0.4,-0.6, 0.2, 0.0, 0.0, 0.0]
joint_positions5 = [-0.4,0.6, 0.2, 0.0, 0.0, 0.0]
joint_positions1 = [0.0,0.0, 0.0, 0.0, 0.0, 0.0]
joint_positions2 = [-0.4,-1, 0.4, 0.0,0.0, 0.0]
joint_positions3 = [-0.4,-1, 0.2, 0.0, 0.0, 0.0]#, 6.0]
joint_positions4 = [-0.4,1, 0.2, 0.0, 0.0, 0.0]#, 6.0]
joint_positions5 = [-0.4,1, 0.4, 0.0, 0.0, 0.0]#, 6.0]
msg = oscbuildparse.OSCMessage("/joint_trajectory", None, [joint_positions1, joint_positions2])#, joint_positions3, joint_positions4, joint_positions5])
msg = oscbuildparse.OSCMessage("/joint_angles", None, [joint_positions1])#, joint_positions3])#, joint_positions4, joint_positions5])
osc_send(msg, "osc_client")
osc_process()
print("Sending joint positions")

View File

@ -7,6 +7,7 @@ from osc4py3 import oscmethod as osm
import xml.etree.ElementTree as ET
import time
import numpy as np
import os
class ScaledJointTrajectoryPublisher(Node):
"""Node to publish joint trajectories based on OSC messages."""
@ -44,15 +45,87 @@ class ScaledJointTrajectoryPublisher(Node):
# Register OSC handler
osc_method("/joint_angles", self.joint_angles_handler, argscheme=osm.OSCARG_DATAUNPACK)
while True:
try:
self.x_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()]
self.y_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()]
self.z_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()]
if len(self.x_limits) != 2 or len(self.y_limits) != 2 or len(self.z_limits) != 2:
print("Invalid input. Please enter exactly two values (or leave blank) for each limit.")
continue
if (self.x_limits[0] is not None and self.x_limits[1] is not None and self.x_limits[0] >= self.x_limits[1]) or \
(self.y_limits[0] is not None and self.y_limits[1] is not None and self.y_limits[0] >= self.y_limits[1]) or \
(self.z_limits[0] is not None and self.z_limits[1] is not None and self.z_limits[0] >= self.z_limits[1]):
print("Invalid input. Lower limit must be less than upper limit for each axis.")
continue
print(f"Current limits:")
print(f"x: {self.x_limits}")
print(f"y: {self.y_limits}")
print(f"z: {self.z_limits}")
confirm = input("Do you want your robot to move in this range? (y/n): ").strip().lower()
if confirm == 'y':
break
elif confirm == 'n':
print("Please re-enter the limits.")
else:
print("Invalid input. Please enter 'y' or 'n'.")
except ValueError:
print("Invalid input. Please enter numeric values only.")
# Ask the user if they want to set new joint limits
update_limits = input("Do you want to set new joint limits? (y/n): ").strip().lower()
if update_limits == 'y':
for joint_name in self.joint_names:
while True:
try:
print(f"Current position limits for joint '{joint_name}': {self.robot.links[joint_name]} rad")
lower_limit = input(f"Enter the new lower limit for joint '{joint_name}' (or press Enter to keep current): ").strip()
upper_limit = input(f"Enter the new upper limit for joint '{joint_name}' (or press Enter to keep current): ").strip()
if lower_limit is not None and upper_limit is not None and lower_limit >= upper_limit:
print("Invalid input. Lower limit must be less than upper limit.")
continue
if lower_limit:
self.robot.links[joint_name][0] = float(lower_limit)
if upper_limit:
self.robot.links[joint_name][1] = float(upper_limit)
break
except ValueError:
print("Invalid input. Please enter numeric values or leave blank to keep current limits.")
self.hz = float(input("Enter the desired refresh frequency (Hz): "))
# Start the OSC server in a separate thread to avoid blocking the ROS 2 event loop
self.create_timer(1/self.hz, self.update_position) # Timer to call osc_process periodically
def joint_angles_handler(self, *args):
"""Handles incoming OSC messages for joint positions."""
print(f"Received joint angles: {args}")
self.desired_joint_positions = [float(i) for i in list(args)]
# Ensure the desired joint positions are within the specified limits
x, y, z, r, p, yaw, *_ = [float(i) for i in list(args)]
if self.x_limits[0] is not None:
x = max(self.x_limits[0], x)
if self.x_limits[1] is not None:
x = min(self.x_limits[1], x)
if self.y_limits[0] is not None:
y = max(self.y_limits[0], y)
if self.y_limits[1] is not None:
y = min(self.y_limits[1], y)
if self.z_limits[0] is not None:
z = max(self.z_limits[0], z)
if self.z_limits[1] is not None:
z = min(self.z_limits[1], z)
if x != args[0] or y != args[1] or z != args[2]:
self.get_logger().warn(
f"Desired joint positions adjusted to fit within limits: "
f"x={x}, y={y}, z={z} (original: x={args[0]}, y={args[1]}, z={args[2]})"
)
self.desired_joint_positions = [x, y, z, r, p, yaw]
def joint_states_callback(self, msg):
@ -89,8 +162,16 @@ class ScaledJointTrajectoryPublisher(Node):
def main():
"""Main function to get joint names and start the ROS 2 & OSC system."""
robot_urdf = input("Enter the path to the URDF file: ")
tree = ET.parse(robot_urdf)
while True:
path_to_urdf = input("Enter the path to the URDF file: ")
if os.path.isfile(path_to_urdf):
if not path_to_urdf.endswith('.urdf'):
print("The file is not a URDF file. Please enter a valid URDF file.")
continue
break
else:
print("Invalid path. Please enter a valid path to the URDF file.")
tree = ET.parse(path_to_urdf)
root = tree.getroot()
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']

View File

@ -47,6 +47,68 @@ class ScaledJointTrajectoryPublisher(Node):
print("Server started on 0.0.0.0:8000 \n ready to receive messages in the following format: /joint_trajectroy [joint_positions] optional: duration as last argument")
# Register OSC handler
osc_method("/joint_angles", self.joint_angles_handler, argscheme=osm.OSCARG_DATAUNPACK)
set_limits = input("Do you want to set limit for x, y and z? (y/n): ").strip().lower()
if set_limits == 'y':
while True:
try:
self.x_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()]
self.y_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()]
self.z_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()]
if len(self.x_limits) != 2 or len(self.y_limits) != 2 or len(self.z_limits) != 2:
print("Invalid input. Please enter exactly two values (or leave blank) for each limit.")
continue
if (self.x_limits[0] is not None and self.x_limits[1] is not None and self.x_limits[0] >= self.x_limits[1]) or \
(self.y_limits[0] is not None and self.y_limits[1] is not None and self.y_limits[0] >= self.y_limits[1]) or \
(self.z_limits[0] is not None and self.z_limits[1] is not None and self.z_limits[0] >= self.z_limits[1]):
print("Invalid input. Lower limit must be less than upper limit for each axis.")
continue
print(f"Current limits:")
print(f"x: {self.x_limits}")
print(f"y: {self.y_limits}")
print(f"z: {self.z_limits}")
con = True
while con:
confirm = input("Do you want your robot to move in this range? (y/n): ").strip().lower()
if confirm == 'y':
break
elif confirm == 'n':
print("Please re-enter the limits.")
con = False
else:
print("Invalid input. Please enter 'y' or 'n'.")
if con: break
except ValueError:
print("Invalid input. Please enter numeric values only.")
# Ask the user if they want to set new joint limits
update_limits = input("Do you want to set new joint limits? (y/n): ").strip().lower()
if update_limits == 'y':
for i in range(len(self.joint_names)):
while True:
try:
lim = self.robot.qlim.copy()
# Find the link corresponding to the joint name
print(f"Current position limits for joint '{self.joint_names[i]}': [{self.robot.qlim[0][i]} {self.robot.qlim[1][i]}] rad")
lower_limit = input(f"Enter the new lower limit for joint '{self.joint_names[i]}' (or press Enter to keep current): ").strip()
upper_limit = input(f"Enter the new upper limit for joint '{self.joint_names[i]}' (or press Enter to keep current): ").strip()
if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit):
print("Invalid input. Lower limit must be less than upper limit.")
continue
if lower_limit:
lim[0][i] = float(lower_limit)
if upper_limit:
lim[1][i] = float(upper_limit)
self.robot.qlim = lim
print(f"New limits for joint '{self.joint_names[i]}': [{self.robot.qlim[0][i]} {self.robot.qlim[1][i]}] rad")
print("-" * 50)
break
except ValueError:
print("Invalid input. Please enter numeric values or leave blank to keep current limits.")
self.hz = float(input("Enter the desired refresh frequency (Hz): "))
@ -54,15 +116,37 @@ class ScaledJointTrajectoryPublisher(Node):
self.create_timer(1/self.hz, self.update_position) # Timer to call osc_process periodically
def joint_angles_handler(self, *args):
"""Handles incoming OSC messages for joint positions."""
self.desired_joint_positions = [float(i) for i in list(args)]
# Ensure the desired joint positions are within the specified limits
print("received joint angles")
x, y, z, r, p, yaw, *_ = [float(i) for i in list(args)]
if self.x_limits[0] is not None:
x = max(self.x_limits[0], x)
if self.x_limits[1] is not None:
x = min(self.x_limits[1], x)
if self.y_limits[0] is not None:
y = max(self.y_limits[0], y)
if self.y_limits[1] is not None:
y = min(self.y_limits[1], y)
if self.z_limits[0] is not None:
z = max(self.z_limits[0], z)
if self.z_limits[1] is not None:
z = min(self.z_limits[1], z)
if x != args[0] or y != args[1] or z != args[2]:
self.get_logger().warn(
f"Desired joint positions adjusted to fit within limits: "
f"x={x}, y={y}, z={z} (original: x={args[0]}, y={args[1]}, z={args[2]})"
)
self.desired_joint_positions = [x, y, z, r, p, yaw]
def joint_states_callback(self, msg):
"""Callback function to handle incoming joint states."""
joint_position_dict = dict(zip(msg.name, msg.position))
self.current_joint_positions = [joint_position_dict[name] for name in self.joint_names]
joint_position_dict = dict(zip(msg.name, msg.velocity))
self.current_joint_velocities = [joint_position_dict[name] for name in self.joint_names]
def update_position(self):
if self.desired_joint_positions == self.previous_desired:
@ -84,8 +168,24 @@ class ScaledJointTrajectoryPublisher(Node):
if steps < 2: steps = 2
cart_traj = [sm.SE3([x+(x1-x)/(steps-1)*i, y+(y1-y)/(steps-1)*i, z+(z1-z)/(steps-1)*i]) * sm.SE3.RPY([roll+(roll1-roll)/(steps-1)*i, pitch+(pitch1-pitch)/(steps-1)*i, yaw+(yaw1-yaw)/(steps-1)*i]) for i in range(steps)]
for j in range(steps):
sol = self.robot.ik_LM(cart_traj[j], q0=self.current_joint_positions, mask = self.cost_mask, joint_limits = True) if j == 0 else self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True)
sol = self.robot.ik_LM(cart_traj[j], q0=self.current_joint_positions, mask = self.cost_mask, joint_limits = True, method = 'chan') if j == 0 else self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True, method = 'chan')
if sol[1] == 1:
fowards = self.robot.fkine_all(sol[0])
out_of_bounds = (fowards.t[1:,0] > self.x_limits[1] if self.x_limits[1] != None else False) | (fowards.t[1:,0] < self.x_limits[0] if self.x_limits[0] != None else False) | (fowards.t[1:,1] > self.y_limits[1] if self.y_limits[1] != None else False) | (fowards.t[1:,1] < self.y_limits[0] if self.y_limits[0] != None else False) | (fowards.t[1:,2] > self.z_limits[1] if self.z_limits[1] != None else False) | (fowards.t[1:,2] < self.z_limits[0] if self.z_limits[0] != None else False)
if np.any(out_of_bounds):
#print(fowards.t)
#indices = np.where(out_of_bounds)[0]
#print(f"indices: {indices}")
self.get_logger().warn("One or more links moved out of bounds!")
'''
for i in indices:
try:
print(f"Joint {self.robot.links[i].name} is out of bounds: (x,y,z) = {fowards.t[i]}")
except IndexError:
print(f"index {i} is out of bounds, but no corresponding joint found.")
self.previous_desired = self.desired_joint_positions
'''
break
duration = 0
prev = self.current_joint_positions if j == 0 else prev_sol
for p1, p2, max_vel in zip(sol[0], prev, self.joint_velocity_limits.values()):
@ -102,8 +202,10 @@ class ScaledJointTrajectoryPublisher(Node):
point.time_from_start.nanosec = int((duration - int(duration)) * 1e9)
msg.points.append(point)
else:
print('IK could not find a solution!')
print(f'IK could not find a solution for (x,y,z) = {cart_traj[j].t} and (r,p,y) = {cart_traj[j].rpy()}!')
prev_sol = self.current_joint_positions
if len(msg.points) == 0:
return
msg.header.stamp = self.get_clock().now().to_msg()
self.publisher.publish(msg)
self.previous_desired = self.desired_joint_positions

View File

@ -63,7 +63,7 @@ class ScaledJointTrajectoryPublisher(Node):
viapoints = []
msg = JointTrajectory()
msg.joint_names = self.joint_names
steps_per_m = 1
steps_per_m = 4
for i in range(len(args)-1):
x, y, z, roll, pitch, yaw = args[i]
x1, y1, z1, roll1, pitch1, yaw1 = args[i+1]
@ -79,7 +79,7 @@ class ScaledJointTrajectoryPublisher(Node):
prev_sol = list(sol[0])
else: print('IK could not find a solution!')
dt = 0.01
tacc = 0.1
tacc = 0.5
print(f'length viapoints: {len(viapoints)}')
traj = rtb.mstraj(np.array(viapoints), q0 = self.current_joint_positions ,dt=dt, tacc=tacc, qdmax=[1 * i for i in self.joint_velocity_limits])
print(len(traj.q))
@ -91,7 +91,7 @@ class ScaledJointTrajectoryPublisher(Node):
point = JointTrajectoryPoint()
point.positions = list(traj.q[i])
point.time_from_start.sec = int(traj.t[i])
point.time_from_start.nanosec = int((traj.t[i] - int(traj.t[i])) * 1e9)
point.time_from_start.nanosec = int(((traj.t[i] - int(traj.t[i])) * 1e9))
#point.time_from_start = rclpy.duration.Duration(seconds=traj.t[i]).to_msg()
msg.points.append(point)
msg.header.stamp = self.get_clock().now().to_msg()

View File

@ -22,6 +22,31 @@ class ScaledJointTrajectoryPublisher(Node):
if self.trajectroy_topic_name == "":
self.trajectroy_topic_name = '/scaled_joint_trajectory_controller/joint_trajectory'
print(f"Using topic name: {self.trajectroy_topic_name}")
print("--------------------------------------------------------------------------------------------------------------------------------")
while True:
try:
self.speed = input("Enter your desired speed of the tcp (in m/s): ")
if self.speed == '':
self.speed = 1
else:
self.speed = float(self.speed)
break
except ValueError:
print("Invalid input. Please enter a number.")
continue
while True:
try:
self.t_acc = input("Enter how fast you want the tcp to reach that velocity (in s). \nRemember! If the acceleration time is to short the robot might not be able to accelerate fast enough: ")
if self.t_acc == '':
self.t_acc = 2
else:
self.t_acc = float(self.t_acc)
break
except ValueError:
print("Invalid input. Please enter a number.")
continue
# ROS2 Publisher
self.publisher = self.create_publisher(
@ -60,44 +85,28 @@ class ScaledJointTrajectoryPublisher(Node):
try:
print("Received joint positions")
viapoints = np.array([list(i) for i in args])
print(1)
msg = JointTrajectory()
print(2)
msg.joint_names = self.joint_names
print(3)
x,y,z = self.robot.fkine(self.current_joint_positions).t
r,p,y = self.robot.fkine(self.current_joint_positions).rpy()
q0 = [x, y, z, r, p, y]
print(4)
traj = rtb.mstraj(viapoints, q0 = q0 ,dt=0.01, tacc=1, qdmax=[0.1]*len(self.joint_names))
print(traj.q)
print(5)
r,p,yaw = self.robot.fkine(self.current_joint_positions).rpy()
q0 = [x, y, z, r, p, yaw]
traj = rtb.mstraj(viapoints, q0 = q0 ,dt=0.01, tacc=self.t_acc, qdmax=self.speed)
msg.points = []
print(6)
prev_sol = q0
prev_sol = self.current_joint_positions
for i in range(len(traj.q)):
point = JointTrajectoryPoint()
print(8)
T = sm.SE3(traj.q[i][:3]) * sm.SE3.RPY(traj.q[i][3:])
T = sm.SE3(traj.q[i][:3]) * sm.SE3.RPY(traj.q[i][3:], order='xyz')
sol = self.robot.ik_LM(T, q0=prev_sol, mask = self.cost_mask, joint_limits = True)
print(9)
if sol[1] == 1:
print(10)
point = JointTrajectoryPoint()
point.positions = list(sol[0])
print(11)
point.time_from_start.sec = int(traj.t[i])
print(12)
point.time_from_start.nanosec = int((traj.t[i] - int(traj.t[i])) * 1e9)
print(13)
msg.points.append(point)
print(14)
prev_sol = list(sol[0])
print(15)
else: print('IK could not find a solution!')
print(16)
msg.header.stamp = self.get_clock().now().to_msg()
print(17)
self.publisher.publish(msg)
print(f'lenght msg.points: {len(msg.points)}')
print('published')
except Exception as e:
@ -114,6 +123,7 @@ def main():
break
else:
print("Invalid path. Please enter a valid path to the URDF file.")
print("--------------------------------------------------------------------------------------------------------------------------------")
tree = ET.parse(path_to_urdf)
root = tree.getroot()
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
@ -151,6 +161,7 @@ def main():
except ValueError:
print("Invalid input. Please enter integers only.")
print(f"Cost mask: {cost_mask}")
print("--------------------------------------------------------------------------------------------------------------------------------")
node = ScaledJointTrajectoryPublisher(joint_names, robot, cost_mask, joint_velocity_limits)
# Run both ROS 2 and OSC Server together