Nats.go 3.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package Nats
  2. import (
  3. "Cold_Api/conf"
  4. "Cold_Api/controllers/lib"
  5. "Cold_Api/logs"
  6. "Cold_Api/models/Device"
  7. "encoding/xml"
  8. "fmt"
  9. "github.com/nats-io/nats.go"
  10. "strconv"
  11. "strings"
  12. )
  13. func init() {
  14. logs.Println("============Nats init============")
  15. var err error
  16. // 连接Nats服务器
  17. lib.Nats, err = nats.Connect("nats://" + conf.NatsServer_Url)
  18. if err != nil {
  19. fmt.Println("nats 连接失败!")
  20. panic(err)
  21. }
  22. logs.Println("nats OK!")
  23. go NatsInit()
  24. }
  25. func NatsInit() {
  26. // 请求-响应, 响应 test3 消息。
  27. _, _ = lib.Nats.Subscribe("Wx_BasicMessage_Event_QRCode", func(m *nats.Msg) {
  28. fmt.Printf(" => Nats Wx_BasicMessage_Event_QRCode message: %s\n", string(m.Data))
  29. type Person_QRCode struct {
  30. ToUserName string `xml:"ToUserName"` //注意这里有个反引号
  31. FromUserName string `xml:"FromUserName"` //注意这里有个反引号
  32. CreateTime string `xml:"CreateTime"` //注意这里有个反引号
  33. EventKey string `xml:"EventKey"`
  34. }
  35. var person_QRCode Person_QRCode
  36. err1 := xml.Unmarshal(m.Data, &person_QRCode)
  37. if err1 != nil {
  38. fmt.Println("Unmarshal error")
  39. _ = lib.Nats.Publish(m.Reply, []byte(""))
  40. return
  41. }
  42. // 进入 二维码配对
  43. fmt.Println("FromUserName-", person_QRCode.FromUserName)
  44. fmt.Println("EventKey-", person_QRCode.EventKey)
  45. // 开始 处理消息
  46. if strings.Contains(person_QRCode.EventKey, "@宝智达 微信公众号通知") {
  47. //"请将本内容发送到 深圳市宝智达科技有限公司 微信公众号-|"+lib.AesEncryptCBC(T_calss_id, "0123456789012345")+"|-",
  48. Content_r := lib.GetBetweenStr(person_QRCode.EventKey, "-|", "|-")
  49. decryptCode := lib.AesDecryptCBC(Content_r, "0123456789012345")
  50. decryptCode_int, err := strconv.Atoi(decryptCode)
  51. fmt.Println("解密结果:", decryptCode, " decryptCode_int", decryptCode_int, " Content_r:", Content_r)
  52. R_DeviceClass, err := Device.Read_Class_ById(decryptCode_int)
  53. if err != nil {
  54. _ = lib.Nats.Publish(m.Reply, []byte(""))
  55. return
  56. }
  57. if strings.Contains(R_DeviceClass.T_Notice_wx, person_QRCode.FromUserName) {
  58. _ = lib.Nats.Publish(m.Reply, []byte(R_DeviceClass.T_name+" 已绑定,无需重复绑定!"))
  59. return
  60. }
  61. R_DeviceClass.T_Notice_wx = R_DeviceClass.T_Notice_wx + person_QRCode.FromUserName + "/重令名|"
  62. Device.Update_Class_ById(R_DeviceClass)
  63. _ = lib.Nats.Publish(m.Reply, []byte(R_DeviceClass.T_name+" 绑定成功!"))
  64. return
  65. }
  66. _ = lib.Nats.Publish(m.Reply, []byte(""))
  67. })
  68. //// 发布-订阅 模式,异步订阅 test1
  69. //_, _ = Nats.Subscribe("test1", func(m *nats.Msg) {
  70. // fmt.Printf("Received a message: %s\n", string(m.Data))
  71. //})
  72. //// 队列 模式,订阅 test2, 队列为queue, test2 发向所有队列,同一队列只有一个能收到消息
  73. //_, _ = Nats.QueueSubscribe("test2", "queue", func(msg *nats.Msg) {
  74. // fmt.Printf("Queue a message: %s\n", string(msg.Data))
  75. //})
  76. //// 请求-响应, 响应 test3 消息。
  77. //_, _ = Nats.Subscribe("test3", func(m *nats.Msg) {
  78. // fmt.Printf("Reply a message: %s\n", string(m.Data))
  79. // _ = Nats.Publish(m.Reply, []byte("I can help!!"))
  80. //})
  81. }