import struct from typing import final, override OSC_TYPES = str | int | float | bool | bytes def get_string_size(data: bytes) -> int: n = data.find(b'\x00') n = n + 4 - n%4 return n def to_osc_string(s: str) -> bytes: n = len(s) b = struct.pack(f'{n}s{4 - n%4}x', s.encode()) return b def to_osc_blob(b: bytes) -> bytes: n = len(b) b = struct.pack(f'i{4 - n%4}p', n, b) return b def parse_string(data: bytes) -> tuple[bytes, str]: n = get_string_size(data) values: tuple[bytes] = struct.unpack(f'>{n}s', data[:n]) value = values[0].split(b"\x00", 1) return data[n:], value[0].decode() def parse_float(data: bytes) -> tuple[bytes, float]: values: tuple[float] = struct.unpack('>f', data[:4]) return data[4:], values[0] def parse_int(data: bytes) -> tuple[bytes, int]: values: tuple[int] = struct.unpack('>i', data[:4]) return data[4:], values[0] def parse_blob(data: bytes) -> tuple[bytes, bytes]: n_values: tuple[int] = struct.unpack('>i', data[:4]) n: int = n_values[0] values: tuple[bytes] = struct.unpack(f'>{n}p', data[4:4+n]) return data[4+n:], values[0] def parse_args(tt: str, data: bytes) -> list[OSC_TYPES]: tt = tt[1:] args: list[OSC_TYPES] = [] for c in tt: match c: case 's': data, val = parse_string(data) args.append(val) case 'b': data, val = parse_blob(data) args.append(val) case 'f': data, val = parse_float(data) args.append(val) case 'i': data, val = parse_int(data) args.append(val) case _: print(f"[ERROR]: Got {c}") return args def encode_args(args: list[OSC_TYPES]) -> bytes: encoded = b'' for arg in args: match arg: case str(): encoded += to_osc_string(arg) case float(): encoded += struct.pack('>f', arg) case int(): encoded += struct.pack('>i', arg) case bytes(): encoded += to_osc_blob(arg) return encoded def parse_type(arg: OSC_TYPES) -> str: match arg: case str(): return "s" case float(): return "f" case int(): return "i" case bytes(): return "b" @final class Message: def __init__(self, address: str = "/", args: list[OSC_TYPES] = []): self.address: str = address self.type_tags = "," for arg in args: self.type_tags = self.type_tags + parse_type(arg) self.args: list[OSC_TYPES] = args def decode(self, data: bytes): data, self.address = parse_string(data) data, self.type_tags = parse_string(data) self.args = parse_args(self.type_tags, data) return self def encode(self) -> bytes: msg = to_osc_string(self.address) msg += to_osc_string(self.type_tags) msg += encode_args(self.args) return msg @override def __str__(self) -> str: return f"{self.address} [{self.type_tags}]: {self.args}" @final class Bundle: def __init__(self, time_tag: float = 0.0, msgs: list[Message] = []): self.header = "#bundle" self.ttag = time_tag self.msgs = msgs def decode(self, data: bytes): data, self.header = parse_string(data) data[4:] data, self.ttag = parse_float(data) data = data[4:] self.msgs: list[Message] = [] while len(data) > 0: data, n = parse_int(data) msg_data = data[:n] data = data[n:] self.msgs.append(Message().decode(msg_data)) return self def encode(self) -> bytes: bundle = to_osc_string("#bundle") bundle += struct.pack('4xf4x', self.ttag) for msg in self.msgs: msg_data = msg.encode() bundle += struct.pack('i', len(msg_data)) bundle += msg_data return bundle @override def __str__(self) -> str: out = f"{self.header} ({self.ttag}):\n" for msg in self.msgs: out += f" - {msg}\n" return out