diff --git a/openmeter/meterexport/service.go b/openmeter/meterexport/service.go index da53f7fa8..2c22d98f5 100644 --- a/openmeter/meterexport/service.go +++ b/openmeter/meterexport/service.go @@ -28,7 +28,7 @@ type Service interface { // // NOTE: Currently only SUM and COUNT meters are supported. // NOTE: GroupBy values are not yet supported. - // NOTE: Subjects and Customers are not honored in the exported data. + // NOTE: Customers are not honored in the exported data. ExportSyntheticMeterData(ctx context.Context, config DataExportConfig, result chan<- streaming.RawEvent, err chan<- error) error // ExportSyntheticMeterDataIter is an iterator-based wrapper around ExportSyntheticMeterData. diff --git a/openmeter/meterexport/service/funnel.go b/openmeter/meterexport/service/funnel.go index 0aebdbd2c..750e37e9d 100644 --- a/openmeter/meterexport/service/funnel.go +++ b/openmeter/meterexport/service/funnel.go @@ -49,16 +49,16 @@ func (p funnelParams) validateUnsupportedParams() []error { errs = append(errs, errors.New("filter customer is not supported")) } - if len(p.queryParams.FilterSubject) > 0 { - errs = append(errs, errors.New("filter subject is not supported")) - } - if len(p.queryParams.FilterGroupBy) > 0 { errs = append(errs, errors.New("filter group by is not supported")) } - if len(p.queryParams.GroupBy) > 0 { - errs = append(errs, errors.New("group by is not supported")) + // GroupBy subject is allowed (used internally for per-subject export) + for _, g := range p.queryParams.GroupBy { + if g != "subject" { + errs = append(errs, errors.New("group by is only supported for subject")) + break + } } return errs diff --git a/openmeter/meterexport/service/service.go b/openmeter/meterexport/service/service.go index 7cfd242a5..714809644 100644 --- a/openmeter/meterexport/service/service.go +++ b/openmeter/meterexport/service/service.go @@ -11,7 +11,6 @@ import ( type Config struct { // Configuration EventSourceGroup string - ExportSubject string // Dependencies StreamingConnector streaming.Connector @@ -21,10 +20,6 @@ type Config struct { func (c Config) validate() error { var errs []error - if c.ExportSubject == "" { - errs = append(errs, errors.New("export subject is required")) - } - if c.EventSourceGroup == "" { errs = append(errs, errors.New("event source group is required")) } diff --git a/openmeter/meterexport/service/service_test.go b/openmeter/meterexport/service/service_test.go index acaeb5867..86dc41733 100644 --- a/openmeter/meterexport/service/service_test.go +++ b/openmeter/meterexport/service/service_test.go @@ -209,7 +209,6 @@ func TestExportSyntheticMeterData(t *testing.T) { // Create service svc, err := New(Config{ - ExportSubject: "test-subject", EventSourceGroup: "test-source", StreamingConnector: mockStreaming, MeterService: mockMeterService, @@ -308,7 +307,6 @@ func TestExportSyntheticMeterData_ContextCancellation(t *testing.T) { } svc, err := New(Config{ - ExportSubject: "test-subject", EventSourceGroup: "test-source", StreamingConnector: mockStreaming, MeterService: mockMeterService, @@ -384,7 +382,6 @@ func TestExportSyntheticMeterData_ContextCancellation(t *testing.T) { mockStreaming.AddSimpleEvent("test-meter", 10.0, now.Add(-5*time.Minute)) svc, err := New(Config{ - ExportSubject: "test-subject", EventSourceGroup: "test-source", StreamingConnector: mockStreaming, MeterService: mockMeterService, @@ -453,15 +450,6 @@ func TestServiceNew(t *testing.T) { assert.Contains(t, err.Error(), "meter service is required") }) - t.Run("should fail without export subject", func(t *testing.T) { - _, err := New(Config{ - StreamingConnector: testutils.NewMockStreamingConnector(t), - MeterService: NewMockMeterService(), - }) - require.Error(t, err) - assert.Contains(t, err.Error(), "export subject is required") - }) - t.Run("should fail without event source group", func(t *testing.T) { _, err := New(Config{ StreamingConnector: testutils.NewMockStreamingConnector(t), @@ -475,7 +463,6 @@ func TestServiceNew(t *testing.T) { svc, err := New(Config{ StreamingConnector: testutils.NewMockStreamingConnector(t), MeterService: NewMockMeterService(), - ExportSubject: "test-subject", EventSourceGroup: "test-source", }) require.NoError(t, err) @@ -512,7 +499,6 @@ func TestExportSyntheticMeterDataIter(t *testing.T) { mockStreaming.AddSimpleEvent("test-meter", 30.0, now.Add(-2*time.Minute)) svc, err := New(Config{ - ExportSubject: "test-subject", EventSourceGroup: "test-source", StreamingConnector: mockStreaming, MeterService: mockMeterService, @@ -576,7 +562,6 @@ func TestExportSyntheticMeterDataIter(t *testing.T) { } svc, err := New(Config{ - ExportSubject: "test-subject", EventSourceGroup: "test-source", StreamingConnector: mockStreaming, MeterService: mockMeterService, @@ -621,7 +606,6 @@ func TestExportSyntheticMeterDataIter(t *testing.T) { mockStreaming := testutils.NewMockStreamingConnector(t) svc, err := New(Config{ - ExportSubject: "test-subject", EventSourceGroup: "test-source", StreamingConnector: mockStreaming, MeterService: mockMeterService, @@ -668,7 +652,6 @@ func TestExportSyntheticMeterDataIter(t *testing.T) { mockStreaming := testutils.NewMockStreamingConnector(t) svc, err := New(Config{ - ExportSubject: "test-subject", EventSourceGroup: "test-source", StreamingConnector: mockStreaming, MeterService: mockMeterService, diff --git a/openmeter/meterexport/service/syntheticdata.go b/openmeter/meterexport/service/syntheticdata.go index 8e40fab8b..c91e7ae63 100644 --- a/openmeter/meterexport/service/syntheticdata.go +++ b/openmeter/meterexport/service/syntheticdata.go @@ -126,6 +126,7 @@ func (s *service) ExportSyntheticMeterData(ctx context.Context, config meterexpo To: config.Period.To, WindowSize: &config.ExportWindowSize, WindowTimeZone: time.UTC, + GroupBy: []string{"subject"}, }, }, meterRowCh, meterRowErrCh) }) @@ -146,7 +147,7 @@ func (s *service) createEventFromMeterRow(m meter.Meter, row meter.MeterQueryRow ID: ulid.Make().String(), Type: m.EventType, // We reuse the same type as the source meter Source: fmt.Sprintf("%s:%s/%s", s.EventSourceGroup, m.Namespace, m.ID), - Subject: s.ExportSubject, + Subject: lo.FromPtr(row.Subject), IngestedAt: clock.Now(), Time: row.WindowStart, CustomerID: nil,