OSC_ROS2/test/commands_recording.py
Alexander Schaefer b67b2a5174 AS: code update
2025-05-07 23:12:12 +02:00

27 lines
827 B
Python

from pythonosc.dispatcher import Dispatcher
from pythonosc import osc_server
import csv
import time
# CSV log: create (or truncate) the output file and write the header row once,
# before any message can arrive. newline="" leaves line endings to the csv
# module, which is how its documentation says the file should be opened.
csv_file = open("osc_log.csv", "w", newline="")
csv_writer = csv.writer(csv_file)
csv_writer.writerow(
    ("timestamp", "x", "y", "z", "roll", "pitch", "yaw")
)
# Handler for OSC messages
def handle_pose(address: str, *args: object) -> None:
    """Append one received OSC message to the CSV log.

    Args:
        address: OSC address the message arrived on; it is not written
            to the log.
        *args: Message arguments, written in order after the timestamp.
            Presumably the six pose values named in the log's header row
            (x, y, z, roll, pitch, yaw) -- the count is not checked here,
            so a message with a different number of arguments produces a
            row that does not line up with the header.
    """
    timestamp = time.time()  # time this handler ran, seconds since the epoch
    csv_writer.writerow([timestamp, *args])
    # Flush after every row so buffered data is handed to the OS right away
    # instead of sitting in the process until the file is closed.
    csv_file.flush()
# Setup dispatcher and bind address
dispatcher = Dispatcher()
# Route every message sent to the OSC address /tcp_coordinates to handle_pose.
dispatcher.map("/tcp_coordinates", handle_pose)
# Start server
ip = "0.0.0.0"  # bind on all local interfaces
port = 8000
server = osc_server.ThreadingOSCUDPServer((ip, port), dispatcher)
print(f"Listening for OSC messages on {ip}:{port}...")
try:
    # Blocks until the process is interrupted (e.g. Ctrl-C) or an error
    # escapes; there is no except clause, so either still propagates.
    server.serve_forever()
finally:
    # Release the UDP socket and the log file when the server stops. The
    # server is closed first so that, as far as possible, no handler is
    # still writing when the file is closed.
    server.server_close()
    csv_file.close()