diff --git a/examples/log.py b/examples/log.py new file mode 100644 index 0000000..15dc442 --- /dev/null +++ b/examples/log.py @@ -0,0 +1,19 @@ +import socket +from ssl import SOCK_STREAM +from oscpy import * + +UDP_IP = "172.18.0.1" +UDP_PORT = 5005 + +sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) +sock.bind((UDP_IP, UDP_PORT)) + +while True: + data, addr = sock.recvfrom(10*1024) + if "#bundle" in data[:8].decode(): + b = Bundle(data) + print(b) + else: + m = Message(data) + print(m) + diff --git a/examples/move.py b/examples/move.py new file mode 100644 index 0000000..59336a7 --- /dev/null +++ b/examples/move.py @@ -0,0 +1,45 @@ +import socket +import time +from oscpy import * + + +SEND_IP = "172.18.0.2" +SEND_PORT = 8000 +RECV_IP = "172.18.0.1" +RECV_PORT = 7000 + +pub = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) +sub = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + +msg = Message("/joint_positions", [-1]*6).encode() +n = pub.sendto(msg, (SEND_IP, SEND_PORT)) +if n > 0: + print(f"INFO: Sent {n} bytes") + +time.sleep(10) + +msg = Message("/tcp_coordinates", [-1]*6).encode() +n = pub.sendto(msg, (SEND_IP, SEND_PORT)) +if n > 0: + print(f"INFO: Sent {n} bytes") + +time.sleep(10) + +sub.bind((RECV_IP, RECV_PORT)) +joint_names = [] +found = False +while not found: + data, addr = sub.recvfrom(5*1024) + if "#bundle" in data[:8].decode(): + b = Bundle().decode(data) + for msg in b.msgs: + if "/joint_state/name" in msg.address: + joint_names = msg.args + found = True + +if joint_names: + msg = Message(f"/joint_position/{joint_names[2]}", [ 0 ]) + n = pub.sendto(msg.encode(), (SEND_IP, SEND_PORT)) + if n > 0: + print(f"INFO: Sent {n} bytes") + diff --git a/examples/oscpy/__init__.py b/examples/oscpy/__init__.py new file mode 100644 index 0000000..fb3dc20 --- /dev/null +++ b/examples/oscpy/__init__.py @@ -0,0 +1,138 @@ +import struct +from typing import final, override + +OSC_TYPES = str | int | float | bool | bytes + +def get_string_size(data: bytes) -> int: + n = data.find(b'\x00') + n = n + 4 - n%4 + return n + +def to_osc_string(s: str) -> bytes: + n = len(s) + b = struct.pack(f'{n}s{4 - n%4}x', s.encode()) + return b + +def to_osc_blob(b: bytes) -> bytes: + n = len(b) + b = struct.pack(f'i{4 - n%4}p', n, b) + return b + +def parse_string(data: bytes) -> tuple[bytes, str]: + n = get_string_size(data) + values: tuple[bytes] = struct.unpack(f'>{n}s', data[:n]) + value = values[0].split(b"\x00", 1) + return data[n:], value[0].decode() + +def parse_float(data: bytes) -> tuple[bytes, float]: + values: tuple[float] = struct.unpack('>f', data[:4]) + return data[4:], values[0] + +def parse_int(data: bytes) -> tuple[bytes, int]: + values: tuple[int] = struct.unpack('>i', data[:4]) + return data[4:], values[0] + +def parse_blob(data: bytes) -> tuple[bytes, bytes]: + n_values: tuple[int] = struct.unpack('>i', data[:4]) + n: int = n_values[0] + values: tuple[bytes] = struct.unpack(f'>{n}p', data[4:4+n]) + return data[4+n:], values[0] + +def parse_args(tt: str, data: bytes) -> list[OSC_TYPES]: + tt = tt[1:] + args: list[OSC_TYPES] = [] + for c in tt: + match c: + case 's': + data, val = parse_string(data) + args.append(val) + case 'b': + data, val = parse_blob(data) + args.append(val) + case 'f': + data, val = parse_float(data) + args.append(val) + case 'i': + data, val = parse_int(data) + args.append(val) + case _: + print(f"[ERROR]: Got {c}") + return args + +def encode_args(args: list[OSC_TYPES]) -> bytes: + encoded = b'' + for arg in args: + match arg: + case str(): encoded += to_osc_string(arg) + case float(): encoded += struct.pack('>f', arg) + case int(): encoded += struct.pack('>i', arg) + case bytes(): encoded += to_osc_blob(arg) + return encoded + +def parse_type(arg: OSC_TYPES) -> str: + match arg: + case str(): return "s" + case float(): return "f" + case int(): return "i" + case bytes(): return "b" + +@final +class Message: + def __init__(self, address: str = "/", args: list[OSC_TYPES] = []): + self.address: str = address + self.type_tags = "," + for arg in args: + self.type_tags = self.type_tags + parse_type(arg) + self.args: list[OSC_TYPES] = args + + def decode(self, data: bytes): + data, self.address = parse_string(data) + data, self.type_tags = parse_string(data) + self.args = parse_args(self.type_tags, data) + return self + + def encode(self) -> bytes: + msg = to_osc_string(self.address) + msg += to_osc_string(self.type_tags) + msg += encode_args(self.args) + return msg + + @override + def __str__(self) -> str: + return f"{self.address} [{self.type_tags}]: {self.args}" + +@final +class Bundle: + def __init__(self, time_tag: float = 0.0, msgs: list[Message] = []): + self.header = "#bundle" + self.ttag = time_tag + self.msgs = msgs + + def decode(self, data: bytes): + data, self.header = parse_string(data) + data[4:] + data, self.ttag = parse_float(data) + data = data[4:] + self.msgs: list[Message] = [] + while len(data) > 0: + data, n = parse_int(data) + msg_data = data[:n] + data = data[n:] + self.msgs.append(Message().decode(msg_data)) + return self + + def encode(self) -> bytes: + bundle = to_osc_string("#bundle") + bundle += struct.pack('4xf4x', self.ttag) + for msg in self.msgs: + msg_data = msg.encode() + bundle += struct.pack('i', len(msg_data)) + bundle += msg_data + return bundle + + @override + def __str__(self) -> str: + out = f"{self.header} ({self.ttag}):\n" + for msg in self.msgs: + out += f" - {msg}\n" + return out diff --git a/examples/state.py b/examples/state.py new file mode 100644 index 0000000..d9d5a17 --- /dev/null +++ b/examples/state.py @@ -0,0 +1,27 @@ +import socket +import struct +from oscpy import * + +UDP_IP = "172.18.0.1" +UDP_PORT = 7000 + +sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) +sock.bind((UDP_IP, UDP_PORT)) + + +while True: + data, addr = sock.recvfrom(10*1024) + n = get_string_size(data) + [tag] = struct.unpack(f'{n}s', data[:n]) + if "#bundle" in tag.decode(): + b = Bundle(data) + print("Bundle:") + for msg in b.msgs: + print(" -", msg) + else: + msg = Message(data) + if msg.address == "/time": + msg.args[0] = float(msg.args[0]) + print(msg) + + diff --git a/examples/test.py b/examples/test.py new file mode 100644 index 0000000..72797f5 --- /dev/null +++ b/examples/test.py @@ -0,0 +1,7 @@ +from oscpy import * + + +encoded = Message("/", ["test", 0.1, 9]).encode() +new_msg = Message().decode(encoded).encode() +assert(encoded == new_msg) + diff --git a/workspace/src/osc_ros2/osc_ros2/__pycache__/__init__.cpython-310.pyc b/workspace/src/osc_ros2/osc_ros2/__pycache__/__init__.cpython-310.pyc deleted file mode 100644 index 579e157..0000000 Binary files a/workspace/src/osc_ros2/osc_ros2/__pycache__/__init__.cpython-310.pyc and /dev/null differ diff --git a/workspace/src/osc_ros2/osc_ros2/__pycache__/osc_ros2.cpython-310.pyc b/workspace/src/osc_ros2/osc_ros2/__pycache__/osc_ros2.cpython-310.pyc deleted file mode 100644 index 574fd9e..0000000 Binary files a/workspace/src/osc_ros2/osc_ros2/__pycache__/osc_ros2.cpython-310.pyc and /dev/null differ diff --git a/workspace/src/osc_ros2/osc_ros2/__pycache__/osc_ros2_unit_quater.cpython-310.pyc b/workspace/src/osc_ros2/osc_ros2/__pycache__/osc_ros2_unit_quater.cpython-310.pyc deleted file mode 100644 index 068417d..0000000 Binary files a/workspace/src/osc_ros2/osc_ros2/__pycache__/osc_ros2_unit_quater.cpython-310.pyc and /dev/null differ diff --git a/workspace/src/osc_ros2/osc_ros2/interactive_input.py b/workspace/src/osc_ros2/osc_ros2/interactive_input.py new file mode 100644 index 0000000..83cb4d7 --- /dev/null +++ b/workspace/src/osc_ros2/osc_ros2/interactive_input.py @@ -0,0 +1,429 @@ +import os +import time +import toml +import socket +import xml.etree.ElementTree as ET +from typing import Any + +import rclpy +import roboticstoolbox as rtb +from rclpy.node import Node +from sensor_msgs.msg import JointState + + +class JointNameListener(Node): + joint_names: list[str]|None + def __init__(self): + super().__init__('joint_name_listener') + self.subscription = self.create_subscription( + JointState, + '/joint_states', + self.joint_state_callback, + 1 + ) + self.joint_names = None + + def joint_state_callback(self, msg: JointState): + self.joint_names = list(msg.name) + +def interactive_input() -> tuple[dict[str, Any], rtb.robot]: + config = {} + rconf = {} + nconf = {} + robot = None + while True: + use_urdf = input("Do you have a URDF file you want to use? (y/n): ").strip().lower() + if use_urdf == 'y': + while True: + rconf["urdf"] = input("Enter the absolute path to the URDF file: ") + if os.path.isfile(rconf["urdf"]): + if not rconf["urdf"].endswith('.urdf'): + print("The file is not a URDF file. Please enter a valid URDF file.") + continue + break + else: + print("Invalid path. Please enter a valid path to the URDF file.") + tree = ET.parse(rconf["urdf"]) + root = tree.getroot() + robot = rtb.ERobot.URDF(rconf["urdf"]) + rconf["joint_names"] = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic'] + print(robot) + rconf["joint_velocity_limits"] = {} + # Iterate over all joints in the URDF + for joint in root.findall('.//joint'): + joint_name = joint.get('name') # Get the name of the joint + + # Look for the tag under each joint + limit = joint.find('limit') + + if limit is not None: + # Extract the velocity limit (if it exists) + velocity_limit = limit.get('velocity') + + if velocity_limit is not None: + rconf["joint_velocity_limits"][joint_name] = float(velocity_limit) + + while True: + try: + print('-+'*50) + print("The cost mask determines which coordinates are used for the IK.\nEach element of the cost mask corresponds to a Cartesian coordinate [x, y, z, roll, pitch, yaw].") + print("The cost mask 111000 means that the IK will only consider translation and no rotaion.") + cost_mask_string = input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ") + rconf["cost_mask"] = [int(i) for i in cost_mask_string] + if sum(rconf["cost_mask"]) <= robot.n and len(rconf["cost_mask"]) == 6: + break + else: + print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.") + except ValueError: + print("Invalid input. Please enter integers only.") + print(f"The following coordinates will be used for the IK: {[j for i,j in enumerate(['x','y','z','roll','pitch','yaw']) if rconf['cost_mask'][i]==1]}") + break + elif use_urdf == 'n': + node = JointNameListener() + print("Wainting 10 sec for JointState messages to extract joint names...") + rclpy.spin_once(node) + counter = 0 + while not(node.joint_names): + if counter > 100: + rconf["joint_names"] = None + break + counter+=1 + time.sleep(0.1) + rclpy.spin_once(node) + rconf["joint_names"] = node.joint_names + node.destroy_node() + if rconf["joint_names"]: + correct_names = input("The following joint names were found:\n" + "\n".join(rconf["joint_names"]) + "\nAre these correct? (y/n): ").strip().lower() + while True: + if correct_names == 'y': + break + elif correct_names == 'n': + rconf["joint_names"] = None + break + correct_names = input("Invalid input. Please enter 'y' or 'n'.") + if not(rconf["joint_names"]): + print("Please enter the joint names manually.") + while True: + rconf["joint_names"] = [] + print('-+'*50) + print("Enter the joint names manually one by one. Type 'done' when you are finished:") + print("Attention: The order of the joints is important. It should be the same in which you want to send the joint positions.") + while True: + print('-'*50) + joint_name = input("Enter joint name (or 'done' to finish): ").strip() + if joint_name.lower() == 'done': + break + if joint_name: + rconf["joint_names"].append(joint_name) + print('-+'*50) + correct_names = input(f"Are those the joint names as defined in your JointTrajectroy recieving Node?: {rconf['joint_names']}. (y/n)?: ").strip() + if correct_names.lower() == 'y': + break + else: + print("Please re-enter the joint names.") + while True: + try: + rconf["joint_velocity_limits"] = {} + for name in rconf["joint_names"]: + while True: + try: + print('--'*50) + limit = input(f"Enter the velocity limit for joint '{name}': ").strip() + if limit == "": + continue + else: + rconf["joint_velocity_limits"][name] = float(limit) + break + except ValueError: + print("Invalid input. Please enter a numeric value or press Enter to skip.") + break + except ValueError: + print("Invalid input. Please enter numeric values or leave blank to skip.") + robot = None + rconf["cost_mask"] = None + break + print("Invalid input. Please enter 'y' or 'n'.") + if robot: + while True: + print('+-' * 50) + set_limits = input("Do you want to set limits for x, y and z? (y/n): ").strip().lower() + if set_limits == 'y': + while True: + try: + rconf["x_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()] + rconf["y_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()] + rconf["z_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()] + + if len(rconf["x_limits"]) != 2 or len(rconf["y_limits"]) != 2 or len(rconf["z_limits"]) != 2: + print("Invalid input. Please enter exactly two values (or leave blank) for each limit.") + continue + + if (rconf["x_limits"][0] is not None and rconf["x_limits"][1] is not None and rconf["x_limits"][0] >= rconf["x_limits"][1]) or \ + (rconf["y_limits"][0] is not None and rconf["y_limits"][1] is not None and rconf["y_limits"][0] >= rconf["y_limits"][1]) or \ + (rconf["z_limits"][0] is not None and rconf["z_limits"][1] is not None and rconf["z_limits"][0] >= rconf["z_limits"][1]): + print("Invalid input. Lower limit must be less than upper limit for each axis.") + continue + + print("Current limits:") + print(f"x: {rconf['x_limits']}") + print(f"y: {rconf['y_limits']}") + print(f"z: {rconf['z_limits']}") + con = True + while con: + confirm = input("Do you want your TCP to move in this range? (y/n): ").strip().lower() + if confirm == 'y': + break + elif confirm == 'n': + print("Please re-enter the limits.") + con = False + else: + print("Invalid input. Please enter 'y' or 'n'.") + if con: + break + except ValueError: + print("Invalid input. Please enter numeric values only.") + break + elif set_limits == 'n': + rconf["x_limits"] = [None, None] + rconf["y_limits"] = [None, None] + rconf["z_limits"] = [None, None] + break + print("Invalid input. Please enter 'y' or 'n'.") + + while True: + print('+-' * 50) + set_limits = input("Do you want to set workspace limits in x, y and z direction? (y/n): ").strip().lower() + if set_limits == 'y': + while True: + try: + rconf["x_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()] + rconf["y_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()] + rconf["z_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()] + + if len(rconf["x_limits_workspace"]) != 2 or len(rconf["y_limits_workspace"]) != 2 or len(rconf["z_limits_workspace"]) != 2: + print("Invalid input. Please enter exactly two values (or leave blank) for each limit.") + continue + + if (rconf["x_limits_workspace"][0] is not None and rconf["x_limits_workspace"][1] is not None and rconf["x_limits_workspace"][0] >= rconf["x_limits_workspace"][1]) or \ + (rconf["y_limits_workspace"][0] is not None and rconf["y_limits_workspace"][1] is not None and rconf["y_limits_workspace"][0] >= rconf["y_limits_workspace"][1]) or \ + (rconf["z_limits_workspace"][0] is not None and rconf["z_limits_workspace"][1] is not None and rconf["z_limits_workspace"][0] >= rconf["z_limits_workspace"][1]): + print("Invalid input. Lower limit must be less than upper limit for each axis.") + continue + + print("Current limits:") + print(f"x: {rconf['x_limits_workspace']}") + print(f"y: {rconf['y_limits_workspace']}") + print(f"z: {rconf['z_limits_workspace']}") + con = True + while con: + confirm = input("Do you want your robot to move in this range? (y/n): ").strip().lower() + if confirm == 'y': + break + elif confirm == 'n': + print("Please re-enter the limits.") + con = False + else: + print("Invalid input. Please enter 'y' or 'n'.") + if con: + break + except ValueError: + print("Invalid input. Please enter numeric values only.") + break + elif set_limits == 'n': + rconf["x_limits_workspace"] = [None, None] + rconf["y_limits_workspace"] = [None, None] + rconf["z_limits_workspace"] = [None, None] + break + print("Invalid input. Please enter 'y' or 'n'.") + # Ask the user if they want to set new joint limits + while True: + print('+-'*50) + update_limits = input("Do you want to set new joint limits? (y/n): ").strip().lower() + if update_limits == 'y': + for i in range(len(rconf["joint_names"])): + while True: + try: + lim = robot.qlim.copy() + # Find the link corresponding to the joint name + print("-" * 50) + print(f"Current position limits for joint '{rconf['joint_names'][i]}': [{robot.qlim[0][i]} {robot.qlim[1][i]}] rad") + lower_limit = input(f"Enter the new lower limit for joint '{rconf['joint_names'][i]}' (or press Enter to keep current): ").strip() + upper_limit = input(f"Enter the new upper limit for joint '{rconf['joint_names'][i]}' (or press Enter to keep current): ").strip() + + if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit): + print("Invalid input. Lower limit must be less than upper limit.") + continue + lower_limit = float(lower_limit) if lower_limit!="" else None + upper_limit = float(upper_limit) if upper_limit!="" else None + if lower_limit is not None: + if lower_limitlim[1][i]: + while True: + sure = input(f"Are you sure you want to set the upper limit to {upper_limit} rad which is more than the default limit {lim[1][i]} (y/n)?: ").strip().lower() + if sure == 'y': + lim[1][i] = float(upper_limit) + break + elif sure == 'n': + print("Upper limit not changed.") + break + print("Invalid input. Please enter 'y' or 'n'.") + else: + lim[1][i] = float(upper_limit) + robot.qlim = lim + print(f"New limits for joint '{rconf['joint_names'][i]}': [{robot.qlim[0][i]} {robot.qlim[1][i]}] rad") + print("-" * 50) + break + except ValueError: + print("Invalid input. Please enter numeric values or leave blank to keep current limits.") + break + if update_limits == 'n': + break + print("Invalid input. Please enter 'y' or 'n'.") + ''' + use_link_mask = input("Do you want to use a link mask? (y/n): ").strip().lower() + if use_link_mask == 'y': + while True: + try: + ''' + else: + while True: + print('+-' * 50) + update_limits = input("Do you want to set joint limits? (y/n): ").strip().lower() + n_joints = len(rconf["joint_names"]) + if update_limits == 'y': + rconf["joint_lim"] = [[]*n_joints,[]*n_joints] + for i, joint in enumerate(rconf["joint_names"]): + while True: + try: + print("-" * 50) + lower_limit = input(f"Enter the new lower limit for joint '{joint}' (or press Enter for None): ").strip() + upper_limit = input(f"Enter the new upper limit for joint '{joint}' (or press Enter for None): ").strip() + + if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit): + print('--' * 50) + print(" !!! Invalid input. Lower limit must be less than upper limit. !!! ") + continue + rconf["joint_lim"][0][i] = float(lower_limit) if lower_limit!="" else None + rconf["joint_lim"][1][i] = float(upper_limit) if upper_limit!="" else None + break + except ValueError: + print("Invalid input. Please enter numeric values or leave blank to keep current limits.") + print(f'New limits for joint:\n lower: {rconf["joint_lim"][0]}\n upper: {rconf["joint_lim"][1]}') + break + elif update_limits == 'n': + rconf["joint_lim"] = None + break + print("Invalid input. Please enter 'y' or 'n'.") + while True: + try: + print('+-' * 50) + rconf["trajectory_topic_name"] = input("Enter the topic name to which the joint trajectory should be sent (press Enter for default: '/scaled_joint_trajectory_controller/joint_trajectory'): ").strip() + if rconf["trajectory_topic_name"] == "": + rconf["trajectory_topic_name"] = '/scaled_joint_trajectory_controller/joint_trajectory' + break + elif rconf["trajectory_topic_name"].startswith("/"): + break + else: + print("Invalid topic name. A valid topic name should start with '/'.") + except Exception as e: + print(f"An error occurred: {e}") + print('--' * 50) + while True: + try: + print('+-' * 50) + rconf["hz"] = input("Enter the desired refresh frequency (Hz) (or press Enter for default: 100): ") + if rconf["hz"] == "": + rconf["hz"] = 100 + else: + rconf["hz"] = float(rconf["hz"]) + break + except ValueError: + print("Invalid input. Please enter a valid number.") + continue + while True: + try: + print('+-' * 50) + nconf["log_ip"] = str(input("Enter the target IP on which you want to recieve the log as OSC Messages (or press Enter for default: '127.0.0.1'): ")) + if nconf["log_ip"] == "": + nconf["log_ip"] = "127.0.0.1" + print('--' * 50) + nconf["log_port"] = input("Enter the target port for the log messages (or press Enter for default: 5005): ") + if nconf["log_port"] == "": + nconf["log_port"] = 5005 + else: + nconf["log_port"] = int(nconf["log_port"]) + break + except ValueError: + print("Invalid input. Please enter a valid IP address.") + continue + while True: + try: + print('+-' * 50) + nconf["state_ip"] = str(input("Enter the target IP on which you want to recieve the joint states as OSC Messages (or press Enter for default: '127.0.0.1'): ")) + if nconf["state_ip"] == "": + nconf["state_ip"] = "127.0.0.1" + print('--' * 50) + nconf["state_port"] = input("Enter the target port for the joint state messages (or press Enter for default: 7000): ") + if nconf["state_port"] == "": + nconf["state_port"] = 7000 + else: + nconf["state_port"] = int(nconf["state_port"]) + break + except ValueError: + print("Invalid input. Please enter a valid IP address.") + continue + while True: + try: + print('+-' * 50) + nconf["commands_port"] = input("Enter the port you want to send your commands to (or press Enter for default: 8000): ") + if nconf["commands_port"] == nconf["state_port"]: + print("The commands port must be different from the state port.") + continue + if nconf["commands_port"] == "": + nconf["commands_port"] = 8000 + else: + nconf["commands_port"] = int(nconf["commands_port"]) + break + except ValueError: + print("Invalid input. Please enter a valid port.") + continue + print() + print('=-=' * 50) + print() + print(f"Sending joint state's' to {nconf['state_ip']}:{nconf['state_port']}") + print() + print('=-=' * 50) + print() + print(f"Sending log messages to {nconf['log_ip']}:{nconf['log_port']}") + print() + print('=-=' * 50) + print() + print(f"Ready to receive OSC messages on {socket.gethostbyname(socket.gethostname())}:{nconf['commands_port']}") + print() + print('=-=' * 50) + print() + config["robot"] = rconf + config["network"] = nconf + while True: + save_config = input("Do you want to save this interface? (y/n): ").strip().lower() + if save_config == "y": + config_path = input("Where do you want to save the config?: ").strip() + with open(f"{config_path}.toml", "w") as f: + toml.dump(config, f) + break + elif save_config == "n": + print("Will not be saving config") + print("Invalid input. Please enter 'y' or 'n'.") + return config, robot diff --git a/workspace/src/osc_ros2/osc_ros2/osc_ros2.py b/workspace/src/osc_ros2/osc_ros2/osc_ros2.py index 0b28d5b..e854cf7 100644 --- a/workspace/src/osc_ros2/osc_ros2/osc_ros2.py +++ b/workspace/src/osc_ros2/osc_ros2/osc_ros2.py @@ -1,60 +1,59 @@ +#ruff: noqa: F403, F405 import rclpy +import argparse +import toml from rclpy.node import Node from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint from sensor_msgs.msg import JointState from rcl_interfaces.msg import Log from osc4py3.as_allthreads import * from osc4py3 import oscmethod as osm -import xml.etree.ElementTree as ET import numpy as np import spatialmath as sm import roboticstoolbox as rtb from osc4py3 import oscbuildparse +from osc_ros2.interactive_input import interactive_input import time -import os import re import socket -class JointNameListener(Node): - def __init__(self): - super().__init__('joint_name_listener') - self.subscription = self.create_subscription( - JointState, - '/joint_states', - self.joint_state_callback, - 1 - ) - self.joint_names = None - - def joint_state_callback(self, msg: JointState): - self.joint_names = list(msg.name) class OSC_ROS2_interface(Node): """Node to publish joint trajectories based on OSC messages.""" - def __init__(self, joint_names, joint_velocity_limits, robot, cost_mask): + def __init__(self, robot, config): super().__init__('scaled_joint_trajectory_publisher') - - self.subscription = self.create_subscription( JointState, '/joint_states', self.joint_states_callback, 1 ) - self.subscription = self.create_subscription( Log, '/rosout', self.log_callback, 100 ) - # Store received joint positions self.current_joint_positions = None - self.joint_names = joint_names - self.joint_velocity_limits = joint_velocity_limits - self.cost_mask = cost_mask + self.joint_names = config["robot"]["joint_names"] + self.joint_velocity_limits = config["robot"]["joint_velocity_limits"] + self.cost_mask = config["robot"]["cost_mask"] + self.trajectory_topic_name = config["robot"]["trajectory_topic_name"] + self.hz = config["robot"]["hz"] + self.state_ip = config["network"]["state_ip"] + self.state_port = config["network"]["state_port"] + self.log_ip = config["network"]["log_ip"] + self.log_port = config["network"]["log_port"] + self.commands_port = config["network"]["commands_port"] + float_or_none = lambda v: None if "None" in v else float(v) + self.x_limits = list(map(float_or_none, config["robot"]["x_limits"])) + self.y_limits = list(map(float_or_none, config["robot"]["y_limits"])) + self.z_limits = list(map(float_or_none, config["robot"]["z_limits"])) + self.x_limits_workspace = list(map(float_or_none, config["robot"]["x_limits_workspace"])) + self.y_limits_workspace = list(map(float_or_none, config["robot"]["y_limits_workspace"])) + self.z_limits_workspace = list(map(float_or_none, config["robot"]["z_limits_workspace"])) self.robot = robot self.desired = None self.previous_desired = None @@ -67,269 +66,17 @@ class OSC_ROS2_interface(Node): } self.speed_scaling = 0.2 self.new = False - self.n_joints = len(joint_names) - - if robot: - while True: - print('+-' * 50) - set_limits = input("Do you want to set limits for x, y and z? (y/n): ").strip().lower() - if set_limits == 'y': - while True: - try: - self.x_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()] - self.y_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()] - self.z_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()] - - if len(self.x_limits) != 2 or len(self.y_limits) != 2 or len(self.z_limits) != 2: - print("Invalid input. Please enter exactly two values (or leave blank) for each limit.") - continue - - if (self.x_limits[0] is not None and self.x_limits[1] is not None and self.x_limits[0] >= self.x_limits[1]) or \ - (self.y_limits[0] is not None and self.y_limits[1] is not None and self.y_limits[0] >= self.y_limits[1]) or \ - (self.z_limits[0] is not None and self.z_limits[1] is not None and self.z_limits[0] >= self.z_limits[1]): - print("Invalid input. Lower limit must be less than upper limit for each axis.") - continue - - print(f"Current limits:") - print(f"x: {self.x_limits}") - print(f"y: {self.y_limits}") - print(f"z: {self.z_limits}") - con = True - while con: - confirm = input("Do you want your TCP to move in this range? (y/n): ").strip().lower() - if confirm == 'y': - break - elif confirm == 'n': - print("Please re-enter the limits.") - con = False - else: - print("Invalid input. Please enter 'y' or 'n'.") - if con: break - except ValueError: - print("Invalid input. Please enter numeric values only.") - break - elif set_limits == 'n': - self.x_limits = [None, None] - self.y_limits = [None, None] - self.z_limits = [None, None] - break - print("Invalid input. Please enter 'y' or 'n'.") - - while True: - print('+-' * 50) - set_limits = input("Do you want to set workspace limits in x, y and z direction? (y/n): ").strip().lower() - if set_limits == 'y': - while True: - try: - self.x_limits_workspace = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()] - self.y_limits_workspace = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()] - self.z_limits_workspace = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()] - - if len(self.x_limits_workspace) != 2 or len(self.y_limits_workspace) != 2 or len(self.z_limits_workspace) != 2: - print("Invalid input. Please enter exactly two values (or leave blank) for each limit.") - continue - - if (self.x_limits_workspace[0] is not None and self.x_limits_workspace[1] is not None and self.x_limits_workspace[0] >= self.x_limits_workspace[1]) or \ - (self.y_limits_workspace[0] is not None and self.y_limits_workspace[1] is not None and self.y_limits_workspace[0] >= self.y_limits_workspace[1]) or \ - (self.z_limits_workspace[0] is not None and self.z_limits_workspace[1] is not None and self.z_limits_workspace[0] >= self.z_limits_workspace[1]): - print("Invalid input. Lower limit must be less than upper limit for each axis.") - continue - - print(f"Current limits:") - print(f"x: {self.x_limits_workspace}") - print(f"y: {self.y_limits_workspace}") - print(f"z: {self.z_limits_workspace}") - con = True - while con: - confirm = input("Do you want your robot to move in this range? (y/n): ").strip().lower() - if confirm == 'y': - break - elif confirm == 'n': - print("Please re-enter the limits.") - con = False - else: - print("Invalid input. Please enter 'y' or 'n'.") - if con: break - except ValueError: - print("Invalid input. Please enter numeric values only.") - break - elif set_limits == 'n': - self.x_limits_workspace = [None, None] - self.y_limits_workspace = [None, None] - self.z_limits_workspace = [None, None] - break - print("Invalid input. Please enter 'y' or 'n'.") - # Ask the user if they want to set new joint limits - - # Ask the user if they want to set new joint limits - while True: - print('+-'*50) - update_limits = input("Do you want to set new joint limits? (y/n): ").strip().lower() - if update_limits == 'y': - for i in range(len(self.joint_names)): - while True: - try: - lim = self.robot.qlim.copy() - # Find the link corresponding to the joint name - print("-" * 50) - print(f"Current position limits for joint '{self.joint_names[i]}': [{self.robot.qlim[0][i]} {self.robot.qlim[1][i]}] rad") - lower_limit = input(f"Enter the new lower limit for joint '{self.joint_names[i]}' (or press Enter to keep current): ").strip() - upper_limit = input(f"Enter the new upper limit for joint '{self.joint_names[i]}' (or press Enter to keep current): ").strip() - - if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit): - print("Invalid input. Lower limit must be less than upper limit.") - continue - lower_limit = float(lower_limit) if lower_limit!="" else None - upper_limit = float(upper_limit) if upper_limit!="" else None - if lower_limit!=None: - if lower_limitlim[1][i]: - while True: - sure = input(f"Are you sure you want to set the upper limit to {upper_limit} rad which is more than the default limit {lim[1][i]} (y/n)?: ").strip().lower() - if sure == 'y': - lim[1][i] = float(upper_limit) - break - elif sure == 'n': - print("Upper limit not changed.") - break - print("Invalid input. Please enter 'y' or 'n'.") - else: lim[1][i] = float(upper_limit) - self.robot.qlim = lim - print(f"New limits for joint '{self.joint_names[i]}': [{self.robot.qlim[0][i]} {self.robot.qlim[1][i]}] rad") - print("-" * 50) - break - except ValueError: - print("Invalid input. Please enter numeric values or leave blank to keep current limits.") - break - if update_limits == 'n': - break - print("Invalid input. Please enter 'y' or 'n'.") - ''' - use_link_mask = input("Do you want to use a link mask? (y/n): ").strip().lower() - if use_link_mask == 'y': - while True: - try: - ''' - else: - while True: - print('+-' * 50) - update_limits = input("Do you want to set joint limits? (y/n): ").strip().lower() - if update_limits == 'y': - self.joint_lim = [[None]*self.n_joints,[None]*self.n_joints] - for i, joint in enumerate(self.joint_names): - while True: - try: - print("-" * 50) - lower_limit = input(f"Enter the new lower limit for joint '{joint}' (or press Enter for None): ").strip() - upper_limit = input(f"Enter the new upper limit for joint '{joint}' (or press Enter for None): ").strip() - - if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit): - print('--' * 50) - print(" !!! Invalid input. Lower limit must be less than upper limit. !!! ") - continue - self.joint_lim[0][i] = float(lower_limit) if lower_limit!="" else None - self.joint_lim[1][i] = float(upper_limit) if upper_limit!="" else None - break - except ValueError: - print("Invalid input. Please enter numeric values or leave blank to keep current limits.") - print(f'New limits for joint:\n lower: {self.joint_lim[0]}\n upper: {self.joint_lim[1]}') - break - elif update_limits == 'n': - self.joint_lim = None - break - print("Invalid input. Please enter 'y' or 'n'.") - - - - while True: - try: - print('+-' * 50) - self.trajectory_topic_name = input("Enter the topic name to which the joint trajectory should be sent (press Enter for default: '/scaled_joint_trajectory_controller/joint_trajectory'): ").strip() - if self.trajectory_topic_name == "": - self.trajectory_topic_name = '/scaled_joint_trajectory_controller/joint_trajectory' - break - elif self.trajectory_topic_name.startswith("/"): - break - else: - print("Invalid topic name. A valid topic name should start with '/'.") - except Exception as e: - print(f"An error occurred: {e}") - + self.n_joints = len(self.joint_names) # ROS2 Publisher self.publisher = self.create_publisher( JointTrajectory, self.trajectory_topic_name, 1 ) - - while True: - try: - print('+-' * 50) - log_ip = str(input("Enter the target IP on which you want to recieve the log as OSC Messages (or press Enter for default: '127.0.0.1'): ")) - if log_ip == "": - log_ip = "127.0.0.1" - print('--' * 50) - log_port = input("Enter the target port for the log messages (or press Enter for default: 5005): ") - if log_port == "": - log_port = 5005 - else: - log_port = int(log_port) - break - except ValueError: - print("Invalid input. Please enter a valid IP address.") - continue - while True: - try: - print('+-' * 50) - state_ip = str(input("Enter the target IP on which you want to recieve the joint states as OSC Messages (or press Enter for default: '127.0.0.1'): ")) - if state_ip == "": - state_ip = "127.0.0.1" - print('--' * 50) - state_port = input("Enter the target port for the joint state messages (or press Enter for default: 7000): ") - if state_port == "": - state_port = 7000 - else: - state_port = int(state_port) - break - except ValueError: - print("Invalid input. Please enter a valid IP address.") - continue - while True: - try: - print('+-' * 50) - commands_port = input("Enter the port you want to send your commands to (or press Enter for default: 8000): ") - if commands_port == state_port: - print("The commands port must be different from the state port.") - continue - if commands_port == "": - commands_port = 8000 - else: - commands_port = int(commands_port) - break - except ValueError: - print("Invalid input. Please enter a valid port.") - continue - - osc_startup() - - osc_udp_client(state_ip, state_port, "osc_client") - - osc_udp_client(log_ip, log_port, "osc_log_client") - - osc_udp_server('0.0.0.0', commands_port, "osc_server") - + osc_udp_client(self.state_ip, self.state_port, "osc_client") + osc_udp_client(self.log_ip, self.log_port, "osc_log_client") + osc_udp_server('0.0.0.0', self.commands_port, "osc_server") # Register OSC handler osc_method("/joint_positions", self.joint_positions_handler, argscheme=osm.OSCARG_DATAUNPACK) osc_method("/joint_position/*", self.joint_position_handler, argscheme=osm.OSCARG_ADDRESS+osm.OSCARG_DATAUNPACK) @@ -337,39 +84,10 @@ class OSC_ROS2_interface(Node): osc_method("/joint_trajectory", self.joint_trajectory_handler, argscheme=osm.OSCARG_DATAUNPACK) osc_method("/cartesian_trajectory", self.cartesian_trajectory_handler, argscheme=osm.OSCARG_DATAUNPACK) osc_method("/speed_scaling", self.speed_scaling_handler, argscheme=osm.OSCARG_DATAUNPACK) - print('--' * 50) - while True: - try: - print('+-' * 50) - self.hz = input("Enter the desired refresh frequency (Hz) (or press Enter for default: 100): ") - if self.hz == "": - self.hz = 100 - else: - self.hz = float(self.hz) - break - except ValueError: - print("Invalid input. Please enter a valid number.") - continue - print() - print('=-=' * 50) - print() - print(f'Sending joint states to {state_ip}:{state_port}') - print() - print('=-=' * 50) - print() - print(f'Sending log messages to {log_ip}:{log_port}') - print() - print('=-=' * 50) - print() - print(f'Ready to receive OSC messages on {socket.gethostbyname(socket.gethostname())}:{commands_port}') - print() - print('=-=' * 50) - print() - self.get_logger().info(f"Publishing joint trajectory to {self.trajectory_topic_name}") - self.get_logger().info(f'Ready to receive OSC messages on {socket.gethostbyname(socket.gethostname())}:{commands_port}') - self.get_logger().info(f'Sending joint states to {state_ip}:{state_port}') - self.get_logger().info(f'Sending log messages to {log_ip}:{log_port}') + self.get_logger().info(f'Ready to receive OSC messages on {socket.gethostbyname(socket.gethostname())}:{self.commands_port}') + self.get_logger().info(f'Sending joint states to {self.state_ip}:{self.state_port}') + self.get_logger().info(f'Sending log messages to {self.log_ip}:{self.log_port}') self.create_timer(1/self.hz, self.update_position) # Timer to update the position self.create_timer(3, self.reset_prev) # reset the previous desired position @@ -393,17 +111,16 @@ class OSC_ROS2_interface(Node): def joint_trajectory_handler(self, *args): try: - if len(args[0]) == 6: - points = [[float(j) for j in i] for i in args] - elif len(args[0]) >= 7: - points = [[float(j) for j in i[:6]] for i in args] - self.get_logger().warn(f"joint_trajectory_handler: Duration is not supported for joint trajectory yet. Ignoring duration.") - else: - self.get_logger().warn(f"joint_trajectory_handler: Invalid number of arguments for joint trajectory. Expected {self.n_joints} ([q0, q1, q2, ..., q{self.n_joints}]) or {self.n_joints+1} ([q0, q1, q2, ..., q{self.n_joints}, duration]), but got {len(args[0])}.") - return - - self.desired = ["joint_trajectory"] + points - self.new = True + if len(args[0]) == 6: + points = [[float(j) for j in i] for i in args] + elif len(args[0]) >= 7: + points = [[float(j) for j in i[:6]] for i in args] + self.get_logger().warn("joint_trajectory_handler: Duration is not supported for joint trajectory yet. Ignoring duration.") + else: + self.get_logger().warn(f"joint_trajectory_handler: Invalid number of arguments for joint trajectory. Expected {self.n_joints} ([q0, q1, q2, ..., q{self.n_joints}]) or {self.n_joints+1} ([q0, q1, q2, ..., q{self.n_joints}, duration]), but got {len(args[0])}.") + return + self.desired = ["joint_trajectory"] + points + self.new = True except Exception as e: self.get_logger().fatal(f"joint_trajectory_handler: {e}") @@ -411,7 +128,6 @@ class OSC_ROS2_interface(Node): """Handles incoming OSC messages for joint positions.""" try: joint_name = address.split("/")[-1] - if joint_name in self.joint_names: if len(args) == 1: position = float(args[0]) @@ -478,7 +194,6 @@ class OSC_ROS2_interface(Node): self.get_logger().warn(f"joint_position_handler: Joint '{joint_name}' not found in the robot model.") except Exception as e: self.get_logger().fatal(f"joint_position_handler: {e}") - def cartesian_trajectory_handler(self, *args): """Handles incoming OSC messages for cartesian trajectory.""" @@ -488,11 +203,10 @@ class OSC_ROS2_interface(Node): points = [[float(j) for j in i] for i in args] elif len(args[0]) >= 7: points = [[float(j) for j in i[:6]] for i in args] - self.get_logger().warn(f"cartesian_trajectory_handler: Duration is not supported for cartesian trajectory yet. Ignoring duration.") + self.get_logger().warn("cartesian_trajectory_handler: Duration is not supported for cartesian trajectory yet. Ignoring duration.") else: - self.get_logger().warn(f"cartesian_trajectory_handler: Invalid number of arguments for cartesian trajectory. Expected 6 ([x, y, z, roll, pitch, yaw]) or 7 ([x, y, z, roll, pitch, yaw, duration]), but got {len(args[0])}.") + self.get_logger().warn("cartesian_trajectory_handler: Invalid number of arguments for cartesian trajectory. Expected 6 ([x, y, z, roll, pitch, yaw]) or 7 ([x, y, z, roll, pitch, yaw, duration]), but got {len(args[0])}.") return - self.desired = ["cartesian_trajectory"] + points self.new = True except Exception as e: @@ -511,7 +225,6 @@ class OSC_ROS2_interface(Node): else: self.get_logger().warn(f"joint_positions_handler: Invalid number of arguments for joint positions. Expected {len(self.joint_names)} ([q0, q1, q2, ... q{len(self.joint_names)}]) or {len(self.joint_names)+1} ([q0, q1, q2, ... q{len(self.joint_names)}, duration]), but got {len(args)}.") return - # Check if joint positions exceed limits if self.robot: for i, position in enumerate(desired_joint_positions[:len(self.joint_names)]): # Exclude duration if present @@ -540,7 +253,6 @@ class OSC_ROS2_interface(Node): self.get_logger().warn( f"joint_positions_handler: Joint '{self.joint_names[i]}' position {position} is out of bounds. Using {self.joint_lim[1][i]}." ) - self.desired = ["joint_positions"] + desired_joint_positions self.new = True except Exception as e: @@ -559,7 +271,6 @@ class OSC_ROS2_interface(Node): else: self.get_logger().warn(f"tcp_coordinates_handler: Invalid number of arguments for TCP coordinates. Expected 6 ([x, y, z, roll, pitch, yaw]) or 7 ([x, y, z, roll, pitch, yaw, duration]), but got {len(args)}.") return - if self.x_limits[0] is not None: x = max(self.x_limits[0], x) if self.x_limits[1] is not None: @@ -572,13 +283,11 @@ class OSC_ROS2_interface(Node): z = max(self.z_limits[0], z) if self.z_limits[1] is not None: z = min(self.z_limits[1], z) - if x != args[0] or y != args[1] or z != args[2]: self.get_logger().warn( f"tcp_coordinates_handler: Desired joint positions adjusted to fit within limits: " f"x={x}, y={y}, z={z} (original: x={args[0]}, y={args[1]}, z={args[2]})" ) - self.desired = ["tcp_coordinates", x, y, z, r, p, yaw, duration] self.new = True except Exception as e: @@ -587,23 +296,21 @@ class OSC_ROS2_interface(Node): self.get_logger().warn("tcp_coordinates_handler: No robot model provided. Cannot handle TCP coordinates.") return - def joint_states_callback(self, msg: JointState): """Callback function to handle incoming joint states.""" try: - msg_time = oscbuildparse.OSCMessage(f"/time", ',s', [str(time.time())]) + msg_time = oscbuildparse.OSCMessage("/time", ',s', [str(time.time())]) osc_send(msg_time, "osc_client") - if not(self.joint_names): self.joint_names = msg.name + if not(self.joint_names): + self.joint_names = msg.name joint_position_dict = dict(zip(msg.name, msg.position)) self.current_joint_positions = [float(joint_position_dict[name]) for name in self.joint_names] joint_position_dict = dict(zip(msg.name, msg.velocity)) self.current_joint_velocities = [float(joint_position_dict[name]) for name in self.joint_names] - if self.robot: tcp_position = self.robot.fkine(self.current_joint_positions).t tcp_orientation = self.robot.fkine(self.current_joint_positions).rpy() - - msg_tcp = oscbuildparse.OSCMessage(f"/tcp_coordinates", ',ffffff', [tcp_position[0], tcp_position[1], tcp_position[2], tcp_orientation[0], tcp_orientation[1], tcp_orientation[2]]) + msg_tcp = oscbuildparse.OSCMessage("/tcp_coordinates", ',ffffff', [tcp_position[0], tcp_position[1], tcp_position[2], tcp_orientation[0], tcp_orientation[1], tcp_orientation[2]]) #msg_x = oscbuildparse.OSCMessage(f"/tcp_coordinates/x", ',f', [tcp_position[0]]) #msg_y = oscbuildparse.OSCMessage(f"/tcp_coordinates/y", ',f', [tcp_position[1]]) #msg_z = oscbuildparse.OSCMessage(f"/tcp_coordinates/z", ',f', [tcp_position[2]]) @@ -613,21 +320,18 @@ class OSC_ROS2_interface(Node): #bun = oscbuildparse.OSCBundle(oscbuildparse.OSC_IMMEDIATELY, [msg_tcp, msg_x, msg_y, msg_z, msg_roll, msg_pitch, msg_yaw]) #osc_send(bun, "osc_client") osc_send(msg_tcp, "osc_client") - - msg_position = oscbuildparse.OSCMessage(f"/joint_state/position", f',{"f"*self.n_joints}', [i for i in msg.position]) - msg_velocity = oscbuildparse.OSCMessage(f"/joint_state/velocity", f',{"f"*self.n_joints}', [i for i in msg.velocity]) - msg_effort = oscbuildparse.OSCMessage(f"/joint_state/effort", f',{"f"*self.n_joints}', [i for i in msg.effort]) - msg_name = oscbuildparse.OSCMessage(f"/joint_state/name", f',{"s"*self.n_joints}', [i for i in msg.name]) + msg_position = oscbuildparse.OSCMessage("/joint_state/position", f',{"f"*self.n_joints}', [i for i in msg.position]) + msg_velocity = oscbuildparse.OSCMessage("/joint_state/velocity", f',{"f"*self.n_joints}', [i for i in msg.velocity]) + msg_effort = oscbuildparse.OSCMessage("/joint_state/effort", f',{"f"*self.n_joints}', [i for i in msg.effort]) + msg_name = oscbuildparse.OSCMessage("/joint_state/name", f',{"s"*self.n_joints}', [i for i in msg.name]) bun = oscbuildparse.OSCBundle(oscbuildparse.OSC_IMMEDIATELY, [msg_name, msg_position, msg_velocity, msg_effort]) osc_send(bun, "osc_client") - #for i, name in enumerate(msg.name): # msg_position = oscbuildparse.OSCMessage(f"/joint_state/position/{name}", ',f', [msg.position[i]]) # msg_velocity = oscbuildparse.OSCMessage(f"/joint_state/velocity/{name}", ',f', [msg.velocity[i]]) # msg_effort = oscbuildparse.OSCMessage(f"/joint_state/effort/{name}", ',f', [msg.effort[i]]) # bun = oscbuildparse.OSCBundle(oscbuildparse.OSC_IMMEDIATELY, [msg_position, msg_velocity, msg_effort]) # osc_send(bun, "osc_client") - except Exception as e: self.get_logger().fatal(f"joint_states_callback: {e}") @@ -672,21 +376,17 @@ class OSC_ROS2_interface(Node): return def trapezoidal_timestamps(self, num_points,total_duration, flat_ratio = 0.3): - if num_points == 2: return [0, total_duration] n = int(num_points*(1-flat_ratio)/2) start = np.cos(np.linspace(0, np.pi, n))+2 end = np.cos(np.linspace(-np.pi, 0, n))+2 flat = np.ones(num_points-2*n) - timestamps = np.concatenate((start, flat, end)) timestamps *= total_duration / timestamps.sum() timestamps = np.cumsum(timestamps) - return timestamps.tolist() - def send_tcp_coordinates(self): """Send the desired TCP coordinates to the robot.""" try: @@ -694,7 +394,7 @@ class OSC_ROS2_interface(Node): msg = JointTrajectory() msg.joint_names = self.joint_names steps_per_m = 100 - if self.previous_desired == None: + if self.previous_desired is None: [x,y,z] = self.robot.fkine(self.current_joint_positions).t [roll, pitch, yaw] = self.robot.fkine(self.current_joint_positions).rpy() else: @@ -703,7 +403,8 @@ class OSC_ROS2_interface(Node): x1, y1, z1, roll1, pitch1, yaw1 = self.desired[1:7] self.previous_desired = self.desired steps = int(np.linalg.norm(np.array([x1, y1, z1])- self.robot.fkine(self.current_joint_positions).t) * steps_per_m) - if steps < 2: steps = 2 + if steps < 2: + steps = 2 cart_traj = [sm.SE3([x+(x1-x)/(steps-1)*i, y+(y1-y)/(steps-1)*i, z+(z1-z)/(steps-1)*i]) * sm.SE3.RPY([roll+(roll1-roll)/(steps-1)*i, pitch+(pitch1-pitch)/(steps-1)*i, yaw+(yaw1-yaw)/(steps-1)*i]) for i in range(steps)] '''if self.previous_desired: [x,y,z] = self.previous_desired[1:4] @@ -746,16 +447,13 @@ class OSC_ROS2_interface(Node): # Compose SE3 transform cart_traj.append(sm.SE3(pos_interp) * q_interp.SE3())''' - - - if self.desired[-1]: timestamps = self.trapezoidal_timestamps(steps, self.desired[-1], 0.8) for j in range(steps): sol = self.robot.ik_LM(cart_traj[j], q0=self.current_joint_positions, mask = self.cost_mask, joint_limits = True, method = 'chan') if j == 0 else self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True, method = 'chan') if sol[1] == 1: fowards = self.robot.fkine_all(sol[0]) - out_of_bounds = (fowards.t[1:,0] > self.x_limits_workspace[1] if self.x_limits_workspace[1] != None else False) | (fowards.t[1:,0] < self.x_limits_workspace[0] if self.x_limits_workspace[0] != None else False) | (fowards.t[1:,1] > self.y_limits_workspace[1] if self.y_limits_workspace[1] != None else False) | (fowards.t[1:,1] < self.y_limits_workspace[0] if self.y_limits_workspace[0] != None else False) | (fowards.t[1:,2] > self.z_limits_workspace[1] if self.z_limits_workspace[1] != None else False) | (fowards.t[1:,2] < self.z_limits_workspace[0] if self.z_limits_workspace[0] != None else False) + out_of_bounds = (fowards.t[1:,0] > self.x_limits_workspace[1] if self.x_limits_workspace[1] is not None else False) | (fowards.t[1:,0] < self.x_limits_workspace[0] if self.x_limits_workspace[0] is not None else False) | (fowards.t[1:,1] > self.y_limits_workspace[1] if self.y_limits_workspace[1] is not None else False) | (fowards.t[1:,1] < self.y_limits_workspace[0] if self.y_limits_workspace[0] is not None else False) | (fowards.t[1:,2] > self.z_limits_workspace[1] if self.z_limits_workspace[1] is not None else False) | (fowards.t[1:,2] < self.z_limits_workspace[0] if self.z_limits_workspace[0] is not None else False) if np.any(out_of_bounds): #print(fowards.t) #indices = np.where(out_of_bounds)[0] @@ -790,9 +488,6 @@ class OSC_ROS2_interface(Node): msg.header.stamp = self.get_clock().now().to_msg() self.publisher.publish(msg) self.previous_desired = self.desired - - - else: prev_duration = 0 ''' @@ -807,7 +502,7 @@ class OSC_ROS2_interface(Node): sol = self.robot.ik_LM(cart_traj[j], q0=self.current_joint_positions, mask = self.cost_mask, joint_limits = True, method = 'chan') if j == 0 else self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True, method = 'chan') if sol[1] == 1: fowards = self.robot.fkine_all(sol[0]) - out_of_bounds = (fowards.t[1:,0] > self.x_limits_workspace[1] if self.x_limits_workspace[1] != None else False) | (fowards.t[1:,0] < self.x_limits_workspace[0] if self.x_limits_workspace[0] != None else False) | (fowards.t[1:,1] > self.y_limits_workspace[1] if self.y_limits_workspace[1] != None else False) | (fowards.t[1:,1] < self.y_limits_workspace[0] if self.y_limits_workspace[0] != None else False) | (fowards.t[1:,2] > self.z_limits_workspace[1] if self.z_limits_workspace[1] != None else False) | (fowards.t[1:,2] < self.z_limits_workspace[0] if self.z_limits_workspace[0] != None else False) + out_of_bounds = (fowards.t[1:,0] > self.x_limits_workspace[1] if self.x_limits_workspace[1] is not None else False) | (fowards.t[1:,0] < self.x_limits_workspace[0] if self.x_limits_workspace[0] is not None else False) | (fowards.t[1:,1] > self.y_limits_workspace[1] if self.y_limits_workspace[1] is not None else False) | (fowards.t[1:,1] < self.y_limits_workspace[0] if self.y_limits_workspace[0] is not None else False) | (fowards.t[1:,2] > self.z_limits_workspace[1] if self.z_limits_workspace[1] is not None else False) | (fowards.t[1:,2] < self.z_limits_workspace[0] if self.z_limits_workspace[0] is not None else False) if np.any(out_of_bounds): #print(fowards.t) #indices = np.where(out_of_bounds)[0] @@ -852,7 +547,6 @@ class OSC_ROS2_interface(Node): self.get_logger().fatal(f"send_tcp_coordinates: {e}") def send_joint_trajectory(self): - try: self.new = False viapoints = np.array([i for i in self.desired[1:]]) @@ -873,7 +567,6 @@ class OSC_ROS2_interface(Node): print(f'Error in joint_angles_handler: {e}') def send_cartesian_trajectory(self): - try: self.new = False viapoints = np.array([i[:6] for i in self.desired[1:]]) @@ -898,14 +591,14 @@ class OSC_ROS2_interface(Node): point.time_from_start.nanosec = int((traj.t[i] - int(traj.t[i])) * 1e9) msg.points.append(point) prev_sol = list(sol[0]) - else: self.get_logger().warn(f"send_cartesian_trajectory: IK could not find a solution for (x,y,z) = {traj.q[i][:3]} and (r,p,y) = {traj.q[i][3:]}!") + else: + self.get_logger().warn(f"send_cartesian_trajectory: IK could not find a solution for (x,y,z) = {traj.q[i][:3]} and (r,p,y) = {traj.q[i][3:]}!") msg.header.stamp = self.get_clock().now().to_msg() msg.points = msg.points[::n] self.publisher.publish(msg) self.previous_desired = None except Exception as e: print(f'Error in joint_angles_handler: {e}') - self.previous_desired = None def update_position(self): @@ -913,7 +606,6 @@ class OSC_ROS2_interface(Node): try: if self.desired is None or not(self.new): return - if self.desired[0] == "joint_positions": self.new = False self.send_joint_positions() @@ -933,35 +625,26 @@ class OSC_ROS2_interface(Node): else: self.get_logger().warn(f"update_position: Unknown desired type '{self.desired[0]}'.") return - - except Exception as e: self.get_logger().fatal(f'update_position: {e}') def clean_log_string(self, s): - s = str(s) - # Remove ANSI escape sequences (e.g., \x1b[31m) ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') s = ansi_escape.sub('', s) - # Replace tabs/newlines with spaces s = s.replace('\n', ' ').replace('\r', ' ').replace('\t', ' ').replace("'", ' '). replace('"', ' ').replace('`', ' ').replace('´', ' ').replace('`', ' ').replace('“', ' ').replace('”', ' ').replace('‘', ' ').replace('’', ' ').replace('´', ' ').replace('`', ' ').replace('“', ' ').replace('”', ' ').replace('‘', ' ').replace('’', ' ') - # Strip leading/trailing whitespace s = s.strip() - # Optionally enforce ASCII only (replace non-ASCII chars with '?') s = s.encode('ascii', 'replace').decode('ascii') - return s def log_callback(self, msg: Log): """Callback function to handle incoming log messages.""" - # Send the log message as an OSC message msg_log = oscbuildparse.OSCMessage(f"/log/{self.log_dict.get(msg.level, 'UNKNOWN')}", ',isss', [int(msg.level), str(msg.stamp.sec+msg.stamp.nanosec*1e-9) , str(msg.name), self.clean_log_string(msg.msg)]) osc_send(msg_log, "osc_log_client") @@ -969,122 +652,16 @@ class OSC_ROS2_interface(Node): def main(): """Main function to get joint names and start the ROS 2 & OSC system.""" + parser = argparse.ArgumentParser(description="An interface between ROS2 and OSC for robotic arm manipulators", usage="ros2 run osc_ros2 interface [-c CONFIG.TOML]") + parser.add_argument("-c", "--config", type=argparse.FileType("r"), help="A robot TOML config, if the config is provided then the interface will not show the iteractive input") + args = parser.parse_args() rclpy.init() - while True: - use_urdf = input("Do you have a URDF file you want to use? (y/n): ").strip().lower() - if use_urdf == 'y': - while True: - robot_urdf = input("Enter the absolute path to the URDF file: ") - if os.path.isfile(robot_urdf): - if not robot_urdf.endswith('.urdf'): - print("The file is not a URDF file. Please enter a valid URDF file.") - continue - break - else: - print("Invalid path. Please enter a valid path to the URDF file.") - tree = ET.parse(robot_urdf) - root = tree.getroot() - robot = rtb.ERobot.URDF(robot_urdf) - joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic'] - print(robot) - joint_velocity_limits = {} - - # Iterate over all joints in the URDF - for joint in root.findall('.//joint'): - joint_name = joint.get('name') # Get the name of the joint - - # Look for the tag under each joint - limit = joint.find('limit') - - if limit is not None: - # Extract the velocity limit (if it exists) - velocity_limit = limit.get('velocity') - - if velocity_limit is not None: - joint_velocity_limits[joint_name] = float(velocity_limit) - - while True: - try: - print('-+'*50) - print("The cost mask determines which coordinates are used for the IK.\nEach element of the cost mask corresponds to a Cartesian coordinate [x, y, z, roll, pitch, yaw].") - print("The cost mask 111000 means that the IK will only consider translation and no rotaion.") - cost_mask = [int(i) for i in input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")] - if sum(cost_mask) <= robot.n and len(cost_mask) == 6: - break - else: - print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.") - except ValueError: - print("Invalid input. Please enter integers only.") - print(f"The following coordinates will be used for the IK: {[j for i,j in enumerate(['x','y','z','roll','pitch','yaw']) if cost_mask[i]==1]}") - break - elif use_urdf == 'n': - node = JointNameListener() - print("Wainting 10 sec for JointState messages to extract joint names...") - rclpy.spin_once(node) - counter = 0 - while not(node.joint_names): - if counter > 100: - joint_names = None - break - counter+=1 - time.sleep(0.1) - rclpy.spin_once(node) - joint_names = node.joint_names - node.destroy_node() - if joint_names: - correct_names = input("The following joint names were found:\n" + "\n".join(joint_names) + "\nAre these correct? (y/n): ").strip().lower() - while True: - if correct_names == 'y': - break - elif correct_names == 'n': - joint_names = None - break - correct_names = input("Invalid input. Please enter 'y' or 'n'.") - if not(joint_names): - print("Please enter the joint names manually.") - while True: - joint_names = [] - print('-+'*50) - print("Enter the joint names manually one by one. Type 'done' when you are finished:") - print("Attention: The order of the joints is important. It should be the same in which you want to send the joint positions.") - while True: - print('-'*50) - joint_name = input("Enter joint name (or 'done' to finish): ").strip() - if joint_name.lower() == 'done': - break - if joint_name: - joint_names.append(joint_name) - print('-+'*50) - correct_names = input(f"Are those the joint names as defined in your JointTrajectroy recieving Node?: {joint_names}. (y/n)?: ").strip() - if correct_names.lower() == 'y': - break - else: - print("Please re-enter the joint names.") - while True: - try: - joint_velocity_limits = {} - for name in joint_names: - while True: - try: - print('--'*50) - limit = input(f"Enter the velocity limit for joint '{name}': ").strip() - if limit == "": - continue - else: - joint_velocity_limits[name] = float(limit) - break - except ValueError: - print("Invalid input. Please enter a numeric value or press Enter to skip.") - break - except ValueError: - print("Invalid input. Please enter numeric values or leave blank to skip.") - robot = None - cost_mask = None - break - print("Invalid input. Please enter 'y' or 'n'.") - - - node = OSC_ROS2_interface(joint_names, joint_velocity_limits, robot, cost_mask) + if not args.config: + config, robot = interactive_input() + else: + config = toml.load(args.config) + robot = rtb.ERobot.URDF(config["robot"]["urdf"]) + node = OSC_ROS2_interface(robot, config) # Run ROS 2 spin, and osc_process will be handled by the timer try: diff --git a/workspace/ur10e.toml b/workspace/ur10e.toml new file mode 100644 index 0000000..e345f8a --- /dev/null +++ b/workspace/ur10e.toml @@ -0,0 +1,27 @@ +[robot] +urdf = "/workspace/ur10e.urdf" +joint_names = [ "shoulder_pan_joint", "shoulder_lift_joint", "elbow_joint", "wrist_1_joint", "wrist_2_joint", "wrist_3_joint",] +cost_mask_string = "111111" +x_limits = [ "None", "None",] +y_limits = [ "None", "None",] +z_limits = [ "None", "None",] +x_limits_workspace = [ "None", "None",] +y_limits_workspace = [ "None", "None",] +z_limits_workspace = [ "None", "None",] +trajectory_topic_name = "/scaled_joint_trajectory_controller/joint_trajectory" +hz = 100 + +[network] +log_ip = "127.0.0.1" +log_port = 5005 +state_ip = "127.0.0.1" +state_port = 7000 +commands_port = 8000 + +[robot.joint_velocity_limits] +shoulder_pan_joint = 2.0943951023931953 +shoulder_lift_joint = 2.0943951023931953 +elbow_joint = 3.141592653589793 +wrist_1_joint = 3.141592653589793 +wrist_2_joint = 3.141592653589793 +wrist_3_joint = 3.141592653589793