Nats.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. package Nats
  2. import (
  3. "Cold_mqtt/MqttServer"
  4. "Cold_mqtt/MqttServer/WarningNotice"
  5. "Cold_mqtt/conf"
  6. "Cold_mqtt/lib"
  7. "Cold_mqtt/logs"
  8. "Cold_mqtt/models/Device"
  9. "Cold_mqtt/models/Product"
  10. "Cold_mqtt/models/Warning"
  11. "github.com/nats-io/nats.go"
  12. "github.com/vmihailenco/msgpack/v5"
  13. "time"
  14. )
  15. func NatsInit() {
  16. time.Sleep(time.Second * 3)
  17. var err error
  18. // 连接Nats服务器
  19. lib.Nats, err = nats.Connect("nats://"+conf.NatsServer_Url, nats.MaxReconnects(10), nats.ReconnectWait(10*time.Second),
  20. nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
  21. // handle disconnect error event
  22. logs.PrintlnError("nats.DisconnectErrHandler 断开 ", err)
  23. }),
  24. nats.ReconnectHandler(func(nc *nats.Conn) {
  25. // handle reconnect event
  26. logs.PrintlnError("nats.ReconnectHandler, 已经重新连接 ")
  27. }))
  28. if err != nil {
  29. logs.Println("nats 连接失败!")
  30. panic(any("nats 连接失败!" + "nats://" + conf.NatsServer_Url))
  31. }
  32. if err != nil {
  33. logs.Println("nats 连接失败!")
  34. panic(any(err))
  35. }
  36. logs.Println("nats OK!")
  37. // 请求-响应, 向 test3 发布一个 `help me` 请求数据,设置超时间3秒,如果有多个响应,只接收第一个收到的消息
  38. //msg, err := lib.Nats.Request("Wx_GenerateQR", []byte("T_uuid"), 3*time.Second)
  39. //if err != nil {
  40. // logs.PrintlnError("Wx_GenerateQR :",err)
  41. //} else {
  42. // logs.Println("Wx_GenerateQR : %s\n", string(msg.Data))
  43. //
  44. //}
  45. // 发布-订阅 模式,异步订阅 test1
  46. _, _ = lib.Nats.Subscribe("Read_DeviceParameter"+conf.MqttServer_id, func(m *nats.Msg) {
  47. sn := string(m.Data)
  48. logs.Println("Nats Read_DeviceParameter: %s\n", sn)
  49. MqttServer.Read_DeviceParameter(sn)
  50. })
  51. // 发布-订阅 模式,异步订阅 test1
  52. _, _ = lib.Nats.Subscribe("Read_DeviceSensorParameter"+conf.MqttServer_id, func(m *nats.Msg) {
  53. sn := string(m.Data)
  54. logs.Println("Nats Read_DeviceSensorParameter: %s\n", sn)
  55. MqttServer.Read_DeviceSensorParameter(sn)
  56. })
  57. // 发布-订阅 模式,异步订阅 test1
  58. _, _ = lib.Nats.Subscribe("Get_Device_Realtime"+conf.MqttServer_id, func(m *nats.Msg) {
  59. sn := string(m.Data)
  60. logs.Println("Nats Get_Device_Realtime: 【%s】 ", sn)
  61. if len(sn) <= 6 {
  62. return
  63. }
  64. go MqttServer.Get_Device_Realtime(sn)
  65. })
  66. // 发布-订阅 模式,异步订阅 test1
  67. _, _ = lib.Nats.Subscribe("Pu_DeviceParameter"+conf.MqttServer_id, func(m *nats.Msg) {
  68. logs.Println("Nats Pu_DeviceParameter: %s\n", string(m.Data))
  69. var item Device.DeviceParameter
  70. err = msgpack.Unmarshal(m.Data, &item)
  71. if err != nil {
  72. logs.Println("Pu_DeviceParameter", err)
  73. return
  74. }
  75. MqttServer.Pu_DeviceParameter(item)
  76. })
  77. // 发布-订阅 模式,异步订阅 test1
  78. _, _ = lib.Nats.Subscribe("Pu_DeviceParameter_Sensor"+conf.MqttServer_id, func(m *nats.Msg) {
  79. logs.Println("Nats Pu_DeviceParameter_Sensor: %s\n", string(m.Data))
  80. var item Device.DeviceSensorParameter
  81. err = msgpack.Unmarshal(m.Data, &item)
  82. if err != nil {
  83. logs.Println("Pu_DeviceParameter_Sensor", err)
  84. return
  85. }
  86. MqttServer.Pu_DeviceParameter_Sensor(item)
  87. })
  88. // 发布-订阅 模式,异步订阅 test1
  89. _, _ = lib.Nats.Subscribe("Set_DeviceTask"+conf.MqttServer_id, func(m *nats.Msg) {
  90. logs.Println("Nats Set_DeviceTask: %s\n", string(m.Data))
  91. var item Device.Device_task
  92. err = msgpack.Unmarshal(m.Data, &item)
  93. if err != nil {
  94. logs.Println("Set_DeviceTask", err)
  95. return
  96. }
  97. MqttServer.Set_DeviceTask(item)
  98. })
  99. // 重启/关机 命令 //1:重启,0:关机;
  100. _, _ = lib.Nats.Subscribe("Set_RestartShutdown"+conf.MqttServer_id, func(m *nats.Msg) {
  101. logs.Println("Nats Set_RestartShutdown: %s\n", string(m.Data))
  102. var item Device.Device_task
  103. err = msgpack.Unmarshal(m.Data, &item)
  104. if err != nil {
  105. logs.Println("Set_RestartShutdown", err)
  106. return
  107. }
  108. MqttServer.Set_RestartShutdown(item)
  109. })
  110. // 版本升级
  111. _, _ = lib.Nats.Subscribe("Up_ProductUpgrade"+conf.MqttServer_id, func(m *nats.Msg) {
  112. logs.Println("Nats Up_ProductUpgrade: %s\n", string(m.Data))
  113. var item Product.ProductUpgrade_T
  114. err = msgpack.Unmarshal(m.Data, &item)
  115. if err != nil {
  116. logs.Println("Up_ProductUpgrade", err)
  117. return
  118. }
  119. MqttServer.Up_ProductUpgrade(item)
  120. })
  121. //===========================================================================
  122. // 请求-响应, 响应 test3 消息。
  123. _, _ = lib.Nats.Subscribe("AddWarning"+conf.MqttServer_id, func(m *nats.Msg) {
  124. logs.Println("Nats AddWarning: %s\n", string(m.Data))
  125. type T_Warning struct {
  126. T_tp int `xml:"T_tp"` // 报警类型 ->WarningList
  127. T_sn string `xml:"T_sn"` // 设备序列号
  128. T_D_name string `xml:"T_D_name"` // 设备名称
  129. T_id int `xml:"T_id"` // 传感器 ID
  130. T_DS_name string `xml:"T_DS_name"` // 传感器名称
  131. T_Remark string `xml:"T_Remark"` // 采集内容
  132. T_Ut time.Time `xml:"T_Ut"` // 采集时间
  133. T_ToAdmin []int `xml:"T_ToAdmin"` // 发送给谁 Admin.Id
  134. T_State int `xml:"T_State"` // 0 删除 1 不处理 2 已处理 3 未处理
  135. }
  136. type T_R struct {
  137. Code int16 `xml:"Code"`
  138. Msg string `xml:"Msg"`
  139. }
  140. var t_R T_R
  141. var T_Warning_r T_Warning
  142. err = msgpack.Unmarshal(m.Data, &T_Warning_r)
  143. if err != nil {
  144. println("AddWarning:", err)
  145. return
  146. }
  147. var Warning_r Warning.Warning
  148. Warning_r.T_pid = 0
  149. Warning_r.T_tp = T_Warning_r.T_tp
  150. Warning_r.T_sn = T_Warning_r.T_sn
  151. Warning_r.T_D_name = T_Warning_r.T_D_name
  152. Warning_r.T_id = T_Warning_r.T_id
  153. Warning_r.T_DS_name = T_Warning_r.T_DS_name
  154. Warning_r.T_Remark = T_Warning_r.T_Remark
  155. Warning_r.T_Ut = T_Warning_r.T_Ut
  156. Warning_r.T_State = T_Warning_r.T_State
  157. if Warning_r.T_tp == 1012 {
  158. Warning_r.T_State = 3
  159. }
  160. WarningNotice.WarningToAdminId(&Warning_r, T_Warning_r.T_ToAdmin)
  161. t_R.Code = 200
  162. t_R.Msg = "ok"
  163. // 添加报警
  164. _, err = Warning.Add_Warning(Warning_r)
  165. if err != nil {
  166. t_R.Code = 202
  167. t_R.Msg = "Err"
  168. }
  169. b, _ := msgpack.Marshal(&t_R)
  170. _ = lib.Nats.Publish(m.Reply, b)
  171. })
  172. }