Merge branch 'main' of https://collaborating.tuhh.de/m-4/ligeti/ros2osc/ba-alexanderschaefer
AS:
This commit is contained in:
commit
bccc4b4fdf
@ -15,13 +15,13 @@ def main():
|
||||
>>>>>>> c77d75859fda8ac862323fbd1993c35f7f931973
|
||||
|
||||
# Example joint positions to send
|
||||
joint_positions1 = [0.0,0.0, 0.0, 0.0, 0.0, 0.0]
|
||||
joint_positions2 = [-0.4,-1, 0.4, 0.0,0.0, 0.0]
|
||||
joint_positions3 = [-0.4,-1, 0.2, 0.0, 0.0, 0.0]#, 6.0]
|
||||
joint_positions4 = [-0.4,1, 0.2, 0.0, 0.0, 0.0]#, 6.0]
|
||||
joint_positions5 = [-0.4,1, 0.4, 0.0, 0.0, 0.0]#, 6.0]
|
||||
joint_positions1 = [0.5,0.034, 0.839, 0.0, 0.0, 0.0]
|
||||
joint_positions2 = [0.4,-0.4, 0.4, 0.0,0.0, 0.0]
|
||||
joint_positions3 = [0.4,-0.4, 0.2, 0.0, 0.0, 0.0]#, 6.0]
|
||||
joint_positions4 = [0.0,0.0, 0.6, 0.0, 0.0, 0.0]#, 6.0]
|
||||
joint_positions5 = [-0.4,0.4, 0.2, 0.0, 0.0, 0.0]#, 6.0]
|
||||
|
||||
msg = oscbuildparse.OSCMessage("/joint_angles", None, [joint_positions1])#, joint_positions3])#, joint_positions4, joint_positions5])
|
||||
msg = oscbuildparse.OSCMessage("/joint_trajectory", None, [joint_positions1, joint_positions2])#, joint_positions3, joint_positions4, joint_positions5])
|
||||
osc_send(msg, "osc_client")
|
||||
osc_process()
|
||||
print("Sending joint positions")
|
||||
|
@ -109,6 +109,12 @@ class ScaledJointTrajectoryPublisher(Node):
|
||||
break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter numeric values or leave blank to keep current limits.")
|
||||
'''
|
||||
use_link_mask = input("Do you want to use a link mask? (y/n): ").strip().lower()
|
||||
if use_link_mask == 'y':
|
||||
while True:
|
||||
try:
|
||||
'''
|
||||
|
||||
|
||||
self.hz = float(input("Enter the desired refresh frequency (Hz): "))
|
||||
@ -117,7 +123,8 @@ class ScaledJointTrajectoryPublisher(Node):
|
||||
|
||||
def joint_angles_handler(self, *args):
|
||||
# Ensure the desired joint positions are within the specified limits
|
||||
print("received joint angles")
|
||||
#print("received joint angles")
|
||||
|
||||
x, y, z, r, p, yaw, *_ = [float(i) for i in list(args)]
|
||||
if self.x_limits[0] is not None:
|
||||
x = max(self.x_limits[0], x)
|
||||
|
296
workspace/src/joint_control/joint_control/sandbox.py
Normal file
296
workspace/src/joint_control/joint_control/sandbox.py
Normal file
@ -0,0 +1,296 @@
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
|
||||
from sensor_msgs.msg import JointState
|
||||
from osc4py3.as_allthreads import *
|
||||
from osc4py3 import oscmethod as osm
|
||||
import xml.etree.ElementTree as ET
|
||||
import numpy as np
|
||||
import spatialmath as sm
|
||||
import roboticstoolbox as rtb
|
||||
|
||||
class ScaledJointTrajectoryPublisher(Node):
|
||||
"""Node to publish joint trajectories based on OSC messages."""
|
||||
|
||||
def __init__(self, joint_names, joint_velocity_limits, robot, cost_mask):
|
||||
super().__init__('scaled_joint_trajectory_publisher')
|
||||
|
||||
# ROS2 Publisher
|
||||
self.publisher = self.create_publisher(
|
||||
JointTrajectory,
|
||||
'/scaled_joint_trajectory_controller/joint_trajectory',
|
||||
1
|
||||
)
|
||||
|
||||
self.subscription = self.create_subscription(
|
||||
JointState,
|
||||
'/joint_states',
|
||||
self.joint_states_callback,
|
||||
1 # Increased queue size for joint states
|
||||
)
|
||||
|
||||
# Store received joint positions
|
||||
self.current_joint_positions = [0.0] * len(joint_names)
|
||||
self.joint_names = joint_names
|
||||
self.joint_velocity_limits = joint_velocity_limits
|
||||
self.desired_joint_positions = [0.0] * len(joint_names)
|
||||
self.previous_desired = [0.0] * len(joint_names)
|
||||
self.robot = robot
|
||||
self.cost_mask = cost_mask
|
||||
self.prev_pose = None
|
||||
|
||||
ip = "0.0.0.0" # Listen on all network interfaces
|
||||
port = 8000 # Must match the sender's port in `joint_state_osc.py`
|
||||
|
||||
osc_startup()
|
||||
osc_udp_server(ip, port, "osc_server")
|
||||
print("Server started on 0.0.0.0:8000 \n ready to receive messages in the following format: /joint_trajectroy [joint_positions] optional: duration as last argument")
|
||||
# Register OSC handler
|
||||
osc_method("/coordinates", self.coordinates_handler, argscheme=osm.OSCARG_DATAUNPACK)
|
||||
osc_method("/joint_angles", self.joint_angles_handler, argscheme=osm.OSCARG_DATAUNPACK)
|
||||
|
||||
def joint_angles_handler(self, *args):
|
||||
"""Handles incoming OSC messages for joint positions."""
|
||||
msg = JointTrajectory()
|
||||
msg.joint_names = self.joint_names
|
||||
n=2
|
||||
for arg in args:
|
||||
if len(arg) == len(self.joint_names):
|
||||
point = JointTrajectoryPoint()
|
||||
point.positions = list(arg)
|
||||
point.time_from_start.sec = n
|
||||
n+=2
|
||||
point.time_from_start.nanosec = 0
|
||||
msg.points.append(point)
|
||||
elif len(arg) == len(self.joint_names) + 1:
|
||||
point = JointTrajectoryPoint()
|
||||
point.positions = list(arg[:-1])
|
||||
point.time_from_start.sec = int(arg[-1])
|
||||
point.time_from_start.nanosec = int((arg[-1] - int(arg[-1])) * 1e9)
|
||||
msg.points.append(point)
|
||||
|
||||
self.publisher.publish(msg)
|
||||
print("published joint positions")
|
||||
|
||||
set_limits = input("Do you want to set limit for x, y and z? (y/n): ").strip().lower()
|
||||
if set_limits == 'y':
|
||||
while True:
|
||||
try:
|
||||
self.x_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()]
|
||||
self.y_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()]
|
||||
self.z_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()]
|
||||
|
||||
if len(self.x_limits) != 2 or len(self.y_limits) != 2 or len(self.z_limits) != 2:
|
||||
print("Invalid input. Please enter exactly two values (or leave blank) for each limit.")
|
||||
continue
|
||||
|
||||
if (self.x_limits[0] is not None and self.x_limits[1] is not None and self.x_limits[0] >= self.x_limits[1]) or \
|
||||
(self.y_limits[0] is not None and self.y_limits[1] is not None and self.y_limits[0] >= self.y_limits[1]) or \
|
||||
(self.z_limits[0] is not None and self.z_limits[1] is not None and self.z_limits[0] >= self.z_limits[1]):
|
||||
print("Invalid input. Lower limit must be less than upper limit for each axis.")
|
||||
continue
|
||||
|
||||
print(f"Current limits:")
|
||||
print(f"x: {self.x_limits}")
|
||||
print(f"y: {self.y_limits}")
|
||||
print(f"z: {self.z_limits}")
|
||||
con = True
|
||||
while con:
|
||||
confirm = input("Do you want your robot to move in this range? (y/n): ").strip().lower()
|
||||
if confirm == 'y':
|
||||
break
|
||||
elif confirm == 'n':
|
||||
print("Please re-enter the limits.")
|
||||
con = False
|
||||
else:
|
||||
print("Invalid input. Please enter 'y' or 'n'.")
|
||||
if con: break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter numeric values only.")
|
||||
|
||||
# Ask the user if they want to set new joint limits
|
||||
update_limits = input("Do you want to set new joint limits? (y/n): ").strip().lower()
|
||||
if update_limits == 'y':
|
||||
for i in range(len(self.joint_names)):
|
||||
while True:
|
||||
try:
|
||||
lim = self.robot.qlim.copy()
|
||||
# Find the link corresponding to the joint name
|
||||
print(f"Current position limits for joint '{self.joint_names[i]}': [{self.robot.qlim[0][i]} {self.robot.qlim[1][i]}] rad")
|
||||
lower_limit = input(f"Enter the new lower limit for joint '{self.joint_names[i]}' (or press Enter to keep current): ").strip()
|
||||
upper_limit = input(f"Enter the new upper limit for joint '{self.joint_names[i]}' (or press Enter to keep current): ").strip()
|
||||
|
||||
if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit):
|
||||
print("Invalid input. Lower limit must be less than upper limit.")
|
||||
continue
|
||||
|
||||
if lower_limit:
|
||||
lim[0][i] = float(lower_limit)
|
||||
if upper_limit:
|
||||
lim[1][i] = float(upper_limit)
|
||||
self.robot.qlim = lim
|
||||
print(f"New limits for joint '{self.joint_names[i]}': [{self.robot.qlim[0][i]} {self.robot.qlim[1][i]}] rad")
|
||||
print("-" * 50)
|
||||
break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter numeric values or leave blank to keep current limits.")
|
||||
'''
|
||||
use_link_mask = input("Do you want to use a link mask? (y/n): ").strip().lower()
|
||||
if use_link_mask == 'y':
|
||||
while True:
|
||||
try:
|
||||
'''
|
||||
|
||||
|
||||
self.hz = float(input("Enter the desired refresh frequency (Hz): "))
|
||||
# Start the OSC server in a separate thread to avoid blocking the ROS 2 event loop
|
||||
self.create_timer(1/self.hz, self.update_position) # Timer to call osc_process periodically
|
||||
|
||||
def coordinates_handler(self, *args):
|
||||
# Ensure the desired joint positions are within the specified limits
|
||||
#print("received joint angles")
|
||||
|
||||
x, y, z, r, p, yaw, *_ = [float(i) for i in list(args)]
|
||||
if self.x_limits[0] is not None:
|
||||
x = max(self.x_limits[0], x)
|
||||
if self.x_limits[1] is not None:
|
||||
x = min(self.x_limits[1], x)
|
||||
if self.y_limits[0] is not None:
|
||||
y = max(self.y_limits[0], y)
|
||||
if self.y_limits[1] is not None:
|
||||
y = min(self.y_limits[1], y)
|
||||
if self.z_limits[0] is not None:
|
||||
z = max(self.z_limits[0], z)
|
||||
if self.z_limits[1] is not None:
|
||||
z = min(self.z_limits[1], z)
|
||||
|
||||
if x != args[0] or y != args[1] or z != args[2]:
|
||||
self.get_logger().warn(
|
||||
f"Desired joint positions adjusted to fit within limits: "
|
||||
f"x={x}, y={y}, z={z} (original: x={args[0]}, y={args[1]}, z={args[2]})"
|
||||
)
|
||||
|
||||
self.desired_joint_positions = [x, y, z, r, p, yaw]
|
||||
|
||||
|
||||
def joint_states_callback(self, msg):
|
||||
"""Callback function to handle incoming joint states."""
|
||||
joint_position_dict = dict(zip(msg.name, msg.position))
|
||||
self.current_joint_positions = [joint_position_dict[name] for name in self.joint_names]
|
||||
joint_position_dict = dict(zip(msg.name, msg.velocity))
|
||||
self.current_joint_velocities = [joint_position_dict[name] for name in self.joint_names]
|
||||
|
||||
def update_position(self):
|
||||
if self.desired_joint_positions == self.previous_desired:
|
||||
return
|
||||
msg = JointTrajectory()
|
||||
msg.joint_names = self.joint_names
|
||||
steps_per_m = 100
|
||||
if True: #len(args[0]) == len(self.joint_names):
|
||||
prev_duration = 0
|
||||
if self.prev_pose == None:
|
||||
[x,y,z] = self.robot.fkine(self.current_joint_positions).t
|
||||
[roll, pitch, yaw] = self.robot.fkine(self.current_joint_positions).rpy()
|
||||
else:
|
||||
[x,y,z] = self.prev_pose[:3]
|
||||
[roll, pitch, yaw] = self.prev_pose[3:]
|
||||
x1, y1, z1, roll1, pitch1, yaw1 = self.desired_joint_positions
|
||||
self.prev_pose = self.desired_joint_positions
|
||||
steps = int(np.linalg.norm(np.array([x1, y1, z1])- self.robot.fkine(self.current_joint_positions).t) * steps_per_m)
|
||||
if steps < 2: steps = 2
|
||||
cart_traj = [sm.SE3([x+(x1-x)/(steps-1)*i, y+(y1-y)/(steps-1)*i, z+(z1-z)/(steps-1)*i]) * sm.SE3.RPY([roll+(roll1-roll)/(steps-1)*i, pitch+(pitch1-pitch)/(steps-1)*i, yaw+(yaw1-yaw)/(steps-1)*i]) for i in range(steps)]
|
||||
for j in range(steps):
|
||||
sol = self.robot.ik_LM(cart_traj[j], q0=self.current_joint_positions, mask = self.cost_mask, joint_limits = True, method = 'chan') if j == 0 else self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True, method = 'chan')
|
||||
if sol[1] == 1:
|
||||
fowards = self.robot.fkine_all(sol[0])
|
||||
out_of_bounds = (fowards.t[1:,0] > self.x_limits[1] if self.x_limits[1] != None else False) | (fowards.t[1:,0] < self.x_limits[0] if self.x_limits[0] != None else False) | (fowards.t[1:,1] > self.y_limits[1] if self.y_limits[1] != None else False) | (fowards.t[1:,1] < self.y_limits[0] if self.y_limits[0] != None else False) | (fowards.t[1:,2] > self.z_limits[1] if self.z_limits[1] != None else False) | (fowards.t[1:,2] < self.z_limits[0] if self.z_limits[0] != None else False)
|
||||
if np.any(out_of_bounds):
|
||||
#print(fowards.t)
|
||||
#indices = np.where(out_of_bounds)[0]
|
||||
#print(f"indices: {indices}")
|
||||
self.get_logger().warn("One or more links moved out of bounds!")
|
||||
'''
|
||||
for i in indices:
|
||||
try:
|
||||
print(f"Joint {self.robot.links[i].name} is out of bounds: (x,y,z) = {fowards.t[i]}")
|
||||
except IndexError:
|
||||
print(f"index {i} is out of bounds, but no corresponding joint found.")
|
||||
self.previous_desired = self.desired_joint_positions
|
||||
'''
|
||||
break
|
||||
duration = 0
|
||||
prev = self.current_joint_positions if j == 0 else prev_sol
|
||||
for p1, p2, max_vel in zip(sol[0], prev, self.joint_velocity_limits.values()):
|
||||
duration = max(duration, abs(p1 - p2) / max_vel)#, 1/self.hz) # as minimun
|
||||
prev_sol = list(sol[0])
|
||||
if duration == 0:
|
||||
continue
|
||||
point = JointTrajectoryPoint()
|
||||
point.positions = list(sol[0])
|
||||
duration *= 5
|
||||
duration += prev_duration
|
||||
prev_duration = duration
|
||||
point.time_from_start.sec = int(duration)
|
||||
point.time_from_start.nanosec = int((duration - int(duration)) * 1e9)
|
||||
msg.points.append(point)
|
||||
else:
|
||||
print(f'IK could not find a solution for (x,y,z) = {cart_traj[j].t} and (r,p,y) = {cart_traj[j].rpy()}!')
|
||||
prev_sol = self.current_joint_positions
|
||||
if len(msg.points) == 0:
|
||||
return
|
||||
msg.header.stamp = self.get_clock().now().to_msg()
|
||||
self.publisher.publish(msg)
|
||||
self.previous_desired = self.desired_joint_positions
|
||||
|
||||
def main():
|
||||
"""Main function to get joint names and start the ROS 2 & OSC system."""
|
||||
robot_urdf = input("Enter the path to the URDF file: ")
|
||||
tree = ET.parse(robot_urdf)
|
||||
root = tree.getroot()
|
||||
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
|
||||
robot = rtb.ERobot.URDF(robot_urdf)
|
||||
joint_velocity_limits = {}
|
||||
|
||||
# Iterate over all joints in the URDF
|
||||
for joint in root.findall('.//joint'):
|
||||
joint_name = joint.get('name') # Get the name of the joint
|
||||
|
||||
# Look for the <limit> tag under each joint
|
||||
limit = joint.find('limit')
|
||||
|
||||
if limit is not None:
|
||||
# Extract the velocity limit (if it exists)
|
||||
velocity_limit = limit.get('velocity')
|
||||
|
||||
if velocity_limit is not None:
|
||||
joint_velocity_limits[joint_name] = float(velocity_limit)
|
||||
|
||||
while True:
|
||||
try:
|
||||
print("The cost mask determines which coordinates are used for the IK. Each element of the cost mask corresponds to a catesian coordinate [x, y, z, Rx, Ry, Rz].")
|
||||
print("The cost mask [1, 1, 1, 0, 0, 0] means that the IK will only consider translation and no rotaion.")
|
||||
cost_mask = [int(i) for i in input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")]
|
||||
if sum(cost_mask) <= robot.n and len(cost_mask) == 6:
|
||||
break
|
||||
else:
|
||||
print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.")
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter integers only.")
|
||||
print(f"Cost mask: {cost_mask}")
|
||||
|
||||
rclpy.init()
|
||||
|
||||
node = ScaledJointTrajectoryPublisher(joint_names, joint_velocity_limits, robot, cost_mask)
|
||||
|
||||
# Run ROS 2 spin, and osc_process will be handled by the timer
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
print("")
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
osc_terminate()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
@ -36,6 +36,7 @@ setup(
|
||||
'plugdata_cart_fix = joint_control.plugdata_cart_fix:main',
|
||||
'plugdata_cart_smooth = joint_control.plugdata_cart_smooth:main',
|
||||
'test=joint_control.test:main',
|
||||
'sandbox=sandbox.sandbox:main',
|
||||
],
|
||||
},
|
||||
)
|
||||
|
Loading…
Reference in New Issue
Block a user