Compare commits
11 Commits
b9434a5e7f
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 6ea5c3783d | |||
| d93b4b8de1 | |||
| 70f78ada2b | |||
| cdec866f4f | |||
| ce3f00c3e2 | |||
| 6942ff044e | |||
| c665883185 | |||
|
|
4ab2ff3c3b | ||
|
|
6db7143bd5 | ||
|
|
b45371d39b | ||
|
|
b4abe1eec1 |
@@ -1,4 +1,3 @@
|
||||
|
||||
# OSC–ROS 2 Interface
|
||||
|
||||
## Description
|
||||
@@ -59,7 +58,7 @@ docker build -t ros2_humble:latest ROS2_humble
|
||||
After all the containers are successfully built and the bridge network is established we can now start both containers. On one terminal run the following command to start the URSim container
|
||||
|
||||
```bash
|
||||
docker run -it --name URSIM -p 6080:6080 -p8000:8000 -p7000:7000 -p5005:5005 -p50002:50002 -v./workspace:/workspace --network ros_ursim ursim:latest
|
||||
docker run --name URSIM -it -p 6080:6080 --network ros_ursim ursim:latest
|
||||
```
|
||||
|
||||
When successfully started this container will print its IP address in the terminal. Note this down:
|
||||
@@ -73,7 +72,7 @@ IP address of the simulator
|
||||
On another terminal run
|
||||
|
||||
```
|
||||
docker run -it --name ROS --network ros_ursim -p50002:50002 -p8000:8000 -p7000:7000 -p5005:5005 ros2_humble:latest
|
||||
docker run -it --name ROS --network ros_ursim -p50002-50100:50002-50100 -p5005-8000:5005-8000 -v ./workspace:/workspace ros2_humble:latest
|
||||
```
|
||||
Also note the IP address, using `hostname`
|
||||
|
||||
@@ -145,7 +144,7 @@ If everything worked, this should either move the robot or complain about it bei
|
||||
Now you should find three new folders in your directory: log, build and install. You can check with `ls`
|
||||
```bash
|
||||
workspace/
|
||||
├── src
|
||||
├── srcdocker run --name URSIM
|
||||
├── build
|
||||
├── install
|
||||
└── log
|
||||
|
||||
19
examples/log.py
Normal file
19
examples/log.py
Normal file
@@ -0,0 +1,19 @@
|
||||
import socket
|
||||
from ssl import SOCK_STREAM
|
||||
from oscpy import *
|
||||
|
||||
UDP_IP = "172.18.0.1"
|
||||
UDP_PORT = 5005
|
||||
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
sock.bind((UDP_IP, UDP_PORT))
|
||||
|
||||
while True:
|
||||
data, addr = sock.recvfrom(10*1024)
|
||||
if "#bundle" in data[:8].decode():
|
||||
b = Bundle(data)
|
||||
print(b)
|
||||
else:
|
||||
m = Message(data)
|
||||
print(m)
|
||||
|
||||
45
examples/move.py
Normal file
45
examples/move.py
Normal file
@@ -0,0 +1,45 @@
|
||||
import socket
|
||||
import time
|
||||
from oscpy import *
|
||||
|
||||
|
||||
SEND_IP = "172.18.0.2"
|
||||
SEND_PORT = 8000
|
||||
RECV_IP = "172.18.0.1"
|
||||
RECV_PORT = 7000
|
||||
|
||||
pub = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
sub = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
|
||||
msg = Message("/joint_positions", [-1]*6).encode()
|
||||
n = pub.sendto(msg, (SEND_IP, SEND_PORT))
|
||||
if n > 0:
|
||||
print(f"INFO: Sent {n} bytes")
|
||||
|
||||
time.sleep(10)
|
||||
|
||||
msg = Message("/tcp_coordinates", [-1]*6).encode()
|
||||
n = pub.sendto(msg, (SEND_IP, SEND_PORT))
|
||||
if n > 0:
|
||||
print(f"INFO: Sent {n} bytes")
|
||||
|
||||
time.sleep(10)
|
||||
|
||||
sub.bind((RECV_IP, RECV_PORT))
|
||||
joint_names = []
|
||||
found = False
|
||||
while not found:
|
||||
data, addr = sub.recvfrom(5*1024)
|
||||
if "#bundle" in data[:8].decode():
|
||||
b = Bundle().decode(data)
|
||||
for msg in b.msgs:
|
||||
if "/joint_state/name" in msg.address:
|
||||
joint_names = msg.args
|
||||
found = True
|
||||
|
||||
if joint_names:
|
||||
msg = Message(f"/joint_position/{joint_names[2]}", [ 0 ])
|
||||
n = pub.sendto(msg.encode(), (SEND_IP, SEND_PORT))
|
||||
if n > 0:
|
||||
print(f"INFO: Sent {n} bytes")
|
||||
|
||||
138
examples/oscpy/__init__.py
Normal file
138
examples/oscpy/__init__.py
Normal file
@@ -0,0 +1,138 @@
|
||||
import struct
|
||||
from typing import final, override
|
||||
|
||||
OSC_TYPES = str | int | float | bool | bytes
|
||||
|
||||
def get_string_size(data: bytes) -> int:
|
||||
n = data.find(b'\x00')
|
||||
n = n + 4 - n%4
|
||||
return n
|
||||
|
||||
def to_osc_string(s: str) -> bytes:
|
||||
n = len(s)
|
||||
b = struct.pack(f'{n}s{4 - n%4}x', s.encode())
|
||||
return b
|
||||
|
||||
def to_osc_blob(b: bytes) -> bytes:
|
||||
n = len(b)
|
||||
b = struct.pack(f'i{4 - n%4}p', n, b)
|
||||
return b
|
||||
|
||||
def parse_string(data: bytes) -> tuple[bytes, str]:
|
||||
n = get_string_size(data)
|
||||
values: tuple[bytes] = struct.unpack(f'>{n}s', data[:n])
|
||||
value = values[0].split(b"\x00", 1)
|
||||
return data[n:], value[0].decode()
|
||||
|
||||
def parse_float(data: bytes) -> tuple[bytes, float]:
|
||||
values: tuple[float] = struct.unpack('>f', data[:4])
|
||||
return data[4:], values[0]
|
||||
|
||||
def parse_int(data: bytes) -> tuple[bytes, int]:
|
||||
values: tuple[int] = struct.unpack('>i', data[:4])
|
||||
return data[4:], values[0]
|
||||
|
||||
def parse_blob(data: bytes) -> tuple[bytes, bytes]:
|
||||
n_values: tuple[int] = struct.unpack('>i', data[:4])
|
||||
n: int = n_values[0]
|
||||
values: tuple[bytes] = struct.unpack(f'>{n}p', data[4:4+n])
|
||||
return data[4+n:], values[0]
|
||||
|
||||
def parse_args(tt: str, data: bytes) -> list[OSC_TYPES]:
|
||||
tt = tt[1:]
|
||||
args: list[OSC_TYPES] = []
|
||||
for c in tt:
|
||||
match c:
|
||||
case 's':
|
||||
data, val = parse_string(data)
|
||||
args.append(val)
|
||||
case 'b':
|
||||
data, val = parse_blob(data)
|
||||
args.append(val)
|
||||
case 'f':
|
||||
data, val = parse_float(data)
|
||||
args.append(val)
|
||||
case 'i':
|
||||
data, val = parse_int(data)
|
||||
args.append(val)
|
||||
case _:
|
||||
print(f"[ERROR]: Got {c}")
|
||||
return args
|
||||
|
||||
def encode_args(args: list[OSC_TYPES]) -> bytes:
|
||||
encoded = b''
|
||||
for arg in args:
|
||||
match arg:
|
||||
case str(): encoded += to_osc_string(arg)
|
||||
case float(): encoded += struct.pack('>f', arg)
|
||||
case int(): encoded += struct.pack('>i', arg)
|
||||
case bytes(): encoded += to_osc_blob(arg)
|
||||
return encoded
|
||||
|
||||
def parse_type(arg: OSC_TYPES) -> str:
|
||||
match arg:
|
||||
case str(): return "s"
|
||||
case float(): return "f"
|
||||
case int(): return "i"
|
||||
case bytes(): return "b"
|
||||
|
||||
@final
|
||||
class Message:
|
||||
def __init__(self, address: str = "/", args: list[OSC_TYPES] = []):
|
||||
self.address: str = address
|
||||
self.type_tags = ","
|
||||
for arg in args:
|
||||
self.type_tags = self.type_tags + parse_type(arg)
|
||||
self.args: list[OSC_TYPES] = args
|
||||
|
||||
def decode(self, data: bytes):
|
||||
data, self.address = parse_string(data)
|
||||
data, self.type_tags = parse_string(data)
|
||||
self.args = parse_args(self.type_tags, data)
|
||||
return self
|
||||
|
||||
def encode(self) -> bytes:
|
||||
msg = to_osc_string(self.address)
|
||||
msg += to_osc_string(self.type_tags)
|
||||
msg += encode_args(self.args)
|
||||
return msg
|
||||
|
||||
@override
|
||||
def __str__(self) -> str:
|
||||
return f"{self.address} [{self.type_tags}]: {self.args}"
|
||||
|
||||
@final
|
||||
class Bundle:
|
||||
def __init__(self, time_tag: float = 0.0, msgs: list[Message] = []):
|
||||
self.header = "#bundle"
|
||||
self.ttag = time_tag
|
||||
self.msgs = msgs
|
||||
|
||||
def decode(self, data: bytes):
|
||||
data, self.header = parse_string(data)
|
||||
data[4:]
|
||||
data, self.ttag = parse_float(data)
|
||||
data = data[4:]
|
||||
self.msgs: list[Message] = []
|
||||
while len(data) > 0:
|
||||
data, n = parse_int(data)
|
||||
msg_data = data[:n]
|
||||
data = data[n:]
|
||||
self.msgs.append(Message().decode(msg_data))
|
||||
return self
|
||||
|
||||
def encode(self) -> bytes:
|
||||
bundle = to_osc_string("#bundle")
|
||||
bundle += struct.pack('4xf4x', self.ttag)
|
||||
for msg in self.msgs:
|
||||
msg_data = msg.encode()
|
||||
bundle += struct.pack('i', len(msg_data))
|
||||
bundle += msg_data
|
||||
return bundle
|
||||
|
||||
@override
|
||||
def __str__(self) -> str:
|
||||
out = f"{self.header} ({self.ttag}):\n"
|
||||
for msg in self.msgs:
|
||||
out += f" - {msg}\n"
|
||||
return out
|
||||
27
examples/state.py
Normal file
27
examples/state.py
Normal file
@@ -0,0 +1,27 @@
|
||||
import socket
|
||||
import struct
|
||||
from oscpy import *
|
||||
|
||||
UDP_IP = "172.18.0.1"
|
||||
UDP_PORT = 7000
|
||||
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
sock.bind((UDP_IP, UDP_PORT))
|
||||
|
||||
|
||||
while True:
|
||||
data, addr = sock.recvfrom(10*1024)
|
||||
n = get_string_size(data)
|
||||
[tag] = struct.unpack(f'{n}s', data[:n])
|
||||
if "#bundle" in tag.decode():
|
||||
b = Bundle(data)
|
||||
print("Bundle:")
|
||||
for msg in b.msgs:
|
||||
print(" -", msg)
|
||||
else:
|
||||
msg = Message(data)
|
||||
if msg.address == "/time":
|
||||
msg.args[0] = float(msg.args[0])
|
||||
print(msg)
|
||||
|
||||
|
||||
7
examples/test.py
Normal file
7
examples/test.py
Normal file
@@ -0,0 +1,7 @@
|
||||
from oscpy import *
|
||||
|
||||
|
||||
encoded = Message("/", ["test", 0.1, 9]).encode()
|
||||
new_msg = Message().decode(encoded).encode()
|
||||
assert(encoded == new_msg)
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
429
workspace/src/osc_ros2/osc_ros2/interactive_input.py
Normal file
429
workspace/src/osc_ros2/osc_ros2/interactive_input.py
Normal file
@@ -0,0 +1,429 @@
|
||||
import os
|
||||
import time
|
||||
import toml
|
||||
import socket
|
||||
import xml.etree.ElementTree as ET
|
||||
from typing import Any
|
||||
|
||||
import rclpy
|
||||
import roboticstoolbox as rtb
|
||||
from rclpy.node import Node
|
||||
from sensor_msgs.msg import JointState
|
||||
|
||||
|
||||
class JointNameListener(Node):
|
||||
joint_names: list[str]|None
|
||||
def __init__(self):
|
||||
super().__init__('joint_name_listener')
|
||||
self.subscription = self.create_subscription(
|
||||
JointState,
|
||||
'/joint_states',
|
||||
self.joint_state_callback,
|
||||
1
|
||||
)
|
||||
self.joint_names = None
|
||||
|
||||
def joint_state_callback(self, msg: JointState):
|
||||
self.joint_names = list(msg.name)
|
||||
|
||||
def interactive_input() -> tuple[dict[str, Any], rtb.robot]:
|
||||
config = {}
|
||||
rconf = {}
|
||||
nconf = {}
|
||||
robot = None
|
||||
while True:
|
||||
use_urdf = input("Do you have a URDF file you want to use? (y/n): ").strip().lower()
|
||||
if use_urdf == 'y':
|
||||
while True:
|
||||
rconf["urdf"] = input("Enter the absolute path to the URDF file: ")
|
||||
if os.path.isfile(rconf["urdf"]):
|
||||
if not rconf["urdf"].endswith('.urdf'):
|
||||
print("The file is not a URDF file. Please enter a valid URDF file.")
|
||||
continue
|
||||
break
|
||||
else:
|
||||
print("Invalid path. Please enter a valid path to the URDF file.")
|
||||
tree = ET.parse(rconf["urdf"])
|
||||
root = tree.getroot()
|
||||
robot = rtb.ERobot.URDF(rconf["urdf"])
|
||||
rconf["joint_names"] = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
|
||||
print(robot)
|
||||
rconf["joint_velocity_limits"] = {}
|
||||
# Iterate over all joints in the URDF
|
||||
for joint in root.findall('.//joint'):
|
||||
joint_name = joint.get('name') # Get the name of the joint
|
||||
|
||||
# Look for the <limit> tag under each joint
|
||||
limit = joint.find('limit')
|
||||
|
||||
if limit is not None:
|
||||
# Extract the velocity limit (if it exists)
|
||||
velocity_limit = limit.get('velocity')
|
||||
|
||||
if velocity_limit is not None:
|
||||
rconf["joint_velocity_limits"][joint_name] = float(velocity_limit)
|
||||
|
||||
while True:
|
||||
try:
|
||||
print('-+'*50)
|
||||
print("The cost mask determines which coordinates are used for the IK.\nEach element of the cost mask corresponds to a Cartesian coordinate [x, y, z, roll, pitch, yaw].")
|
||||
print("The cost mask 111000 means that the IK will only consider translation and no rotaion.")
|
||||
cost_mask_string = input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")
|
||||
rconf["cost_mask"] = [int(i) for i in cost_mask_string]
|
||||
if sum(rconf["cost_mask"]) <= robot.n and len(rconf["cost_mask"]) == 6:
|
||||
break
|
||||
else:
|
||||
print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.")
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter integers only.")
|
||||
print(f"The following coordinates will be used for the IK: {[j for i,j in enumerate(['x','y','z','roll','pitch','yaw']) if rconf['cost_mask'][i]==1]}")
|
||||
break
|
||||
elif use_urdf == 'n':
|
||||
node = JointNameListener()
|
||||
print("Wainting 10 sec for JointState messages to extract joint names...")
|
||||
rclpy.spin_once(node)
|
||||
counter = 0
|
||||
while not(node.joint_names):
|
||||
if counter > 100:
|
||||
rconf["joint_names"] = None
|
||||
break
|
||||
counter+=1
|
||||
time.sleep(0.1)
|
||||
rclpy.spin_once(node)
|
||||
rconf["joint_names"] = node.joint_names
|
||||
node.destroy_node()
|
||||
if rconf["joint_names"]:
|
||||
correct_names = input("The following joint names were found:\n" + "\n".join(rconf["joint_names"]) + "\nAre these correct? (y/n): ").strip().lower()
|
||||
while True:
|
||||
if correct_names == 'y':
|
||||
break
|
||||
elif correct_names == 'n':
|
||||
rconf["joint_names"] = None
|
||||
break
|
||||
correct_names = input("Invalid input. Please enter 'y' or 'n'.")
|
||||
if not(rconf["joint_names"]):
|
||||
print("Please enter the joint names manually.")
|
||||
while True:
|
||||
rconf["joint_names"] = []
|
||||
print('-+'*50)
|
||||
print("Enter the joint names manually one by one. Type 'done' when you are finished:")
|
||||
print("Attention: The order of the joints is important. It should be the same in which you want to send the joint positions.")
|
||||
while True:
|
||||
print('-'*50)
|
||||
joint_name = input("Enter joint name (or 'done' to finish): ").strip()
|
||||
if joint_name.lower() == 'done':
|
||||
break
|
||||
if joint_name:
|
||||
rconf["joint_names"].append(joint_name)
|
||||
print('-+'*50)
|
||||
correct_names = input(f"Are those the joint names as defined in your JointTrajectroy recieving Node?: {rconf['joint_names']}. (y/n)?: ").strip()
|
||||
if correct_names.lower() == 'y':
|
||||
break
|
||||
else:
|
||||
print("Please re-enter the joint names.")
|
||||
while True:
|
||||
try:
|
||||
rconf["joint_velocity_limits"] = {}
|
||||
for name in rconf["joint_names"]:
|
||||
while True:
|
||||
try:
|
||||
print('--'*50)
|
||||
limit = input(f"Enter the velocity limit for joint '{name}': ").strip()
|
||||
if limit == "":
|
||||
continue
|
||||
else:
|
||||
rconf["joint_velocity_limits"][name] = float(limit)
|
||||
break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter a numeric value or press Enter to skip.")
|
||||
break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter numeric values or leave blank to skip.")
|
||||
robot = None
|
||||
rconf["cost_mask"] = None
|
||||
break
|
||||
print("Invalid input. Please enter 'y' or 'n'.")
|
||||
if robot:
|
||||
while True:
|
||||
print('+-' * 50)
|
||||
set_limits = input("Do you want to set limits for x, y and z? (y/n): ").strip().lower()
|
||||
if set_limits == 'y':
|
||||
while True:
|
||||
try:
|
||||
rconf["x_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()]
|
||||
rconf["y_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()]
|
||||
rconf["z_limits"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()]
|
||||
|
||||
if len(rconf["x_limits"]) != 2 or len(rconf["y_limits"]) != 2 or len(rconf["z_limits"]) != 2:
|
||||
print("Invalid input. Please enter exactly two values (or leave blank) for each limit.")
|
||||
continue
|
||||
|
||||
if (rconf["x_limits"][0] is not None and rconf["x_limits"][1] is not None and rconf["x_limits"][0] >= rconf["x_limits"][1]) or \
|
||||
(rconf["y_limits"][0] is not None and rconf["y_limits"][1] is not None and rconf["y_limits"][0] >= rconf["y_limits"][1]) or \
|
||||
(rconf["z_limits"][0] is not None and rconf["z_limits"][1] is not None and rconf["z_limits"][0] >= rconf["z_limits"][1]):
|
||||
print("Invalid input. Lower limit must be less than upper limit for each axis.")
|
||||
continue
|
||||
|
||||
print("Current limits:")
|
||||
print(f"x: {rconf['x_limits']}")
|
||||
print(f"y: {rconf['y_limits']}")
|
||||
print(f"z: {rconf['z_limits']}")
|
||||
con = True
|
||||
while con:
|
||||
confirm = input("Do you want your TCP to move in this range? (y/n): ").strip().lower()
|
||||
if confirm == 'y':
|
||||
break
|
||||
elif confirm == 'n':
|
||||
print("Please re-enter the limits.")
|
||||
con = False
|
||||
else:
|
||||
print("Invalid input. Please enter 'y' or 'n'.")
|
||||
if con:
|
||||
break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter numeric values only.")
|
||||
break
|
||||
elif set_limits == 'n':
|
||||
rconf["x_limits"] = [None, None]
|
||||
rconf["y_limits"] = [None, None]
|
||||
rconf["z_limits"] = [None, None]
|
||||
break
|
||||
print("Invalid input. Please enter 'y' or 'n'.")
|
||||
|
||||
while True:
|
||||
print('+-' * 50)
|
||||
set_limits = input("Do you want to set workspace limits in x, y and z direction? (y/n): ").strip().lower()
|
||||
if set_limits == 'y':
|
||||
while True:
|
||||
try:
|
||||
rconf["x_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()]
|
||||
rconf["y_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()]
|
||||
rconf["z_limits_workspace"] = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()]
|
||||
|
||||
if len(rconf["x_limits_workspace"]) != 2 or len(rconf["y_limits_workspace"]) != 2 or len(rconf["z_limits_workspace"]) != 2:
|
||||
print("Invalid input. Please enter exactly two values (or leave blank) for each limit.")
|
||||
continue
|
||||
|
||||
if (rconf["x_limits_workspace"][0] is not None and rconf["x_limits_workspace"][1] is not None and rconf["x_limits_workspace"][0] >= rconf["x_limits_workspace"][1]) or \
|
||||
(rconf["y_limits_workspace"][0] is not None and rconf["y_limits_workspace"][1] is not None and rconf["y_limits_workspace"][0] >= rconf["y_limits_workspace"][1]) or \
|
||||
(rconf["z_limits_workspace"][0] is not None and rconf["z_limits_workspace"][1] is not None and rconf["z_limits_workspace"][0] >= rconf["z_limits_workspace"][1]):
|
||||
print("Invalid input. Lower limit must be less than upper limit for each axis.")
|
||||
continue
|
||||
|
||||
print("Current limits:")
|
||||
print(f"x: {rconf['x_limits_workspace']}")
|
||||
print(f"y: {rconf['y_limits_workspace']}")
|
||||
print(f"z: {rconf['z_limits_workspace']}")
|
||||
con = True
|
||||
while con:
|
||||
confirm = input("Do you want your robot to move in this range? (y/n): ").strip().lower()
|
||||
if confirm == 'y':
|
||||
break
|
||||
elif confirm == 'n':
|
||||
print("Please re-enter the limits.")
|
||||
con = False
|
||||
else:
|
||||
print("Invalid input. Please enter 'y' or 'n'.")
|
||||
if con:
|
||||
break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter numeric values only.")
|
||||
break
|
||||
elif set_limits == 'n':
|
||||
rconf["x_limits_workspace"] = [None, None]
|
||||
rconf["y_limits_workspace"] = [None, None]
|
||||
rconf["z_limits_workspace"] = [None, None]
|
||||
break
|
||||
print("Invalid input. Please enter 'y' or 'n'.")
|
||||
# Ask the user if they want to set new joint limits
|
||||
while True:
|
||||
print('+-'*50)
|
||||
update_limits = input("Do you want to set new joint limits? (y/n): ").strip().lower()
|
||||
if update_limits == 'y':
|
||||
for i in range(len(rconf["joint_names"])):
|
||||
while True:
|
||||
try:
|
||||
lim = robot.qlim.copy()
|
||||
# Find the link corresponding to the joint name
|
||||
print("-" * 50)
|
||||
print(f"Current position limits for joint '{rconf['joint_names'][i]}': [{robot.qlim[0][i]} {robot.qlim[1][i]}] rad")
|
||||
lower_limit = input(f"Enter the new lower limit for joint '{rconf['joint_names'][i]}' (or press Enter to keep current): ").strip()
|
||||
upper_limit = input(f"Enter the new upper limit for joint '{rconf['joint_names'][i]}' (or press Enter to keep current): ").strip()
|
||||
|
||||
if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit):
|
||||
print("Invalid input. Lower limit must be less than upper limit.")
|
||||
continue
|
||||
lower_limit = float(lower_limit) if lower_limit!="" else None
|
||||
upper_limit = float(upper_limit) if upper_limit!="" else None
|
||||
if lower_limit is not None:
|
||||
if lower_limit<lim[0][i]:
|
||||
while True:
|
||||
sure = input(f"Are you sure you want to set the lower limit to {lower_limit} rad which is less than the default limit {lim[0][i]} (y/n)?: ").strip().lower()
|
||||
if sure == 'y':
|
||||
lim[0][i] = float(lower_limit)
|
||||
break
|
||||
elif sure == 'n':
|
||||
print("Lower limit not changed.")
|
||||
break
|
||||
print("Invalid input. Please enter 'y' or 'n'.")
|
||||
else:
|
||||
lim[0][i] = float(lower_limit)
|
||||
if upper_limit is not None:
|
||||
if upper_limit>lim[1][i]:
|
||||
while True:
|
||||
sure = input(f"Are you sure you want to set the upper limit to {upper_limit} rad which is more than the default limit {lim[1][i]} (y/n)?: ").strip().lower()
|
||||
if sure == 'y':
|
||||
lim[1][i] = float(upper_limit)
|
||||
break
|
||||
elif sure == 'n':
|
||||
print("Upper limit not changed.")
|
||||
break
|
||||
print("Invalid input. Please enter 'y' or 'n'.")
|
||||
else:
|
||||
lim[1][i] = float(upper_limit)
|
||||
robot.qlim = lim
|
||||
print(f"New limits for joint '{rconf['joint_names'][i]}': [{robot.qlim[0][i]} {robot.qlim[1][i]}] rad")
|
||||
print("-" * 50)
|
||||
break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter numeric values or leave blank to keep current limits.")
|
||||
break
|
||||
if update_limits == 'n':
|
||||
break
|
||||
print("Invalid input. Please enter 'y' or 'n'.")
|
||||
'''
|
||||
use_link_mask = input("Do you want to use a link mask? (y/n): ").strip().lower()
|
||||
if use_link_mask == 'y':
|
||||
while True:
|
||||
try:
|
||||
'''
|
||||
else:
|
||||
while True:
|
||||
print('+-' * 50)
|
||||
update_limits = input("Do you want to set joint limits? (y/n): ").strip().lower()
|
||||
n_joints = len(rconf["joint_names"])
|
||||
if update_limits == 'y':
|
||||
rconf["joint_lim"] = [[]*n_joints,[]*n_joints]
|
||||
for i, joint in enumerate(rconf["joint_names"]):
|
||||
while True:
|
||||
try:
|
||||
print("-" * 50)
|
||||
lower_limit = input(f"Enter the new lower limit for joint '{joint}' (or press Enter for None): ").strip()
|
||||
upper_limit = input(f"Enter the new upper limit for joint '{joint}' (or press Enter for None): ").strip()
|
||||
|
||||
if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit):
|
||||
print('--' * 50)
|
||||
print(" !!! Invalid input. Lower limit must be less than upper limit. !!! ")
|
||||
continue
|
||||
rconf["joint_lim"][0][i] = float(lower_limit) if lower_limit!="" else None
|
||||
rconf["joint_lim"][1][i] = float(upper_limit) if upper_limit!="" else None
|
||||
break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter numeric values or leave blank to keep current limits.")
|
||||
print(f'New limits for joint:\n lower: {rconf["joint_lim"][0]}\n upper: {rconf["joint_lim"][1]}')
|
||||
break
|
||||
elif update_limits == 'n':
|
||||
rconf["joint_lim"] = None
|
||||
break
|
||||
print("Invalid input. Please enter 'y' or 'n'.")
|
||||
while True:
|
||||
try:
|
||||
print('+-' * 50)
|
||||
rconf["trajectory_topic_name"] = input("Enter the topic name to which the joint trajectory should be sent (press Enter for default: '/scaled_joint_trajectory_controller/joint_trajectory'): ").strip()
|
||||
if rconf["trajectory_topic_name"] == "":
|
||||
rconf["trajectory_topic_name"] = '/scaled_joint_trajectory_controller/joint_trajectory'
|
||||
break
|
||||
elif rconf["trajectory_topic_name"].startswith("/"):
|
||||
break
|
||||
else:
|
||||
print("Invalid topic name. A valid topic name should start with '/'.")
|
||||
except Exception as e:
|
||||
print(f"An error occurred: {e}")
|
||||
print('--' * 50)
|
||||
while True:
|
||||
try:
|
||||
print('+-' * 50)
|
||||
rconf["hz"] = input("Enter the desired refresh frequency (Hz) (or press Enter for default: 100): ")
|
||||
if rconf["hz"] == "":
|
||||
rconf["hz"] = 100
|
||||
else:
|
||||
rconf["hz"] = float(rconf["hz"])
|
||||
break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter a valid number.")
|
||||
continue
|
||||
while True:
|
||||
try:
|
||||
print('+-' * 50)
|
||||
nconf["log_ip"] = str(input("Enter the target IP on which you want to recieve the log as OSC Messages (or press Enter for default: '127.0.0.1'): "))
|
||||
if nconf["log_ip"] == "":
|
||||
nconf["log_ip"] = "127.0.0.1"
|
||||
print('--' * 50)
|
||||
nconf["log_port"] = input("Enter the target port for the log messages (or press Enter for default: 5005): ")
|
||||
if nconf["log_port"] == "":
|
||||
nconf["log_port"] = 5005
|
||||
else:
|
||||
nconf["log_port"] = int(nconf["log_port"])
|
||||
break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter a valid IP address.")
|
||||
continue
|
||||
while True:
|
||||
try:
|
||||
print('+-' * 50)
|
||||
nconf["state_ip"] = str(input("Enter the target IP on which you want to recieve the joint states as OSC Messages (or press Enter for default: '127.0.0.1'): "))
|
||||
if nconf["state_ip"] == "":
|
||||
nconf["state_ip"] = "127.0.0.1"
|
||||
print('--' * 50)
|
||||
nconf["state_port"] = input("Enter the target port for the joint state messages (or press Enter for default: 7000): ")
|
||||
if nconf["state_port"] == "":
|
||||
nconf["state_port"] = 7000
|
||||
else:
|
||||
nconf["state_port"] = int(nconf["state_port"])
|
||||
break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter a valid IP address.")
|
||||
continue
|
||||
while True:
|
||||
try:
|
||||
print('+-' * 50)
|
||||
nconf["commands_port"] = input("Enter the port you want to send your commands to (or press Enter for default: 8000): ")
|
||||
if nconf["commands_port"] == nconf["state_port"]:
|
||||
print("The commands port must be different from the state port.")
|
||||
continue
|
||||
if nconf["commands_port"] == "":
|
||||
nconf["commands_port"] = 8000
|
||||
else:
|
||||
nconf["commands_port"] = int(nconf["commands_port"])
|
||||
break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter a valid port.")
|
||||
continue
|
||||
print()
|
||||
print('=-=' * 50)
|
||||
print()
|
||||
print(f"Sending joint state's' to {nconf['state_ip']}:{nconf['state_port']}")
|
||||
print()
|
||||
print('=-=' * 50)
|
||||
print()
|
||||
print(f"Sending log messages to {nconf['log_ip']}:{nconf['log_port']}")
|
||||
print()
|
||||
print('=-=' * 50)
|
||||
print()
|
||||
print(f"Ready to receive OSC messages on {socket.gethostbyname(socket.gethostname())}:{nconf['commands_port']}")
|
||||
print()
|
||||
print('=-=' * 50)
|
||||
print()
|
||||
config["robot"] = rconf
|
||||
config["network"] = nconf
|
||||
while True:
|
||||
save_config = input("Do you want to save this interface? (y/n): ").strip().lower()
|
||||
if save_config == "y":
|
||||
config_path = input("Where do you want to save the config?: ").strip()
|
||||
with open(f"{config_path}.toml", "w") as f:
|
||||
toml.dump(config, f)
|
||||
break
|
||||
elif save_config == "n":
|
||||
print("Will not be saving config")
|
||||
print("Invalid input. Please enter 'y' or 'n'.")
|
||||
return config, robot
|
||||
@@ -1,60 +1,59 @@
|
||||
#ruff: noqa: F403, F405
|
||||
import rclpy
|
||||
import argparse
|
||||
import toml
|
||||
from rclpy.node import Node
|
||||
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
|
||||
from sensor_msgs.msg import JointState
|
||||
from rcl_interfaces.msg import Log
|
||||
from osc4py3.as_allthreads import *
|
||||
from osc4py3 import oscmethod as osm
|
||||
import xml.etree.ElementTree as ET
|
||||
import numpy as np
|
||||
import spatialmath as sm
|
||||
import roboticstoolbox as rtb
|
||||
from osc4py3 import oscbuildparse
|
||||
from osc_ros2.interactive_input import interactive_input
|
||||
import time
|
||||
import os
|
||||
import re
|
||||
import socket
|
||||
|
||||
class JointNameListener(Node):
|
||||
def __init__(self):
|
||||
super().__init__('joint_name_listener')
|
||||
self.subscription = self.create_subscription(
|
||||
JointState,
|
||||
'/joint_states',
|
||||
self.joint_state_callback,
|
||||
1
|
||||
)
|
||||
self.joint_names = None
|
||||
|
||||
def joint_state_callback(self, msg: JointState):
|
||||
self.joint_names = list(msg.name)
|
||||
|
||||
class OSC_ROS2_interface(Node):
|
||||
"""Node to publish joint trajectories based on OSC messages."""
|
||||
|
||||
def __init__(self, joint_names, joint_velocity_limits, robot, cost_mask):
|
||||
def __init__(self, robot, config):
|
||||
super().__init__('scaled_joint_trajectory_publisher')
|
||||
|
||||
|
||||
self.subscription = self.create_subscription(
|
||||
JointState,
|
||||
'/joint_states',
|
||||
self.joint_states_callback,
|
||||
1
|
||||
)
|
||||
|
||||
self.subscription = self.create_subscription(
|
||||
Log,
|
||||
'/rosout',
|
||||
self.log_callback,
|
||||
100
|
||||
)
|
||||
|
||||
# Store received joint positions
|
||||
self.current_joint_positions = None
|
||||
self.joint_names = joint_names
|
||||
self.joint_velocity_limits = joint_velocity_limits
|
||||
self.cost_mask = cost_mask
|
||||
self.joint_names = config["robot"]["joint_names"]
|
||||
self.joint_velocity_limits = config["robot"]["joint_velocity_limits"]
|
||||
self.cost_mask = config["robot"]["cost_mask"]
|
||||
self.trajectory_topic_name = config["robot"]["trajectory_topic_name"]
|
||||
self.hz = config["robot"]["hz"]
|
||||
self.state_ip = config["network"]["state_ip"]
|
||||
self.state_port = config["network"]["state_port"]
|
||||
self.log_ip = config["network"]["log_ip"]
|
||||
self.log_port = config["network"]["log_port"]
|
||||
self.commands_port = config["network"]["commands_port"]
|
||||
float_or_none = lambda v: None if "None" in v else float(v)
|
||||
self.x_limits = list(map(float_or_none, config["robot"]["x_limits"]))
|
||||
self.y_limits = list(map(float_or_none, config["robot"]["y_limits"]))
|
||||
self.z_limits = list(map(float_or_none, config["robot"]["z_limits"]))
|
||||
self.x_limits_workspace = list(map(float_or_none, config["robot"]["x_limits_workspace"]))
|
||||
self.y_limits_workspace = list(map(float_or_none, config["robot"]["y_limits_workspace"]))
|
||||
self.z_limits_workspace = list(map(float_or_none, config["robot"]["z_limits_workspace"]))
|
||||
self.robot = robot
|
||||
self.desired = None
|
||||
self.previous_desired = None
|
||||
@@ -67,269 +66,17 @@ class OSC_ROS2_interface(Node):
|
||||
}
|
||||
self.speed_scaling = 0.2
|
||||
self.new = False
|
||||
self.n_joints = len(joint_names)
|
||||
|
||||
if robot:
|
||||
while True:
|
||||
print('+-' * 50)
|
||||
set_limits = input("Do you want to set limits for x, y and z? (y/n): ").strip().lower()
|
||||
if set_limits == 'y':
|
||||
while True:
|
||||
try:
|
||||
self.x_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()]
|
||||
self.y_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()]
|
||||
self.z_limits = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()]
|
||||
|
||||
if len(self.x_limits) != 2 or len(self.y_limits) != 2 or len(self.z_limits) != 2:
|
||||
print("Invalid input. Please enter exactly two values (or leave blank) for each limit.")
|
||||
continue
|
||||
|
||||
if (self.x_limits[0] is not None and self.x_limits[1] is not None and self.x_limits[0] >= self.x_limits[1]) or \
|
||||
(self.y_limits[0] is not None and self.y_limits[1] is not None and self.y_limits[0] >= self.y_limits[1]) or \
|
||||
(self.z_limits[0] is not None and self.z_limits[1] is not None and self.z_limits[0] >= self.z_limits[1]):
|
||||
print("Invalid input. Lower limit must be less than upper limit for each axis.")
|
||||
continue
|
||||
|
||||
print(f"Current limits:")
|
||||
print(f"x: {self.x_limits}")
|
||||
print(f"y: {self.y_limits}")
|
||||
print(f"z: {self.z_limits}")
|
||||
con = True
|
||||
while con:
|
||||
confirm = input("Do you want your TCP to move in this range? (y/n): ").strip().lower()
|
||||
if confirm == 'y':
|
||||
break
|
||||
elif confirm == 'n':
|
||||
print("Please re-enter the limits.")
|
||||
con = False
|
||||
else:
|
||||
print("Invalid input. Please enter 'y' or 'n'.")
|
||||
if con: break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter numeric values only.")
|
||||
break
|
||||
elif set_limits == 'n':
|
||||
self.x_limits = [None, None]
|
||||
self.y_limits = [None, None]
|
||||
self.z_limits = [None, None]
|
||||
break
|
||||
print("Invalid input. Please enter 'y' or 'n'.")
|
||||
|
||||
while True:
|
||||
print('+-' * 50)
|
||||
set_limits = input("Do you want to set workspace limits in x, y and z direction? (y/n): ").strip().lower()
|
||||
if set_limits == 'y':
|
||||
while True:
|
||||
try:
|
||||
self.x_limits_workspace = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for x (space-separated, enter 'x' for no limit): ").split()]
|
||||
self.y_limits_workspace = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for y (space-separated, enter 'x' for no limit): ").split()]
|
||||
self.z_limits_workspace = [float(i) if i != 'x' else None for i in input("Enter the lower and upper limits for z (space-separated, enter 'x' for no limit): ").split()]
|
||||
|
||||
if len(self.x_limits_workspace) != 2 or len(self.y_limits_workspace) != 2 or len(self.z_limits_workspace) != 2:
|
||||
print("Invalid input. Please enter exactly two values (or leave blank) for each limit.")
|
||||
continue
|
||||
|
||||
if (self.x_limits_workspace[0] is not None and self.x_limits_workspace[1] is not None and self.x_limits_workspace[0] >= self.x_limits_workspace[1]) or \
|
||||
(self.y_limits_workspace[0] is not None and self.y_limits_workspace[1] is not None and self.y_limits_workspace[0] >= self.y_limits_workspace[1]) or \
|
||||
(self.z_limits_workspace[0] is not None and self.z_limits_workspace[1] is not None and self.z_limits_workspace[0] >= self.z_limits_workspace[1]):
|
||||
print("Invalid input. Lower limit must be less than upper limit for each axis.")
|
||||
continue
|
||||
|
||||
print(f"Current limits:")
|
||||
print(f"x: {self.x_limits_workspace}")
|
||||
print(f"y: {self.y_limits_workspace}")
|
||||
print(f"z: {self.z_limits_workspace}")
|
||||
con = True
|
||||
while con:
|
||||
confirm = input("Do you want your robot to move in this range? (y/n): ").strip().lower()
|
||||
if confirm == 'y':
|
||||
break
|
||||
elif confirm == 'n':
|
||||
print("Please re-enter the limits.")
|
||||
con = False
|
||||
else:
|
||||
print("Invalid input. Please enter 'y' or 'n'.")
|
||||
if con: break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter numeric values only.")
|
||||
break
|
||||
elif set_limits == 'n':
|
||||
self.x_limits_workspace = [None, None]
|
||||
self.y_limits_workspace = [None, None]
|
||||
self.z_limits_workspace = [None, None]
|
||||
break
|
||||
print("Invalid input. Please enter 'y' or 'n'.")
|
||||
# Ask the user if they want to set new joint limits
|
||||
|
||||
# Ask the user if they want to set new joint limits
|
||||
while True:
|
||||
print('+-'*50)
|
||||
update_limits = input("Do you want to set new joint limits? (y/n): ").strip().lower()
|
||||
if update_limits == 'y':
|
||||
for i in range(len(self.joint_names)):
|
||||
while True:
|
||||
try:
|
||||
lim = self.robot.qlim.copy()
|
||||
# Find the link corresponding to the joint name
|
||||
print("-" * 50)
|
||||
print(f"Current position limits for joint '{self.joint_names[i]}': [{self.robot.qlim[0][i]} {self.robot.qlim[1][i]}] rad")
|
||||
lower_limit = input(f"Enter the new lower limit for joint '{self.joint_names[i]}' (or press Enter to keep current): ").strip()
|
||||
upper_limit = input(f"Enter the new upper limit for joint '{self.joint_names[i]}' (or press Enter to keep current): ").strip()
|
||||
|
||||
if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit):
|
||||
print("Invalid input. Lower limit must be less than upper limit.")
|
||||
continue
|
||||
lower_limit = float(lower_limit) if lower_limit!="" else None
|
||||
upper_limit = float(upper_limit) if upper_limit!="" else None
|
||||
if lower_limit!=None:
|
||||
if lower_limit<lim[0][i]:
|
||||
while True:
|
||||
sure = input(f"Are you sure you want to set the lower limit to {lower_limit} rad which is less than the default limit {lim[0][i]} (y/n)?: ").strip().lower()
|
||||
if sure == 'y':
|
||||
lim[0][i] = float(lower_limit)
|
||||
break
|
||||
elif sure == 'n':
|
||||
print("Lower limit not changed.")
|
||||
break
|
||||
print("Invalid input. Please enter 'y' or 'n'.")
|
||||
else: lim[0][i] = float(lower_limit)
|
||||
if upper_limit!=None:
|
||||
if upper_limit>lim[1][i]:
|
||||
while True:
|
||||
sure = input(f"Are you sure you want to set the upper limit to {upper_limit} rad which is more than the default limit {lim[1][i]} (y/n)?: ").strip().lower()
|
||||
if sure == 'y':
|
||||
lim[1][i] = float(upper_limit)
|
||||
break
|
||||
elif sure == 'n':
|
||||
print("Upper limit not changed.")
|
||||
break
|
||||
print("Invalid input. Please enter 'y' or 'n'.")
|
||||
else: lim[1][i] = float(upper_limit)
|
||||
self.robot.qlim = lim
|
||||
print(f"New limits for joint '{self.joint_names[i]}': [{self.robot.qlim[0][i]} {self.robot.qlim[1][i]}] rad")
|
||||
print("-" * 50)
|
||||
break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter numeric values or leave blank to keep current limits.")
|
||||
break
|
||||
if update_limits == 'n':
|
||||
break
|
||||
print("Invalid input. Please enter 'y' or 'n'.")
|
||||
'''
|
||||
use_link_mask = input("Do you want to use a link mask? (y/n): ").strip().lower()
|
||||
if use_link_mask == 'y':
|
||||
while True:
|
||||
try:
|
||||
'''
|
||||
else:
|
||||
while True:
|
||||
print('+-' * 50)
|
||||
update_limits = input("Do you want to set joint limits? (y/n): ").strip().lower()
|
||||
if update_limits == 'y':
|
||||
self.joint_lim = [[None]*self.n_joints,[None]*self.n_joints]
|
||||
for i, joint in enumerate(self.joint_names):
|
||||
while True:
|
||||
try:
|
||||
print("-" * 50)
|
||||
lower_limit = input(f"Enter the new lower limit for joint '{joint}' (or press Enter for None): ").strip()
|
||||
upper_limit = input(f"Enter the new upper limit for joint '{joint}' (or press Enter for None): ").strip()
|
||||
|
||||
if lower_limit and upper_limit and float(lower_limit) >= float(upper_limit):
|
||||
print('--' * 50)
|
||||
print(" !!! Invalid input. Lower limit must be less than upper limit. !!! ")
|
||||
continue
|
||||
self.joint_lim[0][i] = float(lower_limit) if lower_limit!="" else None
|
||||
self.joint_lim[1][i] = float(upper_limit) if upper_limit!="" else None
|
||||
break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter numeric values or leave blank to keep current limits.")
|
||||
print(f'New limits for joint:\n lower: {self.joint_lim[0]}\n upper: {self.joint_lim[1]}')
|
||||
break
|
||||
elif update_limits == 'n':
|
||||
self.joint_lim = None
|
||||
break
|
||||
print("Invalid input. Please enter 'y' or 'n'.")
|
||||
|
||||
|
||||
|
||||
while True:
|
||||
try:
|
||||
print('+-' * 50)
|
||||
self.trajectory_topic_name = input("Enter the topic name to which the joint trajectory should be sent (press Enter for default: '/scaled_joint_trajectory_controller/joint_trajectory'): ").strip()
|
||||
if self.trajectory_topic_name == "":
|
||||
self.trajectory_topic_name = '/scaled_joint_trajectory_controller/joint_trajectory'
|
||||
break
|
||||
elif self.trajectory_topic_name.startswith("/"):
|
||||
break
|
||||
else:
|
||||
print("Invalid topic name. A valid topic name should start with '/'.")
|
||||
except Exception as e:
|
||||
print(f"An error occurred: {e}")
|
||||
|
||||
self.n_joints = len(self.joint_names)
|
||||
# ROS2 Publisher
|
||||
self.publisher = self.create_publisher(
|
||||
JointTrajectory,
|
||||
self.trajectory_topic_name,
|
||||
1
|
||||
)
|
||||
|
||||
while True:
|
||||
try:
|
||||
print('+-' * 50)
|
||||
log_ip = str(input("Enter the target IP on which you want to recieve the log as OSC Messages (or press Enter for default: '127.0.0.1'): "))
|
||||
if log_ip == "":
|
||||
log_ip = "127.0.0.1"
|
||||
print('--' * 50)
|
||||
log_port = input("Enter the target port for the log messages (or press Enter for default: 5005): ")
|
||||
if log_port == "":
|
||||
log_port = 5005
|
||||
else:
|
||||
log_port = int(log_port)
|
||||
break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter a valid IP address.")
|
||||
continue
|
||||
while True:
|
||||
try:
|
||||
print('+-' * 50)
|
||||
state_ip = str(input("Enter the target IP on which you want to recieve the joint states as OSC Messages (or press Enter for default: '127.0.0.1'): "))
|
||||
if state_ip == "":
|
||||
state_ip = "127.0.0.1"
|
||||
print('--' * 50)
|
||||
state_port = input("Enter the target port for the joint state messages (or press Enter for default: 7000): ")
|
||||
if state_port == "":
|
||||
state_port = 7000
|
||||
else:
|
||||
state_port = int(state_port)
|
||||
break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter a valid IP address.")
|
||||
continue
|
||||
while True:
|
||||
try:
|
||||
print('+-' * 50)
|
||||
commands_port = input("Enter the port you want to send your commands to (or press Enter for default: 8000): ")
|
||||
if commands_port == state_port:
|
||||
print("The commands port must be different from the state port.")
|
||||
continue
|
||||
if commands_port == "":
|
||||
commands_port = 8000
|
||||
else:
|
||||
commands_port = int(commands_port)
|
||||
break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter a valid port.")
|
||||
continue
|
||||
|
||||
|
||||
osc_startup()
|
||||
|
||||
osc_udp_client(state_ip, state_port, "osc_client")
|
||||
|
||||
osc_udp_client(log_ip, log_port, "osc_log_client")
|
||||
|
||||
osc_udp_server('0.0.0.0', commands_port, "osc_server")
|
||||
|
||||
osc_udp_client(self.state_ip, self.state_port, "osc_client")
|
||||
osc_udp_client(self.log_ip, self.log_port, "osc_log_client")
|
||||
osc_udp_server('0.0.0.0', self.commands_port, "osc_server")
|
||||
# Register OSC handler
|
||||
osc_method("/joint_positions", self.joint_positions_handler, argscheme=osm.OSCARG_DATAUNPACK)
|
||||
osc_method("/joint_position/*", self.joint_position_handler, argscheme=osm.OSCARG_ADDRESS+osm.OSCARG_DATAUNPACK)
|
||||
@@ -337,39 +84,10 @@ class OSC_ROS2_interface(Node):
|
||||
osc_method("/joint_trajectory", self.joint_trajectory_handler, argscheme=osm.OSCARG_DATAUNPACK)
|
||||
osc_method("/cartesian_trajectory", self.cartesian_trajectory_handler, argscheme=osm.OSCARG_DATAUNPACK)
|
||||
osc_method("/speed_scaling", self.speed_scaling_handler, argscheme=osm.OSCARG_DATAUNPACK)
|
||||
print('--' * 50)
|
||||
while True:
|
||||
try:
|
||||
print('+-' * 50)
|
||||
self.hz = input("Enter the desired refresh frequency (Hz) (or press Enter for default: 100): ")
|
||||
if self.hz == "":
|
||||
self.hz = 100
|
||||
else:
|
||||
self.hz = float(self.hz)
|
||||
break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter a valid number.")
|
||||
continue
|
||||
print()
|
||||
print('=-=' * 50)
|
||||
print()
|
||||
print(f'Sending joint states to {state_ip}:{state_port}')
|
||||
print()
|
||||
print('=-=' * 50)
|
||||
print()
|
||||
print(f'Sending log messages to {log_ip}:{log_port}')
|
||||
print()
|
||||
print('=-=' * 50)
|
||||
print()
|
||||
print(f'Ready to receive OSC messages on {socket.gethostbyname(socket.gethostname())}:{commands_port}')
|
||||
print()
|
||||
print('=-=' * 50)
|
||||
print()
|
||||
|
||||
self.get_logger().info(f"Publishing joint trajectory to {self.trajectory_topic_name}")
|
||||
self.get_logger().info(f'Ready to receive OSC messages on {socket.gethostbyname(socket.gethostname())}:{commands_port}')
|
||||
self.get_logger().info(f'Sending joint states to {state_ip}:{state_port}')
|
||||
self.get_logger().info(f'Sending log messages to {log_ip}:{log_port}')
|
||||
self.get_logger().info(f'Ready to receive OSC messages on {socket.gethostbyname(socket.gethostname())}:{self.commands_port}')
|
||||
self.get_logger().info(f'Sending joint states to {self.state_ip}:{self.state_port}')
|
||||
self.get_logger().info(f'Sending log messages to {self.log_ip}:{self.log_port}')
|
||||
self.create_timer(1/self.hz, self.update_position) # Timer to update the position
|
||||
self.create_timer(3, self.reset_prev) # reset the previous desired position
|
||||
|
||||
@@ -393,17 +111,16 @@ class OSC_ROS2_interface(Node):
|
||||
|
||||
def joint_trajectory_handler(self, *args):
|
||||
try:
|
||||
if len(args[0]) == 6:
|
||||
points = [[float(j) for j in i] for i in args]
|
||||
elif len(args[0]) >= 7:
|
||||
points = [[float(j) for j in i[:6]] for i in args]
|
||||
self.get_logger().warn(f"joint_trajectory_handler: Duration is not supported for joint trajectory yet. Ignoring duration.")
|
||||
else:
|
||||
self.get_logger().warn(f"joint_trajectory_handler: Invalid number of arguments for joint trajectory. Expected {self.n_joints} ([q0, q1, q2, ..., q{self.n_joints}]) or {self.n_joints+1} ([q0, q1, q2, ..., q{self.n_joints}, duration]), but got {len(args[0])}.")
|
||||
return
|
||||
|
||||
self.desired = ["joint_trajectory"] + points
|
||||
self.new = True
|
||||
if len(args[0]) == 6:
|
||||
points = [[float(j) for j in i] for i in args]
|
||||
elif len(args[0]) >= 7:
|
||||
points = [[float(j) for j in i[:6]] for i in args]
|
||||
self.get_logger().warn("joint_trajectory_handler: Duration is not supported for joint trajectory yet. Ignoring duration.")
|
||||
else:
|
||||
self.get_logger().warn(f"joint_trajectory_handler: Invalid number of arguments for joint trajectory. Expected {self.n_joints} ([q0, q1, q2, ..., q{self.n_joints}]) or {self.n_joints+1} ([q0, q1, q2, ..., q{self.n_joints}, duration]), but got {len(args[0])}.")
|
||||
return
|
||||
self.desired = ["joint_trajectory"] + points
|
||||
self.new = True
|
||||
except Exception as e:
|
||||
self.get_logger().fatal(f"joint_trajectory_handler: {e}")
|
||||
|
||||
@@ -411,7 +128,6 @@ class OSC_ROS2_interface(Node):
|
||||
"""Handles incoming OSC messages for joint positions."""
|
||||
try:
|
||||
joint_name = address.split("/")[-1]
|
||||
|
||||
if joint_name in self.joint_names:
|
||||
if len(args) == 1:
|
||||
position = float(args[0])
|
||||
@@ -479,7 +195,6 @@ class OSC_ROS2_interface(Node):
|
||||
except Exception as e:
|
||||
self.get_logger().fatal(f"joint_position_handler: {e}")
|
||||
|
||||
|
||||
def cartesian_trajectory_handler(self, *args):
|
||||
"""Handles incoming OSC messages for cartesian trajectory."""
|
||||
if self.robot:
|
||||
@@ -488,11 +203,10 @@ class OSC_ROS2_interface(Node):
|
||||
points = [[float(j) for j in i] for i in args]
|
||||
elif len(args[0]) >= 7:
|
||||
points = [[float(j) for j in i[:6]] for i in args]
|
||||
self.get_logger().warn(f"cartesian_trajectory_handler: Duration is not supported for cartesian trajectory yet. Ignoring duration.")
|
||||
self.get_logger().warn("cartesian_trajectory_handler: Duration is not supported for cartesian trajectory yet. Ignoring duration.")
|
||||
else:
|
||||
self.get_logger().warn(f"cartesian_trajectory_handler: Invalid number of arguments for cartesian trajectory. Expected 6 ([x, y, z, roll, pitch, yaw]) or 7 ([x, y, z, roll, pitch, yaw, duration]), but got {len(args[0])}.")
|
||||
self.get_logger().warn("cartesian_trajectory_handler: Invalid number of arguments for cartesian trajectory. Expected 6 ([x, y, z, roll, pitch, yaw]) or 7 ([x, y, z, roll, pitch, yaw, duration]), but got {len(args[0])}.")
|
||||
return
|
||||
|
||||
self.desired = ["cartesian_trajectory"] + points
|
||||
self.new = True
|
||||
except Exception as e:
|
||||
@@ -511,7 +225,6 @@ class OSC_ROS2_interface(Node):
|
||||
else:
|
||||
self.get_logger().warn(f"joint_positions_handler: Invalid number of arguments for joint positions. Expected {len(self.joint_names)} ([q0, q1, q2, ... q{len(self.joint_names)}]) or {len(self.joint_names)+1} ([q0, q1, q2, ... q{len(self.joint_names)}, duration]), but got {len(args)}.")
|
||||
return
|
||||
|
||||
# Check if joint positions exceed limits
|
||||
if self.robot:
|
||||
for i, position in enumerate(desired_joint_positions[:len(self.joint_names)]): # Exclude duration if present
|
||||
@@ -540,7 +253,6 @@ class OSC_ROS2_interface(Node):
|
||||
self.get_logger().warn(
|
||||
f"joint_positions_handler: Joint '{self.joint_names[i]}' position {position} is out of bounds. Using {self.joint_lim[1][i]}."
|
||||
)
|
||||
|
||||
self.desired = ["joint_positions"] + desired_joint_positions
|
||||
self.new = True
|
||||
except Exception as e:
|
||||
@@ -559,7 +271,6 @@ class OSC_ROS2_interface(Node):
|
||||
else:
|
||||
self.get_logger().warn(f"tcp_coordinates_handler: Invalid number of arguments for TCP coordinates. Expected 6 ([x, y, z, roll, pitch, yaw]) or 7 ([x, y, z, roll, pitch, yaw, duration]), but got {len(args)}.")
|
||||
return
|
||||
|
||||
if self.x_limits[0] is not None:
|
||||
x = max(self.x_limits[0], x)
|
||||
if self.x_limits[1] is not None:
|
||||
@@ -572,13 +283,11 @@ class OSC_ROS2_interface(Node):
|
||||
z = max(self.z_limits[0], z)
|
||||
if self.z_limits[1] is not None:
|
||||
z = min(self.z_limits[1], z)
|
||||
|
||||
if x != args[0] or y != args[1] or z != args[2]:
|
||||
self.get_logger().warn(
|
||||
f"tcp_coordinates_handler: Desired joint positions adjusted to fit within limits: "
|
||||
f"x={x}, y={y}, z={z} (original: x={args[0]}, y={args[1]}, z={args[2]})"
|
||||
)
|
||||
|
||||
self.desired = ["tcp_coordinates", x, y, z, r, p, yaw, duration]
|
||||
self.new = True
|
||||
except Exception as e:
|
||||
@@ -587,23 +296,21 @@ class OSC_ROS2_interface(Node):
|
||||
self.get_logger().warn("tcp_coordinates_handler: No robot model provided. Cannot handle TCP coordinates.")
|
||||
return
|
||||
|
||||
|
||||
def joint_states_callback(self, msg: JointState):
|
||||
"""Callback function to handle incoming joint states."""
|
||||
try:
|
||||
msg_time = oscbuildparse.OSCMessage(f"/time", ',s', [str(time.time())])
|
||||
msg_time = oscbuildparse.OSCMessage("/time", ',s', [str(time.time())])
|
||||
osc_send(msg_time, "osc_client")
|
||||
if not(self.joint_names): self.joint_names = msg.name
|
||||
if not(self.joint_names):
|
||||
self.joint_names = msg.name
|
||||
joint_position_dict = dict(zip(msg.name, msg.position))
|
||||
self.current_joint_positions = [float(joint_position_dict[name]) for name in self.joint_names]
|
||||
joint_position_dict = dict(zip(msg.name, msg.velocity))
|
||||
self.current_joint_velocities = [float(joint_position_dict[name]) for name in self.joint_names]
|
||||
|
||||
if self.robot:
|
||||
tcp_position = self.robot.fkine(self.current_joint_positions).t
|
||||
tcp_orientation = self.robot.fkine(self.current_joint_positions).rpy()
|
||||
|
||||
msg_tcp = oscbuildparse.OSCMessage(f"/tcp_coordinates", ',ffffff', [tcp_position[0], tcp_position[1], tcp_position[2], tcp_orientation[0], tcp_orientation[1], tcp_orientation[2]])
|
||||
msg_tcp = oscbuildparse.OSCMessage("/tcp_coordinates", ',ffffff', [tcp_position[0], tcp_position[1], tcp_position[2], tcp_orientation[0], tcp_orientation[1], tcp_orientation[2]])
|
||||
#msg_x = oscbuildparse.OSCMessage(f"/tcp_coordinates/x", ',f', [tcp_position[0]])
|
||||
#msg_y = oscbuildparse.OSCMessage(f"/tcp_coordinates/y", ',f', [tcp_position[1]])
|
||||
#msg_z = oscbuildparse.OSCMessage(f"/tcp_coordinates/z", ',f', [tcp_position[2]])
|
||||
@@ -613,21 +320,18 @@ class OSC_ROS2_interface(Node):
|
||||
#bun = oscbuildparse.OSCBundle(oscbuildparse.OSC_IMMEDIATELY, [msg_tcp, msg_x, msg_y, msg_z, msg_roll, msg_pitch, msg_yaw])
|
||||
#osc_send(bun, "osc_client")
|
||||
osc_send(msg_tcp, "osc_client")
|
||||
|
||||
msg_position = oscbuildparse.OSCMessage(f"/joint_state/position", f',{"f"*self.n_joints}', [i for i in msg.position])
|
||||
msg_velocity = oscbuildparse.OSCMessage(f"/joint_state/velocity", f',{"f"*self.n_joints}', [i for i in msg.velocity])
|
||||
msg_effort = oscbuildparse.OSCMessage(f"/joint_state/effort", f',{"f"*self.n_joints}', [i for i in msg.effort])
|
||||
msg_name = oscbuildparse.OSCMessage(f"/joint_state/name", f',{"s"*self.n_joints}', [i for i in msg.name])
|
||||
msg_position = oscbuildparse.OSCMessage("/joint_state/position", f',{"f"*self.n_joints}', [i for i in msg.position])
|
||||
msg_velocity = oscbuildparse.OSCMessage("/joint_state/velocity", f',{"f"*self.n_joints}', [i for i in msg.velocity])
|
||||
msg_effort = oscbuildparse.OSCMessage("/joint_state/effort", f',{"f"*self.n_joints}', [i for i in msg.effort])
|
||||
msg_name = oscbuildparse.OSCMessage("/joint_state/name", f',{"s"*self.n_joints}', [i for i in msg.name])
|
||||
bun = oscbuildparse.OSCBundle(oscbuildparse.OSC_IMMEDIATELY, [msg_name, msg_position, msg_velocity, msg_effort])
|
||||
osc_send(bun, "osc_client")
|
||||
|
||||
#for i, name in enumerate(msg.name):
|
||||
# msg_position = oscbuildparse.OSCMessage(f"/joint_state/position/{name}", ',f', [msg.position[i]])
|
||||
# msg_velocity = oscbuildparse.OSCMessage(f"/joint_state/velocity/{name}", ',f', [msg.velocity[i]])
|
||||
# msg_effort = oscbuildparse.OSCMessage(f"/joint_state/effort/{name}", ',f', [msg.effort[i]])
|
||||
# bun = oscbuildparse.OSCBundle(oscbuildparse.OSC_IMMEDIATELY, [msg_position, msg_velocity, msg_effort])
|
||||
# osc_send(bun, "osc_client")
|
||||
|
||||
except Exception as e:
|
||||
self.get_logger().fatal(f"joint_states_callback: {e}")
|
||||
|
||||
@@ -672,21 +376,17 @@ class OSC_ROS2_interface(Node):
|
||||
return
|
||||
|
||||
def trapezoidal_timestamps(self, num_points,total_duration, flat_ratio = 0.3):
|
||||
|
||||
if num_points == 2:
|
||||
return [0, total_duration]
|
||||
n = int(num_points*(1-flat_ratio)/2)
|
||||
start = np.cos(np.linspace(0, np.pi, n))+2
|
||||
end = np.cos(np.linspace(-np.pi, 0, n))+2
|
||||
flat = np.ones(num_points-2*n)
|
||||
|
||||
timestamps = np.concatenate((start, flat, end))
|
||||
timestamps *= total_duration / timestamps.sum()
|
||||
timestamps = np.cumsum(timestamps)
|
||||
|
||||
return timestamps.tolist()
|
||||
|
||||
|
||||
def send_tcp_coordinates(self):
|
||||
"""Send the desired TCP coordinates to the robot."""
|
||||
try:
|
||||
@@ -694,7 +394,7 @@ class OSC_ROS2_interface(Node):
|
||||
msg = JointTrajectory()
|
||||
msg.joint_names = self.joint_names
|
||||
steps_per_m = 100
|
||||
if self.previous_desired == None:
|
||||
if self.previous_desired is None:
|
||||
[x,y,z] = self.robot.fkine(self.current_joint_positions).t
|
||||
[roll, pitch, yaw] = self.robot.fkine(self.current_joint_positions).rpy()
|
||||
else:
|
||||
@@ -703,7 +403,8 @@ class OSC_ROS2_interface(Node):
|
||||
x1, y1, z1, roll1, pitch1, yaw1 = self.desired[1:7]
|
||||
self.previous_desired = self.desired
|
||||
steps = int(np.linalg.norm(np.array([x1, y1, z1])- self.robot.fkine(self.current_joint_positions).t) * steps_per_m)
|
||||
if steps < 2: steps = 2
|
||||
if steps < 2:
|
||||
steps = 2
|
||||
cart_traj = [sm.SE3([x+(x1-x)/(steps-1)*i, y+(y1-y)/(steps-1)*i, z+(z1-z)/(steps-1)*i]) * sm.SE3.RPY([roll+(roll1-roll)/(steps-1)*i, pitch+(pitch1-pitch)/(steps-1)*i, yaw+(yaw1-yaw)/(steps-1)*i]) for i in range(steps)]
|
||||
'''if self.previous_desired:
|
||||
[x,y,z] = self.previous_desired[1:4]
|
||||
@@ -746,16 +447,13 @@ class OSC_ROS2_interface(Node):
|
||||
|
||||
# Compose SE3 transform
|
||||
cart_traj.append(sm.SE3(pos_interp) * q_interp.SE3())'''
|
||||
|
||||
|
||||
|
||||
if self.desired[-1]:
|
||||
timestamps = self.trapezoidal_timestamps(steps, self.desired[-1], 0.8)
|
||||
for j in range(steps):
|
||||
sol = self.robot.ik_LM(cart_traj[j], q0=self.current_joint_positions, mask = self.cost_mask, joint_limits = True, method = 'chan') if j == 0 else self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True, method = 'chan')
|
||||
if sol[1] == 1:
|
||||
fowards = self.robot.fkine_all(sol[0])
|
||||
out_of_bounds = (fowards.t[1:,0] > self.x_limits_workspace[1] if self.x_limits_workspace[1] != None else False) | (fowards.t[1:,0] < self.x_limits_workspace[0] if self.x_limits_workspace[0] != None else False) | (fowards.t[1:,1] > self.y_limits_workspace[1] if self.y_limits_workspace[1] != None else False) | (fowards.t[1:,1] < self.y_limits_workspace[0] if self.y_limits_workspace[0] != None else False) | (fowards.t[1:,2] > self.z_limits_workspace[1] if self.z_limits_workspace[1] != None else False) | (fowards.t[1:,2] < self.z_limits_workspace[0] if self.z_limits_workspace[0] != None else False)
|
||||
out_of_bounds = (fowards.t[1:,0] > self.x_limits_workspace[1] if self.x_limits_workspace[1] is not None else False) | (fowards.t[1:,0] < self.x_limits_workspace[0] if self.x_limits_workspace[0] is not None else False) | (fowards.t[1:,1] > self.y_limits_workspace[1] if self.y_limits_workspace[1] is not None else False) | (fowards.t[1:,1] < self.y_limits_workspace[0] if self.y_limits_workspace[0] is not None else False) | (fowards.t[1:,2] > self.z_limits_workspace[1] if self.z_limits_workspace[1] is not None else False) | (fowards.t[1:,2] < self.z_limits_workspace[0] if self.z_limits_workspace[0] is not None else False)
|
||||
if np.any(out_of_bounds):
|
||||
#print(fowards.t)
|
||||
#indices = np.where(out_of_bounds)[0]
|
||||
@@ -790,9 +488,6 @@ class OSC_ROS2_interface(Node):
|
||||
msg.header.stamp = self.get_clock().now().to_msg()
|
||||
self.publisher.publish(msg)
|
||||
self.previous_desired = self.desired
|
||||
|
||||
|
||||
|
||||
else:
|
||||
prev_duration = 0
|
||||
'''
|
||||
@@ -807,7 +502,7 @@ class OSC_ROS2_interface(Node):
|
||||
sol = self.robot.ik_LM(cart_traj[j], q0=self.current_joint_positions, mask = self.cost_mask, joint_limits = True, method = 'chan') if j == 0 else self.robot.ik_LM(cart_traj[j], q0=prev_sol, mask = self.cost_mask, joint_limits = True, method = 'chan')
|
||||
if sol[1] == 1:
|
||||
fowards = self.robot.fkine_all(sol[0])
|
||||
out_of_bounds = (fowards.t[1:,0] > self.x_limits_workspace[1] if self.x_limits_workspace[1] != None else False) | (fowards.t[1:,0] < self.x_limits_workspace[0] if self.x_limits_workspace[0] != None else False) | (fowards.t[1:,1] > self.y_limits_workspace[1] if self.y_limits_workspace[1] != None else False) | (fowards.t[1:,1] < self.y_limits_workspace[0] if self.y_limits_workspace[0] != None else False) | (fowards.t[1:,2] > self.z_limits_workspace[1] if self.z_limits_workspace[1] != None else False) | (fowards.t[1:,2] < self.z_limits_workspace[0] if self.z_limits_workspace[0] != None else False)
|
||||
out_of_bounds = (fowards.t[1:,0] > self.x_limits_workspace[1] if self.x_limits_workspace[1] is not None else False) | (fowards.t[1:,0] < self.x_limits_workspace[0] if self.x_limits_workspace[0] is not None else False) | (fowards.t[1:,1] > self.y_limits_workspace[1] if self.y_limits_workspace[1] is not None else False) | (fowards.t[1:,1] < self.y_limits_workspace[0] if self.y_limits_workspace[0] is not None else False) | (fowards.t[1:,2] > self.z_limits_workspace[1] if self.z_limits_workspace[1] is not None else False) | (fowards.t[1:,2] < self.z_limits_workspace[0] if self.z_limits_workspace[0] is not None else False)
|
||||
if np.any(out_of_bounds):
|
||||
#print(fowards.t)
|
||||
#indices = np.where(out_of_bounds)[0]
|
||||
@@ -852,7 +547,6 @@ class OSC_ROS2_interface(Node):
|
||||
self.get_logger().fatal(f"send_tcp_coordinates: {e}")
|
||||
|
||||
def send_joint_trajectory(self):
|
||||
|
||||
try:
|
||||
self.new = False
|
||||
viapoints = np.array([i for i in self.desired[1:]])
|
||||
@@ -873,7 +567,6 @@ class OSC_ROS2_interface(Node):
|
||||
print(f'Error in joint_angles_handler: {e}')
|
||||
|
||||
def send_cartesian_trajectory(self):
|
||||
|
||||
try:
|
||||
self.new = False
|
||||
viapoints = np.array([i[:6] for i in self.desired[1:]])
|
||||
@@ -898,14 +591,14 @@ class OSC_ROS2_interface(Node):
|
||||
point.time_from_start.nanosec = int((traj.t[i] - int(traj.t[i])) * 1e9)
|
||||
msg.points.append(point)
|
||||
prev_sol = list(sol[0])
|
||||
else: self.get_logger().warn(f"send_cartesian_trajectory: IK could not find a solution for (x,y,z) = {traj.q[i][:3]} and (r,p,y) = {traj.q[i][3:]}!")
|
||||
else:
|
||||
self.get_logger().warn(f"send_cartesian_trajectory: IK could not find a solution for (x,y,z) = {traj.q[i][:3]} and (r,p,y) = {traj.q[i][3:]}!")
|
||||
msg.header.stamp = self.get_clock().now().to_msg()
|
||||
msg.points = msg.points[::n]
|
||||
self.publisher.publish(msg)
|
||||
self.previous_desired = None
|
||||
except Exception as e:
|
||||
print(f'Error in joint_angles_handler: {e}')
|
||||
|
||||
self.previous_desired = None
|
||||
|
||||
def update_position(self):
|
||||
@@ -913,7 +606,6 @@ class OSC_ROS2_interface(Node):
|
||||
try:
|
||||
if self.desired is None or not(self.new):
|
||||
return
|
||||
|
||||
if self.desired[0] == "joint_positions":
|
||||
self.new = False
|
||||
self.send_joint_positions()
|
||||
@@ -933,35 +625,26 @@ class OSC_ROS2_interface(Node):
|
||||
else:
|
||||
self.get_logger().warn(f"update_position: Unknown desired type '{self.desired[0]}'.")
|
||||
return
|
||||
|
||||
|
||||
except Exception as e:
|
||||
self.get_logger().fatal(f'update_position: {e}')
|
||||
|
||||
|
||||
def clean_log_string(self, s):
|
||||
|
||||
s = str(s)
|
||||
|
||||
# Remove ANSI escape sequences (e.g., \x1b[31m)
|
||||
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
|
||||
s = ansi_escape.sub('', s)
|
||||
|
||||
# Replace tabs/newlines with spaces
|
||||
s = s.replace('\n', ' ').replace('\r', ' ').replace('\t', ' ').replace("'", ' '). replace('"', ' ').replace('`', ' ').replace('´', ' ').replace('`', ' ').replace('“', ' ').replace('”', ' ').replace('‘', ' ').replace('’', ' ').replace('´', ' ').replace('`', ' ').replace('“', ' ').replace('”', ' ').replace('‘', ' ').replace('’', ' ')
|
||||
|
||||
# Strip leading/trailing whitespace
|
||||
s = s.strip()
|
||||
|
||||
# Optionally enforce ASCII only (replace non-ASCII chars with '?')
|
||||
s = s.encode('ascii', 'replace').decode('ascii')
|
||||
|
||||
return s
|
||||
|
||||
|
||||
def log_callback(self, msg: Log):
|
||||
"""Callback function to handle incoming log messages."""
|
||||
|
||||
# Send the log message as an OSC message
|
||||
msg_log = oscbuildparse.OSCMessage(f"/log/{self.log_dict.get(msg.level, 'UNKNOWN')}", ',isss', [int(msg.level), str(msg.stamp.sec+msg.stamp.nanosec*1e-9) , str(msg.name), self.clean_log_string(msg.msg)])
|
||||
osc_send(msg_log, "osc_log_client")
|
||||
@@ -969,122 +652,16 @@ class OSC_ROS2_interface(Node):
|
||||
|
||||
def main():
|
||||
"""Main function to get joint names and start the ROS 2 & OSC system."""
|
||||
parser = argparse.ArgumentParser(description="An interface between ROS2 and OSC for robotic arm manipulators", usage="ros2 run osc_ros2 interface [-c CONFIG.TOML]")
|
||||
parser.add_argument("-c", "--config", type=argparse.FileType("r"), help="A robot TOML config, if the config is provided then the interface will not show the iteractive input")
|
||||
args = parser.parse_args()
|
||||
rclpy.init()
|
||||
while True:
|
||||
use_urdf = input("Do you have a URDF file you want to use? (y/n): ").strip().lower()
|
||||
if use_urdf == 'y':
|
||||
while True:
|
||||
robot_urdf = input("Enter the absolute path to the URDF file: ")
|
||||
if os.path.isfile(robot_urdf):
|
||||
if not robot_urdf.endswith('.urdf'):
|
||||
print("The file is not a URDF file. Please enter a valid URDF file.")
|
||||
continue
|
||||
break
|
||||
else:
|
||||
print("Invalid path. Please enter a valid path to the URDF file.")
|
||||
tree = ET.parse(robot_urdf)
|
||||
root = tree.getroot()
|
||||
robot = rtb.ERobot.URDF(robot_urdf)
|
||||
joint_names = [joint.get('name') for joint in root.findall('joint') if joint.get('type') == 'revolute' or joint.get('type') == 'continuous' or joint.get('type') == 'prismatic']
|
||||
print(robot)
|
||||
joint_velocity_limits = {}
|
||||
|
||||
# Iterate over all joints in the URDF
|
||||
for joint in root.findall('.//joint'):
|
||||
joint_name = joint.get('name') # Get the name of the joint
|
||||
|
||||
# Look for the <limit> tag under each joint
|
||||
limit = joint.find('limit')
|
||||
|
||||
if limit is not None:
|
||||
# Extract the velocity limit (if it exists)
|
||||
velocity_limit = limit.get('velocity')
|
||||
|
||||
if velocity_limit is not None:
|
||||
joint_velocity_limits[joint_name] = float(velocity_limit)
|
||||
|
||||
while True:
|
||||
try:
|
||||
print('-+'*50)
|
||||
print("The cost mask determines which coordinates are used for the IK.\nEach element of the cost mask corresponds to a Cartesian coordinate [x, y, z, roll, pitch, yaw].")
|
||||
print("The cost mask 111000 means that the IK will only consider translation and no rotaion.")
|
||||
cost_mask = [int(i) for i in input(f"Enter the cost mask (6 integers (1 or 0), of which <= {robot.n} are 1): ")]
|
||||
if sum(cost_mask) <= robot.n and len(cost_mask) == 6:
|
||||
break
|
||||
else:
|
||||
print(f"Invalid input. Expected 6 integers of which {robot.n if robot.n < 6 else 6} or less are 1.")
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter integers only.")
|
||||
print(f"The following coordinates will be used for the IK: {[j for i,j in enumerate(['x','y','z','roll','pitch','yaw']) if cost_mask[i]==1]}")
|
||||
break
|
||||
elif use_urdf == 'n':
|
||||
node = JointNameListener()
|
||||
print("Wainting 10 sec for JointState messages to extract joint names...")
|
||||
rclpy.spin_once(node)
|
||||
counter = 0
|
||||
while not(node.joint_names):
|
||||
if counter > 100:
|
||||
joint_names = None
|
||||
break
|
||||
counter+=1
|
||||
time.sleep(0.1)
|
||||
rclpy.spin_once(node)
|
||||
joint_names = node.joint_names
|
||||
node.destroy_node()
|
||||
if joint_names:
|
||||
correct_names = input("The following joint names were found:\n" + "\n".join(joint_names) + "\nAre these correct? (y/n): ").strip().lower()
|
||||
while True:
|
||||
if correct_names == 'y':
|
||||
break
|
||||
elif correct_names == 'n':
|
||||
joint_names = None
|
||||
break
|
||||
correct_names = input("Invalid input. Please enter 'y' or 'n'.")
|
||||
if not(joint_names):
|
||||
print("Please enter the joint names manually.")
|
||||
while True:
|
||||
joint_names = []
|
||||
print('-+'*50)
|
||||
print("Enter the joint names manually one by one. Type 'done' when you are finished:")
|
||||
print("Attention: The order of the joints is important. It should be the same in which you want to send the joint positions.")
|
||||
while True:
|
||||
print('-'*50)
|
||||
joint_name = input("Enter joint name (or 'done' to finish): ").strip()
|
||||
if joint_name.lower() == 'done':
|
||||
break
|
||||
if joint_name:
|
||||
joint_names.append(joint_name)
|
||||
print('-+'*50)
|
||||
correct_names = input(f"Are those the joint names as defined in your JointTrajectroy recieving Node?: {joint_names}. (y/n)?: ").strip()
|
||||
if correct_names.lower() == 'y':
|
||||
break
|
||||
else:
|
||||
print("Please re-enter the joint names.")
|
||||
while True:
|
||||
try:
|
||||
joint_velocity_limits = {}
|
||||
for name in joint_names:
|
||||
while True:
|
||||
try:
|
||||
print('--'*50)
|
||||
limit = input(f"Enter the velocity limit for joint '{name}': ").strip()
|
||||
if limit == "":
|
||||
continue
|
||||
else:
|
||||
joint_velocity_limits[name] = float(limit)
|
||||
break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter a numeric value or press Enter to skip.")
|
||||
break
|
||||
except ValueError:
|
||||
print("Invalid input. Please enter numeric values or leave blank to skip.")
|
||||
robot = None
|
||||
cost_mask = None
|
||||
break
|
||||
print("Invalid input. Please enter 'y' or 'n'.")
|
||||
|
||||
|
||||
node = OSC_ROS2_interface(joint_names, joint_velocity_limits, robot, cost_mask)
|
||||
if not args.config:
|
||||
config, robot = interactive_input()
|
||||
else:
|
||||
config = toml.load(args.config)
|
||||
robot = rtb.ERobot.URDF(config["robot"]["urdf"])
|
||||
node = OSC_ROS2_interface(robot, config)
|
||||
|
||||
# Run ROS 2 spin, and osc_process will be handled by the timer
|
||||
try:
|
||||
|
||||
27
workspace/ur10e.toml
Normal file
27
workspace/ur10e.toml
Normal file
@@ -0,0 +1,27 @@
|
||||
[robot]
|
||||
urdf = "/workspace/ur10e.urdf"
|
||||
joint_names = [ "shoulder_pan_joint", "shoulder_lift_joint", "elbow_joint", "wrist_1_joint", "wrist_2_joint", "wrist_3_joint",]
|
||||
cost_mask_string = "111111"
|
||||
x_limits = [ "None", "None",]
|
||||
y_limits = [ "None", "None",]
|
||||
z_limits = [ "None", "None",]
|
||||
x_limits_workspace = [ "None", "None",]
|
||||
y_limits_workspace = [ "None", "None",]
|
||||
z_limits_workspace = [ "None", "None",]
|
||||
trajectory_topic_name = "/scaled_joint_trajectory_controller/joint_trajectory"
|
||||
hz = 100
|
||||
|
||||
[network]
|
||||
log_ip = "127.0.0.1"
|
||||
log_port = 5005
|
||||
state_ip = "127.0.0.1"
|
||||
state_port = 7000
|
||||
commands_port = 8000
|
||||
|
||||
[robot.joint_velocity_limits]
|
||||
shoulder_pan_joint = 2.0943951023931953
|
||||
shoulder_lift_joint = 2.0943951023931953
|
||||
elbow_joint = 3.141592653589793
|
||||
wrist_1_joint = 3.141592653589793
|
||||
wrist_2_joint = 3.141592653589793
|
||||
wrist_3_joint = 3.141592653589793
|
||||
Reference in New Issue
Block a user