OSC_ROS2/test/osc_ws/address_scan_thread.py
2025-05-04 23:38:12 +02:00

53 lines
1.8 KiB
Python

import threading
import time
from pythonosc.dispatcher import Dispatcher
from pythonosc.osc_server import BlockingOSCUDPServer
import sys
# Define the IP and Port
IP_ADDRESS = "0.0.0.0" # Listen on all interfaces
PORT = 8000 # Choose an available UDP port
TIMEOUT = 0.7 # Auto-terminate after x seconds of inactivity
# Set to track seen addresses
seen = set()
last_received_time = time.time() # Tracks the last time a new address was received
disp = Dispatcher()
def default_handler(address: str, *args) -> None:
"""Handles incoming OSC messages and tracks new addresses."""
global last_received_time
address = address.split('/')[1]
if address not in seen:
seen.add(address)
print(f"/{address}")
last_received_time = time.time() # Update last received time ONLY when a new address is found
def detailed_handler(address: str, *args) -> None:
"""Handles incoming OSC messages and tracks new addresses."""
global last_received_time
if address not in seen:
seen.add(address)
print(f"{address}")
last_received_time = time.time() # Update last received time ONLY when a new address is found
if len(sys.argv)>1: disp.set_default_handler(detailed_handler)
else: disp.set_default_handler(default_handler)
server = BlockingOSCUDPServer((IP_ADDRESS, PORT), disp)
def stop_server_on_timeout():
"""Stops the server if no new addresses are discovered within TIMEOUT seconds."""
global last_received_time
while True:
if time.time() - last_received_time > TIMEOUT:
server.shutdown() # Properly shutdown the server
return # Exit the thread
# Run timeout check in a separate thread
timeout_thread = threading.Thread(target=stop_server_on_timeout, daemon=True)
timeout_thread.start()
# Start the server
server.serve_forever() # Blocking call, runs until shutdown