AS: adding sandbox example

This commit is contained in:
Alexander Schaefer 2025-04-30 14:02:31 +02:00
parent c77d75859f
commit f3f3113e65
4 changed files with 312 additions and 8 deletions

View File

@ -11,13 +11,13 @@ def main():
osc_udp_client("192.168.1.24", 8000, "osc_client") osc_udp_client("192.168.1.24", 8000, "osc_client")
# Example joint positions to send # Example joint positions to send
joint_positions1 = [0.0,0.0, 0.0, 0.0, 0.0, 0.0] joint_positions1 = [0.5,0.034, 0.839, 0.0, 0.0, 0.0]
joint_positions2 = [-0.4,-1, 0.4, 0.0,0.0, 0.0] joint_positions2 = [0.4,-0.4, 0.4, 0.0,0.0, 0.0]
joint_positions3 = [-0.4,-1, 0.2, 0.0, 0.0, 0.0]#, 6.0] joint_positions3 = [0.4,-0.4, 0.2, 0.0, 0.0, 0.0]#, 6.0]
joint_positions4 = [-0.4,1, 0.2, 0.0, 0.0, 0.0]#, 6.0] joint_positions4 = [0.0,0.0, 0.6, 0.0, 0.0, 0.0]#, 6.0]
joint_positions5 = [-0.4,1, 0.4, 0.0, 0.0, 0.0]#, 6.0] joint_positions5 = [-0.4,0.4, 0.2, 0.0, 0.0, 0.0]#, 6.0]
msg = oscbuildparse.OSCMessage("/joint_angles", None, [joint_positions1])#, joint_positions3])#, joint_positions4, joint_positions5]) msg = oscbuildparse.OSCMessage("/joint_trajectory", None, [joint_positions1, joint_positions2])#, joint_positions3, joint_positions4, joint_positions5])
osc_send(msg, "osc_client") osc_send(msg, "osc_client")
osc_process() osc_process()
print("Sending joint positions") print("Sending joint positions")

View File

@ -109,6 +109,12 @@ class ScaledJointTrajectoryPublisher(Node):
break break
except ValueError: except ValueError:
print("Invalid input. Please enter numeric values or leave blank to keep current limits.") print("Invalid input. Please enter numeric values or leave blank to keep current limits.")
'''
use_link_mask = input("Do you want to use a link mask? (y/n): ").strip().lower()
if use_link_mask == 'y':
while True:
try:
'''
self.hz = float(input("Enter the desired refresh frequency (Hz): ")) self.hz = float(input("Enter the desired refresh frequency (Hz): "))
@ -117,7 +123,8 @@ class ScaledJointTrajectoryPublisher(Node):
def joint_angles_handler(self, *args): def joint_angles_handler(self, *args):
# Ensure the desired joint positions are within the specified limits # Ensure the desired joint positions are within the specified limits
print("received joint angles") #print("received joint angles")
x, y, z, r, p, yaw, *_ = [float(i) for i in list(args)] x, y, z, r, p, yaw, *_ = [float(i) for i in list(args)]
if self.x_limits[0] is not None: if self.x_limits[0] is not None:
x = max(self.x_limits[0], x) x = max(self.x_limits[0], x)
@ -195,7 +202,7 @@ class ScaledJointTrajectoryPublisher(Node):
continue continue
point = JointTrajectoryPoint() point = JointTrajectoryPoint()
point.positions = list(sol[0]) point.positions = list(sol[0])
duration *= 2 duration *= 5
duration += prev_duration duration += prev_duration
prev_duration = duration prev_duration = duration
point.time_from_start.sec = int(duration) point.time_from_start.sec = int(duration)

View File

