[AN] (feat) Interface with config fully works

This commit is contained in:
Ali
2025-06-08 22:13:18 +02:00
parent 6db7143bd5
commit 4ab2ff3c3b
7 changed files with 275 additions and 73 deletions

19
examples/log.py Normal file
View File

@@ -0,0 +1,19 @@
import socket
from ssl import SOCK_STREAM
from oscpy import *
UDP_IP = "172.18.0.1"
UDP_PORT = 5005
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((UDP_IP, UDP_PORT))
while True:
data, addr = sock.recvfrom(10*1024)
if "#bundle" in data[:8].decode():
b = Bundle(data)
print(b)
else:
m = Message(data)
print(m)

45
examples/move.py Normal file
View File

@@ -0,0 +1,45 @@
import socket
import time
from oscpy import *
SEND_IP = "172.18.0.2"
SEND_PORT = 8000
RECV_IP = "172.18.0.1"
RECV_PORT = 7000
pub = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sub = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
msg = Message("/joint_positions", [-1]*6).encode()
n = pub.sendto(msg, (SEND_IP, SEND_PORT))
if n > 0:
print(f"INFO: Sent {n} bytes")
time.sleep(10)
msg = Message("/tcp_coordinates", [-1]*6).encode()
n = pub.sendto(msg, (SEND_IP, SEND_PORT))
if n > 0:
print(f"INFO: Sent {n} bytes")
time.sleep(10)
sub.bind((RECV_IP, RECV_PORT))
joint_names = []
found = False
while not found:
data, addr = sub.recvfrom(5*1024)
if "#bundle" in data[:8].decode():
b = Bundle().decode(data)
for msg in b.msgs:
if "/joint_state/name" in msg.address:
joint_names = msg.args
found = True
if joint_names:
msg = Message(f"/joint_position/{joint_names[2]}", [ 0 ])
n = pub.sendto(msg.encode(), (SEND_IP, SEND_PORT))
if n > 0:
print(f"INFO: Sent {n} bytes")

138
examples/oscpy/__init__.py Normal file
View File

@@ -0,0 +1,138 @@
import struct
from typing import final, override
OSC_TYPES = str | int | float | bool | bytes
def get_string_size(data: bytes) -> int:
n = data.find(b'\x00')
n = n + 4 - n%4
return n
def to_osc_string(s: str) -> bytes:
n = len(s)
b = struct.pack(f'{n}s{4 - n%4}x', s.encode())
return b
def to_osc_blob(b: bytes) -> bytes:
n = len(b)
b = struct.pack(f'i{4 - n%4}p', n, b)
return b
def parse_string(data: bytes) -> tuple[bytes, str]:
n = get_string_size(data)
values: tuple[bytes] = struct.unpack(f'>{n}s', data[:n])
value = values[0].split(b"\x00", 1)
return data[n:], value[0].decode()
def parse_float(data: bytes) -> tuple[bytes, float]:
values: tuple[float] = struct.unpack('>f', data[:4])
return data[4:], values[0]
def parse_int(data: bytes) -> tuple[bytes, int]:
values: tuple[int] = struct.unpack('>i', data[:4])
return data[4:], values[0]
def parse_blob(data: bytes) -> tuple[bytes, bytes]:
n_values: tuple[int] = struct.unpack('>i', data[:4])
n: int = n_values[0]
values: tuple[bytes] = struct.unpack(f'>{n}p', data[4:4+n])
return data[4+n:], values[0]
def parse_args(tt: str, data: bytes) -> list[OSC_TYPES]:
tt = tt[1:]
args: list[OSC_TYPES] = []
for c in tt:
match c:
case 's':
data, val = parse_string(data)
args.append(val)
case 'b':
data, val = parse_blob(data)
args.append(val)
case 'f':
data, val = parse_float(data)
args.append(val)
case 'i':
data, val = parse_int(data)
args.append(val)
case _:
print(f"[ERROR]: Got {c}")
return args
def encode_args(args: list[OSC_TYPES]) -> bytes:
encoded = b''
for arg in args:
match arg:
case str(): encoded += to_osc_string(arg)
case float(): encoded += struct.pack('>f', arg)
case int(): encoded += struct.pack('>i', arg)
case bytes(): encoded += to_osc_blob(arg)
return encoded
def parse_type(arg: OSC_TYPES) -> str:
match arg:
case str(): return "s"
case float(): return "f"
case int(): return "i"
case bytes(): return "b"
@final
class Message:
def __init__(self, address: str = "/", args: list[OSC_TYPES] = []):
self.address: str = address
self.type_tags = ","
for arg in args:
self.type_tags = self.type_tags + parse_type(arg)
self.args: list[OSC_TYPES] = args
def decode(self, data: bytes):
data, self.address = parse_string(data)
data, self.type_tags = parse_string(data)
self.args = parse_args(self.type_tags, data)
return self
def encode(self) -> bytes:
msg = to_osc_string(self.address)
msg += to_osc_string(self.type_tags)
msg += encode_args(self.args)
return msg
@override
def __str__(self) -> str:
return f"{self.address} [{self.type_tags}]: {self.args}"
@final
class Bundle:
def __init__(self, time_tag: float = 0.0, msgs: list[Message] = []):
self.header = "#bundle"
self.ttag = time_tag
self.msgs = msgs
def decode(self, data: bytes):
data, self.header = parse_string(data)
data[4:]
data, self.ttag = parse_float(data)
data = data[4:]
self.msgs: list[Message] = []
while len(data) > 0:
data, n = parse_int(data)
msg_data = data[:n]
data = data[n:]
self.msgs.append(Message().decode(msg_data))
return self
def encode(self) -> bytes:
bundle = to_osc_string("#bundle")
bundle += struct.pack('4xf4x', self.ttag)
for msg in self.msgs:
msg_data = msg.encode()
bundle += struct.pack('i', len(msg_data))
bundle += msg_data
return bundle
@override
def __str__(self) -> str:
out = f"{self.header} ({self.ttag}):\n"
for msg in self.msgs:
out += f" - {msg}\n"
return out

27
examples/state.py Normal file
View File

@@ -0,0 +1,27 @@
import socket
import struct
from oscpy import *
UDP_IP = "172.18.0.1"
UDP_PORT = 7000
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((UDP_IP, UDP_PORT))
while True:
data, addr = sock.recvfrom(10*1024)
n = get_string_size(data)
[tag] = struct.unpack(f'{n}s', data[:n])
if "#bundle" in tag.decode():
b = Bundle(data)
print("Bundle:")
for msg in b.msgs:
print(" -", msg)
else:
msg = Message(data)
if msg.address == "/time":
msg.args[0] = float(msg.args[0])
print(msg)

7
examples/test.py Normal file
View File

@@ -0,0 +1,7 @@
from oscpy import *
encoded = Message("/", ["test", 0.1, 9]).encode()
new_msg = Message().decode(encoded).encode()
assert(encoded == new_msg)