OSC_ROS2/examples/oscpy/__init__.py

139 lines
4.1 KiB
Python

import struct
from typing import final, override
# Union of the Python value types used for OSC message arguments in this
# module's annotations (argument lists passed to and returned by the codecs).
OSC_TYPES = str | int | float | bool | bytes
def get_string_size(data: bytes) -> int:
    """Return the padded byte length of the OSC string at the start of ``data``.

    The length covers the text, its NUL terminator and the extra NUL padding
    up to the next multiple of four bytes.

    Args:
        data: Buffer that begins with a NUL-terminated string.

    Returns:
        The padded length in bytes, or 0 when ``data`` contains no NUL byte.
    """
    terminator = data.find(b'\x00')
    # Step to the first multiple of 4 strictly past the terminator index;
    # floor division maps the "not found" index -1 to a result of 0.
    return (terminator // 4 + 1) * 4
def to_osc_string(s: str) -> bytes:
    """Encode ``s`` as an OSC string.

    The text is encoded to bytes, followed by one to four NUL bytes so that
    the result is NUL-terminated and its length is a multiple of four.

    Args:
        s: Text to encode.

    Returns:
        The encoded, NUL-terminated and padded string.
    """
    raw = s.encode()
    # Size the field from the encoded bytes, not the character count:
    # non-ASCII characters take more than one byte and would be truncated.
    n = len(raw)
    return struct.pack(f'{n}s{4 - n % 4}x', raw)
def to_osc_blob(b: bytes) -> bytes:
    """Encode ``b`` as an OSC blob.

    The layout is a big-endian 32-bit byte count, the raw payload, and then
    zero to three NUL bytes so the total length is a multiple of four.

    Args:
        b: Payload bytes.

    Returns:
        The size-prefixed, padded blob.
    """
    size = len(b)
    padding = (-size) % 4
    # 's' copies the payload verbatim; 'p' would emit a Pascal string with
    # its own length byte and must not be used here.
    return struct.pack(f'>i{size}s{padding}x', size, b)
def parse_string(data: bytes) -> tuple[bytes, str]:
    """Read one padded OSC string from the front of ``data``.

    Args:
        data: Buffer starting with a NUL-terminated, 4-byte-padded string.

    Returns:
        A ``(remaining, text)`` pair: the bytes after the padded string and
        the decoded text up to (not including) the first NUL.
    """
    size = get_string_size(data)
    (padded,) = struct.unpack(f'>{size}s', data[:size])
    # Keep only what precedes the terminator; the rest is padding.
    text, _, _ = padded.partition(b"\x00")
    return data[size:], text.decode()
def parse_float(data: bytes) -> tuple[bytes, float]:
    """Read a big-endian 32-bit float from the front of ``data``.

    Args:
        data: Buffer whose first four bytes hold the float.

    Returns:
        A ``(remaining, value)`` pair.

    Raises:
        struct.error: If fewer than four bytes are available.
    """
    head, rest = data[:4], data[4:]
    (value,) = struct.unpack('>f', head)
    return rest, value
def parse_int(data: bytes) -> tuple[bytes, int]:
    """Read a big-endian signed 32-bit integer from the front of ``data``.

    Args:
        data: Buffer whose first four bytes hold the integer.

    Returns:
        A ``(remaining, value)`` pair.

    Raises:
        struct.error: If fewer than four bytes are available.
    """
    head, rest = data[:4], data[4:]
    number = struct.unpack('>i', head)[0]
    return rest, number
def parse_blob(data: bytes) -> tuple[bytes, bytes]:
    """Read one OSC blob from the front of ``data``.

    A blob is a big-endian 32-bit byte count, that many payload bytes, and
    then zero to three NUL bytes padding the payload to a multiple of four.

    Args:
        data: Buffer starting with the blob's size field.

    Returns:
        A ``(remaining, payload)`` pair, where ``remaining`` starts after the
        blob's alignment padding.

    Raises:
        struct.error: If the size field is incomplete or negative, or the
            buffer holds fewer payload bytes than the size field announces.
    """
    (size,) = struct.unpack('>i', data[:4])
    if size < 0:
        raise struct.error(f"invalid OSC blob size: {size}")
    end = 4 + size
    # Take the payload verbatim; the 'p' (Pascal string) code would treat
    # the first payload byte as a length and corrupt the data.
    payload = data[4:end]
    if len(payload) != size:
        raise struct.error(
            f"truncated OSC blob: expected {size} bytes, got {len(payload)}"
        )
    # Skip the alignment padding so the next argument starts on a
    # 4-byte boundary.
    end += (-size) % 4
    return data[end:], payload
def parse_args(tt: str, data: bytes) -> list[OSC_TYPES]:
    """Decode the arguments described by a type tag string.

    Args:
        tt: Type tag string; its first character is skipped and each
            following character selects the reader for one argument.
        data: Buffer holding the encoded arguments back to back.

    Returns:
        The decoded arguments in order. Tags other than ``s``, ``b``, ``f``
        and ``i`` are reported on stdout and contribute no value.
    """
    readers = {
        's': parse_string,
        'b': parse_blob,
        'f': parse_float,
        'i': parse_int,
    }
    decoded: list[OSC_TYPES] = []
    for c in tt[1:]:
        reader = readers.get(c)
        if reader is None:
            print(f"[ERROR]: Got {c}")
            continue
        # Each reader returns the unread tail, which feeds the next tag.
        data, value = reader(data)
        decoded.append(value)
    return decoded
def encode_args(args: list[OSC_TYPES]) -> bytes:
    """Serialize ``args`` into their concatenated OSC binary form.

    Args:
        args: Values to encode, in order.

    Returns:
        The encoded arguments joined together. Strings and blobs go through
        ``to_osc_string`` / ``to_osc_blob``; floats and ints are packed as
        big-endian 32-bit values. Values of any other type are skipped.
    """
    chunks: list[bytes] = []
    for value in args:
        # Same precedence as the type checks used for type tags:
        # str, float, int (which also covers bool), then bytes.
        if isinstance(value, str):
            chunks.append(to_osc_string(value))
        elif isinstance(value, float):
            chunks.append(struct.pack('>f', value))
        elif isinstance(value, int):
            chunks.append(struct.pack('>i', value))
        elif isinstance(value, bytes):
            chunks.append(to_osc_blob(value))
    return b''.join(chunks)
def parse_type(arg: OSC_TYPES) -> str:
    """Return the one-character OSC type tag for ``arg``.

    Args:
        arg: Value to classify.

    Returns:
        ``"s"`` for str, ``"f"`` for float, ``"i"`` for int (including bool,
        an int subclass) and ``"b"`` for bytes. Any other value falls
        through and yields ``None``.
    """
    tag_by_type = ((str, "s"), (float, "f"), (int, "i"), (bytes, "b"))
    for kind, tag in tag_by_type:
        if isinstance(arg, kind):
            return tag
@final
class Message:
    """A single OSC message: address, type tag string and arguments."""

    def __init__(self, address: str = "/", args: list[OSC_TYPES] | None = None):
        """Create a message.

        Args:
            address: OSC address of the message.
            args: Argument values; the type tag string is derived from them.
                Omitting it gives the message its own fresh empty list.

        Raises:
            TypeError: If an argument has a type ``parse_type`` does not map
                to a tag.
        """
        # A literal [] default would be one list shared by every Message
        # created without arguments, so build a new list per instance.
        if args is None:
            args = []
        self.address: str = address
        self.type_tags = "," + "".join(parse_type(arg) for arg in args)
        self.args: list[OSC_TYPES] = args

    def decode(self, data: bytes):
        """Fill this message from its binary form and return ``self``.

        Args:
            data: Encoded message: address string, type tag string, then
                the encoded arguments.
        """
        data, self.address = parse_string(data)
        data, self.type_tags = parse_string(data)
        self.args = parse_args(self.type_tags, data)
        return self

    def encode(self) -> bytes:
        """Return the binary form: address, type tags, then arguments."""
        return b"".join(
            (
                to_osc_string(self.address),
                to_osc_string(self.type_tags),
                encode_args(self.args),
            )
        )

    @override
    def __str__(self) -> str:
        return f"{self.address} [{self.type_tags}]: {self.args}"
@final
class Bundle:
    """An OSC bundle: the ``#bundle`` header, a time tag and messages.

    The time tag is held as a float number of seconds and travels on the
    wire as the 64-bit fixed-point value defined by OSC 1.0: 32 bits of
    whole seconds followed by 32 bits of fractional seconds, big-endian.
    """

    def __init__(self, time_tag: float = 0.0, msgs: list[Message] | None = None):
        """Create a bundle.

        Args:
            time_tag: Time tag in seconds (non-negative).
            msgs: Messages contained in the bundle. Omitting it gives the
                bundle its own fresh empty list.
        """
        self.header = "#bundle"
        self.ttag = time_tag
        # A literal [] default would be shared by every Bundle created
        # without messages, so build a new list per instance.
        self.msgs: list[Message] = [] if msgs is None else msgs

    def decode(self, data: bytes):
        """Fill this bundle from its binary form and return ``self``.

        Args:
            data: Encoded bundle: header string, 8-byte time tag, then a
                sequence of size-prefixed messages.

        Raises:
            struct.error: If the time tag or an element size is truncated.
        """
        data, self.header = parse_string(data)
        # The time tag occupies exactly 8 bytes: seconds, then fraction.
        seconds, fraction = struct.unpack('>II', data[:8])
        self.ttag = seconds + fraction / 2**32
        data = data[8:]
        self.msgs = []
        while data:
            # Each element is a big-endian int32 size followed by that many
            # bytes of message content.
            data, n = parse_int(data)
            self.msgs.append(Message().decode(data[:n]))
            data = data[n:]
        return self

    def encode(self) -> bytes:
        """Return the binary form of the bundle.

        Raises:
            struct.error: If the time tag is negative or its whole seconds
                do not fit in 32 bits.
        """
        seconds = int(self.ttag)
        fraction = int((self.ttag - seconds) * 2**32)
        parts = [
            to_osc_string("#bundle"),
            struct.pack('>II', seconds, fraction),
        ]
        for msg in self.msgs:
            msg_data = msg.encode()
            # Element sizes are big-endian, matching what decode() reads.
            parts.append(struct.pack('>i', len(msg_data)))
            parts.append(msg_data)
        return b''.join(parts)

    @override
    def __str__(self) -> str:
        lines = [f"{self.header} ({self.ttag}):\n"]
        lines.extend(f" - {msg}\n" for msg in self.msgs)
        return "".join(lines)