Skip to content

Conversation

@gtarpenning
Copy link
Member

@gtarpenning gtarpenning commented Dec 18, 2025

Description

WB-29945

Enables proper distributed table support for replicated ClickHouse deployments by automatically creating distributed tables on top of replicated local tables during migrations. This allows queries to be routed correctly across the cluster rather than hitting individual replicas, no changes needed on the query side.

Follows implementation explained in the docs here: https://clickhouse.com/docs/engines/table-engines/special/distributed

This PR:

  • Adds environment variable helpers for ClickHouse replication settings (WF_CLICKHOUSE_REPLICATED, WF_CLICKHOUSE_REPLICATED_PATH, WF_CLICKHOUSE_REPLICATED_CLUSTER, WF_CLICKHOUSE_USE_DISTRIBUTED_TABLES)
  • Passes replication configuration from environment to migrator (previously wasn't wired up, we use ./migrate_local but still we should do this)
  • Implements special handling for ALTER/CREATE/DROP operations (indexes only on local, regular columns on both)
  • Adds materialized view transformation using DROP/CREATE pattern with _local suffix routing
  • Skips unsupported operations (MATERIALIZE commands) in distributed mode
  • Refactors the migrator,
    - push methods that just generate sql out of class. Now, all class methods actually execute sql.
    - all SQL regex is in its own helper class
    - split Base, Replicated, Distributed handling into their own classes with hierarchy
  • add distributed properties to the clickhouse_batched class
  • Add comprehensive tests for distributed table creation and replicated mode behavior
  • Fixed potential bug where passing an explicit ReplicatedMergeTree engine type still gets "Replicated" prepended.

Testing

updates existing tests and adds new ones

Local testing:

> WF_CLICKHOUSE_REPLICATED_CLUSTER=cluster_2S_2R WF_CLICKHOUSE_REPLICATED=true WF_CLICKHOUSE_USE_DISTRIBUTED_TABLES=true make migrate_local
./migrate_local.sh
INFO:weave.trace_server.clickhouse_trace_server_migrator:ClickHouseTraceServerMigrator initialized with: replicated=True, use_distributed=True, replicated_cluster=cluster_2S_2R, replicated_path=/clickhouse/tables/{db}, management_db=db_management
INFO:weave.trace_server.clickhouse_trace_server_migrator:`default` migration status: {'db_name': 'default', 'curr_version': 0, 'partially_applied_version': None}
INFO:weave.trace_server.clickhouse_trace_server_migrator:Migrations to apply: [(1, '001_init.up.sql'), (2, '002_add_deleted_at.up.sql'), (3, '003_feedback.up.sql'), (4, '004_add_display_name.up.sql'), (5, '005_add_cost.up.sql'), (6, '006_seed_costs.up.sql'), (7, '007_add_refs_to_feedback.up.sql'), (8, '008_stats_views.up.sql'), (9, '009_increase_call_timestamp_resolution.up.sql'), (10, '010_add_user_to_objects.up.sql'), (11, '011_add_file_bucket_storage.up.sql'), (12, '012_add_sortable_timestamp.up.sql'), (13, '013_add_wb_run_step.up.sql'), (14, '014_add_leaf_object_class.up.sql'), (15, '015_add_thread_id.up.sql'), (16, '016_add_thread_id_to_stats.up.sql'), (17, '017_add_turn_id.up.sql'), (18, '018_add_wb_run_id_idx.up.sql'), (19, '019_add_wb_run_step_end.up.sql'), (20, '020_add_otel_column.up.sql'), (21, '021_fix_otel_stats.up.sql'), (22, '022_calls_complete.up.sql')]
INFO:weave.trace_server.clickhouse_trace_server_migrator:Applying migration 001_init.up.sql to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Migration 001_init.up.sql applied to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Applying migration 002_add_deleted_at.up.sql to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Migration 002_add_deleted_at.up.sql applied to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Applying migration 003_feedback.up.sql to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Migration 003_feedback.up.sql applied to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Applying migration 004_add_display_name.up.sql to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Migration 004_add_display_name.up.sql applied to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Applying migration 005_add_cost.up.sql to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Migration 005_add_cost.up.sql applied to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Applying migration 006_seed_costs.up.sql to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Migration 006_seed_costs.up.sql applied to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Applying migration 007_add_refs_to_feedback.up.sql to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Migration 007_add_refs_to_feedback.up.sql applied to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Applying migration 008_stats_views.up.sql to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Migration 008_stats_views.up.sql applied to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Applying migration 009_increase_call_timestamp_resolution.up.sql to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Migration 009_increase_call_timestamp_resolution.up.sql applied to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Applying migration 010_add_user_to_objects.up.sql to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Migration 010_add_user_to_objects.up.sql applied to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Applying migration 011_add_file_bucket_storage.up.sql to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Migration 011_add_file_bucket_storage.up.sql applied to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Applying migration 012_add_sortable_timestamp.up.sql to `default`
WARNING:weave.trace_server.clickhouse_trace_server_migrator:Skipping MATERIALIZE command (not supported in distributed mode): -- Materialize the column
ALTER TABLE calls_merged MATERIALIZE COLUMN sortable_datetime
WARNING:weave.trace_server.clickhouse_trace_server_migrator:Skipping MATERIALIZE command (not supported in distributed mode): -- Materialize the index, actually generating index marks for all the granules
ALTER TABLE calls_merged MATERIALIZE INDEX idx_sortable_datetime
INFO:weave.trace_server.clickhouse_trace_server_migrator:Migration 012_add_sortable_timestamp.up.sql applied to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Applying migration 013_add_wb_run_step.up.sql to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Migration 013_add_wb_run_step.up.sql applied to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Applying migration 014_add_leaf_object_class.up.sql to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Migration 014_add_leaf_object_class.up.sql applied to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Applying migration 015_add_thread_id.up.sql to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Migration 015_add_thread_id.up.sql applied to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Applying migration 016_add_thread_id_to_stats.up.sql to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Migration 016_add_thread_id_to_stats.up.sql applied to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Applying migration 017_add_turn_id.up.sql to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Migration 017_add_turn_id.up.sql applied to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Applying migration 018_add_wb_run_id_idx.up.sql to `default`
WARNING:weave.trace_server.clickhouse_trace_server_migrator:Skipping MATERIALIZE command (not supported in distributed mode): -- Materialize the index, actually generating index marks for all the granules
ALTER TABLE calls_merged MATERIALIZE INDEX idx_wb_run_id
INFO:weave.trace_server.clickhouse_trace_server_migrator:Migration 018_add_wb_run_id_idx.up.sql applied to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Applying migration 019_add_wb_run_step_end.up.sql to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Migration 019_add_wb_run_step_end.up.sql applied to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Applying migration 020_add_otel_column.up.sql to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Migration 020_add_otel_column.up.sql applied to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Applying migration 021_fix_otel_stats.up.sql to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Migration 021_fix_otel_stats.up.sql applied to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Applying migration 022_calls_complete.up.sql to `default`
INFO:weave.trace_server.clickhouse_trace_server_migrator:Migration 022_calls_complete.up.sql applied to `default`
INFO:weave.trace_server.costs.insert_costs:Loaded 2146 costs from json
INFO:weave.trace_server.costs.insert_costs:There are 2146 costs to insert, after filtering out existing costs
INFO:weave.trace_server.costs.insert_costs:Inserted 2146 costs

@gtarpenning gtarpenning marked this pull request as ready for review December 18, 2025 19:04
@gtarpenning gtarpenning requested a review from a team as a code owner December 18, 2025 19:04
@codecov
Copy link

codecov bot commented Dec 18, 2025

Codecov Report

❌ Patch coverage is 88.03987% with 36 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
...e/trace_server/clickhouse_trace_server_migrator.py 88.88% 18 Missing and 13 partials ⚠️
weave/trace_server/environment.py 50.00% 4 Missing ⚠️
...ve/trace_server/clickhouse_trace_server_batched.py 90.90% 1 Missing ⚠️

📢 Thoughts on this report? Let us know!

Comment on lines +98 to +118
def wf_clickhouse_replicated() -> bool:
"""Whether to use replicated clickhouse tables."""
return os.environ.get("WF_CLICKHOUSE_REPLICATED", "false").lower() == "true"


def wf_clickhouse_replicated_path() -> str | None:
"""The path of the replicated clickhouse tables."""
return os.environ.get("WF_CLICKHOUSE_REPLICATED_PATH")


def wf_clickhouse_replicated_cluster() -> str | None:
"""The cluster of the replicated clickhouse tables."""
return os.environ.get("WF_CLICKHOUSE_REPLICATED_CLUSTER")


def wf_clickhouse_use_distributed_tables() -> bool:
"""Whether to use distributed tables on top of replicated tables."""
return (
os.environ.get("WF_CLICKHOUSE_USE_DISTRIBUTED_TABLES", "false").lower()
== "true"
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can drop the wf_/WF_ prefixes at this point?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, we probably can... but... wouldn't you rather have everything look uniform instead at the cost of a few characters?

@wandbot-3000
Copy link

wandbot-3000 bot commented Dec 18, 2025

# Validate configuration
if self.use_distributed and not self.replicated:
raise MigrationError(
"Distributed tables can only be used with replicated tables. "
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a Clickhouse constraint or something we want to enforce ourselves?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a sanity check for ourselves, it doesn't make sense to have "distributed" tables over local ones if they aren't replicated. I guess if you had a sharding system without replication, but that would be an antipattern.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replication is easier than sharding and provides similar benefits

@wandb wandb deleted a comment from github-actions bot Dec 18, 2025
…lete branch

Pull in distributed mode and migrator enhancements from griffin/calls-complete-step-6-with-migrate:

Migrator improvements:
- Move LOCAL_TABLE_SUFFIX constant to settings for reusability
- Add support for ALTER TABLE DELETE/UPDATE mutations in distributed mode
- Add DROP TABLE/VIEW support for distributed mode with proper cleanup
- Fix replica naming to use {shard}-{replica} for unique identification
- Add _is_local_only_operation() helper for better distributed handling

Server enhancements:
- Add use_distributed_mode property to check environment configuration
- Add clickhouse_cluster_name property for cluster operations
- Add _get_calls_complete_table_name() for proper table targeting

Settings updates:
- Add LOCAL_TABLE_SUFFIX constant for distributed table naming
- Add distributed_product_mode to query settings for cross-table operations
- Add batch update constants for future use

These changes lay the groundwork for distributed ClickHouse deployments
without introducing the calls_complete migration.
@github-actions
Copy link
Contributor

❌ Documentation Reference Check Failed

No documentation reference found in the PR description. Please add either:

This check is required for all PRs except those that start with "chore(weave)" or explicitly state "docs are not required". Please update your PR description and this check will run again automatically.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants