remove gitignore

This commit is contained in:
Alexander Schaefer
2025-04-22 17:10:24 +02:00
parent 97cd67a008
commit 33f21d3096
519 changed files with 18046 additions and 0 deletions

BIN
workspace/src/.DS_Store vendored Normal file

Binary file not shown.

18
workspace/src/ik_test.py Normal file
View File

@@ -0,0 +1,18 @@
import roboticstoolbox as rtb
import spatialmath as sm
robot = rtb.ERobot.URDF('/BA/workspace/src/painting_robot_control/painting_robot_control/painting_robot.urdf')
robot1 = rtb.ERobot.URDF('/BA/ur10e.urdf')
"""print(robot)
print(T)
sol = robot.ik_LM(T)
print(sol[1])"""
print(robot1.n)
'''T = sm.SE3(0.2,0.2,0.0)*sm.SE3.Rz(0.5)
#T = robot.fkine([-2,-0.5])
print(T)
sol = robot.ik_LM(T, mask = [1,0,0,0,0,1], joint_limits = False)
print(sol[1])
print(sol[0])
print(robot.fkine(sol[0]))'''

View File

@@ -0,0 +1,128 @@
import rclpy
from rclpy.node import Node
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from sensor_msgs.msg import JointState
from osc4py3.as_allthreads import *
from osc4py3 import oscmethod as osm
import xml.etree.ElementTree as ET
import time
import numpy as np
class ScaledJointTrajectoryPublisher(Node):
"""Node to publish joint trajectories based on OSC messages."""
def __init__(self, joint_names, joint_velocity_limits):
super().__init__('scaled_joint_trajectory_publisher')
# ROS2 Publisher
self.publisher = self.create_publisher(
JointTrajectory,
'/scaled_joint_trajectory_controller/joint_trajectory',
1
)
self.subscription = self.create_subscription(
JointState,
'/joint_states',
self.joint_states_callback,
1 # Increased queue size for joint states
)
# Store received joint positions
self.current_joint_positions = [0.0] * len(joint_names)
self.joint_names = joint_names
self.joint_velocity_limits = joint_velocity_limits
self.desired_joint_positions = [0.0] * len(joint_names)
self.previous_desired = [0.0] * len(joint_names)
ip = "0.0.0.0" # Listen on all network interfaces
port = 8000 # Must match the sender's port in `joint_state_osc.py`
osc_startup()
osc_udp_server(ip, port, "osc_server")
print("Server started on 0.0.0.0:8000 \n ready to receive messages in the following format: /joint_trajectroy [joint_positions] optional: duration as last argument")
# Register OSC handler
osc_method("/joint_angles", self.joint_angles_handler, argscheme=osm.OSCARG_DATAUNPACK)
self.hz = float(input("Enter the desired refresh frequency (Hz): "))
# Start the OSC server in a separate thread to avoid blocking the ROS 2 event loop
self.create_timer(1/self.hz, self.update_position) # Timer to call osc_process periodically
def joint_angles_handler(self, *args):
"""Handles incoming OSC messages for joint positions."""
print(f"Received joint angles: {args}")
self.desired_joint_positions = [float(i) for i in list(args)]
def joint_states_callback(self, msg):
"""Callback function to handle incoming joint states."""
joint_names = msg.name # List of joint names
joint_position_dict = dict(zip(joint_names, self.current_joint_positions))
self.current_joint_positions = [joint_position_dict[name] for name in self.joint_names]
def update_position(self):
time1 = time.time()
if self.desired_joint_positions == self.previous_desired:
return
duration = 0
for p1, p2, max_vel in zip(self.desired_joint_positions, self.current_joint_positions, self.joint_velocity_limits.values()):
duration = max(max(duration, abs(p1 - p2) / max_vel),1/self.hz)# as minimun
duration *= 1.8
#print(f"Duration: {duration}")
#print(f'vel: {max(np.array(self.desired_joint_positions_joint_positions) - np.array(self.joint_positions)/duration)}')
msg = JointTrajectory()
msg.joint_names = self.joint_names
point = JointTrajectoryPoint()
point.positions = self.desired_joint_positions # Updated joint positions
point.time_from_start.sec = int(duration)
#point.time_from_start.sec = 10
point.time_from_start.nanosec = int((duration - int(duration)) * 1e9)
#point.time_from_start.nanosec = 0
#point.velocities = [i*0.95 for i in self.joint_velocity_limits.values()]
msg.points.append(point)
self.publisher.publish(msg)
#print(f"desired: {self.desired_joint_positions}")
#print(f"desired: {[180/3.141*i for i in self.desired_joint_positions]}")
#print(f"current: {[180/3.141*i for i in self.current_joint_positions]}")
self.previous_desired = self.desired_joint_positions
def main():
"""Main function to get joint names and start the ROS 2 & OSC system."""
robot_urdf = input("Enter the path to the URDF file: ")
tree = ET.parse(robot_urdf)
root = tree.getroot()
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
joint_velocity_limits = {}
# Iterate over all joints in the URDF
for joint in root.findall('.//joint'):
joint_name = joint.get('name') # Get the name of the joint
# Look for the <limit> tag under each joint
limit = joint.find('limit')
if limit is not None:
# Extract the velocity limit (if it exists)
velocity_limit = limit.get('velocity')
if velocity_limit is not None:
joint_velocity_limits[joint_name] = float(velocity_limit)
rclpy.init()
node = ScaledJointTrajectoryPublisher(joint_names, joint_velocity_limits)
# Run ROS 2 spin, and osc_process will be handled by the timer
try:
rclpy.spin(node)
except KeyboardInterrupt:
print("")
finally:
node.destroy_node()
rclpy.shutdown()
osc_terminate()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,158 @@
import rclpy
from rclpy.node import Node
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from sensor_msgs.msg import JointState
from osc4py3.as_allthreads import *
from osc4py3 import oscmethod as osm
import xml.etree.ElementTree as ET
import time
import numpy as np
import spatialmath as sm
import roboticstoolbox as rtb
import os
class ScaledJointTrajectoryPublisher(Node):
"""Node to publish joint trajectories based on OSC messages."""
def __init__(self, joint_names, joint_velocity_limits, robot, cost_mask):
super().__init__('scaled_joint_trajectory_publisher')
# ROS2 Publisher
self.publisher = self.create_publisher(
JointTrajectory,
'/scaled_joint_trajectory_controller/joint_trajectory',
1
)
self.subscription = self.create_subscription(
JointState,
'/joint_states',
self.joint_states_callback,
1 # Increased queue size for joint states
)
# Store received joint positions
self.current_joint_positions = [0.0] * len(joint_names)
self.joint_names = joint_names
self.joint_velocity_limits = joint_velocity_limits
self.desired_joint_positions = [0.0] * len(joint_names)
self.previous_desired = [0.0] * len(joint_names)
self.robot = robot
self.cost_mask = cost_mask
ip = "0.0.0.0" # Listen on all network interfaces
port = 8000 # Must match the sender's port in `joint_state_osc.py`
osc_startup()
osc_udp_server(ip, port, "osc_server")
print("Server started on 0.0.0.0:8000 \n ready to receive messages in the following format: /joint_trajectroy [joint_positions] optional: duration as last argument")
# Register OSC handler
osc_method("/joint_angles", self.joint_angles_handler, argscheme=osm.OSCARG_DATAUNPACK)
self.hz = float(input("Enter the desired refresh frequency (Hz): "))
# Start the OSC server in a separate thread to avoid blocking the ROS 2 event loop
self.create_timer(1/self.hz, self.update_position) # Timer to call osc_process periodically
def joint_angles_handler(self, *args):
"""Handles incoming OSC messages for joint positions."""
self.desired_joint_positions = [float(i) for i in list(args)]
def joint_states_callback(self, msg):
"""Callback function to handle incoming joint states."""
joint_position_dict = dict(zip(msg.name, msg.position))
self.current_joint_positions = [joint_position_dict[name] for name in self.joint_names]
def update_position(self):
if self.desired_joint_positions == self.previous_desired:
return
msg = JointTrajectory()
msg.joint_names = self.joint_names
steps_per_m = 30
if True: #len(args[0]) == len(self.joint_names):
prev_duration = 0
T1 = self.robot.fkine(self.current_joint_positions)
[x,y,z] = T1.t
[roll, pitch, yaw] = T1.rpy()
x1, y1, z1, roll1, pitch1, yaw1 = self.desired_joint_positions
steps = int(np.linalg.norm(np.array([x1-x, y1-y, z1-z])) * steps_per_m)
if steps < 2: steps = 2
cart_traj = [sm.SE3([x+(x1-x)/(steps-1)*i, y+(y1-y)/(steps-1)*i, z+(z1-z)/(steps-1)*i]) * sm.SE3.RPY([roll+(roll1-roll)/(steps-1)*i, pitch+(pitch1-pitch)/(steps-1)*i, yaw+(yaw1-yaw)/(steps-1)*i]) for i in range(steps)]
for j in range(steps):
print(cart_traj[j])
sol = self.robot.ik_LM(cart_traj[j], q0=self.current_joint_positions, mask = self.cost_mask, joint_limits = True)
if sol[1] == 1:
duration = 0
prev = self.current_joint_positions if j == 0 else prev_sol
for p1, p2, max_vel in zip(sol[0], prev, self.joint_velocity_limits.values()):
duration = max(duration, abs(p1 - p2) / max_vel)#, 1/self.hz) # as minimun
point = JointTrajectoryPoint()
point.positions = list(sol[0])
#duration *= 2
duration += prev_duration
prev_duration = duration
point.time_from_start.sec = int(duration)
point.time_from_start.nanosec = int((duration - int(duration)) * 1e9)
msg.points.append(point)
prev_sol = list(sol[0])
else:
print('IK could not find a solution!')
prev_sol = self.current_joint_positions
msg.header.stamp = self.get_clock().now().to_msg()
self.publisher.publish(msg)
self.previous_desired = self.desired_joint_positions
def main():
"""Main function to get joint names and start the ROS 2 & OSC system."""
robot_urdf = input("Enter the path to the URDF file: ")
tree = ET.parse(robot_urdf)
root = tree.getroot()
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
robot = rtb.ERobot.URDF(robot_urdf)
joint_velocity_limits = {}
# Iterate over all joints in the URDF
for joint in root.findall('.//joint'):
joint_name = joint.get('name') # Get the name of the joint
# Look for the <limit> tag under each joint
limit = joint.find('limit')
if limit is not None:
# Extract the velocity limit (if it exists)
velocity_limit = limit.get('velocity')
if velocity_limit is not None:
joint_velocity_limits[joint_name] = float(velocity_limit)
while True:
try:
print("The cost mask determines which coordinates are used for the IK. Each element of the cost mask corresponds to a catesian coordinate [x, y, z, Rx, Ry, Rz].")
print("The cost mask [1, 1, 1, 0, 0, 0] means that the IK will only consider translation and no rotaion.")
cost_mask = [int(i) for i in input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")]
if sum(cost_mask) <= robot.n and len(cost_mask) == 6:
break
else:
print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.")
except ValueError:
print("Invalid input. Please enter integers only.")
print(f"Cost mask: {cost_mask}")
rclpy.init()
node = ScaledJointTrajectoryPublisher(joint_names, joint_velocity_limits, robot, cost_mask)
# Run ROS 2 spin, and osc_process will be handled by the timer
try:
rclpy.spin(node)
except KeyboardInterrupt:
print("")
finally:
node.destroy_node()
rclpy.shutdown()
osc_terminate()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,162 @@
import rclpy
from rclpy.node import Node
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from sensor_msgs.msg import JointState
from osc4py3.as_allthreads import *
from osc4py3 import oscmethod as osm
import xml.etree.ElementTree as ET
import numpy as np
import spatialmath as sm
import roboticstoolbox as rtb
class ScaledJointTrajectoryPublisher(Node):
"""Node to publish joint trajectories based on OSC messages."""
def __init__(self, joint_names, joint_velocity_limits, robot, cost_mask):
super().__init__('scaled_joint_trajectory_publisher')
# ROS2 Publisher
self.publisher = self.create_publisher(
JointTrajectory,
'/scaled_joint_trajectory_controller/joint_trajectory',
1
)
self.subscription = self.create_subscription(
JointState,
'/joint_states',
self.joint_states_callback,
1 # Increased queue size for joint states
)
# Store received joint positions
self.current_joint_positions = [0.0] * len(joint_names)
self.joint_names = joint_names
self.joint_velocity_limits = joint_velocity_limits
self.desired_joint_positions = [0.0] * len(joint_names)
self.previous_desired = [0.0] * len(joint_names)
self.robot = robot
self.cost_mask = cost_mask
self.prev_pose = None
ip = "0.0.0.0" # Listen on all network interfaces
port = 8000 # Must match the sender's port in `joint_state_osc.py`
osc_startup()
osc_udp_server(ip, port, "osc_server")
print("Server started on 0.0.0.0:8000 \n ready to receive messages in the following format: /joint_trajectroy [joint_positions] optional: duration as last argument")
# Register OSC handler
osc_method("/joint_angles", self.joint_angles_handler, argscheme=osm.OSCARG_DATAUNPACK)
self.hz = float(input("Enter the desired refresh frequency (Hz): "))
# Start the OSC server in a separate thread to avoid blocking the ROS 2 event loop
self.create_timer(1/self.hz, self.update_position) # Timer to call osc_process periodically
def joint_angles_handler(self, *args):
"""Handles incoming OSC messages for joint positions."""
self.desired_joint_positions = [float(i) for i in list(args)]
def joint_states_callback(self, msg):
"""Callback function to handle incoming joint states."""
joint_position_dict = dict(zip(msg.name, msg.position))
self.current_joint_positions = [joint_position_dict[name] for name in self.joint_names]
def update_position(self):
if self.desired_joint_positions == self.previous_desired:
return
msg = JointTrajectory()
msg.joint_names = self.joint_names
steps_per_m = 100
if True: #len(args[0]) == len(self.joint_names):
prev_duration = 0
if self.prev_pose == None:
[x,y,z] = self.robot.fkine(self.current_joint_positions).t
[roll, pitch, yaw] = self.robot.fkine(self.current_joint_positions).rpy()
else:
[x,y,z] = self.prev_pose[:3]
[roll, pitch, yaw] = self.prev_pose[3:]
x1, y1, z1, roll1, pitch1, yaw1 = self.desired_joint_positions
self.prev_pose = self.desired_joint_positions
steps = int(np.linalg.norm(np.array([x1, y1, z1])- self.robot.fkine(self.current_joint_positions).t) * steps_per_m)
if steps < 2: steps = 2
cart_traj = [sm.SE3([x+(x1-x)/(steps-1)*i, y+(y1-y)/(steps-1)*i, z+(z1-z)/(steps-1)*i]) * sm.SE3.RPY([roll+(roll1-roll)/(steps-1)*i, pitch+(pitch1-pitch)/(steps-1)*i, yaw+(yaw1-yaw)/(steps-1)*i]) for i in range(steps)]
for j in range(steps):
sol = self.robot.ik_LM(cart_traj[j], q0=self.current_joint_positions, mask = self.cost_mask, joint_limits = True) if j == 0 else self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True)
if sol[1] == 1:
duration = 0
prev = self.current_joint_positions if j == 0 else prev_sol
for p1, p2, max_vel in zip(sol[0], prev, self.joint_velocity_limits.values()):
duration = max(duration, abs(p1 - p2) / max_vel)#, 1/self.hz) # as minimun
prev_sol = list(sol[0])
if duration == 0:
continue
point = JointTrajectoryPoint()
point.positions = list(sol[0])
duration *= 2
duration += prev_duration
prev_duration = duration
point.time_from_start.sec = int(duration)
point.time_from_start.nanosec = int((duration - int(duration)) * 1e9)
msg.points.append(point)
else:
print('IK could not find a solution!')
prev_sol = self.current_joint_positions
msg.header.stamp = self.get_clock().now().to_msg()
self.publisher.publish(msg)
self.previous_desired = self.desired_joint_positions
def main():
"""Main function to get joint names and start the ROS 2 & OSC system."""
robot_urdf = input("Enter the path to the URDF file: ")
tree = ET.parse(robot_urdf)
root = tree.getroot()
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
robot = rtb.ERobot.URDF(robot_urdf)
joint_velocity_limits = {}
# Iterate over all joints in the URDF
for joint in root.findall('.//joint'):
joint_name = joint.get('name') # Get the name of the joint
# Look for the <limit> tag under each joint
limit = joint.find('limit')
if limit is not None:
# Extract the velocity limit (if it exists)
velocity_limit = limit.get('velocity')
if velocity_limit is not None:
joint_velocity_limits[joint_name] = float(velocity_limit)
while True:
try:
print("The cost mask determines which coordinates are used for the IK. Each element of the cost mask corresponds to a catesian coordinate [x, y, z, Rx, Ry, Rz].")
print("The cost mask [1, 1, 1, 0, 0, 0] means that the IK will only consider translation and no rotaion.")
cost_mask = [int(i) for i in input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")]
if sum(cost_mask) <= robot.n and len(cost_mask) == 6:
break
else:
print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.")
except ValueError:
print("Invalid input. Please enter integers only.")
print(f"Cost mask: {cost_mask}")
rclpy.init()
node = ScaledJointTrajectoryPublisher(joint_names, joint_velocity_limits, robot, cost_mask)
# Run ROS 2 spin, and osc_process will be handled by the timer
try:
rclpy.spin(node)
except KeyboardInterrupt:
print("")
finally:
node.destroy_node()
rclpy.shutdown()
osc_terminate()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,232 @@
import rclpy
from rclpy.node import Node
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from sensor_msgs.msg import JointState
from osc4py3.as_allthreads import *
from osc4py3 import oscmethod as osm
import xml.etree.ElementTree as ET
import time
import numpy as np
import spatialmath as sm
import roboticstoolbox as rtb
import os
class ScaledJointTrajectoryPublisher(Node):
"""Node to publish joint trajectories based on OSC messages."""
def __init__(self, joint_names, joint_velocity_limits, robot, cost_mask):
super().__init__('scaled_joint_trajectory_publisher')
# ROS2 Publisher
self.publisher = self.create_publisher(
JointTrajectory,
'/scaled_joint_trajectory_controller/joint_trajectory',
1
)
self.subscription = self.create_subscription(
JointState,
'/joint_states',
self.joint_states_callback,
1 # Increased queue size for joint states
)
# Store received joint positions
self.current_joint_positions = [0.0] * len(joint_names)
self.joint_names = joint_names
self.joint_velocity_limits = joint_velocity_limits
self.desired_joint_positions = [0.0] * len(joint_names)
self.previous_desired = [0.0] * len(joint_names)
self.robot = robot
self.cost_mask = cost_mask
ip = "0.0.0.0" # Listen on all network interfaces
port = 8000 # Must match the sender's port in `joint_state_osc.py`
osc_startup()
osc_udp_server(ip, port, "osc_server")
print("Server started on 0.0.0.0:8000 \n ready to receive messages in the following format: /joint_trajectroy [joint_positions] optional: duration as last argument")
# Register OSC handler
osc_method("/joint_angles", self.joint_angles_handler, argscheme=osm.OSCARG_DATAUNPACK)
self.hz = float(input("Enter the desired refresh frequency (Hz): "))
# Start the OSC server in a separate thread to avoid blocking the ROS 2 event loop
self.create_timer(1/self.hz, self.update_position) # Timer to call osc_process periodically
while True:
try:
self.x_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()]
self.y_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()]
self.z_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()]
if len(self.x_limits) != 2 or len(self.y_limits) != 2 or len(self.z_limits) != 2:
print("Invalid input. Please enter exactly two values (or leave blank) for each limit.")
continue
if (self.x_limits[0] is not None and self.x_limits[1] is not None and self.x_limits[0] >= self.x_limits[1]) or \
(self.y_limits[0] is not None and self.y_limits[1] is not None and self.y_limits[0] >= self.y_limits[1]) or \
(self.z_limits[0] is not None and self.z_limits[1] is not None and self.z_limits[0] >= self.z_limits[1]):
print("Invalid input. Lower limit must be less than upper limit for each axis.")
continue
print(f"Current limits:")
print(f"x: {self.x_limits}")
print(f"y: {self.y_limits}")
print(f"z: {self.z_limits}")
confirm = input("Do you want your robot to move in this range? (y/n): ").strip().lower()
if confirm == 'y':
break
elif confirm == 'n':
print("Please re-enter the limits.")
else:
print("Invalid input. Please enter 'y' or 'n'.")
except ValueError:
print("Invalid input. Please enter numeric values only.")
def joint_angles_handler(self, *args):
# Ensure the desired joint positions are within the specified limits
x, y, z, r, p, yaw, *_ = [float(i) for i in list(args)]
if self.x_limits[0] is not None:
x = max(self.x_limits[0], x)
if self.x_limits[1] is not None:
x = min(self.x_limits[1], x)
if self.y_limits[0] is not None:
y = max(self.y_limits[0], y)
if self.y_limits[1] is not None:
y = min(self.y_limits[1], y)
if self.z_limits[0] is not None:
z = max(self.z_limits[0], z)
if self.z_limits[1] is not None:
z = min(self.z_limits[1], z)
if x != args[0] or y != args[1] or z != args[2]:
self.get_logger().warn(
f"Desired joint positions adjusted to fit within limits: "
f"x={x}, y={y}, z={z} (original: x={args[0]}, y={args[1]}, z={args[2]})"
)
self.desired_joint_positions = [x, y, z, r, p, yaw]
def joint_states_callback(self, msg):
"""Callback function to handle incoming joint states."""
joint_position_dict = dict(zip(msg.name, msg.position))
self.current_joint_positions = [joint_position_dict[name] for name in self.joint_names]
def rampfunction(self, startvalue, blendtime, currenttime):
"""
Ramp function to create a smooth transition from startvalue to 1 over blendtime seconds.
"""
if currenttime < blendtime:
return startvalue + (1 - startvalue) * (currenttime / blendtime)
else:
return 1
def update_position(self):
if self.desired_joint_positions == self.previous_desired:
return
msg = JointTrajectory()
msg.joint_names = self.joint_names
steps_per_m = 30
if True: #len(args[0]) == len(self.joint_names):
prev_duration = 0
T1 = self.robot.fkine(self.current_joint_positions)
[x,y,z] = T1.t
[roll, pitch, yaw] = T1.rpy()
x1, y1, z1, roll1, pitch1, yaw1 = self.desired_joint_positions
steps = int(np.linalg.norm(np.array([x1-x, y1-y, z1-z])) * steps_per_m)
if steps < 2: steps = 2
cart_traj = [sm.SE3([x+(x1-x)/(steps-1)*i, y+(y1-y)/(steps-1)*i, z+(z1-z)/(steps-1)*i]) * sm.SE3.RPY([roll+(roll1-roll)/(steps-1)*i, pitch+(pitch1-pitch)/(steps-1)*i, yaw+(yaw1-yaw)/(steps-1)*i]) for i in range(steps)]
for j in range(steps):
sol = self.robot.ik_LM(cart_traj[j], q0=self.current_joint_positions, mask = self.cost_mask, joint_limits = True)
if sol[1] == 1:
duration = 0
prev = self.current_joint_positions if j == 0 else prev_sol
for p1, p2, max_vel in zip(sol[0], prev, self.joint_velocity_limits.values()):
duration = max(duration, abs(p1 - p2) / max_vel)#, 1/self.hz) # as minimun
point = JointTrajectoryPoint()
point.positions = list(sol[0])
duration /= self.rampfunction(0.1, 2, prev_duration)
duration += prev_duration
prev_duration = duration
point.time_from_start.sec = int(duration)
point.time_from_start.nanosec = int((duration - int(duration)) * 1e9)
msg.points.append(point)
prev_sol = list(sol[0])
else:
print(f'IK could not find a solution for (x,y,z) = ({cart_traj[j].t}), (roll,pitch,yaw) = ({cart_traj[j].rpy()})!')
prev_sol = self.current_joint_positions
msg.header.stamp = self.get_clock().now().to_msg()
self.publisher.publish(msg)
self.previous_desired = self.desired_joint_positions
def main():
"""Main function to get joint names and start the ROS 2 & OSC system."""
while True:
path_to_urdf = input("Enter the path to the URDF file: ")
if os.path.isfile(path_to_urdf):
if not path_to_urdf.endswith('.urdf'):
print("The file is not a URDF file. Please enter a valid URDF file.")
continue
break
else:
print("Invalid path. Please enter a valid path to the URDF file.")
tree = ET.parse(path_to_urdf)
root = tree.getroot()
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
robot = rtb.ERobot.URDF(path_to_urdf)
joint_velocity_limits = {}
# Iterate over all joints in the URDF
for joint in root.findall('.//joint'):
joint_name = joint.get('name') # Get the name of the joint
# Look for the <limit> tag under each joint
limit = joint.find('limit')
if limit is not None:
# Extract the velocity limit (if it exists)
velocity_limit = limit.get('velocity')
if velocity_limit is not None:
joint_velocity_limits[joint_name] = float(velocity_limit)
print("The cost mask determines which coordinates are used for the IK. Each element of the cost mask corresponds to a Cartesian coordinate [x, y, z, roll, pitch, yaw].")
print("The cost mask [1, 1, 1, 0, 0, 0] means that the IK will only consider translation and no rotation.")
while True:
try:
cost_mask = [int(i) for i in input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")]
if sum(cost_mask) <= robot.n and len(cost_mask) == 6:
considered_coords = [coord for coord, use in zip(['x', 'y', 'z', 'roll', 'pitch', 'yaw'], cost_mask) if use == 1]
print(f"The following coordinates will be considered for IK: {', '.join(considered_coords)}")
confirm = input("Are you sure you want to proceed with this cost mask? (y/n): ").strip().lower()
if confirm == 'y':
break
elif confirm == 'n':
print("Please re-enter the cost mask.")
else:
print("Invalid input. Please enter 'y' or 'n'.")
else:
print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.")
except ValueError:
print("Invalid input. Please enter integers only.")
print(f"Cost mask: {cost_mask}")
rclpy.init()
node = ScaledJointTrajectoryPublisher(joint_names, joint_velocity_limits, robot, cost_mask)
# Run ROS 2 spin, and osc_process will be handled by the timer
try:
rclpy.spin(node)
except KeyboardInterrupt:
print("")
finally:
node.destroy_node()
rclpy.shutdown()
osc_terminate()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,36 @@
import numpy as np
from roboticstoolbox.tools.trajectory import mstraj
# Define via points (each row is a joint configuration)
viapoints = np.array([
[0, 0, 0], # Start
[0.5, 0.2, -0.1], # Intermediate
[1.0, 0.4, 0.2] # End
])
# Time step
dt = 0.01 # seconds
# Acceleration time
tacc = 0.2 # seconds
# Maximum joint velocity per joint (same length as number of joints)
qdmax = [0.5, 0.3, 0.4] # radians per second
# Optional: starting position (otherwise uses first viapoint)
q0 = viapoints[0]
# Generate the trajectory
traj = mstraj(
viapoints=viapoints,
dt=dt,
tacc=tacc,
qdmax=qdmax,
)
# Extract trajectory
time = traj.t # Time vector
positions = traj.q # Joint angles (shape: K x N)
print("Time vector:", time)
print("Joint positions:\n", positions)

View File

@@ -0,0 +1,141 @@
import rclpy
from rclpy.node import Node
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from osc4py3.as_eventloop import *
from osc4py3 import oscmethod as osm
import xml.etree.ElementTree as ET
import roboticstoolbox as rtb
import spatialmath as sm
import numpy as np
import time
import os
class ScaledJointTrajectoryPublisher(Node):
"""Node to publish joint trajectories based on OSC messages."""
def __init__(self, joint_names, robot, cost_mask, joint_velocity_limits):
super().__init__('scaled_joint_trajectory_publisher')
self.joint_velocity_limits = joint_velocity_limits
self.cost_mask = cost_mask
self.robot = robot
self.trajectroy_topic_name = input("Enter the topic name to which the joint trajectory should be sent to: ")
if self.trajectroy_topic_name == "":
self.trajectroy_topic_name = '/scaled_joint_trajectory_controller/joint_trajectory'
# ROS2 Publisher
self.publisher = self.create_publisher(
JointTrajectory,
self.trajectroy_topic_name,
10
)
# Store received joint positions
self.joint_names = joint_names
self.port = 8000 # UDP port
osc_startup()
osc_udp_server("0.0.0.0", self.port, "osc_server")
print(f"Server started on 0.0.0.0:{str(self.port)} \n ready to receive messages in the following format: /joint_trajectroy [tcp_coordinates0, tcp_coordinates1, ...] optional: timestamp as last element of each tcp_coordinates")
# Register OSC handler
osc_method("/joint_trajectory", self.joint_angles_handler, argscheme=osm.OSCARG_DATAUNPACK)
print("OSC method registered for /joint_trajectory")
def joint_angles_handler(self, *args):
"""Handles incoming OSC messages for joint positions."""
print("Received joint positions")
msg = JointTrajectory()
msg.joint_names = self.joint_names
steps = 50
if True: #len(args[0]) == len(self.joint_names):
prev_duration = 0
for i in range(len(args)-1):
x, y, z, roll, pitch, yaw = args[i]
x1, y1, z1, roll1, pitch1, yaw1 = args[i+1]
cart_traj = [sm.SE3([x+(x1-x)/(steps-1)*i, y+(y1-y)/(steps-1)*i, z+(z1-z)/(steps-1)*i]) * sm.SE3.RPY([roll+(roll1-roll)/(steps-1)*i, pitch+(pitch1-pitch)/(steps-1)*i, yaw+(yaw1-yaw)/(steps-1)*i], order='xyz') for i in range(steps)]
prev_sol = [0.0,0.0,0.0,0.0,0.0,0.0] if i == 0 else sol[0]
for j in (range(steps) if i == 0 else range(1,steps)):
#print(cart_traj[j])
sol = self.robot.ik_LM(cart_traj[j], q0=[0.0] * len(self.joint_names), mask = self.cost_mask, joint_limits = True) if i == 0 else self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True)
if sol[1] == 1:
if list(sol[0])==list(prev_sol): continue
duration = 0
for p1, p2, max_vel in zip(sol[0], prev_sol, self.joint_velocity_limits.values()):
duration = max(duration, abs(p1 - p2) / max_vel) # as minimun
point = JointTrajectoryPoint()
point.positions = list(sol[0])
duration *= 1.6
duration += prev_duration
prev_duration = duration
point.time_from_start.sec = int(duration)
point.time_from_start.nanosec = int((duration - int(duration)) * 1e9)
msg.points.append(point)
prev_sol = list(sol[0])
else: print('IK could not find a solution!')
msg.header.stamp = self.get_clock().now().to_msg()
self.publisher.publish(msg)
def main():
"""Main function to get joint names and start the ROS 2 & OSC system."""
while True:
path_to_urdf = input("Enter the path to the URDF file: ")
if os.path.isfile(path_to_urdf):
if not path_to_urdf.endswith('.urdf'):
print("The file is not a URDF file. Please enter a valid URDF file.")
continue
break
else:
print("Invalid path. Please enter a valid path to the URDF file.")
tree = ET.parse(path_to_urdf)
root = tree.getroot()
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
robot = rtb.ERobot.URDF(path_to_urdf)
print(robot)
joint_velocity_limits = {}
# Iterate over all joints in the URDF
for joint in root.findall('.//joint'):
joint_name = joint.get('name') # Get the name of the joint
# Look for the <limit> tag under each joint
limit = joint.find('limit')
if limit is not None:
# Extract the velocity limit (if it exists)
velocity_limit = limit.get('velocity')
if velocity_limit is not None:
joint_velocity_limits[joint_name] = float(velocity_limit)
rclpy.init()
while True:
try:
print("The cost mask determines which coordinates are used for the IK. Each element of the cost mask corresponds to a catesian coordinate [x, y, z, Rx, Ry, Rz].")
print("The cost mask [1, 1, 1, 0, 0, 0] means that the IK will only consider translation and no rotaion.")
cost_mask = [int(i) for i in input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")]
if sum(cost_mask) <= robot.n and len(cost_mask) == 6:
break
else:
print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.")
except ValueError:
print("Invalid input. Please enter integers only.")
print(f"Cost mask: {cost_mask}")
node = ScaledJointTrajectoryPublisher(joint_names, robot, cost_mask, joint_velocity_limits)
# Run both ROS 2 and OSC Server together
try:
while rclpy.ok():
osc_process() # Handle one OSC request at a time
rclpy.spin_once(node, timeout_sec=0.1) # Process ROS callbacks
except KeyboardInterrupt:
print("")
finally:
node.destroy_node()
rclpy.shutdown()
osc_terminate()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,166 @@
import rclpy
from rclpy.node import Node
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from sensor_msgs.msg import JointState
from osc4py3.as_allthreads import *
from osc4py3 import oscmethod as osm
import xml.etree.ElementTree as ET
import roboticstoolbox as rtb
import spatialmath as sm
import numpy as np
import time
import os
class ScaledJointTrajectoryPublisher(Node):
"""Node to publish joint trajectories based on OSC messages."""
def __init__(self, joint_names, robot, cost_mask, joint_velocity_limits):
super().__init__('scaled_joint_trajectory_publisher')
self.joint_velocity_limits = joint_velocity_limits
self.cost_mask = cost_mask
self.robot = robot
self.trajectroy_topic_name = input("Enter the topic name to which the joint trajectory should be sent to: ")
if self.trajectroy_topic_name == "":
self.trajectroy_topic_name = '/scaled_joint_trajectory_controller/joint_trajectory'
# ROS2 Publisher
self.publisher = self.create_publisher(
JointTrajectory,
self.trajectroy_topic_name,
10
)
self.subscription = self.create_subscription(
JointState,
'/joint_states',
self.joint_states_callback,
1 # Increased queue size for joint states
)
self.maximum_acceleration = [0.0] * len(joint_names)
# Store received joint positions
self.joint_names = joint_names
for joint in joint_names:
self.maximum_acceleration[joint_names.index(joint)] = float(input(f"Enter the maximum acceleration for joint {joint}: "))
self.port = 8000 # UDP port
osc_startup()
osc_udp_server("0.0.0.0", self.port, "osc_server")
print(f"Server started on 0.0.0.0:{str(self.port)} \n ready to receive messages in the following format: /joint_trajectroy [tcp_coordinates0, tcp_coordinates1, ...] optional: timestamp as last element of each tcp_coordinates")
# Register OSC handler
osc_method("/joint_trajectory", self.joint_angles_handler, argscheme=osm.OSCARG_DATAUNPACK)
print("OSC method registered for /joint_trajectory")
def joint_states_callback(self, msg):
"""Callback function to handle incoming joint states."""
joint_position_dict = dict(zip(msg.name, msg.position))
self.current_joint_positions = [joint_position_dict[name] for name in self.joint_names]
joint_velocity_dict = dict(zip(msg.name, msg.velocity))
self.current_joint_velocities = [joint_velocity_dict[name] for name in self.joint_names]
def joint_angles_handler(self, *args):
"""Handles incoming OSC messages for joint positions."""
print("Received joint positions")
msg = JointTrajectory()
msg.joint_names = self.joint_names
steps = 50
if True: #len(args[0]) == len(self.joint_names):
prev_duration = 0
for i in range(len(args)-1):
x, y, z, roll, pitch, yaw = args[i]
x1, y1, z1, roll1, pitch1, yaw1 = args[i+1]
cart_traj = [sm.SE3([x+(x1-x)/(steps-1)*i, y+(y1-y)/(steps-1)*i, z+(z1-z)/(steps-1)*i]) * sm.SE3.RPY([roll+(roll1-roll)/(steps-1)*i, pitch+(pitch1-pitch)/(steps-1)*i, yaw+(yaw1-yaw)/(steps-1)*i], order='xyz') for i in range(steps)]
prev_sol = self.current_joint_positions if i == 0 else sol[0]
for j in (range(steps) if i == 0 else range(1,steps)):
#print(cart_traj[j])
sol = self.robot.ik_LM(cart_traj[j], q0=[0.0] * len(self.joint_names), mask = self.cost_mask, joint_limits = True) if i == 0 else self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True)
if sol[1] == 1:
if list(sol[0])==list(prev_sol): continue
duration = 0
for i, (p1, p2, max_vel) in enumerate(zip(sol[0], prev_sol, self.joint_velocity_limits.values())):
print(f'joint {i}, p1: {p1}, p2: {p2}, max_vel: {max_vel}')
if len(msg.points) == 0: v = self.current_joint_velocities[i]
max_acc_duration = np.sqrt((v/self.maximum_acceleration[i])**2 + 2*(abs(p1 - p2)/self.maximum_acceleration[i]))- v/self.maximum_acceleration[i]
duration = max(duration, abs(p1 - p2) / max_vel, max_acc_duration) # as minimun
v = abs(p1 - p2) / duration
print(f'duration: {duration}, max_acc_duration: {max_acc_duration}, max_vel_duration: { abs(p1 - p2) / max_vel}, v: {v}')
point = JointTrajectoryPoint()
point.positions = list(sol[0])
duration += prev_duration
prev_duration = duration
point.time_from_start.sec = int(duration)
point.time_from_start.nanosec = int((duration - int(duration)) * 1e9)
msg.points.append(point)
prev_sol = list(sol[0])
else: print('IK could not find a solution!')
msg.header.stamp = self.get_clock().now().to_msg()
self.publisher.publish(msg)
def main():
"""Main function to get joint names and start the ROS 2 & OSC system."""
while True:
path_to_urdf = input("Enter the path to the URDF file: ")
if os.path.isfile(path_to_urdf):
if not path_to_urdf.endswith('.urdf'):
print("The file is not a URDF file. Please enter a valid URDF file.")
continue
break
else:
print("Invalid path. Please enter a valid path to the URDF file.")
tree = ET.parse(path_to_urdf)
root = tree.getroot()
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
robot = rtb.ERobot.URDF(path_to_urdf)
print(robot)
joint_velocity_limits = {}
# Iterate over all joints in the URDF
for joint in root.findall('.//joint'):
joint_name = joint.get('name') # Get the name of the joint
# Look for the <limit> tag under each joint
limit = joint.find('limit')
if limit is not None:
# Extract the velocity limit (if it exists)
velocity_limit = limit.get('velocity')
if velocity_limit is not None:
joint_velocity_limits[joint_name] = float(velocity_limit)
rclpy.init()
while True:
try:
print("The cost mask determines which coordinates are used for the IK. Each element of the cost mask corresponds to a catesian coordinate [x, y, z, Rx, Ry, Rz].")
print("The cost mask [1, 1, 1, 0, 0, 0] means that the IK will only consider translation and no rotaion.")
cost_mask = [int(i) for i in input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")]
if sum(cost_mask) <= robot.n and len(cost_mask) == 6:
break
else:
print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.")
except ValueError:
print("Invalid input. Please enter integers only.")
print(f"Cost mask: {cost_mask}")
node = ScaledJointTrajectoryPublisher(joint_names, robot, cost_mask, joint_velocity_limits)
# Run both ROS 2 and OSC Server together
try:
while rclpy.ok():
osc_process() # Handle one OSC request at a time
rclpy.spin_once(node, timeout_sec=0.1) # Process ROS callbacks
except KeyboardInterrupt:
print("")
finally:
node.destroy_node()
rclpy.shutdown()
osc_terminate()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,152 @@
import rclpy
from rclpy.node import Node
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from osc4py3.as_eventloop import *
from osc4py3 import oscmethod as osm
import xml.etree.ElementTree as ET
import roboticstoolbox as rtb
import spatialmath as sm
import numpy as np
import time
import os
class ScaledJointTrajectoryPublisher(Node):
"""Node to publish joint trajectories based on OSC messages."""
def __init__(self, joint_names, robot, cost_mask, joint_velocity_limits):
super().__init__('scaled_joint_trajectory_publisher')
self.joint_velocity_limits = joint_velocity_limits
self.cost_mask = cost_mask
self.robot = robot
self.trajectroy_topic_name = input("Enter the topic name to which the joint trajectory should be sent to: ")
if self.trajectroy_topic_name == "":
self.trajectroy_topic_name = '/scaled_joint_trajectory_controller/joint_trajectory'
# ROS2 Publisher
self.publisher = self.create_publisher(
JointTrajectory,
self.trajectroy_topic_name,
10
)
# Store received joint positions
self.joint_names = joint_names
self.port = 8000 # UDP port
osc_startup()
osc_udp_server("0.0.0.0", self.port, "osc_server")
print(f"Server started on 0.0.0.0:{str(self.port)} \n ready to receive messages in the following format: /joint_trajectroy [tcp_coordinates0, tcp_coordinates1, ...] optional: timestamp as last element of each tcp_coordinates")
# Register OSC handler
osc_method("/joint_trajectory", self.joint_angles_handler, argscheme=osm.OSCARG_DATAUNPACK)
print("OSC method registered for /joint_trajectory")
def rampfunction(self, startvalue, blendtime, currenttime):
"""
Ramp function to create a smooth transition from startvalue to 1 over blendtime seconds.
"""
if currenttime < blendtime:
return startvalue + (1 - startvalue) * (currenttime / blendtime)
else:
return 1
def joint_angles_handler(self, *args):
"""Handles incoming OSC messages for joint positions."""
print("Received joint positions")
msg = JointTrajectory()
msg.joint_names = self.joint_names
steps = 50
if True: #len(args[0]) == len(self.joint_names):
prev_duration = 0
for i in range(len(args)-1):
x, y, z, roll, pitch, yaw = args[i]
x1, y1, z1, roll1, pitch1, yaw1 = args[i+1]
cart_traj = [sm.SE3([x+(x1-x)/(steps-1)*i, y+(y1-y)/(steps-1)*i, z+(z1-z)/(steps-1)*i]) * sm.SE3.RPY([roll+(roll1-roll)/(steps-1)*i, pitch+(pitch1-pitch)/(steps-1)*i, yaw+(yaw1-yaw)/(steps-1)*i], order='xyz') for i in range(steps)]
prev_sol = [0.0,0.0,0.0,0.0,0.0,0.0] if i == 0 else sol[0]
for j in (range(steps) if i == 0 else range(1,steps)):
#print(cart_traj[j])
sol = self.robot.ik_LM(cart_traj[j], q0=[0.0] * len(self.joint_names), mask = self.cost_mask, joint_limits = True) if i == 0 else self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True)
if sol[1] == 1:
if list(sol[0])==list(prev_sol): continue
duration = 0
for p1, p2, max_vel in zip(sol[0], prev_sol, self.joint_velocity_limits.values()):
duration = max(duration, abs(p1 - p2) / max_vel) # as minimun
point = JointTrajectoryPoint()
point.positions = list(sol[0])
duration /= self.rampfunction(0.1, 2, prev_duration)
duration += prev_duration
prev_duration = duration
point.time_from_start.sec = int(duration)
point.time_from_start.nanosec = int((duration - int(duration)) * 1e9)
msg.points.append(point)
prev_sol = list(sol[0])
else: print('IK could not find a solution!')
msg.header.stamp = self.get_clock().now().to_msg()
self.publisher.publish(msg)
def main():
"""Main function to get joint names and start the ROS 2 & OSC system."""
while True:
path_to_urdf = input("Enter the path to the URDF file: ")
if os.path.isfile(path_to_urdf):
if not path_to_urdf.endswith('.urdf'):
print("The file is not a URDF file. Please enter a valid URDF file.")
continue
break
else:
print("Invalid path. Please enter a valid path to the URDF file.")
tree = ET.parse(path_to_urdf)
root = tree.getroot()
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
robot = rtb.ERobot.URDF(path_to_urdf)
print(robot)
joint_velocity_limits = {}
# Iterate over all joints in the URDF
for joint in root.findall('.//joint'):
joint_name = joint.get('name') # Get the name of the joint
# Look for the <limit> tag under each joint
limit = joint.find('limit')
if limit is not None:
# Extract the velocity limit (if it exists)
velocity_limit = limit.get('velocity')
if velocity_limit is not None:
joint_velocity_limits[joint_name] = float(velocity_limit)
rclpy.init()
while True:
try:
print("The cost mask determines which coordinates are used for the IK. Each element of the cost mask corresponds to a catesian coordinate [x, y, z, Rx, Ry, Rz].")
print("The cost mask [1, 1, 1, 0, 0, 0] means that the IK will only consider translation and no rotaion.")
cost_mask = [int(i) for i in input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")]
if sum(cost_mask) <= robot.n and len(cost_mask) == 6:
break
else:
print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.")
except ValueError:
print("Invalid input. Please enter integers only.")
print(f"Cost mask: {cost_mask}")
node = ScaledJointTrajectoryPublisher(joint_names, robot, cost_mask, joint_velocity_limits)
# Run both ROS 2 and OSC Server together
try:
while rclpy.ok():
osc_process() # Handle one OSC request at a time
rclpy.spin_once(node, timeout_sec=0.1) # Process ROS callbacks
except KeyboardInterrupt:
print("")
finally:
node.destroy_node()
rclpy.shutdown()
osc_terminate()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,166 @@
import rclpy
from rclpy.node import Node
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from sensor_msgs.msg import JointState
from osc4py3.as_allthreads import *
from osc4py3 import oscmethod as osm
import xml.etree.ElementTree as ET
import roboticstoolbox as rtb
import spatialmath as sm
import numpy as np
import time
import os
class ScaledJointTrajectoryPublisher(Node):
"""Node to publish joint trajectories based on OSC messages."""
def __init__(self, joint_names, robot, cost_mask, joint_velocity_limits):
super().__init__('scaled_joint_trajectory_publisher')
self.joint_velocity_limits = [joint_velocity_limits[joint] for joint in joint_names]
self.cost_mask = cost_mask
self.robot = robot
self.trajectroy_topic_name = input("Enter the topic name to which the joint trajectory should be sent to: ")
if self.trajectroy_topic_name == "":
self.trajectroy_topic_name = '/scaled_joint_trajectory_controller/joint_trajectory'
# ROS2 Publisher
self.publisher = self.create_publisher(
JointTrajectory,
self.trajectroy_topic_name,
10
)
self.subscription = self.create_subscription(
JointState,
'/joint_states',
self.joint_states_callback,
1 # Increased queue size for joint states
)
# Store received joint positions
self.joint_names = joint_names
self.port = 8000 # UDP port
osc_startup()
osc_udp_server("0.0.0.0", self.port, "osc_server")
print(f"Server started on 0.0.0.0:{str(self.port)} \n ready to receive messages in the following format: /joint_trajectroy [tcp_coordinates0, tcp_coordinates1, ...] optional: timestamp as last element of each tcp_coordinates")
# Register OSC handler
osc_method("/joint_trajectory", self.joint_angles_handler, argscheme=osm.OSCARG_DATAUNPACK)
print("OSC method registered for /joint_trajectory")
def joint_states_callback(self, msg):
"""Callback function to handle incoming joint states."""
joint_position_dict = dict(zip(msg.name, msg.position))
self.current_joint_positions = [joint_position_dict[name] for name in self.joint_names]
joint_velocity_dict = dict(zip(msg.name, msg.velocity))
self.current_joint_velocities = [joint_velocity_dict[name] for name in self.joint_names]
def joint_angles_handler(self, *args):
"""Handles incoming OSC messages for joint positions."""
try:
print("Received joint positions")
viapoints = []
msg = JointTrajectory()
msg.joint_names = self.joint_names
steps_per_m = 1
for i in range(len(args)-1):
x, y, z, roll, pitch, yaw = args[i]
x1, y1, z1, roll1, pitch1, yaw1 = args[i+1]
steps = int(np.linalg.norm(np.array([x1-x, y1-y, z1-z])) * steps_per_m)
if steps < 2: steps = 2
cart_traj = [sm.SE3([x+(x1-x)/(steps-1)*i, y+(y1-y)/(steps-1)*i, z+(z1-z)/(steps-1)*i]) * sm.SE3.RPY([roll+(roll1-roll)/(steps-1)*i, pitch+(pitch1-pitch)/(steps-1)*i, yaw+(yaw1-yaw)/(steps-1)*i], order='xyz') for i in range(steps)]
if i == 0: prev_sol = self.current_joint_positions
for j in (range(steps) if i == 0 else range(1,steps)):
#print(cart_traj[j])
sol = self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True)
if sol[1] == 1:
viapoints.append(list(sol[0]))
prev_sol = list(sol[0])
else: print('IK could not find a solution!')
dt = 0.01
tacc = 0.1
print(f'length viapoints: {len(viapoints)}')
traj = rtb.mstraj(np.array(viapoints), q0 = self.current_joint_positions ,dt=dt, tacc=tacc, qdmax=[1 * i for i in self.joint_velocity_limits])
print(len(traj.q))
print(len(traj.t))
print(traj.t)
print(traj.arrive)
msg.points = []
for i in range(len(traj.q)):
point = JointTrajectoryPoint()
point.positions = list(traj.q[i])
point.time_from_start.sec = int(traj.t[i])
point.time_from_start.nanosec = int((traj.t[i] - int(traj.t[i])) * 1e9)
#point.time_from_start = rclpy.duration.Duration(seconds=traj.t[i]).to_msg()
msg.points.append(point)
msg.header.stamp = self.get_clock().now().to_msg()
self.publisher.publish(msg)
print('published')
except Exception as e:
print(f'Error in joint angles handler: {e}')
def main():
"""Main function to get joint names and start the ROS 2 & OSC system."""
while True:
path_to_urdf = input("Enter the path to the URDF file: ")
if os.path.isfile(path_to_urdf):
if not path_to_urdf.endswith('.urdf'):
print("The file is not a URDF file. Please enter a valid URDF file.")
continue
break
else:
print("Invalid path. Please enter a valid path to the URDF file.")
tree = ET.parse(path_to_urdf)
root = tree.getroot()
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
robot = rtb.ERobot.URDF(path_to_urdf)
print(robot)
joint_velocity_limits = {}
# Iterate over all joints in the URDF
for joint in root.findall('.//joint'):
joint_name = joint.get('name') # Get the name of the joint
# Look for the <limit> tag under each joint
limit = joint.find('limit')
if limit is not None:
# Extract the velocity limit (if it exists)
velocity_limit = limit.get('velocity')
if velocity_limit is not None:
joint_velocity_limits[joint_name] = float(velocity_limit)
rclpy.init()
while True:
try:
print("The cost mask determines which coordinates are used for the IK. Each element of the cost mask corresponds to a catesian coordinate [x, y, z, Rx, Ry, Rz].")
print("The cost mask [1, 1, 1, 0, 0, 0] means that the IK will only consider translation and no rotaion.")
cost_mask = [int(i) for i in input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")]
if sum(cost_mask) <= robot.n and len(cost_mask) == 6:
break
else:
print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.")
except ValueError:
print("Invalid input. Please enter integers only.")
print(f"Cost mask: {cost_mask}")
node = ScaledJointTrajectoryPublisher(joint_names, robot, cost_mask, joint_velocity_limits)
# Run both ROS 2 and OSC Server together
try:
while rclpy.ok():
osc_process() # Handle one OSC request at a time
rclpy.spin_once(node, timeout_sec=0.1) # Process ROS callbacks
except KeyboardInterrupt:
print("")
finally:
node.destroy_node()
rclpy.shutdown()
osc_terminate()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,169 @@
import rclpy
from rclpy.node import Node
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from sensor_msgs.msg import JointState
from osc4py3.as_allthreads import *
from osc4py3 import oscmethod as osm
import xml.etree.ElementTree as ET
import roboticstoolbox as rtb
import spatialmath as sm
import numpy as np
import os
class ScaledJointTrajectoryPublisher(Node):
"""Node to publish joint trajectories based on OSC messages."""
def __init__(self, joint_names, robot, cost_mask, joint_velocity_limits):
super().__init__('scaled_joint_trajectory_publisher')
self.joint_velocity_limits = joint_velocity_limits
self.cost_mask = cost_mask
self.robot = robot
self.trajectroy_topic_name = input("Enter the topic name to which the joint trajectory should be sent to: ")
if self.trajectroy_topic_name == "":
self.trajectroy_topic_name = '/scaled_joint_trajectory_controller/joint_trajectory'
# ROS2 Publisher
self.publisher = self.create_publisher(
JointTrajectory,
self.trajectroy_topic_name,
10
)
self.subscription = self.create_subscription(
JointState,
'/joint_states',
self.joint_states_callback,
1 # Increased queue size for joint states
)
# Store received joint positions
self.joint_names = joint_names
self.port = 8000 # UDP port
osc_startup()
osc_udp_server("0.0.0.0", self.port, "osc_server")
print(f"Server started on 0.0.0.0:{str(self.port)} \n ready to receive messages in the following format: /joint_trajectroy [tcp_coordinates0, tcp_coordinates1, ...] optional: timestamp as last element of each tcp_coordinates")
# Register OSC handler
osc_method("/joint_trajectory", self.joint_angles_handler, argscheme=osm.OSCARG_DATAUNPACK)
print("OSC method registered for /joint_trajectory")
def joint_states_callback(self, msg):
"""Callback function to handle incoming joint states."""
joint_position_dict = dict(zip(msg.name, msg.position))
self.current_joint_positions = [joint_position_dict[name] for name in self.joint_names]
joint_velocity_dict = dict(zip(msg.name, msg.velocity))
self.current_joint_velocities = [joint_velocity_dict[name] for name in self.joint_names]
def joint_angles_handler(self, *args):
"""Handles incoming OSC messages for joint positions."""
try:
print("Received joint positions")
viapoints = np.array([list(i) for i in args])
print(1)
msg = JointTrajectory()
print(2)
msg.joint_names = self.joint_names
print(3)
x,y,z = self.robot.fkine(self.current_joint_positions).t
r,p,y = self.robot.fkine(self.current_joint_positions).rpy()
q0 = [x, y, z, r, p, y]
print(4)
traj = rtb.mstraj(viapoints, q0 = q0 ,dt=0.01, tacc=1, qdmax=[0.1]*len(self.joint_names))
print(traj.q)
print(5)
msg.points = []
print(6)
prev_sol = q0
for i in range(len(traj.q)):
point = JointTrajectoryPoint()
print(8)
T = sm.SE3(traj.q[i][:3]) * sm.SE3.RPY(traj.q[i][3:])
sol = self.robot.ik_LM(T, q0=prev_sol, mask = self.cost_mask, joint_limits = True)
print(9)
if sol[1] == 1:
print(10)
point.positions = list(sol[0])
print(11)
point.time_from_start.sec = int(traj.t[i])
print(12)
point.time_from_start.nanosec = int((traj.t[i] - int(traj.t[i])) * 1e9)
print(13)
msg.points.append(point)
print(14)
prev_sol = list(sol[0])
print(15)
else: print('IK could not find a solution!')
print(16)
msg.header.stamp = self.get_clock().now().to_msg()
print(17)
self.publisher.publish(msg)
print('published')
except Exception as e:
print(f'Error in joint_angles_handler: {e}')
def main():
"""Main function to get joint names and start the ROS 2 & OSC system."""
while True:
path_to_urdf = input("Enter the path to the URDF file: ")
if os.path.isfile(path_to_urdf):
if not path_to_urdf.endswith('.urdf'):
print("The file is not a URDF file. Please enter a valid URDF file.")
continue
break
else:
print("Invalid path. Please enter a valid path to the URDF file.")
tree = ET.parse(path_to_urdf)
root = tree.getroot()
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
robot = rtb.ERobot.URDF(path_to_urdf)
print(robot)
joint_velocity_limits = {}
# Iterate over all joints in the URDF
for joint in root.findall('.//joint'):
joint_name = joint.get('name') # Get the name of the joint
# Look for the <limit> tag under each joint
limit = joint.find('limit')
if limit is not None:
# Extract the velocity limit (if it exists)
velocity_limit = limit.get('velocity')
if velocity_limit is not None:
joint_velocity_limits[joint_name] = float(velocity_limit)
rclpy.init()
while True:
try:
print("The cost mask determines which coordinates are used for the IK. Each element of the cost mask corresponds to a catesian coordinate [x, y, z, Rx, Ry, Rz].")
print("The cost mask [1, 1, 1, 0, 0, 0] means that the IK will only consider translation and no rotaion.")
cost_mask = [int(i) for i in input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")]
if sum(cost_mask) <= robot.n and len(cost_mask) == 6:
break
else:
print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.")
except ValueError:
print("Invalid input. Please enter integers only.")
print(f"Cost mask: {cost_mask}")
node = ScaledJointTrajectoryPublisher(joint_names, robot, cost_mask, joint_velocity_limits)
# Run both ROS 2 and OSC Server together
try:
while rclpy.ok():
osc_process() # Handle one OSC request at a time
rclpy.spin_once(node, timeout_sec=0.1) # Process ROS callbacks
except KeyboardInterrupt:
print("")
finally:
node.destroy_node()
rclpy.shutdown()
osc_terminate()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,185 @@
import rclpy
from rclpy.node import Node
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from sensor_msgs.msg import JointState
from osc4py3.as_allthreads import *
from osc4py3 import oscmethod as osm
import xml.etree.ElementTree as ET
import roboticstoolbox as rtb
import spatialmath as sm
import numpy as np
import time
import os
class ScaledJointTrajectoryPublisher(Node):
"""Node to publish joint trajectories based on OSC messages."""
def __init__(self, joint_names, robot, cost_mask, joint_velocity_limits):
super().__init__('scaled_joint_trajectory_publisher')
self.joint_velocity_limits = [joint_velocity_limits[joint] for joint in joint_names]
self.cost_mask = cost_mask
self.robot = robot
self.trajectroy_topic_name = input("Enter the topic name to which the joint trajectory should be sent to: ")
if self.trajectroy_topic_name == "":
self.trajectroy_topic_name = '/scaled_joint_trajectory_controller/joint_trajectory'
# ROS2 Publisher
self.publisher = self.create_publisher(
JointTrajectory,
self.trajectroy_topic_name,
10
)
self.subscription = self.create_subscription(
JointState,
'/joint_states',
self.joint_states_callback,
1 # Increased queue size for joint states
)
self.maximum_acceleration = [0.0] * len(joint_names)
# Store received joint positions
self.joint_names = joint_names
for joint in joint_names:
self.maximum_acceleration[joint_names.index(joint)] = float(input(f"Enter the maximum acceleration for joint {joint}: "))
self.port = 8000 # UDP port
osc_startup()
osc_udp_server("0.0.0.0", self.port, "osc_server")
print(f"Server started on 0.0.0.0:{str(self.port)} \n ready to receive messages in the following format: /joint_trajectroy [tcp_coordinates0, tcp_coordinates1, ...] optional: timestamp as last element of each tcp_coordinates")
# Register OSC handler
osc_method("/joint_trajectory", self.joint_angles_handler, argscheme=osm.OSCARG_DATAUNPACK)
print("OSC method registered for /joint_trajectory")
def joint_states_callback(self, msg):
"""Callback function to handle incoming joint states."""
joint_position_dict = dict(zip(msg.name, msg.position))
self.current_joint_positions = [joint_position_dict[name] for name in self.joint_names]
joint_velocity_dict = dict(zip(msg.name, msg.velocity))
self.current_joint_velocities = [joint_velocity_dict[name] for name in self.joint_names]
def joint_angles_handler(self, *args):
"""Handles incoming OSC messages for joint positions."""
try:
print("Received joint positions")
msg = JointTrajectory()
msg.joint_names = self.joint_names
steps_per_m = 50
prev_duration = 0
for i in range(len(args)-1):
x, y, z, roll, pitch, yaw = args[i]
x1, y1, z1, roll1, pitch1, yaw1 = args[i+1]
steps = int(np.linalg.norm([x1-x, y1-y, z1-z])*steps_per_m)
if steps <= 1: steps = 2
cart_traj = [sm.SE3([x+(x1-x)/(steps-1)*i, y+(y1-y)/(steps-1)*i, z+(z1-z)/(steps-1)*i]) * sm.SE3.RPY([roll+(roll1-roll)/(steps-1)*i, pitch+(pitch1-pitch)/(steps-1)*i, yaw+(yaw1-yaw)/(steps-1)*i], order='xyz') for i in range(steps)]
prev_sol = self.current_joint_positions if i == 0 else sol[0]
sol_set = []
for j in (range(steps) if i == 0 else range(1,steps)):
sol = self.robot.ik_LM(cart_traj[j], q0=[0.0] * len(self.joint_names), mask = self.cost_mask, joint_limits = True) if i == 0 else self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True)
if sol[1] == 1:
sol_set.append(sol[0])
prev_sol = list(sol[0])
else: print(f'IK could not find a solution for (x,y,z) = ({cart_traj[j].t}), (roll,pitch,yaw) = ({cart_traj[j].rpy()})!')
distance = abs(sol_set[0]-sol_set[-1])
ts= distance/np.array(self.joint_velocity_limits)+2*np.array(self.joint_velocity_limits)/np.array(self.maximum_acceleration)
t = max(ts)
idx = list(ts).index(t)
s_acc = self.joint_velocity_limits[idx]**2/(2*self.maximum_acceleration[idx])
print(f"t: {t}, idx: {idx}, s_acc: {s_acc}")
print(f"sol_set: {sol_set}")
for sol in sol_set:
print(f"sol: {sol}")
s = abs(sol[idx]-sol_set[0][idx])
print(f"sol_set[0][idx]: {sol_set[0][idx]}, sol[idx]: {sol[idx]}, s: {s}")
if s <= s_acc:
duration = np.sqrt(s/self.maximum_acceleration[idx])
print(f"acceleration phase, duration: {duration}")
elif s <= sol_set[-1][idx]-s_acc:
duration = self.joint_velocity_limits[idx]/self.maximum_acceleration[idx] + (s-s_acc)/self.joint_velocity_limits[idx]
print(f"constant velocity phase, duration: {duration}")
else:
duration = t-np.sqrt((sol_set[-1][idx]-s)/self.maximum_acceleration[idx])
print(f"deceleration phase, duration: {duration}")
point = JointTrajectoryPoint()
point.positions = list(sol)
duration += prev_duration
point.time_from_start.sec = int(duration)
point.time_from_start.nanosec = int((duration - int(duration)) * 1e9)
msg.points.append(point)
prev_duration = duration
msg.header.stamp = self.get_clock().now().to_msg()
self.publisher.publish(msg)
except Exception as e:
print(f"Error in joint angles handler: {e}")
def main():
"""Main function to get joint names and start the ROS 2 & OSC system."""
while True:
path_to_urdf = input("Enter the path to the URDF file: ")
if os.path.isfile(path_to_urdf):
if not path_to_urdf.endswith('.urdf'):
print("The file is not a URDF file. Please enter a valid URDF file.")
continue
break
else:
print("Invalid path. Please enter a valid path to the URDF file.")
tree = ET.parse(path_to_urdf)
root = tree.getroot()
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
robot = rtb.ERobot.URDF(path_to_urdf)
print(robot)
joint_velocity_limits = {}
# Iterate over all joints in the URDF
for joint in root.findall('.//joint'):
joint_name = joint.get('name') # Get the name of the joint
# Look for the <limit> tag under each joint
limit = joint.find('limit')
if limit is not None:
# Extract the velocity limit (if it exists)
velocity_limit = limit.get('velocity')
if velocity_limit is not None:
joint_velocity_limits[joint_name] = float(velocity_limit)
rclpy.init()
while True:
try:
print("The cost mask determines which coordinates are used for the IK. Each element of the cost mask corresponds to a catesian coordinate [x, y, z, Rx, Ry, Rz].")
print("The cost mask [1, 1, 1, 0, 0, 0] means that the IK will only consider translation and no rotaion.")
cost_mask = [int(i) for i in input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")]
if sum(cost_mask) <= robot.n and len(cost_mask) == 6:
break
else:
print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.")
except ValueError:
print("Invalid input. Please enter integers only.")
print(f"Cost mask: {cost_mask}")
node = ScaledJointTrajectoryPublisher(joint_names, robot, cost_mask, joint_velocity_limits)
# Run both ROS 2 and OSC Server together
try:
while rclpy.ok():
osc_process() # Handle one OSC request at a time
rclpy.spin_once(node, timeout_sec=0.1) # Process ROS callbacks
except KeyboardInterrupt:
print("")
finally:
node.destroy_node()
rclpy.shutdown()
osc_terminate()
if __name__ == '__main__':
main()

BIN
workspace/src/mock_robot/.DS_Store vendored Normal file

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,38 @@
<?xml version="1.0"?>
<robot name="mock_robot">
<!-- Base Link -->
<link name="base_link"/>
<!-- Link 1 -->
<link name="link1"/>
<joint name="joint1" type="revolute">
<parent link="base_link"/>
<child link="link1"/>
<origin xyz="0 0 0" rpy="0 0 0"/>
<axis xyz="0 0 1"/>
<limit effort="10.0" velocity="1.0" lower="-3.14" upper="3.14"/>
</joint>
<!-- Link 2 -->
<link name="link2"/>
<joint name="joint2" type="revolute">
<parent link="link1"/>
<child link="link2"/>
<origin xyz="0.5 0 0" rpy="0 0 0"/> <!-- 0.5m offset -->
<axis xyz="0 0 1"/>
<limit effort="10.0" velocity="1.0" lower="-3.14" upper="3.14"/>
</joint>
<!-- Tool endpoint -->
<link name="tool0"/>
<joint name="tool0_fixed_joint" type="fixed">
<parent link="link2"/>
<child link="tool0"/>
<origin xyz="0.3 0 0" rpy="0 0 0"/> <!-- TCP 0.3m from joint2 -->
</joint>
</robot>

View File

@@ -0,0 +1,73 @@
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState
from trajectory_msgs.msg import JointTrajectory
import numpy as np
import time
class RobotNode(Node):
def __init__(self):
super().__init__('robot_node')
self.l1 = 0.5
self.l2 = 0.3
self.x = self.l1 + self.l2
self.y = 0.0
self.theta1 = 0.0
self.theta2 = 0.0
self.publisher = self.create_publisher(
JointState,
'/joint_states',
1
)
timer_period = 0.01 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0
self.subcriber = self.create_subscription(
JointTrajectory,
'/scaled_joint_trajectory_controller/joint_trajectory',
self.joint_trajectory_callback,
1
)
def timer_callback(self):
msg = JointState()
msg.name = ['joint1', 'joint2']
msg.header.stamp = self.get_clock().now().to_msg()
msg.position = [self.theta1, self.theta2]
msg.velocity = [self.x, self.y]
self.publisher.publish(msg)
def joint_trajectory_callback(self, msg):
joint_names = msg.joint_names
prev_timetag = 0
for point in msg.points:
duration = point.time_from_start.sec + point.time_from_start.nanosec / 1e9 - prev_timetag
prev_timetag = point.time_from_start.sec + point.time_from_start.nanosec / 1e9
steps = int(duration * 10) # Update in 10 steps per second
for i in range(steps):
self.theta1 += (point.positions[0] - self.theta1) / steps
self.theta2 += (point.positions[1] - self.theta2) / steps
self.x = self.l1 * np.cos(self.theta1) + self.l2 * np.cos(self.theta1 + self.theta2)
self.y = self.l1 * np.sin(self.theta1) + self.l2 * np.sin(self.theta1 + self.theta2)
msg = JointState()
msg.name = ['joint1', 'joint2']
msg.header.stamp = self.get_clock().now().to_msg()
msg.position = [self.theta1, self.theta2]
msg.velocity = [self.x, self.y]
self.publisher.publish(msg)
time.sleep(0.1)
def main(args=None):
rclpy.init(args=args)
node = RobotNode()
rclpy.spin(node)
rclpy.shutdown()

View File

@@ -0,0 +1,20 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>mock_robot</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="root@todo.todo">root</maintainer>
<license>TODO: License declaration</license>
<depend>rclpy</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/mock_robot
[install]
install_scripts=$base/lib/mock_robot

View File

@@ -0,0 +1,26 @@
from setuptools import find_packages, setup
package_name = 'mock_robot'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='root',
maintainer_email='root@todo.todo',
description='TODO: Package description',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'mock_robot = mock_robot.robot_node:main',
],
},
)

