@@ -16,10 +16,12 @@ import (
1616
1717const minGroupTTL = 3
1818
19- // peerInitiatedGroupLimitPerTopic limits the total number (per topic) of
19+ // defaultPeerInitiatedGroupLimitPerTopic limits the total number (per topic) of
2020// *partialMessageStatePerTopicGroup we create in response to a incoming RPC.
2121// This only applies to groups that we haven't published for yet.
22- const peerInitiatedGroupLimitPerTopic = 256
22+ const defaultPeerInitiatedGroupLimitPerTopic = 255
23+
24+ const defaultPeerInitiatedGroupLimitPerTopicPerPeer = 4
2325
2426// PartsMetadata returns metadata about the parts this partial message
2527// contains and, possibly implicitly, the parts it wants.
@@ -66,9 +68,9 @@ func (ps *peerState) IsZero() bool {
6668}
6769
6870type partialMessageStatePerTopicGroup struct {
69- peerState map [peer.ID ]* peerState
70- groupTTL int
71- peerInitiated bool
71+ peerState map [peer.ID ]* peerState
72+ groupTTL int
73+ initiatedBy peer. ID // zero value if we initiated the group
7274}
7375
7476func newPartialMessageStatePerTopicGroup (groupTTL int ) * partialMessageStatePerTopicGroup {
@@ -78,6 +80,10 @@ func newPartialMessageStatePerTopicGroup(groupTTL int) *partialMessageStatePerTo
7880 }
7981}
8082
83+ func (s * partialMessageStatePerTopicGroup ) remotePeerInitiated () bool {
84+ return s .initiatedBy != ""
85+ }
86+
8187func (s * partialMessageStatePerTopicGroup ) clearPeerMetadata (peerID peer.ID ) {
8288 if peerState , ok := s .peerState [peerID ]; ok {
8389 peerState .partsMetadata = nil
@@ -111,13 +117,26 @@ type PartialMessageExtension struct {
111117 // expected bounds?
112118 ValidateRPC func (from peer.ID , rpc * pb.PartialMessagesExtension ) error
113119
120+ // PeerInitiatedGroupLimitPerTopic limits the number of Group states all
121+ // peers can initialize per topic. A group state is initialized by a peer if
122+ // the peer's message marks the first time we've seen a group id.
123+ PeerInitiatedGroupLimitPerTopic int
124+
125+ // PeerInitiatedGroupLimitPerTopicPerPeer limits the number of Group states
126+ // a single peer can initialize per topic. A group state is initialized by a
127+ // peer if the peer's message marks the first time we've seen a group id.
128+ PeerInitiatedGroupLimitPerTopicPerPeer int
129+
114130 // GroupTTLByHeatbeat is how many heartbeats we store Group state for after
115131 // publishing a partial message for the group.
116132 GroupTTLByHeatbeat int
117133
134+ // map topic -> map[group]partialMessageStatePerTopicGroup
135+ // TODO rename this to ...PerGroupPerTopic
118136 statePerTopicPerGroup map [string ]map [string ]* partialMessageStatePerTopicGroup
119137
120- peerInitiatedGroupCountPerTopic map [string ]int
138+ // map[topic]counter
139+ peerInitiatedGroupCounter map [string ]* peerInitiatedGroupCounterState
121140
122141 router Router
123142}
@@ -136,28 +155,32 @@ type Router interface {
136155 PeerRequestsPartial (peer peer.ID , topic string ) bool
137156}
138157
139- func (e * PartialMessageExtension ) groupState (topic string , groupID []byte , peerInitiated bool ) (* partialMessageStatePerTopicGroup , error ) {
158+ func (e * PartialMessageExtension ) groupState (topic string , groupID []byte , peerInitiated bool , from peer. ID ) (* partialMessageStatePerTopicGroup , error ) {
140159 statePerTopic , ok := e .statePerTopicPerGroup [topic ]
141160 if ! ok {
142161 statePerTopic = make (map [string ]* partialMessageStatePerTopicGroup )
143162 e .statePerTopicPerGroup [topic ] = statePerTopic
144163 }
164+ if _ , ok := e .peerInitiatedGroupCounter [topic ]; ! ok {
165+ e .peerInitiatedGroupCounter [topic ] = & peerInitiatedGroupCounterState {}
166+ }
145167 state , ok := statePerTopic [string (groupID )]
146168 if ! ok {
147169 if peerInitiated {
148- if e .peerInitiatedGroupCountPerTopic [topic ] >= peerInitiatedGroupLimitPerTopic {
149- return nil , errors .New ("too many peer initiated group states" )
170+ err := e .peerInitiatedGroupCounter [topic ].Inc (e .PeerInitiatedGroupLimitPerTopic , e .PeerInitiatedGroupLimitPerTopicPerPeer , from )
171+ if err != nil {
172+ return nil , err
150173 }
151- e .peerInitiatedGroupCountPerTopic [topic ]++
152174 }
153175
154176 state = newPartialMessageStatePerTopicGroup (e .GroupTTLByHeatbeat )
155177 statePerTopic [string (groupID )] = state
156- state .peerInitiated = peerInitiated
178+ state .initiatedBy = from
157179 }
158- if ! peerInitiated && state .peerInitiated {
159- state .peerInitiated = false
160- e .peerInitiatedGroupCountPerTopic [topic ]--
180+ if ! peerInitiated && state .remotePeerInitiated () {
181+ // We've tried to initiate this state as well, so it's no longer peer initiated.
182+ e .peerInitiatedGroupCounter [topic ].Dec (state .initiatedBy )
183+ state .initiatedBy = ""
161184 }
162185 return state , nil
163186}
@@ -176,8 +199,16 @@ func (e *PartialMessageExtension) Init(router Router) error {
176199 if e .MergePartsMetadata == nil {
177200 return errors .New ("field MergePartsMetadata must be set" )
178201 }
202+
203+ if e .PeerInitiatedGroupLimitPerTopic == 0 {
204+ e .PeerInitiatedGroupLimitPerTopic = defaultPeerInitiatedGroupLimitPerTopic
205+ }
206+ if e .PeerInitiatedGroupLimitPerTopicPerPeer == 0 {
207+ e .PeerInitiatedGroupLimitPerTopicPerPeer = defaultPeerInitiatedGroupLimitPerTopicPerPeer
208+ }
209+
179210 e .statePerTopicPerGroup = make (map [string ]map [string ]* partialMessageStatePerTopicGroup )
180- e .peerInitiatedGroupCountPerTopic = make (map [string ]int )
211+ e .peerInitiatedGroupCounter = make (map [string ]* peerInitiatedGroupCounterState )
181212
182213 return nil
183214}
@@ -186,7 +217,7 @@ func (e *PartialMessageExtension) PublishPartial(topic string, partial Message,
186217 groupID := partial .GroupID ()
187218 myPartsMeta := partial .PartsMetadata ()
188219
189- state , err := e .groupState (topic , groupID , false )
220+ state , err := e .groupState (topic , groupID , false , "" )
190221 if err != nil {
191222 return err
192223 }
@@ -264,10 +295,13 @@ func (e *PartialMessageExtension) AddPeer(id peer.ID) {
264295}
265296
266297func (e * PartialMessageExtension ) RemovePeer (id peer.ID ) {
267- for _ , statePerTopic := range e .statePerTopicPerGroup {
298+ for topic , statePerTopic := range e .statePerTopicPerGroup {
268299 for _ , state := range statePerTopic {
269300 delete (state .peerState , id )
270301 }
302+ if ctr , ok := e .peerInitiatedGroupCounter [topic ]; ok {
303+ ctr .RemovePeer (id )
304+ }
271305 }
272306}
273307
@@ -279,8 +313,8 @@ func (e *PartialMessageExtension) Heartbeat() {
279313 if len (statePerTopic ) == 0 {
280314 delete (e .statePerTopicPerGroup , topic )
281315 }
282- if s .peerInitiated {
283- e .peerInitiatedGroupCountPerTopic [topic ]--
316+ if s .remotePeerInitiated () {
317+ e .peerInitiatedGroupCounter [topic ]. Dec ( s . initiatedBy )
284318 }
285319 } else {
286320 s .groupTTL --
@@ -305,7 +339,7 @@ func (e *PartialMessageExtension) HandleRPC(from peer.ID, rpc *pb.PartialMessage
305339 topic := rpc .GetTopicID ()
306340 groupID := rpc .GroupID
307341
308- state , err := e .groupState (topic , groupID , true )
342+ state , err := e .groupState (topic , groupID , true , from )
309343 if err != nil {
310344 return err
311345 }
@@ -321,3 +355,45 @@ func (e *PartialMessageExtension) HandleRPC(from peer.ID, rpc *pb.PartialMessage
321355
322356 return e .OnIncomingRPC (from , rpc )
323357}
358+
359+ type peerInitiatedGroupCounterState struct {
360+ // total number of peer initiated groups
361+ total int
362+ // number of groups initiated per peer
363+ perPeer map [peer.ID ]int
364+ }
365+
366+ var errPeerInitiatedGroupTotalLimitReached = errors .New ("too many peer initiated group states" )
367+ var errPeerInitiatedGroupLimitReached = errors .New ("too many peer initiated group states for this peer" )
368+
369+ func (ctr * peerInitiatedGroupCounterState ) Inc (totalLimit int , peerLimit int , id peer.ID ) error {
370+ if ctr .total >= totalLimit {
371+ return errPeerInitiatedGroupTotalLimitReached
372+ }
373+ if ctr .perPeer == nil {
374+ ctr .perPeer = make (map [peer.ID ]int )
375+ }
376+ if ctr .perPeer [id ] >= peerLimit {
377+ return errPeerInitiatedGroupLimitReached
378+ }
379+ ctr .total ++
380+ ctr .perPeer [id ]++
381+ return nil
382+ }
383+
384+ func (ctr * peerInitiatedGroupCounterState ) Dec (id peer.ID ) {
385+ if _ , ok := ctr .perPeer [id ]; ok {
386+ ctr .total --
387+ ctr .perPeer [id ]--
388+ if ctr .perPeer [id ] == 0 {
389+ delete (ctr .perPeer , id )
390+ }
391+ }
392+ }
393+
394+ func (ctr * peerInitiatedGroupCounterState ) RemovePeer (id peer.ID ) {
395+ if n , ok := ctr .perPeer [id ]; ok {
396+ ctr .total -= n
397+ delete (ctr .perPeer , id )
398+ }
399+ }
0 commit comments