Skip to content

Commit c5773aa

Browse files
committed
add Topic option to skip publishing to partial message-capable peers
1 parent 58165b0 commit c5773aa

File tree

4 files changed

+137
-13
lines changed

4 files changed

+137
-13
lines changed

gossipsub.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1295,6 +1295,13 @@ func (gs *GossipSubRouter) PublishBatch(messages []*Message, opts *BatchPublishO
12951295
}
12961296
}
12971297

1298+
func (gs *GossipSubRouter) skipPartialMessageCapablePeers(topicID string) bool {
1299+
if t, ok := gs.p.myTopics[topicID]; ok {
1300+
return t.skipPublishingToPartialMessageCapablePeers
1301+
}
1302+
return false
1303+
}
1304+
12981305
func (gs *GossipSubRouter) Publish(msg *Message) {
12991306
for p, rpc := range gs.rpcs(msg) {
13001307
gs.sendRPC(p, rpc, false)
@@ -1371,8 +1378,9 @@ func (gs *GossipSubRouter) rpcs(msg *Message) iter.Seq2[peer.ID, *RPC] {
13711378
}
13721379

13731380
out := rpcWithMessages(msg.Message)
1381+
skipPMPeers := gs.skipPartialMessageCapablePeers(msg.GetTopic())
13741382
for pid := range tosend {
1375-
if pid == from || pid == peer.ID(msg.GetFrom()) {
1383+
if pid == from || pid == peer.ID(msg.GetFrom()) || (skipPMPeers && gs.extensions.peerExtensions[pid].PartialMessages) {
13761384
continue
13771385
}
13781386

gossipsub_test.go

Lines changed: 107 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,15 +63,6 @@ func getGossipsubs(ctx context.Context, hs []host.Host, opts ...Option) []*PubSu
6363
return psubs
6464
}
6565

66-
func getGossipsubsOptFn(ctx context.Context, hs []host.Host, optFn func(int, host.Host) []Option) []*PubSub {
67-
var psubs []*PubSub
68-
for i, h := range hs {
69-
opts := optFn(i, h)
70-
psubs = append(psubs, getGossipsub(ctx, h, opts...))
71-
}
72-
return psubs
73-
}
74-
7566
func TestGossipSubParamsValidate(t *testing.T) {
7667
params := DefaultGossipSubParams()
7768
params.Dhi = 1
@@ -93,6 +84,15 @@ func TestGossipSubBootstrapParamsValidate(t *testing.T) {
9384
}
9485
}
9586

87+
func getGossipsubsOptFn(ctx context.Context, hs []host.Host, optFn func(int, host.Host) []Option) []*PubSub {
88+
var psubs []*PubSub
89+
for i, h := range hs {
90+
opts := optFn(i, h)
91+
psubs = append(psubs, getGossipsub(ctx, h, opts...))
92+
}
93+
return psubs
94+
}
95+
9696
func TestSparseGossipsub(t *testing.T) {
9797
ctx, cancel := context.WithCancel(context.Background())
9898
defer cancel()
@@ -4653,3 +4653,101 @@ outer:
46534653
t.Errorf("Expected no missing parts, got %v", missing)
46544654
}
46554655
}
4656+
4657+
func TestSkipPublishingToPeersWithPartialMessageSupport(t *testing.T) {
4658+
topicName := "test-topic"
4659+
4660+
// 3 hosts.
4661+
// hosts[0]: Publisher. Supports partial messages
4662+
// hosts[1]: Subscriber. Supports partial messages
4663+
// hosts[2]: Alternate publisher. Does not support partial messages. Only
4664+
// connected to hosts[0]
4665+
hosts := getDefaultHosts(t, 3)
4666+
4667+
partialExt := make([]*partialmessages.PartialMessageExtension, 2)
4668+
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}))
4669+
4670+
for i := range partialExt {
4671+
partialExt[i] = &partialmessages.PartialMessageExtension{
4672+
Logger: logger,
4673+
ValidateRPC: func(from peer.ID, rpc *pb.PartialMessagesExtension) error {
4674+
return nil
4675+
},
4676+
EagerIWantLimitPerHeartbeat: 0,
4677+
IWantLimitPerHeartbeat: 1,
4678+
NewPartialMessage: func(topic string, groupID []byte) (partialmessages.PartialMessage, error) {
4679+
return &minimalTestPartialMessage{
4680+
Group: groupID,
4681+
onExtended: func(m *minimalTestPartialMessage) {
4682+
t.Logf("Received new part and extended partial message")
4683+
},
4684+
}, nil
4685+
},
4686+
}
4687+
}
4688+
4689+
psubs := make([]*PubSub, 0, len(hosts)-1)
4690+
for i, h := range hosts[:2] {
4691+
psub := getGossipsub(context.Background(), h, WithPartialMessagesExtension(partialExt[i]))
4692+
psubs = append(psubs, psub)
4693+
}
4694+
4695+
nonPartialPubsub := getGossipsub(context.Background(), hosts[2])
4696+
4697+
denseConnect(t, hosts[:2])
4698+
time.Sleep(2 * time.Second)
4699+
4700+
// Connect nonPartialPubsub to the publisher
4701+
connect(t, hosts[0], hosts[2])
4702+
4703+
var topics []*Topic
4704+
var subs []*Subscription
4705+
for _, psub := range psubs {
4706+
topic, err := psub.Join(topicName, WithSkipPublishingToPartialMessageCapablePeers())
4707+
if err != nil {
4708+
t.Fatal(err)
4709+
}
4710+
topics = append(topics, topic)
4711+
s, err := topic.Subscribe()
4712+
if err != nil {
4713+
t.Fatal(err)
4714+
}
4715+
subs = append(subs, s)
4716+
}
4717+
4718+
topicForNonPartial, err := nonPartialPubsub.Join(topicName)
4719+
if err != nil {
4720+
t.Fatal(err)
4721+
}
4722+
4723+
// Wait for subscriptions to propagate
4724+
time.Sleep(time.Second)
4725+
4726+
topics[0].Publish(context.Background(), []byte("Hello"))
4727+
4728+
// Publish from another peer, the publisher (psub[0]) should not forward this to psub[1].
4729+
topicForNonPartial.Publish(context.Background(), []byte("from non-partial"))
4730+
4731+
recvdMessage := make(chan struct{}, 1)
4732+
ctx, cancel := context.WithCancel(context.Background())
4733+
defer cancel()
4734+
go func() {
4735+
msg, err := subs[1].Next(ctx)
4736+
if err == context.Canceled {
4737+
return
4738+
}
4739+
if err != nil {
4740+
t.Log(err)
4741+
t.Fail()
4742+
return
4743+
}
4744+
t.Log("Received msg", string(msg.Data))
4745+
recvdMessage <- struct{}{}
4746+
}()
4747+
4748+
select {
4749+
case <-recvdMessage:
4750+
t.Fatal("Received message")
4751+
case <-time.After(2 * time.Second):
4752+
}
4753+
}

