Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 61 additions & 108 deletions uwb_localization/nodes/localization_listener_node.py
Original file line number Diff line number Diff line change
@@ -1,64 +1,64 @@
#!/usr/bin/env python3
import os
import sys
import glob

import rospy
import rospkg
import math
import time
import matplotlib
matplotlib.use('Agg') # necessary when plotting without $DISPLAY
from itertools import combinations

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.spatial.transform import Rotation as R

from hwctrl.msg import UwbData
from uwb_localization.triangulation import UltraWideBandNode

from std_msgs.msg import Header
from geometry_msgs.msg import Quaternion, PoseWithCovarianceStamped, PoseWithCovariance, Pose, Point

from localization.geoInterface import Target, Anchor
import localization.geoProject as Project

from uwb_localization.triangulation import UltraWideBandNode

class UwbLocalizationNode:

node_topic = "localization_data"
viz_dir = "visualizations/"


class LocalizationNode:
def __init__(self, visualize=False):
self.topic = 'localization_data' # topic where UWB distances are published
self.viz_dir = 'visualizations/' # directory to store node visualizations
self.visualize = visualize
self.viz_step = 100
print('Booting up node...')
rospy.init_node('localization_listener', anonymous=True)
self.nodes = [] # list of UltraWideBandNode instances, one for each node and one for each anchor
self.robot_x = [] # list of past and current x positions
self.robot_y = [] # list of past and current y positions
self.robot_theta = [] # list of past and current yaw measurements
self.sensors = None # DataFrame of sensors, types, and relative positions

# setup visualization directory and remove all past visualizations
os.makedirs(self.viz_dir, exist_ok=True)
try:
files = glob.glob('%s/*' % self.viz_dir)
for f in files:
os.remove(f)
except Exception as e:
print(e)

def init_nodes(self):
rp = rospkg.RosPack()
# find config file in canbus node directory
script_path = os.path.join(rp.get_path("canbus"), "include", "node_config.csv")
sensors = pd.read_csv(script_path)
self.sensors = sensors
print(sensors)
return sensors

def get_robot_orientation(self):
non_anchors = [x for x in self.nodes if x.type == 'node'] # get all nodes on the robot
node_pairs = combinations(non_anchors, 2) # get all pairwise combinations of nodes
thetas = [] # list to store bearing angles measured between all pairs
rospy.init_node("localization_listener", anonymous=False)

self.robot_x = []
self.robot_y = []
self.robot_theta = []

self.nodes = {}
self.anchors = []

self.pub = rospy.Publisher("uwb_nodes", PoseWithCovarianceStamped, queue_size=12)
self.sub = rospy.Subscriber("localization_data", UwbData, self.position_callback, None, 12)
# get anchors
for i in range(3):
base_path = f"/hardware/anchors/anchor_{i}/"
id = int(rospy.get_param(base_path + "id"))
x = float(rospy.get_param(base_path + "x"))
y = float(rospy.get_param(base_path + "y"))
self.anchors.append((id, x , y))

# get uwbs
for i in range(4):
base_path = f"/hardware/sensors/uwb_{i}/"
id = int(rospy.get_param(base_path + "id"))
rel_x = int(rospy.get_param(base_path + "relative_x"))
rel_y = int(rospy.get_param(base_path + "relative_x"))

uwb = UltraWideBandNode(id, rel_x, rel_y, anchors)
nodes[id] = uwb


def get_robot_rotation(self):
thetas = []
node_pairs = combinations(self.nodes.values(), 2)
for (start_node, end_node) in node_pairs:
if start_node.is_valid() and end_node.is_valid():
# get vector with tail at start node and head at end end
robot_edge = -np.array([start_node.relative_x, start_node.relative_y]) + np.array([end_node.relative_x, end_node.relative_y])

# each edge vector is at a different angle relative to the robot coordinate system
Expand All @@ -67,25 +67,25 @@ def get_robot_orientation(self):
dX = end_node.x - start_node.x
theta = np.arctan2(dY, dX) - theta_offset
thetas.append([np.cos(theta), np.sin(theta)])

