MqttServer.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. package MqttServer
  2. import (
  3. "Cold_mqtt/conf"
  4. "Cold_mqtt/logs"
  5. "Cold_mqtt/models/Device"
  6. "Cold_mqtt/models/Warning"
  7. "encoding/json"
  8. "fmt"
  9. "github.com/astaxie/beego/cache"
  10. beego "github.com/beego/beego/v2/server/web"
  11. "github.com/yosssi/gmq/mqtt"
  12. "github.com/yosssi/gmq/mqtt/client"
  13. "math"
  14. "strconv"
  15. "strings"
  16. "sync"
  17. "time"
  18. )
var (
	// cli is the package-wide MQTT client. Run_MqttServer assigns it and
	// Mqtt_publish sends through it.
	cli *client.Client
	// DeviceSn is a collection of devices keyed by string.
	// NOTE(review): presumably keyed by device serial number; it is not
	// used in the visible part of this file — confirm against callers.
	DeviceSn map[string]Device.Device /* create the collection */
	// test is not referenced in the visible part of this file.
	test = true
	// Task_Time is the execution time; init sets it to the current time in
	// Unix milliseconds.
	Task_Time int64 // execution time
)
// DeviceMqttMap_Struct is one pending message kept in DeviceMqttMap so that
// DeviceMqttMap_go can publish it again until it is removed.
type DeviceMqttMap_Struct struct {
	T_sn    string // topic suffix handed to Mqtt_publish (presumably the device serial number — confirm)
	T_data  string // message payload handed to Mqtt_publish
	T_num   int    // pass limit: the entry is deleted once T_tonum reaches this value
	T_tonum int    // number of passes DeviceMqttMap_go has made over this entry so far
}
// redisCache_Mid is the Redis-backed cache created in init; GetMid uses it
// to remember which message keys have already been seen.
var redisCache_Mid cache.Cache

// DeviceMqttMap holds the messages that DeviceMqttMap_go re-publishes.
// It is allocated in init.
var DeviceMqttMap map[string]DeviceMqttMap_Struct /* create the collection */