@ -0,0 +1,296 @@
import rclpy
from rclpy.node import Node
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from sensor_msgs.msg import JointState
from osc4py3.as_allthreads import *
from osc4py3 import oscmethod as osm
import xml.etree.ElementTree as ET
import numpy as np
import spatialmath as sm
import roboticstoolbox as rtb
class ScaledJointTrajectoryPublisher(Node):
"""Node to publish joint trajectories based on OSC messages."""
def __init__(self, joint_names, joint_velocity_limits, robot, cost_mask):
super().__init__('scaled_joint_trajectory_publisher')
# ROS2 Publisher
self.publisher = self.create_publisher(
JointTrajectory,
'/scaled_joint_trajectory_controller/joint_trajectory',
1
)
self.subscription = self.create_subscription(
JointState,
'/joint_states',
self.joint_states_callback,
1 # Increased queue size for joint states
)
# Store received joint positions
self.current_joint_positions = [0.0] * len(joint_names)
self.joint_names = joint_names
self.joint_velocity_limits = joint_velocity_limits
self.desired_joint_positions = [0.0] * len(joint_names)
self.previous_desired = [0.0] * len(joint_names)
self.robot = robot
self.cost_mask = cost_mask
self.prev_pose = None
ip = "0.0.0.0" # Listen on all network interfaces
port = 8000 # Must match the sender's port in `joint_state_osc.py`
osc_startup()
osc_udp_server(ip, port, "osc_server")
print("Server started on 0.0.0.0:8000 \n ready to receive messages in the following format: /joint_trajectroy [joint_positions] optional: duration as last argument")
# Register OSC handler
osc_method("/coordinates", self.coordinates_handler, argscheme=osm.OSCARG_DATAUNPACK)
osc_method("/joint_angles", self.joint_angles_handler, argscheme=osm.OSCARG_DATAUNPACK)
def joint_angles_handler(self, *args):
"""Handles incoming OSC messages for joint positions."""
msg = JointTrajectory()
msg.joint_names = self.joint_names
n=2
for arg in args:
if len(arg) == len(self.joint_names):
point = JointTrajectoryPoint()
point.positions = list(arg)
point.time_from_start.sec = n
n+=2
point.time_from_start.nanosec = 0
msg.points.append(point)
elif len(arg) == len(self.joint_names) + 1:
point = JointTrajectoryPoint()
point.positions = list(arg[:-1])
point.time_from_start.sec = int(arg[-1])
point.time_from_start.nanosec = int((arg[-1] - int(arg[-1])) * 1e9)
msg.points.append(point)
self.publisher.publish(msg)
print("published joint positions")
set_limits = input("Do you want to set limit for x, y and z? (y/n): ").strip().lower()
if set_limits == 'y':
while True:
try:
self.x_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()]
self.y_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()]
self.z_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()]
if len(self.x_limits) != 2 or len(self.y_limits) != 2 or len(self.z_limits) != 2:
print("Invalid input. Please enter exactly two values (or leave blank) for each limit.")
continue
if (self.x_limits[0] is not None and self.x_limits[1] is not None and self.x_limits[0] >= self.x_limits[1]) or \
(self.y_limits[0] is not None and self.y_limits[1] is not None and self.y_limits[0] >= self.y_limits[1]) or \
(self.z_limits[0] is not None and self.z_limits[1] is not None and self.z_limits[0] >= self.z_limits[1]):
print("Invalid input. Lower limit must be less than upper limit for each axis.")
continue
print(f"Current limits:")
print(f"x: {self.x_limits}")
print(f"y: {self.y_limits}")
print(f"z: {self.z_limits}")
con = True
while con:
confirm = input("Do you want your robot to move in this range? (y/n): ").strip().lower()
if confirm == 'y':
break
elif confirm == 'n':
print("Please re-enter the limits.")
con = False
else:
print("Invalid input. Please enter 'y' or 'n'.")
if con: break
except ValueError:
print("Invalid input. Please enter numeric values only.")
# Ask the user if they want to set new joint limits
update_limits = input("Do you want to set new joint limits? (y/n): ").strip().lower()
if update_limits == 'y':
for i in range(len(self.joint_names)):
while True:
try:
lim = self.robot.qlim.copy()
# Find the link corresponding to the joint name
print(f"Current position limits for joint '{self.joint_names[i]}': [{self.robot.qlim[0][i]} {self.robot.qlim[1][i]}] rad")
lower_limit = input(f"Enter the new lower limit for joint '{self.joint_names[i]}' (or press Enter to keep current): ").strip()
upper_limit = input(f"Enter the new upper limit for joint '{self.joint_names[i]}' (or press Enter to keep current): ").strip()
if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit):
print("Invalid input. Lower limit must be less than upper limit.")
continue
if lower_limit:
lim[0][i] = float(lower_limit)
if upper_limit:
lim[1][i] = float(upper_limit)
self.robot.qlim = lim
print(f"New limits for joint '{self.joint_names[i]}': [{self.robot.qlim[0][i]} {self.robot.qlim[1][i]}] rad")
print("-" * 50)
break
except ValueError:
print("Invalid input. Please enter numeric values or leave blank to keep current limits.")
'''
use_link_mask = input("Do you want to use a link mask? (y/n): ").strip().lower()
if use_link_mask == 'y':
while True:
try:
'''
self.hz = float(input("Enter the desired refresh frequency (Hz): "))
# Start the OSC server in a separate thread to avoid blocking the ROS 2 event loop
self.create_timer(1/self.hz, self.update_position) # Timer to call osc_process periodically
def coordinates_handler(self, *args):
# Ensure the desired joint positions are within the specified limits
#print("received joint angles")
x, y, z, r, p, yaw, *_ = [float(i) for i in list(args)]
if self.x_limits[0] is not None:
x = max(self.x_limits[0], x)
if self.x_limits[1] is not None:
x = min(self.x_limits[1], x)
if self.y_limits[0] is not None:
y = max(self.y_limits[0], y)
if self.y_limits[1] is not None:
y = min(self.y_limits[1], y)
if self.z_limits[0] is not None:
z = max(self.z_limits[0], z)
if self.z_limits[1] is not None:
z = min(self.z_limits[1], z)
if x != args[0] or y != args[1] or z != args[2]:
self.get_logger().warn(
f"Desired joint positions adjusted to fit within limits: "
f"x={x}, y={y}, z={z} (original: x={args[0]}, y={args[1]}, z={args[2]})"
)
self.desired_joint_positions = [x, y, z, r, p, yaw]
def joint_states_callback(self, msg):
"""Callback function to handle incoming joint states."""
joint_position_dict = dict(zip(msg.name, msg.position))
self.current_joint_positions = [joint_position_dict[name] for name in self.joint_names]
joint_position_dict = dict(zip(msg.name, msg.velocity))
self.current_joint_velocities = [joint_position_dict[name] for name in self.joint_names]
def update_position(self):
if self.desired_joint_positions == self.previous_desired:
return
msg = JointTrajectory()
msg.joint_names = self.joint_names
steps_per_m = 100
if True: #len(args[0]) == len(self.joint_names):
prev_duration = 0
if self.prev_pose == None:
[x,y,z] = self.robot.fkine(self.current_joint_positions).t
[roll, pitch, yaw] = self.robot.fkine(self.current_joint_positions).rpy()
else:
[x,y,z] = self.prev_pose[:3]
[roll, pitch, yaw] = self.prev_pose[3:]
x1, y1, z1, roll1, pitch1, yaw1 = self.desired_joint_positions
self.prev_pose = self.desired_joint_positions
steps = int(np.linalg.norm(np.array([x1, y1, z1])- self.robot.fkine(self.current_joint_positions).t) * steps_per_m)
if steps < 2: steps = 2
cart_traj = [sm.SE3([x+(x1-x)/(steps-1)*i, y+(y1-y)/(steps-1)*i, z+(z1-z)/(steps-1)*i]) * sm.SE3.RPY([roll+(roll1-roll)/(steps-1)*i, pitch+(pitch1-pitch)/(steps-1)*i, yaw+(yaw1-yaw)/(steps-1)*i]) for i in range(steps)]
for j in range(steps):
sol = self.robot.ik_LM(cart_traj[j], q0=self.current_joint_positions, mask = self.cost_mask, joint_limits = True, method = 'chan') if j == 0 else self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True, method = 'chan')
if sol[1] == 1:
fowards = self.robot.fkine_all(sol[0])
out_of_bounds = (fowards.t[1:,0] > self.x_limits[1] if self.x_limits[1] != None else False) | (fowards.t[1:,0] < self.x_limits[0] if self.x_limits[0] != None else False) | (fowards.t[1:,1] > self.y_limits[1] if self.y_limits[1] != None else False) | (fowards.t[1:,1] < self.y_limits[0] if self.y_limits[0] != None else False) | (fowards.t[1:,2] > self.z_limits[1] if self.z_limits[1] != None else False) | (fowards.t[1:,2] < self.z_limits[0] if self.z_limits[0] != None else False)
if np.any(out_of_bounds):
#print(fowards.t)
#indices = np.where(out_of_bounds)[0]
#print(f"indices: {indices}")
self.get_logger().warn("One or more links moved out of bounds!")
'''
for i in indices:
try:
print(f"Joint {self.robot.links[i].name} is out of bounds: (x,y,z) = {fowards.t[i]}")
except IndexError:
print(f"index {i} is out of bounds, but no corresponding joint found.")
self.previous_desired = self.desired_joint_positions
'''
break
duration = 0
prev = self.current_joint_positions if j == 0 else prev_sol
for p1, p2, max_vel in zip(sol[0], prev, self.joint_velocity_limits.values()):
duration = max(duration, abs(p1 - p2) / max_vel)#, 1/self.hz) # as minimun
prev_sol = list(sol[0])
if duration == 0:
continue
point = JointTrajectoryPoint()
point.positions = list(sol[0])
duration *= 5
duration += prev_duration
prev_duration = duration
point.time_from_start.sec = int(duration)
point.time_from_start.nanosec = int((duration - int(duration)) * 1e9)
msg.points.append(point)
else:
print(f'IK could not find a solution for (x,y,z) = {cart_traj[j].t} and (r,p,y) = {cart_traj[j].rpy()}!')
prev_sol = self.current_joint_positions
if len(msg.points) == 0:
return
msg.header.stamp = self.get_clock().now().to_msg()
self.publisher.publish(msg)
self.previous_desired = self.desired_joint_positions
def main():
"""Main function to get joint names and start the ROS 2 & OSC system."""
robot_urdf = input("Enter the path to the URDF file: ")
tree = ET.parse(robot_urdf)
root = tree.getroot()
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
robot = rtb.ERobot.URDF(robot_urdf)
joint_velocity_limits = {}
# Iterate over all joints in the URDF
for joint in root.findall('.//joint'):
joint_name = joint.get('name') # Get the name of the joint
# Look for the <limit> tag under each joint
limit = joint.find('limit')
if limit is not None:
# Extract the velocity limit (if it exists)
velocity_limit = limit.get('velocity')
if velocity_limit is not None:
joint_velocity_limits[joint_name] = float(velocity_limit)
while True:
try:
print("The cost mask determines which coordinates are used for the IK. Each element of the cost mask corresponds to a catesian coordinate [x, y, z, Rx, Ry, Rz].")
print("The cost mask [1, 1, 1, 0, 0, 0] means that the IK will only consider translation and no rotaion.")
cost_mask = [int(i) for i in input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")]
if sum(cost_mask) <= robot.n and len(cost_mask) == 6:
break
else:
print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.")
except ValueError:
print("Invalid input. Please enter integers only.")
print(f"Cost mask: {cost_mask}")
rclpy.init()
node = ScaledJointTrajectoryPublisher(joint_names, joint_velocity_limits, robot, cost_mask)
# Run ROS 2 spin, and osc_process will be handled by the timer
try:
rclpy.spin(node)
except KeyboardInterrupt:
print("")
finally:
node.destroy_node()
rclpy.shutdown()
osc_terminate()
if __name__ == '__main__':
main()

View File

@ -36,6 +36,7 @@ setup(
'plugdata_cart_fix = joint_control.plugdata_cart_fix:main', 'plugdata_cart_fix = joint_control.plugdata_cart_fix:main',
'plugdata_cart_smooth = joint_control.plugdata_cart_smooth:main', 'plugdata_cart_smooth = joint_control.plugdata_cart_smooth:main',
'test=joint_control.test:main', 'test=joint_control.test:main',
'sandbox=sandbox.sandbox:main',
], ],
}, },
) )