From 9e07519f40aa70bedb5b32fc38d1682a6bcade3d Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Thu, 29 Jan 2026 21:39:52 -0600 Subject: [PATCH] rivertest: enable metadata assertions This enables basic assertions against metadata in `rivertest.RequireX` methods. The approach here isn't anything special: it allows a `map[string]any` to be specified and considers it a failure if all keys & values in it are not also in the metadata. This should make it possible to build workflow-related assertions and other metadata assertions much more easily. --- CHANGELOG.md | 4 ++ rivertest/rivertest.go | 106 ++++++++++++++++++++++++++++++++++++ rivertest/rivertest_test.go | 65 ++++++++++++++++++++++ 3 files changed, 175 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 98e7d4a1..5a81d288 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Add metadata assertions to rivertest. [PR #1137](https://github.com/riverqueue/river/pull/1137). + ## [0.30.2] - 2026-01-26 ### Fixed diff --git a/rivertest/rivertest.go b/rivertest/rivertest.go index 27a2f921..d8fa1721 100644 --- a/rivertest/rivertest.go +++ b/rivertest/rivertest.go @@ -6,6 +6,7 @@ import ( "context" "encoding/json" "fmt" + "reflect" "slices" "strings" "testing" @@ -50,6 +51,13 @@ type RequireInsertedOpts struct { // No assertion is made if left the zero value. MaxAttempts int + // Metadata is a subset of job metadata to assert against. Only the keys and + // values provided are compared, and any extra metadata on the job is + // ignored. + // + // No assertion is made if left nil or empty. + Metadata map[string]any + // Priority is the expected priority for the inserted job. // // No assertion is made if left the zero value. @@ -501,6 +509,16 @@ func compareJobToInsertOpts(t testingT, jobRow *rivertype.JobRow, expectedOpts * } } + if len(expectedOpts.Metadata) > 0 { + metadataMatches, metadataFailures := compareMetadataSubset(t, jobRow.Metadata, expectedOpts.Metadata, requireNotInserted) + + if !metadataMatches && requireNotInserted { + return true + } + + failures = append(failures, metadataFailures...) + } + if expectedOpts.Priority != 0 { if jobRow.Priority == expectedOpts.Priority { if requireNotInserted { @@ -594,6 +612,94 @@ func compareJobToInsertOpts(t testingT, jobRow *rivertype.JobRow, expectedOpts * return false } +func compareMetadataSubset(t testingT, jobMetadataBytes []byte, expectedMetadata map[string]any, requireNotInserted bool) (bool, []string) { + t.Helper() + + jobMetadata := map[string]any{} + if len(jobMetadataBytes) > 0 { + if err := json.Unmarshal(jobMetadataBytes, &jobMetadata); err != nil { + failuref(t, "Internal failure: error unmarshaling job metadata: %s", err) + } + } + + keys := make([]string, 0, len(expectedMetadata)) + for key := range expectedMetadata { + keys = append(keys, key) + } + slices.Sort(keys) + + failures := make([]string, 0, len(keys)) + allMatch := true + for _, key := range keys { + expectedValue := expectedMetadata[key] + + actualValue, ok := jobMetadata[key] + if !ok { + allMatch = false + if requireNotInserted { + return false, nil + } + failures = append(failures, fmt.Sprintf("metadata missing key '%s'", key)) + continue + } + + if expectedValue == nil { + if actualValue == nil { + if requireNotInserted { + failures = append(failures, fmt.Sprintf("metadata[%s] equal to excluded null", key)) + } + } else { + allMatch = false + if requireNotInserted { + return false, nil + } + failures = append(failures, fmt.Sprintf("metadata[%s] %s not equal to expected null", key, formatMetadataValue(actualValue))) + } + continue + } + + normalizedExpected, err := normalizeMetadataValue(expectedValue) + if err != nil { + failuref(t, "Internal failure: error normalizing metadata for key '%s': %s", key, err) + } + + if reflect.DeepEqual(actualValue, normalizedExpected) { + if requireNotInserted { + failures = append(failures, fmt.Sprintf("metadata[%s] equal to excluded %s", key, formatMetadataValue(normalizedExpected))) + } + } else { + allMatch = false + if requireNotInserted { + return false, nil + } + failures = append(failures, fmt.Sprintf("metadata[%s] %s not equal to expected %s", key, formatMetadataValue(actualValue), formatMetadataValue(normalizedExpected))) + } + } + + return allMatch, failures +} + +func formatMetadataValue(value any) string { + encoded, err := json.Marshal(value) + if err != nil { + return fmt.Sprintf("%v", value) + } + return string(encoded) +} + +func normalizeMetadataValue(value any) (any, error) { + encoded, err := json.Marshal(value) + if err != nil { + return nil, err + } + + var normalized any + if err := json.Unmarshal(encoded, &normalized); err != nil { + return nil, err + } + return normalized, nil +} + // failuref takes a printf-style directive and is a shortcut for failing an // assertion. func failuref(t testingT, format string, a ...any) { diff --git a/rivertest/rivertest_test.go b/rivertest/rivertest_test.go index 7e7e1a06..02feb798 100644 --- a/rivertest/rivertest_test.go +++ b/rivertest/rivertest_test.go @@ -253,6 +253,41 @@ func TestRequireInsertedTx(t *testing.T) { mockT.LogOutput()) }) + t.Run("Metadata", func(t *testing.T) { + t.Parallel() + + riverClient, bundle := setup(t) + + _, err := riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, &river.InsertOpts{ + Metadata: []byte(`{"key":"value","list":[1,2],"nested":{"enabled":true},"num":1}`), + }) + require.NoError(t, err) + + mockT := testutil.NewMockT(t) + opts := &RequireInsertedOpts{ + Metadata: map[string]any{ + "key": "value", + "nested": map[string]any{"enabled": true}, + "num": int64(1), + }, + } + _ = requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts) + require.False(t, mockT.Failed, "Should have succeeded, but failed with: "+mockT.LogOutput()) + + mockT = testutil.NewMockT(t) + opts = &RequireInsertedOpts{ + Metadata: map[string]any{ + "key": "wrong", + "missing": "value", + }, + } + _ = requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts) + require.True(t, mockT.Failed) + require.Equal(t, + failureString("Job with kind 'job2' metadata[key] \"value\" not equal to expected \"wrong\", metadata missing key 'missing'")+"\n", + mockT.LogOutput()) + }) + t.Run("Priority", func(t *testing.T) { t.Parallel() @@ -587,6 +622,36 @@ func TestRequireNotInsertedTx(t *testing.T) { mockT.LogOutput()) }) + t.Run("Metadata", func(t *testing.T) { + t.Parallel() + + riverClient, bundle := setup(t) + + _, err := riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, &river.InsertOpts{ + Metadata: []byte(`{"key":"value"}`), + }) + require.NoError(t, err) + + mockT := testutil.NewMockT(t) + opts := emptyOpts() + opts.Metadata = map[string]any{ + "key": "value", + } + requireNotInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts) + require.True(t, mockT.Failed) + require.Equal(t, + failureString("Job with kind 'job2' metadata[key] equal to excluded \"value\"")+"\n", + mockT.LogOutput()) + + mockT = testutil.NewMockT(t) + opts = emptyOpts() + opts.Metadata = map[string]any{ + "key": "other", + } + requireNotInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts) + require.False(t, mockT.Failed) + }) + t.Run("Priority", func(t *testing.T) { t.Parallel()