项目作者: zngw

项目描述 :
kafka的使用
高级语言: Go
项目地址: git://github.com/zngw/kafka.git
创建时间: 2019-12-01T17:21:19Z
项目社区:https://github.com/zngw/kafka

开源协议:

下载


kafka

生产者测试producer.go

  1. package main
  2. import (
  3. "github.com/zngw/kafka"
  4. "github.com/zngw/log"
  5. "os/signal"
  6. "runtime"
  7. "syscall"
  8. )
  9. func main() {
  10. // 初始化日志
  11. err := log.Init(nil)
  12. if err != nil {
  13. panic(err)
  14. }
  15. // 初始化生产生
  16. err = kafka.InitProducer("192.168.1.29:9092")
  17. if err != nil {
  18. panic(err)
  19. }
  20. // 关闭
  21. defer kafka.Close()
  22. // 发送测试消息
  23. kafka.Send("Test","This is Test Msg")
  24. kafka.Send("Test","Hello Guoke")
  25. signal.Ignore(syscall.SIGHUP)
  26. runtime.Goexit()
  27. }

消费者测试consumer.go

  1. package main
  2. import (
  3. "github.com/zngw/kafka"
  4. "github.com/zngw/log"
  5. "os/signal"
  6. "runtime"
  7. "syscall"
  8. )
  9. func main() {
  10. // 初始化日志
  11. err := log.Init(nil)
  12. if err != nil {
  13. panic(err)
  14. }
  15. // 初始化消费者
  16. err = kafka.InitConsumer("192.168.1.29:9092")
  17. if err != nil {
  18. panic(err)
  19. }
  20. // 监听
  21. go func() {
  22. err = kafka.LoopConsumer("Test", TopicCallBack)
  23. if err != nil {
  24. panic(err)
  25. }
  26. }()
  27. signal.Ignore(syscall.SIGHUP)
  28. runtime.Goexit()
  29. }
  30. func TopicCallBack(data []byte) {
  31. log.Trace("kafka", "Test:"+string(data))
  32. }

执行结果