
Conversation

@alexstoick
Contributor

This uses a new helper, internal.FetchConfigFromDB, to fetch a fully hydrated protos.FlowConnectionConfigs. When passing this config to Temporal we strip the tableMappings array, which is what can otherwise push the payload over the 2MB limit.

This contains a subset of the changes which were originally proposed in #3407.
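A hedged sketch of that shape - the signature of internal.FetchConfigFromDB and the activity wiring here are assumptions, not the PR's actual code:

```go
// Sketch only: internal.FetchConfigFromDB's signature and the activity
// shape are assumptions, not the PR's actual code.
package example

import (
	"context"

	"github.com/PeerDB-io/peerdb/flow/generated/protos"
	"github.com/PeerDB-io/peerdb/flow/internal" // assumed import path
	"google.golang.org/protobuf/proto"
)

// stripTableMappings returns a copy of cfg that is safe to hand to
// Temporal: tableMappings is the unbounded field that can push the
// serialized payload past the 2MB limit.
func stripTableMappings(cfg *protos.FlowConnectionConfigs) *protos.FlowConnectionConfigs {
	slim := proto.Clone(cfg).(*protos.FlowConnectionConfigs)
	slim.TableMappings = nil
	return slim
}

// Activities rehydrate the full config from the catalog instead of
// reading it out of workflow state.
func exampleActivity(ctx context.Context, flowName string) error {
	cfg, err := internal.FetchConfigFromDB(ctx, flowName) // assumed signature
	if err != nil {
		return err
	}
	_ = cfg.TableMappings // fully hydrated again
	return nil
}
```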

@ilidemi
Contributor

ilidemi commented Oct 11, 2025

Some ideas on how to tighten this up:

  • Remove the field from FlowConnectionConfigs (mark it as reserved) to rip off the band-aid and have the type system working with us. For table additions, pass the mappings as a separate arg.
  • Add a table_mappings table to the catalog, write to it on flow creation in handler.go, and add a migration after pause like cdc_flow.go lines 375-390 from #2090 (Remove TableNameSchemaMapping from temporal state), which pulled off a similar thing. Instead of fetching the config from the DB, fetch the table mappings from the DB.
  • Preserve replayability, which can be done by adding a version bigint field that increments within a flow, referencing that value from FlowConnectionConfigs, and making the mappings table append-only (see the sketch after this list).
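A minimal sketch of the append-only, versioned mappings table, assuming a hypothetical table_mappings schema and pgx for catalog access (PeerDB's actual catalog layout may differ):

```go
// Sketch of the append-only, versioned mappings table. Table and column
// names are hypothetical.
//
//	CREATE TABLE table_mappings (
//	    flow_name TEXT   NOT NULL,
//	    version   BIGINT NOT NULL,
//	    mappings  BYTEA  NOT NULL, -- serialized []*protos.TableMapping
//	    PRIMARY KEY (flow_name, version)
//	);
package example

import (
	"context"

	"github.com/jackc/pgx/v5"
)

// insertMappings appends a new immutable row and returns its version.
// Because rows are never updated, old workflow histories that recorded
// version N keep replaying against exactly the mappings they saw.
func insertMappings(ctx context.Context, tx pgx.Tx, flowName string, blob []byte) (int64, error) {
	var version int64
	err := tx.QueryRow(ctx,
		`INSERT INTO table_mappings (flow_name, version, mappings)
		 VALUES ($1,
		         COALESCE((SELECT MAX(version) FROM table_mappings WHERE flow_name = $1), 0) + 1,
		         $2)
		 RETURNING version`,
		flowName, blob).Scan(&version)
	return version, err
}
```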

Would be nice to consider how not to trade off too much of the observability Temporal provides - right now, for workflows with 1-10 tables it's convenient to see the mappings right there in the execution. The PeerDB UI and raw catalog access are not easily available in the ClickPipes environment. Maybe log up to 100 tables (along with count and version) every time they get fetched?
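A small sketch of that capped logging, with log/slog standing in for PeerDB's actual logger:

```go
// Sketch of capped mapping logs; the logger wiring is illustrative.
package example

import "log/slog"

func logMappings(logger *slog.Logger, version int64, tables []string) {
	const maxLogged = 100
	logged := tables
	if len(logged) > maxLogged {
		logged = logged[:maxLogged]
	}
	// Always log the full count and version; cap the table list so the
	// log line stays bounded for large mirrors.
	logger.Info("fetched table mappings",
		slog.Int64("tableMappingVersion", version),
		slog.Int("tableCount", len(tables)),
		slog.Any("tables", logged))
}
```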

@alexstoick
Contributor Author

@ilidemi - Thanks for the feedback!

I hesitated to remove the field - it does feel like a nuclear option - and I was worried about the migration path for running envs. I'll go ahead and implement your suggestion for the new table and take a similar approach to #2090.

In my previous PR (#3407) I was able to fully remove the options argument, which also reduced the size of the blob passed around - what are your thoughts on that? The options didn't serve any purpose once I was fetching TableMappings and SrcTableIdNameMapping from the DB.


For adding/removing table mappings, I think we can either:

  • use a different activity
  • pass them as an additional param - see the sketch after this comment

My only concern with changing the signature here is, again, dealing with running systems and queued jobs - I'm not sure how well Temporal behaves when you change an activity's signature!
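A rough sketch of the "additional param" option using the Temporal Go SDK; PersistTableMappings and the surrounding shapes are hypothetical, not PeerDB's actual code:

```go
// Sketch of the "additional param" route with the Temporal Go SDK.
// PersistTableMappings is a hypothetical activity that appends the new
// mappings to the catalog and returns the incremented version.
package example

import (
	"time"

	"go.temporal.io/sdk/workflow"

	"github.com/PeerDB-io/peerdb/flow/generated/protos"
)

func addTables(ctx workflow.Context, flowName string, added []*protos.TableMapping) (int64, error) {
	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: time.Minute,
	})
	var newVersion int64
	// Only the small version number flows back into workflow state;
	// the mappings themselves stay in the catalog.
	err := workflow.ExecuteActivity(ctx, "PersistTableMappings", flowName, added).
		Get(ctx, &newVersion)
	return newVersion, err
}
```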

Contributor

@ilidemi left a comment


Options argument - moving SrcTableIdNameMapping to the DB too makes sense to me, using a version instead (or -1/nil). The rest of the fields change with config/state separation, and although the config is not fully immutable, merging the two would require more thinking (if it's a good idea at all) and would be too much scope for this PR. If it's easy enough and useful to do, we can do it separately.

Changing activity signatures - the only fully supported upgrade route for PeerDB is to pause CDC mirrors (and wait for setup/snapshot/drop to complete), then upgrade, then resume. It is nice when a surgical change is possible for running snapshots, but we're not shooting for that even with smaller changes. We can just treat activities as functions (aside from the codepath from the start of CDCFlowWorkflow to the pause loop) and not worry about the rest. Adding an argument is good.

Comment on lines 308 to 327
// TODO: thought - maybe we need to pass additionalTables and then in the
// CDCWorkflow we persist them to the DB and send an incremented `tableMappingVersion`
// to the new workflow?
Contributor


Allowing up to ~10K additional tables at a time seems like a generous restriction. Although now it does make sense to have the API explicitly turn down config update requests that Temporal won't accept - a sketch of such a guard is below.
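A minimal sketch of such a guard, assuming the API handler can serialize the config before submitting it; the constant mirrors the 2MB Temporal limit mentioned in the PR description:

```go
// Sketch of an API-side size guard; the handler shape is assumed.
package example

import (
	"fmt"

	"github.com/PeerDB-io/peerdb/flow/generated/protos"
	"google.golang.org/protobuf/proto"
)

const temporalPayloadLimit = 2 * 1024 * 1024 // Temporal's 2MB blob limit

func validateConfigSize(cfg *protos.FlowConnectionConfigs) error {
	b, err := proto.Marshal(cfg)
	if err != nil {
		return err
	}
	if len(b) > temporalPayloadLimit {
		return fmt.Errorf("config serializes to %d bytes, over the %d byte Temporal payload limit",
			len(b), temporalPayloadLimit)
	}
	return nil
}
```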

ilidemi added a commit that referenced this pull request Oct 16, 2025
In #3589 we want to stop using a field in a way that's not error-prone in the rest of the code (so, remove it) but also doesn't break API backcompat (so, don't remove it).

Trying to split FlowConnectionConfigs into two contracts that are kept in sync with a little codegen. The generated code has small TODOs that would be removed by #3589 if we go this route.

Validated that renaming the type of a workflow arg works well with history replay; only the payload matters.
@alexstoick alexstoick force-pushed the feat/move-cdc-workflow-to-activity branch from 6d045b1 to ae6f821 Compare October 28, 2025 19:11
@alexstoick alexstoick temporarily deployed to external-contributor October 28, 2025 19:11 — with GitHub Actions Inactive
@alexstoick alexstoick temporarily deployed to external-contributor October 30, 2025 10:03 — with GitHub Actions Inactive
Tentative first go at removing `tableMappings` & storing in DB
@alexstoick alexstoick force-pushed the feat/move-cdc-workflow-to-activity branch from d4c3aeb to c91fd67 Compare December 1, 2025 16:31
@alexstoick alexstoick temporarily deployed to external-contributor December 1, 2025 16:31 — with GitHub Actions Inactive
@alexstoick alexstoick temporarily deployed to external-contributor December 8, 2025 17:14 — with GitHub Actions Inactive
@alexstoick alexstoick had a problem deploying to external-contributor December 8, 2025 17:14 — with GitHub Actions Failure
@codecov

codecov bot commented Dec 22, 2025

❌ 8 Tests Failed:

Tests completed  Failed  Passed  Skipped
1591             8       1583    152
Top failed tests, by shortest run time:
github.com/PeerDB-io/peerdb/flow/e2e::TestPeerFlowE2ETestSuiteMySQL_CH
Stack Traces | 0.01s run time
=== RUN   TestPeerFlowE2ETestSuiteMySQL_CH
=== PAUSE TestPeerFlowE2ETestSuiteMySQL_CH
=== CONT  TestPeerFlowE2ETestSuiteMySQL_CH
--- FAIL: TestPeerFlowE2ETestSuiteMySQL_CH (0.01s)
github.com/PeerDB-io/peerdb/flow/e2e::TestPeerFlowE2ETestSuiteMySQL_CH_Cluster
Stack Traces | 0.01s run time
=== RUN   TestPeerFlowE2ETestSuiteMySQL_CH_Cluster
=== PAUSE TestPeerFlowE2ETestSuiteMySQL_CH_Cluster
=== CONT  TestPeerFlowE2ETestSuiteMySQL_CH_Cluster
--- FAIL: TestPeerFlowE2ETestSuiteMySQL_CH_Cluster (0.01s)
github.com/PeerDB-io/peerdb/flow/e2e::TestPeerFlowE2ETestSuitePG_CH
Stack Traces | 0.01s run time
=== RUN   TestPeerFlowE2ETestSuitePG_CH
=== PAUSE TestPeerFlowE2ETestSuitePG_CH
=== CONT  TestPeerFlowE2ETestSuitePG_CH
--- FAIL: TestPeerFlowE2ETestSuitePG_CH (0.01s)
github.com/PeerDB-io/peerdb/flow/e2e::TestPeerFlowE2ETestSuitePG_CH_Cluster
Stack Traces | 0.01s run time
=== RUN   TestPeerFlowE2ETestSuitePG_CH_Cluster
=== PAUSE TestPeerFlowE2ETestSuitePG_CH_Cluster
=== CONT  TestPeerFlowE2ETestSuitePG_CH_Cluster
--- FAIL: TestPeerFlowE2ETestSuitePG_CH_Cluster (0.01s)
2025/12/22 23:41:49 INFO Received AWS credentials from peer for connector: ci x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN}
2025/12/22 23:41:49 INFO Received AWS credentials from peer for connector: clickhouse x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN}
github.com/PeerDB-io/peerdb/flow/e2e::TestPeerFlowE2ETestSuiteMySQL_CH/Test_InitialLoadOnly_No_Primary_Key
Stack Traces | 31.1s run time
=== RUN   TestPeerFlowE2ETestSuiteMySQL_CH/Test_InitialLoadOnly_No_Primary_Key
=== PAUSE TestPeerFlowE2ETestSuiteMySQL_CH/Test_InitialLoadOnly_No_Primary_Key
=== CONT  TestPeerFlowE2ETestSuiteMySQL_CH/Test_InitialLoadOnly_No_Primary_Key
2025/12/22 23:42:39 INFO fetched schema x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} table=e2e_test_mych_1iha3flt.test_coercion
2025/12/22 23:42:39 INFO Received AWS credentials from peer for connector: ci x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN}
    clickhouse_test.go:2036: UNEXPECTED STATUS TIMEOUT STATUS_SETUP
--- FAIL: TestPeerFlowE2ETestSuiteMySQL_CH/Test_InitialLoadOnly_No_Primary_Key (31.07s)
github.com/PeerDB-io/peerdb/flow/e2e::TestPeerFlowE2ETestSuitePG_CH/Test_InitialLoadOnly_No_Primary_Key
Stack Traces | 31.1s run time
=== RUN   TestPeerFlowE2ETestSuitePG_CH/Test_InitialLoadOnly_No_Primary_Key
=== PAUSE TestPeerFlowE2ETestSuitePG_CH/Test_InitialLoadOnly_No_Primary_Key
=== CONT  TestPeerFlowE2ETestSuitePG_CH/Test_InitialLoadOnly_No_Primary_Key
2025/12/22 23:43:10 INFO Executing and processing query x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT sync_batch_id FROM metadata_last_sync_state WHERE job_name='test_normalize_metadata_with_retry_pgch_bt1vxbth'"
2025/12/22 23:43:10 INFO Executing and processing query stream x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT sync_batch_id FROM metadata_last_sync_state WHERE job_name='test_normalize_metadata_with_retry_pgch_bt1vxbth'"
2025/12/22 23:43:10 INFO [pg_query_executor] declared cursor x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursorQuery="DECLARE peerdb_cursor_577951858920585429 CURSOR FOR SELECT sync_batch_id FROM metadata_last_sync_state WHERE job_name='test_normalize_metadata_with_retry_pgch_bt1vxbth'" args=[]
2025/12/22 23:43:10 INFO [pg_query_executor] fetching rows start x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT sync_batch_id FROM metadata_last_sync_state WHERE job_name='test_normalize_metadata_with_retry_pgch_bt1vxbth'" channelLen=0
2025/12/22 23:43:10 INFO [pg_query_executor] fetching from cursor x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursor=peerdb_cursor_577951858920585429
2025/12/22 23:43:10 INFO processed row stream x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursor=peerdb_cursor_577951858920585429 records=0 bytes=0 channelLen=0
2025/12/22 23:43:10 INFO [pg_query_executor] fetched rows x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT sync_batch_id FROM metadata_last_sync_state WHERE job_name='test_normalize_metadata_with_retry_pgch_bt1vxbth'" rows=0 bytes=0 channelLen=0
2025/12/22 23:43:10 INFO [pg_query_executor] committing transaction x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart
2025/12/22 23:43:10 INFO [pg_query_executor] committed transaction for query x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT sync_batch_id FROM metadata_last_sync_state WHERE job_name='test_normalize_metadata_with_retry_pgch_bt1vxbth'" rows=0 bytes=0 channelLen=0
2025/12/22 23:43:10 INFO Received AWS credentials from peer for connector: ci x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN}
2025/12/22 23:43:10 INFO Received AWS credentials from peer for connector: clickhouse x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN}
2025/12/22 23:43:10 INFO Executing and processing query x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id,ky,val,n,t FROM e2e_test_pgch_73fttmg4.\"test_nullable_mirror\" ORDER BY id"
2025/12/22 23:43:10 INFO Executing and processing query stream x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id,ky,val,n,t FROM e2e_test_pgch_73fttmg4.\"test_nullable_mirror\" ORDER BY id"
2025/12/22 23:43:10 INFO [pg_query_executor] declared cursor x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursorQuery="DECLARE peerdb_cursor_27037176320068089 CURSOR FOR SELECT id,ky,val,n,t FROM e2e_test_pgch_73fttmg4.\"test_nullable_mirror\" ORDER BY id" args=[]
2025/12/22 23:43:10 INFO [pg_query_executor] fetching rows start x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id,ky,val,n,t FROM e2e_test_pgch_73fttmg4.\"test_nullable_mirror\" ORDER BY id" channelLen=0
2025/12/22 23:43:10 INFO [pg_query_executor] fetching from cursor x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursor=peerdb_cursor_27037176320068089
2025/12/22 23:43:10 INFO processed row stream x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursor=peerdb_cursor_27037176320068089 records=2 bytes=15 channelLen=1
2025/12/22 23:43:10 INFO [pg_query_executor] fetched rows x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id,ky,val,n,t FROM e2e_test_pgch_73fttmg4.\"test_nullable_mirror\" ORDER BY id" rows=2 bytes=15 channelLen=0
2025/12/22 23:43:10 INFO [pg_query_executor] fetching from cursor x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursor=peerdb_cursor_27037176320068089
2025/12/22 23:43:10 INFO processed row stream x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursor=peerdb_cursor_27037176320068089 records=0 bytes=0 channelLen=0
2025/12/22 23:43:10 INFO [pg_query_executor] fetched rows x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id,ky,val,n,t FROM e2e_test_pgch_73fttmg4.\"test_nullable_mirror\" ORDER BY id" rows=0 bytes=0 channelLen=0
2025/12/22 23:43:10 INFO [pg_query_executor] committing transaction x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart
2025/12/22 23:43:10 INFO [pg_query_executor] committed transaction for query x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id,ky,val,n,t FROM e2e_test_pgch_73fttmg4.\"test_nullable_mirror\" ORDER BY id" rows=2 bytes=15 channelLen=0
    clickhouse_test.go:2036: UNEXPECTED STATUS TIMEOUT STATUS_SETUP
    clickhouse.go:114: begin tearing down postgres schema pgch_7qkizzpr
--- FAIL: TestPeerFlowE2ETestSuitePG_CH/Test_InitialLoadOnly_No_Primary_Key (31.11s)
github.com/PeerDB-io/peerdb/flow/e2e::TestPeerFlowE2ETestSuiteMySQL_CH_Cluster/Test_InitialLoadOnly_No_Primary_Key
Stack Traces | 31.1s run time
=== RUN   TestPeerFlowE2ETestSuiteMySQL_CH_Cluster/Test_InitialLoadOnly_No_Primary_Key
=== PAUSE TestPeerFlowE2ETestSuiteMySQL_CH_Cluster/Test_InitialLoadOnly_No_Primary_Key
=== CONT  TestPeerFlowE2ETestSuiteMySQL_CH_Cluster/Test_InitialLoadOnly_No_Primary_Key
2025/12/22 23:38:32 INFO fetched schema x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} table=e2e_test_mychcl_xjf7uj1i.test_skip_snapshot
    clickhouse_test.go:2036: UNEXPECTED STATUS TIMEOUT STATUS_SETUP
--- FAIL: TestPeerFlowE2ETestSuiteMySQL_CH_Cluster/Test_InitialLoadOnly_No_Primary_Key (31.15s)
github.com/PeerDB-io/peerdb/flow/e2e::TestPeerFlowE2ETestSuitePG_CH_Cluster/Test_InitialLoadOnly_No_Primary_Key
Stack Traces | 31.2s run time
=== RUN   TestPeerFlowE2ETestSuitePG_CH_Cluster/Test_InitialLoadOnly_No_Primary_Key
=== PAUSE TestPeerFlowE2ETestSuitePG_CH_Cluster/Test_InitialLoadOnly_No_Primary_Key
=== CONT  TestPeerFlowE2ETestSuitePG_CH_Cluster/Test_InitialLoadOnly_No_Primary_Key
2025/12/22 23:40:08 INFO Received AWS credentials from peer for connector: ci x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN}
    clickhouse_test.go:2036: UNEXPECTED STATUS TIMEOUT STATUS_SETUP
    clickhouse.go:114: begin tearing down postgres schema pgchcl_y5fugt28
--- FAIL: TestPeerFlowE2ETestSuitePG_CH_Cluster/Test_InitialLoadOnly_No_Primary_Key (31.17s)

