Nats.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. package Nats
  2. import (
  3. "ColdVerify_server/Nats/NatsServer"
  4. "ColdVerify_server/conf"
  5. "ColdVerify_server/lib"
  6. "ColdVerify_server/logs"
  7. "ColdVerify_server/models/Account"
  8. "ColdVerify_server/models/Device"
  9. "ColdVerify_server/models/System"
  10. "ColdVerify_server/models/Task"
  11. "fmt"
  12. "github.com/nats-io/nats.go"
  13. "github.com/vmihailenco/msgpack/v5"
  14. "strings"
  15. "sync"
  16. )
  17. func init() {
  18. logs.Println("============Nats init============")
  19. var err error
  20. // 连接Nats服务器
  21. lib.Nats, err = nats.Connect("nats://" + conf.NatsServer_Url)
  22. if err != nil {
  23. fmt.Println("nats 连接失败!")
  24. panic(err)
  25. }
  26. logs.Println("nats OK!")
  27. // 本地测试,屏蔽本地nats
  28. if !conf.NatsForbidden {
  29. go NatsInit()
  30. }
  31. }
  32. type Extract_TaskData_Back struct {
  33. T_uuid string `xml:"T_uuid"` // 任务主键id
  34. Time_start string `xml:"Time_start"`
  35. Time_end string `xml:"Time_end"`
  36. DeviceClassList []Device.DeviceClassList `xml:"DeviceClassList"` // 泛型
  37. Task Task.Task `xml:"Task"` // 泛型
  38. }
  39. func NatsInit() {
  40. // 发布-订阅 模式,打包数据
  41. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Extract_TaskData_Back", "Extract_TaskData", func(m *nats.Msg) {
  42. logs.Debug("Extract_TaskData_Back message: \n", string(m.Data))
  43. var resp Extract_TaskData_Back
  44. err := msgpack.Unmarshal(m.Data, &resp)
  45. if err != nil {
  46. System.Add_Logs("Nats", "msgpack Unmarshal err", string(m.Data))
  47. return
  48. }
  49. Task_r := resp.Task
  50. // 清空表
  51. Task.Truncate_TaskData(Task_r.T_task_id)
  52. //失败重试5次
  53. DeviceClassList := new(sync.Map)
  54. var count int
  55. for _, v := range resp.DeviceClassList {
  56. DeviceClassList.Store(fmt.Sprintf("%s|%s", v.T_sn, v.T_id), 5)
  57. //err = Task.Import_TaskData_Back(v.T_sn, v.T_id, resp.Task.T_task_id, resp.Time_start, resp.Time_end)
  58. //count++
  59. //time.Sleep(5 * time.Second)
  60. }
  61. DeviceClassList.Range(func(k, v interface{}) bool {
  62. count++
  63. return true
  64. })
  65. for count > 0 {
  66. DeviceClassList.Range(func(k, v any) bool {
  67. T_snid := strings.Split(k.(string), "|")
  68. T_sn := T_snid[0]
  69. T_id := T_snid[1]
  70. temp := v.(int)
  71. temp--
  72. DeviceClassList.Store(k, temp)
  73. err = Task.Import_TaskData_Back(T_sn, T_id, resp.Task.T_task_id, resp.Time_start, resp.Time_end)
  74. if err == nil || strings.Contains(err.Error(), "doesn't exist") {
  75. DeviceClassList.Delete(k)
  76. count--
  77. }
  78. return true
  79. })
  80. }
  81. TaskData_Num := Task.Read_TaskData_Count(Task_r.T_task_id)
  82. //if TaskData_Num == 0 {
  83. // Task_r.T_collection_num = 0
  84. // if !Task.Update_Task(Task_r, "T_collection_state") {
  85. // logs.Error(lib.FuncName(), "后台执行修改任务数据失败")
  86. // }
  87. // return
  88. //}
  89. // 导入到本地数据
  90. NatsServer.Import_TaskData(resp.T_uuid, Task_r.T_task_id, TaskData_Num)
  91. System.Add_UserLogs_T(resp.T_uuid, "任务", "修改", Task_r)
  92. System.Add_UserLogs(resp.T_uuid, "提取数据", "提取数据"+Task_r.T_name, Task_r.T_task_id+"|"+resp.Time_start+"|"+resp.Time_end)
  93. })
  94. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Update_Task", "Update_Task", func(m *nats.Msg) {
  95. var t_Req Task.Task
  96. var t_R lib.JSONS
  97. err := msgpack.Unmarshal(m.Data, &t_Req)
  98. if err != nil {
  99. logs.Error("Mats", lib.FuncName(), err)
  100. t_R.Code = 202
  101. t_R.Msg = err.Error()
  102. b, _ := msgpack.Marshal(&t_R)
  103. _ = lib.Nats.Publish(m.Reply, b)
  104. return
  105. }
  106. fmt.Printf("ColdVerify_Server_Update_Task message: %+v\n", t_Req)
  107. col := []string{}
  108. if t_Req.T_delivery_state > 0 {
  109. col = append(col, "T_delivery_state")
  110. }
  111. if !Task.Update_Task(t_Req, col...) {
  112. logs.Error("Mats", lib.FuncName(), err)
  113. t_R.Code = 202
  114. t_R.Msg = err.Error()
  115. b, _ := msgpack.Marshal(&t_R)
  116. _ = lib.Nats.Publish(m.Reply, b)
  117. return
  118. }
  119. t_R.Code = 200
  120. t_R.Msg = "ok"
  121. b, _ := msgpack.Marshal(&t_R)
  122. _ = lib.Nats.Publish(m.Reply, b)
  123. Task.Add_TaskLogs_T("nats", t_Req.T_task_id, "任务管理", "修改", t_Req)
  124. System.Add_UserLogs_T("nats", "任务管理", "修改", t_Req)
  125. })
  126. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Read_Task", "Read_Task", func(m *nats.Msg) {
  127. logs.Println("ColdVerify_Server_Read_Task message: %+v\n", string(m.Data))
  128. var t_R lib.JSONS
  129. task, is := Task.Read_Task(string(m.Data))
  130. if !is {
  131. logs.Error("Mats", lib.FuncName())
  132. t_R.Code = 202
  133. t_R.Msg = "查询失败"
  134. b, _ := msgpack.Marshal(&t_R)
  135. _ = lib.Nats.Publish(m.Reply, b)
  136. return
  137. }
  138. t_R.Code = 200
  139. t_R.Msg = "ok"
  140. t_R.Data = task
  141. b, _ := msgpack.Marshal(&t_R)
  142. _ = lib.Nats.Publish(m.Reply, b)
  143. })
  144. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Device_Class_List", "Device_Class_List", func(m *nats.Msg) {
  145. logs.Println("ColdVerify_Server_Device_Class_List message: %+v\n", string(m.Data))
  146. var t_R lib.JSONS
  147. task, is := Task.Read_Task(string(m.Data))
  148. if !is {
  149. logs.Error("Mats", lib.FuncName())
  150. t_R.Code = 202
  151. t_R.Msg = "查询失败"
  152. b, _ := msgpack.Marshal(&t_R)
  153. _ = lib.Nats.Publish(m.Reply, b)
  154. return
  155. }
  156. List, _ := Device.Read_DeviceClassList_OrderList(task.T_class, "", "", 0, 9999)
  157. t_R.Code = 200
  158. t_R.Msg = "ok"
  159. t_R.Data = List
  160. b, _ := msgpack.Marshal(&t_R)
  161. _ = lib.Nats.Publish(m.Reply, b)
  162. })
  163. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Read_User", "Read_User", func(m *nats.Msg) {
  164. logs.Println("ColdVerify_Server_Read_User message: %+v\n", string(m.Data))
  165. var t_R lib.JSONS
  166. err, user := Account.Read_User_ByT_uuid(string(m.Data))
  167. if err != nil {
  168. logs.Error("Mats", lib.FuncName())
  169. t_R.Code = 202
  170. t_R.Msg = "查询失败"
  171. b, _ := msgpack.Marshal(&t_R)
  172. _ = lib.Nats.Publish(m.Reply, b)
  173. return
  174. }
  175. t_R.Code = 200
  176. t_R.Msg = "ok"
  177. t_R.Data = user
  178. b, _ := msgpack.Marshal(&t_R)
  179. _ = lib.Nats.Publish(m.Reply, b)
  180. })
  181. }