AS: cleaup

This commit is contained in:
Alexander Schaefer
2025-05-12 20:49:16 +02:00
parent 472cbc6b08
commit ab1ba7c1af
2165 changed files with 0 additions and 140555 deletions

View File

@@ -1 +0,0 @@
colcon

View File

@@ -1,108 +0,0 @@
import rclpy
from rclpy.node import Node
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from osc4py3.as_eventloop import *
from osc4py3 import oscmethod as osm
import roboticstoolbox as rtb
import spatialmath as sm
import xml.etree.ElementTree as ET
import time
class ScaledJointTrajectoryPublisher(Node):
"""Node to publish joint trajectories based on OSC messages."""
def __init__(self, robot, joint_names):
super().__init__('scaled_joint_trajectory_publisher')
# ROS2 Publisher
self.publisher = self.create_publisher(
JointTrajectory,
'/scaled_joint_trajectory_controller/joint_trajectory',
10
)
# Store received joint positions
self.joint_positions = [0.0] * len(joint_names)
self.joint_names = joint_names
self.robot = robot
osc_startup()
osc_udp_server("0.0.0.0", 8000, "osc_server")
print("Server started on 0.0.0.0:8000 \nready to receive messages in the following format: /tcp_coordinates [x, y, z, roll, pitch, yaw] optional: duration as last argument: /tcp_coordinates [x, y, z, roll, pitch, yaw, duration]")
# Register OSC handler
osc_method("/tcp_coordinates", self.tcp_coordinates_handler, argscheme=osm.OSCARG_DATAUNPACK)
def tcp_coordinates_handler(self, *args):
"""Handles incoming OSC messages for tcp position."""
time1 = time.time()
if len(args) == len(self.joint_positions):
x, y, z, roll, pitch, yaw = args
duration = 4.0 # Default duration
elif len(args) == len(self.joint_positions) + 1:
x, y, z, roll, pitch, yaw, duration = args
else:
print("Invalid number of arguments")
return
# Create the desired end-effector pose
Tep = sm.SE3(x, y, z) * sm.SE3.RPY([roll, pitch, yaw], order='xyz')
# Compute the inverse kinematics to get the joint angles
#time1 = time.time()
#sol = self.robot.ikine_LM(Tep)
#print(f"Time taken for ERobot: {time.time() - time1}")
#time1 = time.time()
#sol = self.robot.ikine_LM(Tep, q0=self.joint_positions)
#print(f"Time taken for ERobot with initial guess: {time.time() - time1}")
#time1 = time.time()
#sol = self.robot.ets().ikine_LM(Tep)
#print(f"Time taken for ETS: {time.time() - time1}")
#time1 = time.time()
sol = self.robot.ik_LM(Tep, q0=self.joint_positions)
#print(f"Time taken for ETS with initial guess: {time.time() - time1}")
if sol[1]:
joint_positions = list(sol[0])
self.send_trajectory(joint_positions, duration)
#print(f"Computed joint positions: {joint_positions}")
else:
print("Inverse kinematics failed")
print(f"Frequency: {1/(time.time() - time1)} Hz")
def send_trajectory(self, joint_positions, duration=4.0):
"""Publish a joint trajectory command to move the robot."""
msg = JointTrajectory()
msg.joint_names = self.joint_names
point = JointTrajectoryPoint()
point.positions = joint_positions # Updated joint positions
point.time_from_start.sec = int(duration)
point.time_from_start.nanosec = int((duration - int(duration)) * 1e9)
msg.points.append(point)
self.publisher.publish(msg)
print(f"Updated joint positions: {joint_positions}")
def main():
"""Main function to get joint names and start the ROS 2 & OSC system."""
tree = ET.parse('/BA/robot.urdf')
root = tree.getroot()
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
robot = rtb.ERobot.URDF('/BA/robot.urdf')
rclpy.init()
node = ScaledJointTrajectoryPublisher(robot.ets(), joint_names)
# Run both ROS 2 and OSC Server together
try:
while rclpy.ok():
osc_process() # Handle one OSC request at a time
rclpy.spin_once(node, timeout_sec=0.1) # Process ROS callbacks
except KeyboardInterrupt:
print("")
finally:
node.destroy_node()
rclpy.shutdown()
osc_terminate()
if __name__ == '__main__':
main()

View File

@@ -1,83 +0,0 @@
import rclpy
from rclpy.node import Node
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from osc4py3.as_eventloop import *
from osc4py3 import oscmethod as osm
import xml.etree.ElementTree as ET
class ScaledJointTrajectoryPublisher(Node):
"""Node to publish joint trajectories based on OSC messages."""
def __init__(self, joint_names):
super().__init__('scaled_joint_trajectory_publisher')
# ROS2 Publisher
self.publisher = self.create_publisher(
JointTrajectory,
'/scaled_joint_trajectory_controller/joint_trajectory',
10
)
# Store received joint positions
self.joint_positions = [0.0] * len(joint_names)
self.joint_names = joint_names
osc_startup()
osc_udp_server("0.0.0.0", 8000, "osc_server")
print("Server started on 0.0.0.0:8000 \nready to receive messages in the following format: \n /joint_angles [joint_positions]; optional: duration as last element, default is 3sec")
# Register OSC handler
osc_method("/joint_angles", self.joint_angles_handler, argscheme=osm.OSCARG_DATAUNPACK)
def joint_angles_handler(self, *args):
"""Handles incoming OSC messages for joint positions."""
print(args)
if len(args) == len(self.joint_positions):
self.joint_positions = list(args)
print(self.joint_positions)
self.send_trajectory(self.joint_positions)
elif len(args) == len(self.joint_positions) + 1:
self.joint_positions = args[:-1]
self.send_trajectory(self.joint_positions, args[-1])
print(f'Duration: {args[-1]}')
def send_trajectory(self, joint_positions, duration=0.01):
"""Publish a joint trajectory command to move the robot."""
msg = JointTrajectory()
msg.joint_names = self.joint_names
point = JointTrajectoryPoint()
joint_positions = [float(joint) for joint in joint_positions]
point.positions = joint_positions # Updated joint positions
point.time_from_start.sec = int(duration)
point.time_from_start.nanosec = int((duration - int(duration)) * 1e9)
msg.points.append(point)
self.publisher.publish(msg)
print(f"Updated joint positions: {self.joint_positions}")
def main():
"""Main function to get joint names and start the ROS 2 & OSC system."""
robot_urdf = input("Enter the path to the URDF file: ")
tree = ET.parse(robot_urdf)
root = tree.getroot()
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
rclpy.init()
node = ScaledJointTrajectoryPublisher(joint_names)
# Run both ROS 2 and OSC Server together
try:
while rclpy.ok():
osc_process() # Handle one OSC request at a time
rclpy.spin_once(node, timeout_sec=0.1) # Process ROS callbacks
except KeyboardInterrupt:
print("")
finally:
node.destroy_node()
rclpy.shutdown()
osc_terminate()
if __name__ == '__main__':
main()

View File

@@ -1,209 +0,0 @@
import rclpy
from rclpy.node import Node
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from sensor_msgs.msg import JointState
from osc4py3.as_allthreads import *
from osc4py3 import oscmethod as osm
import xml.etree.ElementTree as ET
import time
import numpy as np
import os
class ScaledJointTrajectoryPublisher(Node):
"""Node to publish joint trajectories based on OSC messages."""
def __init__(self, joint_names, joint_velocity_limits):
super().__init__('scaled_joint_trajectory_publisher')
# ROS2 Publisher
self.publisher = self.create_publisher(
JointTrajectory,
'/scaled_joint_trajectory_controller/joint_trajectory',
1
)
self.subscription = self.create_subscription(
JointState,
'/joint_states',
self.joint_states_callback,
1 # Increased queue size for joint states
)
# Store received joint positions
self.current_joint_positions = [0.0] * len(joint_names)
self.joint_names = joint_names
self.joint_velocity_limits = joint_velocity_limits
self.desired_joint_positions = [0.0] * len(joint_names)
self.previous_desired = [0.0] * len(joint_names)
ip = "0.0.0.0" # Listen on all network interfaces
port = 8000 # Must match the sender's port in `joint_state_osc.py`
osc_startup()
osc_udp_server(ip, port, "osc_server")
print("Server started on 0.0.0.0:8000 \n ready to receive messages in the following format: /joint_trajectroy [joint_positions] optional: duration as last argument")
# Register OSC handler
osc_method("/joint_angles", self.joint_angles_handler, argscheme=osm.OSCARG_DATAUNPACK)
while True:
try:
self.x_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()]
self.y_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()]
self.z_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()]
if len(self.x_limits) != 2 or len(self.y_limits) != 2 or len(self.z_limits) != 2:
print("Invalid input. Please enter exactly two values (or leave blank) for each limit.")
continue
if (self.x_limits[0] is not None and self.x_limits[1] is not None and self.x_limits[0] >= self.x_limits[1]) or \
(self.y_limits[0] is not None and self.y_limits[1] is not None and self.y_limits[0] >= self.y_limits[1]) or \
(self.z_limits[0] is not None and self.z_limits[1] is not None and self.z_limits[0] >= self.z_limits[1]):
print("Invalid input. Lower limit must be less than upper limit for each axis.")
continue
print(f"Current limits:")
print(f"x: {self.x_limits}")
print(f"y: {self.y_limits}")
print(f"z: {self.z_limits}")
confirm = input("Do you want your robot to move in this range? (y/n): ").strip().lower()
if confirm == 'y':
break
elif confirm == 'n':
print("Please re-enter the limits.")
else:
print("Invalid input. Please enter 'y' or 'n'.")
except ValueError:
print("Invalid input. Please enter numeric values only.")
# Ask the user if they want to set new joint limits
update_limits = input("Do you want to set new joint limits? (y/n): ").strip().lower()
if update_limits == 'y':
for joint_name in self.joint_names:
while True:
try:
print(f"Current position limits for joint '{joint_name}': {self.robot.links[joint_name]} rad")
lower_limit = input(f"Enter the new lower limit for joint '{joint_name}' (or press Enter to keep current): ").strip()
upper_limit = input(f"Enter the new upper limit for joint '{joint_name}' (or press Enter to keep current): ").strip()
if lower_limit is not None and upper_limit is not None and lower_limit >= upper_limit:
print("Invalid input. Lower limit must be less than upper limit.")
continue
if lower_limit:
self.robot.links[joint_name][0] = float(lower_limit)
if upper_limit:
self.robot.links[joint_name][1] = float(upper_limit)
break
except ValueError:
print("Invalid input. Please enter numeric values or leave blank to keep current limits.")
self.hz = float(input("Enter the desired refresh frequency (Hz): "))
# Start the OSC server in a separate thread to avoid blocking the ROS 2 event loop
self.create_timer(1/self.hz, self.update_position) # Timer to call osc_process periodically
def joint_angles_handler(self, *args):
# Ensure the desired joint positions are within the specified limits
x, y, z, r, p, yaw, *_ = [float(i) for i in list(args)]
if self.x_limits[0] is not None:
x = max(self.x_limits[0], x)
if self.x_limits[1] is not None:
x = min(self.x_limits[1], x)
if self.y_limits[0] is not None:
y = max(self.y_limits[0], y)
if self.y_limits[1] is not None:
y = min(self.y_limits[1], y)
if self.z_limits[0] is not None:
z = max(self.z_limits[0], z)
if self.z_limits[1] is not None:
z = min(self.z_limits[1], z)
if x != args[0] or y != args[1] or z != args[2]:
self.get_logger().warn(
f"Desired joint positions adjusted to fit within limits: "
f"x={x}, y={y}, z={z} (original: x={args[0]}, y={args[1]}, z={args[2]})"
)
self.desired_joint_positions = [x, y, z, r, p, yaw]
def joint_states_callback(self, msg):
"""Callback function to handle incoming joint states."""
joint_names = msg.name # List of joint names
joint_position_dict = dict(zip(joint_names, self.current_joint_positions))
self.current_joint_positions = [joint_position_dict[name] for name in self.joint_names]
def update_position(self):
time1 = time.time()
if self.desired_joint_positions == self.previous_desired:
return
duration = 0
for p1, p2, max_vel in zip(self.desired_joint_positions, self.current_joint_positions, self.joint_velocity_limits.values()):
duration = max(max(duration, abs(p1 - p2) / max_vel),1/self.hz)# as minimun
duration *= 1.8
#print(f"Duration: {duration}")
#print(f'vel: {max(np.array(self.desired_joint_positions_joint_positions) - np.array(self.joint_positions)/duration)}')
msg = JointTrajectory()
msg.joint_names = self.joint_names
point = JointTrajectoryPoint()
point.positions = self.desired_joint_positions # Updated joint positions
point.time_from_start.sec = int(duration)
#point.time_from_start.sec = 10
point.time_from_start.nanosec = int((duration - int(duration)) * 1e9)
#point.time_from_start.nanosec = 0
#point.velocities = [i*0.95 for i in self.joint_velocity_limits.values()]
msg.points.append(point)
self.publisher.publish(msg)
#print(f"desired: {self.desired_joint_positions}")
#print(f"desired: {[180/3.141*i for i in self.desired_joint_positions]}")
#print(f"current: {[180/3.141*i for i in self.current_joint_positions]}")
self.previous_desired = self.desired_joint_positions
def main():
"""Main function to get joint names and start the ROS 2 & OSC system."""
while True:
path_to_urdf = input("Enter the path to the URDF file: ")
if os.path.isfile(path_to_urdf):
if not path_to_urdf.endswith('.urdf'):
print("The file is not a URDF file. Please enter a valid URDF file.")
continue
break
else:
print("Invalid path. Please enter a valid path to the URDF file.")
tree = ET.parse(path_to_urdf)
root = tree.getroot()
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
joint_velocity_limits = {}
# Iterate over all joints in the URDF
for joint in root.findall('.//joint'):
joint_name = joint.get('name') # Get the name of the joint
# Look for the <limit> tag under each joint
limit = joint.find('limit')
if limit is not None:
# Extract the velocity limit (if it exists)
velocity_limit = limit.get('velocity')
if velocity_limit is not None:
joint_velocity_limits[joint_name] = float(velocity_limit)
rclpy.init()
node = ScaledJointTrajectoryPublisher(joint_names, joint_velocity_limits)
# Run ROS 2 spin, and osc_process will be handled by the timer
try:
rclpy.spin(node)
except KeyboardInterrupt:
print("")
finally:
node.destroy_node()
rclpy.shutdown()
osc_terminate()
if __name__ == '__main__':
main()

View File

