import rclpy from rclpy.node import Node from std_msgs.msg import Int32 from math import sin, cos, pi from trajectory_msgs.msg import JointTrajectory from sensor_msgs.msg import JointState import time # Adjusted joint limits based on nearest multiple of step angle class ArmControllerNode(Node): def __init__(self): super().__init__('arm_controller_node') self.subscription = self.create_subscription( JointTrajectory, '/scaled_joint_trajectory_controller/joint_trajectory', self.joint_trajectory_callback, 100 ) self.dq1_publisher = self.create_publisher(Int32, 'dq1_steps', 10) self.dq2_publisher = self.create_publisher(Int32, 'dq2_steps', 10) self.joint_state_publisher = self.create_publisher(JointState, 'joint_states', 10) self.q1_min = -1.57 self.q1_max = 1 self.q2_min = 0.02 self.q2_max = 2.8 self.current_q1 = 0.00 self.current_q2 = 1.57 # Initial joint angles steps_per_revolution = 200 gear_ratio = 19.2 self.steps_per_radian = (steps_per_revolution * gear_ratio) / (2 * pi) def publish_joint_state(self): joint_state_msg = JointState() joint_state_msg.header.stamp = self.get_clock().now().to_msg() joint_state_msg.name = ['joint_1', 'joint_2'] joint_state_msg.position = [self.current_q1, self.current_q2] joint_state_msg.velocity = [0.0, 0.0] joint_state_msg.effort = [0.0, 0.0] self.joint_state_publisher.publish(joint_state_msg) self.ceate_timer(0.2, self.publish_joint_state) def joint_trajectory_callback(self, msg): prev_timetag = 0 for point in msg.points: timetag = point.time_from_start.sec + point.time_from_start.nanosec / 1e9 - prev_timetag prev_timetag = point.time_from_start.sec + point.time_from_start.nanosec / 1e9 new_q1=max(min(point.positions[0],self.q1_max),self.q1_min) new_q2=max(min(point.positions[1],self.q2_max),self.q2_min) dq1 = new_q1 - self.current_q1 dq2 = new_q2 - self.current_q2 dq1_steps = int(round(dq1 * self.steps_per_radian)) dq2_steps = int(round(dq2 * self.steps_per_radian)) self.current_q1 += dq1_steps/self.steps_per_radian self.current_q2 += dq2_steps/self.steps_per_radian dq1_steps_msg = Int32() dq1_steps_msg.data = dq1_steps self.dq1_publisher.publish(dq1_steps_msg) dq2_steps_msg = Int32() dq2_steps_msg.data = dq2_steps self.dq2_publisher.publish(dq2_steps_msg) x= 0.4 * cos(self.current_q1) + 0.25025 * cos(self.current_q1+self.current_q2) y= 0.4 * sin(self.current_q1) + 0.25025 * sin(self.current_q1+self.current_q2) self.get_logger().info(f"Steps taken: steps_q1 = {dq1_steps}, steps_q2 = {dq2_steps}") self.get_logger().info(f"New joint positions (in radians): q1 = {self.current_q1}, q2 = {self.current_q2}") self.get_logger().info(f"New x_y positions (in meters): x = {x}, y = {y}") self.get_logger().info(f"Duration: {timetag}") time.sleep(timetag) def main(args=None): try: rclpy.init(args=args) node = ArmControllerNode() rclpy.spin(node) except KeyboardInterrupt: print('Communication to painting robot closed') finally: node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()