Nats.go 7.7 KB


  1. package Nats
  2. import (
  3. "Cold_Api/conf"
  4. "Cold_Api/controllers/lib"
  5. "Cold_Api/logs"
  6. "Cold_Api/models/Device"
  7. "encoding/xml"
  8. "fmt"
  9. "github.com/astaxie/beego/cache"
  10. "github.com/nats-io/nats.go"
  11. "strconv"
  12. "strings"
  13. "time"
  14. )
  15. var redisCache_NatsServer cache.Cache
  16. func init() {
  17. logs.Println("============Nats init============")
  18. var err error
  19. // 连接Nats服务器
  20. lib.Nats, err = nats.Connect("nats://" + conf.NatsServer_Url)
  21. if err != nil {
  22. fmt.Println("nats 连接失败!")
  23. panic(err)
  24. }
  25. logs.Println("nats OK!")
  26. go NatsInit()
  27. config := fmt.Sprintf(`{"key":"%s","conn":"%s","dbNum":"%s","password":"%s"}`,
  28. "redisCache_NatsServer", conf.Redis_address, conf.Redis_dbNum, conf.Redis_password)
  29. fmt.Println(config)
  30. redisCache_NatsServer, err = cache.NewCache("redis", config)
  31. if err != nil || redisCache_NatsServer == nil {
  32. errMsg := "failed to init redis"
  33. fmt.Println(errMsg, err)
  34. panic(errMsg)
  35. }
  36. }
  37. func NatsInit() {
  38. // 请求-响应, 响应 test3 消息。
  39. _, _ = lib.Nats.Subscribe("Wx_BasicMessage_Event_QRCode", func(m *nats.Msg) {
  40. fmt.Printf(" => Nats Wx_BasicMessage_Event_QRCode message: %s\n", string(m.Data))
  41. type Person_QRCode struct {
  42. ToUserName string `xml:"ToUserName"` //注意这里有个反引号
  43. FromUserName string `xml:"FromUserName"` //注意这里有个反引号
  44. CreateTime string `xml:"CreateTime"` //注意这里有个反引号
  45. EventKey string `xml:"EventKey"`
  46. }
  47. var person_QRCode Person_QRCode
  48. err1 := xml.Unmarshal(m.Data, &person_QRCode)
  49. if err1 != nil {
  50. fmt.Println("Unmarshal error")
  51. _ = lib.Nats.Publish(m.Reply, []byte(""))
  52. return
  53. }
  54. // 进入 二维码配对
  55. fmt.Println("FromUserName-", person_QRCode.FromUserName)
  56. fmt.Println("EventKey-", person_QRCode.EventKey)
  57. // 开始 处理消息
  58. if strings.Contains(person_QRCode.EventKey, "@宝智达 微信公众号通知") {
  59. //"请将本内容发送到 深圳市宝智达科技有限公司 微信公众号-|"+lib.AesEncryptCBC(T_calss_id, "0123456789012345")+"|-",
  60. Content_r := lib.GetBetweenStr(person_QRCode.EventKey, "-|", "|-")
  61. decryptCode := lib.AesDecryptCBC(Content_r, "0123456789012345")
  62. decryptCode_int, err := strconv.Atoi(decryptCode)
  63. fmt.Println("解密结果:", decryptCode, " decryptCode_int", decryptCode_int, " Content_r:", Content_r)
  64. R_DeviceClass, err := Device.Read_Class_ById(decryptCode_int)
  65. if err != nil {
  66. _ = lib.Nats.Publish(m.Reply, []byte(""))
  67. return
  68. }
  69. if strings.Contains(R_DeviceClass.T_Notice_wx, person_QRCode.FromUserName) {
  70. _ = lib.Nats.Publish(m.Reply, []byte(R_DeviceClass.T_name+" 已绑定,无需重复绑定!"))
  71. return
  72. }
  73. R_DeviceClass.T_Notice_wx = R_DeviceClass.T_Notice_wx + person_QRCode.FromUserName + "/重令名|"
  74. Device.Update_Class_ById(R_DeviceClass)
  75. _ = lib.Nats.Publish(m.Reply, []byte(R_DeviceClass.T_name+" 绑定成功!"))
  76. return
  77. }
  78. _ = lib.Nats.Publish(m.Reply, []byte(""))
  79. })
  80. // 请求-响应, 响应 test3 消息。
  81. _, _ = lib.Nats.Subscribe("Wx2_BasicMessage_Event_QRCode", func(m *nats.Msg) {
  82. fmt.Printf(" => Nats Wx2_BasicMessage_Event_QRCode message: %s\n", string(m.Data))
  83. type Person_QRCode struct {
  84. ToUserName string `xml:"ToUserName"` //注意这里有个反引号
  85. FromUserName string `xml:"FromUserName"` //注意这里有个反引号
  86. CreateTime string `xml:"CreateTime"` //注意这里有个反引号
  87. EventKey string `xml:"EventKey"`
  88. }
  89. var person_QRCode Person_QRCode
  90. err1 := xml.Unmarshal(m.Data, &person_QRCode)
  91. if err1 != nil {
  92. fmt.Println("Unmarshal error")
  93. _ = lib.Nats.Publish(m.Reply, []byte(""))
  94. return
  95. }
  96. // 进入 二维码配对
  97. fmt.Println("FromUserName-", person_QRCode.FromUserName)
  98. fmt.Println("EventKey-", person_QRCode.EventKey)
  99. // 开始 处理消息
  100. if strings.Contains(person_QRCode.EventKey, "@宝智达 微信公众号通知") {
  101. //"请将本内容发送到 深圳市宝智达科技有限公司 微信公众号-|"+lib.AesEncryptCBC(T_calss_id, "0123456789012345")+"|-",
  102. Content_r := lib.GetBetweenStr(person_QRCode.EventKey, "-|", "|-")
  103. decryptCode := lib.AesDecryptCBC(Content_r, "0123456789012345")
  104. decryptCode_int, err := strconv.Atoi(decryptCode)
  105. fmt.Println("解密结果:", decryptCode, " decryptCode_int", decryptCode_int, " Content_r:", Content_r)
  106. R_DeviceClass, err := Device.Read_Class_ById(decryptCode_int)
  107. if err != nil {
  108. _ = lib.Nats.Publish(m.Reply, []byte(""))
  109. return
  110. }
  111. if strings.Contains(R_DeviceClass.T_Notice_wx2, person_QRCode.FromUserName+"/重令名|") {
  112. _ = lib.Nats.Publish(m.Reply, []byte(R_DeviceClass.T_name+" 重复扫码!请在 5分钟内 回复您的名字,否则将 无法收到报警消息!如回复:张三"))
  113. return
  114. }
  115. if strings.Contains(R_DeviceClass.T_Notice_wx2, person_QRCode.FromUserName) {
  116. _ = lib.Nats.Publish(m.Reply, []byte(R_DeviceClass.T_name+" 已绑定,无需重复绑定!"))
  117. return
  118. }
  119. R_DeviceClass.T_Notice_wx2 = R_DeviceClass.T_Notice_wx2 + person_QRCode.FromUserName + "/重令名|"
  120. Device.Update_Class_ById(R_DeviceClass)
  121. redisCache_NatsServer.Put(person_QRCode.FromUserName, decryptCode_int, 5*time.Minute)
  122. _ = lib.Nats.Publish(m.Reply, []byte(R_DeviceClass.T_name+" 扫码成功!请在 5分钟内 回复您的名字,否则将 无法收到报警消息!如回复:张三"))
  123. return
  124. }
  125. _ = lib.Nats.Publish(m.Reply, []byte(""))
  126. })
  127. // 请求-响应, 响应 test3 消息。
  128. _, _ = lib.Nats.Subscribe("Wx2_BasicMessage_Text", func(m *nats.Msg) {
  129. logs.Println(" => Nats Wx2_BasicMessage_Text message: %s\n", string(m.Data))
  130. type Person_Text struct {
  131. ToUserName string `xml:"ToUserName"` //注意这里有个反引号
  132. FromUserName string `xml:"FromUserName"` //注意这里有个反引号
  133. CreateTime string `xml:"CreateTime"` //注意这里有个反引号
  134. MsgType string `xml:"MsgType"`
  135. Content string `xml:"Content"`
  136. }
  137. var person_Text Person_Text
  138. err1 := xml.Unmarshal(m.Data, &person_Text)
  139. if err1 != nil {
  140. fmt.Println("Unmarshal error")
  141. _ = lib.Nats.Publish(m.Reply, []byte(""))
  142. return
  143. }
  144. // 进入 二维码配对
  145. fmt.Println("FromUserName-", person_Text.FromUserName)
  146. fmt.Println("Content-", person_Text.Content)
  147. if len(person_Text.Content) > 8*3 || len(person_Text.Content) <= 1*3 {
  148. _ = lib.Nats.Publish(m.Reply, []byte("请正确输入您的名字!"))
  149. return
  150. }
  151. if !redisCache_NatsServer.IsExist(person_Text.FromUserName) {
  152. return
  153. }
  154. Class_ById := lib.To_int(redisCache_NatsServer.Get(person_Text.FromUserName))
  155. R_DeviceClass, err := Device.Read_Class_ById(Class_ById)
  156. if err != nil {
  157. _ = lib.Nats.Publish(m.Reply, []byte(""))
  158. return
  159. }
  160. R_DeviceClass.T_Notice_wx2 = strings.Replace(R_DeviceClass.T_Notice_wx2,
  161. person_Text.FromUserName+"/重令名|",
  162. person_Text.FromUserName+"/"+person_Text.Content+"|",
  163. -1)
  164. Device.Update_Class_ById(R_DeviceClass)
  165. // 删除 缓存
  166. redisCache_NatsServer.Delete(person_Text.FromUserName)
  167. _ = lib.Nats.Publish(m.Reply, []byte("尊敬的 "+person_Text.Content+",您以成功绑定 "+R_DeviceClass.T_name))
  168. })
  169. //// 发布-订阅 模式,异步订阅 test1
  170. //_, _ = Nats.Subscribe("test1", func(m *nats.Msg) {
  171. // fmt.Printf("Received a message: %s\n", string(m.Data))
  172. //})
  173. //// 队列 模式,订阅 test2, 队列为queue, test2 发向所有队列,同一队列只有一个能收到消息
  174. //_, _ = Nats.QueueSubscribe("test2", "queue", func(msg *nats.Msg) {
  175. // fmt.Printf("Queue a message: %s\n", string(msg.Data))
  176. //})
  177. //// 请求-响应, 响应 test3 消息。
  178. //_, _ = Nats.Subscribe("test3", func(m *nats.Msg) {
  179. // fmt.Printf("Reply a message: %s\n", string(m.Data))
  180. // _ = Nats.Publish(m.Reply, []byte("I can help!!"))
  181. //})
  182. }