Nats.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package Nats
  2. import (
  3. "ColdVerify_server/Nats/NatsServer"
  4. "ColdVerify_server/conf"
  5. "ColdVerify_server/lib"
  6. "ColdVerify_server/logs"
  7. "ColdVerify_server/models/Device"
  8. "ColdVerify_server/models/System"
  9. "ColdVerify_server/models/Task"
  10. "fmt"
  11. "github.com/nats-io/nats.go"
  12. "github.com/vmihailenco/msgpack/v5"
  13. "sync"
  14. )
  15. func init() {
  16. logs.Println("============Nats init============")
  17. var err error
  18. // 连接Nats服务器
  19. lib.Nats, err = nats.Connect("nats://" + conf.NatsServer_Url)
  20. if err != nil {
  21. fmt.Println("nats 连接失败!")
  22. panic(err)
  23. }
  24. logs.Println("nats OK!")
  25. go NatsInit()
  26. }
  27. type Extract_TaskData_Back struct {
  28. T_uuid string `xml:"T_uuid"` // 任务主键id
  29. Time_start string `xml:"Time_start"`
  30. Time_end string `xml:"Time_end"`
  31. DeviceClassList []Device.DeviceClassList `xml:"DeviceClassList"` // 泛型
  32. Task Task.Task `xml:"Task"` // 泛型
  33. }
  34. func NatsInit() {
  35. // 发布-订阅 模式,打包数据
  36. _, _ = lib.Nats.Subscribe("ColdVerify_Server_Extract_TaskData_Back", func(m *nats.Msg) {
  37. logs.Debug("Extract_TaskData_Back message: %v\n", string(m.Data))
  38. var resp Extract_TaskData_Back
  39. err := msgpack.Unmarshal(m.Data, &resp)
  40. if err != nil {
  41. System.Add_Logs("Nats", "msgpack Unmarshal err", string(m.Data))
  42. return
  43. }
  44. Task_r := resp.Task
  45. // 清空表
  46. Task.Truncate_TaskData(Task_r.T_task_id)
  47. DeviceClassList := new(sync.Map)
  48. for _, v := range resp.DeviceClassList {
  49. DeviceClassList.Store(v.Id, v)
  50. }
  51. DeviceClassList.Range(func(k, v any) bool {
  52. c := v.(Device.DeviceClassList)
  53. logs.Println("DeviceClass ---- ", c.T_sn, resp.Task.T_task_id, resp.Time_start, resp.Time_end)
  54. err = Task.Import_TaskData_Back(c.T_sn, c.T_id, resp.Task.T_task_id, resp.Time_start, resp.Time_end)
  55. if err == nil {
  56. DeviceClassList.Delete(k)
  57. }
  58. return true
  59. })
  60. TaskData_Num := Task.Read_TaskData_Count(Task_r.T_task_id)
  61. if TaskData_Num == 0 {
  62. Task_r.T_collection_state = 0
  63. if !Task.Update_Task(Task_r, "T_collection_state") {
  64. logs.Error(lib.FuncName(), "后台执行修改任务数据失败")
  65. }
  66. return
  67. }
  68. // 导入到本地数据
  69. NatsServer.Import_TaskData(resp.T_uuid, Task_r.T_task_id, TaskData_Num)
  70. System.Add_UserLogs_T(resp.T_uuid, "任务", "修改", Task_r)
  71. System.Add_UserLogs(resp.T_uuid, "提取数据", "提取数据"+Task_r.T_name, Task_r.T_task_id+"|"+resp.Time_start+"|"+resp.Time_end)
  72. })
  73. _, _ = lib.Nats.Subscribe("ColdVerify_Server_Update_Task", func(m *nats.Msg) {
  74. var t_Req Task.Task
  75. var t_R lib.JSONS
  76. err := msgpack.Unmarshal(m.Data, &t_Req)
  77. if err != nil {
  78. logs.Error("Mats", lib.FuncName(), err)
  79. t_R.Code = 202
  80. t_R.Msg = err.Error()
  81. b, _ := msgpack.Marshal(&t_R)
  82. _ = lib.Nats.Publish(m.Reply, b)
  83. return
  84. }
  85. fmt.Printf("ColdVerify_Server_Update_Task message: %+v\n", t_Req)
  86. col := []string{}
  87. if t_Req.T_delivery_state > 0 {
  88. col = append(col, "T_delivery_state")
  89. }
  90. if !Task.Update_Task(t_Req, "T_collection_state", "T_delivery_state") {
  91. logs.Error("Mats", lib.FuncName(), err)
  92. t_R.Code = 202
  93. t_R.Msg = err.Error()
  94. b, _ := msgpack.Marshal(&t_R)
  95. _ = lib.Nats.Publish(m.Reply, b)
  96. return
  97. }
  98. t_R.Code = 200
  99. t_R.Msg = "ok"
  100. b, _ := msgpack.Marshal(&t_R)
  101. _ = lib.Nats.Publish(m.Reply, b)
  102. })
  103. _, _ = lib.Nats.Subscribe("ColdVerify_Server_Read_Task", func(m *nats.Msg) {
  104. fmt.Printf("ColdVerify_Server_Read_Task message: %+v\n", string(m.Data))
  105. var t_R lib.JSONS
  106. task, is := Task.Read_Task(string(m.Data))
  107. if !is {
  108. logs.Error("Mats", lib.FuncName())
  109. t_R.Code = 202
  110. t_R.Msg = "查询失败"
  111. b, _ := msgpack.Marshal(&t_R)
  112. _ = lib.Nats.Publish(m.Reply, b)
  113. return
  114. }
  115. t_R.Code = 200
  116. t_R.Msg = "ok"
  117. t_R.Data = task
  118. b, _ := msgpack.Marshal(&t_R)
  119. _ = lib.Nats.Publish(m.Reply, b)
  120. })
  121. }