@@ -1,158 +0,0 @@
import rclpy
from rclpy.node import Node
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from sensor_msgs.msg import JointState
from osc4py3.as_allthreads import *
from osc4py3 import oscmethod as osm
import xml.etree.ElementTree as ET
import time
import numpy as np
import spatialmath as sm
import roboticstoolbox as rtb
import os
class ScaledJointTrajectoryPublisher(Node):
"""Node to publish joint trajectories based on OSC messages."""
def __init__(self, joint_names, joint_velocity_limits, robot, cost_mask):
super().__init__('scaled_joint_trajectory_publisher')
# ROS2 Publisher
self.publisher = self.create_publisher(
JointTrajectory,
'/scaled_joint_trajectory_controller/joint_trajectory',
1
)
self.subscription = self.create_subscription(
JointState,
'/joint_states',
self.joint_states_callback,
1 # Increased queue size for joint states
)
# Store received joint positions
self.current_joint_positions = [0.0] * len(joint_names)
self.joint_names = joint_names
self.joint_velocity_limits = joint_velocity_limits
self.desired_joint_positions = [0.0] * len(joint_names)
self.previous_desired = [0.0] * len(joint_names)
self.robot = robot
self.cost_mask = cost_mask
ip = "0.0.0.0" # Listen on all network interfaces
port = 8000 # Must match the sender's port in `joint_state_osc.py`
osc_startup()
osc_udp_server(ip, port, "osc_server")
print("Server started on 0.0.0.0:8000 \n ready to receive messages in the following format: /joint_trajectroy [joint_positions] optional: duration as last argument")
# Register OSC handler
osc_method("/joint_angles", self.joint_angles_handler, argscheme=osm.OSCARG_DATAUNPACK)
self.hz = float(input("Enter the desired refresh frequency (Hz): "))
# Start the OSC server in a separate thread to avoid blocking the ROS 2 event loop
self.create_timer(1/self.hz, self.update_position) # Timer to call osc_process periodically
def joint_angles_handler(self, *args):
"""Handles incoming OSC messages for joint positions."""
self.desired_joint_positions = [float(i) for i in list(args)]
def joint_states_callback(self, msg):
"""Callback function to handle incoming joint states."""
joint_position_dict = dict(zip(msg.name, msg.position))
self.current_joint_positions = [joint_position_dict[name] for name in self.joint_names]
def update_position(self):
if self.desired_joint_positions == self.previous_desired:
return
msg = JointTrajectory()
msg.joint_names = self.joint_names
steps_per_m = 30
if True: #len(args[0]) == len(self.joint_names):
prev_duration = 0
T1 = self.robot.fkine(self.current_joint_positions)
[x,y,z] = T1.t
[roll, pitch, yaw] = T1.rpy()
x1, y1, z1, roll1, pitch1, yaw1 = self.desired_joint_positions
steps = int(np.linalg.norm(np.array([x1-x, y1-y, z1-z])) * steps_per_m)
if steps < 2: steps = 2
cart_traj = [sm.SE3([x+(x1-x)/(steps-1)*i, y+(y1-y)/(steps-1)*i, z+(z1-z)/(steps-1)*i]) * sm.SE3.RPY([roll+(roll1-roll)/(steps-1)*i, pitch+(pitch1-pitch)/(steps-1)*i, yaw+(yaw1-yaw)/(steps-1)*i]) for i in range(steps)]
for j in range(steps):
print(cart_traj[j])
sol = self.robot.ik_LM(cart_traj[j], q0=self.current_joint_positions, mask = self.cost_mask, joint_limits = True)
if sol[1] == 1:
duration = 0
prev = self.current_joint_positions if j == 0 else prev_sol
for p1, p2, max_vel in zip(sol[0], prev, self.joint_velocity_limits.values()):
duration = max(duration, abs(p1 - p2) / max_vel)#, 1/self.hz) # as minimun
point = JointTrajectoryPoint()
point.positions = list(sol[0])
#duration *= 2
duration += prev_duration
prev_duration = duration
point.time_from_start.sec = int(duration)
point.time_from_start.nanosec = int((duration - int(duration)) * 1e9)
msg.points.append(point)
prev_sol = list(sol[0])
else:
print('IK could not find a solution!')
prev_sol = self.current_joint_positions
msg.header.stamp = self.get_clock().now().to_msg()
self.publisher.publish(msg)
self.previous_desired = self.desired_joint_positions
def main():
"""Main function to get joint names and start the ROS 2 & OSC system."""
robot_urdf = input("Enter the path to the URDF file: ")
tree = ET.parse(robot_urdf)
root = tree.getroot()
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
robot = rtb.ERobot.URDF(robot_urdf)
joint_velocity_limits = {}
# Iterate over all joints in the URDF
for joint in root.findall('.//joint'):
joint_name = joint.get('name') # Get the name of the joint
# Look for the <limit> tag under each joint
limit = joint.find('limit')
if limit is not None:
# Extract the velocity limit (if it exists)
velocity_limit = limit.get('velocity')
if velocity_limit is not None:
joint_velocity_limits[joint_name] = float(velocity_limit)
while True:
try:
print("The cost mask determines which coordinates are used for the IK. Each element of the cost mask corresponds to a catesian coordinate [x, y, z, Rx, Ry, Rz].")
print("The cost mask [1, 1, 1, 0, 0, 0] means that the IK will only consider translation and no rotaion.")
cost_mask = [int(i) for i in input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")]
if sum(cost_mask) <= robot.n and len(cost_mask) == 6:
break
else:
print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.")
except ValueError:
print("Invalid input. Please enter integers only.")
print(f"Cost mask: {cost_mask}")
rclpy.init()
node = ScaledJointTrajectoryPublisher(joint_names, joint_velocity_limits, robot, cost_mask)
# Run ROS 2 spin, and osc_process will be handled by the timer
try:
rclpy.spin(node)
except KeyboardInterrupt:
print("")
finally:
node.destroy_node()
rclpy.shutdown()
osc_terminate()
if __name__ == '__main__':
main()

View File

@@ -1,264 +0,0 @@
import rclpy
from rclpy.node import Node
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from sensor_msgs.msg import JointState
from osc4py3.as_allthreads import *
from osc4py3 import oscmethod as osm
import xml.etree.ElementTree as ET
import numpy as np
import spatialmath as sm
import roboticstoolbox as rtb
class ScaledJointTrajectoryPublisher(Node):
"""Node to publish joint trajectories based on OSC messages."""
def __init__(self, joint_names, joint_velocity_limits, robot, cost_mask):
super().__init__('scaled_joint_trajectory_publisher')
# ROS2 Publisher
self.publisher = self.create_publisher(
JointTrajectory,
'/scaled_joint_trajectory_controller/joint_trajectory',
1
)
self.subscription = self.create_subscription(
JointState,
'/joint_states',
self.joint_states_callback,
1 # Increased queue size for joint states
)
# Store received joint positions
self.current_joint_positions = [0.0] * len(joint_names)
self.joint_names = joint_names
self.joint_velocity_limits = joint_velocity_limits
self.desired_joint_positions = [0.0] * len(joint_names)
self.previous_desired = [0.0] * len(joint_names)
self.robot = robot
self.cost_mask = cost_mask
self.prev_pose = None
ip = "0.0.0.0" # Listen on all network interfaces
port = 8000 # Must match the sender's port in `joint_state_osc.py`
osc_startup()
osc_udp_server(ip, port, "osc_server")
print("Server started on 0.0.0.0:8000 \n ready to receive messages in the following format: /joint_trajectroy [joint_positions] optional: duration as last argument")
# Register OSC handler
osc_method("/joint_angles", self.joint_angles_handler, argscheme=osm.OSCARG_DATAUNPACK)
set_limits = input("Do you want to set limit for x, y and z? (y/n): ").strip().lower()
if set_limits == 'y':
while True:
try:
self.x_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()]
self.y_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()]
self.z_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()]
if len(self.x_limits) != 2 or len(self.y_limits) != 2 or len(self.z_limits) != 2:
print("Invalid input. Please enter exactly two values (or leave blank) for each limit.")
continue
if (self.x_limits[0] is not None and self.x_limits[1] is not None and self.x_limits[0] >= self.x_limits[1]) or \
(self.y_limits[0] is not None and self.y_limits[1] is not None and self.y_limits[0] >= self.y_limits[1]) or \
(self.z_limits[0] is not None and self.z_limits[1] is not None and self.z_limits[0] >= self.z_limits[1]):
print("Invalid input. Lower limit must be less than upper limit for each axis.")
continue
print(f"Current limits:")
print(f"x: {self.x_limits}")
print(f"y: {self.y_limits}")
print(f"z: {self.z_limits}")
con = True
while con:
confirm = input("Do you want your robot to move in this range? (y/n): ").strip().lower()
if confirm == 'y':
break
elif confirm == 'n':
print("Please re-enter the limits.")
con = False
else:
print("Invalid input. Please enter 'y' or 'n'.")
if con: break
except ValueError:
print("Invalid input. Please enter numeric values only.")
# Ask the user if they want to set new joint limits
update_limits = input("Do you want to set new joint limits? (y/n): ").strip().lower()
if update_limits == 'y':
for i in range(len(self.joint_names)):
while True:
try:
lim = self.robot.qlim.copy()
# Find the link corresponding to the joint name
print(f"Current position limits for joint '{self.joint_names[i]}': [{self.robot.qlim[0][i]} {self.robot.qlim[1][i]}] rad")
lower_limit = input(f"Enter the new lower limit for joint '{self.joint_names[i]}' (or press Enter to keep current): ").strip()
upper_limit = input(f"Enter the new upper limit for joint '{self.joint_names[i]}' (or press Enter to keep current): ").strip()
if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit):
print("Invalid input. Lower limit must be less than upper limit.")
continue
if lower_limit:
lim[0][i] = float(lower_limit)
if upper_limit:
lim[1][i] = float(upper_limit)
self.robot.qlim = lim
print(f"New limits for joint '{self.joint_names[i]}': [{self.robot.qlim[0][i]} {self.robot.qlim[1][i]}] rad")
print("-" * 50)
break
except ValueError:
print("Invalid input. Please enter numeric values or leave blank to keep current limits.")
self.hz = float(input("Enter the desired refresh frequency (Hz): "))
# Start the OSC server in a separate thread to avoid blocking the ROS 2 event loop
self.create_timer(1/self.hz, self.update_position) # Timer to call osc_process periodically
def joint_angles_handler(self, *args):
# Ensure the desired joint positions are within the specified limits
print("received joint angles")
x, y, z, r, p, yaw, *_ = [float(i) for i in list(args)]
if self.x_limits[0] is not None:
x = max(self.x_limits[0], x)
if self.x_limits[1] is not None:
x = min(self.x_limits[1], x)
if self.y_limits[0] is not None:
y = max(self.y_limits[0], y)
if self.y_limits[1] is not None:
y = min(self.y_limits[1], y)
if self.z_limits[0] is not None:
z = max(self.z_limits[0], z)
if self.z_limits[1] is not None:
z = min(self.z_limits[1], z)
if x != args[0] or y != args[1] or z != args[2]:
self.get_logger().warn(
f"Desired joint positions adjusted to fit within limits: "
f"x={x}, y={y}, z={z} (original: x={args[0]}, y={args[1]}, z={args[2]})"
)
self.desired_joint_positions = [x, y, z, r, p, yaw]
def joint_states_callback(self, msg):
"""Callback function to handle incoming joint states."""
joint_position_dict = dict(zip(msg.name, msg.position))
self.current_joint_positions = [joint_position_dict[name] for name in self.joint_names]
joint_position_dict = dict(zip(msg.name, msg.velocity))
self.current_joint_velocities = [joint_position_dict[name] for name in self.joint_names]
def update_position(self):
if self.desired_joint_positions == self.previous_desired:
return
msg = JointTrajectory()
msg.joint_names = self.joint_names
steps_per_m = 100
if True: #len(args[0]) == len(self.joint_names):
prev_duration = 0
if self.prev_pose == None:
[x,y,z] = self.robot.fkine(self.current_joint_positions).t
[roll, pitch, yaw] = self.robot.fkine(self.current_joint_positions).rpy()
else:
[x,y,z] = self.prev_pose[:3]
[roll, pitch, yaw] = self.prev_pose[3:]
x1, y1, z1, roll1, pitch1, yaw1 = self.desired_joint_positions
self.prev_pose = self.desired_joint_positions
steps = int(np.linalg.norm(np.array([x1, y1, z1])- self.robot.fkine(self.current_joint_positions).t) * steps_per_m)
if steps < 2: steps = 2
cart_traj = [sm.SE3([x+(x1-x)/(steps-1)*i, y+(y1-y)/(steps-1)*i, z+(z1-z)/(steps-1)*i]) * sm.SE3.RPY([roll+(roll1-roll)/(steps-1)*i, pitch+(pitch1-pitch)/(steps-1)*i, yaw+(yaw1-yaw)/(steps-1)*i]) for i in range(steps)]
for j in range(steps):
sol = self.robot.ik_LM(cart_traj[j], q0=self.current_joint_positions, mask = self.cost_mask, joint_limits = True, method = 'chan') if j == 0 else self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True, method = 'chan')
if sol[1] == 1:
fowards = self.robot.fkine_all(sol[0])
out_of_bounds = (fowards.t[1:,0] > self.x_limits[1] if self.x_limits[1] != None else False) | (fowards.t[1:,0] < self.x_limits[0] if self.x_limits[0] != None else False) | (fowards.t[1:,1] > self.y_limits[1] if self.y_limits[1] != None else False) | (fowards.t[1:,1] < self.y_limits[0] if self.y_limits[0] != None else False) | (fowards.t[1:,2] > self.z_limits[1] if self.z_limits[1] != None else False) | (fowards.t[1:,2] < self.z_limits[0] if self.z_limits[0] != None else False)
if np.any(out_of_bounds):
#print(fowards.t)
#indices = np.where(out_of_bounds)[0]
#print(f"indices: {indices}")
self.get_logger().warn("One or more links moved out of bounds!")
'''
for i in indices:
try:
print(f"Joint {self.robot.links[i].name} is out of bounds: (x,y,z) = {fowards.t[i]}")
except IndexError:
print(f"index {i} is out of bounds, but no corresponding joint found.")
self.previous_desired = self.desired_joint_positions
'''
break
duration = 0
prev = self.current_joint_positions if j == 0 else prev_sol
for p1, p2, max_vel in zip(sol[0], prev, self.joint_velocity_limits.values()):
duration = max(duration, abs(p1 - p2) / max_vel)#, 1/self.hz) # as minimun
prev_sol = list(sol[0])
if duration == 0:
continue
point = JointTrajectoryPoint()
point.positions = list(sol[0])
duration *= 2
duration += prev_duration
prev_duration = duration
point.time_from_start.sec = int(duration)
point.time_from_start.nanosec = int((duration - int(duration)) * 1e9)
msg.points.append(point)
else:
print(f'IK could not find a solution for (x,y,z) = {cart_traj[j].t} and (r,p,y) = {cart_traj[j].rpy()}!')
prev_sol = self.current_joint_positions
if len(msg.points) == 0:
return
msg.header.stamp = self.get_clock().now().to_msg()
self.publisher.publish(msg)
self.previous_desired = self.desired_joint_positions
def main():
"""Main function to get joint names and start the ROS 2 & OSC system."""
robot_urdf = input("Enter the path to the URDF file: ")
tree = ET.parse(robot_urdf)
root = tree.getroot()
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
robot = rtb.ERobot.URDF(robot_urdf)
joint_velocity_limits = {}
# Iterate over all joints in the URDF
for joint in root.findall('.//joint'):
joint_name = joint.get('name') # Get the name of the joint
# Look for the <limit> tag under each joint
limit = joint.find('limit')
if limit is not None:
# Extract the velocity limit (if it exists)
velocity_limit = limit.get('velocity')
if velocity_limit is not None:
joint_velocity_limits[joint_name] = float(velocity_limit)
while True:
try:
print("The cost mask determines which coordinates are used for the IK. Each element of the cost mask corresponds to a catesian coordinate [x, y, z, Rx, Ry, Rz].")
print("The cost mask [1, 1, 1, 0, 0, 0] means that the IK will only consider translation and no rotaion.")
cost_mask = [int(i) for i in input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")]
if sum(cost_mask) <= robot.n and len(cost_mask) == 6:
break
else:
print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.")
except ValueError:
print("Invalid input. Please enter integers only.")
print(f"Cost mask: {cost_mask}")
rclpy.init()
node = ScaledJointTrajectoryPublisher(joint_names, joint_velocity_limits, robot, cost_mask)
# Run ROS 2 spin, and osc_process will be handled by the timer
try:
rclpy.spin(node)
except KeyboardInterrupt:
print("")
finally:
node.destroy_node()
rclpy.shutdown()
osc_terminate()
if __name__ == '__main__':
main()

View File

@@ -1,232 +0,0 @@
import rclpy
from rclpy.node import Node
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from sensor_msgs.msg import JointState
from osc4py3.as_allthreads import *
from osc4py3 import oscmethod as osm
import xml.etree.ElementTree as ET
import time
import numpy as np
import spatialmath as sm
import roboticstoolbox as rtb
import os
class ScaledJointTrajectoryPublisher(Node):
"""Node to publish joint trajectories based on OSC messages."""
def __init__(self, joint_names, joint_velocity_limits, robot, cost_mask):
super().__init__('scaled_joint_trajectory_publisher')
# ROS2 Publisher
self.publisher = self.create_publisher(
JointTrajectory,
'/scaled_joint_trajectory_controller/joint_trajectory',
1
)
self.subscription = self.create_subscription(
JointState,
'/joint_states',
self.joint_states_callback,
1 # Increased queue size for joint states
)
# Store received joint positions
self.current_joint_positions = [0.0] * len(joint_names)
self.joint_names = joint_names
self.joint_velocity_limits = joint_velocity_limits
self.desired_joint_positions = [0.0] * len(joint_names)
self.previous_desired = [0.0] * len(joint_names)
self.robot = robot
self.cost_mask = cost_mask
ip = "0.0.0.0" # Listen on all network interfaces
port = 8000 # Must match the sender's port in `joint_state_osc.py`
osc_startup()
osc_udp_server(ip, port, "osc_server")
print("Server started on 0.0.0.0:8000 \n ready to receive messages in the following format: /joint_trajectroy [joint_positions] optional: duration as last argument")
# Register OSC handler
osc_method("/joint_angles", self.joint_angles_handler, argscheme=osm.OSCARG_DATAUNPACK)
self.hz = float(input("Enter the desired refresh frequency (Hz): "))
# Start the OSC server in a separate thread to avoid blocking the ROS 2 event loop
self.create_timer(1/self.hz, self.update_position) # Timer to call osc_process periodically
while True:
try:
self.x_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()]
self.y_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()]
self.z_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()]
if len(self.x_limits) != 2 or len(self.y_limits) != 2 or len(self.z_limits) != 2:
print("Invalid input. Please enter exactly two values (or leave blank) for each limit.")
continue
if (self.x_limits[0] is not None and self.x_limits[1] is not None and self.x_limits[0] >= self.x_limits[1]) or \
(self.y_limits[0] is not None and self.y_limits[1] is not None and self.y_limits[0] >= self.y_limits[1]) or \
(self.z_limits[0] is not None and self.z_limits[1] is not None and self.z_limits[0] >= self.z_limits[1]):
print("Invalid input. Lower limit must be less than upper limit for each axis.")
continue
print(f"Current limits:")
print(f"x: {self.x_limits}")
print(f"y: {self.y_limits}")
print(f"z: {self.z_limits}")
confirm = input("Do you want your robot to move in this range? (y/n): ").strip().lower()
if confirm == 'y':
break
elif confirm == 'n':
print("Please re-enter the limits.")
else:
print("Invalid input. Please enter 'y' or 'n'.")
except ValueError:
print("Invalid input. Please enter numeric values only.")
def joint_angles_handler(self, *args):
# Ensure the desired joint positions are within the specified limits
x, y, z, r, p, yaw, *_ = [float(i) for i in list(args)]
if self.x_limits[0] is not None:
x = max(self.x_limits[0], x)
if self.x_limits[1] is not None:
x = min(self.x_limits[1], x)
if self.y_limits[0] is not None:
y = max(self.y_limits[0], y)
if self.y_limits[1] is not None:
y = min(self.y_limits[1], y)
if self.z_limits[0] is not None:
z = max(self.z_limits[0], z)
if self.z_limits[1] is not None:
z = min(self.z_limits[1], z)
if x != args[0] or y != args[1] or z != args[2]:
self.get_logger().warn(
f"Desired joint positions adjusted to fit within limits: "
f"x={x}, y={y}, z={z} (original: x={args[0]}, y={args[1]}, z={args[2]})"
)
self.desired_joint_positions = [x, y, z, r, p, yaw]
def joint_states_callback(self, msg):
"""Callback function to handle incoming joint states."""
joint_position_dict = dict(zip(msg.name, msg.position))
self.current_joint_positions = [joint_position_dict[name] for name in self.joint_names]
def rampfunction(self, startvalue, blendtime, currenttime):
"""
Ramp function to create a smooth transition from startvalue to 1 over blendtime seconds.
"""
if currenttime < blendtime:
return startvalue + (1 - startvalue) * (currenttime / blendtime)
else:
return 1
def update_position(self):
if self.desired_joint_positions == self.previous_desired:
return
msg = JointTrajectory()
msg.joint_names = self.joint_names
steps_per_m = 30
if True: #len(args[0]) == len(self.joint_names):
prev_duration = 0
T1 = self.robot.fkine(self.current_joint_positions)
[x,y,z] = T1.t
[roll, pitch, yaw] = T1.rpy()
x1, y1, z1, roll1, pitch1, yaw1 = self.desired_joint_positions
steps = int(np.linalg.norm(np.array([x1-x, y1-y, z1-z])) * steps_per_m)
if steps < 2: steps = 2
cart_traj = [sm.SE3([x+(x1-x)/(steps-1)*i, y+(y1-y)/(steps-1)*i, z+(z1-z)/(steps-1)*i]) * sm.SE3.RPY([roll+(roll1-roll)/(steps-1)*i, pitch+(pitch1-pitch)/(steps-1)*i, yaw+(yaw1-yaw)/(steps-1)*i]) for i in range(steps)]
for j in range(steps):
sol = self.robot.ik_LM(cart_traj[j], q0=self.current_joint_positions, mask = self.cost_mask, joint_limits = True)
if sol[1] == 1:
duration = 0
prev = self.current_joint_positions if j == 0 else prev_sol
for p1, p2, max_vel in zip(sol[0], prev, self.joint_velocity_limits.values()):
duration = max(duration, abs(p1 - p2) / max_vel)#, 1/self.hz) # as minimun
point = JointTrajectoryPoint()
point.positions = list(sol[0])
duration /= self.rampfunction(0.1, 2, prev_duration)
duration += prev_duration
prev_duration = duration
point.time_from_start.sec = int(duration)
point.time_from_start.nanosec = int((duration - int(duration)) * 1e9)
msg.points.append(point)
prev_sol = list(sol[0])
else:
print(f'IK could not find a solution for (x,y,z) = ({cart_traj[j].t}), (roll,pitch,yaw) = ({cart_traj[j].rpy()})!')
prev_sol = self.current_joint_positions
msg.header.stamp = self.get_clock().now().to_msg()
self.publisher.publish(msg)
self.previous_desired = self.desired_joint_positions
def main():
"""Main function to get joint names and start the ROS 2 & OSC system."""
while True:
path_to_urdf = input("Enter the path to the URDF file: ")
if os.path.isfile(path_to_urdf):
if not path_to_urdf.endswith('.urdf'):
print("The file is not a URDF file. Please enter a valid URDF file.")
continue
break
else:
print("Invalid path. Please enter a valid path to the URDF file.")
tree = ET.parse(path_to_urdf)
root = tree.getroot()
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
robot = rtb.ERobot.URDF(path_to_urdf)
joint_velocity_limits = {}
# Iterate over all joints in the URDF
for joint in root.findall('.//joint'):
joint_name = joint.get('name') # Get the name of the joint
# Look for the <limit> tag under each joint
limit = joint.find('limit')
if limit is not None:
# Extract the velocity limit (if it exists)
velocity_limit = limit.get('velocity')
if velocity_limit is not None:
joint_velocity_limits[joint_name] = float(velocity_limit)
print("The cost mask determines which coordinates are used for the IK. Each element of the cost mask corresponds to a Cartesian coordinate [x, y, z, roll, pitch, yaw].")
print("The cost mask [1, 1, 1, 0, 0, 0] means that the IK will only consider translation and no rotation.")
while True:
try:
cost_mask = [int(i) for i in input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")]
if sum(cost_mask) <= robot.n and len(cost_mask) == 6:
considered_coords = [coord for coord, use in zip(['x', 'y', 'z', 'roll', 'pitch', 'yaw'], cost_mask) if use == 1]
print(f"The following coordinates will be considered for IK: {', '.join(considered_coords)}")
confirm = input("Are you sure you want to proceed with this cost mask? (y/n): ").strip().lower()
if confirm == 'y':
break
elif confirm == 'n':
print("Please re-enter the cost mask.")
else:
print("Invalid input. Please enter 'y' or 'n'.")
else:
print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.")
except ValueError:
print("Invalid input. Please enter integers only.")
print(f"Cost mask: {cost_mask}")
rclpy.init()
node = ScaledJointTrajectoryPublisher(joint_names, joint_velocity_limits, robot, cost_mask)
# Run ROS 2 spin, and osc_process will be handled by the timer
try:
rclpy.spin(node)
except KeyboardInterrupt:
print("")
finally:
node.destroy_node()
rclpy.shutdown()
osc_terminate()
if __name__ == '__main__':
main()

View File

@@ -1,296 +0,0 @@
import rclpy
from rclpy.node import Node
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from sensor_msgs.msg import JointState
from osc4py3.as_allthreads import *
from osc4py3 import oscmethod as osm
import xml.etree.ElementTree as ET
import numpy as np
import spatialmath as sm
import roboticstoolbox as rtb
class ScaledJointTrajectoryPublisher(Node):
"""Node to publish joint trajectories based on OSC messages."""
def __init__(self, joint_names, joint_velocity_limits, robot, cost_mask):
super().__init__('scaled_joint_trajectory_publisher')
# ROS2 Publisher
self.publisher = self.create_publisher(
JointTrajectory,
'/scaled_joint_trajectory_controller/joint_trajectory',
1
)
self.subscription = self.create_subscription(
JointState,
'/joint_states',
self.joint_states_callback,
1 # Increased queue size for joint states
)
# Store received joint positions
self.current_joint_positions = [0.0] * len(joint_names)
self.joint_names = joint_names
self.joint_velocity_limits = joint_velocity_limits
self.desired_joint_positions = [0.0] * len(joint_names)
self.previous_desired = [0.0] * len(joint_names)
self.robot = robot
self.cost_mask = cost_mask
self.prev_pose = None
ip = "0.0.0.0" # Listen on all network interfaces
port = 8000 # Must match the sender's port in `joint_state_osc.py`
osc_startup()
osc_udp_server(ip, port, "osc_server")
print("Server started on 0.0.0.0:8000 \n ready to receive messages in the following format: /joint_trajectroy [joint_positions] optional: duration as last argument")
# Register OSC handler
osc_method("/coordinates", self.coordinates_handler, argscheme=osm.OSCARG_DATAUNPACK)
osc_method("/joint_angles", self.joint_angles_handler, argscheme=osm.OSCARG_DATAUNPACK)
def joint_angles_handler(self, *args):
"""Handles incoming OSC messages for joint positions."""
msg = JointTrajectory()
msg.joint_names = self.joint_names
n=2
for arg in args:
if len(arg) == len(self.joint_names):
point = JointTrajectoryPoint()
point.positions = list(arg)
point.time_from_start.sec = n
n+=2
point.time_from_start.nanosec = 0
msg.points.append(point)
elif len(arg) == len(self.joint_names) + 1:
point = JointTrajectoryPoint()
point.positions = list(arg[:-1])
point.time_from_start.sec = int(arg[-1])
point.time_from_start.nanosec = int((arg[-1] - int(arg[-1])) * 1e9)
msg.points.append(point)
self.publisher.publish(msg)
print("published joint positions")
set_limits = input("Do you want to set limit for x, y and z? (y/n): ").strip().lower()
if set_limits == 'y':
while True:
try:
self.x_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()]
self.y_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()]
self.z_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()]
if len(self.x_limits) != 2 or len(self.y_limits) != 2 or len(self.z_limits) != 2:
print("Invalid input. Please enter exactly two values (or leave blank) for each limit.")
continue
if (self.x_limits[0] is not None and self.x_limits[1] is not None and self.x_limits[0] >= self.x_limits[1]) or \
(self.y_limits[0] is not None and self.y_limits[1] is not None and self.y_limits[0] >= self.y_limits[1]) or \
(self.z_limits[0] is not None and self.z_limits[1] is not None and self.z_limits[0] >= self.z_limits[1]):
print("Invalid input. Lower limit must be less than upper limit for each axis.")
continue
print(f"Current limits:")
print(f"x: {self.x_limits}")
print(f"y: {self.y_limits}")
print(f"z: {self.z_limits}")
con = True
while con:
confirm = input("Do you want your robot to move in this range? (y/n): ").strip().lower()
if confirm == 'y':
break
elif confirm == 'n':
print("Please re-enter the limits.")
con = False
else:
print("Invalid input. Please enter 'y' or 'n'.")
if con: break
except ValueError:
print("Invalid input. Please enter numeric values only.")
# Ask the user if they want to set new joint limits
update_limits = input("Do you want to set new joint limits? (y/n): ").strip().lower()
if update_limits == 'y':
for i in range(len(self.joint_names)):
while True:
try:
lim = self.robot.qlim.copy()
# Find the link corresponding to the joint name
print(f"Current position limits for joint '{self.joint_names[i]}': [{self.robot.qlim[0][i]} {self.robot.qlim[1][i]}] rad")
lower_limit = input(f"Enter the new lower limit for joint '{self.joint_names[i]}' (or press Enter to keep current): ").strip()
upper_limit = input(f"Enter the new upper limit for joint '{self.joint_names[i]}' (or press Enter to keep current): ").strip()
if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit):
print("Invalid input. Lower limit must be less than upper limit.")
continue
if lower_limit:
lim[0][i] = float(lower_limit)
if upper_limit:
lim[1][i] = float(upper_limit)
self.robot.qlim = lim
print(f"New limits for joint '{self.joint_names[i]}': [{self.robot.qlim[0][i]} {self.robot.qlim[1][i]}] rad")
print("-" * 50)
break
except ValueError:
print("Invalid input. Please enter numeric values or leave blank to keep current limits.")
'''
use_link_mask = input("Do you want to use a link mask? (y/n): ").strip().lower()
if use_link_mask == 'y':
while True:
try:
'''
self.hz = float(input("Enter the desired refresh frequency (Hz): "))
# Start the OSC server in a separate thread to avoid blocking the ROS 2 event loop
self.create_timer(1/self.hz, self.update_position) # Timer to call osc_process periodically
def coordinates_handler(self, *args):
# Ensure the desired joint positions are within the specified limits
#print("received joint angles")
x, y, z, r, p, yaw, *_ = [float(i) for i in list(args)]
if self.x_limits[0] is not None:
x = max(self.x_limits[0], x)
if self.x_limits[1] is not None:
x = min(self.x_limits[1], x)
if self.y_limits[0] is not None:
y = max(self.y_limits[0], y)
if self.y_limits[1] is not None:
y = min(self.y_limits[1], y)
if self.z_limits[0] is not None:
z = max(self.z_limits[0], z)
if self.z_limits[1] is not None:
z = min(self.z_limits[1], z)
if x != args[0] or y != args[1] or z != args[2]:
self.get_logger().warn(
f"Desired joint positions adjusted to fit within limits: "
f"x={x}, y={y}, z={z} (original: x={args[0]}, y={args[1]}, z={args[2]})"
)
self.desired_joint_positions = [x, y, z, r, p, yaw]
def joint_states_callback(self, msg):
"""Callback function to handle incoming joint states."""
joint_position_dict = dict(zip(msg.name, msg.position))
self.current_joint_positions = [joint_position_dict[name] for name in self.joint_names]
joint_position_dict = dict(zip(msg.name, msg.velocity))
self.current_joint_velocities = [joint_position_dict[name] for name in self.joint_names]
def update_position(self):
if self.desired_joint_positions == self.previous_desired:
return
msg = JointTrajectory()
msg.joint_names = self.joint_names
steps_per_m = 100
if True: #len(args[0]) == len(self.joint_names):
prev_duration = 0
if self.prev_pose == None:
[x,y,z] = self.robot.fkine(self.current_joint_positions).t
[roll, pitch, yaw] = self.robot.fkine(self.current_joint_positions).rpy()
else:
[x,y,z] = self.prev_pose[:3]
[roll, pitch, yaw] = self.prev_pose[3:]
x1, y1, z1, roll1, pitch1, yaw1 = self.desired_joint_positions
self.prev_pose = self.desired_joint_positions
steps = int(np.linalg.norm(np.array([x1, y1, z1])- self.robot.fkine(self.current_joint_positions).t) * steps_per_m)
if steps < 2: steps = 2
cart_traj = [sm.SE3([x+(x1-x)/(steps-1)*i, y+(y1-y)/(steps-1)*i, z+(z1-z)/(steps-1)*i]) * sm.SE3.RPY([roll+(roll1-roll)/(steps-1)*i, pitch+(pitch1-pitch)/(steps-1)*i, yaw+(yaw1-yaw)/(steps-1)*i]) for i in range(steps)]
for j in range(steps):
sol = self.robot.ik_LM(cart_traj[j], q0=self.current_joint_positions, mask = self.cost_mask, joint_limits = True, method = 'chan') if j == 0 else self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True, method = 'chan')
if sol[1] == 1:
fowards = self.robot.fkine_all(sol[0])
out_of_bounds = (fowards.t[1:,0] > self.x_limits[1] if self.x_limits[1] != None else False) | (fowards.t[1:,0] < self.x_limits[0] if self.x_limits[0] != None else False) | (fowards.t[1:,1] > self.y_limits[1] if self.y_limits[1] != None else False) | (fowards.t[1:,1] < self.y_limits[0] if self.y_limits[0] != None else False) | (fowards.t[1:,2] > self.z_limits[1] if self.z_limits[1] != None else False) | (fowards.t[1:,2] < self.z_limits[0] if self.z_limits[0] != None else False)
if np.any(out_of_bounds):
#print(fowards.t)
#indices = np.where(out_of_bounds)[0]
#print(f"indices: {indices}")
self.get_logger().warn("One or more links moved out of bounds!")
'''
for i in indices:
try:
print(f"Joint {self.robot.links[i].name} is out of bounds: (x,y,z) = {fowards.t[i]}")
except IndexError:
print(f"index {i} is out of bounds, but no corresponding joint found.")
self.previous_desired = self.desired_joint_positions
'''
break
duration = 0
prev = self.current_joint_positions if j == 0 else prev_sol
for p1, p2, max_vel in zip(sol[0], prev, self.joint_velocity_limits.values()):
duration = max(duration, abs(p1 - p2) / max_vel)#, 1/self.hz) # as minimun
prev_sol = list(sol[0])
if duration == 0:
continue
point = JointTrajectoryPoint()
point.positions = list(sol[0])
duration *= 5
duration += prev_duration
prev_duration = duration
point.time_from_start.sec = int(duration)
point.time_from_start.nanosec = int((duration - int(duration)) * 1e9)
msg.points.append(point)
else:
print(f'IK could not find a solution for (x,y,z) = {cart_traj[j].t} and (r,p,y) = {cart_traj[j].rpy()}!')
prev_sol = self.current_joint_positions
if len(msg.points) == 0:
return
msg.header.stamp = self.get_clock().now().to_msg()
self.publisher.publish(msg)
self.previous_desired = self.desired_joint_positions
def main():
"""Main function to get joint names and start the ROS 2 & OSC system."""
robot_urdf = input("Enter the path to the URDF file: ")
tree = ET.parse(robot_urdf)
root = tree.getroot()
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
robot = rtb.ERobot.URDF(robot_urdf)
joint_velocity_limits = {}
# Iterate over all joints in the URDF
for joint in root.findall('.//joint'):
joint_name = joint.get('name') # Get the name of the joint
# Look for the <limit> tag under each joint
limit = joint.find('limit')
if limit is not None:
# Extract the velocity limit (if it exists)
velocity_limit = limit.get('velocity')
if velocity_limit is not None:
joint_velocity_limits[joint_name] = float(velocity_limit)
while True:
try:
print("The cost mask determines which coordinates are used for the IK. Each element of the cost mask corresponds to a catesian coordinate [x, y, z, Rx, Ry, Rz].")
print("The cost mask [1, 1, 1, 0, 0, 0] means that the IK will only consider translation and no rotaion.")
cost_mask = [int(i) for i in input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")]
if sum(cost_mask) <= robot.n and len(cost_mask) == 6:
break
else:
print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.")
except ValueError:
print("Invalid input. Please enter integers only.")
print(f"Cost mask: {cost_mask}")
rclpy.init()
node = ScaledJointTrajectoryPublisher(joint_names, joint_velocity_limits, robot, cost_mask)
# Run ROS 2 spin, and osc_process will be handled by the timer
try:
rclpy.spin(node)
except KeyboardInterrupt:
print("")
finally:
node.destroy_node()
rclpy.shutdown()
osc_terminate()
if __name__ == '__main__':
main()

View File

@@ -1,36 +0,0 @@
import numpy as np
from roboticstoolbox.tools.trajectory import mstraj
# Define via points (each row is a joint configuration)
viapoints = np.array([
[0, 0, 0], # Start
[0.5, 0.2, -0.1], # Intermediate
[1.0, 0.4, 0.2] # End
])
# Time step
dt = 0.01 # seconds
# Acceleration time
tacc = 0.2 # seconds
# Maximum joint velocity per joint (same length as number of joints)
qdmax = [0.5, 0.3, 0.4] # radians per second
# Optional: starting position (otherwise uses first viapoint)
q0 = viapoints[0]
# Generate the trajectory
traj = mstraj(
viapoints=viapoints,
dt=dt,
tacc=tacc,
qdmax=qdmax,
)
# Extract trajectory
time = traj.t # Time vector
positions = traj.q # Joint angles (shape: K x N)
print("Time vector:", time)
print("Joint positions:\n", positions)

View File

@@ -1,78 +0,0 @@
import rclpy
from rclpy.node import Node
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from osc4py3.as_eventloop import *
from osc4py3 import oscmethod as osm
import xml.etree.ElementTree as ET
class ScaledJointTrajectoryPublisher(Node):
"""Node to publish joint trajectories based on OSC messages."""
def __init__(self, joint_names):
super().__init__('scaled_joint_trajectory_publisher')
# ROS2 Publisher
self.publisher = self.create_publisher(
JointTrajectory,
'/scaled_joint_trajectory_controller/joint_trajectory',
10
)
# Store received joint positions
self.joint_positions = []
self.joint_names = joint_names
osc_startup()
osc_udp_server("0.0.0.0", 8000, "osc_server")
print("Server started on 0.0.0.0:8000 \n ready to receive messages in the following format: /joint_trajectroy [joint_positions] optional: duration as last argument")
# Register OSC handler
osc_method("/joint_angles", self.joint_angles_handler, argscheme=osm.OSCARG_DATAUNPACK)
def joint_angles_handler(self, *args):
"""Handles incoming OSC messages for joint positions."""
msg = JointTrajectory()
msg.joint_names = self.joint_names
n=2
for arg in args:
if len(arg) == len(self.joint_names):
point = JointTrajectoryPoint()
point.positions = list(arg)
point.time_from_start.sec = n
n+=2
point.time_from_start.nanosec = 0
msg.points.append(point)
elif len(arg) == len(self.joint_names) + 1:
point = JointTrajectoryPoint()
point.positions = list(arg[:-1])
point.time_from_start.sec = int(arg[-1])
point.time_from_start.nanosec = int((arg[-1] - int(arg[-1])) * 1e9)
msg.points.append(point)
self.publisher.publish(msg)
print("published joint positions")
def main():
"""Main function to get joint names and start the ROS 2 & OSC system."""
tree = ET.parse('/BA/robot.urdf')
root = tree.getroot()
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
rclpy.init()
node = ScaledJointTrajectoryPublisher(joint_names)
# Run both ROS 2 and OSC Server together
try:
while rclpy.ok():
osc_process() # Handle one OSC request at a time
rclpy.spin_once(node, timeout_sec=0.1) # Process ROS callbacks
except KeyboardInterrupt:
print("")
finally:
node.destroy_node()
rclpy.shutdown()
osc_terminate()
if __name__ == '__main__':
main()

View File

@@ -1,149 +0,0 @@
import rclpy
from rclpy.node import Node
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from osc4py3.as_eventloop import *
from osc4py3 import oscmethod as osm
import xml.etree.ElementTree as ET
import roboticstoolbox as rtb
import spatialmath as sm
import numpy as np
import time
class ScaledJointTrajectoryPublisher(Node):
"""Node to publish joint trajectories based on OSC messages."""
def __init__(self, joint_names, robot, cost_mask):
super().__init__('scaled_joint_trajectory_publisher')
self.cost_mask = cost_mask
self.robot = robot
self.trajectroy_topic_name = input("Enter the topic name to which the joint trajectory should be sent to: ")
if self.trajectroy_topic_name == "":
self.trajectroy_topic_name = '/scaled_joint_trajectory_controller/joint_trajectory'
# ROS2 Publisher
self.publisher = self.create_publisher(
JointTrajectory,
self.trajectroy_topic_name,
10
)
# Store received joint positions
self.joint_names = joint_names
self.port = 8000 # UDP port
osc_startup()
osc_udp_server("0.0.0.0", self.port, "osc_server")
print(f"Server started on 0.0.0.0:{str(self.port)} \n ready to receive messages in the following format: /joint_trajectroy [tcp_coordinates0, tcp_coordinates1, ...] optional: timestamp as last element of each tcp_coordinates")
# Register OSC handler
osc_method("/joint_trajectory", self.joint_angles_handler, argscheme=osm.OSCARG_DATAUNPACK)
def joint_angles_handler(self, *args):
"""Handles incoming OSC messages for joint positions."""
time1 = time.time()
print("Received joint positions")
msg = JointTrajectory()
msg.joint_names = self.joint_names
joint_positions = [0.0] * len(self.joint_names)
steps = 30
vel = 0.4
if True: #len(args[0]) == len(self.joint_names):
n=2.0
for i in range(len(args)-1):
print(f'i = {i}')
x, y, z, roll, pitch, yaw = args[i]
print(1)
Tep1 = sm.SE3(x, y, z) * sm.SE3.RPY([roll, pitch, yaw], order='xyz')
print(2)
x, y, z, roll, pitch, yaw = args[i+1]
print(3)
Tep2 = sm.SE3(x, y, z) * sm.SE3.RPY([roll, pitch, yaw], order='xyz')
print(4)
cart_traj = rtb.ctraj(Tep1, Tep2, steps)
print(cart_traj)
print(5)
for j in range(steps):
print(f'j = {j}')
print(6)
sol = self.robot.ik_LM(cart_traj[j], q0=joint_positions, mask = self.cost_mask, joint_limits = True)
print(7)
if sol[1] == 1:
print(8)
if j == 0: dist = vel*n
else: dist = np.linalg.norm(cart_traj[j].t - cart_traj[j-1].t)
print(9)
point = JointTrajectoryPoint()
print(10)
point.positions = list(sol[0])
print(11)
joint_positions = list(sol[0])
print(12)
point.time_from_start.sec = int(n)
print(13)
point.time_from_start.nanosec = int((n - int(n)) * 1e9)
print(14)
n+=dist/vel
print(16)
msg.points.append(point)
print(17)
else: print('IK could not find a solution!')
print(18)
self.publisher.publish(msg)
print(19)
print(f"published joint positions {msg.points[-1]}")
print(f'Frequency: {round(1/(time.time()-time1),2)} Hz')
'''
elif len(args[0]) == len(self.joint_names) + 1:
for i in range(len(args)):
x, y, z, roll, pitch, yaw, timetag = args[i]
Tep = sm.SE3(x, y, z) * sm.SE3.RPY([roll, pitch, yaw], order='xyz')
x, y, z, roll, pitch, yaw = args[i+1][:-1]
Tep2 = sm.SE3(x, y, z) * sm.SE3.RPY([roll, pitch, yaw], order='xyz')
cart_traj = rtb.ctraj(Tep, Tep2, steps)
for Tep in cart_traj:
sol = self.robot.ik_LM(Tep, q0=joint_positions)
else:
print("Invalid number or format of arguments")'''
def main():
"""Main function to get joint names and start the ROS 2 & OSC system."""
path_to_urdf = input("Enter the path to the URDF file: ")
tree = ET.parse(path_to_urdf)
root = tree.getroot()
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
robot = rtb.ERobot.URDF(path_to_urdf)
print(robot)
rclpy.init()
while True:
try:
print("The cost mask determines which coordinates are used for the IK. Each element of the cost mask corresponds to a catesian coordinate [x, y, z, Rx, Ry, Rz].")
print("The cost mask [1, 1, 1, 0, 0, 0] means that the IK will only consider translation and no rotaion.")
cost_mask = [int(i) for i in input(f"Enter the cost mask (6 integers (1 or 0) separated by spaces, of which <= {robot.n} are 1): ").split()]
if sum(cost_mask) <= robot.n and len(cost_mask) == 6:
break
else:
print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.")
except ValueError:
print("Invalid input. Please enter integers only.")
print(f"Cost mask: {cost_mask}")
node = ScaledJointTrajectoryPublisher(joint_names, robot, cost_mask)
# Run both ROS 2 and OSC Server together
try:
while rclpy.ok():
osc_process() # Handle one OSC request at a time
rclpy.spin_once(node, timeout_sec=0.1) # Process ROS callbacks
except KeyboardInterrupt:
print("")
finally:
node.destroy_node()
rclpy.shutdown()
osc_terminate()
if __name__ == '__main__':
main()

View File

@@ -1,141 +0,0 @@
import rclpy
from rclpy.node import Node
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from osc4py3.as_eventloop import *
from osc4py3 import oscmethod as osm
import xml.etree.ElementTree as ET
import roboticstoolbox as rtb
import spatialmath as sm
import numpy as np
import time
import os
class ScaledJointTrajectoryPublisher(Node):
"""Node to publish joint trajectories based on OSC messages."""
def __init__(self, joint_names, robot, cost_mask, joint_velocity_limits):
super().__init__('scaled_joint_trajectory_publisher')
self.joint_velocity_limits = joint_velocity_limits
self.cost_mask = cost_mask
self.robot = robot
self.trajectroy_topic_name = input("Enter the topic name to which the joint trajectory should be sent to: ")
if self.trajectroy_topic_name == "":
self.trajectroy_topic_name = '/scaled_joint_trajectory_controller/joint_trajectory'
# ROS2 Publisher
self.publisher = self.create_publisher(
JointTrajectory,
self.trajectroy_topic_name,
10
)
# Store received joint positions
self.joint_names = joint_names
self.port = 8000 # UDP port
osc_startup()
osc_udp_server("0.0.0.0", self.port, "osc_server")
print(f"Server started on 0.0.0.0:{str(self.port)} \n ready to receive messages in the following format: /joint_trajectroy [tcp_coordinates0, tcp_coordinates1, ...] optional: timestamp as last element of each tcp_coordinates")
# Register OSC handler
osc_method("/joint_trajectory", self.joint_angles_handler, argscheme=osm.OSCARG_DATAUNPACK)
print("OSC method registered for /joint_trajectory")
def joint_angles_handler(self, *args):
"""Handles incoming OSC messages for joint positions."""
print("Received joint positions")
msg = JointTrajectory()
msg.joint_names = self.joint_names
steps = 50
if True: #len(args[0]) == len(self.joint_names):
prev_duration = 0
for i in range(len(args)-1):
x, y, z, roll, pitch, yaw = args[i]
x1, y1, z1, roll1, pitch1, yaw1 = args[i+1]
cart_traj = [sm.SE3([x+(x1-x)/(steps-1)*i, y+(y1-y)/(steps-1)*i, z+(z1-z)/(steps-1)*i]) * sm.SE3.RPY([roll+(roll1-roll)/(steps-1)*i, pitch+(pitch1-pitch)/(steps-1)*i, yaw+(yaw1-yaw)/(steps-1)*i], order='xyz') for i in range(steps)]
prev_sol = [0.0,0.0,0.0,0.0,0.0,0.0] if i == 0 else sol[0]
for j in (range(steps) if i == 0 else range(1,steps)):
#print(cart_traj[j])
sol = self.robot.ik_LM(cart_traj[j], q0=[0.0] * len(self.joint_names), mask = self.cost_mask, joint_limits = True) if i == 0 else self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True)
if sol[1] == 1:
if list(sol[0])==list(prev_sol): continue
duration = 0
for p1, p2, max_vel in zip(sol[0], prev_sol, self.joint_velocity_limits.values()):
duration = max(duration, abs(p1 - p2) / max_vel) # as minimun
point = JointTrajectoryPoint()
point.positions = list(sol[0])
duration *= 1.6
duration += prev_duration
prev_duration = duration
point.time_from_start.sec = int(duration)
point.time_from_start.nanosec = int((duration - int(duration)) * 1e9)
msg.points.append(point)
prev_sol = list(sol[0])
else: print('IK could not find a solution!')
msg.header.stamp = self.get_clock().now().to_msg()
self.publisher.publish(msg)
def main():
"""Main function to get joint names and start the ROS 2 & OSC system."""
while True:
path_to_urdf = input("Enter the path to the URDF file: ")
if os.path.isfile(path_to_urdf):
if not path_to_urdf.endswith('.urdf'):
print("The file is not a URDF file. Please enter a valid URDF file.")
continue
break
else:
print("Invalid path. Please enter a valid path to the URDF file.")
tree = ET.parse(path_to_urdf)
root = tree.getroot()
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
robot = rtb.ERobot.URDF(path_to_urdf)
print(robot)
joint_velocity_limits = {}
# Iterate over all joints in the URDF
for joint in root.findall('.//joint'):
joint_name = joint.get('name') # Get the name of the joint
# Look for the <limit> tag under each joint
limit = joint.find('limit')
if limit is not None:
# Extract the velocity limit (if it exists)
velocity_limit = limit.get('velocity')
if velocity_limit is not None:
joint_velocity_limits[joint_name] = float(velocity_limit)
rclpy.init()
while True:
try:
print("The cost mask determines which coordinates are used for the IK. Each element of the cost mask corresponds to a catesian coordinate [x, y, z, Rx, Ry, Rz].")
print("The cost mask [1, 1, 1, 0, 0, 0] means that the IK will only consider translation and no rotaion.")
cost_mask = [int(i) for i in input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")]
if sum(cost_mask) <= robot.n and len(cost_mask) == 6:
break
else:
print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.")
except ValueError:
print("Invalid input. Please enter integers only.")
print(f"Cost mask: {cost_mask}")
node = ScaledJointTrajectoryPublisher(joint_names, robot, cost_mask, joint_velocity_limits)
# Run both ROS 2 and OSC Server together
try:
while rclpy.ok():
osc_process() # Handle one OSC request at a time
rclpy.spin_once(node, timeout_sec=0.1) # Process ROS callbacks
except KeyboardInterrupt:
print("")
finally:
node.destroy_node()
rclpy.shutdown()
osc_terminate()
if __name__ == '__main__':
main()

View File

@@ -1,166 +0,0 @@
import rclpy
from rclpy.node import Node
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from sensor_msgs.msg import JointState
from osc4py3.as_allthreads import *
from osc4py3 import oscmethod as osm
import xml.etree.ElementTree as ET
import roboticstoolbox as rtb
import spatialmath as sm
import numpy as np
import time
import os
class ScaledJointTrajectoryPublisher(Node):
"""Node to publish joint trajectories based on OSC messages."""
def __init__(self, joint_names, robot, cost_mask, joint_velocity_limits):
super().__init__('scaled_joint_trajectory_publisher')
self.joint_velocity_limits = joint_velocity_limits
self.cost_mask = cost_mask
self.robot = robot
self.trajectroy_topic_name = input("Enter the topic name to which the joint trajectory should be sent to: ")
if self.trajectroy_topic_name == "":
self.trajectroy_topic_name = '/scaled_joint_trajectory_controller/joint_trajectory'
# ROS2 Publisher
self.publisher = self.create_publisher(
JointTrajectory,
self.trajectroy_topic_name,
10
)
self.subscription = self.create_subscription(
JointState,
'/joint_states',
self.joint_states_callback,
1 # Increased queue size for joint states
)
self.maximum_acceleration = [0.0] * len(joint_names)
# Store received joint positions
self.joint_names = joint_names
for joint in joint_names:
self.maximum_acceleration[joint_names.index(joint)] = float(input(f"Enter the maximum acceleration for joint {joint}: "))
self.port = 8000 # UDP port
osc_startup()
osc_udp_server("0.0.0.0", self.port, "osc_server")
print(f"Server started on 0.0.0.0:{str(self.port)} \n ready to receive messages in the following format: /joint_trajectroy [tcp_coordinates0, tcp_coordinates1, ...] optional: timestamp as last element of each tcp_coordinates")
# Register OSC handler
osc_method("/joint_trajectory", self.joint_angles_handler, argscheme=osm.OSCARG_DATAUNPACK)
print("OSC method registered for /joint_trajectory")
def joint_states_callback(self, msg):
"""Callback function to handle incoming joint states."""
joint_position_dict = dict(zip(msg.name, msg.position))
self.current_joint_positions = [joint_position_dict[name] for name in self.joint_names]
joint_velocity_dict = dict(zip(msg.name, msg.velocity))
self.current_joint_velocities = [joint_velocity_dict[name] for name in self.joint_names]
def joint_angles_handler(self, *args):
"""Handles incoming OSC messages for joint positions."""
print("Received joint positions")
msg = JointTrajectory()
msg.joint_names = self.joint_names
steps = 50
if True: #len(args[0]) == len(self.joint_names):
prev_duration = 0
for i in range(len(args)-1):
x, y, z, roll, pitch, yaw = args[i]
x1, y1, z1, roll1, pitch1, yaw1 = args[i+1]
cart_traj = [sm.SE3([x+(x1-x)/(steps-1)*i, y+(y1-y)/(steps-1)*i, z+(z1-z)/(steps-1)*i]) * sm.SE3.RPY([roll+(roll1-roll)/(steps-1)*i, pitch+(pitch1-pitch)/(steps-1)*i, yaw+(yaw1-yaw)/(steps-1)*i], order='xyz') for i in range(steps)]
prev_sol = self.current_joint_positions if i == 0 else sol[0]
for j in (range(steps) if i == 0 else range(1,steps)):
#print(cart_traj[j])
sol = self.robot.ik_LM(cart_traj[j], q0=[0.0] * len(self.joint_names), mask = self.cost_mask, joint_limits = True) if i == 0 else self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True)
if sol[1] == 1:
if list(sol[0])==list(prev_sol): continue
duration = 0
for i, (p1, p2, max_vel) in enumerate(zip(sol[0], prev_sol, self.joint_velocity_limits.values())):
print(f'joint {i}, p1: {p1}, p2: {p2}, max_vel: {max_vel}')
if len(msg.points) == 0: v = self.current_joint_velocities[i]
max_acc_duration = np.sqrt((v/self.maximum_acceleration[i])**2 + 2*(abs(p1 - p2)/self.maximum_acceleration[i]))- v/self.maximum_acceleration[i]
duration = max(duration, abs(p1 - p2) / max_vel, max_acc_duration) # as minimun
v = abs(p1 - p2) / duration
print(f'duration: {duration}, max_acc_duration: {max_acc_duration}, max_vel_duration: { abs(p1 - p2) / max_vel}, v: {v}')
point = JointTrajectoryPoint()
point.positions = list(sol[0])
duration += prev_duration
prev_duration = duration
point.time_from_start.sec = int(duration)
point.time_from_start.nanosec = int((duration - int(duration)) * 1e9)
msg.points.append(point)
prev_sol = list(sol[0])
else: print('IK could not find a solution!')
msg.header.stamp = self.get_clock().now().to_msg()
self.publisher.publish(msg)
def main():
"""Main function to get joint names and start the ROS 2 & OSC system."""
while True:
path_to_urdf = input("Enter the path to the URDF file: ")
if os.path.isfile(path_to_urdf):
if not path_to_urdf.endswith('.urdf'):
print("The file is not a URDF file. Please enter a valid URDF file.")
continue
break
else:
print("Invalid path. Please enter a valid path to the URDF file.")
tree = ET.parse(path_to_urdf)
root = tree.getroot()
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
robot = rtb.ERobot.URDF(path_to_urdf)
print(robot)
joint_velocity_limits = {}
# Iterate over all joints in the URDF
for joint in root.findall('.//joint'):
joint_name = joint.get('name') # Get the name of the joint
# Look for the <limit> tag under each joint
limit = joint.find('limit')
if limit is not None:
# Extract the velocity limit (if it exists)
velocity_limit = limit.get('velocity')
if velocity_limit is not None:
joint_velocity_limits[joint_name] = float(velocity_limit)
rclpy.init()
while True:
try:
print("The cost mask determines which coordinates are used for the IK. Each element of the cost mask corresponds to a catesian coordinate [x, y, z, Rx, Ry, Rz].")
print("The cost mask [1, 1, 1, 0, 0, 0] means that the IK will only consider translation and no rotaion.")
cost_mask = [int(i) for i in input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")]
if sum(cost_mask) <= robot.n and len(cost_mask) == 6:
break
else:
print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.")
except ValueError:
print("Invalid input. Please enter integers only.")
print(f"Cost mask: {cost_mask}")
node = ScaledJointTrajectoryPublisher(joint_names, robot, cost_mask, joint_velocity_limits)
# Run both ROS 2 and OSC Server together
try:
while rclpy.ok():
osc_process() # Handle one OSC request at a time
rclpy.spin_once(node, timeout_sec=0.1) # Process ROS callbacks
except KeyboardInterrupt:
print("")
finally:
node.destroy_node()
rclpy.shutdown()
osc_terminate()
if __name__ == '__main__':
main()

View File

@@ -1,152 +0,0 @@
import rclpy
from rclpy.node import Node
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from osc4py3.as_eventloop import *
from osc4py3 import oscmethod as osm
import xml.etree.ElementTree as ET
import roboticstoolbox as rtb
import spatialmath as sm
import numpy as np
import time
import os
class ScaledJointTrajectoryPublisher(Node):
"""Node to publish joint trajectories based on OSC messages."""
def __init__(self, joint_names, robot, cost_mask, joint_velocity_limits):
super().__init__('scaled_joint_trajectory_publisher')
self.joint_velocity_limits = joint_velocity_limits
self.cost_mask = cost_mask
self.robot = robot
self.trajectroy_topic_name = input("Enter the topic name to which the joint trajectory should be sent to: ")
if self.trajectroy_topic_name == "":
self.trajectroy_topic_name = '/scaled_joint_trajectory_controller/joint_trajectory'
# ROS2 Publisher
self.publisher = self.create_publisher(
JointTrajectory,
self.trajectroy_topic_name,
10
)
# Store received joint positions
self.joint_names = joint_names
self.port = 8000 # UDP port
osc_startup()
osc_udp_server("0.0.0.0", self.port, "osc_server")
print(f"Server started on 0.0.0.0:{str(self.port)} \n ready to receive messages in the following format: /joint_trajectroy [tcp_coordinates0, tcp_coordinates1, ...] optional: timestamp as last element of each tcp_coordinates")
# Register OSC handler
osc_method("/joint_trajectory", self.joint_angles_handler, argscheme=osm.OSCARG_DATAUNPACK)
print("OSC method registered for /joint_trajectory")
def rampfunction(self, startvalue, blendtime, currenttime):
"""
Ramp function to create a smooth transition from startvalue to 1 over blendtime seconds.
"""
if currenttime < blendtime:
return startvalue + (1 - startvalue) * (currenttime / blendtime)
else:
return 1
def joint_angles_handler(self, *args):
"""Handles incoming OSC messages for joint positions."""
print("Received joint positions")
msg = JointTrajectory()
msg.joint_names = self.joint_names
steps = 50
if True: #len(args[0]) == len(self.joint_names):
prev_duration = 0
for i in range(len(args)-1):
x, y, z, roll, pitch, yaw = args[i]
x1, y1, z1, roll1, pitch1, yaw1 = args[i+1]
cart_traj = [sm.SE3([x+(x1-x)/(steps-1)*i, y+(y1-y)/(steps-1)*i, z+(z1-z)/(steps-1)*i]) * sm.SE3.RPY([roll+(roll1-roll)/(steps-1)*i, pitch+(pitch1-pitch)/(steps-1)*i, yaw+(yaw1-yaw)/(steps-1)*i], order='xyz') for i in range(steps)]
prev_sol = [0.0,0.0,0.0,0.0,0.0,0.0] if i == 0 else sol[0]
for j in (range(steps) if i == 0 else range(1,steps)):
#print(cart_traj[j])
sol = self.robot.ik_LM(cart_traj[j], q0=[0.0] * len(self.joint_names), mask = self.cost_mask, joint_limits = True) if i == 0 else self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True)
if sol[1] == 1:
if list(sol[0])==list(prev_sol): continue
duration = 0
for p1, p2, max_vel in zip(sol[0], prev_sol, self.joint_velocity_limits.values()):
duration = max(duration, abs(p1 - p2) / max_vel) # as minimun
point = JointTrajectoryPoint()
point.positions = list(sol[0])
duration /= self.rampfunction(0.1, 2, prev_duration)
duration += prev_duration
prev_duration = duration
point.time_from_start.sec = int(duration)
point.time_from_start.nanosec = int((duration - int(duration)) * 1e9)
msg.points.append(point)
prev_sol = list(sol[0])
else: print('IK could not find a solution!')
msg.header.stamp = self.get_clock().now().to_msg()
self.publisher.publish(msg)
def main():
"""Main function to get joint names and start the ROS 2 & OSC system."""
while True:
path_to_urdf = input("Enter the path to the URDF file: ")
if os.path.isfile(path_to_urdf):
if not path_to_urdf.endswith('.urdf'):
print("The file is not a URDF file. Please enter a valid URDF file.")
continue
break
else:
print("Invalid path. Please enter a valid path to the URDF file.")
tree = ET.parse(path_to_urdf)
root = tree.getroot()
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
robot = rtb.ERobot.URDF(path_to_urdf)
print(robot)
joint_velocity_limits = {}
# Iterate over all joints in the URDF
for joint in root.findall('.//joint'):
joint_name = joint.get('name') # Get the name of the joint
# Look for the <limit> tag under each joint
limit = joint.find('limit')
if limit is not None:
# Extract the velocity limit (if it exists)
velocity_limit = limit.get('velocity')
if velocity_limit is not None:
joint_velocity_limits[joint_name] = float(velocity_limit)
rclpy.init()
while True:
try:
print("The cost mask determines which coordinates are used for the IK. Each element of the cost mask corresponds to a catesian coordinate [x, y, z, Rx, Ry, Rz].")
print("The cost mask [1, 1, 1, 0, 0, 0] means that the IK will only consider translation and no rotaion.")
cost_mask = [int(i) for i in input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")]
if sum(cost_mask) <= robot.n and len(cost_mask) == 6:
break
else:
print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.")
except ValueError:
print("Invalid input. Please enter integers only.")
print(f"Cost mask: {cost_mask}")
node = ScaledJointTrajectoryPublisher(joint_names, robot, cost_mask, joint_velocity_limits)
# Run both ROS 2 and OSC Server together
try:
while rclpy.ok():
osc_process() # Handle one OSC request at a time
rclpy.spin_once(node, timeout_sec=0.1) # Process ROS callbacks
except KeyboardInterrupt:
print("")
finally:
node.destroy_node()
rclpy.shutdown()
osc_terminate()
if __name__ == '__main__':
main()

View File

@@ -1,166 +0,0 @@
import rclpy
from rclpy.node import Node
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from sensor_msgs.msg import JointState
from osc4py3.as_allthreads import *
from osc4py3 import oscmethod as osm
import xml.etree.ElementTree as ET
import roboticstoolbox as rtb
import spatialmath as sm
import numpy as np
import time
import os
class ScaledJointTrajectoryPublisher(Node):
"""Node to publish joint trajectories based on OSC messages."""
def __init__(self, joint_names, robot, cost_mask, joint_velocity_limits):
super().__init__('scaled_joint_trajectory_publisher')
self.joint_velocity_limits = [joint_velocity_limits[joint] for joint in joint_names]
self.cost_mask = cost_mask
self.robot = robot
self.trajectroy_topic_name = input("Enter the topic name to which the joint trajectory should be sent to: ")
if self.trajectroy_topic_name == "":
self.trajectroy_topic_name = '/scaled_joint_trajectory_controller/joint_trajectory'
# ROS2 Publisher
self.publisher = self.create_publisher(
JointTrajectory,
self.trajectroy_topic_name,
10
)
self.subscription = self.create_subscription(
JointState,
'/joint_states',
self.joint_states_callback,
1 # Increased queue size for joint states
)
# Store received joint positions
self.joint_names = joint_names
self.port = 8000 # UDP port
osc_startup()
osc_udp_server("0.0.0.0", self.port, "osc_server")
print(f"Server started on 0.0.0.0:{str(self.port)} \n ready to receive messages in the following format: /joint_trajectroy [tcp_coordinates0, tcp_coordinates1, ...] optional: timestamp as last element of each tcp_coordinates")
# Register OSC handler
osc_method("/joint_trajectory", self.joint_angles_handler, argscheme=osm.OSCARG_DATAUNPACK)
print("OSC method registered for /joint_trajectory")
def joint_states_callback(self, msg):
"""Callback function to handle incoming joint states."""
joint_position_dict = dict(zip(msg.name, msg.position))
self.current_joint_positions = [joint_position_dict[name] for name in self.joint_names]
joint_velocity_dict = dict(zip(msg.name, msg.velocity))
self.current_joint_velocities = [joint_velocity_dict[name] for name in self.joint_names]
def joint_angles_handler(self, *args):
"""Handles incoming OSC messages for joint positions."""
try:
print("Received joint positions")
viapoints = []
msg = JointTrajectory()
msg.joint_names = self.joint_names
steps_per_m = 4
for i in range(len(args)-1):
x, y, z, roll, pitch, yaw = args[i]
x1, y1, z1, roll1, pitch1, yaw1 = args[i+1]
steps = int(np.linalg.norm(np.array([x1-x, y1-y, z1-z])) * steps_per_m)
if steps < 2: steps = 2
cart_traj = [sm.SE3([x+(x1-x)/(steps-1)*i, y+(y1-y)/(steps-1)*i, z+(z1-z)/(steps-1)*i]) * sm.SE3.RPY([roll+(roll1-roll)/(steps-1)*i, pitch+(pitch1-pitch)/(steps-1)*i, yaw+(yaw1-yaw)/(steps-1)*i], order='xyz') for i in range(steps)]
if i == 0: prev_sol = self.current_joint_positions
for j in (range(steps) if i == 0 else range(1,steps)):
#print(cart_traj[j])
sol = self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True)
if sol[1] == 1:
viapoints.append(list(sol[0]))
prev_sol = list(sol[0])
else: print('IK could not find a solution!')
dt = 0.01
tacc = 0.5
print(f'length viapoints: {len(viapoints)}')
traj = rtb.mstraj(np.array(viapoints), q0 = self.current_joint_positions ,dt=dt, tacc=tacc, qdmax=[1 * i for i in self.joint_velocity_limits])
print(len(traj.q))
print(len(traj.t))
print(traj.t)
print(traj.arrive)
msg.points = []
for i in range(len(traj.q)):
point = JointTrajectoryPoint()
point.positions = list(traj.q[i])
point.time_from_start.sec = int(traj.t[i])
point.time_from_start.nanosec = int(((traj.t[i] - int(traj.t[i])) * 1e9))
#point.time_from_start = rclpy.duration.Duration(seconds=traj.t[i]).to_msg()
msg.points.append(point)
msg.header.stamp = self.get_clock().now().to_msg()
self.publisher.publish(msg)
print('published')
except Exception as e:
print(f'Error in joint angles handler: {e}')
def main():
"""Main function to get joint names and start the ROS 2 & OSC system."""
while True:
path_to_urdf = input("Enter the path to the URDF file: ")
if os.path.isfile(path_to_urdf):
if not path_to_urdf.endswith('.urdf'):
print("The file is not a URDF file. Please enter a valid URDF file.")
continue
break
else:
print("Invalid path. Please enter a valid path to the URDF file.")
tree = ET.parse(path_to_urdf)
root = tree.getroot()
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
robot = rtb.ERobot.URDF(path_to_urdf)
print(robot)
joint_velocity_limits = {}
# Iterate over all joints in the URDF
for joint in root.findall('.//joint'):
joint_name = joint.get('name') # Get the name of the joint
# Look for the <limit> tag under each joint
limit = joint.find('limit')
if limit is not None:
# Extract the velocity limit (if it exists)
velocity_limit = limit.get('velocity')
if velocity_limit is not None:
joint_velocity_limits[joint_name] = float(velocity_limit)
rclpy.init()
while True:
try:
print("The cost mask determines which coordinates are used for the IK. Each element of the cost mask corresponds to a catesian coordinate [x, y, z, Rx, Ry, Rz].")
print("The cost mask [1, 1, 1, 0, 0, 0] means that the IK will only consider translation and no rotaion.")
cost_mask = [int(i) for i in input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")]
if sum(cost_mask) <= robot.n and len(cost_mask) == 6:
break
else:
print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.")
except ValueError:
print("Invalid input. Please enter integers only.")
print(f"Cost mask: {cost_mask}")
node = ScaledJointTrajectoryPublisher(joint_names, robot, cost_mask, joint_velocity_limits)
# Run both ROS 2 and OSC Server together
try:
while rclpy.ok():
osc_process() # Handle one OSC request at a time
rclpy.spin_once(node, timeout_sec=0.1) # Process ROS callbacks
except KeyboardInterrupt:
print("")
finally:
node.destroy_node()
rclpy.shutdown()
osc_terminate()
if __name__ == '__main__':
main()

View File

@@ -1,180 +0,0 @@
import rclpy
from rclpy.node import Node
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from sensor_msgs.msg import JointState
from osc4py3.as_allthreads import *
from osc4py3 import oscmethod as osm
import xml.etree.ElementTree as ET
import roboticstoolbox as rtb
import spatialmath as sm
import numpy as np
import os
class ScaledJointTrajectoryPublisher(Node):
"""Node to publish joint trajectories based on OSC messages."""
def __init__(self, joint_names, robot, cost_mask, joint_velocity_limits):
super().__init__('scaled_joint_trajectory_publisher')
self.joint_velocity_limits = joint_velocity_limits
self.cost_mask = cost_mask
self.robot = robot
self.trajectroy_topic_name = input("Enter the topic name to which the joint trajectory should be sent to: ")
if self.trajectroy_topic_name == "":
self.trajectroy_topic_name = '/scaled_joint_trajectory_controller/joint_trajectory'
print(f"Using topic name: {self.trajectroy_topic_name}")
print("--------------------------------------------------------------------------------------------------------------------------------")
while True:
try:
self.speed = input("Enter your desired speed of the tcp (in m/s): ")
if self.speed == '':
self.speed = 1
else:
self.speed = float(self.speed)
break
except ValueError:
print("Invalid input. Please enter a number.")
continue
while True:
try:
self.t_acc = input("Enter how fast you want the tcp to reach that velocity (in s). \nRemember! If the acceleration time is to short the robot might not be able to accelerate fast enough: ")
if self.t_acc == '':
self.t_acc = 2
else:
self.t_acc = float(self.t_acc)
break
except ValueError:
print("Invalid input. Please enter a number.")
continue
# ROS2 Publisher
self.publisher = self.create_publisher(
JointTrajectory,
self.trajectroy_topic_name,
10
)
self.subscription = self.create_subscription(
JointState,
'/joint_states',
self.joint_states_callback,
1 # Increased queue size for joint states
)
# Store received joint positions
self.joint_names = joint_names
self.port = 8000 # UDP port
osc_startup()
osc_udp_server("0.0.0.0", self.port, "osc_server")
print(f"Server started on 0.0.0.0:{str(self.port)} \n ready to receive messages in the following format: /joint_trajectroy [tcp_coordinates0, tcp_coordinates1, ...] optional: timestamp as last element of each tcp_coordinates")
# Register OSC handler
osc_method("/joint_trajectory", self.joint_angles_handler, argscheme=osm.OSCARG_DATAUNPACK)
print("OSC method registered for /joint_trajectory")
def joint_states_callback(self, msg):
"""Callback function to handle incoming joint states."""
joint_position_dict = dict(zip(msg.name, msg.position))
self.current_joint_positions = [joint_position_dict[name] for name in self.joint_names]
joint_velocity_dict = dict(zip(msg.name, msg.velocity))
self.current_joint_velocities = [joint_velocity_dict[name] for name in self.joint_names]
def joint_angles_handler(self, *args):
"""Handles incoming OSC messages for joint positions."""
try:
print("Received joint positions")
viapoints = np.array([list(i) for i in args])
msg = JointTrajectory()
msg.joint_names = self.joint_names
x,y,z = self.robot.fkine(self.current_joint_positions).t
r,p,yaw = self.robot.fkine(self.current_joint_positions).rpy()
q0 = [x, y, z, r, p, yaw]
traj = rtb.mstraj(viapoints, q0 = q0 ,dt=0.01, tacc=self.t_acc, qdmax=self.speed)
msg.points = []
prev_sol = self.current_joint_positions
for i in range(len(traj.q)):
T = sm.SE3(traj.q[i][:3]) * sm.SE3.RPY(traj.q[i][3:], order='xyz')
sol = self.robot.ik_LM(T, q0=prev_sol, mask = self.cost_mask, joint_limits = True)
if sol[1] == 1:
point = JointTrajectoryPoint()
point.positions = list(sol[0])
point.time_from_start.sec = int(traj.t[i])
point.time_from_start.nanosec = int((traj.t[i] - int(traj.t[i])) * 1e9)
msg.points.append(point)
prev_sol = list(sol[0])
else: print('IK could not find a solution!')
msg.header.stamp = self.get_clock().now().to_msg()
self.publisher.publish(msg)
print(f'lenght msg.points: {len(msg.points)}')
print('published')
except Exception as e:
print(f'Error in joint_angles_handler: {e}')
def main():
"""Main function to get joint names and start the ROS 2 & OSC system."""
while True:
path_to_urdf = input("Enter the path to the URDF file: ")
if os.path.isfile(path_to_urdf):
if not path_to_urdf.endswith('.urdf'):
print("The file is not a URDF file. Please enter a valid URDF file.")
continue
break
else:
print("Invalid path. Please enter a valid path to the URDF file.")
print("--------------------------------------------------------------------------------------------------------------------------------")
tree = ET.parse(path_to_urdf)
root = tree.getroot()
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
robot = rtb.ERobot.URDF(path_to_urdf)
print(robot)
joint_velocity_limits = {}
# Iterate over all joints in the URDF
for joint in root.findall('.//joint'):
joint_name = joint.get('name') # Get the name of the joint
# Look for the <limit> tag under each joint
limit = joint.find('limit')
if limit is not None:
# Extract the velocity limit (if it exists)
velocity_limit = limit.get('velocity')
if velocity_limit is not None:
joint_velocity_limits[joint_name] = float(velocity_limit)
rclpy.init()
while True:
try:
print("The cost mask determines which coordinates are used for the IK. Each element of the cost mask corresponds to a catesian coordinate [x, y, z, Rx, Ry, Rz].")
print("The cost mask [1, 1, 1, 0, 0, 0] means that the IK will only consider translation and no rotaion.")
cost_mask = [int(i) for i in input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")]
if sum(cost_mask) <= robot.n and len(cost_mask) == 6:
break
else:
print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.")
except ValueError:
print("Invalid input. Please enter integers only.")
print(f"Cost mask: {cost_mask}")
print("--------------------------------------------------------------------------------------------------------------------------------")
node = ScaledJointTrajectoryPublisher(joint_names, robot, cost_mask, joint_velocity_limits)
# Run both ROS 2 and OSC Server together
try:
while rclpy.ok():
osc_process() # Handle one OSC request at a time
rclpy.spin_once(node, timeout_sec=0.1) # Process ROS callbacks
except KeyboardInterrupt:
print("")
finally:
node.destroy_node()
rclpy.shutdown()
osc_terminate()
if __name__ == '__main__':
main()

View File

@@ -1,185 +0,0 @@
import rclpy
from rclpy.node import Node
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from sensor_msgs.msg import JointState
from osc4py3.as_allthreads import *
from osc4py3 import oscmethod as osm
import xml.etree.ElementTree as ET
import roboticstoolbox as rtb
import spatialmath as sm
import numpy as np
import time
import os
class ScaledJointTrajectoryPublisher(Node):
"""Node to publish joint trajectories based on OSC messages."""
def __init__(self, joint_names, robot, cost_mask, joint_velocity_limits):
super().__init__('scaled_joint_trajectory_publisher')
self.joint_velocity_limits = [joint_velocity_limits[joint] for joint in joint_names]
self.cost_mask = cost_mask
self.robot = robot
self.trajectroy_topic_name = input("Enter the topic name to which the joint trajectory should be sent to: ")
if self.trajectroy_topic_name == "":
self.trajectroy_topic_name = '/scaled_joint_trajectory_controller/joint_trajectory'
# ROS2 Publisher
self.publisher = self.create_publisher(
JointTrajectory,
self.trajectroy_topic_name,
10
)
self.subscription = self.create_subscription(
JointState,
'/joint_states',
self.joint_states_callback,
1 # Increased queue size for joint states
)
self.maximum_acceleration = [0.0] * len(joint_names)
# Store received joint positions
self.joint_names = joint_names
for joint in joint_names:
self.maximum_acceleration[joint_names.index(joint)] = float(input(f"Enter the maximum acceleration for joint {joint}: "))
self.port = 8000 # UDP port
osc_startup()
osc_udp_server("0.0.0.0", self.port, "osc_server")
print(f"Server started on 0.0.0.0:{str(self.port)} \n ready to receive messages in the following format: /joint_trajectroy [tcp_coordinates0, tcp_coordinates1, ...] optional: timestamp as last element of each tcp_coordinates")
# Register OSC handler
osc_method("/joint_trajectory", self.joint_angles_handler, argscheme=osm.OSCARG_DATAUNPACK)
print("OSC method registered for /joint_trajectory")
def joint_states_callback(self, msg):
"""Callback function to handle incoming joint states."""
joint_position_dict = dict(zip(msg.name, msg.position))
self.current_joint_positions = [joint_position_dict[name] for name in self.joint_names]
joint_velocity_dict = dict(zip(msg.name, msg.velocity))
self.current_joint_velocities = [joint_velocity_dict[name] for name in self.joint_names]
def joint_angles_handler(self, *args):
"""Handles incoming OSC messages for joint positions."""
try:
print("Received joint positions")
msg = JointTrajectory()
msg.joint_names = self.joint_names
steps_per_m = 50
prev_duration = 0
for i in range(len(args)-1):
x, y, z, roll, pitch, yaw = args[i]
x1, y1, z1, roll1, pitch1, yaw1 = args[i+1]
steps = int(np.linalg.norm([x1-x, y1-y, z1-z])*steps_per_m)
if steps <= 1: steps = 2
cart_traj = [sm.SE3([x+(x1-x)/(steps-1)*i, y+(y1-y)/(steps-1)*i, z+(z1-z)/(steps-1)*i]) * sm.SE3.RPY([roll+(roll1-roll)/(steps-1)*i, pitch+(pitch1-pitch)/(steps-1)*i, yaw+(yaw1-yaw)/(steps-1)*i], order='xyz') for i in range(steps)]
prev_sol = self.current_joint_positions if i == 0 else sol[0]
sol_set = []
for j in (range(steps) if i == 0 else range(1,steps)):
sol = self.robot.ik_LM(cart_traj[j], q0=[0.0] * len(self.joint_names), mask = self.cost_mask, joint_limits = True) if i == 0 else self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True)
if sol[1] == 1:
sol_set.append(sol[0])
prev_sol = list(sol[0])
else: print(f'IK could not find a solution for (x,y,z) = ({cart_traj[j].t}), (roll,pitch,yaw) = ({cart_traj[j].rpy()})!')
distance = abs(sol_set[0]-sol_set[-1])
ts= distance/np.array(self.joint_velocity_limits)+2*np.array(self.joint_velocity_limits)/np.array(self.maximum_acceleration)
t = max(ts)
idx = list(ts).index(t)
s_acc = self.joint_velocity_limits[idx]**2/(2*self.maximum_acceleration[idx])
print(f"t: {t}, idx: {idx}, s_acc: {s_acc}")
print(f"sol_set: {sol_set}")
for sol in sol_set:
print(f"sol: {sol}")
s = abs(sol[idx]-sol_set[0][idx])
print(f"sol_set[0][idx]: {sol_set[0][idx]}, sol[idx]: {sol[idx]}, s: {s}")
if s <= s_acc:
duration = np.sqrt(s/self.maximum_acceleration[idx])
print(f"acceleration phase, duration: {duration}")
elif s <= sol_set[-1][idx]-s_acc:
duration = self.joint_velocity_limits[idx]/self.maximum_acceleration[idx] + (s-s_acc)/self.joint_velocity_limits[idx]
print(f"constant velocity phase, duration: {duration}")
else:
duration = t-np.sqrt((sol_set[-1][idx]-s)/self.maximum_acceleration[idx])
print(f"deceleration phase, duration: {duration}")
point = JointTrajectoryPoint()
point.positions = list(sol)
duration += prev_duration
point.time_from_start.sec = int(duration)
point.time_from_start.nanosec = int((duration - int(duration)) * 1e9)
msg.points.append(point)
prev_duration = duration
msg.header.stamp = self.get_clock().now().to_msg()
self.publisher.publish(msg)
except Exception as e:
print(f"Error in joint angles handler: {e}")
def main():
"""Main function to get joint names and start the ROS 2 & OSC system."""
while True:
path_to_urdf = input("Enter the path to the URDF file: ")
if os.path.isfile(path_to_urdf):
if not path_to_urdf.endswith('.urdf'):
print("The file is not a URDF file. Please enter a valid URDF file.")
continue
break
else:
print("Invalid path. Please enter a valid path to the URDF file.")
tree = ET.parse(path_to_urdf)
root = tree.getroot()
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
robot = rtb.ERobot.URDF(path_to_urdf)
print(robot)
joint_velocity_limits = {}
# Iterate over all joints in the URDF
for joint in root.findall('.//joint'):
joint_name = joint.get('name') # Get the name of the joint
# Look for the <limit> tag under each joint
limit = joint.find('limit')
if limit is not None:
# Extract the velocity limit (if it exists)
velocity_limit = limit.get('velocity')
if velocity_limit is not None:
joint_velocity_limits[joint_name] = float(velocity_limit)
rclpy.init()
while True:
try:
print("The cost mask determines which coordinates are used for the IK. Each element of the cost mask corresponds to a catesian coordinate [x, y, z, Rx, Ry, Rz].")
print("The cost mask [1, 1, 1, 0, 0, 0] means that the IK will only consider translation and no rotaion.")
cost_mask = [int(i) for i in input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")]
if sum(cost_mask) <= robot.n and len(cost_mask) == 6:
break
else:
print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.")
except ValueError:
print("Invalid input. Please enter integers only.")
print(f"Cost mask: {cost_mask}")
node = ScaledJointTrajectoryPublisher(joint_names, robot, cost_mask, joint_velocity_limits)
# Run both ROS 2 and OSC Server together
try:
while rclpy.ok():
osc_process() # Handle one OSC request at a time
rclpy.spin_once(node, timeout_sec=0.1) # Process ROS callbacks
except KeyboardInterrupt:
print("")
finally:
node.destroy_node()
rclpy.shutdown()
osc_terminate()
if __name__ == '__main__':
main()

View File

@@ -1 +0,0 @@
0

View File

@@ -1 +0,0 @@
# generated from colcon_core/shell/template/command_prefix.sh.em

View File

@@ -1,20 +0,0 @@
AMENT_PREFIX_PATH=/BA/workspace/install/painting_robot_control:/BA/workspace/install/osc_ros2:/BA/workspace/install/mock_robot:/BA/workspace/install/joint_info:/BA/workspace/install/joint_control:/opt/ros/humble
COLCON=1
COLCON_PREFIX_PATH=/BA/workspace/install
HOME=/root
HOSTNAME=0e38e264ac6b
LANG=C.UTF-8
LC_ALL=C.UTF-8
LD_LIBRARY_PATH=/opt/ros/humble/opt/rviz_ogre_vendor/lib:/opt/ros/humble/lib/x86_64-linux-gnu:/opt/ros/humble/lib
LS_COLORS=rs=0:di=01;34:ln=01;36:mh=00:pi=40;33:so=01;35:do=01;35:bd=40;33;01:cd=40;33;01:or=40;31;01:mi=00:su=37;41:sg=30;43:ca=30;41:tw=30;42:ow=34;42:st=37;44:ex=01;32:*.tar=01;31:*.tgz=01;31:*.arc=01;31:*.arj=01;31:*.taz=01;31:*.lha=01;31:*.lz4=01;31:*.lzh=01;31:*.lzma=01;31:*.tlz=01;31:*.txz=01;31:*.tzo=01;31:*.t7z=01;31:*.zip=01;31:*.z=01;31:*.dz=01;31:*.gz=01;31:*.lrz=01;31:*.lz=01;31:*.lzo=01;31:*.xz=01;31:*.zst=01;31:*.tzst=01;31:*.bz2=01;31:*.bz=01;31:*.tbz=01;31:*.tbz2=01;31:*.tz=01;31:*.deb=01;31:*.rpm=01;31:*.jar=01;31:*.war=01;31:*.ear=01;31:*.sar=01;31:*.rar=01;31:*.alz=01;31:*.ace=01;31:*.zoo=01;31:*.cpio=01;31:*.7z=01;31:*.rz=01;31:*.cab=01;31:*.wim=01;31:*.swm=01;31:*.dwm=01;31:*.esd=01;31:*.jpg=01;35:*.jpeg=01;35:*.mjpg=01;35:*.mjpeg=01;35:*.gif=01;35:*.bmp=01;35:*.pbm=01;35:*.pgm=01;35:*.ppm=01;35:*.tga=01;35:*.xbm=01;35:*.xpm=01;35:*.tif=01;35:*.tiff=01;35:*.png=01;35:*.svg=01;35:*.svgz=01;35:*.mng=01;35:*.pcx=01;35:*.mov=01;35:*.mpg=01;35:*.mpeg=01;35:*.m2v=01;35:*.mkv=01;35:*.webm=01;35:*.webp=01;35:*.ogm=01;35:*.mp4=01;35:*.m4v=01;35:*.mp4v=01;35:*.vob=01;35:*.qt=01;35:*.nuv=01;35:*.wmv=01;35:*.asf=01;35:*.rm=01;35:*.rmvb=01;35:*.flc=01;35:*.avi=01;35:*.fli=01;35:*.flv=01;35:*.gl=01;35:*.dl=01;35:*.xcf=01;35:*.xwd=01;35:*.yuv=01;35:*.cgm=01;35:*.emf=01;35:*.ogv=01;35:*.ogx=01;35:*.aac=00;36:*.au=00;36:*.flac=00;36:*.m4a=00;36:*.mid=00;36:*.midi=00;36:*.mka=00;36:*.mp3=00;36:*.mpc=00;36:*.ogg=00;36:*.ra=00;36:*.wav=00;36:*.oga=00;36:*.opus=00;36:*.spx=00;36:*.xspf=00;36:
OLDPWD=/BA/workspace/src
PATH=/opt/ros/humble/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
PWD=/BA/workspace/build/joint_control
PYTHONPATH=/BA/workspace/build/painting_robot_control:/BA/workspace/install/painting_robot_control/lib/python3.10/site-packages:/BA/workspace/build/osc_ros2:/BA/workspace/install/osc_ros2/lib/python3.10/site-packages:/BA/workspace/build/mock_robot:/BA/workspace/install/mock_robot/lib/python3.10/site-packages:/BA/workspace/build/joint_info:/BA/workspace/install/joint_info/lib/python3.10/site-packages:/BA/workspace/build/joint_control:/BA/workspace/install/joint_control/lib/python3.10/site-packages:/opt/ros/humble/lib/python3.10/site-packages:/opt/ros/humble/local/lib/python3.10/dist-packages
ROS_DISTRO=humble
ROS_LOCALHOST_ONLY=0
ROS_PYTHON_VERSION=3
ROS_VERSION=2
SHLVL=1
TERM=xterm
_=/usr/bin/colcon

View File

@@ -1 +0,0 @@
/BA/workspace/src/joint_control/joint_control

View File

@@ -1,12 +0,0 @@
Metadata-Version: 2.1
Name: joint-control
Version: 0.0.0
Summary: TODO: Package description
Home-page: UNKNOWN
Maintainer: root
Maintainer-email: root@todo.todo
License: TODO: License declaration
Platform: UNKNOWN
UNKNOWN

View File

@@ -1,35 +0,0 @@
package.xml
setup.cfg
setup.py
../../build/joint_control/joint_control.egg-info/PKG-INFO
../../build/joint_control/joint_control.egg-info/SOURCES.txt
../../build/joint_control/joint_control.egg-info/dependency_links.txt
../../build/joint_control/joint_control.egg-info/entry_points.txt
../../build/joint_control/joint_control.egg-info/requires.txt
../../build/joint_control/joint_control.egg-info/top_level.txt
../../build/joint_control/joint_control.egg-info/zip-safe
joint_control/__init__.py
joint_control/cart_tcp_server.py
joint_control/joint_angles_server.py
joint_control/plugdata.py
joint_control/plugdata_cart.py
joint_control/plugdata_cart_fix.py
joint_control/plugdata_cart_smooth.py
joint_control/sandbox.py
joint_control/test.py
joint_control/trajectory_server.py
joint_control/trajectory_server_cart.py
joint_control/trajectory_server_cart_fast.py
joint_control/trajectory_server_cart_fast_max_acc.py
joint_control/trajectory_server_cart_fast_smooth.py
joint_control/trajectory_server_new.py
joint_control/trajectory_server_new_cart.py
joint_control/trajectory_server_trapezoidal.py
joint_control.egg-info/PKG-INFO
joint_control.egg-info/SOURCES.txt
joint_control.egg-info/dependency_links.txt
joint_control.egg-info/entry_points.txt
joint_control.egg-info/requires.txt
joint_control.egg-info/top_level.txt
joint_control.egg-info/zip-safe
resource/joint_control

View File

@@ -1,18 +0,0 @@
[console_scripts]
cart_coords = joint_control.cart_tcp_server:main
joint_control = joint_control.joint_angles_server:main
plugdata = joint_control.plugdata:main
plugdata_cart = joint_control.plugdata_cart:main
plugdata_cart_fix = joint_control.plugdata_cart_fix:main
plugdata_cart_smooth = joint_control.plugdata_cart_smooth:main
sandbox = sandbox.sandbox:main
test = joint_control.test:main
trajectory_server = joint_control.trajectory_server:main
trajectory_server_cart = joint_control.trajectory_server_cart:main
trajectory_server_cart_fast = joint_control.trajectory_server_cart_fast:main
trajectory_server_cart_fast_max_acc = joint_control.trajectory_server_cart_fast_max_acc:main
trajectory_server_cart_fast_smooth = joint_control.trajectory_server_cart_fast_smooth:main
trajectory_server_new = joint_control.trajectory_server_new:main
trajectory_server_new_cart = joint_control.trajectory_server_new_cart:main
trajectory_server_trapezoidal = joint_control.trajectory_server_trapezoidal:main

View File

@@ -1 +0,0 @@
setuptoolsosc4py3

View File

@@ -1 +0,0 @@
/BA/workspace/src/joint_control/package.xml

View File

@@ -1,4 +0,0 @@
import sys
if sys.prefix == '/usr':
sys.real_prefix = sys.prefix
sys.prefix = sys.exec_prefix = '/BA/workspace/install/joint_control'

View File

@@ -1 +0,0 @@
/BA/workspace/src/joint_control/resource/joint_control

View File

@@ -1 +0,0 @@
/BA/workspace/src/joint_control/setup.cfg

View File

@@ -1 +0,0 @@
/BA/workspace/src/joint_control/setup.py

View File

@@ -1 +0,0 @@
prepend-non-duplicate;PYTHONPATH;/BA/workspace/build/joint_control

View File

@@ -1,3 +0,0 @@
# generated from colcon_powershell/shell/template/hook_prepend_value.ps1.em
colcon_prepend_unique_value PYTHONPATH "$env:COLCON_CURRENT_PREFIX\/BA/workspace/build/joint_control"

View File

@@ -1,3 +0,0 @@
# generated from colcon_core/shell/template/hook_prepend_value.sh.em
_colcon_prepend_unique_value PYTHONPATH "/BA/workspace/build/joint_control"

View File

@@ -1,101 +0,0 @@
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState
from osc4py3.as_eventloop import *
from osc4py3 import oscbuildparse
class JointStateOSC(Node):
def __init__(self):
super().__init__('joint_states_osc')
# Create a ROS 2 subscriber to /joint_states topic
self.subscription = self.create_subscription(
JointState,
'/joint_states',
self.joint_states_callback,
1 # Queue size
)
# Open Sound Control (OSC) Client settings
self.osc_ip = "127.0.0.1" # Replace with the target IP
self.osc_port = 8000 # Replace with the target port
# Start the OSC system
osc_startup()
# Make client channels to send packets
osc_udp_client(self.osc_ip, self.osc_port, "osc_client")
def joint_states_callback(self, msg):
"""Callback function to handle incoming joint states."""
header = msg.header
joint_names = msg.name
joint_positions = msg.position
joint_velocity = msg.velocity
joint_effort = msg.effort
joint_names_str = "\n- ".join(joint_names)
joint_positions_str = "\n- ".join(map(str, joint_positions))
joint_velocity_str = "\n- ".join(map(str, joint_velocity))
joint_effort_str = "\n- ".join(map(str, joint_effort))
info = f"""
---
header:
stamp:
sec: {header.stamp.sec}
nanosec: {header.stamp.nanosec}
name:
- {joint_names_str}
position:
- {joint_positions_str}
velocity:
- {joint_velocity_str}
effort:
- {joint_effort_str}
---"""
# Send the info message
msg_info = oscbuildparse.OSCMessage("/joint_states", None, [info])
msg_name = oscbuildparse.OSCMessage("/joint_states/name", None, [i for i in joint_names])
msg_position = oscbuildparse.OSCMessage("/joint_states/position", None, [i for i in joint_positions])
msg_velocity = oscbuildparse.OSCMessage("/joint_states/velocity", None, [i for i in joint_velocity])
msg_effort = oscbuildparse.OSCMessage("/joint_states/effort", None, [i for i in joint_effort])
bun = oscbuildparse.OSCBundle(oscbuildparse.OSC_IMMEDIATELY, [msg_info, msg_name, msg_position, msg_velocity, msg_effort])
osc_send(bun, "osc_client")
osc_process()
#print(f"Publishing: {info}")
'''
# Send each joint state as an OSC message
for i, name in enumerate(joint_names):
#msg_sec = oscbuildparse.OSCMessage(f"/joint_states/header/sec", None, [header.stamp.sec])
#msg_nanosec = oscbuildparse.OSCMessage(f"/joint_states/header/nanosec", None, [header.stamp.nanosec])
msg_position = oscbuildparse.OSCMessage(f"/joint_states/{name}/position", None, [joint_positions[i]])
msg_velocity = oscbuildparse.OSCMessage(f"/joint_states/{name}/velocity", None, [joint_velocity[i]])
msg_effort = oscbuildparse.OSCMessage(f"/joint_states/{name}/effort", None, [joint_effort[i]])
bun = oscbuildparse.OSCBundle(oscbuildparse.unixtime2timetag(header.stamp.sec + header.stamp.nanosec), [msg_position, msg_velocity, msg_effort])
#bun = oscbuildparse.OSCBundle(oscbuildparse.OSC_IMMEDIATELY , [msg_position, msg_velocity, msg_effort])
osc_send(bun, "osc_client")
osc_process()
#print(f"OSC bundle sent for joint {name}")
'''
def main():
rclpy.init()
node = JointStateOSC()
print(f"Publishing joint states to OSC on {node.osc_ip}:{node.osc_port}...")
try:
rclpy.spin(node)
except KeyboardInterrupt:
print("shutting down...")
finally:
node.destroy_node()
rclpy.shutdown()
osc_terminate()
if __name__ == '__main__':
main()

View File

@@ -1,41 +0,0 @@
from osc4py3.as_eventloop import *
from osc4py3 import oscmethod as osm
import time
def joint_states_handler(address, *args):
"""Handler function to process incoming joint states."""
#print([i*180/3.141 for i in args]) # for printing joint angles in degrees
if address == "/joint_states":
print(args[0])
def main():
ip = "0.0.0.0" # IP address to listen on
port = 8000 # Port to listen on
# Start the OSC system
osc_startup()
# Make server channels to receive packets
osc_udp_server(ip, port, "osc_server")
# Associate Python functions with message address patterns
osc_method("/joint_states", joint_states_handler, argscheme=osm.OSCARG_ADDRESS + osm.OSCARG_DATAUNPACK)
print(f"Listening for OSC messages on {ip}:{port}...")
try:
# Run the event loop
while True:
osc_process() # Process OSC messages
time.sleep(0.01) # Sleep to avoid high CPU usage
except KeyboardInterrupt:
print("")
finally:
# Properly close the system
osc_terminate()
if __name__ == "__main__":
main()

View File

@@ -1,75 +0,0 @@
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState
from osc4py3.as_eventloop import *
from osc4py3 import oscbuildparse
import roboticstoolbox as rtb
import xml.etree.ElementTree as ET
import numpy as np
from scipy.spatial.transform import Rotation as R
class JointStateOSC(Node):
def __init__(self, robot, joint_names):
super().__init__('joint_states_osc')
self.joint_names_urdf = joint_names
self.robot = robot
# Create a ROS 2 subscriber to /joint_states topic
self.subscription = self.create_subscription(
JointState,
'/joint_states',
self.joint_states_callback,
1 # Queue size
)
# Open Sound Control (OSC) Client settings
self.osc_ip = "127.0.0.1" # Replace with the target IP
self.osc_port = 8000 # Replace with the target port
# Start the OSC system
osc_startup()
# Make client channels to send packets
osc_udp_client(self.osc_ip, self.osc_port, "osc_client")
def joint_states_callback(self, msg):
"""Callback function to handle incoming joint states."""
header = msg.header
joint_names = msg.name
joint_positions = msg.position
joint_positions = [joint_positions[joint_names.index(joint)] for joint in self.joint_names_urdf]
tcp_pos = self.robot.fkine(joint_positions) #, end='ft_frame')
tcp_xyz = tcp_pos.t
tcp_rot = tcp_pos.R
rotation_vector = R.from_matrix(tcp_rot).as_rotvec()
translation = oscbuildparse.OSCMessage("/tcp_position_t", None, tcp_xyz.tolist())
osc_send(translation, "osc_client")
rotation = oscbuildparse.OSCMessage("/tcp_position_R", None, rotation_vector.tolist())
osc_send(rotation, "osc_client")
osc_process()
#print(f"Published TCP position: {tcp_pos}")
def main():
rclpy.init()
robot = rtb.ERobot.URDF('/BA/robot.urdf')
tree = ET.parse('/BA/robot.urdf')
root = tree.getroot()
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
node = JointStateOSC(robot, joint_names)
print(f"Publishing TCP coordinates to OSC on {node.osc_ip}:{node.osc_port}...")
try:
rclpy.spin(node)
except KeyboardInterrupt:
print("shutting down...")
finally:
node.destroy_node()
rclpy.shutdown()
osc_terminate()
if __name__ == '__main__':
main()

View File

@@ -1 +0,0 @@
SIGINT

View File

@@ -1 +0,0 @@
# generated from colcon_core/shell/template/command_prefix.sh.em

View File

@@ -1,20 +0,0 @@
AMENT_PREFIX_PATH=/BA/workspace/install/painting_robot_control:/BA/workspace/install/mock_robot:/BA/workspace/install/joint_info:/BA/workspace/install/joint_control:/opt/ros/humble
COLCON=1
COLCON_PREFIX_PATH=/BA/workspace/install
HOME=/root
HOSTNAME=0e38e264ac6b
LANG=C.UTF-8
LC_ALL=C.UTF-8
LD_LIBRARY_PATH=/opt/ros/humble/opt/rviz_ogre_vendor/lib:/opt/ros/humble/lib/x86_64-linux-gnu:/opt/ros/humble/lib
LS_COLORS=rs=0:di=01;34:ln=01;36:mh=00:pi=40;33:so=01;35:do=01;35:bd=40;33;01:cd=40;33;01:or=40;31;01:mi=00:su=37;41:sg=30;43:ca=30;41:tw=30;42:ow=34;42:st=37;44:ex=01;32:*.tar=01;31:*.tgz=01;31:*.arc=01;31:*.arj=01;31:*.taz=01;31:*.lha=01;31:*.lz4=01;31:*.lzh=01;31:*.lzma=01;31:*.tlz=01;31:*.txz=01;31:*.tzo=01;31:*.t7z=01;31:*.zip=01;31:*.z=01;31:*.dz=01;31:*.gz=01;31:*.lrz=01;31:*.lz=01;31:*.lzo=01;31:*.xz=01;31:*.zst=01;31:*.tzst=01;31:*.bz2=01;31:*.bz=01;31:*.tbz=01;31:*.tbz2=01;31:*.tz=01;31:*.deb=01;31:*.rpm=01;31:*.jar=01;31:*.war=01;31:*.ear=01;31:*.sar=01;31:*.rar=01;31:*.alz=01;31:*.ace=01;31:*.zoo=01;31:*.cpio=01;31:*.7z=01;31:*.rz=01;31:*.cab=01;31:*.wim=01;31:*.swm=01;31:*.dwm=01;31:*.esd=01;31:*.jpg=01;35:*.jpeg=01;35:*.mjpg=01;35:*.mjpeg=01;35:*.gif=01;35:*.bmp=01;35:*.pbm=01;35:*.pgm=01;35:*.ppm=01;35:*.tga=01;35:*.xbm=01;35:*.xpm=01;35:*.tif=01;35:*.tiff=01;35:*.png=01;35:*.svg=01;35:*.svgz=01;35:*.mng=01;35:*.pcx=01;35:*.mov=01;35:*.mpg=01;35:*.mpeg=01;35:*.m2v=01;35:*.mkv=01;35:*.webm=01;35:*.webp=01;35:*.ogm=01;35:*.mp4=01;35:*.m4v=01;35:*.mp4v=01;35:*.vob=01;35:*.qt=01;35:*.nuv=01;35:*.wmv=01;35:*.asf=01;35:*.rm=01;35:*.rmvb=01;35:*.flc=01;35:*.avi=01;35:*.fli=01;35:*.flv=01;35:*.gl=01;35:*.dl=01;35:*.xcf=01;35:*.xwd=01;35:*.yuv=01;35:*.cgm=01;35:*.emf=01;35:*.ogv=01;35:*.ogx=01;35:*.aac=00;36:*.au=00;36:*.flac=00;36:*.m4a=00;36:*.mid=00;36:*.midi=00;36:*.mka=00;36:*.mp3=00;36:*.mpc=00;36:*.ogg=00;36:*.ra=00;36:*.wav=00;36:*.oga=00;36:*.opus=00;36:*.spx=00;36:*.xspf=00;36:
OLDPWD=/BA
PATH=/opt/ros/humble/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
PWD=/BA/workspace/build/joint_info
PYTHONPATH=/BA/workspace/build/painting_robot_control:/BA/workspace/install/painting_robot_control/lib/python3.10/site-packages:/BA/workspace/build/mock_robot:/BA/workspace/install/mock_robot/lib/python3.10/site-packages:/BA/workspace/build/joint_info:/BA/workspace/install/joint_info/lib/python3.10/site-packages:/BA/workspace/build/joint_control:/BA/workspace/install/joint_control/lib/python3.10/site-packages:/opt/ros/humble/lib/python3.10/site-packages:/opt/ros/humble/local/lib/python3.10/dist-packages
ROS_DISTRO=humble
ROS_LOCALHOST_ONLY=0
ROS_PYTHON_VERSION=3
ROS_VERSION=2
SHLVL=1
TERM=xterm
_=/usr/bin/colcon

View File

@@ -1 +0,0 @@
/BA/workspace/src/joint_info/joint_info

View File

@@ -1,12 +0,0 @@
Metadata-Version: 2.1
Name: joint-info
Version: 0.0.0
Summary: TODO: Package description
Home-page: UNKNOWN
Maintainer: root
Maintainer-email: root@todo.todo
License: TODO: License declaration
Platform: UNKNOWN
UNKNOWN

View File

@@ -1,15 +0,0 @@
package.xml
setup.cfg
setup.py
joint_info/__init__.py
joint_info/osc_joint_states_pub.py
joint_info/osc_joint_states_sub.py
joint_info/tcp_cart_pos.py
joint_info.egg-info/PKG-INFO
joint_info.egg-info/SOURCES.txt
joint_info.egg-info/dependency_links.txt
joint_info.egg-info/entry_points.txt
joint_info.egg-info/requires.txt
joint_info.egg-info/top_level.txt
joint_info.egg-info/zip-safe
resource/joint_info

View File

@@ -1,5 +0,0 @@
[console_scripts]
joint_states_pub = joint_info.osc_joint_states_pub:main
joint_states_sub = joint_info.osc_joint_states_sub:main
tcp_cart_pos = joint_info.tcp_cart_pos:main

View File

@@ -1 +0,0 @@
setuptoolsosc4py3

View File

@@ -1 +0,0 @@
joint_info

View File

@@ -1 +0,0 @@
/BA/workspace/src/joint_info/package.xml

View File

@@ -1,4 +0,0 @@
import sys
if sys.prefix == '/usr':
sys.real_prefix = sys.prefix
sys.prefix = sys.exec_prefix = '/BA/workspace/install/joint_info'

View File

@@ -1 +0,0 @@
/BA/workspace/src/joint_info/resource/joint_info

View File

@@ -1 +0,0 @@
/BA/workspace/src/joint_info/setup.cfg

View File

@@ -1 +0,0 @@
/BA/workspace/src/joint_info/setup.py

View File

@@ -1 +0,0 @@
prepend-non-duplicate;PYTHONPATH;/BA/workspace/build/joint_info

View File

@@ -1,3 +0,0 @@
# generated from colcon_powershell/shell/template/hook_prepend_value.ps1.em
colcon_prepend_unique_value PYTHONPATH "$env:COLCON_CURRENT_PREFIX\/BA/workspace/build/joint_info"

View File

@@ -1,3 +0,0 @@
# generated from colcon_core/shell/template/hook_prepend_value.sh.em
_colcon_prepend_unique_value PYTHONPATH "/BA/workspace/build/joint_info"

View File

@@ -1,63 +0,0 @@
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
import numpy as np
import time
class RobotNode(Node):
def __init__(self):
super().__init__('robot_node')
self.l1 = 0.5
self.l2 = 0.3
self.x = self.l1 + self.l2
self.y = 0.0
self.theta1 = 0.0
self.theta2 = 0.0
self.publisher = self.create_publisher(
JointState,
'/joint_states',
10
)
timer_period = 0.5 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0
def timer_callback(self):
msg = JointState()
msg.name = ['joint1', 'joint2']
msg.position = [self.theta1, self.theta2]
self.publisher.publish(msg)
self.get_logger().info(f'x = {self.x}, y = {self.y}, theta1 = {self.theta1}, theta2 = {self.theta2}')
self.subcriber = self.create_subscription(
JointTrajectory,
'/joint_trajectory_controller/joint_trajectory',
self.joint_trajectory_callback,
10
)
def joint_trajectory_callback(self, msg):
joint_names = msg.joint_names
duration = 0
for point in msg.points:
duration = msg.time_from_start_sec + msg.time_from_start_nanosec / 1e9 - duration
for i in range(duration *10):
self.theta1 += (point.positions[0] - self.theta1) / 10
self.theta2 += (point.positions[1] - self.theta2) / 10
self.x = self.l1 * np.cos(self.theta1) + self.l2 * np.cos(self.theta1 + self.theta2)
self.y = self.l1 * np.sin(self.theta1) + self.l2 * np.sin(self.theta1 + self.theta2)
time.sleep(0.1)
def main(args=None):
rclpy.init(args=args)
node = RobotNode()
rclpy.spin(node)
rclpy.shutdown()

View File

@@ -1 +0,0 @@
SIGINT

View File

@@ -1 +0,0 @@
# generated from colcon_core/shell/template/command_prefix.sh.em

View File

@@ -1,20 +0,0 @@
AMENT_PREFIX_PATH=/BA/workspace/install/painting_robot_control:/BA/workspace/install/mock_robot:/BA/workspace/install/joint_info:/BA/workspace/install/joint_control:/opt/ros/humble
COLCON=1
COLCON_PREFIX_PATH=/BA/workspace/install
HOME=/root
HOSTNAME=0e38e264ac6b
LANG=C.UTF-8
LC_ALL=C.UTF-8
LD_LIBRARY_PATH=/opt/ros/humble/opt/rviz_ogre_vendor/lib:/opt/ros/humble/lib/x86_64-linux-gnu:/opt/ros/humble/lib
LS_COLORS=rs=0:di=01;34:ln=01;36:mh=00:pi=40;33:so=01;35:do=01;35:bd=40;33;01:cd=40;33;01:or=40;31;01:mi=00:su=37;41:sg=30;43:ca=30;41:tw=30;42:ow=34;42:st=37;44:ex=01;32:*.tar=01;31:*.tgz=01;31:*.arc=01;31:*.arj=01;31:*.taz=01;31:*.lha=01;31:*.lz4=01;31:*.lzh=01;31:*.lzma=01;31:*.tlz=01;31:*.txz=01;31:*.tzo=01;31:*.t7z=01;31:*.zip=01;31:*.z=01;31:*.dz=01;31:*.gz=01;31:*.lrz=01;31:*.lz=01;31:*.lzo=01;31:*.xz=01;31:*.zst=01;31:*.tzst=01;31:*.bz2=01;31:*.bz=01;31:*.tbz=01;31:*.tbz2=01;31:*.tz=01;31:*.deb=01;31:*.rpm=01;31:*.jar=01;31:*.war=01;31:*.ear=01;31:*.sar=01;31:*.rar=01;31:*.alz=01;31:*.ace=01;31:*.zoo=01;31:*.cpio=01;31:*.7z=01;31:*.rz=01;31:*.cab=01;31:*.wim=01;31:*.swm=01;31:*.dwm=01;31:*.esd=01;31:*.jpg=01;35:*.jpeg=01;35:*.mjpg=01;35:*.mjpeg=01;35:*.gif=01;35:*.bmp=01;35:*.pbm=01;35:*.pgm=01;35:*.ppm=01;35:*.tga=01;35:*.xbm=01;35:*.xpm=01;35:*.tif=01;35:*.tiff=01;35:*.png=01;35:*.svg=01;35:*.svgz=01;35:*.mng=01;35:*.pcx=01;35:*.mov=01;35:*.mpg=01;35:*.mpeg=01;35:*.m2v=01;35:*.mkv=01;35:*.webm=01;35:*.webp=01;35:*.ogm=01;35:*.mp4=01;35:*.m4v=01;35:*.mp4v=01;35:*.vob=01;35:*.qt=01;35:*.nuv=01;35:*.wmv=01;35:*.asf=01;35:*.rm=01;35:*.rmvb=01;35:*.flc=01;35:*.avi=01;35:*.fli=01;35:*.flv=01;35:*.gl=01;35:*.dl=01;35:*.xcf=01;35:*.xwd=01;35:*.yuv=01;35:*.cgm=01;35:*.emf=01;35:*.ogv=01;35:*.ogx=01;35:*.aac=00;36:*.au=00;36:*.flac=00;36:*.m4a=00;36:*.mid=00;36:*.midi=00;36:*.mka=00;36:*.mp3=00;36:*.mpc=00;36:*.ogg=00;36:*.ra=00;36:*.wav=00;36:*.oga=00;36:*.opus=00;36:*.spx=00;36:*.xspf=00;36:
OLDPWD=/BA
PATH=/opt/ros/humble/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
PWD=/BA/workspace/build/mock_robot
PYTHONPATH=/BA/workspace/build/painting_robot_control:/BA/workspace/install/painting_robot_control/lib/python3.10/site-packages:/BA/workspace/build/mock_robot:/BA/workspace/install/mock_robot/lib/python3.10/site-packages:/BA/workspace/build/joint_info:/BA/workspace/install/joint_info/lib/python3.10/site-packages:/BA/workspace/build/joint_control:/BA/workspace/install/joint_control/lib/python3.10/site-packages:/opt/ros/humble/lib/python3.10/site-packages:/opt/ros/humble/local/lib/python3.10/dist-packages
ROS_DISTRO=humble
ROS_LOCALHOST_ONLY=0
ROS_PYTHON_VERSION=3
ROS_VERSION=2
SHLVL=1
TERM=xterm
_=/usr/bin/colcon

View File

@@ -1 +0,0 @@
/BA/workspace/src/mock_robot/mock_robot

View File

@@ -1,12 +0,0 @@
Metadata-Version: 2.1
Name: mock-robot
Version: 0.0.0
Summary: TODO: Package description
Home-page: UNKNOWN
Maintainer: root
Maintainer-email: root@todo.todo
License: TODO: License declaration
Platform: UNKNOWN
UNKNOWN

View File

@@ -1,20 +0,0 @@
package.xml
setup.cfg
setup.py
../../build/mock_robot/mock_robot.egg-info/PKG-INFO
../../build/mock_robot/mock_robot.egg-info/SOURCES.txt
../../build/mock_robot/mock_robot.egg-info/dependency_links.txt
../../build/mock_robot/mock_robot.egg-info/entry_points.txt
../../build/mock_robot/mock_robot.egg-info/requires.txt
../../build/mock_robot/mock_robot.egg-info/top_level.txt
../../build/mock_robot/mock_robot.egg-info/zip-safe
mock_robot/__init__.py
mock_robot/robot_node.py
mock_robot.egg-info/PKG-INFO
mock_robot.egg-info/SOURCES.txt
mock_robot.egg-info/dependency_links.txt
mock_robot.egg-info/entry_points.txt
mock_robot.egg-info/requires.txt
mock_robot.egg-info/top_level.txt
mock_robot.egg-info/zip-safe
resource/mock_robot

View File

@@ -1,3 +0,0 @@
[console_scripts]
mock_robot = mock_robot.robot_node:main

View File

@@ -1 +0,0 @@
setuptools

View File

@@ -1 +0,0 @@
mock_robot

View File

@@ -1 +0,0 @@
/BA/workspace/src/mock_robot/package.xml

View File

@@ -1,4 +0,0 @@
import sys
if sys.prefix == '/usr':
sys.real_prefix = sys.prefix
sys.prefix = sys.exec_prefix = '/BA/workspace/install/mock_robot'

View File

@@ -1 +0,0 @@
/BA/workspace/src/mock_robot/resource/mock_robot

View File

@@ -1 +0,0 @@
/BA/workspace/src/mock_robot/setup.cfg

View File

@@ -1 +0,0 @@
/BA/workspace/src/mock_robot/setup.py

View File

@@ -1 +0,0 @@
prepend-non-duplicate;PYTHONPATH;/BA/workspace/build/mock_robot

View File

@@ -1,3 +0,0 @@
# generated from colcon_powershell/shell/template/hook_prepend_value.ps1.em
colcon_prepend_unique_value PYTHONPATH "$env:COLCON_CURRENT_PREFIX\/BA/workspace/build/mock_robot"

View File

@@ -1,3 +0,0 @@
# generated from colcon_core/shell/template/hook_prepend_value.sh.em
_colcon_prepend_unique_value PYTHONPATH "/BA/workspace/build/mock_robot"

View File

@@ -1 +0,0 @@
0

View File

@@ -1 +0,0 @@
# generated from colcon_core/shell/template/command_prefix.sh.em

View File

@@ -1,20 +0,0 @@
AMENT_PREFIX_PATH=/BA/workspace/install/painting_robot_control:/BA/workspace/install/osc_ros2:/BA/workspace/install/mock_robot:/BA/workspace/install/joint_info:/BA/workspace/install/joint_control:/opt/ros/humble
COLCON=1
COLCON_PREFIX_PATH=/BA/workspace/install
HOME=/root
HOSTNAME=0e38e264ac6b
LANG=C.UTF-8
LC_ALL=C.UTF-8
LD_LIBRARY_PATH=/opt/ros/humble/opt/rviz_ogre_vendor/lib:/opt/ros/humble/lib/x86_64-linux-gnu:/opt/ros/humble/lib
LS_COLORS=rs=0:di=01;34:ln=01;36:mh=00:pi=40;33:so=01;35:do=01;35:bd=40;33;01:cd=40;33;01:or=40;31;01:mi=00:su=37;41:sg=30;43:ca=30;41:tw=30;42:ow=34;42:st=37;44:ex=01;32:*.tar=01;31:*.tgz=01;31:*.arc=01;31:*.arj=01;31:*.taz=01;31:*.lha=01;31:*.lz4=01;31:*.lzh=01;31:*.lzma=01;31:*.tlz=01;31:*.txz=01;31:*.tzo=01;31:*.t7z=01;31:*.zip=01;31:*.z=01;31:*.dz=01;31:*.gz=01;31:*.lrz=01;31:*.lz=01;31:*.lzo=01;31:*.xz=01;31:*.zst=01;31:*.tzst=01;31:*.bz2=01;31:*.bz=01;31:*.tbz=01;31:*.tbz2=01;31:*.tz=01;31:*.deb=01;31:*.rpm=01;31:*.jar=01;31:*.war=01;31:*.ear=01;31:*.sar=01;31:*.rar=01;31:*.alz=01;31:*.ace=01;31:*.zoo=01;31:*.cpio=01;31:*.7z=01;31:*.rz=01;31:*.cab=01;31:*.wim=01;31:*.swm=01;31:*.dwm=01;31:*.esd=01;31:*.jpg=01;35:*.jpeg=01;35:*.mjpg=01;35:*.mjpeg=01;35:*.gif=01;35:*.bmp=01;35:*.pbm=01;35:*.pgm=01;35:*.ppm=01;35:*.tga=01;35:*.xbm=01;35:*.xpm=01;35:*.tif=01;35:*.tiff=01;35:*.png=01;35:*.svg=01;35:*.svgz=01;35:*.mng=01;35:*.pcx=01;35:*.mov=01;35:*.mpg=01;35:*.mpeg=01;35:*.m2v=01;35:*.mkv=01;35:*.webm=01;35:*.webp=01;35:*.ogm=01;35:*.mp4=01;35:*.m4v=01;35:*.mp4v=01;35:*.vob=01;35:*.qt=01;35:*.nuv=01;35:*.wmv=01;35:*.asf=01;35:*.rm=01;35:*.rmvb=01;35:*.flc=01;35:*.avi=01;35:*.fli=01;35:*.flv=01;35:*.gl=01;35:*.dl=01;35:*.xcf=01;35:*.xwd=01;35:*.yuv=01;35:*.cgm=01;35:*.emf=01;35:*.ogv=01;35:*.ogx=01;35:*.aac=00;36:*.au=00;36:*.flac=00;36:*.m4a=00;36:*.mid=00;36:*.midi=00;36:*.mka=00;36:*.mp3=00;36:*.mpc=00;36:*.ogg=00;36:*.ra=00;36:*.wav=00;36:*.oga=00;36:*.opus=00;36:*.spx=00;36:*.xspf=00;36:
OLDPWD=/BA
PATH=/opt/ros/humble/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
PWD=/BA/workspace/build/osc_ros2
PYTHONPATH=/BA/workspace/build/painting_robot_control:/BA/workspace/install/painting_robot_control/lib/python3.10/site-packages:/BA/workspace/build/osc_ros2:/BA/workspace/install/osc_ros2/lib/python3.10/site-packages:/BA/workspace/build/mock_robot:/BA/workspace/install/mock_robot/lib/python3.10/site-packages:/BA/workspace/build/joint_info:/BA/workspace/install/joint_info/lib/python3.10/site-packages:/BA/workspace/build/joint_control:/BA/workspace/install/joint_control/lib/python3.10/site-packages:/opt/ros/humble/lib/python3.10/site-packages:/opt/ros/humble/local/lib/python3.10/dist-packages
ROS_DISTRO=humble
ROS_LOCALHOST_ONLY=0
ROS_PYTHON_VERSION=3
ROS_VERSION=2
SHLVL=1
TERM=xterm
_=/usr/bin/colcon

View File

@@ -1 +0,0 @@
/BA/workspace/src/osc_ros2/osc_ros2

View File

@@ -1,12 +0,0 @@
Metadata-Version: 2.1
Name: osc-ros2
Version: 0.0.0
Summary: Creates an interface for communication between OSC and Ros2
Home-page: UNKNOWN
Maintainer: Alexander Schaefer
Maintainer-email: a.schaefer@tuhh.de
License: Apache-2.0
Platform: UNKNOWN
UNKNOWN

View File

@@ -1,14 +0,0 @@
package.xml
setup.cfg
setup.py
osc_ros2/__init__.py
osc_ros2/osc_ros2.py
osc_ros2/osc_ros2_rpy.py
osc_ros2.egg-info/PKG-INFO
osc_ros2.egg-info/SOURCES.txt
osc_ros2.egg-info/dependency_links.txt
osc_ros2.egg-info/entry_points.txt
osc_ros2.egg-info/requires.txt
osc_ros2.egg-info/top_level.txt
osc_ros2.egg-info/zip-safe
resource/osc_ros2

View File

@@ -1,4 +0,0 @@
[console_scripts]
interface = osc_ros2.osc_ros2:main
interface1 = osc_ros2.osc_ros2_rpy:main

View File

@@ -1,7 +0,0 @@
matplotlib==3.6.3
numpy==1.23.5
osc4py3
roboticstoolbox-python==1.0.1
scipy==1.10.1
setuptools
spatialmath-python==1.0.0

View File

@@ -1 +0,0 @@
osc_ros2

View File

@@ -1 +0,0 @@
/BA/workspace/src/osc_ros2/package.xml

View File

@@ -1,4 +0,0 @@
import sys
if sys.prefix == '/usr':
sys.real_prefix = sys.prefix
sys.prefix = sys.exec_prefix = '/BA/workspace/install/osc_ros2'

View File

@@ -1 +0,0 @@
/BA/workspace/src/osc_ros2/resource/osc_ros2

Some files were not shown because too many files have changed in this diff Show More