Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 20 additions & 18 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,30 +188,17 @@ func (q *TransmitLimitedQueue) queueBroadcast(b Broadcast, initialTransmits int)

q.lazyInit()

if q.idGen == math.MaxInt64 {
// it's super duper unlikely to wrap around within the retransmit limit
q.idGen = 1
} else {
q.idGen++
}
id := q.idGen

lb := &limitedBroadcast{
transmits: initialTransmits,
msgLen: int64(len(b.Message())),
id: id,
b: b,
}
name := ""
unique := false
if nb, ok := b.(NamedBroadcast); ok {
lb.name = nb.Name()
name = nb.Name()
} else if _, ok := b.(UniqueBroadcast); ok {
unique = true
}

// Check if this message invalidates another.
if lb.name != "" {
if old, ok := q.tm[lb.name]; ok {
if name != "" {
if old, ok := q.tm[name]; ok {
old.b.Finished()
q.deleteItem(old)
}
Expand Down Expand Up @@ -240,8 +227,23 @@ func (q *TransmitLimitedQueue) queueBroadcast(b Broadcast, initialTransmits int)
}
}

// Generate the id after any invalidated messages are removed from the queue,
// in case q.deleteItem() reset the id generator
if q.idGen == math.MaxInt64 {
// it's super duper unlikely to wrap around within the retransmit limit
q.idGen = 1
} else {
q.idGen++
}

// Append to the relevant queue.
q.addItem(lb)
q.addItem(&limitedBroadcast{
transmits: initialTransmits,
msgLen: int64(len(b.Message())),
id: q.idGen,
b: b,
name: name,
})
}

// deleteItem removes the given item from the overall datastructure. You
Expand Down
2 changes: 2 additions & 0 deletions queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ func TestLimitedBroadcastLess(t *testing.T) {
func TestTransmitLimited_Queue(t *testing.T) {
q := &TransmitLimitedQueue{RetransmitMult: 1, NumNodes: func() int { return 1 }}
q.QueueBroadcast(&memberlistBroadcast{"test", nil, nil})
// Should invalidate and replace the only message in the queue
q.QueueBroadcast(&memberlistBroadcast{"test", nil, nil})
q.QueueBroadcast(&memberlistBroadcast{"foo", nil, nil})
q.QueueBroadcast(&memberlistBroadcast{"bar", nil, nil})

Expand Down