Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion service/sharddistributor/handler/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ func TestHeartbeat(t *testing.T) {
Status: types.ExecutorStatusACTIVE,
}
previousHeartbeat := store.HeartbeatState{
LastHeartbeat: now.Unix(),
LastHeartbeat: now.UTC(),
Status: types.ExecutorStatusACTIVE,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"io"
"strings"

"github.com/golang/snappy"
)
Expand All @@ -14,6 +15,53 @@ var (
_snappyHeader = []byte{0xff, 0x06, 0x00, 0x00, 's', 'N', 'a', 'P', 'p', 'Y'}
)

const (
// CompressionSnappy indicates snappy compression should be applied
CompressionSnappy = "snappy"
)

type compressionMode int

const (
compressionNone compressionMode = iota
compressionSnappy
)

// RecordWriter handles serialization of data for etcd, applying compression when configured.
type RecordWriter struct {
mode compressionMode
}

// NewRecordWriter constructs a RecordWriter based on the configured compression type.
func NewRecordWriter(compressionType string) (*RecordWriter, error) {
switch strings.ToLower(compressionType) {
case "", "none":
return &RecordWriter{mode: compressionNone}, nil
case CompressionSnappy:
return &RecordWriter{mode: compressionSnappy}, nil
default:
return nil, fmt.Errorf("unsupported compression type: %s", compressionType)
}
}

// Write serializes data using the configured compression mode.
func (w *RecordWriter) Write(data []byte) ([]byte, error) {
if w.mode == compressionNone {
return data, nil
}

var buf bytes.Buffer
writer := snappy.NewBufferedWriter(&buf)

if _, err := writer.Write(data); err != nil {
return nil, err
}
if err := writer.Close(); err != nil {
return nil, err
}
return buf.Bytes(), nil
}

// Decompress decodes snappy-compressed data
// If the snappy header is present, it will successfully decompress it or return an error
// If the snappy header is absent, it treats data as uncompressed and returns it as-is
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,55 @@
package common

