-
Notifications
You must be signed in to change notification settings - Fork 143
fix(meterexport): groupby subject #3689
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
📝 WalkthroughWalkthroughThis PR refactors the meterexport service to eliminate the required ExportSubject configuration field and instead derive subject information directly from meter row data. Validation rules are updated to allow subject-based grouping in funnel queries while subjects in exported events now use actual row data instead of a static config value. Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes
Possibly related PRs
Suggested labels
Suggested reviewers
Pre-merge checks and finishing touches✅ Passed checks (3 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
openmeter/meterexport/service.go (1)
23-33: Doc note looks stale now thatGroupBy: ["subject"]is supported (at least internally).
Line 30 still says “GroupBy values are not yet supported”, but this PR adds subject grouping—worth updating to avoid confusing API consumers.openmeter/meterexport/service/syntheticdata.go (2)
58-118: Bug: consumer goroutine can spin forever oncemeterRowErrChis closed.
WhenmeterRowErrChis closed,case err, ok := <-meterRowErrChwill keep firing withok=false, and the loop never exits / never hits themeterRowChclosed path.g.Go(func() error { + // Avoid select spinning on closed channels + meterRowErrChLocal := meterRowErrCh + meterRowChLocal := meterRowCh for { select { case <-ctx.Done(): sendCtxErr() return nil - case err, ok := <-meterRowErrCh: + case err, ok := <-meterRowErrChLocal: + if !ok { + meterRowErrChLocal = nil + continue + } // Filter out context errors as they're handled via sendCtxErr - if ok && err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { + if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { errCh <- err } - case row, ok := <-meterRowCh: + case row, ok := <-meterRowChLocal: if !ok { // Before returning, check if context was canceled // This ensures we always report context cancellation to the caller sendCtxErr() return nil }
120-132: Validate thatrow.Subjectis non-empty before creating events.Since OpenMeter's business logic requires CloudEvents
subjectto be present, usinglo.FromPtr(row.Subject)can silently produce an empty string if the pointer is nil. This will likely cause ingestion to reject the synthetic events. Add an explicit check at the start ofcreateEventFromMeterRowto fail fast:func (s *service) createEventFromMeterRow(m meter.Meter, row meter.MeterQueryRow) (streaming.RawEvent, error) { + if row.Subject == nil || *row.Subject == "" { + return streaming.RawEvent{}, fmt.Errorf("missing subject in meter row") + } // For SUM and COUNT type source meters, all event rows can be represented as SUM meter events baseEvent := streaming.RawEvent{ Namespace: m.Namespace, ID: ulid.Make().String(), Type: m.EventType, // We reuse the same type as the source meter Source: fmt.Sprintf("%s:%s/%s", s.EventSourceGroup, m.Namespace, m.ID), - Subject: lo.FromPtr(row.Subject), + Subject: *row.Subject, IngestedAt: clock.Now(), Time: row.WindowStart, CustomerID: nil, }
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
openmeter/meterexport/service.go(1 hunks)openmeter/meterexport/service/funnel.go(1 hunks)openmeter/meterexport/service/service.go(0 hunks)openmeter/meterexport/service/service_test.go(0 hunks)openmeter/meterexport/service/syntheticdata.go(2 hunks)
💤 Files with no reviewable changes (2)
- openmeter/meterexport/service/service.go
- openmeter/meterexport/service/service_test.go
🧰 Additional context used
📓 Path-based instructions (1)
**/*.go
⚙️ CodeRabbit configuration file
**/*.go: In general when reviewing the Golang code make readability and maintainability a priority, even potentially suggest restructuring the code to improve them.Performance should be a priority in critical code paths. Anything related to event ingestion, message processing, database operations (regardless of database) should be vetted for potential performance bottlenecks.
Files:
openmeter/meterexport/service/funnel.goopenmeter/meterexport/service.goopenmeter/meterexport/service/syntheticdata.go
🧠 Learnings (2)
📓 Common learnings
Learnt from: chrisgacsal
Repo: openmeterio/openmeter PR: 3486
File: openmeter/ingest/kafkaingest/serializer/serializer.go:105-107
Timestamp: 2025-10-09T13:59:12.012Z
Learning: In OpenMeter, the CloudEvents `subject` field is mandatory for the application's business logic, even though it's optional in the CloudEvents specification. The `ValidateKafkaPayloadToCloudEvent` function in `openmeter/ingest/kafkaingest/serializer/serializer.go` intentionally enforces this requirement.
📚 Learning: 2025-10-09T13:59:12.012Z
Learnt from: chrisgacsal
Repo: openmeterio/openmeter PR: 3486
File: openmeter/ingest/kafkaingest/serializer/serializer.go:105-107
Timestamp: 2025-10-09T13:59:12.012Z
Learning: In OpenMeter, the CloudEvents `subject` field is mandatory for the application's business logic, even though it's optional in the CloudEvents specification. The `ValidateKafkaPayloadToCloudEvent` function in `openmeter/ingest/kafkaingest/serializer/serializer.go` intentionally enforces this requirement.
Applied to files:
openmeter/meterexport/service/funnel.goopenmeter/meterexport/service/syntheticdata.go
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
- GitHub Check: Artifacts / Container image
- GitHub Check: Code Generators
- GitHub Check: Lint
- GitHub Check: Migration Checks
- GitHub Check: Build
- GitHub Check: Test
- GitHub Check: Repository Scan
- GitHub Check: Analyze (go)
| // GroupBy subject is allowed (used internally for per-subject export) | ||
| for _, g := range p.queryParams.GroupBy { | ||
| if g != "subject" { | ||
| errs = append(errs, errors.New("group by is only supported for subject")) | ||
| break | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
GroupBy is validated but never sent to QueryMeter (so grouping likely does nothing).
Nice validation tweak, but the actual query drops params.queryParams.GroupBy.
queryParams := streaming.QueryParams{
From: &queryFrom,
To: &queryTo,
WindowSize: params.queryParams.WindowSize,
WindowTimeZone: params.queryParams.WindowTimeZone,
+ GroupBy: params.queryParams.GroupBy,
}Also applies to: 99-107
🤖 Prompt for AI Agents
In openmeter/meterexport/service/funnel.go around lines 56-62 and again around
99-107, GroupBy is validated but never forwarded to QueryMeter, so grouping has
no effect; update the code paths that build the query params passed to
QueryMeter to include params.queryParams.GroupBy (or the equivalent GroupBy
field) when calling QueryMeter, ensuring the validated GroupBy slice is copied
or referenced into the request object/params sent to QueryMeter in both
locations so grouping is applied at query time.
Overview
We obviously need to be able to group by subject
Summary by CodeRabbit
✏️ Tip: You can customize this high-level summary in your review settings.