100 lines
3.6 KiB
Python
100 lines
3.6 KiB
Python
import rclpy
|
|
from rclpy.node import Node
|
|
from std_msgs.msg import Int32
|
|
from math import sin, cos, pi
|
|
from trajectory_msgs.msg import JointTrajectory
|
|
from sensor_msgs.msg import JointState
|
|
import time
|
|
|
|
# Adjusted joint limits based on nearest multiple of step angle
|
|
|
|
|
|
|
|
class ArmControllerNode(Node):
|
|
|
|
def __init__(self):
|
|
super().__init__('arm_controller_node')
|
|
self.subscription = self.create_subscription(
|
|
JointTrajectory,
|
|
'/scaled_joint_trajectory_controller/joint_trajectory',
|
|
self.joint_trajectory_callback,
|
|
100
|
|
)
|
|
self.dq1_publisher = self.create_publisher(Int32, 'dq1_steps', 10)
|
|
self.dq2_publisher = self.create_publisher(Int32, 'dq2_steps', 10)
|
|
self.joint_state_publisher = self.create_publisher(JointState, 'joint_states', 10)
|
|
|
|
self.q1_min = -1.57
|
|
self.q1_max = 1
|
|
self.q2_min = 0.02
|
|
self.q2_max = 2.8
|
|
|
|
self.current_q1 = 0.00
|
|
self.current_q2 = 1.57 # Initial joint angles
|
|
steps_per_revolution = 200
|
|
gear_ratio = 19.2
|
|
self.steps_per_radian = (steps_per_revolution * gear_ratio) / (2 * pi)
|
|
|
|
self.ceate_timer(0.2, self.publish_joint_state)
|
|
|
|
def publish_joint_state(self):
|
|
joint_state_msg = JointState()
|
|
joint_state_msg.header.stamp = self.get_clock().now().to_msg()
|
|
joint_state_msg.name = ['joint_1', 'joint_2']
|
|
joint_state_msg.position = [self.current_q1, self.current_q2]
|
|
joint_state_msg.velocity = [0.0, 0.0]
|
|
joint_state_msg.effort = [0.0, 0.0]
|
|
self.joint_state_publisher.publish(joint_state_msg)
|
|
|
|
def joint_trajectory_callback(self, msg):
|
|
prev_timetag = 0
|
|
for point in msg.points:
|
|
timetag = point.time_from_start.sec + point.time_from_start.nanosec / 1e9 - prev_timetag
|
|
prev_timetag = point.time_from_start.sec + point.time_from_start.nanosec / 1e9
|
|
|
|
new_q1=max(min(point.positions[0],self.q1_max),self.q1_min)
|
|
new_q2=max(min(point.positions[1],self.q2_max),self.q2_min)
|
|
dq1 = new_q1 - self.current_q1
|
|
dq2 = new_q2 - self.current_q2
|
|
dq1_steps = int(round(dq1 * self.steps_per_radian))
|
|
dq2_steps = int(round(dq2 * self.steps_per_radian))
|
|
|
|
self.current_q1 += dq1_steps/self.steps_per_radian
|
|
self.current_q2 += dq2_steps/self.steps_per_radian
|
|
|
|
dq1_steps_msg = Int32()
|
|
dq1_steps_msg.data = dq1_steps
|
|
self.dq1_publisher.publish(dq1_steps_msg)
|
|
|
|
dq2_steps_msg = Int32()
|
|
dq2_steps_msg.data = dq2_steps
|
|
self.dq2_publisher.publish(dq2_steps_msg)
|
|
x= 0.4 * cos(self.current_q1) + 0.25025 * cos(self.current_q1+self.current_q2)
|
|
y= 0.4 * sin(self.current_q1) + 0.25025 * sin(self.current_q1+self.current_q2)
|
|
|
|
self.get_logger().info(f"Steps taken: steps_q1 = {dq1_steps}, steps_q2 = {dq2_steps}")
|
|
self.get_logger().info(f"New joint positions (in radians): q1 = {self.current_q1}, q2 = {self.current_q2}")
|
|
self.get_logger().info(f"New x_y positions (in meters): x = {x}, y = {y}")
|
|
self.get_logger().info(f"Duration: {timetag}")
|
|
time.sleep(timetag)
|
|
|
|
|
|
|
|
|
|
def main(args=None):
|
|
try:
|
|
rclpy.init(args=args)
|
|
node = ArmControllerNode()
|
|
rclpy.spin(node)
|
|
except KeyboardInterrupt:
|
|
print('Communication to painting robot closed')
|
|
finally:
|
|
node.destroy_node()
|
|
rclpy.shutdown()
|
|
|
|
if __name__ == '__main__':
|
|
main()
|
|
|
|
|
|
|