[AN] (feat) Successfully writes and reads config
This commit is contained in:
parent
b45371d39b
commit
6db7143bd5
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -1,11 +1,13 @@
|
||||
import os
|
||||
import time
|
||||
import toml
|
||||
import socket
|
||||
import xml.etree.ElementTree as ET
|
||||
from typing import Any
|
||||
|
||||
import rclpy
|
||||
import roboticstoolbox as rtb
|
||||
from rclpy import Node
|
||||
from rclpy.node import Node
|
||||
from sensor_msgs.msg import JointState
|
||||
|
||||
|
||||
@ -24,27 +26,29 @@ class JointNameListener(Node):
|
||||
def joint_state_callback(self, msg: JointState):
|
||||
self.joint_names = list(msg.name)
|
||||
|
||||
def interactive_input() -> tuple[dict[str, Any], rtb.robot | None]:
|
||||
def interactive_input() -> tuple[dict[str, Any], rtb.robot]:
|
||||
config = {}
|
||||
rconf = {}
|
||||
nconf = {}
|
||||
robot = None
|
||||
while True:
|
||||
use_urdf = input("Do you have a URDF file you want to use? (y/n): ").strip().lower()
|
||||
if use_urdf == 'y':
|
||||
while True:
|
||||
config["urdf"] = input("Enter the absolute path to the URDF file: ")
|
||||
if os.path.isfile(config["urdf"]):
|
||||
if not config["urdf"].endswith('.urdf'):
|
||||
rconf["urdf"] = input("Enter the absolute path to the URDF file: ")
|
||||
if os.path.isfile(rconf["urdf"]):
|
||||
if not rconf["urdf"].endswith('.urdf'):
|
||||
print("The file is not a URDF file. Please enter a valid URDF file.")
|
||||
continue
|
||||
break
|
||||
else:
|
||||
print("Invalid path. Please enter a valid path to the URDF file.")
|
||||
tree = ET.parse(config["robot_urdf"])
|
||||
tree = ET.parse(rconf["urdf"])
|
||||
root = tree.getroot()
|
||||
robot = rtb.ERobot.URDF(config["urdf"])
|
||||
config["joint_names"] = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
|
||||
robot = rtb.ERobot.URDF(rconf["urdf"])
|
||||
rconf["joint_names"] = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
|
||||
print(robot)
|
||||
config["joint_velocity_limits"] = {}
|
||||
rconf["joint_velocity_limits"] = {}
|
||||
# Iterate over all joints in the URDF
|
||||
for joint in root.findall('.//joint'):
|
||||
joint_name = joint.get('name') # Get the name of the joint
|
||||
@ -57,15 +61,15 @@ def interactive_input() -> tuple[dict[str, Any], rtb.robot | None]:
|
||||
velocity_limit = limit.get('velocity')
|
||||
|
||||
if velocity_limit is not None:
|
||||
config["joint_velocity_limits"][joint_name] = float(velocity_limit)
|
||||
rconf["joint_velocity_limits"][joint_name] = float(velocity_limit)
|
||||
|
||||
while True:
|
||||
try:
|
||||
print('-+'*50)
|
||||
print("The cost mask determines which coordinates are used for the IK.\nEach element of the cost mask corresponds to a Cartesian coordinate [x, y, z, roll, pitch, yaw].")
|
||||
print("The cost mask 111000 means that the IK will only consider translation and no rotaion.")
|
||||
config["cost_mask_string"] = input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")
|
||||
cost_mask = [int(i) for i in config["cost_mask_string"]]
|
||||
rconf["cost_mask_string"] = input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")
|
||||
cost_mask = [int(i) for i in rconf["cost_mask_string"]]
|
||||
if sum(cost_mask) <= robot.n and len(cost_mask) == 6:
|
||||
break
|
||||
else:
|
||||
@ -81,26 +85,26 @@ def interactive_input() -> tuple[dict[str, Any], rtb.robot | None]:
|
||||
counter = 0
|
||||
while not(node.joint_names):
|
||||
if counter > 100:
|
||||
config["joint_names"] = None
|
||||
rconf["joint_names"] = None
|
||||
break
|
||||
counter+=1
|
||||
time.sleep(0.1)
|
||||
rclpy.spin_once(node)
|
||||
config["joint_names"] = node.joint_names
|
||||
rconf["joint_names"] = node.joint_names
|
||||
node.destroy_node()
|
||||
if config["joint_names"]:
|
||||
correct_names = input("The following joint names were found:\n" + "\n".join(config["joint_names"]) + "\nAre these correct? (y/n): ").strip().lower()
|
||||
if rconf["joint_names"]:
|
||||
correct_names = input("The following joint names were found:\n" + "\n".join(rconf["joint_names"]) + "\nAre these correct? (y/n): ").strip().lower()
|
||||
while True:
|
||||
if correct_names == 'y':
|
||||
break
|
||||
elif correct_names == 'n':
|
||||
config["joint_names"] = None
|
||||
rconf["joint_names"] = None
|
||||
break
|
||||
correct_names = input("Invalid input. Please enter 'y' or 'n'.")
|
||||
if not(config["joint_names"]):
|
||||
if not(rconf["joint_names"]):
|
||||
print("Please enter the joint names manually.")
|
||||
while True:
|
||||
config["joint_names"] = []
|
||||
rconf["joint_names"] = []
|
||||
print('-+'*50)
|
||||
print("Enter the joint names manually one by one. Type 'done' when you are finished:")
|
||||
print("Attention: The order of the joints is important. It should be the same in which you want to send the joint positions.")
|
||||
@ -110,17 +114,17 @@ def interactive_input() -> tuple[dict[str, Any], rtb.robot | None]:
|
||||
if joint_name.lower() == 'done':
|
||||
break
|
||||
if joint_name:
|
||||
config["joint_names"].append(joint_name)
|
||||
rconf["joint_names"].append(joint_name)
|
||||
print('-+'*50)
|
||||
correct_names = input(f"Are those the joint names as defined in your JointTrajectroy recieving Node?: {config['joint_names']}. (y/n)?: ").strip()
|
||||
correct_names = input(f"Are those the joint names as defined in your JointTrajectroy recieving Node?: {rconf['joint_names']}. (y/n)?: ").strip()
|
||||
if correct_names.lower() == 'y':
|
||||
break
|
||||
else:
|
||||
print("Please re-enter the joint names.")
|
||||
while True:
|
||||
try:
|
||||
config["joint_velocity_limits"] = {}
|
||||
for name in config["joint_names"]:
|
||||
rconf["joint_velocity_limits"] = {}
|
||||
for name in rconf["joint_names"]:
|
||||
while True:
|
||||
try:
|
||||
print('--'*50)
|
||||
@ -128,7 +132,7 @@ def interactive_input() -> tuple[dict[str, Any], rtb.robot | None]:
|
||||
if limit == "":
|
||||
continue
|
||||
else:
|
||||
config["joint_velocity_limits"][name] = float(limit)
|
||||
rconf["joint_velocity_limits"][name] = float(limit)
|
||||
break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter a numeric value or press Enter to skip.")
|
||||
@ -146,24 +150,24 @@ def interactive_input() -> tuple[dict[str, Any], rtb.robot | None]:
|
||||
if set_limits == 'y':
|
||||
while True:
|
||||
try:
|
||||
config["x_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()]
|
||||
config["y_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()]
|
||||
config["z_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()]
|
||||
rconf["x_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()]
|
||||
rconf["y_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()]
|
||||
rconf["z_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()]
|
||||
|
||||
if len(config["x_limits"]) != 2 or len(config["y_limits"]) != 2 or len(config["z_limits"]) != 2:
|
||||
if len(rconf["x_limits"]) != 2 or len(rconf["y_limits"]) != 2 or len(rconf["z_limits"]) != 2:
|
||||
print("Invalid input. Please enter exactly two values (or leave blank) for each limit.")
|
||||
continue
|
||||
|
||||
if (config["x_limits"][0] is not None and config["x_limits"][1] is not None and config["x_limits"][0] >= config["x_limits"][1]) or \
|
||||
(config["y_limits"][0] is not None and config["y_limits"][1] is not None and config["y_limits"][0] >= config["y_limits"][1]) or \
|
||||
(config["z_limits"][0] is not None and config["z_limits"][1] is not None and config["z_limits"][0] >= config["z_limits"][1]):
|
||||
if (rconf["x_limits"][0] is not None and rconf["x_limits"][1] is not None and rconf["x_limits"][0] >= rconf["x_limits"][1]) or \
|
||||
(rconf["y_limits"][0] is not None and rconf["y_limits"][1] is not None and rconf["y_limits"][0] >= rconf["y_limits"][1]) or \
|
||||
(rconf["z_limits"][0] is not None and rconf["z_limits"][1] is not None and rconf["z_limits"][0] >= rconf["z_limits"][1]):
|
||||
print("Invalid input. Lower limit must be less than upper limit for each axis.")
|
||||
continue
|
||||
|
||||
print("Current limits:")
|
||||
print(f"x: {config['x_limits']}")
|
||||
print(f"y: {config['y_limits']}")
|
||||
print(f"z: {config['z_limits']}")
|
||||
print(f"x: {rconf['x_limits']}")
|
||||
print(f"y: {rconf['y_limits']}")
|
||||
print(f"z: {rconf['z_limits']}")
|
||||
con = True
|
||||
while con:
|
||||
confirm = input("Do you want your TCP to move in this range? (y/n): ").strip().lower()
|
||||
@ -180,9 +184,9 @@ def interactive_input() -> tuple[dict[str, Any], rtb.robot | None]:
|
||||
print("Invalid input. Please enter numeric values only.")
|
||||
break
|
||||
elif set_limits == 'n':
|
||||
config["x_limits"] = [None, None]
|
||||
config["y_limits"] = [None, None]
|
||||
config["z_limits"] = [None, None]
|
||||
rconf["x_limits"] = [None, None]
|
||||
rconf["y_limits"] = [None, None]
|
||||
rconf["z_limits"] = [None, None]
|
||||
break
|
||||
print("Invalid input. Please enter 'y' or 'n'.")
|
||||
|
||||
@ -192,24 +196,24 @@ def interactive_input() -> tuple[dict[str, Any], rtb.robot | None]:
|
||||
if set_limits == 'y':
|
||||
while True:
|
||||
try:
|
||||
config["x_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()]
|
||||
config["y_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()]
|
||||
config["z_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()]
|
||||
rconf["x_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()]
|
||||
rconf["y_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()]
|
||||
rconf["z_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()]
|
||||
|
||||
if len(config["x_limits_workspace"]) != 2 or len(config["y_limits_workspace"]) != 2 or len(config["z_limits_workspace"]) != 2:
|
||||
if len(rconf["x_limits_workspace"]) != 2 or len(rconf["y_limits_workspace"]) != 2 or len(rconf["z_limits_workspace"]) != 2:
|
||||
print("Invalid input. Please enter exactly two values (or leave blank) for each limit.")
|
||||
continue
|
||||
|
||||
if (config["x_limits_workspace"][0] is not None and config["x_limits_workspace"][1] is not None and config["x_limits_workspace"][0] >= config["x_limits_workspace"][1]) or \
|
||||
(config["y_limits_workspace"][0] is not None and config["y_limits_workspace"][1] is not None and config["y_limits_workspace"][0] >= config["y_limits_workspace"][1]) or \
|
||||
(config["z_limits_workspace"][0] is not None and config["z_limits_workspace"][1] is not None and config["z_limits_workspace"][0] >= config["z_limits_workspace"][1]):
|
||||
if (rconf["x_limits_workspace"][0] is not None and rconf["x_limits_workspace"][1] is not None and rconf["x_limits_workspace"][0] >= rconf["x_limits_workspace"][1]) or \
|
||||
(rconf["y_limits_workspace"][0] is not None and rconf["y_limits_workspace"][1] is not None and rconf["y_limits_workspace"][0] >= rconf["y_limits_workspace"][1]) or \
|
||||
(rconf["z_limits_workspace"][0] is not None and rconf["z_limits_workspace"][1] is not None and rconf["z_limits_workspace"][0] >= rconf["z_limits_workspace"][1]):
|
||||
print("Invalid input. Lower limit must be less than upper limit for each axis.")
|
||||
continue
|
||||
|
||||
print("Current limits:")
|
||||
print(f"x: {config['x_limits_workspace']}")
|
||||
print(f"y: {config['y_limits_workspace']}")
|
||||
print(f"z: {config['z_limits_workspace']}")
|
||||
print(f"x: {rconf['x_limits_workspace']}")
|
||||
print(f"y: {rconf['y_limits_workspace']}")
|
||||
print(f"z: {rconf['z_limits_workspace']}")
|
||||
con = True
|
||||
while con:
|
||||
confirm = input("Do you want your robot to move in this range? (y/n): ").strip().lower()
|
||||
@ -226,9 +230,9 @@ def interactive_input() -> tuple[dict[str, Any], rtb.robot | None]:
|
||||
print("Invalid input. Please enter numeric values only.")
|
||||
break
|
||||
elif set_limits == 'n':
|
||||
config["x_limits_workspace"] = [None, None]
|
||||
config["y_limits_workspace"] = [None, None]
|
||||
config["z_limits_workspace"] = [None, None]
|
||||
rconf["x_limits_workspace"] = [None, None]
|
||||
rconf["y_limits_workspace"] = [None, None]
|
||||
rconf["z_limits_workspace"] = [None, None]
|
||||
break
|
||||
print("Invalid input. Please enter 'y' or 'n'.")
|
||||
# Ask the user if they want to set new joint limits
|
||||
@ -236,15 +240,15 @@ def interactive_input() -> tuple[dict[str, Any], rtb.robot | None]:
|
||||
print('+-'*50)
|
||||
update_limits = input("Do you want to set new joint limits? (y/n): ").strip().lower()
|
||||
if update_limits == 'y':
|
||||
for i in range(len(config["joint_names"])):
|
||||
for i in range(len(rconf["joint_names"])):
|
||||
while True:
|
||||
try:
|
||||
lim = robot.qlim.copy()
|
||||
# Find the link corresponding to the joint name
|
||||
print("-" * 50)
|
||||
print(f"Current position limits for joint '{config['joint_names'][i]}': [{robot.qlim[0][i]} {robot.qlim[1][i]}] rad")
|
||||
lower_limit = input(f"Enter the new lower limit for joint '{config['joint_names'][i]}' (or press Enter to keep current): ").strip()
|
||||
upper_limit = input(f"Enter the new upper limit for joint '{config['joint_names'][i]}' (or press Enter to keep current): ").strip()
|
||||
print(f"Current position limits for joint '{rconf['joint_names'][i]}': [{robot.qlim[0][i]} {robot.qlim[1][i]}] rad")
|
||||
lower_limit = input(f"Enter the new lower limit for joint '{rconf['joint_names'][i]}' (or press Enter to keep current): ").strip()
|
||||
upper_limit = input(f"Enter the new upper limit for joint '{rconf['joint_names'][i]}' (or press Enter to keep current): ").strip()
|
||||
|
||||
if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit):
|
||||
print("Invalid input. Lower limit must be less than upper limit.")
|
||||
@ -278,7 +282,7 @@ def interactive_input() -> tuple[dict[str, Any], rtb.robot | None]:
|
||||
else:
|
||||
lim[1][i] = float(upper_limit)
|
||||
robot.qlim = lim
|
||||
print(f"New limits for joint '{config['joint_names'][i]}': [{robot.qlim[0][i]} {robot.qlim[1][i]}] rad")
|
||||
print(f"New limits for joint '{rconf['joint_names'][i]}': [{robot.qlim[0][i]} {robot.qlim[1][i]}] rad")
|
||||
print("-" * 50)
|
||||
break
|
||||
except ValueError:
|
||||
@ -297,10 +301,10 @@ def interactive_input() -> tuple[dict[str, Any], rtb.robot | None]:
|
||||
while True:
|
||||
print('+-' * 50)
|
||||
update_limits = input("Do you want to set joint limits? (y/n): ").strip().lower()
|
||||
n_joints = len(config["joint_names"])
|
||||
n_joints = len(rconf["joint_names"])
|
||||
if update_limits == 'y':
|
||||
config["joint_lim"] = [[]*n_joints,[]*n_joints]
|
||||
for i, joint in enumerate(config["joint_names"]):
|
||||
rconf["joint_lim"] = [[]*n_joints,[]*n_joints]
|
||||
for i, joint in enumerate(rconf["joint_names"]):
|
||||
while True:
|
||||
try:
|
||||
print("-" * 50)
|
||||
@ -311,39 +315,113 @@ def interactive_input() -> tuple[dict[str, Any], rtb.robot | None]:
|
||||
print('--' * 50)
|
||||
print(" !!! Invalid input. Lower limit must be less than upper limit. !!! ")
|
||||
continue
|
||||
config["joint_lim"][0][i] = float(lower_limit) if lower_limit!="" else None
|
||||
config["joint_lim"][1][i] = float(upper_limit) if upper_limit!="" else None
|
||||
rconf["joint_lim"][0][i] = float(lower_limit) if lower_limit!="" else None
|
||||
rconf["joint_lim"][1][i] = float(upper_limit) if upper_limit!="" else None
|
||||
break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter numeric values or leave blank to keep current limits.")
|
||||
print(f'New limits for joint:\n lower: {config["joint_lim"][0]}\n upper: {config["joint_lim"][1]}')
|
||||
print(f'New limits for joint:\n lower: {rconf["joint_lim"][0]}\n upper: {rconf["joint_lim"][1]}')
|
||||
break
|
||||
elif update_limits == 'n':
|
||||
config["joint_lim"] = None
|
||||
rconf["joint_lim"] = None
|
||||
break
|
||||
print("Invalid input. Please enter 'y' or 'n'.")
|
||||
while True:
|
||||
try:
|
||||
print('+-' * 50)
|
||||
config["trajectory_topic_name"] = input("Enter the topic name to which the joint trajectory should be sent (press Enter for default: '/scaled_joint_trajectory_controller/joint_trajectory'): ").strip()
|
||||
if config["trajectory_topic_name"] == "":
|
||||
config["trajectory_topic_name"] = '/scaled_joint_trajectory_controller/joint_trajectory'
|
||||
rconf["trajectory_topic_name"] = input("Enter the topic name to which the joint trajectory should be sent (press Enter for default: '/scaled_joint_trajectory_controller/joint_trajectory'): ").strip()
|
||||
if rconf["trajectory_topic_name"] == "":
|
||||
rconf["trajectory_topic_name"] = '/scaled_joint_trajectory_controller/joint_trajectory'
|
||||
break
|
||||
elif config["trajectory_topic_name"].startswith("/"):
|
||||
elif rconf["trajectory_topic_name"].startswith("/"):
|
||||
break
|
||||
else:
|
||||
print("Invalid topic name. A valid topic name should start with '/'.")
|
||||
except Exception as e:
|
||||
print(f"An error occurred: {e}")
|
||||
print('--' * 50)
|
||||
while True:
|
||||
try:
|
||||
print('+-' * 50)
|
||||
rconf["hz"] = input("Enter the desired refresh frequency (Hz) (or press Enter for default: 100): ")
|
||||
if rconf["hz"] == "":
|
||||
rconf["hz"] = 100
|
||||
else:
|
||||
rconf["hz"] = float(rconf["hz"])
|
||||
break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter a valid number.")
|
||||
continue
|
||||
while True:
|
||||
try:
|
||||
print('+-' * 50)
|
||||
nconf["log_ip"] = str(input("Enter the target IP on which you want to recieve the log as OSC Messages (or press Enter for default: '127.0.0.1'): "))
|
||||
if nconf["log_ip"] == "":
|
||||
nconf["log_ip"] = "127.0.0.1"
|
||||
print('--' * 50)
|
||||
nconf["log_port"] = input("Enter the target port for the log messages (or press Enter for default: 5005): ")
|
||||
if nconf["log_port"] == "":
|
||||
nconf["log_port"] = 5005
|
||||
else:
|
||||
nconf["log_port"] = int(nconf["log_port"])
|
||||
break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter a valid IP address.")
|
||||
continue
|
||||
while True:
|
||||
try:
|
||||
print('+-' * 50)
|
||||
nconf["state_ip"] = str(input("Enter the target IP on which you want to recieve the joint states as OSC Messages (or press Enter for default: '127.0.0.1'): "))
|
||||
if nconf["state_ip"] == "":
|
||||
nconf["state_ip"] = "127.0.0.1"
|
||||
print('--' * 50)
|
||||
nconf["state_port"] = input("Enter the target port for the joint state messages (or press Enter for default: 7000): ")
|
||||
if nconf["state_port"] == "":
|
||||
nconf["state_port"] = 7000
|
||||
else:
|
||||
nconf["state_port"] = int(nconf["state_port"])
|
||||
break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter a valid IP address.")
|
||||
continue
|
||||
while True:
|
||||
try:
|
||||
print('+-' * 50)
|
||||
nconf["commands_port"] = input("Enter the port you want to send your commands to (or press Enter for default: 8000): ")
|
||||
if nconf["commands_port"] == nconf["state_port"]:
|
||||
print("The commands port must be different from the state port.")
|
||||
continue
|
||||
if nconf["commands_port"] == "":
|
||||
nconf["commands_port"] = 8000
|
||||
else:
|
||||
nconf["commands_port"] = int(nconf["commands_port"])
|
||||
break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter a valid port.")
|
||||
continue
|
||||
print()
|
||||
print('=-=' * 50)
|
||||
print()
|
||||
print(f"Sending joint state's' to {nconf['state_ip']}:{nconf['state_port']}")
|
||||
print()
|
||||
print('=-=' * 50)
|
||||
print()
|
||||
print(f"Sending log messages to {nconf['log_ip']}:{nconf['log_port']}")
|
||||
print()
|
||||
print('=-=' * 50)
|
||||
print()
|
||||
print(f"Ready to receive OSC messages on {socket.gethostbyname(socket.gethostname())}:{nconf['commands_port']}")
|
||||
print()
|
||||
print('=-=' * 50)
|
||||
print()
|
||||
config["robot"] = rconf
|
||||
config["network"] = nconf
|
||||
while True:
|
||||
save_config = input("Do you want to save this interface? (y/n): ").strip().lower()
|
||||
if save_config == "y":
|
||||
config_path = input("Where do you want to save the config?: ").strip()
|
||||
if not os.path.isdir(config_path):
|
||||
print("Warning: Path does not exist")
|
||||
print("Making Directory...")
|
||||
os.mkdir(config_path)
|
||||
print(f"[TODO] osc_ros2.py:1091:0: Write config to {config_path}/config.toml")
|
||||
with open(f"{config_path}.toml", "w") as f:
|
||||
toml.dump(config, f)
|
||||
break
|
||||
elif save_config == "n":
|
||||
print("Will not be saving config")
|
||||
|
@ -1,5 +1,7 @@
|
||||
#ruff: noqa: F403, F405
|
||||
import rclpy
|
||||
import argparse
|
||||
import toml
|
||||
from rclpy.node import Node
|
||||
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
|
||||
from sensor_msgs.msg import JointState
|
||||
@ -10,6 +12,7 @@ import numpy as np
|
||||
import spatialmath as sm
|
||||
import roboticstoolbox as rtb
|
||||
from osc4py3 import oscbuildparse
|
||||
from osc_ros2.interactive_input import interactive_input
|
||||
import time
|
||||
import re
|
||||
import socket
|
||||
@ -62,62 +65,9 @@ class OSC_ROS2_interface(Node):
|
||||
self.trajectory_topic_name,
|
||||
1
|
||||
)
|
||||
|
||||
while True:
|
||||
try:
|
||||
print('+-' * 50)
|
||||
log_ip = str(input("Enter the target IP on which you want to recieve the log as OSC Messages (or press Enter for default: '127.0.0.1'): "))
|
||||
if log_ip == "":
|
||||
log_ip = "127.0.0.1"
|
||||
print('--' * 50)
|
||||
log_port = input("Enter the target port for the log messages (or press Enter for default: 5005): ")
|
||||
if log_port == "":
|
||||
log_port = 5005
|
||||
else:
|
||||
log_port = int(log_port)
|
||||
break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter a valid IP address.")
|
||||
continue
|
||||
while True:
|
||||
try:
|
||||
print('+-' * 50)
|
||||
state_ip = str(input("Enter the target IP on which you want to recieve the joint states as OSC Messages (or press Enter for default: '127.0.0.1'): "))
|
||||
if state_ip == "":
|
||||
state_ip = "127.0.0.1"
|
||||
print('--' * 50)
|
||||
state_port = input("Enter the target port for the joint state messages (or press Enter for default: 7000): ")
|
||||
if state_port == "":
|
||||
state_port = 7000
|
||||
else:
|
||||
state_port = int(state_port)
|
||||
break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter a valid IP address.")
|
||||
continue
|
||||
while True:
|
||||
try:
|
||||
print('+-' * 50)
|
||||
commands_port = input("Enter the port you want to send your commands to (or press Enter for default: 8000): ")
|
||||
if commands_port == state_port:
|
||||
print("The commands port must be different from the state port.")
|
||||
continue
|
||||
if commands_port == "":
|
||||
commands_port = 8000
|
||||
else:
|
||||
commands_port = int(commands_port)
|
||||
break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter a valid port.")
|
||||
continue
|
||||
|
||||
|
||||
osc_startup()
|
||||
|
||||
osc_udp_client(state_ip, state_port, "osc_client")
|
||||
|
||||
osc_udp_client(log_ip, log_port, "osc_log_client")
|
||||
|
||||
osc_udp_server('0.0.0.0', commands_port, "osc_server")
|
||||
|
||||
# Register OSC handler
|
||||
@ -127,34 +77,6 @@ class OSC_ROS2_interface(Node):
|
||||
osc_method("/joint_trajectory", self.joint_trajectory_handler, argscheme=osm.OSCARG_DATAUNPACK)
|
||||
osc_method("/cartesian_trajectory", self.cartesian_trajectory_handler, argscheme=osm.OSCARG_DATAUNPACK)
|
||||
osc_method("/speed_scaling", self.speed_scaling_handler, argscheme=osm.OSCARG_DATAUNPACK)
|
||||
print('--' * 50)
|
||||
while True:
|
||||
try:
|
||||
print('+-' * 50)
|
||||
self.hz = input("Enter the desired refresh frequency (Hz) (or press Enter for default: 100): ")
|
||||
if self.hz == "":
|
||||
self.hz = 100
|
||||
else:
|
||||
self.hz = float(self.hz)
|
||||
break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter a valid number.")
|
||||
continue
|
||||
print()
|
||||
print('=-=' * 50)
|
||||
print()
|
||||
print(f'Sending joint states to {state_ip}:{state_port}')
|
||||
print()
|
||||
print('=-=' * 50)
|
||||
print()
|
||||
print(f'Sending log messages to {log_ip}:{log_port}')
|
||||
print()
|
||||
print('=-=' * 50)
|
||||
print()
|
||||
print(f'Ready to receive OSC messages on {socket.gethostbyname(socket.gethostname())}:{commands_port}')
|
||||
print()
|
||||
print('=-=' * 50)
|
||||
print()
|
||||
|
||||
self.get_logger().info(f"Publishing joint trajectory to {self.trajectory_topic_name}")
|
||||
self.get_logger().info(f'Ready to receive OSC messages on {socket.gethostbyname(socket.gethostname())}:{commands_port}')
|
||||
@ -764,9 +686,15 @@ class OSC_ROS2_interface(Node):
|
||||
|
||||
def main():
|
||||
"""Main function to get joint names and start the ROS 2 & OSC system."""
|
||||
parser = argparse.ArgumentParser(description="An interface between ROS2 and OSC for robotic arm manipulators", usage="ros2 run osc_ros2 interface [-c CONFIG.TOML]")
|
||||
parser.add_argument("-c", "--config", type=argparse.FileType("r"), help="A robot TOML config, if the config is provided then the interface will not show the iteractive input")
|
||||
args = parser.parse_args()
|
||||
rclpy.init()
|
||||
# TODO: if no config use interactive
|
||||
config, robot = interactive_input()
|
||||
if not args.config:
|
||||
config, robot = interactive_input()
|
||||
else:
|
||||
config = toml.load(args.config)
|
||||
robot = rtb.ERobot.URDF(config["robot"]["urdf"])
|
||||
node = OSC_ROS2_interface(robot, config)
|
||||
|
||||
# Run ROS 2 spin, and osc_process will be handled by the timer
|
||||
|
27
workspace/ur10e.toml
Normal file
27
workspace/ur10e.toml
Normal file
@ -0,0 +1,27 @@
|
||||
[robot]
|
||||
urdf = "/workspace/ur10e.urdf"
|
||||
joint_names = [ "shoulder_pan_joint", "shoulder_lift_joint", "elbow_joint", "wrist_1_joint", "wrist_2_joint", "wrist_3_joint",]
|
||||
cost_mask_string = "111111"
|
||||
x_limits = [ "None", "None",]
|
||||
y_limits = [ "None", "None",]
|
||||
z_limits = [ "None", "None",]
|
||||
x_limits_workspace = [ "None", "None",]
|
||||
y_limits_workspace = [ "None", "None",]
|
||||
z_limits_workspace = [ "None", "None",]
|
||||
trajectory_topic_name = "/scaled_joint_trajectory_controller/joint_trajectory"
|
||||
hz = 100
|
||||
|
||||
[network]
|
||||
log_ip = "127.0.0.1"
|
||||
log_port = 5005
|
||||
state_ip = "127.0.0.1"
|
||||
state_port = 7000
|
||||
commands_port = 8000
|
||||
|
||||
[robot.joint_velocity_limits]
|
||||
shoulder_pan_joint = 2.0943951023931953
|
||||
shoulder_lift_joint = 2.0943951023931953
|
||||
elbow_joint = 3.141592653589793
|
||||
wrist_1_joint = 3.141592653589793
|
||||
wrist_2_joint = 3.141592653589793
|
||||
wrist_3_joint = 3.141592653589793
|
Loading…
Reference in New Issue
Block a user