import (
"bytes"
"encoding/json"
"testing"

"github.com/golang/snappy"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestRecordWriter(t *testing.T) {
original := []byte("test-data")

t.Run("no compression when empty", func(t *testing.T) {
writer, err := NewRecordWriter("")
require.NoError(t, err)

out, err := writer.Write(original)
require.NoError(t, err)
assert.Equal(t, original, out)
})

t.Run("no compression when none", func(t *testing.T) {
writer, err := NewRecordWriter("none")
require.NoError(t, err)

out, err := writer.Write(original)
require.NoError(t, err)
assert.Equal(t, original, out)
})

t.Run("snappy compression", func(t *testing.T) {
writer, err := NewRecordWriter(CompressionSnappy)
require.NoError(t, err)

out, err := writer.Write(original)
require.NoError(t, err)
require.NotNil(t, out)
assert.NotEqual(t, original, out)

decompressed, err := Decompress(out)
require.NoError(t, err)
assert.Equal(t, original, decompressed)
})

t.Run("unsupported compression", func(t *testing.T) {
writer, err := NewRecordWriter("unsupported")
require.Error(t, err)
assert.Nil(t, writer)
})
}

func TestDecompress(t *testing.T) {
t.Run("Empty data", func(t *testing.T) {
decompressed, err := Decompress([]byte{})
Expand Down Expand Up @@ -38,7 +78,10 @@ func TestDecompress(t *testing.T) {

t.Run("Compressed data", func(t *testing.T) {
original := []byte(`{"status":"DRAINING"}`)
compressed, err := compressData(original)
writer, err := NewRecordWriter(CompressionSnappy)
require.NoError(t, err)

compressed, err := writer.Write(original)
require.NoError(t, err)

result, err := Decompress(compressed)
Expand Down Expand Up @@ -74,7 +117,10 @@ func TestDecompressAndUnmarshal(t *testing.T) {
Shards: []string{"shard3", "shard4"},
}
originalJSON, _ := json.Marshal(original)
compressed, err := compressData(originalJSON)
writer, err := NewRecordWriter(CompressionSnappy)
require.NoError(t, err)

compressed, err := writer.Write(originalJSON)
require.NoError(t, err)

var result testData
Expand All @@ -93,16 +139,3 @@ func TestDecompressAndUnmarshal(t *testing.T) {
assert.Contains(t, err.Error(), "unmarshal")
})
}

func compressData(data []byte) ([]byte, error) {
var buf bytes.Buffer
w := snappy.NewBufferedWriter(&buf)

if _, err := w.Write(data); err != nil {
return nil, err
}
if err := w.Close(); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
68 changes: 51 additions & 17 deletions service/sharddistributor/store/etcd/executorstore/etcdstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ var (
)

type executorStoreImpl struct {
client *clientv3.Client
prefix string
logger log.Logger
shardCache *shardcache.ShardToExecutorCache
timeSource clock.TimeSource
client *clientv3.Client
prefix string
logger log.Logger
shardCache *shardcache.ShardToExecutorCache
timeSource clock.TimeSource
recordWriter *common.RecordWriter
}

// shardStatisticsUpdate holds the staged statistics for a shard so we can write them
Expand Down Expand Up @@ -64,6 +65,7 @@ func NewStore(p ExecutorStoreParams) (store.Store, error) {
Endpoints []string `yaml:"endpoints"`
DialTimeout time.Duration `yaml:"dialTimeout"`
Prefix string `yaml:"prefix"`
Compression string `yaml:"compression"`
}

if err := p.Cfg.Store.StorageParams.Decode(&etcdCfg); err != nil {
Expand All @@ -88,12 +90,18 @@ func NewStore(p ExecutorStoreParams) (store.Store, error) {
timeSource = clock.NewRealTimeSource()
}

recordWriter, err := common.NewRecordWriter(etcdCfg.Compression)
if err != nil {
return nil, fmt.Errorf("create record writer: %w", err)
}

store := &executorStoreImpl{
client: etcdClient,
prefix: etcdCfg.Prefix,
logger: p.Logger,
shardCache: shardCache,
timeSource: timeSource,
client: etcdClient,
prefix: etcdCfg.Prefix,
logger: p.Logger,
shardCache: shardCache,
timeSource: timeSource,
recordWriter: recordWriter,
}

p.Lifecycle.Append(fx.StartStopHook(store.Start, store.Stop))
Expand All @@ -119,19 +127,30 @@ func (s *executorStoreImpl) RecordHeartbeat(ctx context.Context, namespace, exec

reportedShardsData, err := json.Marshal(request.ReportedShards)
if err != nil {
return fmt.Errorf("marshal assinged shards: %w", err)
return fmt.Errorf("marshal reported shards: %w", err)
}

jsonState, err := json.Marshal(request.Status)
if err != nil {
return fmt.Errorf("marshal assinged shards: %w", err)
return fmt.Errorf("marshal assinged state: %w", err)
}

// Compress data before writing to etcd
compressedReportedShards, err := s.recordWriter.Write(reportedShardsData)
if err != nil {
return fmt.Errorf("compress reported shards: %w", err)
}

compressedState, err := s.recordWriter.Write(jsonState)
if err != nil {
return fmt.Errorf("compress assigned state: %w", err)
}

// Build all operations including metadata
ops := []clientv3.Op{
clientv3.OpPut(heartbeatKey, etcdtypes.FormatTime(request.LastHeartbeat)),
clientv3.OpPut(stateKey, string(jsonState)),
clientv3.OpPut(reportedShardsKey, string(reportedShardsData)),
clientv3.OpPut(stateKey, string(compressedState)),
clientv3.OpPut(reportedShardsKey, string(compressedReportedShards)),
}
for key, value := range request.Metadata {
metadataKey := etcdkeys.BuildMetadataKey(s.prefix, namespace, executorID, key)
Expand Down Expand Up @@ -357,7 +376,13 @@ func (s *executorStoreImpl) AssignShards(ctx context.Context, namespace string,
if err != nil {
return fmt.Errorf("marshal assigned shards for executor %s: %w", executorID, err)
}
ops = append(ops, clientv3.OpPut(executorStateKey, string(value)))

compressedValue, err := s.recordWriter.Write(value)
if err != nil {
return fmt.Errorf("compress assigned shards for executor %s: %w", executorID, err)
}
ops = append(ops, clientv3.OpPut(executorStateKey, string(compressedValue)))

comparisons = append(comparisons, clientv3.Compare(clientv3.ModRevision(executorStateKey), "=", state.ModRevision))
}

Expand Down Expand Up @@ -490,15 +515,24 @@ func (s *executorStoreImpl) AssignShard(ctx context.Context, namespace, shardID,
state.AssignedShards[shardID] = &types.ShardAssignment{Status: types.AssignmentStatusREADY}
}

// Compress new state value
newStateValue, err := json.Marshal(state)
if err != nil {
return fmt.Errorf("marshal new assigned state: %w", err)
}
compressedStateValue, err := s.recordWriter.Write(newStateValue)
if err != nil {
return fmt.Errorf("compress new assigned state: %w", err)
}

newStatsValue, err := json.Marshal(shardStats)
if err != nil {
return fmt.Errorf("marshal new shard statistics: %w", err)
}
compressedStatsValue, err := s.recordWriter.Write(newStatsValue)
if err != nil {
return fmt.Errorf("compress new shard statistics: %w", err)
}

var comparisons []clientv3.Cmp

Expand Down Expand Up @@ -527,8 +561,8 @@ func (s *executorStoreImpl) AssignShard(ctx context.Context, namespace, shardID,
txnResp, err := s.client.Txn(ctx).
If(comparisons...).
Then(
clientv3.OpPut(assignedState, string(newStateValue)),
clientv3.OpPut(shardStatsKey, string(newStatsValue)),
clientv3.OpPut(assignedState, string(compressedStateValue)),
clientv3.OpPut(shardStatsKey, string(compressedStatsValue)),
).
Commit()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/fx/fxtest"
"gopkg.in/yaml.v2"

"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log/testlogger"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/sharddistributor/store"
Expand Down Expand Up @@ -88,6 +90,63 @@ func TestRecordHeartbeat(t *testing.T) {
assert.Equal(t, "value-2", string(resp.Kvs[0].Value))
}

func TestRecordHeartbeat_NoCompression(t *testing.T) {
tc := testhelper.SetupStoreTestCluster(t)

var etcdCfg struct {
Endpoints []string `yaml:"endpoints"`
DialTimeout time.Duration `yaml:"dialTimeout"`
Prefix string `yaml:"prefix"`
Compression string `yaml:"compression"`
}
require.NoError(t, tc.LeaderCfg.Store.StorageParams.Decode(&etcdCfg))
etcdCfg.Compression = "none"

encodedCfg, err := yaml.Marshal(etcdCfg)
require.NoError(t, err)

var yamlNode *config.YamlNode
require.NoError(t, yaml.Unmarshal(encodedCfg, &yamlNode))
tc.LeaderCfg.Store.StorageParams = yamlNode
tc.LeaderCfg.LeaderStore.StorageParams = yamlNode
tc.Compression = "none"

executorStore := createStore(t, tc)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

executorID := "executor-no-compression"
req := store.HeartbeatState{
LastHeartbeat: time.Now().UTC(),
Status: types.ExecutorStatusACTIVE,
ReportedShards: map[string]*types.ShardStatusReport{
"shard-no-compression": {Status: types.ShardStatusREADY},
},
}

require.NoError(t, executorStore.RecordHeartbeat(ctx, tc.Namespace, executorID, req))

stateKey := etcdkeys.BuildExecutorKey(tc.EtcdPrefix, tc.Namespace, executorID, etcdkeys.ExecutorStatusKey)
require.NoError(t, err)
reportedShardsKey := etcdkeys.BuildExecutorKey(tc.EtcdPrefix, tc.Namespace, executorID, etcdkeys.ExecutorReportedShardsKey)
require.NoError(t, err)

stateResp, err := tc.Client.Get(ctx, stateKey)
require.NoError(t, err)
require.Equal(t, int64(1), stateResp.Count)
statusJSON, err := json.Marshal(req.Status)
require.NoError(t, err)
assert.Equal(t, string(statusJSON), string(stateResp.Kvs[0].Value))

reportedResp, err := tc.Client.Get(ctx, reportedShardsKey)
require.NoError(t, err)
require.Equal(t, int64(1), reportedResp.Count)
reportedJSON, err := json.Marshal(req.ReportedShards)
require.NoError(t, err)
assert.Equal(t, string(reportedJSON), string(reportedResp.Kvs[0].Value))
}

func TestGetHeartbeat(t *testing.T) {
tc := testhelper.SetupStoreTestCluster(t)
executorStore := createStore(t, tc)
Expand Down Expand Up @@ -369,7 +428,12 @@ func TestSubscribe(t *testing.T) {

// Now update the reported shards, which IS a significant change
reportedShardsKey := etcdkeys.BuildExecutorKey(tc.EtcdPrefix, tc.Namespace, executorID, "reported_shards")
_, err = tc.Client.Put(ctx, reportedShardsKey, `{"shard-1":{"status":"running"}}`)
require.NoError(t, err)
writer, err := common.NewRecordWriter(tc.Compression)
require.NoError(t, err)
compressedShards, err := writer.Write([]byte(`{"shard-1":{"status":"running"}}`))
require.NoError(t, err)
_, err = tc.Client.Put(ctx, reportedShardsKey, string(compressedShards))
require.NoError(t, err)

select {
Expand Down
Loading