Skip to content

Commit 4718e5f

Browse files
authored
Tariffs: reduce published data volume by x10 (#24375)
1 parent 62e58b0 commit 4718e5f

File tree

9 files changed

+141
-51
lines changed

9 files changed

+141
-51
lines changed

assets/js/store.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ const initialState: State = {
3131
offline: false,
3232
loadpoints: [],
3333
vehicles: {},
34+
forecast: {},
3435
};
3536

3637
const state = reactive(initialState);

assets/js/types/evcc.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ export interface State {
5252
offline: boolean;
5353
startup?: boolean;
5454
loadpoints: Loadpoint[];
55-
forecast?: Forecast;
55+
forecast: Forecast;
5656
currency?: CURRENCY;
5757
fatal?: FatalError[];
5858
authProviders?: AuthProviders;

core/site_tariffs.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/evcc-io/evcc/core/keys"
1111
"github.com/evcc-io/evcc/server/db/settings"
1212
"github.com/evcc-io/evcc/tariff"
13+
"github.com/evcc-io/evcc/util"
1314
"github.com/jinzhu/now"
1415
"github.com/samber/lo"
1516
)
@@ -117,7 +118,7 @@ func (site *Site) publishTariffs(greenShareHome float64, greenShareLoadpoints fl
117118
fc.Solar = lo.ToPtr(site.solarDetails(solar))
118119
}
119120

120-
site.publish(keys.Forecast, fc)
121+
site.publish(keys.Forecast, util.NewSharder(keys.Forecast, fc))
121122
}
122123

