Skip to content

Commit d384df4

Browse files
feat: [shard-distributor]Compress data before writing to ETCD (#7412)
<!-- Describe what has changed in this PR --> **What changed?** - Update Shard-distributor Store config to accept Compression field, to set the compression type. Defaults to none - Add Compress() method to encode data with snappy compression - Compress executor state, reported shards and assigned state before writing to etcd - Extend unit-tests with compression logic check <!-- Tell your future self why have you made these changes --> **Why?** To reducing storage footprint <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** unit-tests and local testing <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** N/A <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** <!-- Is there any documentation updates should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open an PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes** --------- Signed-off-by: Gaziza Yestemirova <[email protected]>
1 parent 125b038 commit d384df4

File tree

5 files changed

+224
-43
lines changed

5 files changed

+224
-43
lines changed

service/sharddistributor/store/etcd/executorstore/common/compression.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"fmt"
77
"io"
8+
"strings"
89

910
"github.com/golang/snappy"
1011
)
@@ -14,6 +15,53 @@ var (
1415
_snappyHeader = []byte{0xff, 0x06, 0x00, 0x00, 's', 'N', 'a', 'P', 'p', 'Y'}
1516
)
1617

18+
const (
19+
// CompressionSnappy indicates snappy compression should be applied
20+
CompressionSnappy = "snappy"
21+
)
22+
23+
type compressionMode int
24+
25+
const (
26+
compressionNone compressionMode = iota
27+
compressionSnappy
28+
)
29+
30+
// RecordWriter handles serialization of data for etcd, applying compression when configured.
31+
type RecordWriter struct {
32+
mode compressionMode
33+
}
34+
35+
// NewRecordWriter constructs a RecordWriter based on the configured compression type.
36+
func NewRecordWriter(compressionType string) (*RecordWriter, error) {
37+
switch strings.ToLower(compressionType) {
38+
case "", "none":
39+
return &RecordWriter{mode: compressionNone}, nil
40+
case CompressionSnappy:
41+
return &RecordWriter{mode: compressionSnappy}, nil
42+
default:
43+
return nil, fmt.Errorf("unsupported compression type: %s", compressionType)
44+
}
45+
}
46+
47+
// Write serializes data using the configured compression mode.
48+
func (w *RecordWriter) Write(data []byte) ([]byte, error) {
49+
if w.mode == compressionNone {
50+
return data, nil
51+
}
52+
53+
var buf bytes.Buffer
54+
writer := snappy.NewBufferedWriter(&buf)
55+
56+
if _, err := writer.Write(data); err != nil {
57+
return nil, err
58+
}
59+
if err := writer.Close(); err != nil {
60+
return nil, err
61+
}
62+
return buf.Bytes(), nil
63+
}
64+
1765
// Decompress decodes snappy-compressed data
1866
// If the snappy header is present, it will successfully decompress it or return an error
1967
// If the snappy header is absent, it treats data as uncompressed and returns it as-is

service/sharddistributor/store/etcd/executorstore/common/compression_test.go

Lines changed: 50 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,55 @@
11
package common
22

33
import (
4-
"bytes"
54
"encoding/json"
65
"testing"
76

8-
"github.com/golang/snappy"
97
"github.com/stretchr/testify/assert"
108
"github.com/stretchr/testify/require"
119
)
1210

