AS: single joint control
This commit is contained in:
Binary file not shown.
@@ -283,7 +283,7 @@ class OSC_ROS2_interface(Node):
|
||||
|
||||
# Register OSC handler
|
||||
osc_method("/joint_positions", self.joint_positions_handler, argscheme=osm.OSCARG_DATAUNPACK)
|
||||
osc_method("/joint_position/*", self.joint_position_handler, argscheme=osm.OSCARG_DATAUNPACK)
|
||||
osc_method("/joint_position/*", self.joint_position_handler, argscheme=osm.OSCARG_ADDRESS+osm.OSCARG_DATAUNPACK)
|
||||
osc_method("/tcp_coordinates", self.tcp_coordinates_handler, argscheme=osm.OSCARG_DATAUNPACK)
|
||||
osc_method("/joint_trajectory", self.joint_trajectory_handler, argscheme=osm.OSCARG_DATAUNPACK)
|
||||
osc_method("/cartesian_trajectory", self.cartesian_trajectory_handler, argscheme=osm.OSCARG_DATAUNPACK)
|
||||
@@ -306,7 +306,7 @@ class OSC_ROS2_interface(Node):
|
||||
print('=-=' * 50)
|
||||
print()
|
||||
|
||||
self.get_logger().info(f'Ready to receive OSC messages on 0.0.0.0:{commands_port}')
|
||||
self.get_logger().info(f'Ready to receive OSC messages on {socket.gethostbyname(socket.gethostname())}:{commands_port}')
|
||||
self.get_logger().info(f'Sending joint states to {state_ip}:{state_port}')
|
||||
self.get_logger().info(f'Sending log messages to {log_ip}:{log_port}')
|
||||
self.create_timer(1/self.hz, self.update_position) # Timer to update the position
|
||||
@@ -322,6 +322,8 @@ class OSC_ROS2_interface(Node):
|
||||
self.speed_scaling = -float(args[0])
|
||||
else:
|
||||
self.speed_scaling = float(args[0])
|
||||
if self.speed_scaling > 1:
|
||||
self.get_logger().warn(f"speed_scaling_handler: Attention! Speed scaling {self.speed_scaling} is greater than 1!")
|
||||
self.get_logger().info(f"Speed scaling set to {self.speed_scaling}")
|
||||
else:
|
||||
self.get_logger().warn(f"Invalid number of arguments for speed scaling. Expected 1, but got {len(args)}.")
|
||||
@@ -331,8 +333,53 @@ class OSC_ROS2_interface(Node):
|
||||
def joint_trajectory_handler(self, *args):
|
||||
pass
|
||||
|
||||
def joint_position_handler(self, *args):
|
||||
pass
|
||||
def joint_position_handler(self, address, *args):
|
||||
"""Handles incoming OSC messages for joint positions."""
|
||||
try:
|
||||
joint_name = address.split("/")[-1]
|
||||
if joint_name in self.joint_names:
|
||||
if len(args) == 1:
|
||||
position = float(args[0])
|
||||
index = self.joint_names.index(joint_name)
|
||||
if position < self.robot.qlim[0][index]:
|
||||
position = self.robot.qlim[0][index]
|
||||
self.get_logger().warn(
|
||||
f"joint_position_handler: Joint '{joint_name}' position {position} is out of bounds. Using {self.robot.qlim[0][index]}."
|
||||
)
|
||||
elif position > self.robot.qlim[1][index]:
|
||||
position = self.robot.qlim[1][index]
|
||||
self.get_logger().warn(
|
||||
f"joint_position_handler: Joint '{joint_name}' position {position} is out of bounds. Using {self.robot.qlim[1][index]}."
|
||||
)
|
||||
desired_joint_positions = self.current_joint_positions
|
||||
desired_joint_positions[index] = position
|
||||
self.desired = ["joint_positions"] + desired_joint_positions
|
||||
self.new = True
|
||||
elif len(args) == 2:
|
||||
position = float(args[0])
|
||||
duration = float(args[1])
|
||||
index = self.joint_names.index(joint_name)
|
||||
if position < self.robot.qlim[0][index]:
|
||||
position = self.robot.qlim[0][index]
|
||||
self.get_logger().warn(
|
||||
f"joint_position_handler: Joint '{joint_name}' position {position} is out of bounds. Using {self.robot.qlim[0][index]}."
|
||||
)
|
||||
elif position > self.robot.qlim[1][index]:
|
||||
position = self.robot.qlim[1][index]
|
||||
self.get_logger().warn(
|
||||
f"joint_position_handler: Joint '{joint_name}' position {position} is out of bounds. Using {self.robot.qlim[1][index]}."
|
||||
)
|
||||
desired_joint_positions = self.current_joint_positions
|
||||
desired_joint_positions[index] = position
|
||||
self.desired = ["joint_positions"] + desired_joint_positions + [duration]
|
||||
self.new = True
|
||||
else:
|
||||
self.get_logger().warn(f"joint_position_handler: Invalid number of arguments for joint position. Expected 1, but got {len(args)}.")
|
||||
else:
|
||||
self.get_logger().warn(f"joint_position_handler: Joint '{joint_name}' not found in the robot model.")
|
||||
except Exception as e:
|
||||
self.get_logger().fatal(f"joint_position_handler: {e}")
|
||||
|
||||
|
||||
def cartesian_trajectory_handler(self, *args):
|
||||
"""Handles incoming OSC messages for cartesian trajectory."""
|
||||
@@ -346,7 +393,7 @@ class OSC_ROS2_interface(Node):
|
||||
"""Handles incoming OSC messages for joint positions."""
|
||||
try:
|
||||
if len(args) == len(self.joint_names):
|
||||
desired_joint_positions = [float(i) for i in list(args)] + [None]
|
||||
desired_joint_positions = [float(i) for i in list(args)]
|
||||
elif len(args) == len(self.joint_names) + 1:
|
||||
desired_joint_positions = [float(i) for i in list(args)]
|
||||
else:
|
||||
@@ -363,7 +410,7 @@ class OSC_ROS2_interface(Node):
|
||||
elif position > self.robot.qlim[1][i]:
|
||||
desired_joint_positions[i] = self.robot.qlim[1][i]
|
||||
self.get_logger().warn(
|
||||
f"joint_positions_handler:Joint '{self.joint_names[i]}' position {position} is out of bounds. Using {self.robot.qlim[1][i]}."
|
||||
f"joint_positions_handler: Joint '{self.joint_names[i]}' position {position} is out of bounds. Using {self.robot.qlim[1][i]}."
|
||||
)
|
||||
|
||||
self.desired = ["joint_positions"] + desired_joint_positions
|
||||
@@ -420,23 +467,24 @@ class OSC_ROS2_interface(Node):
|
||||
osc_send(msg_time, "osc_client")
|
||||
if not(self.joint_names): self.joint_names = msg.name
|
||||
joint_position_dict = dict(zip(msg.name, msg.position))
|
||||
self.current_joint_positions = [joint_position_dict[name] for name in self.joint_names]
|
||||
self.current_joint_positions = [float(joint_position_dict[name]) for name in self.joint_names]
|
||||
joint_position_dict = dict(zip(msg.name, msg.velocity))
|
||||
self.current_joint_velocities = [joint_position_dict[name] for name in self.joint_names]
|
||||
self.current_joint_velocities = [float(joint_position_dict[name]) for name in self.joint_names]
|
||||
|
||||
if self.robot:
|
||||
tcp_position = self.robot.fkine(self.current_joint_positions).t
|
||||
tcp_orientation = self.robot.fkine(self.current_joint_positions).rpy()
|
||||
|
||||
msg_tcp = oscbuildparse.OSCMessage(f"/tcp_coordinates", ',ffffff', [tcp_position[0], tcp_position[1], tcp_position[2], tcp_orientation[0], tcp_orientation[1], tcp_orientation[2]])
|
||||
msg_x = oscbuildparse.OSCMessage(f"/tcp_coordinates/x", ',f', [tcp_position[0]])
|
||||
msg_y = oscbuildparse.OSCMessage(f"/tcp_coordinates/y", ',f', [tcp_position[1]])
|
||||
msg_z = oscbuildparse.OSCMessage(f"/tcp_coordinates/z", ',f', [tcp_position[2]])
|
||||
msg_roll = oscbuildparse.OSCMessage(f"/tcp_coordinates/roll", ',f', [tcp_orientation[0]])
|
||||
msg_pitch = oscbuildparse.OSCMessage(f"/tcp_coordinates/pitch", ',f', [tcp_orientation[1]])
|
||||
msg_yaw = oscbuildparse.OSCMessage(f"/tcp_coordinates/yaw", ',f', [tcp_orientation[2]])
|
||||
bun = oscbuildparse.OSCBundle(oscbuildparse.OSC_IMMEDIATELY, [msg_tcp, msg_x, msg_y, msg_z, msg_roll, msg_pitch, msg_yaw])
|
||||
osc_send(bun, "osc_client")
|
||||
#msg_x = oscbuildparse.OSCMessage(f"/tcp_coordinates/x", ',f', [tcp_position[0]])
|
||||
#msg_y = oscbuildparse.OSCMessage(f"/tcp_coordinates/y", ',f', [tcp_position[1]])
|
||||
#msg_z = oscbuildparse.OSCMessage(f"/tcp_coordinates/z", ',f', [tcp_position[2]])
|
||||
#msg_roll = oscbuildparse.OSCMessage(f"/tcp_coordinates/roll", ',f', [tcp_orientation[0]])
|
||||
#msg_pitch = oscbuildparse.OSCMessage(f"/tcp_coordinates/pitch", ',f', [tcp_orientation[1]])
|
||||
#msg_yaw = oscbuildparse.OSCMessage(f"/tcp_coordinates/yaw", ',f', [tcp_orientation[2]])
|
||||
#bun = oscbuildparse.OSCBundle(oscbuildparse.OSC_IMMEDIATELY, [msg_tcp, msg_x, msg_y, msg_z, msg_roll, msg_pitch, msg_yaw])
|
||||
#osc_send(bun, "osc_client")
|
||||
osc_send(msg_tcp, "osc_client")
|
||||
|
||||
msg_position = oscbuildparse.OSCMessage(f"/joint_state/position", f',{"f"*self.n_joints}', [i for i in msg.position])
|
||||
msg_velocity = oscbuildparse.OSCMessage(f"/joint_state/velocity", f',{"f"*self.n_joints}', [i for i in msg.velocity])
|
||||
@@ -445,19 +493,55 @@ class OSC_ROS2_interface(Node):
|
||||
bun = oscbuildparse.OSCBundle(oscbuildparse.OSC_IMMEDIATELY, [msg_name, msg_position, msg_velocity, msg_effort])
|
||||
osc_send(bun, "osc_client")
|
||||
|
||||
for i, name in enumerate(msg.name):
|
||||
msg_position = oscbuildparse.OSCMessage(f"/joint_state/position/{name}", ',f', [msg.position[i]])
|
||||
msg_velocity = oscbuildparse.OSCMessage(f"/joint_state/velocity/{name}", ',f', [msg.velocity[i]])
|
||||
msg_effort = oscbuildparse.OSCMessage(f"/joint_state/effort/{name}", ',f', [msg.effort[i]])
|
||||
bun = oscbuildparse.OSCBundle(oscbuildparse.OSC_IMMEDIATELY, [msg_position, msg_velocity, msg_effort])
|
||||
osc_send(bun, "osc_client")
|
||||
#for i, name in enumerate(msg.name):
|
||||
# msg_position = oscbuildparse.OSCMessage(f"/joint_state/position/{name}", ',f', [msg.position[i]])
|
||||
# msg_velocity = oscbuildparse.OSCMessage(f"/joint_state/velocity/{name}", ',f', [msg.velocity[i]])
|
||||
# msg_effort = oscbuildparse.OSCMessage(f"/joint_state/effort/{name}", ',f', [msg.effort[i]])
|
||||
# bun = oscbuildparse.OSCBundle(oscbuildparse.OSC_IMMEDIATELY, [msg_position, msg_velocity, msg_effort])
|
||||
# osc_send(bun, "osc_client")
|
||||
|
||||
except Exception as e:
|
||||
self.get_logger().fatal(f"joint_states_callback: {e}")
|
||||
|
||||
def send_joint_positions(self):
|
||||
pass
|
||||
self.previous_desired = None
|
||||
self.new = False
|
||||
try:
|
||||
if len(self.desired) == len(self.joint_names) + 2:
|
||||
desired_joint_positions = [float(i) for i in self.desired[1:-1]]
|
||||
duration = self.desired[-1]
|
||||
msg = JointTrajectory()
|
||||
msg.joint_names = self.joint_names
|
||||
point = JointTrajectoryPoint()
|
||||
point.positions = desired_joint_positions
|
||||
point.time_from_start.sec = int(duration)
|
||||
point.time_from_start.nanosec = int((duration - int(duration)) * 1e9)
|
||||
msg.points.append(point)
|
||||
self.publisher.publish(msg)
|
||||
elif len(self.desired) == len(self.joint_names) + 1:
|
||||
desired_joint_positions = [float(i) for i in self.desired[1:]]
|
||||
msg = JointTrajectory()
|
||||
msg.joint_names = self.joint_names
|
||||
point = JointTrajectoryPoint()
|
||||
point.positions = desired_joint_positions
|
||||
duration = 0
|
||||
for p1, p2, max_vel in zip(desired_joint_positions, self.current_joint_positions, self.joint_velocity_limits.values()):
|
||||
duration = max(duration, abs(p1 - p2) / max_vel)
|
||||
duration = duration + min(2, 0.2*duration)
|
||||
duration /= self.speed_scaling
|
||||
if duration == 0:
|
||||
self.get_logger().warn("send_joint_positions: Duration is 0.")
|
||||
return
|
||||
point.time_from_start.sec = int(duration)
|
||||
point.time_from_start.nanosec = int((duration - int(duration)) * 1e9)
|
||||
msg.points.append(point)
|
||||
self.publisher.publish(msg)
|
||||
else:
|
||||
self.get_logger().warn(f"send_joint_positions: Invalid number of arguments for joint positions. Expected {len(self.joint_names)+1} ([q0, q1, q2, ... q{len(self.joint_names)}, duration]) or {len(self.joint_names)} ([q0, q1, q2, ... q{len(self.joint_names)}]), but got {len(self.desired)}.")
|
||||
return
|
||||
except Exception as e:
|
||||
self.get_logger().fatal(f"send_joint_positions: {e}")
|
||||
return
|
||||
|
||||
def trapezoidal_timestamps(self, num_points,total_duration, flat_ratio = 0.3):
|
||||
|
||||
@@ -478,6 +562,7 @@ class OSC_ROS2_interface(Node):
|
||||
def send_tcp_coordinates(self):
|
||||
"""Send the desired TCP coordinates to the robot."""
|
||||
try:
|
||||
self.new = False
|
||||
msg = JointTrajectory()
|
||||
msg.joint_names = self.joint_names
|
||||
steps_per_m = 100
|
||||
@@ -538,7 +623,6 @@ class OSC_ROS2_interface(Node):
|
||||
|
||||
if self.desired[-1]:
|
||||
timestamps = self.trapezoidal_timestamps(steps, self.desired[-1], 0.8)
|
||||
print(f'timestamps: {timestamps}')
|
||||
for j in range(steps):
|
||||
sol = self.robot.ik_LM(cart_traj[j], q0=self.current_joint_positions, mask = self.cost_mask, joint_limits = True, method = 'chan') if j == 0 else self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True, method = 'chan')
|
||||
if sol[1] == 1:
|
||||
@@ -619,6 +703,7 @@ class OSC_ROS2_interface(Node):
|
||||
continue
|
||||
point = JointTrajectoryPoint()
|
||||
point.positions = list(sol[0])
|
||||
duration *= 1.2
|
||||
duration /= self.speed_scaling
|
||||
duration += prev_duration
|
||||
prev_duration = duration
|
||||
|
||||
Reference in New Issue
Block a user