diff --git a/service/sharddistributor/store/etcd/executorstore/common/compression.go b/service/sharddistributor/store/etcd/executorstore/common/compression.go index 102ab5c1ac2..9b46eec7d7b 100644 --- a/service/sharddistributor/store/etcd/executorstore/common/compression.go +++ b/service/sharddistributor/store/etcd/executorstore/common/compression.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "io" + "strings" "github.com/golang/snappy" ) @@ -14,6 +15,53 @@ var ( _snappyHeader = []byte{0xff, 0x06, 0x00, 0x00, 's', 'N', 'a', 'P', 'p', 'Y'} ) +const ( + // CompressionSnappy indicates snappy compression should be applied + CompressionSnappy = "snappy" +) + +type compressionMode int + +const ( + compressionNone compressionMode = iota + compressionSnappy +) + +// RecordWriter handles serialization of data for etcd, applying compression when configured. +type RecordWriter struct { + mode compressionMode +} + +// NewRecordWriter constructs a RecordWriter based on the configured compression type. +func NewRecordWriter(compressionType string) (*RecordWriter, error) { + switch strings.ToLower(compressionType) { + case "", "none": + return &RecordWriter{mode: compressionNone}, nil + case CompressionSnappy: + return &RecordWriter{mode: compressionSnappy}, nil + default: + return nil, fmt.Errorf("unsupported compression type: %s", compressionType) + } +} + +// Write serializes data using the configured compression mode. +func (w *RecordWriter) Write(data []byte) ([]byte, error) { + if w.mode == compressionNone { + return data, nil + } + + var buf bytes.Buffer + writer := snappy.NewBufferedWriter(&buf) + + if _, err := writer.Write(data); err != nil { + return nil, err + } + if err := writer.Close(); err != nil { + return nil, err + } + return buf.Bytes(), nil +} + // Decompress decodes snappy-compressed data // If the snappy header is present, it will successfully decompress it or return an error // If the snappy header is absent, it treats data as uncompressed and returns it as-is diff --git a/service/sharddistributor/store/etcd/executorstore/common/compression_test.go b/service/sharddistributor/store/etcd/executorstore/common/compression_test.go index ad5f19ceb85..729891fc456 100644 --- a/service/sharddistributor/store/etcd/executorstore/common/compression_test.go +++ b/service/sharddistributor/store/etcd/executorstore/common/compression_test.go @@ -1,15 +1,55 @@ package common import ( - "bytes" "encoding/json" "testing" - "github.com/golang/snappy" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +func TestRecordWriter(t *testing.T) { + original := []byte("test-data") + + t.Run("no compression when empty", func(t *testing.T) { + writer, err := NewRecordWriter("") + require.NoError(t, err) + + out, err := writer.Write(original) + require.NoError(t, err) + assert.Equal(t, original, out) + }) + + t.Run("no compression when none", func(t *testing.T) { + writer, err := NewRecordWriter("none") + require.NoError(t, err) + + out, err := writer.Write(original) + require.NoError(t, err) + assert.Equal(t, original, out) + }) + + t.Run("snappy compression", func(t *testing.T) { + writer, err := NewRecordWriter(CompressionSnappy) + require.NoError(t, err) + + out, err := writer.Write(original) + require.NoError(t, err) + require.NotNil(t, out) + assert.NotEqual(t, original, out) + + decompressed, err := Decompress(out) + require.NoError(t, err) + assert.Equal(t, original, decompressed) + }) + + t.Run("unsupported compression", func(t *testing.T) { + writer, err := NewRecordWriter("unsupported") + require.Error(t, err) + assert.Nil(t, writer) + }) +} + func TestDecompress(t *testing.T) { t.Run("Empty data", func(t *testing.T) { decompressed, err := Decompress([]byte{}) @@ -38,7 +78,10 @@ func TestDecompress(t *testing.T) { t.Run("Compressed data", func(t *testing.T) { original := []byte(`{"status":"DRAINING"}`) - compressed, err := compressData(original) + writer, err := NewRecordWriter(CompressionSnappy) + require.NoError(t, err) + + compressed, err := writer.Write(original) require.NoError(t, err) result, err := Decompress(compressed) @@ -74,7 +117,10 @@ func TestDecompressAndUnmarshal(t *testing.T) { Shards: []string{"shard3", "shard4"}, } originalJSON, _ := json.Marshal(original) - compressed, err := compressData(originalJSON) + writer, err := NewRecordWriter(CompressionSnappy) + require.NoError(t, err) + + compressed, err := writer.Write(originalJSON) require.NoError(t, err) var result testData @@ -93,16 +139,3 @@ func TestDecompressAndUnmarshal(t *testing.T) { assert.Contains(t, err.Error(), "unmarshal") }) } - -func compressData(data []byte) ([]byte, error) { - var buf bytes.Buffer - w := snappy.NewBufferedWriter(&buf) - - if _, err := w.Write(data); err != nil { - return nil, err - } - if err := w.Close(); err != nil { - return nil, err - } - return buf.Bytes(), nil -} diff --git a/service/sharddistributor/store/etcd/executorstore/etcdstore.go b/service/sharddistributor/store/etcd/executorstore/etcdstore.go index a6fb4473f9f..4c4a2f7462f 100644 --- a/service/sharddistributor/store/etcd/executorstore/etcdstore.go +++ b/service/sharddistributor/store/etcd/executorstore/etcdstore.go @@ -32,11 +32,12 @@ var ( const deleteShardStatsBatchSize = 64 type executorStoreImpl struct { - client *clientv3.Client - prefix string - logger log.Logger - shardCache *shardcache.ShardToExecutorCache - timeSource clock.TimeSource + client *clientv3.Client + prefix string + logger log.Logger + shardCache *shardcache.ShardToExecutorCache + timeSource clock.TimeSource + recordWriter *common.RecordWriter } // shardStatisticsUpdate holds the staged statistics for a shard so we can write them @@ -66,6 +67,7 @@ func NewStore(p ExecutorStoreParams) (store.Store, error) { Endpoints []string `yaml:"endpoints"` DialTimeout time.Duration `yaml:"dialTimeout"` Prefix string `yaml:"prefix"` + Compression string `yaml:"compression"` } if err := p.Cfg.Store.StorageParams.Decode(&etcdCfg); err != nil { @@ -90,12 +92,18 @@ func NewStore(p ExecutorStoreParams) (store.Store, error) { timeSource = clock.NewRealTimeSource() } + recordWriter, err := common.NewRecordWriter(etcdCfg.Compression) + if err != nil { + return nil, fmt.Errorf("create record writer: %w", err) + } + store := &executorStoreImpl{ - client: etcdClient, - prefix: etcdCfg.Prefix, - logger: p.Logger, - shardCache: shardCache, - timeSource: timeSource, + client: etcdClient, + prefix: etcdCfg.Prefix, + logger: p.Logger, + shardCache: shardCache, + timeSource: timeSource, + recordWriter: recordWriter, } p.Lifecycle.Append(fx.StartStopHook(store.Start, store.Stop)) @@ -121,19 +129,30 @@ func (s *executorStoreImpl) RecordHeartbeat(ctx context.Context, namespace, exec reportedShardsData, err := json.Marshal(request.ReportedShards) if err != nil { - return fmt.Errorf("marshal assinged shards: %w", err) + return fmt.Errorf("marshal reported shards: %w", err) } jsonState, err := json.Marshal(request.Status) if err != nil { - return fmt.Errorf("marshal assinged shards: %w", err) + return fmt.Errorf("marshal assinged state: %w", err) + } + + // Compress data before writing to etcd + compressedReportedShards, err := s.recordWriter.Write(reportedShardsData) + if err != nil { + return fmt.Errorf("compress reported shards: %w", err) + } + + compressedState, err := s.recordWriter.Write(jsonState) + if err != nil { + return fmt.Errorf("compress assigned state: %w", err) } // Build all operations including metadata ops := []clientv3.Op{ clientv3.OpPut(heartbeatKey, etcdtypes.FormatTime(request.LastHeartbeat)), - clientv3.OpPut(stateKey, string(jsonState)), - clientv3.OpPut(reportedShardsKey, string(reportedShardsData)), + clientv3.OpPut(stateKey, string(compressedState)), + clientv3.OpPut(reportedShardsKey, string(compressedReportedShards)), } for key, value := range request.Metadata { metadataKey := etcdkeys.BuildMetadataKey(s.prefix, namespace, executorID, key) @@ -359,7 +378,13 @@ func (s *executorStoreImpl) AssignShards(ctx context.Context, namespace string, if err != nil { return fmt.Errorf("marshal assigned shards for executor %s: %w", executorID, err) } - ops = append(ops, clientv3.OpPut(executorStateKey, string(value))) + + compressedValue, err := s.recordWriter.Write(value) + if err != nil { + return fmt.Errorf("compress assigned shards for executor %s: %w", executorID, err) + } + ops = append(ops, clientv3.OpPut(executorStateKey, string(compressedValue))) + comparisons = append(comparisons, clientv3.Compare(clientv3.ModRevision(executorStateKey), "=", state.ModRevision)) } @@ -492,15 +517,24 @@ func (s *executorStoreImpl) AssignShard(ctx context.Context, namespace, shardID, state.AssignedShards[shardID] = &types.ShardAssignment{Status: types.AssignmentStatusREADY} } + // Compress new state value newStateValue, err := json.Marshal(state) if err != nil { return fmt.Errorf("marshal new assigned state: %w", err) } + compressedStateValue, err := s.recordWriter.Write(newStateValue) + if err != nil { + return fmt.Errorf("compress new assigned state: %w", err) + } newStatsValue, err := json.Marshal(shardStats) if err != nil { return fmt.Errorf("marshal new shard statistics: %w", err) } + compressedStatsValue, err := s.recordWriter.Write(newStatsValue) + if err != nil { + return fmt.Errorf("compress new shard statistics: %w", err) + } var comparisons []clientv3.Cmp @@ -529,8 +563,8 @@ func (s *executorStoreImpl) AssignShard(ctx context.Context, namespace, shardID, txnResp, err := s.client.Txn(ctx). If(comparisons...). Then( - clientv3.OpPut(assignedState, string(newStateValue)), - clientv3.OpPut(shardStatsKey, string(newStatsValue)), + clientv3.OpPut(assignedState, string(compressedStateValue)), + clientv3.OpPut(shardStatsKey, string(compressedStatsValue)), ). Commit() diff --git a/service/sharddistributor/store/etcd/executorstore/etcdstore_test.go b/service/sharddistributor/store/etcd/executorstore/etcdstore_test.go index 5e5f1688d7b..a298974ea1d 100644 --- a/service/sharddistributor/store/etcd/executorstore/etcdstore_test.go +++ b/service/sharddistributor/store/etcd/executorstore/etcdstore_test.go @@ -3,13 +3,16 @@ package executorstore import ( "context" "encoding/json" + "strconv" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/fx/fxtest" + "gopkg.in/yaml.v2" + "github.com/uber/cadence/common/config" "github.com/uber/cadence/common/log/testlogger" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/sharddistributor/store" @@ -88,6 +91,63 @@ func TestRecordHeartbeat(t *testing.T) { assert.Equal(t, "value-2", string(resp.Kvs[0].Value)) } +func TestRecordHeartbeat_NoCompression(t *testing.T) { + tc := testhelper.SetupStoreTestCluster(t) + + var etcdCfg struct { + Endpoints []string `yaml:"endpoints"` + DialTimeout time.Duration `yaml:"dialTimeout"` + Prefix string `yaml:"prefix"` + Compression string `yaml:"compression"` + } + require.NoError(t, tc.LeaderCfg.Store.StorageParams.Decode(&etcdCfg)) + etcdCfg.Compression = "none" + + encodedCfg, err := yaml.Marshal(etcdCfg) + require.NoError(t, err) + + var yamlNode *config.YamlNode + require.NoError(t, yaml.Unmarshal(encodedCfg, &yamlNode)) + tc.LeaderCfg.Store.StorageParams = yamlNode + tc.LeaderCfg.LeaderStore.StorageParams = yamlNode + tc.Compression = "none" + + executorStore := createStore(t, tc) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + executorID := "executor-no-compression" + req := store.HeartbeatState{ + LastHeartbeat: time.Now().UTC(), + Status: types.ExecutorStatusACTIVE, + ReportedShards: map[string]*types.ShardStatusReport{ + "shard-no-compression": {Status: types.ShardStatusREADY}, + }, + } + + require.NoError(t, executorStore.RecordHeartbeat(ctx, tc.Namespace, executorID, req)) + + stateKey := etcdkeys.BuildExecutorKey(tc.EtcdPrefix, tc.Namespace, executorID, etcdkeys.ExecutorStatusKey) + require.NoError(t, err) + reportedShardsKey := etcdkeys.BuildExecutorKey(tc.EtcdPrefix, tc.Namespace, executorID, etcdkeys.ExecutorReportedShardsKey) + require.NoError(t, err) + + stateResp, err := tc.Client.Get(ctx, stateKey) + require.NoError(t, err) + require.Equal(t, int64(1), stateResp.Count) + statusJSON, err := json.Marshal(req.Status) + require.NoError(t, err) + assert.Equal(t, string(statusJSON), string(stateResp.Kvs[0].Value)) + + reportedResp, err := tc.Client.Get(ctx, reportedShardsKey) + require.NoError(t, err) + require.Equal(t, int64(1), reportedResp.Count) + reportedJSON, err := json.Marshal(req.ReportedShards) + require.NoError(t, err) + assert.Equal(t, string(reportedJSON), string(reportedResp.Kvs[0].Value)) +} + func TestGetHeartbeat(t *testing.T) { tc := testhelper.SetupStoreTestCluster(t) executorStore := createStore(t, tc) @@ -369,7 +429,12 @@ func TestSubscribe(t *testing.T) { // Now update the reported shards, which IS a significant change reportedShardsKey := etcdkeys.BuildExecutorKey(tc.EtcdPrefix, tc.Namespace, executorID, "reported_shards") - _, err = tc.Client.Put(ctx, reportedShardsKey, `{"shard-1":{"status":"running"}}`) + require.NoError(t, err) + writer, err := common.NewRecordWriter(tc.Compression) + require.NoError(t, err) + compressedShards, err := writer.Write([]byte(`{"shard-1":{"status":"running"}}`)) + require.NoError(t, err) + _, err = tc.Client.Put(ctx, reportedShardsKey, string(compressedShards)) require.NoError(t, err) select { @@ -589,12 +654,11 @@ func TestDeleteShardStatsDeletesLargeBatches(t *testing.T) { shardID := "stale-stats-" + strconv.Itoa(i) shardIDs = append(shardIDs, shardID) - statsKey, err := etcdkeys.BuildShardKey(tc.EtcdPrefix, tc.Namespace, shardID, etcdkeys.ShardStatisticsKey) - require.NoError(t, err) + statsKey := etcdkeys.BuildShardKey(tc.EtcdPrefix, tc.Namespace, shardID, etcdkeys.ShardStatisticsKey) stats := store.ShardStatistics{ SmoothedLoad: float64(i), - LastUpdateTime: int64(i), - LastMoveTime: int64(i), + LastUpdateTime: time.Unix(int64(i), 0).UTC(), + LastMoveTime: time.Unix(int64(i), 0).UTC(), } payload, err := json.Marshal(stats) require.NoError(t, err) diff --git a/service/sharddistributor/store/etcd/testhelper/testhelper.go b/service/sharddistributor/store/etcd/testhelper/testhelper.go index 2334ea453e1..25ba8144c9a 100644 --- a/service/sharddistributor/store/etcd/testhelper/testhelper.go +++ b/service/sharddistributor/store/etcd/testhelper/testhelper.go @@ -20,10 +20,11 @@ import ( ) type StoreTestCluster struct { - EtcdPrefix string - Namespace string - LeaderCfg shardDistributorCfg.ShardDistribution - Client *clientv3.Client + EtcdPrefix string + Namespace string + LeaderCfg shardDistributorCfg.ShardDistribution + Client *clientv3.Client + Compression string } func SetupStoreTestCluster(t *testing.T) *StoreTestCluster { @@ -45,6 +46,7 @@ func SetupStoreTestCluster(t *testing.T) *StoreTestCluster { "dialTimeout": "5s", "prefix": etcdPrefix, "electionTTL": "5s", // Needed for leader config part + "compression": "snappy", } yamlCfg, err := yaml.Marshal(etcdConfigRaw)