Commit b2a6fd7 (1 parent: bfb44c2)
azeventhubreceiver copy 2024-06-11

25 files changed: +2341 −0 lines
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
include ../../Makefile.Common
Lines changed: 125 additions & 0 deletions
@@ -0,0 +1,125 @@
# Azure Event Hub Receiver

<!-- status autogenerated section -->
| Status        |           |
| ------------- |-----------|
| Stability     | [alpha]: metrics, logs |
| Distributions | [contrib], [observiq], [splunk], [sumo] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Areceiver%2Fazureeventhub%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Areceiver%2Fazureeventhub) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Areceiver%2Fazureeventhub%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Areceiver%2Fazureeventhub) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@atoulme](https://www.github.com/atoulme), [@djaglowski](https://www.github.com/djaglowski) |

[alpha]: https://github.com/open-telemetry/opentelemetry-collector#alpha
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
[observiq]: https://github.com/observIQ/observiq-otel-collector
[splunk]: https://github.com/signalfx/splunk-otel-collector
[sumo]: https://github.com/SumoLogic/sumologic-otel-collector
<!-- end autogenerated section -->

## Overview

Azure resources and services can be
[configured](https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/diagnostic-settings)
to send their logs to an Azure Event Hub. The Azure Event Hub receiver pulls logs from an Azure
Event Hub, transforms them, and pushes them through the collector pipeline.

## Configuration

### connection (Required)

A string describing the connection to an Azure Event Hub.

### group (Optional)

The Consumer Group to read from. If empty, it defaults to the default Consumer Group, `$Default`.

### partition (Optional)

The partition to watch. If empty, it watches all partitions.

Default: ""

### offset (Optional)

The offset at which to start watching the event hub. If empty, it starts with the latest offset.

Default: ""

### format (Optional)

Determines how to transform the Event Hub messages into OpenTelemetry logs. See the "Format"
section below for details.

Default: "azure"
### Example Configuration

```yaml
receivers:
  azureeventhub:
    connection: Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName
    partition: foo
    group: bar
    offset: "1234-5566"
    format: "azure"
```

This component can persist its state using the [storage extension].

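For example, persistence might be wired up like this (a sketch, assuming the contrib `file_storage` extension and the receiver's `storage` setting; the directory path is illustrative):

```yaml
extensions:
  file_storage:
    directory: /var/lib/otelcol/storage

receivers:
  azureeventhub:
    connection: Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName
    storage: file_storage

service:
  extensions: [file_storage]
```
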
## Format

### raw

The "raw" format maps the AMQP properties and data into the
attributes and body of an OpenTelemetry LogRecord, respectively.
The body is represented as a raw byte array.

This format is not supported for Metrics.

### azure

The "azure" format extracts the Azure log records from the AMQP
message data, parses them, and maps the fields to OpenTelemetry
attributes. The table below summarizes the mapping between the
[Azure common log format](https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/resource-logs-schema)
and the OpenTelemetry attributes.

| Azure                            | OpenTelemetry                          |
|----------------------------------|----------------------------------------|
| callerIpAddress (optional)       | net.sock.peer.addr (attribute)         |
| correlationId (optional)         | azure.correlation.id (attribute)       |
| category (optional)              | azure.category (attribute)             |
| durationMs (optional)            | azure.duration (attribute)             |
| Level (optional)                 | severity_number, severity_text (field) |
| location (optional)              | cloud.region (attribute)               |
| —                                | cloud.provider (attribute)             |
| operationName (required)         | azure.operation.name (attribute)       |
| operationVersion (optional)      | azure.operation.version (attribute)    |
| properties (optional)            | azure.properties (attribute, nested)   |
| resourceId (required)            | azure.resource.id (resource attribute) |
| resultDescription (optional)     | azure.result.description (attribute)   |
| resultSignature (optional)       | azure.result.signature (attribute)     |
| resultType (optional)            | azure.result.type (attribute)          |
| tenantId (required, tenant logs) | azure.tenant.id (attribute)            |
| time or timeStamp (required)     | time_unix_nano (time takes precedence) |
| identity (optional)              | azure.identity (attribute, nested)     |

Notes:

* JSON does not distinguish between fixed and floating point numbers. All
  JSON numbers are encoded as doubles.

For Metrics, the Azure Metric Records are an array
of "records" with the following fields.

| Azure      | OpenTelemetry                               |
|------------|---------------------------------------------|
| time       | time_unix_nano (field)                      |
| resourceId | azure.resource.id (resource attribute)      |
| metricName |                                             |
| timeGrain  | start_time_unix_nano (field)                |
| total      | mapped to datapoint metricName + "_TOTAL"   |
| count      | mapped to datapoint metricName + "_COUNT"   |
| minimum    | mapped to datapoint metricName + "_MINIMUM" |
| maximum    | mapped to datapoint metricName + "_MAXIMUM" |
| average    | mapped to datapoint metricName + "_AVERAGE" |

From this data, a Gauge Metric is created with Data Points
that represent the values for the Metric: Total, Minimum,
Maximum, Average, and Count.

[storage extension]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/storage
Lines changed: 190 additions & 0 deletions
@@ -0,0 +1,190 @@
package azureeventhubreceiver

// https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/processor.go
// https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/processor_partition_client.go

/*
>> https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/example_consuming_with_checkpoints_test.go
- get a processor
- dispatchPartitionClients
- processor.Run

>> https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/example_consuming_events_test.go
- ReceiveEvents(ctx, count int, options *ReceiveEventsOptions) ([]*ReceivedEventData, error)
- call cancel()
- panic if there's an error that isn't context.DeadlineExceeded
- process events
  --> put them into the entity thingy
*/

// import (
// 	"context"
// 	"errors"
// 	"fmt"
// 	"time"

// 	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
// 	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
// 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
// )

// // NewEventHubProcessor is an updated initialization function using the new SDK components.
// func NewEventHubProcessor(ehConn, ehName, storageConn, storageCnt string) (*EventHubProcessor, error) {
// 	checkpointingProcessor, err := newCheckpointingProcessor(ehConn, ehName, storageConn, storageCnt)
// 	if err != nil {
// 		return nil, fmt.Errorf("failed to create checkpointing processor: %w", err)
// 	}

// 	return &EventHubProcessor{
// 		Processor: checkpointingProcessor,
// 	}, nil
// }

// // StartProcessing runs the processor until it stops or the context is canceled.
// func (e *EventHubProcessor) StartProcessing(ctx context.Context) error {
// 	if err := e.Processor.Run(ctx); err != nil {
// 		return fmt.Errorf("error running processor: %w", err)
// 	}
// 	return nil
// }

// // Assuming there's a struct managing the processor setup
// type EventHubProcessor struct {
// 	Processor *azeventhubs.Processor
// }

// // These are config values the processor factory can use to create processors:
// //
// // (a) EventHubConnectionString
// // (b) EventHubName
// // (c) StorageConnectionString
// // (d) StorageContainerName
// //
// // You always need the Event Hub values, (a) and (b),
// // and you need all four of them to checkpoint.
// //
// // I think the config values should be managed in the factory struct.
// /*
// func (pf *processorFactory) CreateProcessor() (*azeventhubs.Processor, error) {
// 	// Create the consumer client
// 	consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(pf.EventHubConnectionString, pf.EventHubName, azeventhubs.DefaultConsumerGroup, nil)
// 	if err != nil {
// 		return nil, err
// 	}

// 	// Create the blob container client for the checkpoint store
// 	blobContainerClient, err := container.NewClientFromConnectionString(pf.StorageConnectionString, pf.StorageContainerName, nil)
// 	if err != nil {
// 		return nil, err
// 	}

// 	// Create the checkpoint store using the blob container client
// 	checkpointStore, err := azeventhubs.NewBlobCheckpointStore(blobContainerClient, nil)
// 	if err != nil {
// 		return nil, err
// 	}

// 	// Create the processor with checkpointing
// 	processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)
// 	if err != nil {
// 		return nil, err
// 	}

// 	return processor, nil
// }
// */

// // The checkpointing processor should be auth aware.

// func newCheckpointingProcessor(eventHubConnectionString, eventHubName, storageConnectionString, storageContainerName string) (*azeventhubs.Processor, error) {
// 	blobContainerClient, err := container.NewClientFromConnectionString(storageConnectionString, storageContainerName, nil)
// 	if err != nil {
// 		return nil, err
// 	}

// 	checkpointStore, err := checkpoints.NewBlobStore(blobContainerClient, nil)
// 	if err != nil {
// 		return nil, err
// 	}

// 	consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(eventHubConnectionString, eventHubName, azeventhubs.DefaultConsumerGroup, nil)
// 	if err != nil {
// 		return nil, err
// 	}

// 	return azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)
// }

/*
func dispatchPartitionClients(processor *azeventhubs.Processor) {
	for {
		processorPartitionClient := processor.NextPartitionClient(context.TODO())
		if processorPartitionClient == nil {
			break
		}

		go func() {
			if err := processEventsForPartition(processorPartitionClient); err != nil {
				panic(err)
			}
		}()
	}
}

func processEventsForPartition(partitionClient *azeventhubs.ProcessorPartitionClient) error {
	defer shutdownPartitionResources(partitionClient)
	if err := initializePartitionResources(partitionClient.PartitionID()); err != nil {
		return err
	}

	for {
		receiveCtx, cancelReceive := context.WithTimeout(context.TODO(), time.Minute)
		events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
		cancelReceive()

		if err != nil && !errors.Is(err, context.DeadlineExceeded) {
			return err
		}
		if len(events) == 0 {
			continue
		}

		if err := processEvents(events, partitionClient); err != nil {
			return err
		}

		if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
			return err
		}
	}
}

func shutdownPartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
	if err := partitionClient.Close(context.TODO()); err != nil {
		panic(err)
	}
}

func initializePartitionResources(partitionID string) error {
	fmt.Printf("Initializing resources for partition %s\n", partitionID)
	return nil
}

// This is very much like the old processEvents function
func processEvents(events []*azeventhubs.ReceivedEventData, partitionClient *azeventhubs.ProcessorPartitionClient) error {
	for _, event := range events {
		_ = event // placeholder until real processing is wired in
		// fmt.Printf("Processing event: %v\n", event.EventData())
	}
	return nil
}
*/
Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package azureeventhubreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureeventhubreceiver"
5+
6+
import (
7+
"go.opentelemetry.io/collector/component"
8+
"go.opentelemetry.io/collector/pdata/plog"
9+
"go.uber.org/zap"
10+
11+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
12+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/azure"
13+
)
14+
15+
type AzureResourceLogsEventUnmarshaler struct {
16+
unmarshaler *azure.ResourceLogsUnmarshaler
17+
}
18+
19+
func newAzureResourceLogsUnmarshaler(buildInfo component.BuildInfo, logger *zap.Logger) eventLogsUnmarshaler {
20+
return AzureResourceLogsEventUnmarshaler{
21+
unmarshaler: &azure.ResourceLogsUnmarshaler{
22+
Version: buildInfo.Version,
23+
Logger: logger,
24+
},
25+
}
26+
}
27+
28+
// UnmarshalLogs takes a byte array containing a JSON-encoded
29+
// payload with Azure log records and transforms it into
30+
// an OpenTelemetry plog.Logs object. The data in the Azure
31+
// log record appears as fields and attributes in the
32+
// OpenTelemetry representation; the bodies of the
33+
// OpenTelemetry log records are empty.
34+
func (r AzureResourceLogsEventUnmarshaler) UnmarshalLogs(event *azeventhubs.ReceivedEventData) (plog.Logs, error) {
35+
return r.unmarshaler.UnmarshalLogs(event.Body)
36+
}
