OSC_ROS2/com_node.py
2025-05-23 14:05:33 +02:00

101 lines
3.6 KiB
Python

import rclpy
from rclpy.node import Node
from std_msgs.msg import Int32
from math import sin, cos, pi
from trajectory_msgs.msg import JointTrajectory
from sensor_msgs.msg import JointState
import time
# Adjusted joint limits based on nearest multiple of step angle
class ArmControllerNode(Node):
def __init__(self):
super().__init__('arm_controller_node')
self.subscription = self.create_subscription(
JointTrajectory,
'/scaled_joint_trajectory_controller/joint_trajectory',
self.joint_trajectory_callback,
100
)
self.dq1_publisher = self.create_publisher(Int32, 'dq1_steps', 10)
self.dq2_publisher = self.create_publisher(Int32, 'dq2_steps', 10)
self.joint_state_publisher = self.create_publisher(JointState, 'joint_states', 10)
self.q1_min = -1.57
self.q1_max = 1
self.q2_min = 0.02
self.q2_max = 2.8
self.current_q1 = 0.00
self.current_q2 = 1.57 # Initial joint angles
steps_per_revolution = 200
gear_ratio = 19.2
self.steps_per_radian = (steps_per_revolution * gear_ratio) / (2 * pi)
def publish_joint_state(self):
joint_state_msg = JointState()
joint_state_msg.header.stamp = self.get_clock().now().to_msg()
joint_state_msg.name = ['joint_1', 'joint_2']
joint_state_msg.position = [self.current_q1, self.current_q2]
joint_state_msg.velocity = [0.0, 0.0]
joint_state_msg.effort = [0.0, 0.0]
self.joint_state_publisher.publish(joint_state_msg)
self.ceate_timer(0.2, self.publish_joint_state)
def joint_trajectory_callback(self, msg):
prev_timetag = 0
for point in msg.points:
timetag = point.time_from_start.sec + point.time_from_start.nanosec / 1e9 - prev_timetag
prev_timetag = point.time_from_start.sec + point.time_from_start.nanosec / 1e9
new_q1=max(min(point.positions[0],self.q1_max),self.q1_min)
new_q2=max(min(point.positions[1],self.q2_max),self.q2_min)
dq1 = new_q1 - self.current_q1
dq2 = new_q2 - self.current_q2
dq1_steps = int(round(dq1 * self.steps_per_radian))
dq2_steps = int(round(dq2 * self.steps_per_radian))
self.current_q1 += dq1_steps/self.steps_per_radian
self.current_q2 += dq2_steps/self.steps_per_radian
dq1_steps_msg = Int32()
dq1_steps_msg.data = dq1_steps
self.dq1_publisher.publish(dq1_steps_msg)
dq2_steps_msg = Int32()
dq2_steps_msg.data = dq2_steps
self.dq2_publisher.publish(dq2_steps_msg)
x= 0.4 * cos(self.current_q1) + 0.25025 * cos(self.current_q1+self.current_q2)
y= 0.4 * sin(self.current_q1) + 0.25025 * sin(self.current_q1+self.current_q2)
self.get_logger().info(f"Steps taken: steps_q1 = {dq1_steps}, steps_q2 = {dq2_steps}")
self.get_logger().info(f"New joint positions (in radians): q1 = {self.current_q1}, q2 = {self.current_q2}")
self.get_logger().info(f"New x_y positions (in meters): x = {x}, y = {y}")
self.get_logger().info(f"Duration: {timetag}")
time.sleep(timetag)
def main(args=None):
try:
rclpy.init(args=args)
node = ArmControllerNode()
rclpy.spin(node)
except KeyboardInterrupt:
print('Communication to painting robot closed')
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()