@@ -12,15 +12,13 @@ import (
1212
1313 "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
1414 "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
15+ "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
16+ "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
1517 "go.opentelemetry.io/collector/component"
1618 "go.opentelemetry.io/collector/receiver"
1719 "go.uber.org/zap"
1820)
1921
20- const (
21- batchCount = 100
22- )
23-
2422type eventHandler interface {
2523 run (ctx context.Context , host component.Host ) error
2624 close (ctx context.Context ) error
@@ -40,11 +38,7 @@ type consumerClientWrapperImpl struct {
4038}
4139
4240func newConsumerClientWrapperImplementation (cfg * Config ) (* consumerClientWrapperImpl , error ) {
43- splits := strings .Split (cfg .Connection , "/" )
44- eventhubName := splits [len (splits )- 1 ]
45- // if that didn't work it's ok as the SDK will try to parse it to create the client
46-
47- consumerClient , err := azeventhubs .NewConsumerClientFromConnectionString (cfg .Connection , eventhubName , cfg .ConsumerGroup , nil )
41+ consumerClient , err := azeventhubs .NewConsumerClientFromConnectionString (cfg .Connection , cfg .EventHubName , cfg .ConsumerGroup , nil )
4842 if err != nil {
4943 return nil , err
5044 }
@@ -61,7 +55,7 @@ func (c *consumerClientWrapperImpl) GetPartitionProperties(ctx context.Context,
6155 return c .consumerClient .GetPartitionProperties (ctx , partitionID , options )
6256}
6357
64- func (c * consumerClientWrapperImpl ) NewConsumer (_ context.Context , _ * azeventhubs.ConsumerClientOptions ) (* azeventhubs.ConsumerClient , error ) {
58+ func (c * consumerClientWrapperImpl ) NewConsumer (ctx context.Context , options * azeventhubs.ConsumerClientOptions ) (* azeventhubs.ConsumerClient , error ) {
6559 return c .consumerClient , nil
6660}
6761
@@ -74,6 +68,7 @@ func (c *consumerClientWrapperImpl) Close(ctx context.Context) error {
7468}
7569
7670type eventhubHandler struct {
71+ processor * azeventhubs.Processor
7772 consumerClient consumerClientWrapper
7873 dataConsumer dataConsumer
7974 config * Config
@@ -94,7 +89,7 @@ func newEventhubHandler(config *Config, settings receiver.CreateSettings) *event
9489 return & eventhubHandler {
9590 config : config ,
9691 settings : settings ,
97- useProcessor : true ,
92+ useProcessor : false ,
9893 }
9994}
10095
@@ -109,33 +104,25 @@ func (h *eventhubHandler) init(ctx context.Context) error {
109104}
110105
111106func (h * eventhubHandler ) run (ctx context.Context , host component.Host ) error {
112- ctx , h .cancel = context .WithCancel (ctx )
113107 if h .useProcessor {
114- return h .runWithProcessor (ctx , host )
108+ return h .runWithProcessor (ctx )
115109 }
116110 return h .runWithConsumerClient (ctx , host )
117111}
118- func (h * eventhubHandler ) runWithProcessor (ctx context.Context , host component.Host ) error {
119- checkpointStore , err := createCheckpointStore (ctx , host , h .config , h .settings )
112+
113+ func (h * eventhubHandler ) runWithProcessor (ctx context.Context ) error {
114+ checkpointStore , err := createCheckpointStore (h .config .StorageConnection , h .config .StorageContainer )
120115 if err != nil {
121- h .settings .Logger .Debug ("Error creating CheckpointStore" , zap .Error (err ))
122116 return err
123117 }
124118
125- consumerClientImpl , ok := h .consumerClient .(* consumerClientWrapperImpl )
126- if ! ok {
127- // we're in a testing environment
128- return nil
129- }
130-
131- processor , err := azeventhubs .NewProcessor (consumerClientImpl .consumerClient , checkpointStore , nil )
119+ processor , err := azeventhubs .NewProcessor (h .consumerClient .(* consumerClientWrapperImpl ).consumerClient , checkpointStore , nil )
132120 if err != nil {
133- h .settings .Logger .Debug ("Error creating Processor" , zap .Error (err ))
134121 return err
135122 }
136123
137- processorCtx , processorCancel := context .WithCancel (ctx )
138124 go h .dispatchPartitionClients (processor )
125+ processorCtx , processorCancel := context .WithCancel (ctx )
139126 defer processorCancel ()
140127
141128 return processor .Run (processorCtx )
@@ -166,7 +153,7 @@ func (h *eventhubHandler) processEventsForPartition(partitionClient *azeventhubs
166153
167154 for {
168155 receiveCtx , cancelReceive := context .WithTimeout (context .TODO (), time .Minute )
169- events , err := partitionClient .ReceiveEvents (receiveCtx , 100 , nil )
156+ events , err := partitionClient .ReceiveEvents (receiveCtx , h . config . BatchCount , nil )
170157 cancelReceive ()
171158
172159 if err != nil && ! errors .Is (err , context .DeadlineExceeded ) {
@@ -193,7 +180,7 @@ func (h *eventhubHandler) processEventsForPartition(partitionClient *azeventhubs
193180 }
194181}
195182
196- func (h * eventhubHandler ) runWithConsumerClient (ctx context.Context , _ component.Host ) error {
183+ func (h * eventhubHandler ) runWithConsumerClient (ctx context.Context , host component.Host ) error {
197184 if h .consumerClient == nil {
198185 if err := h .init (ctx ); err != nil {
199186 return err
@@ -231,7 +218,11 @@ func (h *eventhubHandler) setupPartition(ctx context.Context, partitionID string
231218 if cc == nil {
232219 return errors .New ("failed to initialize consumer client" )
233220 }
234- defer cc .Close (ctx )
221+ defer func () {
222+ if cc != nil {
223+ cc .Close (ctx )
224+ }
225+ }()
235226
236227 pcOpts := & azeventhubs.PartitionClientOptions {
237228 StartPosition : azeventhubs.StartPosition {
@@ -258,12 +249,15 @@ func (h *eventhubHandler) setupPartition(ctx context.Context, partitionID string
258249}
259250
260251func (h * eventhubHandler ) receivePartitionEvents (ctx context.Context , pc * azeventhubs.PartitionClient ) {
252+ var wait = 1
261253 for {
262- rcvCtx , rcvCtxCancel := context .WithTimeout (context .TODO (), time .Second * 10 )
263- events , err := pc .ReceiveEvents (rcvCtx , batchCount , nil )
264- rcvCtxCancel ()
265- if err != nil && ! errors .Is (err , context .DeadlineExceeded ) {
266- h .settings .Logger .Error ("Error receiving events" , zap .Error (err ))
254+ rcvCtx , _ := context .WithTimeout (context .TODO (), time .Second * 10 )
255+ events , err := pc .ReceiveEvents (rcvCtx , h .config .BatchCount , nil )
256+ if err != nil {
257+ h .settings .Logger .Error ("Error receiving event" , zap .Error (err ))
258+ time .Sleep (time .Duration (wait ) * time .Second )
259+ wait *= 2
260+ continue
267261 }
268262
269263 for _ , event := range events {
@@ -297,3 +291,11 @@ func (h *eventhubHandler) close(ctx context.Context) error {
297291func (h * eventhubHandler ) setDataConsumer (dataConsumer dataConsumer ) {
298292 h .dataConsumer = dataConsumer
299293}
294+
295+ func createCheckpointStore (storageConnectionString , containerName string ) (azeventhubs.CheckpointStore , error ) {
296+ azBlobContainerClient , err := container .NewClientFromConnectionString (storageConnectionString , containerName , nil )
297+ if err != nil {
298+ return nil , err
299+ }
300+ return checkpoints .NewBlobStore (azBlobContainerClient , nil )
301+ }
0 commit comments