View File

@@ -0,0 +1,25 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_copyright.main import main
import pytest
# Remove the `skip` decorator once the source file(s) have a copyright header
@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.')
@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found errors'

View File

@@ -0,0 +1,25 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_flake8.main import main_with_errors
import pytest
@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
rc, errors = main_with_errors(argv=[])
assert rc == 0, \
'Found %d code style errors / warnings:\n' % len(errors) + \
'\n'.join(errors)

View File

@@ -0,0 +1,23 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_pep257.main import main
import pytest
@pytest.mark.linter
@pytest.mark.pep257
def test_pep257():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found code style errors / warnings'

Binary file not shown.

View File

@@ -0,0 +1,20 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>painting_robot_control</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="root@todo.todo">root</maintainer>
<license>TODO: License declaration</license>
<depend>rclpy</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@@ -0,0 +1,86 @@
import rclpy
from rclpy.node import Node
from std_msgs.msg import Int32
from math import sin, cos, pi
from trajectory_msgs.msg import JointTrajectory
import time
# Adjusted joint limits based on nearest multiple of step angle
class ArmControllerNode(Node):
def __init__(self):
super().__init__('arm_controller_node')
self.subscription = self.create_subscription(
JointTrajectory,
'/scaled_joint_trajectory_controller/joint_trajectory',
self.joint_trajectory_callback,
100
)
self.dq1_publisher = self.create_publisher(Int32, 'dq1_steps', 10)
self.dq2_publisher = self.create_publisher(Int32, 'dq2_steps', 10)
self.q1_min = -1.57
self.q1_max = 1
self.q2_min = 0.02
self.q2_max = 2.8
self.current_q1 = 0.00
self.current_q2 = 1.57 # Initial joint angles
steps_per_revolution = 200
gear_ratio = 19.2
self.steps_per_radian = (steps_per_revolution * gear_ratio) / (2 * pi)
def joint_trajectory_callback(self, msg):
prev_timetag = 0
for point in msg.points:
timetag = point.time_from_start.sec + point.time_from_start.nanosec / 1e9 - prev_timetag
prev_timetag = point.time_from_start.sec + point.time_from_start.nanosec / 1e9
new_q1=max(min(point.positions[0],self.q1_max),self.q1_min)
new_q2=max(min(point.positions[1],self.q2_max),self.q2_min)
dq1 = new_q1 - self.current_q1
dq2 = new_q2 - self.current_q2
dq1_steps = int(round(dq1 * self.steps_per_radian))
dq2_steps = int(round(dq2 * self.steps_per_radian))
self.current_q1 += dq1_steps/self.steps_per_radian
self.current_q2 += dq2_steps/self.steps_per_radian
dq1_steps_msg = Int32()
dq1_steps_msg.data = dq1_steps
self.dq1_publisher.publish(dq1_steps_msg)
dq2_steps_msg = Int32()
dq2_steps_msg.data = dq2_steps
self.dq2_publisher.publish(dq2_steps_msg)
x= 0.4 * cos(self.current_q1) + 0.25025 * cos(self.current_q1+self.current_q2)
y= 0.4 * sin(self.current_q1) + 0.25025 * sin(self.current_q1+self.current_q2)
self.get_logger().info(f"Steps taken: steps_q1 = {dq1_steps}, steps_q2 = {dq2_steps}")
self.get_logger().info(f"New joint positions (in radians): q1 = {self.current_q1}, q2 = {self.current_q2}")
self.get_logger().info(f"New x_y positions (in meters): x = {x}, y = {y}")
self.get_logger().info(f"Duration: {timetag}")
time.sleep(timetag)
def main(args=None):
try:
rclpy.init(args=args)
node = ArmControllerNode()
rclpy.spin(node)
except KeyboardInterrupt:
print('Communication to painting robot closed')
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/painting_robot_control
[install]
install_scripts=$base/lib/painting_robot_control

View File

@@ -0,0 +1,26 @@
from setuptools import find_packages, setup
package_name = 'painting_robot_control'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='root',
maintainer_email='root@todo.todo',
description='TODO: Package description',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'painting_robot_controller = painting_robot_control.com_node:main'
],
},
)

View File

@@ -0,0 +1,25 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_copyright.main import main
import pytest
# Remove the `skip` decorator once the source file(s) have a copyright header
@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.')
@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found errors'

View File

@@ -0,0 +1,25 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_flake8.main import main_with_errors
import pytest
@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
rc, errors = main_with_errors(argv=[])
assert rc == 0, \
'Found %d code style errors / warnings:\n' % len(errors) + \
'\n'.join(errors)

View File

@@ -0,0 +1,23 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_pep257.main import main
import pytest
@pytest.mark.linter
@pytest.mark.pep257
def test_pep257():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found code style errors / warnings'