Skip to content

Commit fa36bd2

Browse files
committed
limit the number of peer initiated group states we track
1 parent 1895be3 commit fa36bd2

File tree

2 files changed

+38
-11
lines changed

2 files changed

+38
-11
lines changed

partialmessages/partialmsgs.go

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,14 @@ import (
1313
)
1414

1515
// TODO: Add gossip fallback (pick random connected peers and send ihave/iwants)
16-
// Question: How to configure scheduling RPCs?
17-
// - Leave it up to the implementation of sendRPC
18-
// Question: Skip partial IHAVE for now?
19-
// Question: I could have a user provided validation queue instead of requiring republishing
20-
// - But a user may need to republish anyways if they get parts out of band
2116

2217
const minGroupTTL = 3
2318

19+
// peerInitiatedGroupLimitPerTopic limits the total number (per topic) of
20+
// *partialMessageStatePerTopicGroup we create in response to a incoming RPC.
21+
// This only applies to groups that we haven't published for yet.
22+
const peerInitiatedGroupLimitPerTopic = 256
23+
2424
// PartsMetadata returns metadata about the parts this partial message
2525
// contains and, possibly implicitly, the parts it wants.
2626
type PartsMetadata []byte
@@ -66,8 +66,9 @@ func (ps *peerState) IsZero() bool {
6666
}
6767

6868
type partialMessageStatePerTopicGroup struct {
69-
peerState map[peer.ID]*peerState
70-
groupTTL int
69+
peerState map[peer.ID]*peerState
70+
groupTTL int
71+
peerInitiated bool
7172
}
7273

7374
func newPartialMessageStatePerTopicGroup(groupTTL int) *partialMessageStatePerTopicGroup {
@@ -116,6 +117,8 @@ type PartialMessageExtension struct {
116117

117118
statePerTopicPerGroup map[string]map[string]*partialMessageStatePerTopicGroup
118119

120+
peerInitiatedGroupCountPerTopic map[string]int
121+
119122
router Router
120123
}
121124

@@ -132,16 +135,28 @@ type Router interface {
132135
MeshPeers(topic string) iter.Seq[peer.ID]
133136
}
134137

135-
func (e *PartialMessageExtension) groupState(topic string, groupID []byte) (*partialMessageStatePerTopicGroup, error) {
138+
func (e *PartialMessageExtension) groupState(topic string, groupID []byte, peerInitiated bool) (*partialMessageStatePerTopicGroup, error) {
136139
statePerTopic, ok := e.statePerTopicPerGroup[topic]
137140
if !ok {
138141
statePerTopic = make(map[string]*partialMessageStatePerTopicGroup)
139142
e.statePerTopicPerGroup[topic] = statePerTopic
140143
}
141144
state, ok := statePerTopic[string(groupID)]
142145
if !ok {
146+
if peerInitiated {
147+
if e.peerInitiatedGroupCountPerTopic[topic] >= peerInitiatedGroupLimitPerTopic {
148+
return nil, errors.New("too many peer initiated group states")
149+
}
150+
e.peerInitiatedGroupCountPerTopic[topic]++
151+
}
152+
143153
state = newPartialMessageStatePerTopicGroup(e.GroupTTLByHeatbeat)
144154
statePerTopic[string(groupID)] = state
155+
state.peerInitiated = peerInitiated
156+
}
157+
if !peerInitiated && state.peerInitiated {
158+
state.peerInitiated = false
159+
e.peerInitiatedGroupCountPerTopic[topic]--
145160
}
146161
return state, nil
147162
}
@@ -161,6 +176,7 @@ func (e *PartialMessageExtension) Init(router Router) error {
161176
return errors.New("field MergePartsMetadata must be set")
162177
}
163178
e.statePerTopicPerGroup = make(map[string]map[string]*partialMessageStatePerTopicGroup)
179+
e.peerInitiatedGroupCountPerTopic = make(map[string]int)
164180

165181
return nil
166182
}
@@ -169,7 +185,7 @@ func (e *PartialMessageExtension) PublishPartial(topic string, partial Message,
169185
groupID := partial.GroupID()
170186
myPartsMeta := partial.PartsMetadata()
171187

172-
state, err := e.groupState(topic, groupID)
188+
state, err := e.groupState(topic, groupID, false)
173189
if err != nil {
174190
return err
175191
}
@@ -261,6 +277,9 @@ func (e *PartialMessageExtension) Heartbeat() {
261277
if len(statePerTopic) == 0 {
262278
delete(e.statePerTopicPerGroup, topic)
263279
}
280+
if s.peerInitiated {
281+
e.peerInitiatedGroupCountPerTopic[topic]--
282+
}
264283
} else {
265284
s.groupTTL--
266285
}
@@ -284,8 +303,7 @@ func (e *PartialMessageExtension) HandleRPC(from peer.ID, rpc *pb.PartialMessage
284303
topic := rpc.GetTopicID()
285304
groupID := rpc.GroupID
286305

287-
// TODO: limit the number of peer-initiated groupIDs per topic
288-
state, err := e.groupState(topic, groupID)
306+
state, err := e.groupState(topic, groupID, true)
289307
if err != nil {
290308
return err
291309
}

partialmessages/partialmsgs_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,15 @@ func (m *mockNetworkPartialMessages) removePeers() {
6868
m.handlers[b].RemovePeer(a)
6969
}
7070
}
71+
72+
// assert that there are no leaked peerInitiatedGroupCountPerTopics
73+
for _, h := range m.handlers {
74+
for topic, count := range h.peerInitiatedGroupCountPerTopic {
75+
if count != 0 {
76+
m.t.Errorf("unexpected peerInitiatedGroupCountPerTopic for topic %s: %d", topic, count)
77+
}
78+
}
79+
}
7180
}
7281

7382
func (m *mockNetworkPartialMessages) handleRPCs() bool {

0 commit comments

Comments
 (0)