Skip to content

Commit e2702a7

Browse files
committed
Limit the number of outstanding IWANTs per messageid and per heartbeat.
This fixes an issue where a node would end up making an IWANT request each time a peer advertised a message with IHAVE, resulting in many duplicates for the requesting peer. The change is to limit the number of IWANT requests per message id, and to reset this limit every heartbeat. This allows fewer reqeusts
1 parent ae65ce4 commit e2702a7

File tree

2 files changed

+134
-1
lines changed

2 files changed

+134
-1
lines changed

gossipsub.go

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ var (
7474
GossipSubIWantFollowupTime = 3 * time.Second
7575
GossipSubIDontWantMessageThreshold = 1024 // 1KB
7676
GossipSubIDontWantMessageTTL = 3 // 3 heartbeats
77+
78+
GossipSubMaxIWantsPerMessageIDPerHeartbeat = 10
7779
)
7880

7981
type checksum struct {
@@ -239,6 +241,11 @@ type GossipSubParams struct {
239241

240242
// IDONTWANT is cleared when it's older than the TTL.
241243
IDontWantMessageTTL int
244+
245+
// MaxIWantsPerMessageIDPerHeartbeat is the maximum number of pending IWANT
246+
// requests allowed per message ID per heartbeat. This helps limit the
247+
// number of duplicates we'll receive from peers.
248+
MaxIWantsPerMessageIDPerHeartbeat int
242249
}
243250

244251
// NewGossipSub returns a new PubSub object using the default GossipSubRouter as the router.
@@ -276,6 +283,10 @@ func DefaultGossipSubRouter(h host.Host) *GossipSubRouter {
276283
feature: GossipSubDefaultFeatures,
277284
tagTracer: newTagTracer(h.ConnManager()),
278285
params: params,
286+
// number of allowed IWANTs per message ID. If the message ID is
287+
// missing, it must be initialized to
288+
// `MaxIWantsPerMessageIDPerHeartbeat`
289+
allowedIWantCount: make(map[string]int),
279290
}
280291
}
281292

@@ -315,6 +326,8 @@ func DefaultGossipSubParams() GossipSubParams {
315326
IDontWantMessageThreshold: GossipSubIDontWantMessageThreshold,
316327
IDontWantMessageTTL: GossipSubIDontWantMessageTTL,
317328
SlowHeartbeatWarning: 0.1,
329+
330+
MaxIWantsPerMessageIDPerHeartbeat: GossipSubMaxIWantsPerMessageIDPerHeartbeat,
318331
}
319332
}
320333