// lock is the mutex taken around accesses to DeviceMqttMap.
var lock sync.Mutex
  34. func init() {
  35. Task_Time = time.Now().UnixMilli()
  36. config := fmt.Sprintf(`{"key":"%s","conn":"%s","dbNum":"%s","password":"%s"}`,
  37. "redisCache_Mid", conf.Redis_address, conf.Redis_dbNum, conf.Redis_password)
  38. logs.Println("", config)
  39. var err error
  40. redisCache_Mid, err = cache.NewCache("redis", config)
  41. if err != nil || redisCache_Mid == nil {
  42. errMsg := "failed to init redis"
  43. logs.Println(errMsg, err)
  44. panic(any(errMsg))
  45. }
  46. //Task_Etection_Time
  47. //Task_Etection_x
  48. DeviceMqttMap = make(map[string]DeviceMqttMap_Struct)
  49. }
  50. // 缓存数据发送-确保设备在休眠后 能收到数据
  51. func DeviceMqttMap_go() {
  52. logs.Println("=====================DeviceMqttMap_go GO===============")
  53. time.Sleep(time.Second * 10)
  54. for true {
  55. for k, v := range DeviceMqttMap {
  56. data, ok := DeviceMqttMapGet(k)
  57. if !ok {
  58. logs.Println("DeviceMqttMapGet:", k, " !!不存在,跳过!! ")
  59. continue
  60. }
  61. v = data
  62. logs.Println("DeviceMqttMap:", k, " T_sn:", v.T_sn, " T_data:", v.T_data, " T_num:", v.T_num, " T_tonum:", v.T_tonum)
  63. if v.T_tonum >= 1 {
  64. Mqtt_publish(v.T_sn, v.T_data)
  65. }
  66. v.T_tonum += 1
  67. lock.Lock()
  68. DeviceMqttMap[k] = v
  69. if v.T_num <= v.T_tonum {
  70. delete(DeviceMqttMap, k)
  71. }
  72. lock.Unlock()
  73. time.Sleep(time.Millisecond * 100)
  74. }
  75. time.Sleep(time.Second * 3)
  76. }
  77. }
  78. func DeviceMqttMapGet(key string) (DeviceMqttMap_Struct, bool) {
  79. lock.Lock()
  80. defer lock.Unlock()
  81. data, ok := DeviceMqttMap[key]
  82. return data, ok
  83. }
  84. func DeviceMqttMapAdd(key string, data DeviceMqttMap_Struct) {
  85. lock.Lock()
  86. defer lock.Unlock()
  87. DeviceMqttMap[key] = data // 缓存数据
  88. logs.Println("DeviceMqttMapAdd:", key, " T_sn:", data.T_sn, " T_data:", data.T_data, " T_num:", data.T_num, " T_tonum:", data.T_tonum)
  89. }
  90. // 获取GetMid缓存
  91. func GetMid(key string, Type int) bool {
  92. if !redisCache_Mid.IsExist(key) {
  93. //redisCache_Mid.Put(key, "", time.Minute*4) // 24-3-26 = 1
  94. if Type == 2 {
  95. redisCache_Mid.Put(key, "", time.Minute*10)
  96. } else {
  97. redisCache_Mid.Put(key, "", time.Minute*1)
  98. }
  99. return false
  100. }
  101. return true
  102. }
  103. func Run_MqttServer() {
  104. time.Sleep(3 * time.Second)
  105. logs.Println("============Run_MqttServer=============", "")
  106. HTTPPort, _ := beego.AppConfig.String("HTTPPort")
  107. // Create an MQTT Client.
  108. cli = client.New(&client.Options{
  109. // Define the processing of the error handler.
  110. ErrorHandler: func(err error) {
  111. logs.PrintlnError("err!!!!!! Run_MqttServer:", err.Error())
  112. time.Sleep(3 * time.Second)
  113. go Run_MqttServer() // MQTT 通讯
  114. return
  115. },
  116. })
  117. // Terminate the Client.
  118. defer cli.Terminate()
  119. c := client.ConnectOptions{
  120. Network: "tcp",
  121. Address: conf.MqttServer_Url,
  122. ClientID: []byte(conf.MqttServer_ClientID + HTTPPort),
  123. UserName: []byte(conf.MqttServer_Username),
  124. Password: []byte(conf.MqttServer_Password),
  125. }
  126. logs.Println("Address:", c.Address)
  127. logs.Println("ClientID:", string(c.ClientID))
  128. // Connect to the MQTT Server.
  129. err := cli.Connect(&c)
  130. if err != nil {
  131. logs.Println("MqttServer", "连接MQTT失败 [cli.Connect]", "")
  132. logs.Println("err!!!!!! Run_MqttServer:", "连接MQTT失败:", err)
  133. fmt.Println("err!!!!!! 连接MQTT失败:", err)
  134. cli.Terminate()
  135. time.Sleep(3 * time.Second)
  136. go Run_MqttServer() // MQTT 通讯
  137. return
  138. }
  139. // Subscribe to topics.
  140. err = cli.Subscribe(&client.SubscribeOptions{
  141. SubReqs: []*client.SubReq{
  142. &client.SubReq{
  143. TopicFilter: []byte("/sub/#"),
  144. QoS: mqtt.QoS0,
  145. // Define the processing of the message handler.
  146. Handler: func(topicName, message []byte) {
  147. logs.PrintlnMqtt("<-" + string(topicName) + " " + string(message))
  148. messagePubHandlerV3(string(topicName), message)
  149. },
  150. },
  151. &client.SubReq{ // 设备断开
  152. TopicFilter: []byte("$SYS/brokers/+/clients/+/disconnected"),
  153. QoS: mqtt.QoS0,
  154. // Define the processing of the message handler.
  155. Handler: func(topicName, message []byte) {
  156. logs.PrintlnMqtt("<-" + string(topicName) + " " + string(message))
  157. MessageDisconnected(string(topicName), message)
  158. },
  159. },
  160. &client.SubReq{ // 设备上线
  161. TopicFilter: []byte("$SYS/brokers/+/clients/+/connected"),
  162. QoS: mqtt.QoS0,
  163. // Define the processing of the message handler.
  164. Handler: func(topicName, message []byte) {
  165. logs.PrintlnMqtt("<-" + string(topicName) + " " + string(message))
  166. MessageConnected(string(topicName), message)
  167. },
  168. },
  169. },
  170. })
  171. if err != nil {
  172. logs.Println("MqttServer", "订阅消息 [cli.Subscribe]", "")
  173. logs.Println("err!!!!!! Run_MqttServer:", "连接MQTT失败:", err)
  174. }
  175. fmt.Println("MQTT ok!")
  176. }
  177. // 冷链 3.0 协议
  178. func messagePubHandlerV3(topicName string, message []byte) {
  179. // 本地测试
  180. logs.Println("=", "============= Mqtt2 JSON =============")
  181. logs.Println("<-"+topicName, string(message))
  182. start := time.Now()
  183. var Ms2_project Ms2m_Project
  184. err := json.Unmarshal(message, &Ms2_project)
  185. if err != nil {
  186. logs.Println("MqttServer", "JSON反序列化失败[Ms_Project]", string(message), err.Error())
  187. return
  188. }
  189. // 过滤
  190. topicNameS := strings.Split(topicName, "/")
  191. if len(topicNameS) != 3 {
  192. logs.Println("MqttServer", "订阅地址错误 len(topicNameS) != 3", strconv.Itoa(len(topicNameS)))
  193. return
  194. }
  195. Sn := topicNameS[2]
  196. if len(Sn) < 4 {
  197. logs.Println("MqttServer", "订阅地址错误 len(Sn) < 4", Sn)
  198. return
  199. }
  200. // 过滤重复数据
  201. SnMid := fmt.Sprintf("%s|%d", Sn, Ms2_project.Msid)
  202. if GetMid(SnMid, Ms2_project.Type) {
  203. logs.Println("MqttServer", "SnMid 重复发送!!", SnMid)
  204. ms2_Return := Ms2_Return{
  205. Type: Ms2_project.Type,
  206. Msid: Ms2_project.Msid,
  207. Status: 1,
  208. }
  209. // 回复
  210. jsonStu, err := json.Marshal(ms2_Return)
  211. if err != nil {
  212. logs.Println("回复失败 [Ms_project_0],err=", err)
  213. return
  214. }
  215. Mqtt_publish(Sn, string(jsonStu))
  216. return
  217. }
  218. r_Device, err := Device.Read_Device_ByT_sn(Sn)
  219. if err != nil {
  220. Device.Add_DeviceSnOld(Sn)
  221. logs.Println("没有该设备:", Sn)
  222. return
  223. }
  224. //误差 10秒
  225. if math.Abs(float64(Ms2_project.Dut-time.Now().Unix())) > 10 && Ms2_project.Dut != 0 {
  226. logs.Println(fmt.Sprintf("%s 时间相差:%d", r_Device.T_sn, int(math.Abs(float64(Ms2_project.Dut-time.Now().Unix())))))
  227. Warning.Add_DeviceLogs(103, r_Device, fmt.Sprintf("时间相差:%d", int(math.Abs(float64(Ms2_project.Dut-time.Now().Unix())))))
  228. RTC_TimeStr(r_Device.T_sn, time.Now().Unix(), 99)
  229. }
  230. logs.Println("1r_Device:", r_Device.T_sn, "T_devName:", r_Device.T_devName, "T_Dattery:", r_Device.T_Dattery, "T_Site:", r_Device.T_Site, "T_monitor:", r_Device.T_monitor, "T_online:", r_Device.T_online, "T_online_s:", r_Device.T_online_s)
  231. //// 更新 状态 为了防止中间有变化,重新获取最新数据
  232. //r_Device_new, err := Device.Read_Device_ByT_sn(r_Device.T_sn)
  233. if r_Device.T_online_s == 0 { // 如果 备用网络没有启动
  234. r_Device.T_online = 1
  235. Device.Update_Device(r_Device, "T_online", "T_online_s")
  236. }
  237. if r_Device.T_online == 0 { // 如果 备用网络没有启动
  238. r_Device.T_online_s = 1
  239. Device.Update_Device(r_Device, "T_online", "T_online_s")
  240. }
  241. if r_Device.T_online != 1 && r_Device.T_online_s != 1 {
  242. r_Device.T_online = 1
  243. Device.Update_Device(r_Device, "T_online", "T_online_s")
  244. }
  245. // 开始出来
  246. AsyncFuncV3(&r_Device, Ms2_project, message) // 开始处理
  247. logs.Println("========= END Mqtt2 JSON========== ", " mid:", Ms2_project.Msid, " RunTime :", time.Since(start))
  248. go func() {
  249. time.Sleep(time.Second)
  250. // 清除数据缓存 数据
  251. logs.Println("清除数据缓存:", r_Device.T_sn+"-"+strconv.FormatInt(Ms2_project.Msid, 10))
  252. lock.Lock()
  253. delete(DeviceMqttMap, r_Device.T_sn+"-"+strconv.FormatInt(Ms2_project.Msid, 10))
  254. lock.Unlock()
  255. }()
  256. }
  257. // 发送数据
  258. func Mqtt_publish(topic string, text string) {
  259. // Publish a message.
  260. err := cli.Publish(&client.PublishOptions{
  261. QoS: mqtt.QoS0,
  262. TopicName: []byte("/pub/" + topic),
  263. Message: []byte(text),
  264. })
  265. logs.PrintlnMqtt("-> /pub/" + topic + " " + text)
  266. if err != nil {
  267. logs.PrintlnError("MqttServer", "发送消息失败 [cli.Publish]", text)
  268. }
  269. }