项目作者: alex023

项目描述 :
Super pubsub framework based on an asynchronous model. Provides topic filters, fault isolation, etc.
高级语言: Go
项目地址: git://github.com/alex023/eventbus.git
创建时间: 2017-05-23T12:49:52Z
项目社区:https://github.com/alex023/eventbus

开源协议:Apache License 2.0

下载


eventbus

中文介绍

License
Go Report Card
GoDoc
Build Status
Coverage Status

Brief

Eventbus is an event framework for in-memory event management.

  • Fault isolation
  • Asynchronous event dispatching
  • Filter support: enables logging, intercepting, and monitoring messages.
  • [todo]Multicasting events
  • [todo] State monitor: enables monitoring of topic statistics.
  • [todo] Topic matchers: enable a subscriber to subscribe to many topics

Just Three Steps

  1. Define events
     ```golang
     type message struct { /* Additional fields if needed */ }
     ```
     We can skip this step for basic types such as string, int, uint, etc.
  2. Prepare subscribers: declare your subscribing method, without a Mutex for multithreading:
     ```golang
     //we can use any method name to handle event
     func HandleMsg(msg interface{}) { /* Do something */ };

     //…
     func main(){
         eventbus.Default.Subscribe("topic",HandleMsg)
         //…
     }
     ```

  3. Push events
     ```golang
     eventbus.Default.Push("topic","this is test msg!")
     ```

Can I use it?

The implementation is still in beta, but we are already using it in production. The API may still change until 1.0.

Examples

base function

This code shows the basic functions of eventbus:

  1. fault isolation
  2. the subscription object unsubscribes itself
  3. eventbus stops gracefully
```golang
type CountMessage struct {
	Num int
}

type Consumer struct {
	testNilMap map[int32]struct{}
	counter    int
	sub        *eventbus.Subscribe
}

func (c *Consumer) HandleMessage(message interface{}) {
	cMsg, ok := message.(CountMessage)
	if !ok {
		return
	}
	c.counter++
	//When a condition is met, perform the operation which causes the crash
	if cMsg.Num == 6 {
		c.testNilMap[2] = struct{}{}
	} else {
		fmt.Printf("Num:=%v \n", cMsg.Num)
	}
	if c.counter > 9 {
		fmt.Println("consumer unscribe topic,and cannot receive any message later.")
		c.sub.Unscribe()
	}
}

//this code shows the basic functions of eventbus consumer subscription:
// 1.fault isolation
// 2.the subscription object unsubscribes itself
// 3.eventbus stops gracefully
func main() {
	var (
		eb    = eventbus.Default()
		topic = "add"
	)
	consumer := &Consumer{}
	sub, _ := eb.Subscribe(consumer.HandleMessage,topic)
	consumer.sub = sub

	fmt.Println("send 10 messages, Num from 0 to 9")
	time.Sleep(time.Second)
	for i := 0; i < 10; i++ {
		time.Sleep(time.Millisecond * 500)
		eb.Publish(topic, CountMessage{i})
	}
	time.Sleep(time.Second)
	fmt.Println("send 10 messages, Num from 10 to 19")
	//these messages cannot be received by the consumer, because it has unsubscribed from this topic
	for i := 10; i < 20; i++ {
		time.Sleep(time.Millisecond * 200)
		eb.Publish(topic, CountMessage{i})
	}
	eb.StopGracefull()
}
```

### topic message filter

This code shows how to use a filter. With filters, we can log, intercept, monitor messages, etc.

```golang
type CountMessage struct {
	X, Y int
}

//DivisorJudgment is an implementation of "Filter"
type DivisorJudgment struct {
}

func (m *DivisorJudgment) Receive(message interface{}) (newMsg interface{}, proceed bool) {
	msg, ok := message.(CountMessage)
	if !ok {
		return nil, false
	}
	//determine whether the divisor is zero; if it is, catch it.
	if msg.Y == 0 {
		fmt.Printf("[Filter] [X/Y]:[%2d/%2d]= ,zero the dividend \n", msg.X, msg.Y)
		return nil, false
	}
	return msg, true
}

type Consumer struct {
	testNilMap map[int32]struct{}
}

func (c *Consumer) Div(message interface{}) {
	cMsg, ok := message.(CountMessage)
	if !ok {
		return
	}
	if cMsg.X == 95 {
		c.testNilMap[2] = struct{}{}
	} else {
		//fmt.Printf("[Consumer] [X/Y]:[%2d/%2d]= %d \n", cMsg.X, cMsg.Y, cMsg.X/cMsg.Y)
		_ = cMsg.X / cMsg.Y
	}
}

//this code shows how to use a filter to intercept messages!
func main() {
	var (
		r        = rand.New(rand.NewSource(time.Now().UnixNano()))
		eb       = eventbus.New()
		consumer = &Consumer{}
		topic    = "T"
	)
	eb.InitTopic(topic, &Watcher{})
	eb.Subscribe(consumer.Div, topic)
	eb.LoadFilter(topic, &DivisorJudgment{})
	fmt.Println("catch zero divisor by filter.")
	//0...49,catch zero divisor by [watcher];and 50...99,no watcher.
	for i := 0; i < 100; i++ {
		eb.Push(topic, CountMessage{i, r.Intn(5)})
		time.Sleep(time.Millisecond * 50)
		if i == 50 {
			fmt.Printf("[topic statics]:%+v \n", eb.Statistic(topic)[0])
			fmt.Println("recover collapse by eventbus.")
			eb.UnloadFilter(topic, &DivisorJudgment{})
		}
	}
	eb.StopGracefull()
	fmt.Printf("[topic statics]:%+v \n", eb.Statistic(topic)[0])
}
```

more examples