Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion openmeter/meterexport/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Service interface {
//
// NOTE: Currently only SUM and COUNT meters are supported.
// NOTE: GroupBy values are not yet supported.
// NOTE: Subjects and Customers are not honored in the exported data.
// NOTE: Customers are not honored in the exported data.
ExportSyntheticMeterData(ctx context.Context, config DataExportConfig, result chan<- streaming.RawEvent, err chan<- error) error

// ExportSyntheticMeterDataIter is an iterator-based wrapper around ExportSyntheticMeterData.
Expand Down
12 changes: 6 additions & 6 deletions openmeter/meterexport/service/funnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,16 @@ func (p funnelParams) validateUnsupportedParams() []error {
errs = append(errs, errors.New("filter customer is not supported"))
}

if len(p.queryParams.FilterSubject) > 0 {
errs = append(errs, errors.New("filter subject is not supported"))
}

if len(p.queryParams.FilterGroupBy) > 0 {
errs = append(errs, errors.New("filter group by is not supported"))
}

if len(p.queryParams.GroupBy) > 0 {
errs = append(errs, errors.New("group by is not supported"))
// GroupBy subject is allowed (used internally for per-subject export)
for _, g := range p.queryParams.GroupBy {
if g != "subject" {
errs = append(errs, errors.New("group by is only supported for subject"))
break
}
}
Comment on lines +56 to 62
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

GroupBy is validated but never sent to QueryMeter (so grouping likely does nothing).
Nice validation tweak, but the actual query drops params.queryParams.GroupBy.

 		queryParams := streaming.QueryParams{
 			From:           &queryFrom,
 			To:             &queryTo,
 			WindowSize:     params.queryParams.WindowSize,
 			WindowTimeZone: params.queryParams.WindowTimeZone,
+			GroupBy:        params.queryParams.GroupBy,
 		}

Also applies to: 99-107

🤖 Prompt for AI Agents
In openmeter/meterexport/service/funnel.go around lines 56-62 and again around
99-107, GroupBy is validated but never forwarded to QueryMeter, so grouping has
no effect; update the code paths that build the query params passed to
QueryMeter to include params.queryParams.GroupBy (or the equivalent GroupBy
field) when calling QueryMeter, ensuring the validated GroupBy slice is copied
or referenced into the request object/params sent to QueryMeter in both
locations so grouping is applied at query time.


return errs
Expand Down
5 changes: 0 additions & 5 deletions openmeter/meterexport/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
type Config struct {
// Configuration
EventSourceGroup string
ExportSubject string

// Dependencies
StreamingConnector streaming.Connector
Expand All @@ -21,10 +20,6 @@ type Config struct {
func (c Config) validate() error {
var errs []error

if c.ExportSubject == "" {
errs = append(errs, errors.New("export subject is required"))
}

if c.EventSourceGroup == "" {
errs = append(errs, errors.New("event source group is required"))
}
Expand Down
17 changes: 0 additions & 17 deletions openmeter/meterexport/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ func TestExportSyntheticMeterData(t *testing.T) {

// Create service
svc, err := New(Config{
ExportSubject: "test-subject",
EventSourceGroup: "test-source",
StreamingConnector: mockStreaming,
MeterService: mockMeterService,
Expand Down Expand Up @@ -308,7 +307,6 @@ func TestExportSyntheticMeterData_ContextCancellation(t *testing.T) {
}

svc, err := New(Config{
ExportSubject: "test-subject",
EventSourceGroup: "test-source",
StreamingConnector: mockStreaming,
MeterService: mockMeterService,
Expand Down Expand Up @@ -384,7 +382,6 @@ func TestExportSyntheticMeterData_ContextCancellation(t *testing.T) {
mockStreaming.AddSimpleEvent("test-meter", 10.0, now.Add(-5*time.Minute))

svc, err := New(Config{
ExportSubject: "test-subject",
EventSourceGroup: "test-source",
StreamingConnector: mockStreaming,
MeterService: mockMeterService,
Expand Down Expand Up @@ -453,15 +450,6 @@ func TestServiceNew(t *testing.T) {
assert.Contains(t, err.Error(), "meter service is required")
})

t.Run("should fail without export subject", func(t *testing.T) {
_, err := New(Config{
StreamingConnector: testutils.NewMockStreamingConnector(t),
MeterService: NewMockMeterService(),
})
require.Error(t, err)
assert.Contains(t, err.Error(), "export subject is required")
})

t.Run("should fail without event source group", func(t *testing.T) {
_, err := New(Config{
StreamingConnector: testutils.NewMockStreamingConnector(t),
Expand All @@ -475,7 +463,6 @@ func TestServiceNew(t *testing.T) {
svc, err := New(Config{
StreamingConnector: testutils.NewMockStreamingConnector(t),
MeterService: NewMockMeterService(),
ExportSubject: "test-subject",
EventSourceGroup: "test-source",
})
require.NoError(t, err)
Expand Down Expand Up @@ -512,7 +499,6 @@ func TestExportSyntheticMeterDataIter(t *testing.T) {
mockStreaming.AddSimpleEvent("test-meter", 30.0, now.Add(-2*time.Minute))

svc, err := New(Config{
ExportSubject: "test-subject",
EventSourceGroup: "test-source",
StreamingConnector: mockStreaming,
MeterService: mockMeterService,
Expand Down Expand Up @@ -576,7 +562,6 @@ func TestExportSyntheticMeterDataIter(t *testing.T) {
}

svc, err := New(Config{
ExportSubject: "test-subject",
EventSourceGroup: "test-source",
StreamingConnector: mockStreaming,
MeterService: mockMeterService,
Expand Down Expand Up @@ -621,7 +606,6 @@ func TestExportSyntheticMeterDataIter(t *testing.T) {
mockStreaming := testutils.NewMockStreamingConnector(t)

svc, err := New(Config{
ExportSubject: "test-subject",
EventSourceGroup: "test-source",
StreamingConnector: mockStreaming,
MeterService: mockMeterService,
Expand Down Expand Up @@ -668,7 +652,6 @@ func TestExportSyntheticMeterDataIter(t *testing.T) {
mockStreaming := testutils.NewMockStreamingConnector(t)

svc, err := New(Config{
ExportSubject: "test-subject",
EventSourceGroup: "test-source",
StreamingConnector: mockStreaming,
MeterService: mockMeterService,
Expand Down
3 changes: 2 additions & 1 deletion openmeter/meterexport/service/syntheticdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func (s *service) ExportSyntheticMeterData(ctx context.Context, config meterexpo
To: config.Period.To,
WindowSize: &config.ExportWindowSize,
WindowTimeZone: time.UTC,
GroupBy: []string{"subject"},
},
}, meterRowCh, meterRowErrCh)
})
Expand All @@ -146,7 +147,7 @@ func (s *service) createEventFromMeterRow(m meter.Meter, row meter.MeterQueryRow
ID: ulid.Make().String(),
Type: m.EventType, // We reuse the same type as the source meter
Source: fmt.Sprintf("%s:%s/%s", s.EventSourceGroup, m.Namespace, m.ID),
Subject: s.ExportSubject,
Subject: lo.FromPtr(row.Subject),
IngestedAt: clock.Now(),
Time: row.WindowStart,
CustomerID: nil,
Expand Down
Loading