if len(thetas) > 0:
theta = np.mean(thetas, axis=0)
self.robot_theta.append(np.arctan2(theta[1], theta[0]))
return 0 if len(self.robot_theta) == 0 else self.robot_theta[-1]

def position_callback(self, msg):
for node in self.nodes:
if node.id == msg.node_id:
node.add_measurement(msg.anchor_id, msg.distance, 0)
node.get_position()

def data_callback(self, msg):
self.nodes[msg.node_id].add_measurement(msg.anchor_id, msg.distance, 0)
self.nodes[msg.node_id].get_position()

avg_x = 0
avg_y = 0
total = 0
theta = self.get_robot_orientation()
non_anchors = [x for x in self.nodes if x.type == 'node'] # get all nodes on the robot
node_pairs = combinations(non_anchors, 2) # get all pairwise combinations of nodes
theta = self.get_robot_rotation()

node_pairs = combinations(self.nodes.values(), 2)
for (start_node, end_node) in node_pairs:
if start_node.is_valid() and end_node.is_valid():
#print(start_node.id, end_node.id, start_node.relative_x, end_node.relative_x, start_node.relative_y, end_node.relative_y)
# no idea what this line means? opposite corners?
if start_node.relative_x == -end_node.relative_x and start_node.relative_y == -end_node.relative_y:
start_node.robot_x = (start_node.x + end_node.x) * 0.5
end_node.robot_x = start_node.robot_x
Expand All @@ -95,53 +95,13 @@ def position_callback(self, msg):
avg_y += start_node.robot_y
total += 1

#for node in self.nodes:
# if node.is_valid():
# # offset node measurements by their relative positions to the center of the robot
# # must correct for current orientation of robot as well
# phi = math.atan2(node.relative_y, node.relative_x)
# z = math.sqrt(node.relative_x ** 2 + node.relative_y ** 2)
# avg_x += node.x + z * math.cos(theta - phi)
# avg_y += node.y - z * math.sin(theta - phi)
# total += 1

print('Total:', total)
if total > 0:
self.robot_x.append(avg_x / total)
self.robot_y.append(avg_y / total)
print('X: %.3f, Y: %.3f, theta: %.3f' % (self.robot_x[-1], self.robot_y[-1], theta))
self.compose_msg()

if self.visualize and len(self.robot_x) % self.viz_step == 0:
fig = plt.figure(figsize=(6 * 4, 9))

ax = plt.subplot(141)
ax.set_title('Position')
for node in self.nodes:
node.plot_position(theta, ax=ax)

ax.legend(loc='best')
ax.set_xlim(0, 5.2)
ax.set_ylim(0, 6.05)

ax = plt.subplot(142)
ax.set_title('Node Distances')
for node in self.nodes:
ax.plot(node.distance_plot, label=node.id)
ax.set_ylim(0, 6.05)
ax.legend(loc='best')

ax = plt.subplot(143)
ax.scatter(self.robot_x, self.robot_y, label='robot')
ax.arrow(self.robot_x[-1], self.robot_y[-1], .3 * math.cos(theta), .3 * math.sin(theta), head_width=0.1)
ax.legend(loc='best')
ax.set_xlim(0, 5.2)
ax.set_ylim(0, 6.05)

plt.tight_layout()
fig.savefig(self.viz_dir + 'node_1_%d.png' % (len(os.listdir(self.viz_dir))))
plt.close()

def compose_msg(self):
header = Header()
header.stamp = rospy.Time.now()
Expand All @@ -157,20 +117,13 @@ def compose_msg(self):
stamped_msg.header = header
stamped_msg.pose = pose_with_cov
try:
pub = rospy.Publisher('uwb_nodes', PoseWithCovarianceStamped, queue_size=1)
pub.publish(stamped_msg)
self.pub.publish(stamped_msg)
except rospy.ROSInterruptException as e:
print(e.getMessage())
pass


if __name__ == '__main__':
localization_node = LocalizationNode()
sensors = localization_node.init_nodes()
for i, sensor in sensors.iterrows():
if sensor['type'] == 'node':
uwb_node = UltraWideBandNode(sensor['id'], sensor['x'], sensor['y'], sensor['type'], sensors)
localization_node.nodes.append(uwb_node)

