1616import string
1717from threading import Event
1818from threading import Thread
19+ from collections import deque
1920
2021import rclpy
2122from rclpy .executors import SingleThreadedExecutor
@@ -50,9 +51,10 @@ def method_2():
5051 wait_for_topics.shutdown()
5152 """
5253
53- def __init__ (self , topic_tuples , timeout = 5.0 ):
54+ def __init__ (self , topic_tuples , timeout = 5.0 , max_number_of_messages = 100 ):
5455 self .topic_tuples = topic_tuples
5556 self .timeout = timeout
57+ self .max_number_of_messages = max_number_of_messages
5658 self .__ros_context = rclpy .Context ()
5759 rclpy .init (context = self .__ros_context )
5860 self .__ros_executor = SingleThreadedExecutor (context = self .__ros_context )
@@ -64,9 +66,13 @@ def __init__(self, topic_tuples, timeout=5.0):
6466 self .__ros_spin_thread .start ()
6567
6668 def _prepare_ros_node (self ):
67- node_name = '_test_node_' + \
68- '' .join (random .choices (string .ascii_uppercase + string .digits , k = 10 ))
69- self .__ros_node = _WaitForTopicsNode (name = node_name , node_context = self .__ros_context )
69+ node_name = '_test_node_' + '' .join (
70+ random .choices (string .ascii_uppercase + string .digits , k = 10 )
71+ )
72+ self .__ros_node = _WaitForTopicsNode (
73+ name = node_name , node_context = self .__ros_context ,
74+ max_number_of_messages = self .max_number_of_messages
75+ )
7076 self .__ros_executor .add_node (self .__ros_node )
7177
7278 def wait (self ):
@@ -108,9 +114,11 @@ def __exit__(self, exep_type, exep_value, trace):
108114class _WaitForTopicsNode (Node ):
109115 """Internal node used for subscribing to a set of topics."""
110116
111- def __init__ (self , name = 'test_node' , node_context = None ):
117+ def __init__ (self , name = 'test_node' , node_context = None ,
118+ max_number_of_messages = None ):
112119 super ().__init__ (node_name = name , context = node_context )
113120 self .msg_event_object = Event ()
121+ self .max_number_of_messages = max_number_of_messages
114122
115123 def start_subscribers (self , topic_tuples ):
116124 self .subscriber_list = []
@@ -128,14 +136,16 @@ def start_subscribers(self, topic_tuples):
128136 10
129137 )
130138 )
139+ # Initialize ring buffer of received messages
140+ self .received_messages [topic_name ] = deque (maxlen = self .max_number_of_messages )
131141
132142 def callback_template (self , topic_name ):
133143
134144 def topic_callback (data ):
135145 if topic_name not in self .received_topics :
136146 self .get_logger ().debug ('Message received for ' + topic_name )
137147 self .received_topics .add (topic_name )
138- self .received_messages . setdefault ( topic_name , []) .append (data )
148+ self .received_messages [ topic_name ] .append (data )
139149 if self .received_topics == self .expected_topics :
140150 self .msg_event_object .set ()
141151
0 commit comments