Skip to content

Commit 03809ae

Browse files
committed
add Broadcaster
1 parent 723348c commit 03809ae

File tree

3 files changed

+83
-0
lines changed

3 files changed

+83
-0
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ Experimental packages not in std and golang.org/exp
2525
- Uint32
2626
- Uint64
2727
- **Pool**: a generic pool, forked from https://github.com/mkmik/syncpool
28+
- **Broadcaster**: a broadcaster to broadcast message to multiple receivers
2829

2930
- **container**
3031
- **heap**: generic heap

sync/broadcaster.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package sync
2+
3+
import "sync"
4+
5+
type Broadcaster[T any] struct {
6+
mu *sync.Mutex
7+
cond *sync.Cond
8+
signaled bool
9+
10+
v T
11+
}
12+
13+
// Broadcast sends a signal with a value to all goroutines waiting on the broadcaster.
14+
func NewBroadcaster[T any]() *Broadcaster[T] {
15+
var mu sync.Mutex
16+
return &Broadcaster[T]{
17+
mu: &mu,
18+
cond: sync.NewCond(&mu),
19+
signaled: false,
20+
}
21+
}
22+
23+
// Go waits until something is broadcasted, and runs the given function in a new
24+
// goroutine with the value that was broadcasted.
25+
func (b *Broadcaster[T]) Go(fn func(v T)) {
26+
go func() {
27+
b.cond.L.Lock()
28+
defer b.cond.L.Unlock()
29+
30+
for !b.signaled {
31+
b.cond.Wait()
32+
}
33+
fn(b.v)
34+
}()
35+
}
36+
37+
// Broadcast broadcasts a signal to all
38+
// waiting function and unblocks them.
39+
func (b *Broadcaster[T]) Broadcast(v T) {
40+
b.cond.L.Lock()
41+
b.v = v
42+
b.signaled = true
43+
b.cond.L.Unlock()
44+
45+
b.cond.Broadcast()
46+
}

sync/broadcaster_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package sync
2+
3+
import (
4+
"sync"
5+
"sync/atomic"
6+
"testing"
7+
8+
"github.com/stretchr/testify/assert"
9+
)
10+
11+
func TestNewBroadcaster(t *testing.T) {
12+
b := NewBroadcaster[int]()
13+
14+
var count atomic.Int32
15+
var sum atomic.Int64
16+
17+
var wg sync.WaitGroup
18+
19+
for i := 0; i < 10; i++ {
20+
wg.Add(1)
21+
b.Go(func(v int) {
22+
defer wg.Done()
23+
24+
count.Add(1)
25+
sum.Add(int64(v))
26+
})
27+
}
28+
29+
b.Broadcast(10)
30+
31+
wg.Wait()
32+
33+
assert.Equal(t, int32(10), count.Load())
34+
assert.Equal(t, int64(100), sum.Load())
35+
36+
}

0 commit comments

Comments
 (0)