Compare commits
9 Commits
b9434a5e7f
...
70f78ada2b
Author | SHA1 | Date | |
---|---|---|---|
70f78ada2b | |||
cdec866f4f | |||
ce3f00c3e2 | |||
6942ff044e | |||
c665883185 | |||
![]() |
4ab2ff3c3b | ||
![]() |
6db7143bd5 | ||
![]() |
b45371d39b | ||
![]() |
b4abe1eec1 |
19
examples/log.py
Normal file
19
examples/log.py
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
import socket
|
||||||
|
from ssl import SOCK_STREAM
|
||||||
|
from oscpy import *
|
||||||
|
|
||||||
|
UDP_IP = "172.18.0.1"
|
||||||
|
UDP_PORT = 5005
|
||||||
|
|
||||||
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
|
sock.bind((UDP_IP, UDP_PORT))
|
||||||
|
|
||||||
|
while True:
|
||||||
|
data, addr = sock.recvfrom(10*1024)
|
||||||
|
if "#bundle" in data[:8].decode():
|
||||||
|
b = Bundle(data)
|
||||||
|
print(b)
|
||||||
|
else:
|
||||||
|
m = Message(data)
|
||||||
|
print(m)
|
||||||
|
|
45
examples/move.py
Normal file
45
examples/move.py
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
import socket
|
||||||
|
import time
|
||||||
|
from oscpy import *
|
||||||
|
|
||||||
|
|
||||||
|
SEND_IP = "172.18.0.2"
|
||||||
|
SEND_PORT = 8000
|
||||||
|
RECV_IP = "172.18.0.1"
|
||||||
|
RECV_PORT = 7000
|
||||||
|
|
||||||
|
pub = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
|
sub = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
|
|
||||||
|
msg = Message("/joint_positions", [-1]*6).encode()
|
||||||
|
n = pub.sendto(msg, (SEND_IP, SEND_PORT))
|
||||||
|
if n > 0:
|
||||||
|
print(f"INFO: Sent {n} bytes")
|
||||||
|
|
||||||
|
time.sleep(10)
|
||||||
|
|
||||||
|
msg = Message("/tcp_coordinates", [-1]*6).encode()
|
||||||
|
n = pub.sendto(msg, (SEND_IP, SEND_PORT))
|
||||||
|
if n > 0:
|
||||||
|
print(f"INFO: Sent {n} bytes")
|
||||||
|
|
||||||
|
time.sleep(10)
|
||||||
|
|
||||||
|
sub.bind((RECV_IP, RECV_PORT))
|
||||||
|
joint_names = []
|
||||||
|
found = False
|
||||||
|
while not found:
|
||||||
|
data, addr = sub.recvfrom(5*1024)
|
||||||
|
if "#bundle" in data[:8].decode():
|
||||||
|
b = Bundle().decode(data)
|
||||||
|
for msg in b.msgs:
|
||||||
|
if "/joint_state/name" in msg.address:
|
||||||
|
joint_names = msg.args
|
||||||
|
found = True
|
||||||
|
|
||||||
|
if joint_names:
|
||||||
|
msg = Message(f"/joint_position/{joint_names[2]}", [ 0 ])
|
||||||
|
n = pub.sendto(msg.encode(), (SEND_IP, SEND_PORT))
|
||||||
|
if n > 0:
|
||||||
|
print(f"INFO: Sent {n} bytes")
|
||||||
|
|
138
examples/oscpy/__init__.py
Normal file
138
examples/oscpy/__init__.py
Normal file
@ -0,0 +1,138 @@
|
|||||||
|
import struct
|
||||||
|
from typing import final, override
|
||||||
|
|
||||||
|
OSC_TYPES = str | int | float | bool | bytes
|
||||||
|
|
||||||
|
def get_string_size(data: bytes) -> int:
|
||||||
|
n = data.find(b'\x00')
|
||||||
|
n = n + 4 - n%4
|
||||||
|
return n
|
||||||
|
|
||||||
|
def to_osc_string(s: str) -> bytes:
|
||||||
|
n = len(s)
|
||||||
|
b = struct.pack(f'{n}s{4 - n%4}x', s.encode())
|
||||||
|
return b
|
||||||
|
|
||||||
|
def to_osc_blob(b: bytes) -> bytes:
|
||||||
|
n = len(b)
|
||||||
|
b = struct.pack(f'i{4 - n%4}p', n, b)
|
||||||
|
return b
|
||||||
|
|
||||||
|
def parse_string(data: bytes) -> tuple[bytes, str]:
|
||||||
|
n = get_string_size(data)
|
||||||
|
values: tuple[bytes] = struct.unpack(f'>{n}s', data[:n])
|
||||||
|
value = values[0].split(b"\x00", 1)
|
||||||
|
return data[n:], value[0].decode()
|
||||||
|
|
||||||
|
def parse_float(data: bytes) -> tuple[bytes, float]:
|
||||||
|
values: tuple[float] = struct.unpack('>f', data[:4])
|
||||||
|
return data[4:], values[0]
|
||||||
|
|
||||||
|
def parse_int(data: bytes) -> tuple[bytes, int]:
|
||||||
|
values: tuple[int] = struct.unpack('>i', data[:4])
|
||||||
|
return data[4:], values[0]
|
||||||
|
|
||||||
|
def parse_blob(data: bytes) -> tuple[bytes, bytes]:
|
||||||
|
n_values: tuple[int] = struct.unpack('>i', data[:4])
|
||||||
|
n: int = n_values[0]
|
||||||
|
values: tuple[bytes] = struct.unpack(f'>{n}p', data[4:4+n])
|
||||||
|
return data[4+n:], values[0]
|
||||||
|
|
||||||
|
def parse_args(tt: str, data: bytes) -> list[OSC_TYPES]:
|
||||||
|
tt = tt[1:]
|
||||||
|
args: list[OSC_TYPES] = []
|
||||||
|
for c in tt:
|
||||||
|
match c:
|
||||||
|
case 's':
|
||||||
|
data, val = parse_string(data)
|
||||||
|
args.append(val)
|
||||||
|
case 'b':
|
||||||
|
data, val = parse_blob(data)
|
||||||
|
args.append(val)
|
||||||
|
case 'f':
|
||||||
|
data, val = parse_float(data)
|
||||||
|
args.append(val)
|
||||||
|
case 'i':
|
||||||
|
data, val = parse_int(data)
|
||||||
|
args.append(val)
|
||||||
|
case _:
|
||||||
|
print(f"[ERROR]: Got {c}")
|
||||||
|
return args
|
||||||
|
|
||||||
|
def encode_args(args: list[OSC_TYPES]) -> bytes:
|
||||||
|
encoded = b''
|
||||||
|
for arg in args:
|
||||||
|
match arg:
|
||||||
|
case str(): encoded += to_osc_string(arg)
|
||||||
|
case float(): encoded += struct.pack('>f', arg)
|
||||||
|
case int(): encoded += struct.pack('>i', arg)
|
||||||
|
case bytes(): encoded += to_osc_blob(arg)
|
||||||
|
return encoded
|
||||||
|
|
||||||
|
def parse_type(arg: OSC_TYPES) -> str:
|
||||||
|
match arg:
|
||||||
|
case str(): return "s"
|
||||||
|
case float(): return "f"
|
||||||
|
case int(): return "i"
|
||||||
|
case bytes(): return "b"
|
||||||
|
|
||||||
|
@final
|
||||||
|
class Message:
|
||||||
|
def __init__(self, address: str = "/", args: list[OSC_TYPES] = []):
|
||||||
|
self.address: str = address
|
||||||
|
self.type_tags = ","
|
||||||
|
for arg in args:
|
||||||
|
self.type_tags = self.type_tags + parse_type(arg)
|
||||||
|
self.args: list[OSC_TYPES] = args
|
||||||
|
|
||||||
|
def decode(self, data: bytes):
|
||||||
|
data, self.address = parse_string(data)
|
||||||
|
data, self.type_tags = parse_string(data)
|
||||||
|
self.args = parse_args(self.type_tags, data)
|
||||||
|
return self
|
||||||
|
|
||||||
|
def encode(self) -> bytes:
|
||||||
|
msg = to_osc_string(self.address)
|
||||||
|
msg += to_osc_string(self.type_tags)
|
||||||
|
msg += encode_args(self.args)
|
||||||
|
return msg
|
||||||
|
|
||||||
|
@override
|
||||||
|
def __str__(self) -> str:
|
||||||
|
return f"{self.address} [{self.type_tags}]: {self.args}"
|
||||||
|
|
||||||
|
@final
|
||||||
|
class Bundle:
|
||||||
|
def __init__(self, time_tag: float = 0.0, msgs: list[Message] = []):
|
||||||
|
self.header = "#bundle"
|
||||||
|
self.ttag = time_tag
|
||||||
|
self.msgs = msgs
|
||||||
|
|
||||||
|
def decode(self, data: bytes):
|
||||||
|
data, self.header = parse_string(data)
|
||||||
|
data[4:]
|
||||||
|
data, self.ttag = parse_float(data)
|
||||||
|
data = data[4:]
|
||||||
|
self.msgs: list[Message] = []
|
||||||
|
while len(data) > 0:
|
||||||
|
data, n = parse_int(data)
|
||||||
|
msg_data = data[:n]
|
||||||
|
data = data[n:]
|
||||||
|
self.msgs.append(Message().decode(msg_data))
|
||||||
|
return self
|
||||||
|
|
||||||
|
def encode(self) -> bytes:
|
||||||
|
bundle = to_osc_string("#bundle")
|
||||||
|
bundle += struct.pack('4xf4x', self.ttag)
|
||||||
|
for msg in self.msgs:
|
||||||
|
msg_data = msg.encode()
|
||||||
|
bundle += struct.pack('i', len(msg_data))
|
||||||
|
bundle += msg_data
|
||||||
|
return bundle
|
||||||
|
|
||||||
|
@override
|
||||||
|
def __str__(self) -> str:
|
||||||
|
out = f"{self.header} ({self.ttag}):\n"
|
||||||
|
for msg in self.msgs:
|
||||||
|
out += f" - {msg}\n"
|
||||||
|
return out
|
27
examples/state.py
Normal file
27
examples/state.py
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
import socket
|
||||||
|
import struct
|
||||||
|
from oscpy import *
|
||||||
|
|
||||||
|
UDP_IP = "172.18.0.1"
|
||||||
|
UDP_PORT = 7000
|
||||||
|
|
||||||
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
|
sock.bind((UDP_IP, UDP_PORT))
|
||||||
|
|
||||||
|
|
||||||
|
while True:
|
||||||
|
data, addr = sock.recvfrom(10*1024)
|
||||||
|
n = get_string_size(data)
|
||||||
|
[tag] = struct.unpack(f'{n}s', data[:n])
|
||||||
|
if "#bundle" in tag.decode():
|
||||||
|
b = Bundle(data)
|
||||||
|
print("Bundle:")
|
||||||
|
for msg in b.msgs:
|
||||||
|
print(" -", msg)
|
||||||
|
else:
|
||||||
|
msg = Message(data)
|
||||||
|
if msg.address == "/time":
|
||||||
|
msg.args[0] = float(msg.args[0])
|
||||||
|
print(msg)
|
||||||
|
|
||||||
|
|
7
examples/test.py
Normal file
7
examples/test.py
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
from oscpy import *
|
||||||
|
|
||||||
|
|
||||||
|
encoded = Message("/", ["test", 0.1, 9]).encode()
|
||||||
|
new_msg = Message().decode(encoded).encode()
|
||||||
|
assert(encoded == new_msg)
|
||||||
|
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
429
workspace/src/osc_ros2/osc_ros2/interactive_input.py
Normal file
429
workspace/src/osc_ros2/osc_ros2/interactive_input.py
Normal file
@ -0,0 +1,429 @@
|
|||||||
|
import os
|
||||||
|
import time
|
||||||
|
import toml
|
||||||
|
import socket
|
||||||
|
import xml.etree.ElementTree as ET
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
import roboticstoolbox as rtb
|
||||||
|
from rclpy.node import Node
|
||||||
|
from sensor_msgs.msg import JointState
|
||||||
|
|
||||||
|
|
||||||
|
class JointNameListener(Node):
|
||||||
|
joint_names: list[str]|None
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__('joint_name_listener')
|
||||||
|
self.subscription = self.create_subscription(
|
||||||
|
JointState,
|
||||||
|
'/joint_states',
|
||||||
|
self.joint_state_callback,
|
||||||
|
1
|
||||||
|
)
|
||||||
|
self.joint_names = None
|
||||||
|
|
||||||
|
def joint_state_callback(self, msg: JointState):
|
||||||
|
self.joint_names = list(msg.name)
|
||||||
|
|
||||||
|
def interactive_input() -> tuple[dict[str, Any], rtb.robot]:
|
||||||
|
config = {}
|
||||||
|
rconf = {}
|
||||||
|
nconf = {}
|
||||||
|
robot = None
|
||||||
|
while True:
|
||||||
|
use_urdf = input("Do you have a URDF file you want to use? (y/n): ").strip().lower()
|
||||||
|
if use_urdf == 'y':
|
||||||
|
while True:
|
||||||
|
rconf["urdf"] = input("Enter the absolute path to the URDF file: ")
|
||||||
|
if os.path.isfile(rconf["urdf"]):
|
||||||
|
if not rconf["urdf"].endswith('.urdf'):
|
||||||
|
print("The file is not a URDF file. Please enter a valid URDF file.")
|
||||||
|
continue
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
print("Invalid path. Please enter a valid path to the URDF file.")
|
||||||
|
tree = ET.parse(rconf["urdf"])
|
||||||
|
root = tree.getroot()
|
||||||
|
robot = rtb.ERobot.URDF(rconf["urdf"])
|
||||||
|
rconf["joint_names"] = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
|
||||||
|
print(robot)
|
||||||
|
rconf["joint_velocity_limits"] = {}
|
||||||
|
# Iterate over all joints in the URDF
|
||||||
|
for joint in root.findall('.//joint'):
|
||||||
|
joint_name = joint.get('name') # Get the name of the joint
|
||||||
|
|
||||||
|
# Look for the <limit> tag under each joint
|
||||||
|
limit = joint.find('limit')
|
||||||
|
|
||||||
|
if limit is not None:
|
||||||
|
# Extract the velocity limit (if it exists)
|
||||||
|
velocity_limit = limit.get('velocity')
|
||||||
|
|
||||||
|
if velocity_limit is not None:
|
||||||
|
rconf["joint_velocity_limits"][joint_name] = float(velocity_limit)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
print('-+'*50)
|
||||||
|
print("The cost mask determines which coordinates are used for the IK.\nEach element of the cost mask corresponds to a Cartesian coordinate [x, y, z, roll, pitch, yaw].")
|
||||||
|
print("The cost mask 111000 means that the IK will only consider translation and no rotaion.")
|
||||||
|
cost_mask_string = input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")
|
||||||
|
rconf["cost_mask"] = [int(i) for i in cost_mask_string]
|
||||||
|
if sum(rconf["cost_mask"]) <= robot.n and len(rconf["cost_mask"]) == 6:
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.")
|
||||||
|
except ValueError:
|
||||||
|
print("Invalid input. Please enter integers only.")
|
||||||
|
print(f"The following coordinates will be used for the IK: {[j for i,j in enumerate(['x','y','z','roll','pitch','yaw']) if rconf['cost_mask'][i]==1]}")
|
||||||
|
break
|
||||||
|
elif use_urdf == 'n':
|
||||||
|
node = JointNameListener()
|
||||||
|
print("Wainting 10 sec for JointState messages to extract joint names...")
|
||||||
|
rclpy.spin_once(node)
|
||||||
|
counter = 0
|
||||||
|
while not(node.joint_names):
|
||||||
|
if counter > 100:
|
||||||
|
rconf["joint_names"] = None
|
||||||
|
break
|
||||||
|
counter+=1
|
||||||
|
time.sleep(0.1)
|
||||||
|
rclpy.spin_once(node)
|
||||||
|
rconf["joint_names"] = node.joint_names
|
||||||
|
node.destroy_node()
|
||||||
|
if rconf["joint_names"]:
|
||||||
|
correct_names = input("The following joint names were found:\n" + "\n".join(rconf["joint_names"]) + "\nAre these correct? (y/n): ").strip().lower()
|
||||||
|
while True:
|
||||||
|
if correct_names == 'y':
|
||||||
|
break
|
||||||
|
elif correct_names == 'n':
|
||||||
|
rconf["joint_names"] = None
|
||||||
|
break
|
||||||
|
correct_names = input("Invalid input. Please enter 'y' or 'n'.")
|
||||||
|
if not(rconf["joint_names"]):
|
||||||
|
print("Please enter the joint names manually.")
|
||||||
|
while True:
|
||||||
|
rconf["joint_names"] = []
|
||||||
|
print('-+'*50)
|
||||||
|
print("Enter the joint names manually one by one. Type 'done' when you are finished:")
|
||||||
|
print("Attention: The order of the joints is important. It should be the same in which you want to send the joint positions.")
|
||||||
|
while True:
|
||||||
|
print('-'*50)
|
||||||
|
joint_name = input("Enter joint name (or 'done' to finish): ").strip()
|
||||||
|
if joint_name.lower() == 'done':
|
||||||
|
break
|
||||||
|
if joint_name:
|
||||||
|
rconf["joint_names"].append(joint_name)
|
||||||
|
print('-+'*50)
|
||||||
|
correct_names = input(f"Are those the joint names as defined in your JointTrajectroy recieving Node?: {rconf['joint_names']}. (y/n)?: ").strip()
|
||||||
|
if correct_names.lower() == 'y':
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
print("Please re-enter the joint names.")
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
rconf["joint_velocity_limits"] = {}
|
||||||
|
for name in rconf["joint_names"]:
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
print('--'*50)
|
||||||
|
limit = input(f"Enter the velocity limit for joint '{name}': ").strip()
|
||||||
|
if limit == "":
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
rconf["joint_velocity_limits"][name] = float(limit)
|
||||||
|
break
|
||||||
|
except ValueError:
|
||||||
|
print("Invalid input. Please enter a numeric value or press Enter to skip.")
|
||||||
|
break
|
||||||
|
except ValueError:
|
||||||
|
print("Invalid input. Please enter numeric values or leave blank to skip.")
|
||||||
|
robot = None
|
||||||
|
rconf["cost_mask"] = None
|
||||||
|
break
|
||||||
|
print("Invalid input. Please enter 'y' or 'n'.")
|
||||||
|
if robot:
|
||||||
|
while True:
|
||||||
|
print('+-' * 50)
|
||||||
|
set_limits = input("Do you want to set limits for x, y and z? (y/n): ").strip().lower()
|
||||||
|
if set_limits == 'y':
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
rconf["x_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()]
|
||||||
|
rconf["y_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()]
|
||||||
|
rconf["z_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()]
|
||||||
|
|
||||||
|
if len(rconf["x_limits"]) != 2 or len(rconf["y_limits"]) != 2 or len(rconf["z_limits"]) != 2:
|
||||||
|
print("Invalid input. Please enter exactly two values (or leave blank) for each limit.")
|
||||||
|
continue
|
||||||
|
|
||||||
|
if (rconf["x_limits"][0] is not None and rconf["x_limits"][1] is not None and rconf["x_limits"][0] >= rconf["x_limits"][1]) or \
|
||||||
|
(rconf["y_limits"][0] is not None and rconf["y_limits"][1] is not None and rconf["y_limits"][0] >= rconf["y_limits"][1]) or \
|
||||||
|
(rconf["z_limits"][0] is not None and rconf["z_limits"][1] is not None and rconf["z_limits"][0] >= rconf["z_limits"][1]):
|
||||||
|
print("Invalid input. Lower limit must be less than upper limit for each axis.")
|
||||||
|
continue
|
||||||
|
|
||||||
|
print("Current limits:")
|
||||||
|
print(f"x: {rconf['x_limits']}")
|
||||||
|
print(f"y: {rconf['y_limits']}")
|
||||||
|
print(f"z: {rconf['z_limits']}")
|
||||||
|
con = True
|
||||||
|
while con:
|
||||||
|
confirm = input("Do you want your TCP to move in this range? (y/n): ").strip().lower()
|
||||||
|
if confirm == 'y':
|
||||||
|
break
|
||||||
|
elif confirm == 'n':
|
||||||
|
print("Please re-enter the limits.")
|
||||||
|
con = False
|
||||||
|
else:
|
||||||
|
print("Invalid input. Please enter 'y' or 'n'.")
|
||||||
|
if con:
|
||||||
|
break
|
||||||
|
except ValueError:
|
||||||
|
print("Invalid input. Please enter numeric values only.")
|
||||||
|
break
|
||||||
|
elif set_limits == 'n':
|
||||||
|
rconf["x_limits"] = [None, None]
|
||||||
|
rconf["y_limits"] = [None, None]
|
||||||
|
rconf["z_limits"] = [None, None]
|
||||||
|
break
|
||||||
|
print("Invalid input. Please enter 'y' or 'n'.")
|
||||||
|
|
||||||
|
while True:
|
||||||
|
print('+-' * 50)
|
||||||
|
set_limits = input("Do you want to set workspace limits in x, y and z direction? (y/n): ").strip().lower()
|
||||||
|
if set_limits == 'y':
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
rconf["x_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()]
|
||||||
|
rconf["y_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()]
|
||||||
|
rconf["z_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()]
|
||||||
|
|
||||||
|
if len(rconf["x_limits_workspace"]) != 2 or len(rconf["y_limits_workspace"]) != 2 or len(rconf["z_limits_workspace"]) != 2:
|
||||||
|
print("Invalid input. Please enter exactly two values (or leave blank) for each limit.")
|
||||||
|
continue
|
||||||
|
|
||||||
|
if (rconf["x_limits_workspace"][0] is not None and rconf["x_limits_workspace"][1] is not None and rconf["x_limits_workspace"][0] >= rconf["x_limits_workspace"][1]) or \
|
||||||
|
(rconf["y_limits_workspace"][0] is not None and rconf["y_limits_workspace"][1] is not None and rconf["y_limits_workspace"][0] >= rconf["y_limits_workspace"][1]) or \
|
||||||
|
(rconf["z_limits_workspace"][0] is not None and rconf["z_limits_workspace"][1] is not None and rconf["z_limits_workspace"][0] >= rconf["z_limits_workspace"][1]):
|
||||||
|
print("Invalid input. Lower limit must be less than upper limit for each axis.")
|
||||||
|
continue
|
||||||
|
|
||||||
|
print("Current limits:")
|
||||||
|
print(f"x: {rconf['x_limits_workspace']}")
|
||||||
|
print(f"y: {rconf['y_limits_workspace']}")
|
||||||
|
print(f"z: {rconf['z_limits_workspace']}")
|
||||||
|
con = True
|
||||||
|
while con:
|
||||||
|
confirm = input("Do you want your robot to move in this range? (y/n): ").strip().lower()
|
||||||
|
if confirm == 'y':
|
||||||
|
break
|
||||||
|
elif confirm == 'n':
|
||||||
|
print("Please re-enter the limits.")
|
||||||
|
con = False
|
||||||
|
else:
|
||||||
|
print("Invalid input. Please enter 'y' or 'n'.")
|
||||||
|
if con:
|
||||||
|
break
|
||||||
|
except ValueError:
|
||||||
|
print("Invalid input. Please enter numeric values only.")
|
||||||
|
break
|
||||||
|
elif set_limits == 'n':
|
||||||
|
rconf["x_limits_workspace"] = [None, None]
|
||||||
|
rconf["y_limits_workspace"] = [None, None]
|
||||||
|
rconf["z_limits_workspace"] = [None, None]
|
||||||
|
break
|
||||||
|
print("Invalid input. Please enter 'y' or 'n'.")
|
||||||
|
# Ask the user if they want to set new joint limits
|
||||||
|
while True:
|
||||||
|
print('+-'*50)
|
||||||
|
update_limits = input("Do you want to set new joint limits? (y/n): ").strip().lower()
|
||||||
|
if update_limits == 'y':
|
||||||
|
for i in range(len(rconf["joint_names"])):
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
lim = robot.qlim.copy()
|
||||||
|
# Find the link corresponding to the joint name
|
||||||
|
print("-" * 50)
|
||||||
|
print(f"Current position limits for joint '{rconf['joint_names'][i]}': [{robot.qlim[0][i]} {robot.qlim[1][i]}] rad")
|
||||||
|
lower_limit = input(f"Enter the new lower limit for joint '{rconf['joint_names'][i]}' (or press Enter to keep current): ").strip()
|
||||||
|
upper_limit = input(f"Enter the new upper limit for joint '{rconf['joint_names'][i]}' (or press Enter to keep current): ").strip()
|
||||||
|
|
||||||
|
if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit):
|
||||||
|
print("Invalid input. Lower limit must be less than upper limit.")
|
||||||
|
continue
|
||||||
|
lower_limit = float(lower_limit) if lower_limit!="" else None
|
||||||
|
upper_limit = float(upper_limit) if upper_limit!="" else None
|
||||||
|
if lower_limit is not None:
|
||||||
|
if lower_limit<lim[0][i]:
|
||||||
|
while True:
|
||||||
|
sure = input(f"Are you sure you want to set the lower limit to {lower_limit} rad which is less than the default limit {lim[0][i]} (y/n)?: ").strip().lower()
|
||||||
|
if sure == 'y':
|
||||||
|
lim[0][i] = float(lower_limit)
|
||||||
|
break
|
||||||
|
elif sure == 'n':
|
||||||
|
print("Lower limit not changed.")
|
||||||
|
break
|
||||||
|
print("Invalid input. Please enter 'y' or 'n'.")
|
||||||
|
else:
|
||||||
|
lim[0][i] = float(lower_limit)
|
||||||
|
if upper_limit is not None:
|
||||||
|
if upper_limit>lim[1][i]:
|
||||||
|
while True:
|
||||||
|
sure = input(f"Are you sure you want to set the upper limit to {upper_limit} rad which is more than the default limit {lim[1][i]} (y/n)?: ").strip().lower()
|
||||||
|
if sure == 'y':
|
||||||
|
lim[1][i] = float(upper_limit)
|
||||||
|
break
|
||||||
|
elif sure == 'n':
|
||||||
|
print("Upper limit not changed.")
|
||||||
|
break
|
||||||
|
print("Invalid input. Please enter 'y' or 'n'.")
|
||||||
|
else:
|
||||||
|
lim[1][i] = float(upper_limit)
|
||||||
|
robot.qlim = lim
|
||||||
|
print(f"New limits for joint '{rconf['joint_names'][i]}': [{robot.qlim[0][i]} {robot.qlim[1][i]}] rad")
|
||||||
|
print("-" * 50)
|
||||||
|
break
|
||||||
|
except ValueError:
|
||||||
|
print("Invalid input. Please enter numeric values or leave blank to keep current limits.")
|
||||||
|
break
|
||||||
|
if update_limits == 'n':
|
||||||
|
break
|
||||||
|
print("Invalid input. Please enter 'y' or 'n'.")
|
||||||
|
'''
|
||||||
|
use_link_mask = input("Do you want to use a link mask? (y/n): ").strip().lower()
|
||||||
|
if use_link_mask == 'y':
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
'''
|
||||||
|
else:
|
||||||
|
while True:
|
||||||
|
print('+-' * 50)
|
||||||
|
update_limits = input("Do you want to set joint limits? (y/n): ").strip().lower()
|
||||||
|
n_joints = len(rconf["joint_names"])
|
||||||
|
if update_limits == 'y':
|
||||||
|
rconf["joint_lim"] = [[]*n_joints,[]*n_joints]
|
||||||
|
for i, joint in enumerate(rconf["joint_names"]):
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
print("-" * 50)
|
||||||
|
lower_limit = input(f"Enter the new lower limit for joint '{joint}' (or press Enter for None): ").strip()
|
||||||
|
upper_limit = input(f"Enter the new upper limit for joint '{joint}' (or press Enter for None): ").strip()
|
||||||
|
|
||||||
|
if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit):
|
||||||
|
print('--' * 50)
|
||||||
|
print(" !!! Invalid input. Lower limit must be less than upper limit. !!! ")
|
||||||
|
continue
|
||||||
|
rconf["joint_lim"][0][i] = float(lower_limit) if lower_limit!="" else None
|
||||||
|
rconf["joint_lim"][1][i] = float(upper_limit) if upper_limit!="" else None
|
||||||
|
break
|
||||||
|
except ValueError:
|
||||||
|
print("Invalid input. Please enter numeric values or leave blank to keep current limits.")
|
||||||
|
print(f'New limits for joint:\n lower: {rconf["joint_lim"][0]}\n upper: {rconf["joint_lim"][1]}')
|
||||||
|
break
|
||||||
|
elif update_limits == 'n':
|
||||||
|
rconf["joint_lim"] = None
|
||||||
|
break
|
||||||
|
print("Invalid input. Please enter 'y' or 'n'.")
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
print('+-' * 50)
|
||||||
|
rconf["trajectory_topic_name"] = input("Enter the topic name to which the joint trajectory should be sent (press Enter for default: '/scaled_joint_trajectory_controller/joint_trajectory'): ").strip()
|
||||||
|
if rconf["trajectory_topic_name"] == "":
|
||||||
|
rconf["trajectory_topic_name"] = '/scaled_joint_trajectory_controller/joint_trajectory'
|
||||||
|
break
|
||||||
|
elif rconf["trajectory_topic_name"].startswith("/"):
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
print("Invalid topic name. A valid topic name should start with '/'.")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"An error occurred: {e}")
|
||||||
|
print('--' * 50)
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
print('+-' * 50)
|
||||||
|
rconf["hz"] = input("Enter the desired refresh frequency (Hz) (or press Enter for default: 100): ")
|
||||||
|
if rconf["hz"] == "":
|
||||||
|
rconf["hz"] = 100
|
||||||
|
else:
|
||||||
|
rconf["hz"] = float(rconf["hz"])
|
||||||
|
break
|
||||||
|
except ValueError:
|
||||||
|
print("Invalid input. Please enter a valid number.")
|
||||||
|
continue
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
print('+-' * 50)
|
||||||
|
nconf["log_ip"] = str(input("Enter the target IP on which you want to recieve the log as OSC Messages (or press Enter for default: '127.0.0.1'): "))
|
||||||
|
if nconf["log_ip"] == "":
|
||||||
|
nconf["log_ip"] = "127.0.0.1"
|
||||||
|
print('--' * 50)
|
||||||
|
nconf["log_port"] = input("Enter the target port for the log messages (or press Enter for default: 5005): ")
|
||||||
|
if nconf["log_port"] == "":
|
||||||
|
nconf["log_port"] = 5005
|
||||||
|
else:
|
||||||
|
nconf["log_port"] = int(nconf["log_port"])
|
||||||
|
break
|
||||||
|
except ValueError:
|
||||||
|
print("Invalid input. Please enter a valid IP address.")
|
||||||
|
continue
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
print('+-' * 50)
|
||||||
|
nconf["state_ip"] = str(input("Enter the target IP on which you want to recieve the joint states as OSC Messages (or press Enter for default: '127.0.0.1'): "))
|
||||||
|
if nconf["state_ip"] == "":
|
||||||
|
nconf["state_ip"] = "127.0.0.1"
|
||||||
|
print('--' * 50)
|
||||||
|
nconf["state_port"] = input("Enter the target port for the joint state messages (or press Enter for default: 7000): ")
|
||||||
|
if nconf["state_port"] == "":
|
||||||
|
nconf["state_port"] = 7000
|
||||||
|
else:
|
||||||
|
nconf["state_port"] = int(nconf["state_port"])
|
||||||
|
break
|
||||||
|
except ValueError:
|
||||||
|
print("Invalid input. Please enter a valid IP address.")
|
||||||
|
continue
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
print('+-' * 50)
|
||||||
|
nconf["commands_port"] = input("Enter the port you want to send your commands to (or press Enter for default: 8000): ")
|
||||||
|
if nconf["commands_port"] == nconf["state_port"]:
|
||||||
|
print("The commands port must be different from the state port.")
|
||||||
|
continue
|
||||||
|
if nconf["commands_port"] == "":
|
||||||
|
nconf["commands_port"] = 8000
|
||||||
|
else:
|
||||||
|
nconf["commands_port"] = int(nconf["commands_port"])
|
||||||
|
break
|
||||||
|
except ValueError:
|
||||||
|
print("Invalid input. Please enter a valid port.")
|
||||||
|
continue
|
||||||
|
print()
|
||||||
|
print('=-=' * 50)
|
||||||
|
print()
|
||||||
|
print(f"Sending joint state's' to {nconf['state_ip']}:{nconf['state_port']}")
|
||||||
|
print()
|
||||||
|
print('=-=' * 50)
|
||||||
|
print()
|
||||||
|
print(f"Sending log messages to {nconf['log_ip']}:{nconf['log_port']}")
|
||||||
|
print()
|
||||||
|
print('=-=' * 50)
|
||||||
|
print()
|
||||||
|
print(f"Ready to receive OSC messages on {socket.gethostbyname(socket.gethostname())}:{nconf['commands_port']}")
|
||||||
|
print()
|
||||||
|
print('=-=' * 50)
|
||||||
|
print()
|
||||||
|
config["robot"] = rconf
|
||||||
|
config["network"] = nconf
|
||||||
|
while True:
|
||||||
|
save_config = input("Do you want to save this interface? (y/n): ").strip().lower()
|
||||||
|
if save_config == "y":
|
||||||
|
config_path = input("Where do you want to save the config?: ").strip()
|
||||||
|
with open(f"{config_path}.toml", "w") as f:
|
||||||
|
toml.dump(config, f)
|
||||||
|
break
|
||||||
|
elif save_config == "n":
|
||||||
|
print("Will not be saving config")
|
||||||
|
print("Invalid input. Please enter 'y' or 'n'.")
|
||||||
|
return config, robot
|
@ -1,60 +1,59 @@
|
|||||||
|
#ruff: noqa: F403, F405
|
||||||
import rclpy
|
import rclpy
|
||||||
|
import argparse
|
||||||
|
import toml
|
||||||
from rclpy.node import Node
|
from rclpy.node import Node
|
||||||
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
|
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
|
||||||
from sensor_msgs.msg import JointState
|
from sensor_msgs.msg import JointState
|
||||||
from rcl_interfaces.msg import Log
|
from rcl_interfaces.msg import Log
|
||||||
from osc4py3.as_allthreads import *
|
from osc4py3.as_allthreads import *
|
||||||
from osc4py3 import oscmethod as osm
|
from osc4py3 import oscmethod as osm
|
||||||
import xml.etree.ElementTree as ET
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import spatialmath as sm
|
import spatialmath as sm
|
||||||
import roboticstoolbox as rtb
|
import roboticstoolbox as rtb
|
||||||
from osc4py3 import oscbuildparse
|
from osc4py3 import oscbuildparse
|
||||||
|
from osc_ros2.interactive_input import interactive_input
|
||||||
import time
|
import time
|
||||||
import os
|
|
||||||
import re
|
import re
|
||||||
import socket
|
import socket
|
||||||
|
|
||||||
class JointNameListener(Node):
|
|
||||||
def __init__(self):
|
|
||||||
super().__init__('joint_name_listener')
|
|
||||||
self.subscription = self.create_subscription(
|
|
||||||
JointState,
|
|
||||||
'/joint_states',
|
|
||||||
self.joint_state_callback,
|
|
||||||
1
|
|
||||||
)
|
|
||||||
self.joint_names = None
|
|
||||||
|
|
||||||
def joint_state_callback(self, msg: JointState):
|
|
||||||
self.joint_names = list(msg.name)
|
|
||||||
|
|
||||||
class OSC_ROS2_interface(Node):
|
class OSC_ROS2_interface(Node):
|
||||||
"""Node to publish joint trajectories based on OSC messages."""
|
"""Node to publish joint trajectories based on OSC messages."""
|
||||||
|
|
||||||
def __init__(self, joint_names, joint_velocity_limits, robot, cost_mask):
|
def __init__(self, robot, config):
|
||||||
super().__init__('scaled_joint_trajectory_publisher')
|
super().__init__('scaled_joint_trajectory_publisher')
|
||||||
|
|
||||||
|
|
||||||
self.subscription = self.create_subscription(
|
self.subscription = self.create_subscription(
|
||||||
JointState,
|
JointState,
|
||||||
'/joint_states',
|
'/joint_states',
|
||||||
self.joint_states_callback,
|
self.joint_states_callback,
|
||||||
1
|
1
|
||||||
)
|
)
|
||||||
|
|
||||||
self.subscription = self.create_subscription(
|
self.subscription = self.create_subscription(
|
||||||
Log,
|
Log,
|
||||||
'/rosout',
|
'/rosout',
|
||||||
self.log_callback,
|
self.log_callback,
|
||||||
100
|
100
|
||||||
)
|
)
|
||||||
|
|
||||||
# Store received joint positions
|
# Store received joint positions
|
||||||
self.current_joint_positions = None
|
self.current_joint_positions = None
|
||||||
self.joint_names = joint_names
|
self.joint_names = config["robot"]["joint_names"]
|
||||||
self.joint_velocity_limits = joint_velocity_limits
|
self.joint_velocity_limits = config["robot"]["joint_velocity_limits"]
|
||||||
self.cost_mask = cost_mask
|
self.cost_mask = config["robot"]["cost_mask"]
|
||||||
|
self.trajectory_topic_name = config["robot"]["trajectory_topic_name"]
|
||||||
|
self.hz = config["robot"]["hz"]
|
||||||
|
self.state_ip = config["network"]["state_ip"]
|
||||||
|
self.state_port = config["network"]["state_port"]
|
||||||
|
self.log_ip = config["network"]["log_ip"]
|
||||||
|
self.log_port = config["network"]["log_port"]
|
||||||
|
self.commands_port = config["network"]["commands_port"]
|
||||||
|
float_or_none = lambda v: None if "None" in v else float(v)
|
||||||
|
self.x_limits = list(map(float_or_none, config["robot"]["x_limits"]))
|
||||||
|
self.y_limits = list(map(float_or_none, config["robot"]["y_limits"]))
|
||||||
|
self.z_limits = list(map(float_or_none, config["robot"]["z_limits"]))
|
||||||
|
self.x_limits_workspace = list(map(float_or_none, config["robot"]["x_limits_workspace"]))
|
||||||
|
self.y_limits_workspace = list(map(float_or_none, config["robot"]["y_limits_workspace"]))
|
||||||
|
self.z_limits_workspace = list(map(float_or_none, config["robot"]["z_limits_workspace"]))
|
||||||
self.robot = robot
|
self.robot = robot
|
||||||
self.desired = None
|
self.desired = None
|
||||||
self.previous_desired = None
|
self.previous_desired = None
|
||||||
@ -67,269 +66,17 @@ class OSC_ROS2_interface(Node):
|
|||||||
}
|
}
|
||||||
self.speed_scaling = 0.2
|
self.speed_scaling = 0.2
|
||||||
self.new = False
|
self.new = False
|
||||||
self.n_joints = len(joint_names)
|
self.n_joints = len(self.joint_names)
|
||||||
|
|
||||||
if robot:
|
|
||||||
while True:
|
|
||||||
print('+-' * 50)
|
|
||||||
set_limits = input("Do you want to set limits for x, y and z? (y/n): ").strip().lower()
|
|
||||||
if set_limits == 'y':
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
self.x_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()]
|
|
||||||
self.y_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()]
|
|
||||||
self.z_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()]
|
|
||||||
|
|
||||||
if len(self.x_limits) != 2 or len(self.y_limits) != 2 or len(self.z_limits) != 2:
|
|
||||||
print("Invalid input. Please enter exactly two values (or leave blank) for each limit.")
|
|
||||||
continue
|
|
||||||
|
|
||||||
if (self.x_limits[0] is not None and self.x_limits[1] is not None and self.x_limits[0] >= self.x_limits[1]) or \
|
|
||||||
(self.y_limits[0] is not None and self.y_limits[1] is not None and self.y_limits[0] >= self.y_limits[1]) or \
|
|
||||||
(self.z_limits[0] is not None and self.z_limits[1] is not None and self.z_limits[0] >= self.z_limits[1]):
|
|
||||||
print("Invalid input. Lower limit must be less than upper limit for each axis.")
|
|
||||||
continue
|
|
||||||
|
|
||||||
print(f"Current limits:")
|
|
||||||
print(f"x: {self.x_limits}")
|
|
||||||
print(f"y: {self.y_limits}")
|
|
||||||
print(f"z: {self.z_limits}")
|
|
||||||
con = True
|
|
||||||
while con:
|
|
||||||
confirm = input("Do you want your TCP to move in this range? (y/n): ").strip().lower()
|
|
||||||
if confirm == 'y':
|
|
||||||
break
|
|
||||||
elif confirm == 'n':
|
|
||||||
print("Please re-enter the limits.")
|
|
||||||
con = False
|
|
||||||
else:
|
|
||||||
print("Invalid input. Please enter 'y' or 'n'.")
|
|
||||||
if con: break
|
|
||||||
except ValueError:
|
|
||||||
print("Invalid input. Please enter numeric values only.")
|
|
||||||
break
|
|
||||||
elif set_limits == 'n':
|
|
||||||
self.x_limits = [None, None]
|
|
||||||
self.y_limits = [None, None]
|
|
||||||
self.z_limits = [None, None]
|
|
||||||
break
|
|
||||||
print("Invalid input. Please enter 'y' or 'n'.")
|
|
||||||
|
|
||||||
while True:
|
|
||||||
print('+-' * 50)
|
|
||||||
set_limits = input("Do you want to set workspace limits in x, y and z direction? (y/n): ").strip().lower()
|
|
||||||
if set_limits == 'y':
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
self.x_limits_workspace = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()]
|
|
||||||
self.y_limits_workspace = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()]
|
|
||||||
self.z_limits_workspace = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()]
|
|
||||||
|
|
||||||
if len(self.x_limits_workspace) != 2 or len(self.y_limits_workspace) != 2 or len(self.z_limits_workspace) != 2:
|
|
||||||
print("Invalid input. Please enter exactly two values (or leave blank) for each limit.")
|
|
||||||
continue
|
|
||||||
|
|
||||||
if (self.x_limits_workspace[0] is not None and self.x_limits_workspace[1] is not None and self.x_limits_workspace[0] >= self.x_limits_workspace[1]) or \
|
|
||||||
(self.y_limits_workspace[0] is not None and self.y_limits_workspace[1] is not None and self.y_limits_workspace[0] >= self.y_limits_workspace[1]) or \
|
|
||||||
(self.z_limits_workspace[0] is not None and self.z_limits_workspace[1] is not None and self.z_limits_workspace[0] >= self.z_limits_workspace[1]):
|
|
||||||
print("Invalid input. Lower limit must be less than upper limit for each axis.")
|
|
||||||
continue
|
|
||||||
|
|
||||||
print(f"Current limits:")
|
|
||||||
print(f"x: {self.x_limits_workspace}")
|
|
||||||
print(f"y: {self.y_limits_workspace}")
|
|
||||||
print(f"z: {self.z_limits_workspace}")
|
|
||||||
con = True
|
|
||||||
while con:
|
|
||||||
confirm = input("Do you want your robot to move in this range? (y/n): ").strip().lower()
|
|
||||||
if confirm == 'y':
|
|
||||||
break
|
|
||||||
elif confirm == 'n':
|
|
||||||
print("Please re-enter the limits.")
|
|
||||||
con = False
|
|
||||||
else:
|
|
||||||
print("Invalid input. Please enter 'y' or 'n'.")
|
|
||||||
if con: break
|
|
||||||
except ValueError:
|
|
||||||
print("Invalid input. Please enter numeric values only.")
|
|
||||||
break
|
|
||||||
elif set_limits == 'n':
|
|
||||||
self.x_limits_workspace = [None, None]
|
|
||||||
self.y_limits_workspace = [None, None]
|
|
||||||
self.z_limits_workspace = [None, None]
|
|
||||||
break
|
|
||||||
print("Invalid input. Please enter 'y' or 'n'.")
|
|
||||||
# Ask the user if they want to set new joint limits
|
|
||||||
|
|
||||||
# Ask the user if they want to set new joint limits
|
|
||||||
while True:
|
|
||||||
print('+-'*50)
|
|
||||||
update_limits = input("Do you want to set new joint limits? (y/n): ").strip().lower()
|
|
||||||
if update_limits == 'y':
|
|
||||||
for i in range(len(self.joint_names)):
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
lim = self.robot.qlim.copy()
|
|
||||||
# Find the link corresponding to the joint name
|
|
||||||
print("-" * 50)
|
|
||||||
print(f"Current position limits for joint '{self.joint_names[i]}': [{self.robot.qlim[0][i]} {self.robot.qlim[1][i]}] rad")
|
|
||||||
lower_limit = input(f"Enter the new lower limit for joint '{self.joint_names[i]}' (or press Enter to keep current): ").strip()
|
|
||||||
upper_limit = input(f"Enter the new upper limit for joint '{self.joint_names[i]}' (or press Enter to keep current): ").strip()
|
|
||||||
|
|
||||||
if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit):
|
|
||||||
print("Invalid input. Lower limit must be less than upper limit.")
|
|
||||||
continue
|
|
||||||
lower_limit = float(lower_limit) if lower_limit!="" else None
|
|
||||||
upper_limit = float(upper_limit) if upper_limit!="" else None
|
|
||||||
if lower_limit!=None:
|
|
||||||
if lower_limit<lim[0][i]:
|
|
||||||
while True:
|
|
||||||
sure = input(f"Are you sure you want to set the lower limit to {lower_limit} rad which is less than the default limit {lim[0][i]} (y/n)?: ").strip().lower()
|
|
||||||
if sure == 'y':
|
|
||||||
lim[0][i] = float(lower_limit)
|
|
||||||
break
|
|
||||||
elif sure == 'n':
|
|
||||||
print("Lower limit not changed.")
|
|
||||||
break
|
|
||||||
print("Invalid input. Please enter 'y' or 'n'.")
|
|
||||||
else: lim[0][i] = float(lower_limit)
|
|
||||||
if upper_limit!=None:
|
|
||||||
if upper_limit>lim[1][i]:
|
|
||||||
while True:
|
|
||||||
sure = input(f"Are you sure you want to set the upper limit to {upper_limit} rad which is more than the default limit {lim[1][i]} (y/n)?: ").strip().lower()
|
|
||||||
if sure == 'y':
|
|
||||||
lim[1][i] = float(upper_limit)
|
|
||||||
break
|
|
||||||
elif sure == 'n':
|
|
||||||
print("Upper limit not changed.")
|
|
||||||
break
|
|
||||||
print("Invalid input. Please enter 'y' or 'n'.")
|
|
||||||
else: lim[1][i] = float(upper_limit)
|
|
||||||
self.robot.qlim = lim
|
|
||||||
print(f"New limits for joint '{self.joint_names[i]}': [{self.robot.qlim[0][i]} {self.robot.qlim[1][i]}] rad")
|
|
||||||
print("-" * 50)
|
|
||||||
break
|
|
||||||
except ValueError:
|
|
||||||
print("Invalid input. Please enter numeric values or leave blank to keep current limits.")
|
|
||||||
break
|
|
||||||
if update_limits == 'n':
|
|
||||||
break
|
|
||||||
print("Invalid input. Please enter 'y' or 'n'.")
|
|
||||||
'''
|
|
||||||
use_link_mask = input("Do you want to use a link mask? (y/n): ").strip().lower()
|
|
||||||
if use_link_mask == 'y':
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
'''
|
|
||||||
else:
|
|
||||||
while True:
|
|
||||||
print('+-' * 50)
|
|
||||||
update_limits = input("Do you want to set joint limits? (y/n): ").strip().lower()
|
|
||||||
if update_limits == 'y':
|
|
||||||
self.joint_lim = [[None]*self.n_joints,[None]*self.n_joints]
|
|
||||||
for i, joint in enumerate(self.joint_names):
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
print("-" * 50)
|
|
||||||
lower_limit = input(f"Enter the new lower limit for joint '{joint}' (or press Enter for None): ").strip()
|
|
||||||
upper_limit = input(f"Enter the new upper limit for joint '{joint}' (or press Enter for None): ").strip()
|
|
||||||
|
|
||||||
if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit):
|
|
||||||
print('--' * 50)
|
|
||||||
print(" !!! Invalid input. Lower limit must be less than upper limit. !!! ")
|
|
||||||
continue
|
|
||||||
self.joint_lim[0][i] = float(lower_limit) if lower_limit!="" else None
|
|
||||||
self.joint_lim[1][i] = float(upper_limit) if upper_limit!="" else None
|
|
||||||
break
|
|
||||||
except ValueError:
|
|
||||||
print("Invalid input. Please enter numeric values or leave blank to keep current limits.")
|
|
||||||
print(f'New limits for joint:\n lower: {self.joint_lim[0]}\n upper: {self.joint_lim[1]}')
|
|
||||||
break
|
|
||||||
elif update_limits == 'n':
|
|
||||||
self.joint_lim = None
|
|
||||||
break
|
|
||||||
print("Invalid input. Please enter 'y' or 'n'.")
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
print('+-' * 50)
|
|
||||||
self.trajectory_topic_name = input("Enter the topic name to which the joint trajectory should be sent (press Enter for default: '/scaled_joint_trajectory_controller/joint_trajectory'): ").strip()
|
|
||||||
if self.trajectory_topic_name == "":
|
|
||||||
self.trajectory_topic_name = '/scaled_joint_trajectory_controller/joint_trajectory'
|
|
||||||
break
|
|
||||||
elif self.trajectory_topic_name.startswith("/"):
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
print("Invalid topic name. A valid topic name should start with '/'.")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"An error occurred: {e}")
|
|
||||||
|
|
||||||
# ROS2 Publisher
|
# ROS2 Publisher
|
||||||
self.publisher = self.create_publisher(
|
self.publisher = self.create_publisher(
|
||||||
JointTrajectory,
|
JointTrajectory,
|
||||||
self.trajectory_topic_name,
|
self.trajectory_topic_name,
|
||||||
1
|
1
|
||||||
)
|
)
|
||||||
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
print('+-' * 50)
|
|
||||||
log_ip = str(input("Enter the target IP on which you want to recieve the log as OSC Messages (or press Enter for default: '127.0.0.1'): "))
|
|
||||||
if log_ip == "":
|
|
||||||
log_ip = "127.0.0.1"
|
|
||||||
print('--' * 50)
|
|
||||||
log_port = input("Enter the target port for the log messages (or press Enter for default: 5005): ")
|
|
||||||
if log_port == "":
|
|
||||||
log_port = 5005
|
|
||||||
else:
|
|
||||||
log_port = int(log_port)
|
|
||||||
break
|
|
||||||
except ValueError:
|
|
||||||
print("Invalid input. Please enter a valid IP address.")
|
|
||||||
continue
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
print('+-' * 50)
|
|
||||||
state_ip = str(input("Enter the target IP on which you want to recieve the joint states as OSC Messages (or press Enter for default: '127.0.0.1'): "))
|
|
||||||
if state_ip == "":
|
|
||||||
state_ip = "127.0.0.1"
|
|
||||||
print('--' * 50)
|
|
||||||
state_port = input("Enter the target port for the joint state messages (or press Enter for default: 7000): ")
|
|
||||||
if state_port == "":
|
|
||||||
state_port = 7000
|
|
||||||
else:
|
|
||||||
state_port = int(state_port)
|
|
||||||
break
|
|
||||||
except ValueError:
|
|
||||||
print("Invalid input. Please enter a valid IP address.")
|
|
||||||
continue
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
print('+-' * 50)
|
|
||||||
commands_port = input("Enter the port you want to send your commands to (or press Enter for default: 8000): ")
|
|
||||||
if commands_port == state_port:
|
|
||||||
print("The commands port must be different from the state port.")
|
|
||||||
continue
|
|
||||||
if commands_port == "":
|
|
||||||
commands_port = 8000
|
|
||||||
else:
|
|
||||||
commands_port = int(commands_port)
|
|
||||||
break
|
|
||||||
except ValueError:
|
|
||||||
print("Invalid input. Please enter a valid port.")
|
|
||||||
continue
|
|
||||||
|
|
||||||
|
|
||||||
osc_startup()
|
osc_startup()
|
||||||
|
osc_udp_client(self.state_ip, self.state_port, "osc_client")
|
||||||
osc_udp_client(state_ip, state_port, "osc_client")
|
osc_udp_client(self.log_ip, self.log_port, "osc_log_client")
|
||||||
|
osc_udp_server('0.0.0.0', self.commands_port, "osc_server")
|
||||||
osc_udp_client(log_ip, log_port, "osc_log_client")
|
|
||||||
|
|
||||||
osc_udp_server('0.0.0.0', commands_port, "osc_server")
|
|
||||||
|
|
||||||
# Register OSC handler
|
# Register OSC handler
|
||||||
osc_method("/joint_positions", self.joint_positions_handler, argscheme=osm.OSCARG_DATAUNPACK)
|
osc_method("/joint_positions", self.joint_positions_handler, argscheme=osm.OSCARG_DATAUNPACK)
|
||||||
osc_method("/joint_position/*", self.joint_position_handler, argscheme=osm.OSCARG_ADDRESS+osm.OSCARG_DATAUNPACK)
|
osc_method("/joint_position/*", self.joint_position_handler, argscheme=osm.OSCARG_ADDRESS+osm.OSCARG_DATAUNPACK)
|
||||||
@ -337,39 +84,10 @@ class OSC_ROS2_interface(Node):
|
|||||||
osc_method("/joint_trajectory", self.joint_trajectory_handler, argscheme=osm.OSCARG_DATAUNPACK)
|
osc_method("/joint_trajectory", self.joint_trajectory_handler, argscheme=osm.OSCARG_DATAUNPACK)
|
||||||
osc_method("/cartesian_trajectory", self.cartesian_trajectory_handler, argscheme=osm.OSCARG_DATAUNPACK)
|
osc_method("/cartesian_trajectory", self.cartesian_trajectory_handler, argscheme=osm.OSCARG_DATAUNPACK)
|
||||||
osc_method("/speed_scaling", self.speed_scaling_handler, argscheme=osm.OSCARG_DATAUNPACK)
|
osc_method("/speed_scaling", self.speed_scaling_handler, argscheme=osm.OSCARG_DATAUNPACK)
|
||||||
print('--' * 50)
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
print('+-' * 50)
|
|
||||||
self.hz = input("Enter the desired refresh frequency (Hz) (or press Enter for default: 100): ")
|
|
||||||
if self.hz == "":
|
|
||||||
self.hz = 100
|
|
||||||
else:
|
|
||||||
self.hz = float(self.hz)
|
|
||||||
break
|
|
||||||
except ValueError:
|
|
||||||
print("Invalid input. Please enter a valid number.")
|
|
||||||
continue
|
|
||||||
print()
|
|
||||||
print('=-=' * 50)
|
|
||||||
print()
|
|
||||||
print(f'Sending joint states to {state_ip}:{state_port}')
|
|
||||||
print()
|
|
||||||
print('=-=' * 50)
|
|
||||||
print()
|
|
||||||
print(f'Sending log messages to {log_ip}:{log_port}')
|
|
||||||
print()
|
|
||||||
print('=-=' * 50)
|
|
||||||
print()
|
|
||||||
print(f'Ready to receive OSC messages on {socket.gethostbyname(socket.gethostname())}:{commands_port}')
|
|
||||||
print()
|
|
||||||
print('=-=' * 50)
|
|
||||||
print()
|
|
||||||
|
|
||||||
self.get_logger().info(f"Publishing joint trajectory to {self.trajectory_topic_name}")
|
self.get_logger().info(f"Publishing joint trajectory to {self.trajectory_topic_name}")
|
||||||
self.get_logger().info(f'Ready to receive OSC messages on {socket.gethostbyname(socket.gethostname())}:{commands_port}')
|
self.get_logger().info(f'Ready to receive OSC messages on {socket.gethostbyname(socket.gethostname())}:{self.commands_port}')
|
||||||
self.get_logger().info(f'Sending joint states to {state_ip}:{state_port}')
|
self.get_logger().info(f'Sending joint states to {self.state_ip}:{self.state_port}')
|
||||||
self.get_logger().info(f'Sending log messages to {log_ip}:{log_port}')
|
self.get_logger().info(f'Sending log messages to {self.log_ip}:{self.log_port}')
|
||||||
self.create_timer(1/self.hz, self.update_position) # Timer to update the position
|
self.create_timer(1/self.hz, self.update_position) # Timer to update the position
|
||||||
self.create_timer(3, self.reset_prev) # reset the previous desired position
|
self.create_timer(3, self.reset_prev) # reset the previous desired position
|
||||||
|
|
||||||
@ -393,17 +111,16 @@ class OSC_ROS2_interface(Node):
|
|||||||
|
|
||||||
def joint_trajectory_handler(self, *args):
|
def joint_trajectory_handler(self, *args):
|
||||||
try:
|
try:
|
||||||
if len(args[0]) == 6:
|
if len(args[0]) == 6:
|
||||||
points = [[float(j) for j in i] for i in args]
|
points = [[float(j) for j in i] for i in args]
|
||||||
elif len(args[0]) >= 7:
|
elif len(args[0]) >= 7:
|
||||||
points = [[float(j) for j in i[:6]] for i in args]
|
points = [[float(j) for j in i[:6]] for i in args]
|
||||||
self.get_logger().warn(f"joint_trajectory_handler: Duration is not supported for joint trajectory yet. Ignoring duration.")
|
self.get_logger().warn("joint_trajectory_handler: Duration is not supported for joint trajectory yet. Ignoring duration.")
|
||||||
else:
|
else:
|
||||||
self.get_logger().warn(f"joint_trajectory_handler: Invalid number of arguments for joint trajectory. Expected {self.n_joints} ([q0, q1, q2, ..., q{self.n_joints}]) or {self.n_joints+1} ([q0, q1, q2, ..., q{self.n_joints}, duration]), but got {len(args[0])}.")
|
self.get_logger().warn(f"joint_trajectory_handler: Invalid number of arguments for joint trajectory. Expected {self.n_joints} ([q0, q1, q2, ..., q{self.n_joints}]) or {self.n_joints+1} ([q0, q1, q2, ..., q{self.n_joints}, duration]), but got {len(args[0])}.")
|
||||||
return
|
return
|
||||||
|
self.desired = ["joint_trajectory"] + points
|
||||||
self.desired = ["joint_trajectory"] + points
|
self.new = True
|
||||||
self.new = True
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.get_logger().fatal(f"joint_trajectory_handler: {e}")
|
self.get_logger().fatal(f"joint_trajectory_handler: {e}")
|
||||||
|
|
||||||
@ -411,7 +128,6 @@ class OSC_ROS2_interface(Node):
|
|||||||
"""Handles incoming OSC messages for joint positions."""
|
"""Handles incoming OSC messages for joint positions."""
|
||||||
try:
|
try:
|
||||||
joint_name = address.split("/")[-1]
|
joint_name = address.split("/")[-1]
|
||||||
|
|
||||||
if joint_name in self.joint_names:
|
if joint_name in self.joint_names:
|
||||||
if len(args) == 1:
|
if len(args) == 1:
|
||||||
position = float(args[0])
|
position = float(args[0])
|
||||||
@ -478,7 +194,6 @@ class OSC_ROS2_interface(Node):
|
|||||||
self.get_logger().warn(f"joint_position_handler: Joint '{joint_name}' not found in the robot model.")
|
self.get_logger().warn(f"joint_position_handler: Joint '{joint_name}' not found in the robot model.")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.get_logger().fatal(f"joint_position_handler: {e}")
|
self.get_logger().fatal(f"joint_position_handler: {e}")
|
||||||
|
|
||||||
|
|
||||||
def cartesian_trajectory_handler(self, *args):
|
def cartesian_trajectory_handler(self, *args):
|
||||||
"""Handles incoming OSC messages for cartesian trajectory."""
|
"""Handles incoming OSC messages for cartesian trajectory."""
|
||||||
@ -488,11 +203,10 @@ class OSC_ROS2_interface(Node):
|
|||||||
points = [[float(j) for j in i] for i in args]
|
points = [[float(j) for j in i] for i in args]
|
||||||
elif len(args[0]) >= 7:
|
elif len(args[0]) >= 7:
|
||||||
points = [[float(j) for j in i[:6]] for i in args]
|
points = [[float(j) for j in i[:6]] for i in args]
|
||||||
self.get_logger().warn(f"cartesian_trajectory_handler: Duration is not supported for cartesian trajectory yet. Ignoring duration.")
|
self.get_logger().warn("cartesian_trajectory_handler: Duration is not supported for cartesian trajectory yet. Ignoring duration.")
|
||||||
else:
|
else:
|
||||||
self.get_logger().warn(f"cartesian_trajectory_handler: Invalid number of arguments for cartesian trajectory. Expected 6 ([x, y, z, roll, pitch, yaw]) or 7 ([x, y, z, roll, pitch, yaw, duration]), but got {len(args[0])}.")
|
self.get_logger().warn("cartesian_trajectory_handler: Invalid number of arguments for cartesian trajectory. Expected 6 ([x, y, z, roll, pitch, yaw]) or 7 ([x, y, z, roll, pitch, yaw, duration]), but got {len(args[0])}.")
|
||||||
return
|
return
|
||||||
|
|
||||||
self.desired = ["cartesian_trajectory"] + points
|
self.desired = ["cartesian_trajectory"] + points
|
||||||
self.new = True
|
self.new = True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@ -511,7 +225,6 @@ class OSC_ROS2_interface(Node):
|
|||||||
else:
|
else:
|
||||||
self.get_logger().warn(f"joint_positions_handler: Invalid number of arguments for joint positions. Expected {len(self.joint_names)} ([q0, q1, q2, ... q{len(self.joint_names)}]) or {len(self.joint_names)+1} ([q0, q1, q2, ... q{len(self.joint_names)}, duration]), but got {len(args)}.")
|
self.get_logger().warn(f"joint_positions_handler: Invalid number of arguments for joint positions. Expected {len(self.joint_names)} ([q0, q1, q2, ... q{len(self.joint_names)}]) or {len(self.joint_names)+1} ([q0, q1, q2, ... q{len(self.joint_names)}, duration]), but got {len(args)}.")
|
||||||
return
|
return
|
||||||
|
|
||||||
# Check if joint positions exceed limits
|
# Check if joint positions exceed limits
|
||||||
if self.robot:
|
if self.robot:
|
||||||
for i, position in enumerate(desired_joint_positions[:len(self.joint_names)]): # Exclude duration if present
|
for i, position in enumerate(desired_joint_positions[:len(self.joint_names)]): # Exclude duration if present
|
||||||
@ -540,7 +253,6 @@ class OSC_ROS2_interface(Node):
|
|||||||
self.get_logger().warn(
|
self.get_logger().warn(
|
||||||
f"joint_positions_handler: Joint '{self.joint_names[i]}' position {position} is out of bounds. Using {self.joint_lim[1][i]}."
|
f"joint_positions_handler: Joint '{self.joint_names[i]}' position {position} is out of bounds. Using {self.joint_lim[1][i]}."
|
||||||
)
|
)
|
||||||
|
|
||||||
self.desired = ["joint_positions"] + desired_joint_positions
|
self.desired = ["joint_positions"] + desired_joint_positions
|
||||||
self.new = True
|
self.new = True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@ -559,7 +271,6 @@ class OSC_ROS2_interface(Node):
|
|||||||
else:
|
else:
|
||||||
self.get_logger().warn(f"tcp_coordinates_handler: Invalid number of arguments for TCP coordinates. Expected 6 ([x, y, z, roll, pitch, yaw]) or 7 ([x, y, z, roll, pitch, yaw, duration]), but got {len(args)}.")
|
self.get_logger().warn(f"tcp_coordinates_handler: Invalid number of arguments for TCP coordinates. Expected 6 ([x, y, z, roll, pitch, yaw]) or 7 ([x, y, z, roll, pitch, yaw, duration]), but got {len(args)}.")
|
||||||
return
|
return
|
||||||
|
|
||||||
if self.x_limits[0] is not None:
|
if self.x_limits[0] is not None:
|
||||||
x = max(self.x_limits[0], x)
|
x = max(self.x_limits[0], x)
|
||||||
if self.x_limits[1] is not None:
|
if self.x_limits[1] is not None:
|
||||||
@ -572,13 +283,11 @@ class OSC_ROS2_interface(Node):
|
|||||||
z = max(self.z_limits[0], z)
|
z = max(self.z_limits[0], z)
|
||||||
if self.z_limits[1] is not None:
|
if self.z_limits[1] is not None:
|
||||||
z = min(self.z_limits[1], z)
|
z = min(self.z_limits[1], z)
|
||||||
|
|
||||||
if x != args[0] or y != args[1] or z != args[2]:
|
if x != args[0] or y != args[1] or z != args[2]:
|
||||||
self.get_logger().warn(
|
self.get_logger().warn(
|
||||||
f"tcp_coordinates_handler: Desired joint positions adjusted to fit within limits: "
|
f"tcp_coordinates_handler: Desired joint positions adjusted to fit within limits: "
|
||||||
f"x={x}, y={y}, z={z} (original: x={args[0]}, y={args[1]}, z={args[2]})"
|
f"x={x}, y={y}, z={z} (original: x={args[0]}, y={args[1]}, z={args[2]})"
|
||||||
)
|
)
|
||||||
|
|
||||||
self.desired = ["tcp_coordinates", x, y, z, r, p, yaw, duration]
|
self.desired = ["tcp_coordinates", x, y, z, r, p, yaw, duration]
|
||||||
self.new = True
|
self.new = True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@ -587,23 +296,21 @@ class OSC_ROS2_interface(Node):
|
|||||||
self.get_logger().warn("tcp_coordinates_handler: No robot model provided. Cannot handle TCP coordinates.")
|
self.get_logger().warn("tcp_coordinates_handler: No robot model provided. Cannot handle TCP coordinates.")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
||||||
def joint_states_callback(self, msg: JointState):
|
def joint_states_callback(self, msg: JointState):
|
||||||
"""Callback function to handle incoming joint states."""
|
"""Callback function to handle incoming joint states."""
|
||||||
try:
|
try:
|
||||||
msg_time = oscbuildparse.OSCMessage(f"/time", ',s', [str(time.time())])
|
msg_time = oscbuildparse.OSCMessage("/time", ',s', [str(time.time())])
|
||||||
osc_send(msg_time, "osc_client")
|
osc_send(msg_time, "osc_client")
|
||||||
if not(self.joint_names): self.joint_names = msg.name
|
if not(self.joint_names):
|
||||||
|
self.joint_names = msg.name
|
||||||
joint_position_dict = dict(zip(msg.name, msg.position))
|
joint_position_dict = dict(zip(msg.name, msg.position))
|
||||||
self.current_joint_positions = [float(joint_position_dict[name]) for name in self.joint_names]
|
self.current_joint_positions = [float(joint_position_dict[name]) for name in self.joint_names]
|
||||||
joint_position_dict = dict(zip(msg.name, msg.velocity))
|
joint_position_dict = dict(zip(msg.name, msg.velocity))
|
||||||
self.current_joint_velocities = [float(joint_position_dict[name]) for name in self.joint_names]
|
self.current_joint_velocities = [float(joint_position_dict[name]) for name in self.joint_names]
|
||||||
|
|
||||||
if self.robot:
|
if self.robot:
|
||||||
tcp_position = self.robot.fkine(self.current_joint_positions).t
|
tcp_position = self.robot.fkine(self.current_joint_positions).t
|
||||||
tcp_orientation = self.robot.fkine(self.current_joint_positions).rpy()
|
tcp_orientation = self.robot.fkine(self.current_joint_positions).rpy()
|
||||||
|
msg_tcp = oscbuildparse.OSCMessage("/tcp_coordinates", ',ffffff', [tcp_position[0], tcp_position[1], tcp_position[2], tcp_orientation[0], tcp_orientation[1], tcp_orientation[2]])
|
||||||
msg_tcp = oscbuildparse.OSCMessage(f"/tcp_coordinates", ',ffffff', [tcp_position[0], tcp_position[1], tcp_position[2], tcp_orientation[0], tcp_orientation[1], tcp_orientation[2]])
|
|
||||||
#msg_x = oscbuildparse.OSCMessage(f"/tcp_coordinates/x", ',f', [tcp_position[0]])
|
#msg_x = oscbuildparse.OSCMessage(f"/tcp_coordinates/x", ',f', [tcp_position[0]])
|
||||||
#msg_y = oscbuildparse.OSCMessage(f"/tcp_coordinates/y", ',f', [tcp_position[1]])
|
#msg_y = oscbuildparse.OSCMessage(f"/tcp_coordinates/y", ',f', [tcp_position[1]])
|
||||||
#msg_z = oscbuildparse.OSCMessage(f"/tcp_coordinates/z", ',f', [tcp_position[2]])
|
#msg_z = oscbuildparse.OSCMessage(f"/tcp_coordinates/z", ',f', [tcp_position[2]])
|
||||||
@ -613,21 +320,18 @@ class OSC_ROS2_interface(Node):
|
|||||||
#bun = oscbuildparse.OSCBundle(oscbuildparse.OSC_IMMEDIATELY, [msg_tcp, msg_x, msg_y, msg_z, msg_roll, msg_pitch, msg_yaw])
|
#bun = oscbuildparse.OSCBundle(oscbuildparse.OSC_IMMEDIATELY, [msg_tcp, msg_x, msg_y, msg_z, msg_roll, msg_pitch, msg_yaw])
|
||||||
#osc_send(bun, "osc_client")
|
#osc_send(bun, "osc_client")
|
||||||
osc_send(msg_tcp, "osc_client")
|
osc_send(msg_tcp, "osc_client")
|
||||||
|
msg_position = oscbuildparse.OSCMessage("/joint_state/position", f',{"f"*self.n_joints}', [i for i in msg.position])
|
||||||
msg_position = oscbuildparse.OSCMessage(f"/joint_state/position", f',{"f"*self.n_joints}', [i for i in msg.position])
|
msg_velocity = oscbuildparse.OSCMessage("/joint_state/velocity", f',{"f"*self.n_joints}', [i for i in msg.velocity])
|
||||||
msg_velocity = oscbuildparse.OSCMessage(f"/joint_state/velocity", f',{"f"*self.n_joints}', [i for i in msg.velocity])
|
msg_effort = oscbuildparse.OSCMessage("/joint_state/effort", f',{"f"*self.n_joints}', [i for i in msg.effort])
|
||||||
msg_effort = oscbuildparse.OSCMessage(f"/joint_state/effort", f',{"f"*self.n_joints}', [i for i in msg.effort])
|
msg_name = oscbuildparse.OSCMessage("/joint_state/name", f',{"s"*self.n_joints}', [i for i in msg.name])
|
||||||
msg_name = oscbuildparse.OSCMessage(f"/joint_state/name", f',{"s"*self.n_joints}', [i for i in msg.name])
|
|
||||||
bun = oscbuildparse.OSCBundle(oscbuildparse.OSC_IMMEDIATELY, [msg_name, msg_position, msg_velocity, msg_effort])
|
bun = oscbuildparse.OSCBundle(oscbuildparse.OSC_IMMEDIATELY, [msg_name, msg_position, msg_velocity, msg_effort])
|
||||||
osc_send(bun, "osc_client")
|
osc_send(bun, "osc_client")
|
||||||
|
|
||||||
#for i, name in enumerate(msg.name):
|
#for i, name in enumerate(msg.name):
|
||||||
# msg_position = oscbuildparse.OSCMessage(f"/joint_state/position/{name}", ',f', [msg.position[i]])
|
# msg_position = oscbuildparse.OSCMessage(f"/joint_state/position/{name}", ',f', [msg.position[i]])
|
||||||
# msg_velocity = oscbuildparse.OSCMessage(f"/joint_state/velocity/{name}", ',f', [msg.velocity[i]])
|
# msg_velocity = oscbuildparse.OSCMessage(f"/joint_state/velocity/{name}", ',f', [msg.velocity[i]])
|
||||||
# msg_effort = oscbuildparse.OSCMessage(f"/joint_state/effort/{name}", ',f', [msg.effort[i]])
|
# msg_effort = oscbuildparse.OSCMessage(f"/joint_state/effort/{name}", ',f', [msg.effort[i]])
|
||||||
# bun = oscbuildparse.OSCBundle(oscbuildparse.OSC_IMMEDIATELY, [msg_position, msg_velocity, msg_effort])
|
# bun = oscbuildparse.OSCBundle(oscbuildparse.OSC_IMMEDIATELY, [msg_position, msg_velocity, msg_effort])
|
||||||
# osc_send(bun, "osc_client")
|
# osc_send(bun, "osc_client")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.get_logger().fatal(f"joint_states_callback: {e}")
|
self.get_logger().fatal(f"joint_states_callback: {e}")
|
||||||
|
|
||||||
@ -672,21 +376,17 @@ class OSC_ROS2_interface(Node):
|
|||||||
return
|
return
|
||||||
|
|
||||||
def trapezoidal_timestamps(self, num_points,total_duration, flat_ratio = 0.3):
|
def trapezoidal_timestamps(self, num_points,total_duration, flat_ratio = 0.3):
|
||||||
|
|
||||||
if num_points == 2:
|
if num_points == 2:
|
||||||
return [0, total_duration]
|
return [0, total_duration]
|
||||||
n = int(num_points*(1-flat_ratio)/2)
|
n = int(num_points*(1-flat_ratio)/2)
|
||||||
start = np.cos(np.linspace(0, np.pi, n))+2
|
start = np.cos(np.linspace(0, np.pi, n))+2
|
||||||
end = np.cos(np.linspace(-np.pi, 0, n))+2
|
end = np.cos(np.linspace(-np.pi, 0, n))+2
|
||||||
flat = np.ones(num_points-2*n)
|
flat = np.ones(num_points-2*n)
|
||||||
|
|
||||||
timestamps = np.concatenate((start, flat, end))
|
timestamps = np.concatenate((start, flat, end))
|
||||||
timestamps *= total_duration / timestamps.sum()
|
timestamps *= total_duration / timestamps.sum()
|
||||||
timestamps = np.cumsum(timestamps)
|
timestamps = np.cumsum(timestamps)
|
||||||
|
|
||||||
return timestamps.tolist()
|
return timestamps.tolist()
|
||||||
|
|
||||||
|
|
||||||
def send_tcp_coordinates(self):
|
def send_tcp_coordinates(self):
|
||||||
"""Send the desired TCP coordinates to the robot."""
|
"""Send the desired TCP coordinates to the robot."""
|
||||||
try:
|
try:
|
||||||
@ -694,7 +394,7 @@ class OSC_ROS2_interface(Node):
|
|||||||
msg = JointTrajectory()
|
msg = JointTrajectory()
|
||||||
msg.joint_names = self.joint_names
|
msg.joint_names = self.joint_names
|
||||||
steps_per_m = 100
|
steps_per_m = 100
|
||||||
if self.previous_desired == None:
|
if self.previous_desired is None:
|
||||||
[x,y,z] = self.robot.fkine(self.current_joint_positions).t
|
[x,y,z] = self.robot.fkine(self.current_joint_positions).t
|
||||||
[roll, pitch, yaw] = self.robot.fkine(self.current_joint_positions).rpy()
|
[roll, pitch, yaw] = self.robot.fkine(self.current_joint_positions).rpy()
|
||||||
else:
|
else:
|
||||||
@ -703,7 +403,8 @@ class OSC_ROS2_interface(Node):
|
|||||||
x1, y1, z1, roll1, pitch1, yaw1 = self.desired[1:7]
|
x1, y1, z1, roll1, pitch1, yaw1 = self.desired[1:7]
|
||||||
self.previous_desired = self.desired
|
self.previous_desired = self.desired
|
||||||
steps = int(np.linalg.norm(np.array([x1, y1, z1])- self.robot.fkine(self.current_joint_positions).t) * steps_per_m)
|
steps = int(np.linalg.norm(np.array([x1, y1, z1])- self.robot.fkine(self.current_joint_positions).t) * steps_per_m)
|
||||||
if steps < 2: steps = 2
|
if steps < 2:
|
||||||
|
steps = 2
|
||||||
cart_traj = [sm.SE3([x+(x1-x)/(steps-1)*i, y+(y1-y)/(steps-1)*i, z+(z1-z)/(steps-1)*i]) * sm.SE3.RPY([roll+(roll1-roll)/(steps-1)*i, pitch+(pitch1-pitch)/(steps-1)*i, yaw+(yaw1-yaw)/(steps-1)*i]) for i in range(steps)]
|
cart_traj = [sm.SE3([x+(x1-x)/(steps-1)*i, y+(y1-y)/(steps-1)*i, z+(z1-z)/(steps-1)*i]) * sm.SE3.RPY([roll+(roll1-roll)/(steps-1)*i, pitch+(pitch1-pitch)/(steps-1)*i, yaw+(yaw1-yaw)/(steps-1)*i]) for i in range(steps)]
|
||||||
'''if self.previous_desired:
|
'''if self.previous_desired:
|
||||||
[x,y,z] = self.previous_desired[1:4]
|
[x,y,z] = self.previous_desired[1:4]
|
||||||
@ -746,16 +447,13 @@ class OSC_ROS2_interface(Node):
|
|||||||
|
|
||||||
# Compose SE3 transform
|
# Compose SE3 transform
|
||||||
cart_traj.append(sm.SE3(pos_interp) * q_interp.SE3())'''
|
cart_traj.append(sm.SE3(pos_interp) * q_interp.SE3())'''
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if self.desired[-1]:
|
if self.desired[-1]:
|
||||||
timestamps = self.trapezoidal_timestamps(steps, self.desired[-1], 0.8)
|
timestamps = self.trapezoidal_timestamps(steps, self.desired[-1], 0.8)
|
||||||
for j in range(steps):
|
for j in range(steps):
|
||||||
sol = self.robot.ik_LM(cart_traj[j], q0=self.current_joint_positions, mask = self.cost_mask, joint_limits = True, method = 'chan') if j == 0 else self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True, method = 'chan')
|
sol = self.robot.ik_LM(cart_traj[j], q0=self.current_joint_positions, mask = self.cost_mask, joint_limits = True, method = 'chan') if j == 0 else self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True, method = 'chan')
|
||||||
if sol[1] == 1:
|
if sol[1] == 1:
|
||||||
fowards = self.robot.fkine_all(sol[0])
|
fowards = self.robot.fkine_all(sol[0])
|
||||||
out_of_bounds = (fowards.t[1:,0] > self.x_limits_workspace[1] if self.x_limits_workspace[1] != None else False) | (fowards.t[1:,0] < self.x_limits_workspace[0] if self.x_limits_workspace[0] != None else False) | (fowards.t[1:,1] > self.y_limits_workspace[1] if self.y_limits_workspace[1] != None else False) | (fowards.t[1:,1] < self.y_limits_workspace[0] if self.y_limits_workspace[0] != None else False) | (fowards.t[1:,2] > self.z_limits_workspace[1] if self.z_limits_workspace[1] != None else False) | (fowards.t[1:,2] < self.z_limits_workspace[0] if self.z_limits_workspace[0] != None else False)
|
out_of_bounds = (fowards.t[1:,0] > self.x_limits_workspace[1] if self.x_limits_workspace[1] is not None else False) | (fowards.t[1:,0] < self.x_limits_workspace[0] if self.x_limits_workspace[0] is not None else False) | (fowards.t[1:,1] > self.y_limits_workspace[1] if self.y_limits_workspace[1] is not None else False) | (fowards.t[1:,1] < self.y_limits_workspace[0] if self.y_limits_workspace[0] is not None else False) | (fowards.t[1:,2] > self.z_limits_workspace[1] if self.z_limits_workspace[1] is not None else False) | (fowards.t[1:,2] < self.z_limits_workspace[0] if self.z_limits_workspace[0] is not None else False)
|
||||||
if np.any(out_of_bounds):
|
if np.any(out_of_bounds):
|
||||||
#print(fowards.t)
|
#print(fowards.t)
|
||||||
#indices = np.where(out_of_bounds)[0]
|
#indices = np.where(out_of_bounds)[0]
|
||||||
@ -790,9 +488,6 @@ class OSC_ROS2_interface(Node):
|
|||||||
msg.header.stamp = self.get_clock().now().to_msg()
|
msg.header.stamp = self.get_clock().now().to_msg()
|
||||||
self.publisher.publish(msg)
|
self.publisher.publish(msg)
|
||||||
self.previous_desired = self.desired
|
self.previous_desired = self.desired
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
prev_duration = 0
|
prev_duration = 0
|
||||||
'''
|
'''
|
||||||
@ -807,7 +502,7 @@ class OSC_ROS2_interface(Node):
|
|||||||
sol = self.robot.ik_LM(cart_traj[j], q0=self.current_joint_positions, mask = self.cost_mask, joint_limits = True, method = 'chan') if j == 0 else self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True, method = 'chan')
|
sol = self.robot.ik_LM(cart_traj[j], q0=self.current_joint_positions, mask = self.cost_mask, joint_limits = True, method = 'chan') if j == 0 else self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True, method = 'chan')
|
||||||
if sol[1] == 1:
|
if sol[1] == 1:
|
||||||
fowards = self.robot.fkine_all(sol[0])
|
fowards = self.robot.fkine_all(sol[0])
|
||||||
out_of_bounds = (fowards.t[1:,0] > self.x_limits_workspace[1] if self.x_limits_workspace[1] != None else False) | (fowards.t[1:,0] < self.x_limits_workspace[0] if self.x_limits_workspace[0] != None else False) | (fowards.t[1:,1] > self.y_limits_workspace[1] if self.y_limits_workspace[1] != None else False) | (fowards.t[1:,1] < self.y_limits_workspace[0] if self.y_limits_workspace[0] != None else False) | (fowards.t[1:,2] > self.z_limits_workspace[1] if self.z_limits_workspace[1] != None else False) | (fowards.t[1:,2] < self.z_limits_workspace[0] if self.z_limits_workspace[0] != None else False)
|
out_of_bounds = (fowards.t[1:,0] > self.x_limits_workspace[1] if self.x_limits_workspace[1] is not None else False) | (fowards.t[1:,0] < self.x_limits_workspace[0] if self.x_limits_workspace[0] is not None else False) | (fowards.t[1:,1] > self.y_limits_workspace[1] if self.y_limits_workspace[1] is not None else False) | (fowards.t[1:,1] < self.y_limits_workspace[0] if self.y_limits_workspace[0] is not None else False) | (fowards.t[1:,2] > self.z_limits_workspace[1] if self.z_limits_workspace[1] is not None else False) | (fowards.t[1:,2] < self.z_limits_workspace[0] if self.z_limits_workspace[0] is not None else False)
|
||||||
if np.any(out_of_bounds):
|
if np.any(out_of_bounds):
|
||||||
#print(fowards.t)
|
#print(fowards.t)
|
||||||
#indices = np.where(out_of_bounds)[0]
|
#indices = np.where(out_of_bounds)[0]
|
||||||
@ -852,7 +547,6 @@ class OSC_ROS2_interface(Node):
|
|||||||
self.get_logger().fatal(f"send_tcp_coordinates: {e}")
|
self.get_logger().fatal(f"send_tcp_coordinates: {e}")
|
||||||
|
|
||||||
def send_joint_trajectory(self):
|
def send_joint_trajectory(self):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.new = False
|
self.new = False
|
||||||
viapoints = np.array([i for i in self.desired[1:]])
|
viapoints = np.array([i for i in self.desired[1:]])
|
||||||
@ -873,7 +567,6 @@ class OSC_ROS2_interface(Node):
|
|||||||
print(f'Error in joint_angles_handler: {e}')
|
print(f'Error in joint_angles_handler: {e}')
|
||||||
|
|
||||||
def send_cartesian_trajectory(self):
|
def send_cartesian_trajectory(self):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.new = False
|
self.new = False
|
||||||
viapoints = np.array([i[:6] for i in self.desired[1:]])
|
viapoints = np.array([i[:6] for i in self.desired[1:]])
|
||||||
@ -898,14 +591,14 @@ class OSC_ROS2_interface(Node):
|
|||||||
point.time_from_start.nanosec = int((traj.t[i] - int(traj.t[i])) * 1e9)
|
point.time_from_start.nanosec = int((traj.t[i] - int(traj.t[i])) * 1e9)
|
||||||
msg.points.append(point)
|
msg.points.append(point)
|
||||||
prev_sol = list(sol[0])
|
prev_sol = list(sol[0])
|
||||||
else: self.get_logger().warn(f"send_cartesian_trajectory: IK could not find a solution for (x,y,z) = {traj.q[i][:3]} and (r,p,y) = {traj.q[i][3:]}!")
|
else:
|
||||||
|
self.get_logger().warn(f"send_cartesian_trajectory: IK could not find a solution for (x,y,z) = {traj.q[i][:3]} and (r,p,y) = {traj.q[i][3:]}!")
|
||||||
msg.header.stamp = self.get_clock().now().to_msg()
|
msg.header.stamp = self.get_clock().now().to_msg()
|
||||||
msg.points = msg.points[::n]
|
msg.points = msg.points[::n]
|
||||||
self.publisher.publish(msg)
|
self.publisher.publish(msg)
|
||||||
self.previous_desired = None
|
self.previous_desired = None
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f'Error in joint_angles_handler: {e}')
|
print(f'Error in joint_angles_handler: {e}')
|
||||||
|
|
||||||
self.previous_desired = None
|
self.previous_desired = None
|
||||||
|
|
||||||
def update_position(self):
|
def update_position(self):
|
||||||
@ -913,7 +606,6 @@ class OSC_ROS2_interface(Node):
|
|||||||
try:
|
try:
|
||||||
if self.desired is None or not(self.new):
|
if self.desired is None or not(self.new):
|
||||||
return
|
return
|
||||||
|
|
||||||
if self.desired[0] == "joint_positions":
|
if self.desired[0] == "joint_positions":
|
||||||
self.new = False
|
self.new = False
|
||||||
self.send_joint_positions()
|
self.send_joint_positions()
|
||||||
@ -933,35 +625,26 @@ class OSC_ROS2_interface(Node):
|
|||||||
else:
|
else:
|
||||||
self.get_logger().warn(f"update_position: Unknown desired type '{self.desired[0]}'.")
|
self.get_logger().warn(f"update_position: Unknown desired type '{self.desired[0]}'.")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.get_logger().fatal(f'update_position: {e}')
|
self.get_logger().fatal(f'update_position: {e}')
|
||||||
|
|
||||||
|
|
||||||
def clean_log_string(self, s):
|
def clean_log_string(self, s):
|
||||||
|
|
||||||
s = str(s)
|
s = str(s)
|
||||||
|
|
||||||
# Remove ANSI escape sequences (e.g., \x1b[31m)
|
# Remove ANSI escape sequences (e.g., \x1b[31m)
|
||||||
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
|
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
|
||||||
s = ansi_escape.sub('', s)
|
s = ansi_escape.sub('', s)
|
||||||
|
|
||||||
# Replace tabs/newlines with spaces
|
# Replace tabs/newlines with spaces
|
||||||
s = s.replace('\n', ' ').replace('\r', ' ').replace('\t', ' ').replace("'", ' '). replace('"', ' ').replace('`', ' ').replace('´', ' ').replace('`', ' ').replace('“', ' ').replace('”', ' ').replace('‘', ' ').replace('’', ' ').replace('´', ' ').replace('`', ' ').replace('“', ' ').replace('”', ' ').replace('‘', ' ').replace('’', ' ')
|
s = s.replace('\n', ' ').replace('\r', ' ').replace('\t', ' ').replace("'", ' '). replace('"', ' ').replace('`', ' ').replace('´', ' ').replace('`', ' ').replace('“', ' ').replace('”', ' ').replace('‘', ' ').replace('’', ' ').replace('´', ' ').replace('`', ' ').replace('“', ' ').replace('”', ' ').replace('‘', ' ').replace('’', ' ')
|
||||||
|
|
||||||
# Strip leading/trailing whitespace
|
# Strip leading/trailing whitespace
|
||||||
s = s.strip()
|
s = s.strip()
|
||||||
|
|
||||||
# Optionally enforce ASCII only (replace non-ASCII chars with '?')
|
# Optionally enforce ASCII only (replace non-ASCII chars with '?')
|
||||||
s = s.encode('ascii', 'replace').decode('ascii')
|
s = s.encode('ascii', 'replace').decode('ascii')
|
||||||
|
|
||||||
return s
|
return s
|
||||||
|
|
||||||
|
|
||||||
def log_callback(self, msg: Log):
|
def log_callback(self, msg: Log):
|
||||||
"""Callback function to handle incoming log messages."""
|
"""Callback function to handle incoming log messages."""
|
||||||
|
|
||||||
# Send the log message as an OSC message
|
# Send the log message as an OSC message
|
||||||
msg_log = oscbuildparse.OSCMessage(f"/log/{self.log_dict.get(msg.level, 'UNKNOWN')}", ',isss', [int(msg.level), str(msg.stamp.sec+msg.stamp.nanosec*1e-9) , str(msg.name), self.clean_log_string(msg.msg)])
|
msg_log = oscbuildparse.OSCMessage(f"/log/{self.log_dict.get(msg.level, 'UNKNOWN')}", ',isss', [int(msg.level), str(msg.stamp.sec+msg.stamp.nanosec*1e-9) , str(msg.name), self.clean_log_string(msg.msg)])
|
||||||
osc_send(msg_log, "osc_log_client")
|
osc_send(msg_log, "osc_log_client")
|
||||||
@ -969,122 +652,16 @@ class OSC_ROS2_interface(Node):
|
|||||||
|
|
||||||
def main():
|
def main():
|
||||||
"""Main function to get joint names and start the ROS 2 & OSC system."""
|
"""Main function to get joint names and start the ROS 2 & OSC system."""
|
||||||
|
parser = argparse.ArgumentParser(description="An interface between ROS2 and OSC for robotic arm manipulators", usage="ros2 run osc_ros2 interface [-c CONFIG.TOML]")
|
||||||
|
parser.add_argument("-c", "--config", type=argparse.FileType("r"), help="A robot TOML config, if the config is provided then the interface will not show the iteractive input")
|
||||||
|
args = parser.parse_args()
|
||||||
rclpy.init()
|
rclpy.init()
|
||||||
while True:
|
if not args.config:
|
||||||
use_urdf = input("Do you have a URDF file you want to use? (y/n): ").strip().lower()
|
config, robot = interactive_input()
|
||||||
if use_urdf == 'y':
|
else:
|
||||||
while True:
|
config = toml.load(args.config)
|
||||||
robot_urdf = input("Enter the absolute path to the URDF file: ")
|
robot = rtb.ERobot.URDF(config["robot"]["urdf"])
|
||||||
if os.path.isfile(robot_urdf):
|
node = OSC_ROS2_interface(robot, config)
|
||||||
if not robot_urdf.endswith('.urdf'):
|
|
||||||
print("The file is not a URDF file. Please enter a valid URDF file.")
|
|
||||||
continue
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
print("Invalid path. Please enter a valid path to the URDF file.")
|
|
||||||
tree = ET.parse(robot_urdf)
|
|
||||||
root = tree.getroot()
|
|
||||||
robot = rtb.ERobot.URDF(robot_urdf)
|
|
||||||
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
|
|
||||||
print(robot)
|
|
||||||
joint_velocity_limits = {}
|
|
||||||
|
|
||||||
# Iterate over all joints in the URDF
|
|
||||||
for joint in root.findall('.//joint'):
|
|
||||||
joint_name = joint.get('name') # Get the name of the joint
|
|
||||||
|
|
||||||
# Look for the <limit> tag under each joint
|
|
||||||
limit = joint.find('limit')
|
|
||||||
|
|
||||||
if limit is not None:
|
|
||||||
# Extract the velocity limit (if it exists)
|
|
||||||
velocity_limit = limit.get('velocity')
|
|
||||||
|
|
||||||
if velocity_limit is not None:
|
|
||||||
joint_velocity_limits[joint_name] = float(velocity_limit)
|
|
||||||
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
print('-+'*50)
|
|
||||||
print("The cost mask determines which coordinates are used for the IK.\nEach element of the cost mask corresponds to a Cartesian coordinate [x, y, z, roll, pitch, yaw].")
|
|
||||||
print("The cost mask 111000 means that the IK will only consider translation and no rotaion.")
|
|
||||||
cost_mask = [int(i) for i in input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")]
|
|
||||||
if sum(cost_mask) <= robot.n and len(cost_mask) == 6:
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.")
|
|
||||||
except ValueError:
|
|
||||||
print("Invalid input. Please enter integers only.")
|
|
||||||
print(f"The following coordinates will be used for the IK: {[j for i,j in enumerate(['x','y','z','roll','pitch','yaw']) if cost_mask[i]==1]}")
|
|
||||||
break
|
|
||||||
elif use_urdf == 'n':
|
|
||||||
node = JointNameListener()
|
|
||||||
print("Wainting 10 sec for JointState messages to extract joint names...")
|
|
||||||
rclpy.spin_once(node)
|
|
||||||
counter = 0
|
|
||||||
while not(node.joint_names):
|
|
||||||
if counter > 100:
|
|
||||||
joint_names = None
|
|
||||||
break
|
|
||||||
counter+=1
|
|
||||||
time.sleep(0.1)
|
|
||||||
rclpy.spin_once(node)
|
|
||||||
joint_names = node.joint_names
|
|
||||||
node.destroy_node()
|
|
||||||
if joint_names:
|
|
||||||
correct_names = input("The following joint names were found:\n" + "\n".join(joint_names) + "\nAre these correct? (y/n): ").strip().lower()
|
|
||||||
while True:
|
|
||||||
if correct_names == 'y':
|
|
||||||
break
|
|
||||||
elif correct_names == 'n':
|
|
||||||
joint_names = None
|
|
||||||
break
|
|
||||||
correct_names = input("Invalid input. Please enter 'y' or 'n'.")
|
|
||||||
if not(joint_names):
|
|
||||||
print("Please enter the joint names manually.")
|
|
||||||
while True:
|
|
||||||
joint_names = []
|
|
||||||
print('-+'*50)
|
|
||||||
print("Enter the joint names manually one by one. Type 'done' when you are finished:")
|
|
||||||
print("Attention: The order of the joints is important. It should be the same in which you want to send the joint positions.")
|
|
||||||
while True:
|
|
||||||
print('-'*50)
|
|
||||||
joint_name = input("Enter joint name (or 'done' to finish): ").strip()
|
|
||||||
if joint_name.lower() == 'done':
|
|
||||||
break
|
|
||||||
if joint_name:
|
|
||||||
joint_names.append(joint_name)
|
|
||||||
print('-+'*50)
|
|
||||||
correct_names = input(f"Are those the joint names as defined in your JointTrajectroy recieving Node?: {joint_names}. (y/n)?: ").strip()
|
|
||||||
if correct_names.lower() == 'y':
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
print("Please re-enter the joint names.")
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
joint_velocity_limits = {}
|
|
||||||
for name in joint_names:
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
print('--'*50)
|
|
||||||
limit = input(f"Enter the velocity limit for joint '{name}': ").strip()
|
|
||||||
if limit == "":
|
|
||||||
continue
|
|
||||||
else:
|
|
||||||
joint_velocity_limits[name] = float(limit)
|
|
||||||
break
|
|
||||||
except ValueError:
|
|
||||||
print("Invalid input. Please enter a numeric value or press Enter to skip.")
|
|
||||||
break
|
|
||||||
except ValueError:
|
|
||||||
print("Invalid input. Please enter numeric values or leave blank to skip.")
|
|
||||||
robot = None
|
|
||||||
cost_mask = None
|
|
||||||
break
|
|
||||||
print("Invalid input. Please enter 'y' or 'n'.")
|
|
||||||
|
|
||||||
|
|
||||||
node = OSC_ROS2_interface(joint_names, joint_velocity_limits, robot, cost_mask)
|
|
||||||
|
|
||||||
# Run ROS 2 spin, and osc_process will be handled by the timer
|
# Run ROS 2 spin, and osc_process will be handled by the timer
|
||||||
try:
|
try:
|
||||||
|
27
workspace/ur10e.toml
Normal file
27
workspace/ur10e.toml
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
[robot]
|
||||||
|
urdf = "/workspace/ur10e.urdf"
|
||||||
|
joint_names = [ "shoulder_pan_joint", "shoulder_lift_joint", "elbow_joint", "wrist_1_joint", "wrist_2_joint", "wrist_3_joint",]
|
||||||
|
cost_mask_string = "111111"
|
||||||
|
x_limits = [ "None", "None",]
|
||||||
|
y_limits = [ "None", "None",]
|
||||||
|
z_limits = [ "None", "None",]
|
||||||
|
x_limits_workspace = [ "None", "None",]
|
||||||
|
y_limits_workspace = [ "None", "None",]
|
||||||
|
z_limits_workspace = [ "None", "None",]
|
||||||
|
trajectory_topic_name = "/scaled_joint_trajectory_controller/joint_trajectory"
|
||||||
|
hz = 100
|
||||||
|
|
||||||
|
[network]
|
||||||
|
log_ip = "127.0.0.1"
|
||||||
|
log_port = 5005
|
||||||
|
state_ip = "127.0.0.1"
|
||||||
|
state_port = 7000
|
||||||
|
commands_port = 8000
|
||||||
|
|
||||||
|
[robot.joint_velocity_limits]
|
||||||
|
shoulder_pan_joint = 2.0943951023931953
|
||||||
|
shoulder_lift_joint = 2.0943951023931953
|
||||||
|
elbow_joint = 3.141592653589793
|
||||||
|
wrist_1_joint = 3.141592653589793
|
||||||
|
wrist_2_joint = 3.141592653589793
|
||||||
|
wrist_3_joint = 3.141592653589793
|
Loading…
Reference in New Issue
Block a user