pubsub.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1376,7 +1376,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) {
13761376
continue
13771377
}
13781378

1379-
msg := &Message{pmsg, "", rpc.from, nil, false}
1379+
msg := &Message{Message: pmsg, ID: "", ReceivedFrom: rpc.from, ValidatorData: nil, Local: false}
13801380
if p.shouldPush(msg) {
13811381
toPush = append(toPush, msg)
13821382
}
@@ -1515,7 +1515,16 @@ type rmTopicReq struct {
15151515
resp chan error
15161516
}
15171517

1518-
type TopicOptions struct{}
1518+
type TopicOptions struct {
1519+
SkipPublishingToPartialMessageCapablePeers bool
1520+
}
1521+
1522+
func WithSkipPublishingToPartialMessageCapablePeers() TopicOpt {
1523+
return func(t *Topic) error {
1524+
t.skipPublishingToPartialMessageCapablePeers = true
1525+
return nil
1526+
}
1527+
}
15191528

15201529
type TopicOpt func(t *Topic) error
15211530

topic.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ type Topic struct {
3232

3333
mux sync.RWMutex
3434
closed bool
35+
36+
skipPublishingToPartialMessageCapablePeers bool
3537
}
3638

3739
// String returns the topic associated with t
@@ -348,7 +350,14 @@ func (t *Topic) validate(ctx context.Context, data []byte, opts ...PubOpt) (*Mes
348350
}
349351
}
350352

351-
msg := &Message{m, "", t.p.host.ID(), pub.validatorData, pub.local}
353+
msg := &Message{
354+
Message: m,
355+
ID: "",
356+
ReceivedFrom: t.p.host.ID(),
357+
ValidatorData: pub.validatorData,
358+
Local: pub.local,
359+
}
360+
352361
select {
353362
case t.p.eval <- func() {
354363
t.p.rt.Preprocess(t.p.host.ID(), []*Message{msg})

0 commit comments

Comments
 (0)