@@ -479,6 +492,10 @@ type GossipSubRouter struct {
479492
connect chan connectInfo // px connection requests
480493
cab peerstore.AddrBook
481494

495+
// allowed number of IWANTs per message ID. Must be initialized to
496+
// `MaxIWantsPerMessageIDPerHeartbeat` if message id is missing.
497+
allowedIWantCount map[string]int
498+
482499
protos []protocol.ID
483500
feature GossipSubFeatureTest
484501

@@ -713,11 +730,13 @@ func (gs *GossipSubRouter) AcceptFrom(p peer.ID) AcceptStatus {
713730
func (gs *GossipSubRouter) Preprocess(from peer.ID, msgs []*Message) {
714731
tmids := make(map[string][]string)
715732
for _, msg := range msgs {
733+
mid := gs.p.idGen.ID(msg)
734+
delete(gs.allowedIWantCount, mid)
716735
if len(msg.GetData()) < gs.params.IDontWantMessageThreshold {
717736
continue
718737
}
719738
topic := msg.GetTopic()
720-
tmids[topic] = append(tmids[topic], gs.p.idGen.ID(msg))
739+
tmids[topic] = append(tmids[topic], mid)
721740
}
722741
for topic, mids := range tmids {
723742
if len(mids) == 0 {
@@ -803,7 +822,20 @@ func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb.
803822
if gs.p.seenMessage(mid) {
804823
continue
805824
}
825+
826+
allowedIWants, ok := gs.allowedIWantCount[mid]
827+
if !ok {
828+
allowedIWants = gs.params.MaxIWantsPerMessageIDPerHeartbeat
829+
}
830+
831+
// Check if we've exceeded the maximum number of pending IWANTs for this message
832+
if allowedIWants <= 0 {
833+
log.Debugf("IHAVE: ignoring IHAVE for message %s from peer %s; too many inflight IWANTs", mid, p)
834+
continue
835+
}
836+
806837
iwant[mid] = struct{}{}
838+
gs.allowedIWantCount[mid] = allowedIWants - 1
807839
}
808840
}
809841

@@ -1453,6 +1485,9 @@ func (gs *GossipSubRouter) heartbeat() {
14531485
toprune := make(map[peer.ID][]string)
14541486
noPX := make(map[peer.ID]bool)
14551487

1488+
// reset number of allowed IWANT requests
1489+
gs.resetAllowedIWants()
1490+
14561491
// clean up expired backoffs
14571492
gs.clearBackoff()
14581493

@@ -1702,6 +1737,13 @@ func (gs *GossipSubRouter) heartbeat() {
17021737
gs.mcache.Shift()
17031738
}
17041739

1740+
func (gs *GossipSubRouter) resetAllowedIWants() {
1741+
if len(gs.allowedIWantCount) > 0 {
1742+
// throw away the old map and make a new one
1743+
gs.allowedIWantCount = make(map[string]int)
1744+
}
1745+
}
1746+
17051747
func (gs *GossipSubRouter) clearIHaveCounters() {
17061748
if len(gs.peerhave) > 0 {
17071749
// throw away the old map and make a new one

gossipsub_test.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3860,3 +3860,94 @@ func BenchmarkSplitRPCLargeMessages(b *testing.B) {
38603860
}
38613861
})
38623862
}
3863+
3864+
func TestGossipsubLimitIWANT(t *testing.T) {
3865+
ctx, cancel := context.WithCancel(context.Background())
3866+
defer cancel()
3867+
hosts := getDefaultHosts(t, 3)
3868+
denseConnect(t, hosts)
3869+
3870+
psubs := make([]*PubSub, 3)
3871+
3872+
defaultParams := DefaultGossipSubParams()
3873+
defaultParams.MaxIWantsPerMessageIDPerHeartbeat = 1
3874+
// Delay the heartbeat so we don't reset our count of allowed IWANTS for the
3875+
// first part of the test.
3876+
defaultParams.HeartbeatInitialDelay = 4 * time.Second
3877+
defaultParams.HeartbeatInterval = time.Second
3878+
3879+
psubs[0] = getGossipsub(ctx, hosts[0], WithGossipSubParams(defaultParams))
3880+
3881+
var iwantsRecvd atomic.Int32
3882+
3883+
copy(psubs[1:], getGossipsubs(ctx, hosts[1:], WithRawTracer(&mockRawTracer{
3884+
onRecvRPC: func(rpc *RPC) {
3885+
if len(rpc.GetControl().GetIwant()) > 0 {
3886+
iwantsRecvd.Add(1)
3887+
}
3888+
},
3889+
})))
3890+
3891+
topicString := "foobar"
3892+
for _, ps := range psubs {
3893+
_, err := ps.Join(topicString)
3894+
if err != nil {
3895+
t.Fatal(err)
3896+
}
3897+
3898+
_, err = ps.Subscribe(topicString)
3899+
if err != nil {
3900+
t.Fatal(err)
3901+
}
3902+
}
3903+
time.Sleep(2 * time.Second)
3904+
3905+
publishIWant := func() {
3906+
psubs[1].eval <- func() {
3907+
psubs[1].rt.(*GossipSubRouter).sendRPC(hosts[0].ID(), &RPC{
3908+
RPC: pb.RPC{
3909+
Control: &pb.ControlMessage{
3910+
Ihave: []*pb.ControlIHave{
3911+
{
3912+
TopicID: &topicString,
3913+
MessageIDs: []string{"1"},
3914+
},
3915+
},
3916+
},
3917+
},
3918+
}, false)
3919+
}
3920+
3921+
psubs[2].eval <- func() {
3922+
psubs[2].rt.(*GossipSubRouter).sendRPC(hosts[0].ID(), &RPC{
3923+
RPC: pb.RPC{
3924+
Control: &pb.ControlMessage{
3925+
Ihave: []*pb.ControlIHave{
3926+
{
3927+
TopicID: &topicString,
3928+
MessageIDs: []string{"1"},
3929+
},
3930+
},
3931+
},
3932+
},
3933+
}, false)
3934+
}
3935+
}
3936+
3937+
publishIWant()
3938+
time.Sleep(time.Second)
3939+
3940+
if iwantsRecvd.Swap(0) != 1 {
3941+
t.Fatal("Expected exactly 1 IWANT due to limits")
3942+
}
3943+
3944+
// Wait for heartbeat to reset limit
3945+
time.Sleep(2 * time.Second)
3946+
3947+
publishIWant()
3948+
time.Sleep(time.Second)
3949+
3950+
if iwantsRecvd.Swap(0) != 1 {
3951+
t.Fatal("Expected exactly 1 IWANT due to limits")
3952+
}
3953+
}

0 commit comments

Comments
 (0)