MqttHandle.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. package MqttServer
  2. import (
  3. "ColdP_server/controllers/WebSocket"
  4. "ColdP_server/controllers/lib"
  5. "ColdP_server/models/Device"
  6. "encoding/json"
  7. "fmt"
  8. "strings"
  9. "time"
  10. )
  11. func AsyncFunc(r_Device Device.Device, Ms_project Ms_Project, msg []byte) {
  12. //t1 := time.Now()
  13. //
  14. //SN := lib.To_string(Ms_project.Sn)
  15. //Type := lib.To_int(Ms_project.Type)
  16. //Msid := lib.To_int(Ms_project.Msid)
  17. //
  18. switch lib.To_int(Ms_project.Type) {
  19. case 3: // 3:设备参数
  20. var Ms_parameter Ms_Parameter
  21. err := json.Unmarshal(msg, &Ms_parameter)
  22. if err != nil {
  23. fmt.Println("JSON反序列化失败[Ms_Parameter],err=", err)
  24. return
  25. }
  26. // 反馈透传
  27. jsonStu_map, err := json.Marshal(Ms_parameter)
  28. if err != nil {
  29. fmt.Println("反馈透传 [Ms_project_0],err=", err)
  30. return
  31. }
  32. _, ok := lib.CountrySnMap[Ms_project.Sn] /*如果确定是真实的,则存在,否则不存在 */
  33. if ok {
  34. for _, v := range lib.CountrySnMap[Ms_project.Sn].Uuid_list {
  35. fmt.Println("转发数据! SN:", Ms_project.Sn, " Uuid:", v)
  36. jsonStu_map_ := strings.Replace(string(jsonStu_map), "\"{", "{", -1)
  37. //jsonStu_map_ =strings.Replace(jsonStu_map_, "}\"", "}", -1)
  38. //jsonStu_map_ =strings.Replace(jsonStu_map_, "\\\"", "\"", -1)
  39. println(jsonStu_map_)
  40. WebSocket.Send_WebSocket(v, jsonStu_map_)
  41. }
  42. }
  43. break
  44. case 4: // 消息反馈
  45. var Ms_parameter_r Ms_Parameter_r
  46. err := json.Unmarshal(msg, &Ms_parameter_r)
  47. if err != nil {
  48. fmt.Println("JSON反序列化失败[Ms_Parameter_r],err=", err)
  49. return
  50. }
  51. // 反馈透传
  52. jsonStu_map, err := json.Marshal(Ms_parameter_r)
  53. if err != nil {
  54. fmt.Println("反馈透传 [Ms_project_0],err=", err)
  55. return
  56. }
  57. _, ok := lib.CountrySnMap[Ms_project.Sn] /*如果确定是真实的,则存在,否则不存在 */
  58. if ok {
  59. for _, v := range lib.CountrySnMap[Ms_project.Sn].Uuid_list {
  60. fmt.Println("转发数据! SN:", Ms_project.Sn, " Uuid:", v)
  61. jsonStu_map_ := strings.Replace(string(jsonStu_map), "\"{", "{", -1)
  62. jsonStu_map_ = strings.Replace(jsonStu_map_, "}\"", "}", -1)
  63. jsonStu_map_ = strings.Replace(jsonStu_map_, "\\\"", "\"", -1)
  64. WebSocket.Send_WebSocket(v, jsonStu_map_)
  65. }
  66. }
  67. break
  68. }
  69. //t2 := time.Now()
  70. //fmt.Println("线程-》 MQTT RunTime:",t2.Sub(t1))
  71. }
  72. // 循环刷新
  73. func Realtime() {
  74. //fmt.Println("=====================Realtime GO===============")
  75. time.Sleep(time.Second * 10)
  76. for true {
  77. //fmt.Println("=====================Realtime GO===============")
  78. //fmt.Println("CountrySnMap_z:", len(lib.CountrySnMap))
  79. for k, _ := range lib.CountrySnMap {
  80. //fmt.Println("CountrySnMap:",k)
  81. Get_Device_Realtime(k)
  82. time.Sleep(time.Millisecond * 100)
  83. }
  84. time.Sleep(time.Second * 10)
  85. }
  86. }
  87. func Get_Device_Realtime(T_sn string) {
  88. // base 读取基本参数
  89. Msid := lib.Random(1, 9999)
  90. Rt_realtime := Rt_Realtime{
  91. Sn: T_sn,
  92. Type: 0,
  93. Msid: Msid,
  94. }
  95. // 回复
  96. jsonStu, err := json.Marshal(Rt_realtime)
  97. if err != nil {
  98. fmt.Println("回复失败 [Pu_DeviceParameter_Compensate],err=", err)
  99. }
  100. fmt.Println(string(jsonStu))
  101. //Mqtt_publish(T_sn, string(jsonStu))
  102. return
  103. }