Skip to content

Commit afc1af9

Browse files
authored
Merge pull request #20 from donutloop/feat/bus
- Added: Implementation of bus
2 parents 0375f81 + 7558e24 commit afc1af9

File tree

3 files changed

+315
-0
lines changed

3 files changed

+315
-0
lines changed

bus/README.md

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# Usage
2+
3+
Bus it is a simple but powerful publish-subscribe event system. It requires object to
4+
register themselves with the event bus to receive events.
5+
6+
## Example
7+
```go
8+
package main
9+
10+
import (
11+
"github.com/donutloop/toolkit/bus"
12+
"log"
13+
)
14+
15+
type msg struct {
16+
Id int64
17+
counter int
18+
}
19+
20+
func main() {
21+
b := bus.New()
22+
23+
b.AddEventListener(func(m *msg) error {
24+
m.counter++
25+
return nil
26+
})
27+
28+
b.AddEventListener(func(m *msg) error {
29+
m.counter++
30+
return nil
31+
})
32+
33+
b.Dispatch(new(msg))
34+
}
35+
```

bus/bus.go

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
package bus
2+
3+
import (
4+
"fmt"
5+
"reflect"
6+
"sync"
7+
)
8+
9+
//The type of the function's first and only argument
10+
//declares the msg to listen for.
11+
type HandlerFunc interface{}
12+
13+
type Msg interface{}
14+
15+
//It is a simple but powerful publish-subscribe event system. It requires object to
16+
//register themselves with the event bus to receive events.
17+
type Bus interface {
18+
Dispatch(msg Msg) error
19+
AddHandler(handler HandlerFunc) error
20+
AddEventListener(handler HandlerFunc)
21+
Publish(msg Msg) error
22+
}
23+
24+
type InProcBus struct {
25+
sync.Mutex
26+
handlers map[string]reflect.Value
27+
listeners map[string][]reflect.Value
28+
}
29+
30+
func New() Bus {
31+
return &InProcBus{
32+
handlers: make(map[string]reflect.Value),
33+
listeners: make(map[string][]reflect.Value),
34+
}
35+
}
36+
37+
// Dispatch sends an msg to registered handler that were declared
38+
// to accept values of a msg
39+
func (b *InProcBus) Dispatch(msg Msg) error {
40+
nameOfMsg := reflect.TypeOf(msg)
41+
42+
handler, ok := b.handlers[nameOfMsg.String()]
43+
if !ok {
44+
return fmt.Errorf("handler not found for %s", nameOfMsg)
45+
}
46+
47+
params := make([]reflect.Value, 0, 1)
48+
params = append(params, reflect.ValueOf(msg))
49+
50+
ret := handler.Call(params)
51+
v := ret[0].Interface()
52+
if err, ok := v.(error); ok && err != nil {
53+
return err
54+
}
55+
return nil
56+
}
57+
58+
// Dispatch sends an msg to all registered listeners that were declared
59+
// to accept values of a msg
60+
func (b *InProcBus) Publish(msg Msg) error {
61+
nameOfMsg := reflect.TypeOf(msg)
62+
listeners := b.listeners[nameOfMsg.String()]
63+
64+
params := make([]reflect.Value, 0, 1)
65+
params = append(params, reflect.ValueOf(msg))
66+
67+
for _, listenerHandler := range listeners {
68+
ret := listenerHandler.Call(params)
69+
v := ret[0].Interface()
70+
if err, ok := v.(error); ok && err != nil {
71+
return err
72+
}
73+
}
74+
return nil
75+
}
76+
77+
// AddHandler registers a handler function that will be called when a matching
78+
// msg is dispatched.
79+
func (b *InProcBus) AddHandler(handler HandlerFunc) error {
80+
b.Mutex.Lock()
81+
defer b.Mutex.Unlock()
82+
83+
handlerType := reflect.TypeOf(handler)
84+
validateHandlerFunc(handlerType)
85+
86+
typeOfMsg := handlerType.In(0)
87+
if _, ok := b.handlers[typeOfMsg.String()]; ok {
88+
return fmt.Errorf("handler exists for %s", typeOfMsg)
89+
}
90+
91+
b.handlers[typeOfMsg.String()] = reflect.ValueOf(handler)
92+
return nil
93+
}
94+
95+
// AddListener registers a listener function that will be called when a matching
96+
// msg is dispatched.
97+
func (b *InProcBus) AddEventListener(handler HandlerFunc) {
98+
b.Mutex.Lock()
99+
defer b.Mutex.Unlock()
100+
101+
handlerType := reflect.TypeOf(handler)
102+
validateHandlerFunc(handlerType)
103+
// the first input parameter is the msg
104+
typOfMsg := handlerType.In(0)
105+
_, ok := b.listeners[typOfMsg.String()]
106+
if !ok {
107+
b.listeners[typOfMsg.String()] = make([]reflect.Value, 0)
108+
}
109+
b.listeners[typOfMsg.String()] = append(b.listeners[typOfMsg.String()], reflect.ValueOf(handler))
110+
}
111+
112+
// panic if conditions not met (this is a programming error)
113+
func validateHandlerFunc(handlerType reflect.Type) {
114+
switch {
115+
case handlerType.Kind() != reflect.Func:
116+
panic(BadFuncError("handler func must be a function"))
117+
case handlerType.NumIn() != 1:
118+
panic(BadFuncError("handler func must take exactly one input argument"))
119+
case handlerType.NumOut() != 1:
120+
panic(BadFuncError("handler func must take exactly one output argument"))
121+
}
122+
}
123+
124+
// BadFuncError is raised via panic() when AddEventListener is called with an
125+
// invalid listener function.
126+
type BadFuncError string
127+
128+
func (bhf BadFuncError) Error() string {
129+
return fmt.Sprintf("bad handler func: %s", string(bhf))
130+
}

