Nats.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. package Nats
  2. import (
  3. "Cold_Data/StatisticsTask"
  4. "Cold_Data/conf"
  5. "Cold_Data/lib"
  6. "Cold_Data/logs"
  7. "Cold_Data/models/Account"
  8. "Cold_Data/models/Device"
  9. "encoding/json"
  10. "github.com/nats-io/nats.go"
  11. "strings"
  12. "time"
  13. )
  14. // 实体类
  15. type Ms2m_Project struct {
  16. //Sn string `json:"sn"`
  17. Type int `json:"type"`
  18. Msid int64 `json:"mid"`
  19. Dut int64 `json:"dut"`
  20. }
  21. func NatsInit() {
  22. time.Sleep(time.Minute * 1)
  23. var err error
  24. // 连接Nats服务器
  25. lib.Nats, err = nats.Connect("nats://" + conf.NatsServer_Url)
  26. if err != nil {
  27. logs.Println("nats 连接失败!")
  28. panic(any(err))
  29. }
  30. logs.Println("nats OK!")
  31. // 发布-订阅 模式,异步订阅 test1
  32. _, _ = lib.Nats.Subscribe("Mqtt_DeviceReal", func(m *nats.Msg) {
  33. //logs.Println("Nats Mqtt_DeviceReal: %s\n", string(m.Data))
  34. DataList := strings.Split(string(m.Data), "|+|")
  35. if len(DataList) != 2 {
  36. return
  37. }
  38. if len(DataList[0]) == 0 {
  39. return
  40. }
  41. var Ms2_project Ms2m_Project
  42. err := json.Unmarshal([]byte(DataList[1]), &Ms2_project)
  43. if err != nil {
  44. logs.Println("MqttServer", "JSON反序列化失败[Ms2m_Project]", string(m.Data), err.Error())
  45. return
  46. }
  47. // 只处理报警任务
  48. if Ms2_project.Type != 2 {
  49. return
  50. }
  51. r_Device, err := Device.Read_Device_ByT_sn(DataList[0])
  52. if err != nil {
  53. //logs.Println("没找到SN!", DataList[0])
  54. return
  55. }
  56. // 内部测试
  57. if r_Device.T_pid <= 3 {
  58. return
  59. }
  60. err, Company_r := Account.Read_Company_id(r_Device.T_pid)
  61. if err != nil {
  62. //logs.Println("公司ID 不存在!", r_Device.T_pid)
  63. return
  64. }
  65. StatisticsTask.Company_Handle(Company_r)
  66. })
  67. }