Skip to content

Commit 2fb253d

Browse files
committed
feat(sda-validator-orchestrator): update broker.ConnectionWatcher -> broker.Monitor and also listen to channel close events
1 parent 0ca56ec commit 2fb253d

File tree

7 files changed

+24
-13
lines changed

7 files changed

+24
-13
lines changed

sda-validator/orchestrator/api/api_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,9 +192,9 @@ func (m *mockBroker) Close() error {
192192
panic("broker.close call not expected in unit tests")
193193
}
194194

195-
func (m *mockBroker) ConnectionWatcher() chan *amqp.Error {
195+
func (m *mockBroker) Monitor() chan *amqp.Error {
196196
// Function not needed for unit test, but to implement interface
197-
panic("broker.ConnectionWatcher call not expected in unit tests")
197+
panic("broker.Monitor call not expected in unit tests")
198198
}
199199

200200
func mockAuthenticator(c *gin.Context) {

sda-validator/orchestrator/internal/broker/broker.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,18 @@ func (broker *amqpBroker) Close() error {
207207
return broker.connection.Close()
208208
}
209209

210-
// ConnectionWatcher listens to events from the server
211-
func (broker *amqpBroker) ConnectionWatcher() chan *amqp.Error {
212-
return broker.connection.NotifyClose(make(chan *amqp.Error))
210+
// Monitor listens to close events on the broker connection and channel
211+
func (broker *amqpBroker) Monitor() chan *amqp.Error {
212+
errChan := make(chan *amqp.Error)
213+
214+
go func() {
215+
select {
216+
case err := <-broker.connection.NotifyClose(make(chan *amqp.Error, 1)):
217+
errChan <- err
218+
case err := <-broker.channel.NotifyClose(make(chan *amqp.Error, 1)):
219+
errChan <- err
220+
}
221+
}()
222+
223+
return errChan
213224
}

sda-validator/orchestrator/internal/broker/interface_broker.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,6 @@ type AMQPBrokerI interface {
1313
Subscribe(ctx context.Context, queue, consumerID string, handleFunc func(context.Context, amqp.Delivery) error) error
1414
// Close the broker connection
1515
Close() error
16-
// ConnectionWatcher returns a chan watching the broker connection events
17-
ConnectionWatcher() chan *amqp.Error
16+
// Monitor returns a chan watching the broker connection and channel close events
17+
Monitor() chan *amqp.Error
1818
}

sda-validator/orchestrator/jobpreparationworker/worker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func (w *worker) close() {
118118
close(w.stopCh)
119119
}
120120

121-
// ShutdownWorkers shutdowns and waits for all workers to have closed
121+
// Shutdown shutdowns and waits for all workers to have closed
122122
func (w *Workers) Shutdown() {
123123
wg := sync.WaitGroup{}
124124
for _, w := range w.workers {

sda-validator/orchestrator/jobpreparationworker/worker_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,9 +206,9 @@ func (m *mockBroker) Close() error {
206206
panic("broker.close call not expected in unit tests")
207207
}
208208

209-
func (m *mockBroker) ConnectionWatcher() chan *amqp.Error {
209+
func (m *mockBroker) Monitor() chan *amqp.Error {
210210
// Function not needed for unit test, but to implement interface
211-
panic("broker.ConnectionWatcher call not expected in unit tests")
211+
panic("broker.Monitor call not expected in unit tests")
212212
}
213213

214214
func (ts *JobPreparationWorkerTestSuite) TestInitWorkers() {

sda-validator/orchestrator/jobworker/worker_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,9 +167,9 @@ func (m *mockBroker) Close() error {
167167
panic("broker.close call not expected in unit tests")
168168
}
169169

170-
func (m *mockBroker) ConnectionWatcher() chan *amqp.Error {
170+
func (m *mockBroker) Monitor() chan *amqp.Error {
171171
// Function not needed for unit test, but to implement interface
172-
panic("broker.ConnectionWatcher call not expected in unit tests")
172+
panic("broker.Monitor call not expected in unit tests")
173173
}
174174

175175
func (ts *JobWorkerTestSuite) TestInitWorkers() {

sda-validator/orchestrator/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ func main() {
162162
}()
163163

164164
go func() {
165-
if err := <-amqpBroker.ConnectionWatcher(); err != nil {
165+
if err := <-amqpBroker.Monitor(); err != nil {
166166
log.Errorf("broker connection error: %v", err)
167167
sigc <- syscall.SIGTERM
168168
}

0 commit comments

Comments
 (0)