OSC_ROS2/osc4py3_test/sub.py
Alexander Schaefer 98759f2bab AS: osc4py3
2025-03-14 09:37:10 +01:00

38 lines
1.2 KiB
Python

from osc4py3.as_eventloop import *
from osc4py3 import oscbuildparse
# Define an OSC bundle handler
def bundle_handler(bundle):
"""Handles received OSC bundles, extracts the timetag and messages."""
print(f"📩 Received an OSC Bundle!")
# Extract timetag from the bundle metadata
timetag = bundle.timetag
unix_time = oscbuildparse.timetag2unixtime(timetag)
print(f"⏳ Timetag (NTP format): {timetag}")
print(f"⏳ Converted Unix Time: {unix_time}")
# Process all elements inside the bundle
for element in bundle.elements:
if isinstance(element, oscbuildparse.OSCMessage):
print(f"📨 Message received at {element.address} with args {element.params}")
# Start OSC
osc_startup()
# Create an OSC UDP server to receive bundles
osc_udp_server("127.0.0.1", 8000, "osc_server")
# Register the handler for OSC Bundles (Without `oscdispatch`)
osc_method('/*', bundle_handler, argscheme="B") # 'B' for bundles
print("🚀 Listening for OSC bundles on 127.0.0.1:8000...\nPress Ctrl+C to exit.")
# Keep processing incoming OSC messages
try:
while True:
osc_process()
except KeyboardInterrupt:
print("\nShutting down OSC receiver...")
osc_terminate()