sub = rospy.Subscriber(localization_node.topic, UwbData, localization_node.position_callback, queue_size=12)
if __name__ == "__main__":
node = UwbLocalizationNode()
# uwb_sub = rospy.Subscriber(node.node_topic, UwbData, node.data_callback, None, 12)
rospy.spin()
105 changes: 39 additions & 66 deletions uwb_localization/src/uwb_localization/triangulation.py
Original file line number Diff line number Diff line change
@@ -1,84 +1,57 @@
#!/usr/bin/env python3
import time
import math

import localization.geoProject as Project
import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import numpy as np
from localization.geoProject import Project


class UltraWideBandNode:
def __init__(self, id, relative_x, relative_y, type, sensors):
self.id = id
self.x = None
self.y = None
self.robot_x = None
self.robot_y = None

def __init__(self, id, relative_x, relative_y, anchors):
# list of anchors, contains the id number, x, and y
self.anchors = anchors
# map of measurements from each anchor
self.measurements = {}

self.relative_x = relative_x
self.relative_y = relative_y
self.type = type
self.confidence = None
self.sensors = sensors
self.measurements = {}
self.x_plot = []
self.y_plot = []
self.distance_plot = []

def is_valid(self):
return self.x is not None and self.y is not None
self.x = None
self.y = None
self.condidence = None

def get_robot_position(self):
sensor = self.sensors[self.sensors['id'] == self.id]
return float(sensor['x']), float(sensor['y'])
self.id = id

def is_valid(self) -> bool:
return self.x is not None and self.y is not None

def add_measurement(self, anchor_id, distance, confidence):
if distance >= 0:
self.measurements[anchor_id] = distance
if int(anchor_id) == 51:
self.distance_plot.append(distance)
self.confidence = confidence
else:
self.measurements[anchor_id] = np.nan # set to invalid value to be ignored

def plot_position(self, theta, ax, moving_average=False):
if moving_average:
if len(self.x_plot) >= 3 and len(self.y_plot) >= 3:
avg_x = self.moving_average(self.x_plot)
avg_y = self.moving_average(self.y_plot)
ax.scatter(avg_x, avg_y, label=str(self.id))
else:
ax.scatter(self.x_plot, self.y_plot, label=str(self.id))
if self.robot_x is not None and self.robot_y is not None:
ax.plot([self.x_plot[-1], self.robot_x], [self.y_plot[-1], self.robot_y], marker='o')

def get_position(self):
if len(list(self.measurements.keys())) >= 3:
P = Project(mode='2D', solver='LSE_GC')
if len(self.measurements.keys()) < 3:
pass
else:
P = Project(mode="2D", solver="LSE")
for id, x, y in self.anchors:
P.add_anchor(id, (x, y))

for i, sensor in self.sensors.iterrows():
if sensor['type'] == 'anchor':
P.add_anchor(sensor['id'], (sensor['x'], sensor['y']))
t, lbl = P.add_target()

t,label=P.add_target()
for anchor_id in self.measurements.keys():
t.add_measurement(anchor_id, self.measurements[anchor_id])

for sensor in self.measurements.keys():
t.add_measure(sensor, self.measurements[sensor])
P.solve()
# Then the target location is:
position = t.loc
if position is not None:
#if position.x > 0 and position.y > 0:
self.x = position.x
self.y = position.y
self.x_plot.append(position.x)
self.y_plot.append(position.y)
#print('Triangulation took %.4f seconds' % (time.time() - start_time))
else:
pass
#print('Not enough points to triangulate node...')

@staticmethod
def moving_average(a, n=3):
ret = np.cumsum(a, dtype=float)
ret[n:] = ret[n:] - ret[:-n]
return ret[n - 1:] / n
self.x = t.loc[0]
self.y = t.loc[1]

return t.loc

def add_measurement(self, anchor_id, distance, condidence):
if distance > 0:
self.measurements[anchor_id] = distance
self.confidence = condidence
else:
self.measurements[anchor_id] = np.nan
self.confidence = np.nan