OSC_ROS2/test/joint_trajectory_recording.py
2025-04-30 12:28:56 +02:00

80 lines
2.7 KiB
Python

import rclpy
from rclpy.node import Node
from trajectory_msgs.msg import JointTrajectory
import csv
import os
import roboticstoolbox as rtb
class JointTrajectoryLogger(Node):
def __init__(self):
super().__init__('joint_trajectory_logger_minimal')
# Folder to store CSV files
self.log_dir = './joint_trajectories'
os.makedirs(self.log_dir, exist_ok=True)
self.log_dir_cart = './joint_trajectories_cart'
os.makedirs(self.log_dir_cart, exist_ok=True)
# Subscriber to JointTrajectory messages
self.subscription = self.create_subscription(
JointTrajectory,
'/scaled_joint_trajectory_controller/joint_trajectory',
self.trajectory_callback,
10
)
self.counter = 0
self.robot = rtb.ERobot.URDF('/ws/src/ba-alexanderschaefer/ur10e.urdf')
def trajectory_callback(self, msg: JointTrajectory):
self.counter += 1
# Create a unique filename
filename = f"trajectory_{self.counter:04d}.csv"
file_path = os.path.join(self.log_dir, filename)
# Header: timestamp + joint names
header = ['timestamp'] + msg.joint_names
recive_time = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
with open(file_path, mode='w', newline='') as file:
writer = csv.writer(file)
writer.writerow(header)
for point in msg.points:
timestamp = recive_time + point.time_from_start.sec + point.time_from_start.nanosec * 1e-9
row = [timestamp] + list(point.positions)
writer.writerow(row)
filename_cart = f"trajectory_{self.counter:04d}_cart.csv"
file_path_cart = os.path.join(self.log_dir_cart, filename_cart)
header = ['timestamp', 'x', 'y', 'z', 'roll', 'pitch', 'yaw']
# Convert joint angles to cartesian coordinates
joint_angles = point.positions
# Write to CSV
with open(file_path_cart, mode='w', newline='') as file:
writer = csv.writer(file)
writer.writerow(header)
for point in msg.points:
timestamp = recive_time + point.time_from_start.sec + point.time_from_start.nanosec * 1e-9
T = self.robot.fkine(point.positions)
[x,y,z] = T.t
roll, pitch, yaw = T.rpy()
row = [timestamp, x, y, z, roll, pitch, yaw]
writer.writerow(row)
def main():
rclpy.init()
node = JointTrajectoryLogger()
try:
rclpy.spin(node)
except KeyboardInterrupt:
print("")
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()