AS: osc4py3
This commit is contained in:
parent
98759f2bab
commit
dbaab8d886
@ -1,26 +0,0 @@
|
||||
from osc4py3.as_eventloop import *
|
||||
from osc4py3 import oscbuildparse
|
||||
import time
|
||||
|
||||
# Start OSC
|
||||
osc_startup()
|
||||
osc_udp_client("127.0.0.1", 8000, "osc_client")
|
||||
|
||||
# Generate a future timetag (current time + 2 seconds)
|
||||
timetag = oscbuildparse.unixtime2timetag(time.time() + 2)
|
||||
|
||||
# Create messages with proper addresses
|
||||
msg1 = oscbuildparse.OSCMessage("/joint_states/joint1/position", None, [42])
|
||||
msg2 = oscbuildparse.OSCMessage("/joint_states/joint2/velocity", None, [3.14])
|
||||
|
||||
# Create an OSC bundle with the correct timetag
|
||||
bundle = oscbuildparse.OSCBundle(timetag, [msg1, msg2])
|
||||
|
||||
# Send the bundle
|
||||
osc_send(bundle, "osc_client")
|
||||
osc_process()
|
||||
|
||||
print(f"📤 Sent OSC bundle with timetag: {timetag}")
|
||||
|
||||
# Terminate OSC
|
||||
osc_terminate()
|
@ -1,37 +0,0 @@
|
||||
from osc4py3.as_eventloop import *
|
||||
from osc4py3 import oscbuildparse
|
||||
|
||||
# Define an OSC bundle handler
|
||||
def bundle_handler(bundle):
|
||||
"""Handles received OSC bundles, extracts the timetag and messages."""
|
||||
print(f"📩 Received an OSC Bundle!")
|
||||
|
||||
# Extract timetag from the bundle metadata
|
||||
timetag = bundle.timetag
|
||||
unix_time = oscbuildparse.timetag2unixtime(timetag)
|
||||
print(f"⏳ Timetag (NTP format): {timetag}")
|
||||
print(f"⏳ Converted Unix Time: {unix_time}")
|
||||
|
||||
# Process all elements inside the bundle
|
||||
for element in bundle.elements:
|
||||
if isinstance(element, oscbuildparse.OSCMessage):
|
||||
print(f"📨 Message received at {element.address} with args {element.params}")
|
||||
|
||||
# Start OSC
|
||||
osc_startup()
|
||||
|
||||
# Create an OSC UDP server to receive bundles
|
||||
osc_udp_server("127.0.0.1", 8000, "osc_server")
|
||||
|
||||
# Register the handler for OSC Bundles (Without `oscdispatch`)
|
||||
osc_method('/*', bundle_handler, argscheme="B") # 'B' for bundles
|
||||
|
||||
print("🚀 Listening for OSC bundles on 127.0.0.1:8000...\nPress Ctrl+C to exit.")
|
||||
|
||||
# Keep processing incoming OSC messages
|
||||
try:
|
||||
while True:
|
||||
osc_process()
|
||||
except KeyboardInterrupt:
|
||||
print("\nShutting down OSC receiver...")
|
||||
osc_terminate()
|
@ -1,31 +0,0 @@
|
||||
from osc4py3.as_eventloop import *
|
||||
#from osc4py3 import oscdispatch
|
||||
import time
|
||||
from osc4py3 import oscmethod as osm
|
||||
|
||||
# Define the OSC message handler
|
||||
def joint_states_handler(address):
|
||||
time.sleep(1) # Simulate processing time
|
||||
print(f"Received OSC message at {address}")
|
||||
|
||||
# Start the OSC system
|
||||
osc_startup()
|
||||
|
||||
# Create an OSC UDP server
|
||||
osc_udp_server("127.0.0.1", 8000, "osc_server")
|
||||
|
||||
# Register handlers for different joint state messages
|
||||
osc_method("/joint_states", joint_states_handler, argscheme=osm.OSCARG_ADDRESS)
|
||||
osc_method("/joint_states/*/position", joint_states_handler,argscheme=osm.OSCARG_ADDRESS)
|
||||
osc_method("/joint_states/*/velocity", joint_states_handler,argscheme=osm.OSCARG_ADDRESS)
|
||||
osc_method("/joint_states/*/effort", joint_states_handler,argscheme=osm.OSCARG_ADDRESS)
|
||||
|
||||
print("OSC Receiver is listening on 127.0.0.1:8000...\nPress Ctrl+C to exit.")
|
||||
|
||||
# Keep the server running
|
||||
try:
|
||||
while True:
|
||||
osc_process()
|
||||
except KeyboardInterrupt:
|
||||
print("\nShutting down OSC receiver...")
|
||||
osc_terminate()
|
Loading…
Reference in New Issue
Block a user