diff --git a/uwb_localization/nodes/localization_listener_node.py b/uwb_localization/nodes/localization_listener_node.py index 19507b61..5744e4b3 100755 --- a/uwb_localization/nodes/localization_listener_node.py +++ b/uwb_localization/nodes/localization_listener_node.py @@ -1,64 +1,64 @@ #!/usr/bin/env python3 -import os -import sys -import glob + import rospy -import rospkg -import math -import time -import matplotlib -matplotlib.use('Agg') # necessary when plotting without $DISPLAY -from itertools import combinations + import numpy as np import pandas as pd -import matplotlib.pyplot as plt from scipy.spatial.transform import Rotation as R + from hwctrl.msg import UwbData -from uwb_localization.triangulation import UltraWideBandNode from std_msgs.msg import Header from geometry_msgs.msg import Quaternion, PoseWithCovarianceStamped, PoseWithCovariance, Pose, Point +from localization.geoInterface import Target, Anchor +import localization.geoProject as Project + +from uwb_localization.triangulation import UltraWideBandNode + +class UwbLocalizationNode: + + node_topic = "localization_data" + viz_dir = "visualizations/" + -class LocalizationNode: def __init__(self, visualize=False): - self.topic = 'localization_data' # topic where UWB distances are published - self.viz_dir = 'visualizations/' # directory to store node visualizations - self.visualize = visualize - self.viz_step = 100 - print('Booting up node...') - rospy.init_node('localization_listener', anonymous=True) - self.nodes = [] # list of UltraWideBandNode instances, one for each node and one for each anchor - self.robot_x = [] # list of past and current x positions - self.robot_y = [] # list of past and current y positions - self.robot_theta = [] # list of past and current yaw measurements - self.sensors = None # DataFrame of sensors, types, and relative positions - - # setup visualization directory and remove all past visualizations - os.makedirs(self.viz_dir, exist_ok=True) - try: - files = glob.glob('%s/*' % self.viz_dir) - for f in files: - os.remove(f) - except Exception as e: - print(e) - - def init_nodes(self): - rp = rospkg.RosPack() - # find config file in canbus node directory - script_path = os.path.join(rp.get_path("canbus"), "include", "node_config.csv") - sensors = pd.read_csv(script_path) - self.sensors = sensors - print(sensors) - return sensors - - def get_robot_orientation(self): - non_anchors = [x for x in self.nodes if x.type == 'node'] # get all nodes on the robot - node_pairs = combinations(non_anchors, 2) # get all pairwise combinations of nodes - thetas = [] # list to store bearing angles measured between all pairs + rospy.init_node("localization_listener", anonymous=False) + + self.robot_x = [] + self.robot_y = [] + self.robot_theta = [] + + self.nodes = {} + self.anchors = [] + + self.pub = rospy.Publisher("uwb_nodes", PoseWithCovarianceStamped, queue_size=12) + self.sub = rospy.Subscriber("localization_data", UwbData, self.position_callback, None, 12) + + # get anchors + for i in range(3): + base_path = f"/hardware/anchors/anchor_{i}/" + id = int(rospy.get_param(base_path + "id")) + x = float(rospy.get_param(base_path + "x")) + y = float(rospy.get_param(base_path + "y")) + self.anchors.append((id, x , y)) + + # get uwbs + for i in range(4): + base_path = f"/hardware/sensors/uwb_{i}/" + id = int(rospy.get_param(base_path + "id")) + rel_x = int(rospy.get_param(base_path + "relative_x")) + rel_y = int(rospy.get_param(base_path + "relative_x")) + + uwb = UltraWideBandNode(id, rel_x, rel_y, anchors) + nodes[id] = uwb + + + def get_robot_rotation(self): + thetas = [] + node_pairs = combinations(self.nodes.values(), 2) for (start_node, end_node) in node_pairs: if start_node.is_valid() and end_node.is_valid(): - # get vector with tail at start node and head at end end robot_edge = -np.array([start_node.relative_x, start_node.relative_y]) + np.array([end_node.relative_x, end_node.relative_y]) # each edge vector is at a different angle relative to the robot coordinate system @@ -67,25 +67,25 @@ def get_robot_orientation(self): dX = end_node.x - start_node.x theta = np.arctan2(dY, dX) - theta_offset thetas.append([np.cos(theta), np.sin(theta)]) + if len(thetas) > 0: theta = np.mean(thetas, axis=0) self.robot_theta.append(np.arctan2(theta[1], theta[0])) return 0 if len(self.robot_theta) == 0 else self.robot_theta[-1] - def position_callback(self, msg): - for node in self.nodes: - if node.id == msg.node_id: - node.add_measurement(msg.anchor_id, msg.distance, 0) - node.get_position() + + def data_callback(self, msg): + self.nodes[msg.node_id].add_measurement(msg.anchor_id, msg.distance, 0) + self.nodes[msg.node_id].get_position() + avg_x = 0 avg_y = 0 - total = 0 - theta = self.get_robot_orientation() - non_anchors = [x for x in self.nodes if x.type == 'node'] # get all nodes on the robot - node_pairs = combinations(non_anchors, 2) # get all pairwise combinations of nodes + theta = self.get_robot_rotation() + + node_pairs = combinations(self.nodes.values(), 2) for (start_node, end_node) in node_pairs: if start_node.is_valid() and end_node.is_valid(): - #print(start_node.id, end_node.id, start_node.relative_x, end_node.relative_x, start_node.relative_y, end_node.relative_y) + # no idea what this line means? opposite corners? if start_node.relative_x == -end_node.relative_x and start_node.relative_y == -end_node.relative_y: start_node.robot_x = (start_node.x + end_node.x) * 0.5 end_node.robot_x = start_node.robot_x @@ -95,16 +95,6 @@ def position_callback(self, msg): avg_y += start_node.robot_y total += 1 - #for node in self.nodes: - # if node.is_valid(): - # # offset node measurements by their relative positions to the center of the robot - # # must correct for current orientation of robot as well - # phi = math.atan2(node.relative_y, node.relative_x) - # z = math.sqrt(node.relative_x ** 2 + node.relative_y ** 2) - # avg_x += node.x + z * math.cos(theta - phi) - # avg_y += node.y - z * math.sin(theta - phi) - # total += 1 - print('Total:', total) if total > 0: self.robot_x.append(avg_x / total) @@ -112,36 +102,6 @@ def position_callback(self, msg): print('X: %.3f, Y: %.3f, theta: %.3f' % (self.robot_x[-1], self.robot_y[-1], theta)) self.compose_msg() - if self.visualize and len(self.robot_x) % self.viz_step == 0: - fig = plt.figure(figsize=(6 * 4, 9)) - - ax = plt.subplot(141) - ax.set_title('Position') - for node in self.nodes: - node.plot_position(theta, ax=ax) - - ax.legend(loc='best') - ax.set_xlim(0, 5.2) - ax.set_ylim(0, 6.05) - - ax = plt.subplot(142) - ax.set_title('Node Distances') - for node in self.nodes: - ax.plot(node.distance_plot, label=node.id) - ax.set_ylim(0, 6.05) - ax.legend(loc='best') - - ax = plt.subplot(143) - ax.scatter(self.robot_x, self.robot_y, label='robot') - ax.arrow(self.robot_x[-1], self.robot_y[-1], .3 * math.cos(theta), .3 * math.sin(theta), head_width=0.1) - ax.legend(loc='best') - ax.set_xlim(0, 5.2) - ax.set_ylim(0, 6.05) - - plt.tight_layout() - fig.savefig(self.viz_dir + 'node_1_%d.png' % (len(os.listdir(self.viz_dir)))) - plt.close() - def compose_msg(self): header = Header() header.stamp = rospy.Time.now() @@ -157,20 +117,13 @@ def compose_msg(self): stamped_msg.header = header stamped_msg.pose = pose_with_cov try: - pub = rospy.Publisher('uwb_nodes', PoseWithCovarianceStamped, queue_size=1) - pub.publish(stamped_msg) + self.pub.publish(stamped_msg) except rospy.ROSInterruptException as e: print(e.getMessage()) pass -if __name__ == '__main__': - localization_node = LocalizationNode() - sensors = localization_node.init_nodes() - for i, sensor in sensors.iterrows(): - if sensor['type'] == 'node': - uwb_node = UltraWideBandNode(sensor['id'], sensor['x'], sensor['y'], sensor['type'], sensors) - localization_node.nodes.append(uwb_node) - - sub = rospy.Subscriber(localization_node.topic, UwbData, localization_node.position_callback, queue_size=12) +if __name__ == "__main__": + node = UwbLocalizationNode() +# uwb_sub = rospy.Subscriber(node.node_topic, UwbData, node.data_callback, None, 12) rospy.spin() diff --git a/uwb_localization/src/uwb_localization/triangulation.py b/uwb_localization/src/uwb_localization/triangulation.py index fe033f0d..74c3de51 100644 --- a/uwb_localization/src/uwb_localization/triangulation.py +++ b/uwb_localization/src/uwb_localization/triangulation.py @@ -1,84 +1,57 @@ #!/usr/bin/env python3 -import time -import math + +import localization.geoProject as Project +import numpy as np import pandas as pd + import matplotlib.pyplot as plt -import numpy as np -from localization.geoProject import Project class UltraWideBandNode: - def __init__(self, id, relative_x, relative_y, type, sensors): - self.id = id - self.x = None - self.y = None - self.robot_x = None - self.robot_y = None + + def __init__(self, id, relative_x, relative_y, anchors): + # list of anchors, contains the id number, x, and y + self.anchors = anchors + # map of measurements from each anchor + self.measurements = {} + self.relative_x = relative_x self.relative_y = relative_y - self.type = type - self.confidence = None - self.sensors = sensors - self.measurements = {} - self.x_plot = [] - self.y_plot = [] - self.distance_plot = [] - def is_valid(self): - return self.x is not None and self.y is not None + self.x = None + self.y = None + self.condidence = None - def get_robot_position(self): - sensor = self.sensors[self.sensors['id'] == self.id] - return float(sensor['x']), float(sensor['y']) + self.id = id + + def is_valid(self) -> bool: + return self.x is not None and self.y is not None - def add_measurement(self, anchor_id, distance, confidence): - if distance >= 0: - self.measurements[anchor_id] = distance - if int(anchor_id) == 51: - self.distance_plot.append(distance) - self.confidence = confidence - else: - self.measurements[anchor_id] = np.nan # set to invalid value to be ignored - - def plot_position(self, theta, ax, moving_average=False): - if moving_average: - if len(self.x_plot) >= 3 and len(self.y_plot) >= 3: - avg_x = self.moving_average(self.x_plot) - avg_y = self.moving_average(self.y_plot) - ax.scatter(avg_x, avg_y, label=str(self.id)) - else: - ax.scatter(self.x_plot, self.y_plot, label=str(self.id)) - if self.robot_x is not None and self.robot_y is not None: - ax.plot([self.x_plot[-1], self.robot_x], [self.y_plot[-1], self.robot_y], marker='o') def get_position(self): - if len(list(self.measurements.keys())) >= 3: - P = Project(mode='2D', solver='LSE_GC') + if len(self.measurements.keys()) < 3: + pass + else: + P = Project(mode="2D", solver="LSE") + for id, x, y in self.anchors: + P.add_anchor(id, (x, y)) - for i, sensor in self.sensors.iterrows(): - if sensor['type'] == 'anchor': - P.add_anchor(sensor['id'], (sensor['x'], sensor['y'])) + t, lbl = P.add_target() - t,label=P.add_target() + for anchor_id in self.measurements.keys(): + t.add_measurement(anchor_id, self.measurements[anchor_id]) - for sensor in self.measurements.keys(): - t.add_measure(sensor, self.measurements[sensor]) P.solve() - # Then the target location is: - position = t.loc - if position is not None: - #if position.x > 0 and position.y > 0: - self.x = position.x - self.y = position.y - self.x_plot.append(position.x) - self.y_plot.append(position.y) - #print('Triangulation took %.4f seconds' % (time.time() - start_time)) - else: - pass - #print('Not enough points to triangulate node...') - @staticmethod - def moving_average(a, n=3): - ret = np.cumsum(a, dtype=float) - ret[n:] = ret[n:] - ret[:-n] - return ret[n - 1:] / n + self.x = t.loc[0] + self.y = t.loc[1] + + return t.loc + + def add_measurement(self, anchor_id, distance, condidence): + if distance > 0: + self.measurements[anchor_id] = distance + self.confidence = condidence + else: + self.measurements[anchor_id] = np.nan + self.confidence = np.nan