diff --git a/workspace/src/osc_ros2/osc_ros2/__pycache__/__init__.cpython-310.pyc b/workspace/src/osc_ros2/osc_ros2/__pycache__/__init__.cpython-310.pyc deleted file mode 100644 index 579e157..0000000 Binary files a/workspace/src/osc_ros2/osc_ros2/__pycache__/__init__.cpython-310.pyc and /dev/null differ diff --git a/workspace/src/osc_ros2/osc_ros2/__pycache__/osc_ros2.cpython-310.pyc b/workspace/src/osc_ros2/osc_ros2/__pycache__/osc_ros2.cpython-310.pyc deleted file mode 100644 index 574fd9e..0000000 Binary files a/workspace/src/osc_ros2/osc_ros2/__pycache__/osc_ros2.cpython-310.pyc and /dev/null differ diff --git a/workspace/src/osc_ros2/osc_ros2/__pycache__/osc_ros2_unit_quater.cpython-310.pyc b/workspace/src/osc_ros2/osc_ros2/__pycache__/osc_ros2_unit_quater.cpython-310.pyc deleted file mode 100644 index 068417d..0000000 Binary files a/workspace/src/osc_ros2/osc_ros2/__pycache__/osc_ros2_unit_quater.cpython-310.pyc and /dev/null differ diff --git a/workspace/src/osc_ros2/osc_ros2/interactive_input.py b/workspace/src/osc_ros2/osc_ros2/interactive_input.py index ca0b421..d32e3b0 100644 --- a/workspace/src/osc_ros2/osc_ros2/interactive_input.py +++ b/workspace/src/osc_ros2/osc_ros2/interactive_input.py @@ -1,11 +1,13 @@ import os import time +import toml +import socket import xml.etree.ElementTree as ET from typing import Any import rclpy import roboticstoolbox as rtb -from rclpy import Node +from rclpy.node import Node from sensor_msgs.msg import JointState @@ -24,27 +26,29 @@ class JointNameListener(Node): def joint_state_callback(self, msg: JointState): self.joint_names = list(msg.name) -def interactive_input() -> tuple[dict[str, Any], rtb.robot | None]: +def interactive_input() -> tuple[dict[str, Any], rtb.robot]: config = {} + rconf = {} + nconf = {} robot = None while True: use_urdf = input("Do you have a URDF file you want to use? (y/n): ").strip().lower() if use_urdf == 'y': while True: - config["urdf"] = input("Enter the absolute path to the URDF file: ") - if os.path.isfile(config["urdf"]): - if not config["urdf"].endswith('.urdf'): + rconf["urdf"] = input("Enter the absolute path to the URDF file: ") + if os.path.isfile(rconf["urdf"]): + if not rconf["urdf"].endswith('.urdf'): print("The file is not a URDF file. Please enter a valid URDF file.") continue break else: print("Invalid path. Please enter a valid path to the URDF file.") - tree = ET.parse(config["robot_urdf"]) + tree = ET.parse(rconf["urdf"]) root = tree.getroot() - robot = rtb.ERobot.URDF(config["urdf"]) - config["joint_names"] = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic'] + robot = rtb.ERobot.URDF(rconf["urdf"]) + rconf["joint_names"] = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic'] print(robot) - config["joint_velocity_limits"] = {} + rconf["joint_velocity_limits"] = {} # Iterate over all joints in the URDF for joint in root.findall('.//joint'): joint_name = joint.get('name') # Get the name of the joint @@ -57,15 +61,15 @@ def interactive_input() -> tuple[dict[str, Any], rtb.robot | None]: velocity_limit = limit.get('velocity') if velocity_limit is not None: - config["joint_velocity_limits"][joint_name] = float(velocity_limit) + rconf["joint_velocity_limits"][joint_name] = float(velocity_limit) while True: try: print('-+'*50) print("The cost mask determines which coordinates are used for the IK.\nEach element of the cost mask corresponds to a Cartesian coordinate [x, y, z, roll, pitch, yaw].") print("The cost mask 111000 means that the IK will only consider translation and no rotaion.") - config["cost_mask_string"] = input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ") - cost_mask = [int(i) for i in config["cost_mask_string"]] + rconf["cost_mask_string"] = input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ") + cost_mask = [int(i) for i in rconf["cost_mask_string"]] if sum(cost_mask) <= robot.n and len(cost_mask) == 6: break else: @@ -81,26 +85,26 @@ def interactive_input() -> tuple[dict[str, Any], rtb.robot | None]: counter = 0 while not(node.joint_names): if counter > 100: - config["joint_names"] = None + rconf["joint_names"] = None break counter+=1 time.sleep(0.1) rclpy.spin_once(node) - config["joint_names"] = node.joint_names + rconf["joint_names"] = node.joint_names node.destroy_node() - if config["joint_names"]: - correct_names = input("The following joint names were found:\n" + "\n".join(config["joint_names"]) + "\nAre these correct? (y/n): ").strip().lower() + if rconf["joint_names"]: + correct_names = input("The following joint names were found:\n" + "\n".join(rconf["joint_names"]) + "\nAre these correct? (y/n): ").strip().lower() while True: if correct_names == 'y': break elif correct_names == 'n': - config["joint_names"] = None + rconf["joint_names"] = None break correct_names = input("Invalid input. Please enter 'y' or 'n'.") - if not(config["joint_names"]): + if not(rconf["joint_names"]): print("Please enter the joint names manually.") while True: - config["joint_names"] = [] + rconf["joint_names"] = [] print('-+'*50) print("Enter the joint names manually one by one. Type 'done' when you are finished:") print("Attention: The order of the joints is important. It should be the same in which you want to send the joint positions.") @@ -110,17 +114,17 @@ def interactive_input() -> tuple[dict[str, Any], rtb.robot | None]: if joint_name.lower() == 'done': break if joint_name: - config["joint_names"].append(joint_name) + rconf["joint_names"].append(joint_name) print('-+'*50) - correct_names = input(f"Are those the joint names as defined in your JointTrajectroy recieving Node?: {config['joint_names']}. (y/n)?: ").strip() + correct_names = input(f"Are those the joint names as defined in your JointTrajectroy recieving Node?: {rconf['joint_names']}. (y/n)?: ").strip() if correct_names.lower() == 'y': break else: print("Please re-enter the joint names.") while True: try: - config["joint_velocity_limits"] = {} - for name in config["joint_names"]: + rconf["joint_velocity_limits"] = {} + for name in rconf["joint_names"]: while True: try: print('--'*50) @@ -128,7 +132,7 @@ def interactive_input() -> tuple[dict[str, Any], rtb.robot | None]: if limit == "": continue else: - config["joint_velocity_limits"][name] = float(limit) + rconf["joint_velocity_limits"][name] = float(limit) break except ValueError: print("Invalid input. Please enter a numeric value or press Enter to skip.") @@ -146,24 +150,24 @@ def interactive_input() -> tuple[dict[str, Any], rtb.robot | None]: if set_limits == 'y': while True: try: - config["x_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()] - config["y_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()] - config["z_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()] + rconf["x_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()] + rconf["y_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()] + rconf["z_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()] - if len(config["x_limits"]) != 2 or len(config["y_limits"]) != 2 or len(config["z_limits"]) != 2: + if len(rconf["x_limits"]) != 2 or len(rconf["y_limits"]) != 2 or len(rconf["z_limits"]) != 2: print("Invalid input. Please enter exactly two values (or leave blank) for each limit.") continue - if (config["x_limits"][0] is not None and config["x_limits"][1] is not None and config["x_limits"][0] >= config["x_limits"][1]) or \ - (config["y_limits"][0] is not None and config["y_limits"][1] is not None and config["y_limits"][0] >= config["y_limits"][1]) or \ - (config["z_limits"][0] is not None and config["z_limits"][1] is not None and config["z_limits"][0] >= config["z_limits"][1]): + if (rconf["x_limits"][0] is not None and rconf["x_limits"][1] is not None and rconf["x_limits"][0] >= rconf["x_limits"][1]) or \ + (rconf["y_limits"][0] is not None and rconf["y_limits"][1] is not None and rconf["y_limits"][0] >= rconf["y_limits"][1]) or \ + (rconf["z_limits"][0] is not None and rconf["z_limits"][1] is not None and rconf["z_limits"][0] >= rconf["z_limits"][1]): print("Invalid input. Lower limit must be less than upper limit for each axis.") continue print("Current limits:") - print(f"x: {config['x_limits']}") - print(f"y: {config['y_limits']}") - print(f"z: {config['z_limits']}") + print(f"x: {rconf['x_limits']}") + print(f"y: {rconf['y_limits']}") + print(f"z: {rconf['z_limits']}") con = True while con: confirm = input("Do you want your TCP to move in this range? (y/n): ").strip().lower() @@ -180,9 +184,9 @@ def interactive_input() -> tuple[dict[str, Any], rtb.robot | None]: print("Invalid input. Please enter numeric values only.") break elif set_limits == 'n': - config["x_limits"] = [None, None] - config["y_limits"] = [None, None] - config["z_limits"] = [None, None] + rconf["x_limits"] = [None, None] + rconf["y_limits"] = [None, None] + rconf["z_limits"] = [None, None] break print("Invalid input. Please enter 'y' or 'n'.") @@ -192,24 +196,24 @@ def interactive_input() -> tuple[dict[str, Any], rtb.robot | None]: if set_limits == 'y': while True: try: - config["x_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()] - config["y_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()] - config["z_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()] + rconf["x_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()] + rconf["y_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()] + rconf["z_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()] - if len(config["x_limits_workspace"]) != 2 or len(config["y_limits_workspace"]) != 2 or len(config["z_limits_workspace"]) != 2: + if len(rconf["x_limits_workspace"]) != 2 or len(rconf["y_limits_workspace"]) != 2 or len(rconf["z_limits_workspace"]) != 2: print("Invalid input. Please enter exactly two values (or leave blank) for each limit.") continue - if (config["x_limits_workspace"][0] is not None and config["x_limits_workspace"][1] is not None and config["x_limits_workspace"][0] >= config["x_limits_workspace"][1]) or \ - (config["y_limits_workspace"][0] is not None and config["y_limits_workspace"][1] is not None and config["y_limits_workspace"][0] >= config["y_limits_workspace"][1]) or \ - (config["z_limits_workspace"][0] is not None and config["z_limits_workspace"][1] is not None and config["z_limits_workspace"][0] >= config["z_limits_workspace"][1]): + if (rconf["x_limits_workspace"][0] is not None and rconf["x_limits_workspace"][1] is not None and rconf["x_limits_workspace"][0] >= rconf["x_limits_workspace"][1]) or \ + (rconf["y_limits_workspace"][0] is not None and rconf["y_limits_workspace"][1] is not None and rconf["y_limits_workspace"][0] >= rconf["y_limits_workspace"][1]) or \ + (rconf["z_limits_workspace"][0] is not None and rconf["z_limits_workspace"][1] is not None and rconf["z_limits_workspace"][0] >= rconf["z_limits_workspace"][1]): print("Invalid input. Lower limit must be less than upper limit for each axis.") continue print("Current limits:") - print(f"x: {config['x_limits_workspace']}") - print(f"y: {config['y_limits_workspace']}") - print(f"z: {config['z_limits_workspace']}") + print(f"x: {rconf['x_limits_workspace']}") + print(f"y: {rconf['y_limits_workspace']}") + print(f"z: {rconf['z_limits_workspace']}") con = True while con: confirm = input("Do you want your robot to move in this range? (y/n): ").strip().lower() @@ -226,9 +230,9 @@ def interactive_input() -> tuple[dict[str, Any], rtb.robot | None]: print("Invalid input. Please enter numeric values only.") break elif set_limits == 'n': - config["x_limits_workspace"] = [None, None] - config["y_limits_workspace"] = [None, None] - config["z_limits_workspace"] = [None, None] + rconf["x_limits_workspace"] = [None, None] + rconf["y_limits_workspace"] = [None, None] + rconf["z_limits_workspace"] = [None, None] break print("Invalid input. Please enter 'y' or 'n'.") # Ask the user if they want to set new joint limits @@ -236,15 +240,15 @@ def interactive_input() -> tuple[dict[str, Any], rtb.robot | None]: print('+-'*50) update_limits = input("Do you want to set new joint limits? (y/n): ").strip().lower() if update_limits == 'y': - for i in range(len(config["joint_names"])): + for i in range(len(rconf["joint_names"])): while True: try: lim = robot.qlim.copy() # Find the link corresponding to the joint name print("-" * 50) - print(f"Current position limits for joint '{config['joint_names'][i]}': [{robot.qlim[0][i]} {robot.qlim[1][i]}] rad") - lower_limit = input(f"Enter the new lower limit for joint '{config['joint_names'][i]}' (or press Enter to keep current): ").strip() - upper_limit = input(f"Enter the new upper limit for joint '{config['joint_names'][i]}' (or press Enter to keep current): ").strip() + print(f"Current position limits for joint '{rconf['joint_names'][i]}': [{robot.qlim[0][i]} {robot.qlim[1][i]}] rad") + lower_limit = input(f"Enter the new lower limit for joint '{rconf['joint_names'][i]}' (or press Enter to keep current): ").strip() + upper_limit = input(f"Enter the new upper limit for joint '{rconf['joint_names'][i]}' (or press Enter to keep current): ").strip() if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit): print("Invalid input. Lower limit must be less than upper limit.") @@ -278,7 +282,7 @@ def interactive_input() -> tuple[dict[str, Any], rtb.robot | None]: else: lim[1][i] = float(upper_limit) robot.qlim = lim - print(f"New limits for joint '{config['joint_names'][i]}': [{robot.qlim[0][i]} {robot.qlim[1][i]}] rad") + print(f"New limits for joint '{rconf['joint_names'][i]}': [{robot.qlim[0][i]} {robot.qlim[1][i]}] rad") print("-" * 50) break except ValueError: @@ -297,10 +301,10 @@ def interactive_input() -> tuple[dict[str, Any], rtb.robot | None]: while True: print('+-' * 50) update_limits = input("Do you want to set joint limits? (y/n): ").strip().lower() - n_joints = len(config["joint_names"]) + n_joints = len(rconf["joint_names"]) if update_limits == 'y': - config["joint_lim"] = [[]*n_joints,[]*n_joints] - for i, joint in enumerate(config["joint_names"]): + rconf["joint_lim"] = [[]*n_joints,[]*n_joints] + for i, joint in enumerate(rconf["joint_names"]): while True: try: print("-" * 50) @@ -311,39 +315,113 @@ def interactive_input() -> tuple[dict[str, Any], rtb.robot | None]: print('--' * 50) print(" !!! Invalid input. Lower limit must be less than upper limit. !!! ") continue - config["joint_lim"][0][i] = float(lower_limit) if lower_limit!="" else None - config["joint_lim"][1][i] = float(upper_limit) if upper_limit!="" else None + rconf["joint_lim"][0][i] = float(lower_limit) if lower_limit!="" else None + rconf["joint_lim"][1][i] = float(upper_limit) if upper_limit!="" else None break except ValueError: print("Invalid input. Please enter numeric values or leave blank to keep current limits.") - print(f'New limits for joint:\n lower: {config["joint_lim"][0]}\n upper: {config["joint_lim"][1]}') + print(f'New limits for joint:\n lower: {rconf["joint_lim"][0]}\n upper: {rconf["joint_lim"][1]}') break elif update_limits == 'n': - config["joint_lim"] = None + rconf["joint_lim"] = None break print("Invalid input. Please enter 'y' or 'n'.") while True: try: print('+-' * 50) - config["trajectory_topic_name"] = input("Enter the topic name to which the joint trajectory should be sent (press Enter for default: '/scaled_joint_trajectory_controller/joint_trajectory'): ").strip() - if config["trajectory_topic_name"] == "": - config["trajectory_topic_name"] = '/scaled_joint_trajectory_controller/joint_trajectory' + rconf["trajectory_topic_name"] = input("Enter the topic name to which the joint trajectory should be sent (press Enter for default: '/scaled_joint_trajectory_controller/joint_trajectory'): ").strip() + if rconf["trajectory_topic_name"] == "": + rconf["trajectory_topic_name"] = '/scaled_joint_trajectory_controller/joint_trajectory' break - elif config["trajectory_topic_name"].startswith("/"): + elif rconf["trajectory_topic_name"].startswith("/"): break else: print("Invalid topic name. A valid topic name should start with '/'.") except Exception as e: print(f"An error occurred: {e}") + print('--' * 50) + while True: + try: + print('+-' * 50) + rconf["hz"] = input("Enter the desired refresh frequency (Hz) (or press Enter for default: 100): ") + if rconf["hz"] == "": + rconf["hz"] = 100 + else: + rconf["hz"] = float(rconf["hz"]) + break + except ValueError: + print("Invalid input. Please enter a valid number.") + continue + while True: + try: + print('+-' * 50) + nconf["log_ip"] = str(input("Enter the target IP on which you want to recieve the log as OSC Messages (or press Enter for default: '127.0.0.1'): ")) + if nconf["log_ip"] == "": + nconf["log_ip"] = "127.0.0.1" + print('--' * 50) + nconf["log_port"] = input("Enter the target port for the log messages (or press Enter for default: 5005): ") + if nconf["log_port"] == "": + nconf["log_port"] = 5005 + else: + nconf["log_port"] = int(nconf["log_port"]) + break + except ValueError: + print("Invalid input. Please enter a valid IP address.") + continue + while True: + try: + print('+-' * 50) + nconf["state_ip"] = str(input("Enter the target IP on which you want to recieve the joint states as OSC Messages (or press Enter for default: '127.0.0.1'): ")) + if nconf["state_ip"] == "": + nconf["state_ip"] = "127.0.0.1" + print('--' * 50) + nconf["state_port"] = input("Enter the target port for the joint state messages (or press Enter for default: 7000): ") + if nconf["state_port"] == "": + nconf["state_port"] = 7000 + else: + nconf["state_port"] = int(nconf["state_port"]) + break + except ValueError: + print("Invalid input. Please enter a valid IP address.") + continue + while True: + try: + print('+-' * 50) + nconf["commands_port"] = input("Enter the port you want to send your commands to (or press Enter for default: 8000): ") + if nconf["commands_port"] == nconf["state_port"]: + print("The commands port must be different from the state port.") + continue + if nconf["commands_port"] == "": + nconf["commands_port"] = 8000 + else: + nconf["commands_port"] = int(nconf["commands_port"]) + break + except ValueError: + print("Invalid input. Please enter a valid port.") + continue + print() + print('=-=' * 50) + print() + print(f"Sending joint state's' to {nconf['state_ip']}:{nconf['state_port']}") + print() + print('=-=' * 50) + print() + print(f"Sending log messages to {nconf['log_ip']}:{nconf['log_port']}") + print() + print('=-=' * 50) + print() + print(f"Ready to receive OSC messages on {socket.gethostbyname(socket.gethostname())}:{nconf['commands_port']}") + print() + print('=-=' * 50) + print() + config["robot"] = rconf + config["network"] = nconf while True: save_config = input("Do you want to save this interface? (y/n): ").strip().lower() if save_config == "y": config_path = input("Where do you want to save the config?: ").strip() - if not os.path.isdir(config_path): - print("Warning: Path does not exist") - print("Making Directory...") - os.mkdir(config_path) - print(f"[TODO] osc_ros2.py:1091:0: Write config to {config_path}/config.toml") + with open(f"{config_path}.toml", "w") as f: + toml.dump(config, f) break elif save_config == "n": print("Will not be saving config") diff --git a/workspace/src/osc_ros2/osc_ros2/osc_ros2.py b/workspace/src/osc_ros2/osc_ros2/osc_ros2.py index 5d64fdf..0c378cb 100644 --- a/workspace/src/osc_ros2/osc_ros2/osc_ros2.py +++ b/workspace/src/osc_ros2/osc_ros2/osc_ros2.py @@ -1,5 +1,7 @@ #ruff: noqa: F403, F405 import rclpy +import argparse +import toml from rclpy.node import Node from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint from sensor_msgs.msg import JointState @@ -10,6 +12,7 @@ import numpy as np import spatialmath as sm import roboticstoolbox as rtb from osc4py3 import oscbuildparse +from osc_ros2.interactive_input import interactive_input import time import re import socket @@ -62,62 +65,9 @@ class OSC_ROS2_interface(Node): self.trajectory_topic_name, 1 ) - - while True: - try: - print('+-' * 50) - log_ip = str(input("Enter the target IP on which you want to recieve the log as OSC Messages (or press Enter for default: '127.0.0.1'): ")) - if log_ip == "": - log_ip = "127.0.0.1" - print('--' * 50) - log_port = input("Enter the target port for the log messages (or press Enter for default: 5005): ") - if log_port == "": - log_port = 5005 - else: - log_port = int(log_port) - break - except ValueError: - print("Invalid input. Please enter a valid IP address.") - continue - while True: - try: - print('+-' * 50) - state_ip = str(input("Enter the target IP on which you want to recieve the joint states as OSC Messages (or press Enter for default: '127.0.0.1'): ")) - if state_ip == "": - state_ip = "127.0.0.1" - print('--' * 50) - state_port = input("Enter the target port for the joint state messages (or press Enter for default: 7000): ") - if state_port == "": - state_port = 7000 - else: - state_port = int(state_port) - break - except ValueError: - print("Invalid input. Please enter a valid IP address.") - continue - while True: - try: - print('+-' * 50) - commands_port = input("Enter the port you want to send your commands to (or press Enter for default: 8000): ") - if commands_port == state_port: - print("The commands port must be different from the state port.") - continue - if commands_port == "": - commands_port = 8000 - else: - commands_port = int(commands_port) - break - except ValueError: - print("Invalid input. Please enter a valid port.") - continue - - osc_startup() - osc_udp_client(state_ip, state_port, "osc_client") - osc_udp_client(log_ip, log_port, "osc_log_client") - osc_udp_server('0.0.0.0', commands_port, "osc_server") # Register OSC handler @@ -127,34 +77,6 @@ class OSC_ROS2_interface(Node): osc_method("/joint_trajectory", self.joint_trajectory_handler, argscheme=osm.OSCARG_DATAUNPACK) osc_method("/cartesian_trajectory", self.cartesian_trajectory_handler, argscheme=osm.OSCARG_DATAUNPACK) osc_method("/speed_scaling", self.speed_scaling_handler, argscheme=osm.OSCARG_DATAUNPACK) - print('--' * 50) - while True: - try: - print('+-' * 50) - self.hz = input("Enter the desired refresh frequency (Hz) (or press Enter for default: 100): ") - if self.hz == "": - self.hz = 100 - else: - self.hz = float(self.hz) - break - except ValueError: - print("Invalid input. Please enter a valid number.") - continue - print() - print('=-=' * 50) - print() - print(f'Sending joint states to {state_ip}:{state_port}') - print() - print('=-=' * 50) - print() - print(f'Sending log messages to {log_ip}:{log_port}') - print() - print('=-=' * 50) - print() - print(f'Ready to receive OSC messages on {socket.gethostbyname(socket.gethostname())}:{commands_port}') - print() - print('=-=' * 50) - print() self.get_logger().info(f"Publishing joint trajectory to {self.trajectory_topic_name}") self.get_logger().info(f'Ready to receive OSC messages on {socket.gethostbyname(socket.gethostname())}:{commands_port}') @@ -764,9 +686,15 @@ class OSC_ROS2_interface(Node): def main(): """Main function to get joint names and start the ROS 2 & OSC system.""" + parser = argparse.ArgumentParser(description="An interface between ROS2 and OSC for robotic arm manipulators", usage="ros2 run osc_ros2 interface [-c CONFIG.TOML]") + parser.add_argument("-c", "--config", type=argparse.FileType("r"), help="A robot TOML config, if the config is provided then the interface will not show the iteractive input") + args = parser.parse_args() rclpy.init() - # TODO: if no config use interactive - config, robot = interactive_input() + if not args.config: + config, robot = interactive_input() + else: + config = toml.load(args.config) + robot = rtb.ERobot.URDF(config["robot"]["urdf"]) node = OSC_ROS2_interface(robot, config) # Run ROS 2 spin, and osc_process will be handled by the timer diff --git a/workspace/ur10e.toml b/workspace/ur10e.toml new file mode 100644 index 0000000..e345f8a --- /dev/null +++ b/workspace/ur10e.toml @@ -0,0 +1,27 @@ +[robot] +urdf = "/workspace/ur10e.urdf" +joint_names = [ "shoulder_pan_joint", "shoulder_lift_joint", "elbow_joint", "wrist_1_joint", "wrist_2_joint", "wrist_3_joint",] +cost_mask_string = "111111" +x_limits = [ "None", "None",] +y_limits = [ "None", "None",] +z_limits = [ "None", "None",] +x_limits_workspace = [ "None", "None",] +y_limits_workspace = [ "None", "None",] +z_limits_workspace = [ "None", "None",] +trajectory_topic_name = "/scaled_joint_trajectory_controller/joint_trajectory" +hz = 100 + +[network] +log_ip = "127.0.0.1" +log_port = 5005 +state_ip = "127.0.0.1" +state_port = 7000 +commands_port = 8000 + +[robot.joint_velocity_limits] +shoulder_pan_joint = 2.0943951023931953 +shoulder_lift_joint = 2.0943951023931953 +elbow_joint = 3.141592653589793 +wrist_1_joint = 3.141592653589793 +wrist_2_joint = 3.141592653589793 +wrist_3_joint = 3.141592653589793