123124
func (site *Site) solarDetails(solar api.Rates) solarDetails {

server/socket.go

Lines changed: 42 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package server
22

33
import (
44
"context"
5+
"encoding/json"
56
"net/http"
67
"strings"
78
"sync"
@@ -110,33 +111,59 @@ func (h *SocketHub) deleteSubscriber(s *socketSubscriber) {
110111
}
111112

112113
func (h *SocketHub) welcome(subscriber *socketSubscriber, params []util.Param) {
113-
var msg strings.Builder
114-
msg.WriteString("{")
114+
msg := make(map[string]json.RawMessage, len(params))
115+
115116
for _, p := range params {
116-
if msg.Len() > 1 {
117-
msg.WriteString(",")
117+
k := p.Key
118+
if p.Loadpoint != nil {
119+
k = "loadpoints." + p.UniqueID()
118120
}
119-
msg.WriteString(kv(p))
121+
122+
msg[k] = json.RawMessage(socketEncode(p.Val))
120123
}
121-
msg.WriteString("}")
124+
125+
b, _ := json.Marshal(msg)
122126

123127
// should not block
124-
subscriber.send <- []byte(msg.String())
128+
subscriber.send <- b
125129
}
126130

127131
func (h *SocketHub) broadcast(p util.Param) {
128132
h.mu.RLock()
129133
defer h.mu.RUnlock()
130134

131-
if len(h.subscribers) > 0 {
132-
msg := "{" + kv(p) + "}"
135+
if len(h.subscribers) == 0 {
136+
return
137+
}
138+
139+
msg := make(map[string]json.RawMessage)
133140

134-
for s := range h.subscribers {
135-
select {
136-
case s.send <- []byte(msg):
137-
default:
138-
s.closeSlow()
139-
}
141+
k := p.Key
142+
if p.Loadpoint != nil {
143+
k = "loadpoints." + p.UniqueID()
144+
}
145+
146+
// Sharder splits data into chunks
147+
if sp, ok := (p.Val).(util.Sharder); ok {
148+
shards := sp.Shards()
149+
if len(shards) == 0 {
150+
return // nothing changed, skip broadcast
151+
}
152+
153+
for _, shard := range shards {
154+
msg[k+"."+shard.Key] = json.RawMessage(socketEncode(shard.Value))
155+
}
156+
} else {
157+
msg[k] = json.RawMessage(socketEncode(p.Val))
158+
}
159+
160+
b, _ := json.Marshal(msg)
161+
162+
for s := range h.subscribers {
163+
select {
164+
case s.send <- b:
165+
default:
166+
s.closeSlow()
140167
}
141168
}
142169
}

server/socket_helper.go

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"reflect"
77
"strings"
88

9-
"github.com/evcc-io/evcc/util"
109
"github.com/evcc-io/evcc/util/encode"
1110
)
1211

@@ -31,36 +30,22 @@ func encodeSliceAsString(v any) (string, error) {
3130
return fmt.Sprintf("[%s]", strings.Join(res, ",")), nil
3231
}
3332

34-
func kv(p util.Param) string {
33+
func socketEncode(pval any) string {
3534
var (
3635
val string
3736
err error
3837
)
3938

4039
// unwrap slices
41-
if p.Val != nil && reflect.TypeOf(p.Val).Kind() == reflect.Slice {
42-
val, err = encodeSliceAsString(p.Val)
40+
if rv := reflect.ValueOf(pval); pval != nil && rv.Kind() == reflect.Slice && !rv.IsNil() {
41+
val, err = encodeSliceAsString(pval)
4342
} else {
44-
val, err = encodeAsString(p.Val)
43+
val, err = encodeAsString(pval)
4544
}
4645

4746
if err != nil {
4847
panic(err)
4948
}
5049

51-
if p.Key == "" && val == "" {
52-
log.ERROR.Printf("invalid key/val for %+v, please report to https://github.com/evcc-io/evcc/issues/6439", p)
53-
return "\"foo\":\"bar\""
54-
}
55-
56-
var msg strings.Builder
57-
msg.WriteString("\"")
58-
if p.Loadpoint != nil {
59-
msg.WriteString(fmt.Sprintf("loadpoints.%d.", *p.Loadpoint))
60-
}
61-
msg.WriteString(p.Key)
62-
msg.WriteString("\":")
63-
msg.WriteString(val)
64-
65-
return msg.String()
50+
return val
6651
}

tests/plan.evcc.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
interval: 0.25s
1+
interval: 0.1s
22

33
site:
44
title: Plan

util/param.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"maps"
55
"slices"
66
"strconv"
7-
"strings"
87
"sync"
98

109
"github.com/evcc-io/evcc/util/encode"
@@ -19,15 +18,11 @@ type Param struct {
1918

2019
// UniqueID returns unique identifier for parameter Loadpoint/Key combination
2120
func (p Param) UniqueID() string {
22-
var b strings.Builder
23-
2421
if p.Loadpoint != nil {
25-
b.WriteString(strconv.Itoa(*p.Loadpoint) + ".")
22+
return strconv.Itoa(*p.Loadpoint) + "." + p.Key
2623
}
2724

28-
b.WriteString(p.Key)
29-
30-
return b.String()
25+
return p.Key
3126
}
3227

3328
// ParamCache is a data store

util/param_shard.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package util
2+
3+
import (
4+
"crypto/sha256"
5+
"encoding/json"
6+
"fmt"
7+
"strings"
8+
"sync"
9+
10+
"github.com/fatih/structs"
11+
)
12+
13+
// Sharder splits data into chunks, omitting unmodified chunks
14+
type Sharder interface {
15+
Shards() []Shard
16+
}
17+
18+
type Shard struct {
19+
Key string
20+
Value any
21+
}
22+
23+
type sharderImpl struct {
24+
prefix string
25+
struc any
26+
}
27+
28+
// shared shard cache
29+
var (
30+
shardCache = make(map[string][32]byte)
31+
shardMu sync.Mutex
32+
)
33+
34+
func (s *sharderImpl) MarshalJSON() ([]byte, error) {
35+
return json.Marshal(s.struc)
36+
}
37+
38+
func (s *sharderImpl) Shards() []Shard {
39+
ff := structs.Fields(s.struc)
40+
res := make([]Shard, 0, len(ff))
41+
42+
shardMu.Lock()
43+
defer shardMu.Unlock()
44+
45+
for _, f := range ff {
46+
key := f.Name()
47+
if t := f.Tag("json"); t != "" {
48+
if n := strings.Split(t, ",")[0]; n != "" {
49+
key = n
50+
}
51+
}
52+
53+
// Use JSON for stable hashing (fmt.Append includes pointer addresses)
54+
b, err := json.Marshal(f.Value())
55+
if err != nil {
56+
// Fallback to fmt.Append if JSON fails
57+
b = fmt.Append(nil, f.Value())
58+
}
59+
60+
hash := sha256.Sum256(b)
61+
if cached, ok := shardCache[s.prefix+key]; ok && hash == cached {
62+
continue
63+
}
64+
shardCache[s.prefix+key] = hash
65+
66+
res = append(res, Shard{
67+
Key: key,
68+
Value: f.Value(),
69+
})
70+
}
71+
72+
return res
73+
}
74+
75+
var _ Sharder = (*sharderImpl)(nil)
76+
77+
// NewSharder creates a Sharder that splits structs into sub-structs for space-efficient socket publishing
78+
// Passing anything else than a struct will panic
79+
func NewSharder(prefix string, struc any) Sharder {
80+
return &sharderImpl{prefix, struc}
81+
}

util/tee.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package util
22

33
import (
4-
"reflect"
54
"sync"
65

76
"github.com/evcc-io/evcc/api"
@@ -37,11 +36,12 @@ func (t *Tee) add(out chan<- Param) {
3736
// Run starts parameter distribution
3837
func (t *Tee) Run(in <-chan Param) {
3938
for msg := range in {
40-
if val := reflect.ValueOf(msg.Val); val.Kind() == reflect.Ptr {
41-
if ptr := reflect.Indirect(val); ptr.IsValid() {
42-
msg.Val = ptr.Addr().Elem().Interface()
43-
}
44-
}
39+
// TODO MUST NOT PUBLISH POINTERS (WHO'S VALUES ARE LATER MODIFIED)
40+
// if val := reflect.ValueOf(msg.Val); val.Kind() == reflect.Ptr {
41+
// if ptr := reflect.Indirect(val); ptr.IsValid() {
42+
// fmt.Println("DANGER pointer value:", msg.Key)
43+
// }
44+
// }
4545

4646
if val, ok := (msg.Val).(api.Redactor); ok {
4747
msg.Val = val.Redacted()

0 commit comments

Comments
 (0)