From dbaab8d88637b77aaad4530fea0f86a461998473 Mon Sep 17 00:00:00 2001 From: Alexander Schaefer Date: Fri, 14 Mar 2025 09:40:15 +0100 Subject: [PATCH] AS: osc4py3 --- osc4py3_test/pub.py | 26 -------------------------- osc4py3_test/sub.py | 37 ------------------------------------- osc4py3_test/test.py | 31 ------------------------------- 3 files changed, 94 deletions(-) delete mode 100644 osc4py3_test/pub.py delete mode 100644 osc4py3_test/sub.py delete mode 100644 osc4py3_test/test.py diff --git a/osc4py3_test/pub.py b/osc4py3_test/pub.py deleted file mode 100644 index 143a766..0000000 --- a/osc4py3_test/pub.py +++ /dev/null @@ -1,26 +0,0 @@ -from osc4py3.as_eventloop import * -from osc4py3 import oscbuildparse -import time - -# Start OSC -osc_startup() -osc_udp_client("127.0.0.1", 8000, "osc_client") - -# Generate a future timetag (current time + 2 seconds) -timetag = oscbuildparse.unixtime2timetag(time.time() + 2) - -# Create messages with proper addresses -msg1 = oscbuildparse.OSCMessage("/joint_states/joint1/position", None, [42]) -msg2 = oscbuildparse.OSCMessage("/joint_states/joint2/velocity", None, [3.14]) - -# Create an OSC bundle with the correct timetag -bundle = oscbuildparse.OSCBundle(timetag, [msg1, msg2]) - -# Send the bundle -osc_send(bundle, "osc_client") -osc_process() - -print(f"📤 Sent OSC bundle with timetag: {timetag}") - -# Terminate OSC -osc_terminate() diff --git a/osc4py3_test/sub.py b/osc4py3_test/sub.py deleted file mode 100644 index e96693e..0000000 --- a/osc4py3_test/sub.py +++ /dev/null @@ -1,37 +0,0 @@ -from osc4py3.as_eventloop import * -from osc4py3 import oscbuildparse - -# Define an OSC bundle handler -def bundle_handler(bundle): - """Handles received OSC bundles, extracts the timetag and messages.""" - print(f"📩 Received an OSC Bundle!") - - # Extract timetag from the bundle metadata - timetag = bundle.timetag - unix_time = oscbuildparse.timetag2unixtime(timetag) - print(f"⏳ Timetag (NTP format): {timetag}") - print(f"⏳ Converted Unix Time: {unix_time}") - - # Process all elements inside the bundle - for element in bundle.elements: - if isinstance(element, oscbuildparse.OSCMessage): - print(f"📨 Message received at {element.address} with args {element.params}") - -# Start OSC -osc_startup() - -# Create an OSC UDP server to receive bundles -osc_udp_server("127.0.0.1", 8000, "osc_server") - -# Register the handler for OSC Bundles (Without `oscdispatch`) -osc_method('/*', bundle_handler, argscheme="B") # 'B' for bundles - -print("🚀 Listening for OSC bundles on 127.0.0.1:8000...\nPress Ctrl+C to exit.") - -# Keep processing incoming OSC messages -try: - while True: - osc_process() -except KeyboardInterrupt: - print("\nShutting down OSC receiver...") - osc_terminate() diff --git a/osc4py3_test/test.py b/osc4py3_test/test.py deleted file mode 100644 index c56aef7..0000000 --- a/osc4py3_test/test.py +++ /dev/null @@ -1,31 +0,0 @@ -from osc4py3.as_eventloop import * -#from osc4py3 import oscdispatch -import time -from osc4py3 import oscmethod as osm - -# Define the OSC message handler -def joint_states_handler(address): - time.sleep(1) # Simulate processing time - print(f"Received OSC message at {address}") - -# Start the OSC system -osc_startup() - -# Create an OSC UDP server -osc_udp_server("127.0.0.1", 8000, "osc_server") - -# Register handlers for different joint state messages -osc_method("/joint_states", joint_states_handler, argscheme=osm.OSCARG_ADDRESS) -osc_method("/joint_states/*/position", joint_states_handler,argscheme=osm.OSCARG_ADDRESS) -osc_method("/joint_states/*/velocity", joint_states_handler,argscheme=osm.OSCARG_ADDRESS) -osc_method("/joint_states/*/effort", joint_states_handler,argscheme=osm.OSCARG_ADDRESS) - -print("OSC Receiver is listening on 127.0.0.1:8000...\nPress Ctrl+C to exit.") - -# Keep the server running -try: - while True: - osc_process() -except KeyboardInterrupt: - print("\nShutting down OSC receiver...") - osc_terminate()