11+
func TestRecordWriter(t *testing.T) {
12+
original := []byte("test-data")
13+
14+
t.Run("no compression when empty", func(t *testing.T) {
15+
writer, err := NewRecordWriter("")
16+
require.NoError(t, err)
17+
18+
out, err := writer.Write(original)
19+
require.NoError(t, err)
20+
assert.Equal(t, original, out)
21+
})
22+
23+
t.Run("no compression when none", func(t *testing.T) {
24+
writer, err := NewRecordWriter("none")
25+
require.NoError(t, err)
26+
27+
out, err := writer.Write(original)
28+
require.NoError(t, err)
29+
assert.Equal(t, original, out)
30+
})
31+
32+
t.Run("snappy compression", func(t *testing.T) {
33+
writer, err := NewRecordWriter(CompressionSnappy)
34+
require.NoError(t, err)
35+
36+
out, err := writer.Write(original)
37+
require.NoError(t, err)
38+
require.NotNil(t, out)
39+
assert.NotEqual(t, original, out)
40+
41+
decompressed, err := Decompress(out)
42+
require.NoError(t, err)
43+
assert.Equal(t, original, decompressed)
44+
})
45+
46+
t.Run("unsupported compression", func(t *testing.T) {
47+
writer, err := NewRecordWriter("unsupported")
48+
require.Error(t, err)
49+
assert.Nil(t, writer)
50+
})
51+
}
52+
1353
func TestDecompress(t *testing.T) {
1454
t.Run("Empty data", func(t *testing.T) {
1555
decompressed, err := Decompress([]byte{})
@@ -38,7 +78,10 @@ func TestDecompress(t *testing.T) {
3878

3979
t.Run("Compressed data", func(t *testing.T) {
4080
original := []byte(`{"status":"DRAINING"}`)
41-
compressed, err := compressData(original)
81+
writer, err := NewRecordWriter(CompressionSnappy)
82+
require.NoError(t, err)
83+
84+
compressed, err := writer.Write(original)
4285
require.NoError(t, err)
4386

4487
result, err := Decompress(compressed)
@@ -74,7 +117,10 @@ func TestDecompressAndUnmarshal(t *testing.T) {
74117
Shards: []string{"shard3", "shard4"},
75118
}
76119
originalJSON, _ := json.Marshal(original)
77-
compressed, err := compressData(originalJSON)
120+
writer, err := NewRecordWriter(CompressionSnappy)
121+
require.NoError(t, err)
122+
123+
compressed, err := writer.Write(originalJSON)
78124
require.NoError(t, err)
79125

80126
var result testData
@@ -93,16 +139,3 @@ func TestDecompressAndUnmarshal(t *testing.T) {
93139
assert.Contains(t, err.Error(), "unmarshal")
94140
})
95141
}
96-
97-
func compressData(data []byte) ([]byte, error) {
98-
var buf bytes.Buffer
99-
w := snappy.NewBufferedWriter(&buf)
100-
101-
if _, err := w.Write(data); err != nil {
102-
return nil, err
103-
}
104-
if err := w.Close(); err != nil {
105-
return nil, err
106-
}
107-
return buf.Bytes(), nil
108-
}

service/sharddistributor/store/etcd/executorstore/etcdstore.go

Lines changed: 51 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,12 @@ var (
3232
const deleteShardStatsBatchSize = 64
3333

3434
// executorStoreImpl is the etcd-backed store implementation. All values are
// passed through recordWriter before being written, so payloads are
// compressed when compression is configured.
type executorStoreImpl struct {
	client       *clientv3.Client                 // etcd client used for all reads/writes and transactions
	prefix       string                           // key-space prefix for every key this store touches
	logger       log.Logger                       // structured logger for store operations
	shardCache   *shardcache.ShardToExecutorCache // cache mapping shards to their owning executors
	timeSource   clock.TimeSource                 // injectable clock (real in production, fake in tests)
	recordWriter *common.RecordWriter             // serializes/compresses values before writes to etcd
}
4142

4243
// shardStatisticsUpdate holds the staged statistics for a shard so we can write them
@@ -66,6 +67,7 @@ func NewStore(p ExecutorStoreParams) (store.Store, error) {
6667
Endpoints []string `yaml:"endpoints"`
6768
DialTimeout time.Duration `yaml:"dialTimeout"`
6869
Prefix string `yaml:"prefix"`
70+
Compression string `yaml:"compression"`
6971
}
7072

7173
if err := p.Cfg.Store.StorageParams.Decode(&etcdCfg); err != nil {
@@ -90,12 +92,18 @@ func NewStore(p ExecutorStoreParams) (store.Store, error) {
9092
timeSource = clock.NewRealTimeSource()
9193
}
9294

95+
recordWriter, err := common.NewRecordWriter(etcdCfg.Compression)
96+
if err != nil {
97+
return nil, fmt.Errorf("create record writer: %w", err)
98+
}
99+
93100
store := &executorStoreImpl{
94-
client: etcdClient,
95-
prefix: etcdCfg.Prefix,
96-
logger: p.Logger,
97-
shardCache: shardCache,
98-
timeSource: timeSource,
101+
client: etcdClient,
102+
prefix: etcdCfg.Prefix,
103+
logger: p.Logger,
104+
shardCache: shardCache,
105+
timeSource: timeSource,
106+
recordWriter: recordWriter,
99107
}
100108

101109
p.Lifecycle.Append(fx.StartStopHook(store.Start, store.Stop))
@@ -121,19 +129,30 @@ func (s *executorStoreImpl) RecordHeartbeat(ctx context.Context, namespace, exec
121129

122130
reportedShardsData, err := json.Marshal(request.ReportedShards)
123131
if err != nil {
124-
return fmt.Errorf("marshal assinged shards: %w", err)
132+
return fmt.Errorf("marshal reported shards: %w", err)
125133
}
126134

127135
jsonState, err := json.Marshal(request.Status)
128136
if err != nil {
129-
return fmt.Errorf("marshal assinged shards: %w", err)
137+
return fmt.Errorf("marshal assinged state: %w", err)
138+
}
139+
140+
// Compress data before writing to etcd
141+
compressedReportedShards, err := s.recordWriter.Write(reportedShardsData)
142+
if err != nil {
143+
return fmt.Errorf("compress reported shards: %w", err)
144+
}
145+
146+
compressedState, err := s.recordWriter.Write(jsonState)
147+
if err != nil {
148+
return fmt.Errorf("compress assigned state: %w", err)
130149
}
131150

132151
// Build all operations including metadata
133152
ops := []clientv3.Op{
134153
clientv3.OpPut(heartbeatKey, etcdtypes.FormatTime(request.LastHeartbeat)),
135-
clientv3.OpPut(stateKey, string(jsonState)),
136-
clientv3.OpPut(reportedShardsKey, string(reportedShardsData)),
154+
clientv3.OpPut(stateKey, string(compressedState)),
155+
clientv3.OpPut(reportedShardsKey, string(compressedReportedShards)),
137156
}
138157
for key, value := range request.Metadata {
139158
metadataKey := etcdkeys.BuildMetadataKey(s.prefix, namespace, executorID, key)
@@ -359,7 +378,13 @@ func (s *executorStoreImpl) AssignShards(ctx context.Context, namespace string,
359378
if err != nil {
360379
return fmt.Errorf("marshal assigned shards for executor %s: %w", executorID, err)
361380
}
362-
ops = append(ops, clientv3.OpPut(executorStateKey, string(value)))
381+
382+
compressedValue, err := s.recordWriter.Write(value)
383+
if err != nil {
384+
return fmt.Errorf("compress assigned shards for executor %s: %w", executorID, err)
385+
}
386+
ops = append(ops, clientv3.OpPut(executorStateKey, string(compressedValue)))
387+
363388
comparisons = append(comparisons, clientv3.Compare(clientv3.ModRevision(executorStateKey), "=", state.ModRevision))
364389
}
365390

@@ -492,15 +517,24 @@ func (s *executorStoreImpl) AssignShard(ctx context.Context, namespace, shardID,
492517
state.AssignedShards[shardID] = &types.ShardAssignment{Status: types.AssignmentStatusREADY}
493518
}
494519

520+
// Compress new state value
495521
newStateValue, err := json.Marshal(state)
496522
if err != nil {
497523
return fmt.Errorf("marshal new assigned state: %w", err)
498524
}
525+
compressedStateValue, err := s.recordWriter.Write(newStateValue)
526+
if err != nil {
527+
return fmt.Errorf("compress new assigned state: %w", err)
528+
}
499529

500530
newStatsValue, err := json.Marshal(shardStats)
501531
if err != nil {
502532
return fmt.Errorf("marshal new shard statistics: %w", err)
503533
}
534+
compressedStatsValue, err := s.recordWriter.Write(newStatsValue)
535+
if err != nil {
536+
return fmt.Errorf("compress new shard statistics: %w", err)
537+
}
504538

505539
var comparisons []clientv3.Cmp
506540

@@ -529,8 +563,8 @@ func (s *executorStoreImpl) AssignShard(ctx context.Context, namespace, shardID,
529563
txnResp, err := s.client.Txn(ctx).
530564
If(comparisons...).
531565
Then(
532-
clientv3.OpPut(assignedState, string(newStateValue)),
533-
clientv3.OpPut(shardStatsKey, string(newStatsValue)),
566+
clientv3.OpPut(assignedState, string(compressedStateValue)),
567+
clientv3.OpPut(shardStatsKey, string(compressedStatsValue)),
534568
).
535569
Commit()
536570

0 commit comments

Comments
 (0)