nsq.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. /*
  2. * @Author: lwnmengjing
  3. * @Date: 2021/5/30 7:30 下午
  4. * @Last Modified by: lwnmengjing
  5. * @Last Modified time: 2021/5/30 7:30 下午
  6. */
  7. package queue
import (
	"errors"

	json "github.com/json-iterator/go"
	"github.com/nsqio/go-nsq"
	"gogs.baozhida.cn/zoie/OAuth-core/storage"
)
  13. // NewNSQ nsq模式 只能监听一个channel
  14. func NewNSQ(addresses []string, cfg *nsq.Config, channelPrefix string) (*NSQ, error) {
  15. n := &NSQ{
  16. addresses: addresses,
  17. cfg: cfg,
  18. channelPrefix: channelPrefix,
  19. }
  20. var err error
  21. n.producer, err = n.newProducer()
  22. return n, err
  23. }
  24. type NSQ struct {
  25. addresses []string
  26. cfg *nsq.Config
  27. producer *nsq.Producer
  28. consumer *nsq.Consumer
  29. channelPrefix string
  30. }
  31. // String 字符串类型
  32. func (NSQ) String() string {
  33. return "nsq"
  34. }
  35. // switchAddress ⚠️生产环境至少配置三个节点
  36. func (e *NSQ) switchAddress() {
  37. if len(e.addresses) > 1 {
  38. e.addresses[0], e.addresses[len(e.addresses)-1] =
  39. e.addresses[1],
  40. e.addresses[0]
  41. }
  42. }
  43. func (e *NSQ) newProducer() (*nsq.Producer, error) {
  44. if e.cfg == nil {
  45. e.cfg = nsq.NewConfig()
  46. }
  47. return nsq.NewProducer(e.addresses[0], e.cfg)
  48. }
  49. func (e *NSQ) newConsumer(topic string, h nsq.Handler) (err error) {
  50. if e.cfg == nil {
  51. e.cfg = nsq.NewConfig()
  52. }
  53. if e.consumer == nil {
  54. e.consumer, err = nsq.NewConsumer(topic, e.channelPrefix+topic, e.cfg)
  55. if err != nil {
  56. return err
  57. }
  58. }
  59. e.consumer.AddHandler(h)
  60. err = e.consumer.ConnectToNSQDs(e.addresses)
  61. return err
  62. }
  63. // Append 消息入生产者
  64. func (e *NSQ) Append(message storage.Messager) error {
  65. rb, err := json.Marshal(message.GetValues())
  66. if err != nil {
  67. return err
  68. }
  69. return e.producer.Publish(message.GetStream(), rb)
  70. }
  71. // Register 监听消费者
  72. func (e *NSQ) Register(name string, f storage.ConsumerFunc) {
  73. h := &nsqConsumerHandler{f}
  74. err := e.newConsumer(name, h)
  75. if err != nil {
  76. //目前不支持动态注册
  77. panic(err)
  78. }
  79. }
  80. func (e *NSQ) Run() {
  81. }
  82. func (e *NSQ) Shutdown() {
  83. if e.producer != nil {
  84. e.producer.Stop()
  85. }
  86. if e.consumer != nil {
  87. e.consumer.Stop()
  88. }
  89. }
  90. type nsqConsumerHandler struct {
  91. f storage.ConsumerFunc
  92. }
  93. func (e nsqConsumerHandler) HandleMessage(message *nsq.Message) error {
  94. m := new(Message)
  95. data := make(map[string]interface{})
  96. err := json.Unmarshal(message.Body, &data)
  97. if err != nil {
  98. return err
  99. }
  100. m.SetValues(data)
  101. return e.f(m)
  102. }