MqttClients.go 6.6 KB


  1. package MqttServer
  2. import (
  3. "Cold_mqtt/MqttServer/WarningNotice"
  4. "Cold_mqtt/conf"
  5. "Cold_mqtt/lib"
  6. "Cold_mqtt/logs"
  7. "Cold_mqtt/models/Device"
  8. "Cold_mqtt/models/Warning"
  9. "encoding/json"
  10. "strconv"
  11. "strings"
  12. "time"
  13. )
  14. // 设备连接
  15. func MessageConnected(topicName string, message []byte) {
  16. logs.Println("============= MessageConnected Mqtt JSON =============")
  17. logs.Println("topic:", topicName, " message:", string(message))
  18. //logs.Println("=>", a)
  19. var Ms_project Ms_Project_Connected
  20. err := json.Unmarshal(message, &Ms_project)
  21. if err != nil {
  22. logs.PrintlnError("MqttServer", "JSON反序列化失败[Ms_Project]", string(message))
  23. return
  24. }
  25. Clientid_list := strings.Split(Ms_project.Clientid+"_", "_")
  26. Ms_project.Clientid = Clientid_list[0]
  27. // 过滤
  28. if len(Ms_project.Clientid) < 5 {
  29. return
  30. }
  31. r_Device, err := Device.Read_Device_ByT_sn(Ms_project.Clientid)
  32. if err != nil {
  33. logs.Println("MessageDisconnected 没有该设备:", Ms_project.Clientid)
  34. return
  35. }
  36. logs.Println("Clientid:", Ms_project.Clientid)
  37. logs.Println("Username:", Ms_project.Username)
  38. Reason := ""
  39. if strings.Contains(Ms_project.Username, "_s") {
  40. r_Device.T_online_s = 1
  41. Reason = "备用网络|IP:" + Ms_project.Ipaddress
  42. } else {
  43. r_Device.T_online = 1
  44. Reason = "主网络|IP:" + Ms_project.Ipaddress
  45. }
  46. Warning.Add_DeviceLogs(103, r_Device, Reason)
  47. r_Device.T_mqttid = conf.MqttServer_id
  48. // 同步参数
  49. Device.Update_Device(r_Device, "T_mqttid", "T_online", "T_online_s")
  50. Device.Update_Device_To_DeviceSensor(r_Device) // 设备同步参数
  51. // 监查 设备参数
  52. go func() {
  53. time.Sleep(time.Second * 10)
  54. logs.Println("上线检测 待设置 设备参数:", r_Device.T_sn)
  55. DeviceParameter_list := Device.Read_DeviceParameter_SN_T_SendState_0(r_Device.T_sn)
  56. if len(DeviceParameter_list) > 0 {
  57. logs.Println("Pu_DeviceParameter->:", DeviceParameter_list[0].Id, DeviceParameter_list[0].T_sn)
  58. Pu_DeviceParameter(DeviceParameter_list[0])
  59. }
  60. logs.Println("上线检测 待设置 传感器参数:", r_Device.T_sn)
  61. DeviceSensorParameter_list := Device.Read_DeviceSensorParameter_SN_T_SendState_0_sql(r_Device.T_sn)
  62. for i, v := range DeviceSensorParameter_list {
  63. sn := v[0].(string)
  64. t_id := v[1].(string)
  65. logs.Println("DeviceSensorParameter_list:", i, sn, t_id)
  66. r_DeviceSensorParameter_list := Device.Read_DeviceSensorParameter_SN_T_id(sn, lib.To_int(t_id))
  67. if len(r_DeviceSensorParameter_list) > 0 {
  68. if r_DeviceSensorParameter_list[0].T_SendState == 0 {
  69. logs.Println("Pu_DeviceParameter_Sensor->:", i, sn, t_id)
  70. Pu_DeviceParameter_Sensor(r_DeviceSensorParameter_list[0])
  71. }
  72. }
  73. }
  74. }()
  75. logs.Println("============= MessageConnected Mqtt JSON AND =============", topicName)
  76. }
  // ToReason translates a broker disconnect reason code into the
  // human-readable text stored in device logs.
  //
  // The six known codes ("normal", "kicked", "keepalive_timeout",
  // "not_authorized", "tcp_closed", "internal_error") map to fixed
  // descriptions. Any other code is returned unchanged so the log still shows
  // why the client disconnected instead of an empty reason.
  func ToReason(str string) (Reason string) {
  	switch str {
  	case "normal":
  		return "客户端主动断开"
  	case "kicked":
  		return "设备异常掉线,服务端踢出,通过 REST API"
  	case "keepalive_timeout":
  		return "设备异常掉线,keepalive 超时"
  	case "not_authorized":
  		return "设备异常掉线,认证失败"
  	case "tcp_closed":
  		return "设备异常掉线,对端关闭了网络连接"
  	case "internal_error":
  		return "设备异常掉线,畸形报文或其他未知错误"
  	default:
  		// Unrecognised code: surface it verbatim rather than dropping it.
  		return str
  	}
  }
  100. // 设备断开
  101. func MessageDisconnected(topicName string, message []byte) {
  102. logs.Println("============= MessageDisconnected Mqtt JSON =============")
  103. logs.Println("topic:", topicName, " message:", string(message))
  104. //logs.Println("=>", a)
  105. var Ms_project Ms_Project_Disconnected
  106. err := json.Unmarshal(message, &Ms_project)
  107. if err != nil {
  108. logs.PrintlnError("MqttServer", "JSON反序列化失败[Ms_Project]", string(message))
  109. return
  110. }
  111. logs.Println("Clientid:", Ms_project.Clientid)
  112. logs.Println("Username:", Ms_project.Username)
  113. Clientid_list := strings.Split(Ms_project.Clientid+"_s", "_")
  114. Ms_project.Clientid = Clientid_list[0]
  115. // 过滤
  116. if len(Ms_project.Clientid) < 5 {
  117. return
  118. }
  119. r_Device, err := Device.Read_Device_ByT_sn(Ms_project.Clientid)
  120. if err != nil {
  121. logs.Println("MessageDisconnected 没有该设备:", Ms_project.Clientid)
  122. return
  123. }
  124. Reason := ToReason(Ms_project.Reason)
  125. if strings.Contains(Ms_project.Username, "_s") {
  126. r_Device.T_online_s = 2
  127. Reason = "备用网络|" + Reason
  128. } else {
  129. r_Device.T_online = 2
  130. Reason = "主网络|" + Reason
  131. }
  132. if !strings.Contains(Reason, "主动断开") && r_Device.T_model == "BX100W" {
  133. Reason = "可能市电断电,请注意查看"
  134. }
  135. Warning_r := Warning.Add_DeviceLogs(1001, r_Device, Reason)
  136. Warning_r.T_fUt = time.Now()
  137. if !strings.Contains(Reason, "主动断开") { // 异常断开
  138. //WarningNotice.WarningToAdminId(&Warning_r, []int{}) // 发送给 绑定公司管理员
  139. //// 报警处理
  140. //Warning.Add_Warning_Log(&Warning_r, "====== 报警策略 ===== \n")
  141. WarningNotice.WarningCompanyNotice(&Warning_r, 0, 0)
  142. // 移动端 监控状态同步问题(2小时后自动拉为未监控状态)
  143. if strings.Contains(r_Device.T_model, "BW100") {
  144. go func(sn string) {
  145. time_s := time.Now()
  146. time.Sleep(2 * time.Hour)
  147. r_Device_, err := Device.Read_Device_ByT_sn(sn)
  148. if err != nil {
  149. logs.Println("MessageDisconnected 没有该设备:", Ms_project.Clientid)
  150. return
  151. }
  152. if r_Device_.T_online != 1 {
  153. r_Device_.T_monitor = 0
  154. // 同步参数
  155. Device.Update_Device(r_Device, "T_monitor")
  156. Device.Update_Device_To_DeviceSensor(r_Device) // 设备同步参数
  157. logs.Println(sn + ",在" + time_s.Format("2006-01-02 15:04:05") + "异常下线, 在两个小时后 未能上线,将其监控状态强制拉为未监控!")
  158. } else {
  159. logs.Println(sn + ",在" + time_s.Format("2006-01-02 15:04:05") + "异常下线, 在两个小时后 成功上线!")
  160. }
  161. }(r_Device.T_sn)
  162. }
  163. } else { // 正常关机
  164. r_Device.T_monitor = 0
  165. if !strings.Contains(r_Device.T_model, "BW100") {
  166. DeviceSensorList := Device.Read_DeviceSensor_ByTsn(r_Device.T_sn)
  167. logs.Println("DeviceSensorList:", DeviceSensorList)
  168. for _, v := range DeviceSensorList {
  169. DeviceData_t, is := Device.RedisDeviceData_Get(r_Device.T_sn+"_Redis|"+strconv.Itoa(v.T_id), v.T_id)
  170. logs.Println("DeviceData_t is:", is)
  171. if is {
  172. Device.Add_DeviceData(r_Device.T_sn, v.T_id, DeviceData_t, true)
  173. Device.RedisDeviceData_Del(r_Device.T_sn+"_Redis", v.T_id) // 清除缓存标志
  174. }
  175. }
  176. }
  177. }
  178. // 同步参数
  179. Device.Update_Device(r_Device, "T_online", "T_online_s", "T_monitor")
  180. Device.Update_Device_To_DeviceSensor(r_Device) // 设备同步参数
  181. logs.Println("============= MessageDisconnected Mqtt JSON AND =============", topicName)
  182. }