MqttClients.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. package MqttServer
  2. import (
  3. "Cold_mqtt/MqttServer/WarningNotice"
  4. "Cold_mqtt/conf"
  5. "Cold_mqtt/lib"
  6. "Cold_mqtt/logs"
  7. "Cold_mqtt/models/Device"
  8. "Cold_mqtt/models/Warning"
  9. "encoding/json"
  10. "strings"
  11. "time"
  12. )
  13. // 设备连接
  14. func MessageConnected(topicName string, message []byte) {
  15. logs.Println("============= MessageConnected Mqtt JSON =============")
  16. logs.Println("topic:", topicName, " message:", string(message))
  17. //logs.Println("=>", a)
  18. var Ms_project Ms_Project_Connected
  19. err := json.Unmarshal(message, &Ms_project)
  20. if err != nil {
  21. logs.PrintlnError("MqttServer", "JSON反序列化失败[Ms_Project]", string(message))
  22. return
  23. }
  24. Clientid_list := strings.Split(Ms_project.Clientid+"_", "_")
  25. Ms_project.Clientid = Clientid_list[0]
  26. // 过滤
  27. if len(Ms_project.Clientid) < 5 {
  28. return
  29. }
  30. r_Device, err := Device.Read_Device_ByT_sn(Ms_project.Clientid)
  31. if err != nil {
  32. logs.Println("MessageDisconnected 没有该设备:", Ms_project.Clientid)
  33. return
  34. }
  35. logs.Println("Clientid:", Ms_project.Clientid)
  36. logs.Println("Username:", Ms_project.Username)
  37. Reason := ""
  38. if strings.Contains(Ms_project.Username, "_s") {
  39. r_Device.T_online_s = 1
  40. Reason = "备用网络|IP:" + Ms_project.Ipaddress
  41. } else {
  42. r_Device.T_online = 1
  43. Reason = "主网络|IP:" + Ms_project.Ipaddress
  44. }
  45. Warning.Add_DeviceLogs(103, r_Device, Reason)
  46. r_Device.T_mqttid = conf.MqttServer_id
  47. // 同步参数
  48. Device.Update_Device(r_Device, "T_mqttid", "T_online", "T_online_s")
  49. Device.Update_Device_To_DeviceSensor(r_Device) // 设备同步参数
  50. // 监查 设备参数
  51. go func() {
  52. time.Sleep(time.Second * 10)
  53. logs.Println("上线检测 待设置 设备参数:", r_Device.T_sn)
  54. DeviceParameter_list := Device.Read_DeviceParameter_SN_T_SendState_0(r_Device.T_sn)
  55. if len(DeviceParameter_list) > 0 {
  56. logs.Println("Pu_DeviceParameter->:", DeviceParameter_list[0].Id, DeviceParameter_list[0].T_sn)
  57. Pu_DeviceParameter(DeviceParameter_list[0])
  58. }
  59. logs.Println("上线检测 待设置 传感器参数:", r_Device.T_sn)
  60. DeviceSensorParameter_list := Device.Read_DeviceSensorParameter_SN_T_SendState_0_sql(r_Device.T_sn)
  61. for i, v := range DeviceSensorParameter_list {
  62. sn := v[0].(string)
  63. t_id := v[1].(string)
  64. logs.Println("DeviceSensorParameter_list:", i, sn, t_id)
  65. r_DeviceSensorParameter_list := Device.Read_DeviceSensorParameter_SN_T_id(sn, lib.To_int(t_id))
  66. if len(r_DeviceSensorParameter_list) > 0 {
  67. if r_DeviceSensorParameter_list[0].T_SendState == 0 {
  68. logs.Println("Pu_DeviceParameter_Sensor->:", i, sn, t_id)
  69. Pu_DeviceParameter_Sensor(r_DeviceSensorParameter_list[0])
  70. }
  71. }
  72. }
  73. }()
  74. logs.Println("============= MessageConnected Mqtt JSON AND =============", topicName)
  75. }
  76. func ToReason(str string) (Reason string) {
  77. switch str {
  78. case "normal":
  79. Reason = "客户端主动断开"
  80. break
  81. case "kicked":
  82. Reason = "设备异常掉线,服务端踢出,通过 REST API"
  83. break
  84. case "keepalive_timeout":
  85. Reason = "设备异常掉线,keepalive 超时"
  86. break
  87. case "not_authorized":
  88. Reason = "设备异常掉线,认证失败"
  89. break
  90. case "tcp_closed":
  91. Reason = "设备异常掉线,对端关闭了网络连接"
  92. break
  93. case "internal_error":
  94. Reason = "设备异常掉线,畸形报文或其他未知错误"
  95. break
  96. }
  97. return Reason
  98. }
  99. // 设备断开
  100. func MessageDisconnected(topicName string, message []byte) {
  101. logs.Println("============= MessageDisconnected Mqtt JSON =============")
  102. logs.Println("topic:", topicName, " message:", string(message))
  103. //logs.Println("=>", a)
  104. var Ms_project Ms_Project_Disconnected
  105. err := json.Unmarshal(message, &Ms_project)
  106. if err != nil {
  107. logs.PrintlnError("MqttServer", "JSON反序列化失败[Ms_Project]", string(message))
  108. return
  109. }
  110. logs.Println("Clientid:", Ms_project.Clientid)
  111. logs.Println("Username:", Ms_project.Username)
  112. Clientid_list := strings.Split(Ms_project.Clientid+"_s", "_")
  113. Ms_project.Clientid = Clientid_list[0]
  114. // 过滤
  115. if len(Ms_project.Clientid) < 5 {
  116. return
  117. }
  118. r_Device, err := Device.Read_Device_ByT_sn(Ms_project.Clientid)
  119. if err != nil {
  120. logs.Println("MessageDisconnected 没有该设备:", Ms_project.Clientid)
  121. return
  122. }
  123. Reason := ToReason(Ms_project.Reason)
  124. if strings.Contains(Ms_project.Username, "_s") {
  125. r_Device.T_online_s = 2
  126. Reason = "备用网络|" + Reason
  127. } else {
  128. r_Device.T_online = 2
  129. Reason = "主网络|" + Reason
  130. }
  131. if !strings.Contains(Reason, "主动断开") && r_Device.T_model == "BX100W" {
  132. Reason = "可能市电断电,请注意查看"
  133. }
  134. Warning_r := Warning.Add_DeviceLogs(1001, r_Device, Reason)
  135. Warning_r.T_fUt = time.Now()
  136. if !strings.Contains(Reason, "主动断开") { // 异常断开
  137. //WarningNotice.WarningToAdminId(&Warning_r, []int{}) // 发送给 绑定公司管理员
  138. //// 报警处理
  139. //Warning.Add_Warning_Log(&Warning_r, "====== 报警策略 ===== \n")
  140. WarningNotice.WarningCompanyNotice(&Warning_r, 0, 0)
  141. // 移动端 监控状态同步问题(2小时后自动拉为未监控状态)
  142. if strings.Contains(r_Device.T_model, "BW100") {
  143. go func(sn string) {
  144. time_s := time.Now()
  145. time.Sleep(2 * time.Hour)
  146. r_Device_, err := Device.Read_Device_ByT_sn(sn)
  147. if err != nil {
  148. logs.Println("MessageDisconnected 没有该设备:", Ms_project.Clientid)
  149. return
  150. }
  151. if r_Device_.T_online != 1 {
  152. r_Device_.T_monitor = 0
  153. // 同步参数
  154. Device.Update_Device(r_Device, "T_monitor")
  155. Device.Update_Device_To_DeviceSensor(r_Device) // 设备同步参数
  156. logs.Println(sn + ",在" + time_s.Format("2006-01-02 15:04:05") + "异常下线, 在两个小时后 未能上线,将其监控状态强制拉为未监控!")
  157. } else {
  158. logs.Println(sn + ",在" + time_s.Format("2006-01-02 15:04:05") + "异常下线, 在两个小时后 成功上线!")
  159. }
  160. }(r_Device.T_sn)
  161. }
  162. } else { // 正常关机
  163. r_Device.T_monitor = 0
  164. if !strings.Contains(r_Device.T_model, "BW100") {
  165. DeviceSensorList := Device.Read_DeviceSensor_ByTsn(r_Device.T_sn)
  166. for _, v := range DeviceSensorList {
  167. DeviceData_t, is := Device.RedisDeviceData_Get(r_Device.T_sn+"_Redis", v.T_id)
  168. if is {
  169. Device.Add_DeviceData(r_Device.T_sn, v.T_id, DeviceData_t, true)
  170. Device.RedisDeviceData_Del(r_Device.T_sn+"_Redis", v.T_id) // 清除缓存标志
  171. }
  172. }
  173. }
  174. }
  175. // 同步参数
  176. Device.Update_Device(r_Device, "T_online", "T_online_s", "T_monitor")
  177. Device.Update_Device_To_DeviceSensor(r_Device) // 设备同步参数
  178. logs.Println("============= MessageDisconnected Mqtt JSON AND =============", topicName)
  179. }