bus/bus_test.go

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
package bus_test
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"github.com/donutloop/toolkit/bus"
7+
"testing"
8+
)
9+
10+
type msg struct {
11+
Id int64
12+
body string
13+
}
14+
15+
func TestHandlerReturnsError(t *testing.T) {
16+
b := bus.New()
17+
18+
b.AddHandler(func(m *msg) error {
19+
return errors.New("handler error")
20+
})
21+
22+
err := b.Dispatch(new(msg))
23+
if err == nil {
24+
t.Fatal("Send query failed " + err.Error())
25+
}
26+
}
27+
28+
func TestHandlerReturn(t *testing.T) {
29+
b := bus.New()
30+
31+
b.AddHandler(func(q *msg) error {
32+
q.body = "Hello, world!"
33+
return nil
34+
})
35+
36+
msg := new(msg)
37+
err := b.Dispatch(msg)
38+
39+
if err != nil {
40+
t.Fatal("Send msg failed " + err.Error())
41+
}
42+
43+
if msg.body != "Hello, world!" {
44+
t.Fatal("Failed to get response from handler")
45+
}
46+
}
47+
48+
func TestEventListeners(t *testing.T) {
49+
b := bus.New()
50+
count := 0
51+
52+
b.AddEventListener(func(m *msg) error {
53+
count += 1
54+
return nil
55+
})
56+
57+
b.AddEventListener(func(m *msg) error {
58+
count += 10
59+
return nil
60+
})
61+
62+
err := b.Publish(new(msg))
63+
if err != nil {
64+
t.Fatal("Publish msg failed " + err.Error())
65+
}
66+
67+
if count != 11 {
68+
t.Fatal(fmt.Sprintf("Publish msg failed, listeners called: %v, expected: %v", count, 11))
69+
}
70+
}
71+
72+
func TestAddHandlerBadFunc(t *testing.T) {
73+
defer func() {
74+
if v := recover(); v != nil {
75+
_, ok := v.(bus.BadFuncError)
76+
if !ok {
77+
t.Fatalf("unexpected object (%v)", v)
78+
}
79+
}
80+
}()
81+
82+
b := bus.New()
83+
b.AddHandler(func(q *msg, s string) error {
84+
return nil
85+
})
86+
}
87+
88+
func TestAddListenerBadFunc(t *testing.T) {
89+
defer func() {
90+
if v := recover(); v != nil {
91+
_, ok := v.(bus.BadFuncError)
92+
if !ok {
93+
t.Fatalf("unexpected object (%v)", v)
94+
}
95+
}
96+
}()
97+
98+
b := bus.New()
99+
b.AddEventListener(func(q *msg, s string) error {
100+
return nil
101+
})
102+
}
103+
104+
func BenchmarkRun(b *testing.B) {
105+
for n := 0; n < b.N; n++ {
106+
b := bus.New()
107+
108+
b.AddEventListener(func(m *msg) error {
109+
return nil
110+
})
111+
112+
b.AddEventListener(func(m *msg) error {
113+
return nil
114+
})
115+
116+
b.AddEventListener(func(m *msg) error {
117+
return nil
118+
})
119+
120+
b.AddEventListener(func(m *msg) error {
121+
return nil
122+
})
123+
124+
b.AddEventListener(func(m *msg) error {
125+
return nil
126+
})
127+
128+
b.AddHandler(func(q *msg) error {
129+
return nil
130+
})
131+
132+
b.AddHandler(func(q *msg) error {
133+
return nil
134+
})
135+
136+
b.AddHandler(func(q *msg) error {
137+
return nil
138+
})
139+
140+
b.AddHandler(func(q *msg) error {
141+
return nil
142+
})
143+
144+
b.AddHandler(func(q *msg) error {
145+
return nil
146+
})
147+
148+
b.Dispatch(new(msg))
149+
}
150+
}

0 commit comments

Comments
 (0)