Nats.go 11 KB


  1. package Nats
  2. import (
  3. "ColdVerify_server/Nats/NatsServer"
  4. "ColdVerify_server/conf"
  5. "ColdVerify_server/lib"
  6. "ColdVerify_server/logs"
  7. "ColdVerify_server/models/Account"
  8. "ColdVerify_server/models/Certificate"
  9. "ColdVerify_server/models/Device"
  10. "ColdVerify_server/models/System"
  11. "ColdVerify_server/models/Task"
  12. "fmt"
  13. "github.com/nats-io/nats.go"
  14. "github.com/vmihailenco/msgpack/v5"
  15. "strings"
  16. "sync"
  17. )
  18. func init() {
  19. logs.Println("============Nats init============")
  20. var err error
  21. // 连接Nats服务器
  22. lib.Nats, err = nats.Connect("nats://" + conf.NatsServer_Url)
  23. if err != nil {
  24. fmt.Println("nats 连接失败!")
  25. panic(err)
  26. }
  27. logs.Println("nats OK!")
  28. // 本地测试,屏蔽本地nats
  29. if !conf.NatsForbidden {
  30. go NatsInit()
  31. }
  32. }
  33. type Extract_TaskData_Back struct {
  34. T_uuid string `xml:"T_uuid"` // 任务主键id
  35. Time_start string `xml:"Time_start"`
  36. Time_end string `xml:"Time_end"`
  37. DeviceClassList []Device.DeviceClassList `xml:"DeviceClassList"` // 泛型
  38. Task Task.Task `xml:"Task"` // 泛型
  39. }
  40. func NatsInit() {
  41. // 发布-订阅 模式,打包数据
  42. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Extract_TaskData_Back", "Extract_TaskData", func(m *nats.Msg) {
  43. logs.Debug("Extract_TaskData_Back message: \n", string(m.Data))
  44. var resp Extract_TaskData_Back
  45. err := msgpack.Unmarshal(m.Data, &resp)
  46. if err != nil {
  47. System.Add_Logs("Nats", "msgpack Unmarshal err", string(m.Data))
  48. return
  49. }
  50. Task_r := resp.Task
  51. // 清空表
  52. Task.Truncate_TaskData(Task_r.T_task_id)
  53. //失败重试5次
  54. DeviceClassList := new(sync.Map)
  55. var count int
  56. for _, v := range resp.DeviceClassList {
  57. DeviceClassList.Store(fmt.Sprintf("%s|%s", v.T_sn, v.T_id), 5)
  58. //err = Task.Import_TaskData_Back(v.T_sn, v.T_id, resp.Task.T_task_id, resp.Time_start, resp.Time_end)
  59. //count++
  60. //time.Sleep(5 * time.Second)
  61. }
  62. DeviceClassList.Range(func(k, v interface{}) bool {
  63. count++
  64. return true
  65. })
  66. for count > 0 {
  67. DeviceClassList.Range(func(k, v any) bool {
  68. T_snid := strings.Split(k.(string), "|")
  69. T_sn := T_snid[0]
  70. T_id := T_snid[1]
  71. temp := v.(int)
  72. temp--
  73. DeviceClassList.Store(k, temp)
  74. err = Task.Import_TaskData_Back(T_sn, T_id, resp.Task.T_task_id, resp.Time_start, resp.Time_end)
  75. if err == nil || strings.Contains(err.Error(), "doesn't exist") {
  76. DeviceClassList.Delete(k)
  77. count--
  78. }
  79. return true
  80. })
  81. }
  82. TaskData_Num := Task.Read_TaskData_Count(Task_r.T_task_id)
  83. //if TaskData_Num == 0 {
  84. // Task_r.T_collection_num = 0
  85. // if !Task.Update_Task(Task_r, "T_collection_state") {
  86. // logs.Error(lib.FuncName(), "后台执行修改任务数据失败")
  87. // }
  88. // return
  89. //}
  90. // 导入到本地数据
  91. NatsServer.Import_TaskData(resp.T_uuid, Task_r.T_task_id, TaskData_Num)
  92. System.Add_UserLogs_T(resp.T_uuid, "任务", "修改", Task_r)
  93. System.Add_UserLogs(resp.T_uuid, "提取数据", "提取数据"+Task_r.T_name, Task_r.T_task_id+"|"+resp.Time_start+"|"+resp.Time_end)
  94. })
  95. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Update_Task", "Update_Task", func(m *nats.Msg) {
  96. var t_Req Task.Task
  97. var t_R lib.JSONS
  98. err := msgpack.Unmarshal(m.Data, &t_Req)
  99. if err != nil {
  100. logs.Error("Mats", lib.FuncName(), err)
  101. t_R.Code = 202
  102. t_R.Msg = err.Error()
  103. b, _ := msgpack.Marshal(&t_R)
  104. _ = lib.Nats.Publish(m.Reply, b)
  105. return
  106. }
  107. fmt.Printf("ColdVerify_Server_Update_Task message: %+v\n", t_Req)
  108. col := []string{}
  109. if t_Req.T_delivery_state > 0 {
  110. col = append(col, "T_delivery_state")
  111. }
  112. if !Task.Update_Task(t_Req, col...) {
  113. logs.Error("Mats", lib.FuncName(), err)
  114. t_R.Code = 202
  115. t_R.Msg = err.Error()
  116. b, _ := msgpack.Marshal(&t_R)
  117. _ = lib.Nats.Publish(m.Reply, b)
  118. return
  119. }
  120. t_R.Code = 200
  121. t_R.Msg = "ok"
  122. b, _ := msgpack.Marshal(&t_R)
  123. _ = lib.Nats.Publish(m.Reply, b)
  124. Task.Add_TaskLogs_T("nats", t_Req.T_task_id, "任务管理", "修改", t_Req)
  125. System.Add_UserLogs_T("nats", "任务管理", "修改", t_Req)
  126. })
  127. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Read_Task", "Read_Task", func(m *nats.Msg) {
  128. logs.Println("ColdVerify_Server_Read_Task message: %+v\n", string(m.Data))
  129. var t_R lib.JSONS
  130. task, is := Task.Read_Task(string(m.Data))
  131. if !is {
  132. logs.Error("Mats", lib.FuncName())
  133. t_R.Code = 202
  134. t_R.Msg = "查询失败"
  135. b, _ := msgpack.Marshal(&t_R)
  136. _ = lib.Nats.Publish(m.Reply, b)
  137. return
  138. }
  139. t_R.Code = 200
  140. t_R.Msg = "ok"
  141. t_R.Data = task
  142. b, _ := msgpack.Marshal(&t_R)
  143. _ = lib.Nats.Publish(m.Reply, b)
  144. })
  145. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Device_Class_List", "Device_Class_List", func(m *nats.Msg) {
  146. logs.Println("ColdVerify_Server_Device_Class_List message: %+v\n", string(m.Data))
  147. var t_R lib.JSONS
  148. task, is := Task.Read_Task(string(m.Data))
  149. if !is {
  150. logs.Error("Mats", lib.FuncName())
  151. t_R.Code = 202
  152. t_R.Msg = "查询失败"
  153. b, _ := msgpack.Marshal(&t_R)
  154. _ = lib.Nats.Publish(m.Reply, b)
  155. return
  156. }
  157. List, _ := Device.Read_DeviceClassList_OrderList(task.T_class, "", "", 0, 9999)
  158. t_R.Code = 200
  159. t_R.Msg = "ok"
  160. t_R.Data = List
  161. b, _ := msgpack.Marshal(&t_R)
  162. _ = lib.Nats.Publish(m.Reply, b)
  163. })
  164. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Read_User", "Read_User", func(m *nats.Msg) {
  165. logs.Println("ColdVerify_Server_Read_User message: %+v\n", string(m.Data))
  166. var t_R lib.JSONS
  167. err, user := Account.Read_User_ByT_uuid(string(m.Data))
  168. if err != nil {
  169. logs.Error("Mats", lib.FuncName())
  170. t_R.Code = 202
  171. t_R.Msg = "查询失败"
  172. b, _ := msgpack.Marshal(&t_R)
  173. _ = lib.Nats.Publish(m.Reply, b)
  174. return
  175. }
  176. t_R.Code = 200
  177. t_R.Msg = "ok"
  178. t_R.Data = user
  179. b, _ := msgpack.Marshal(&t_R)
  180. _ = lib.Nats.Publish(m.Reply, b)
  181. })
  182. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Add_DeviceClassList", "Add_DeviceClassList", func(m *nats.Msg) {
  183. type T_Req struct {
  184. T_task_id string `xml:"T_task_id"` // 任务主键id
  185. T_sn string `xml:"T_sn"`
  186. T_id string `xml:"T_id"`
  187. }
  188. var t_Req T_Req
  189. var t_R lib.JSONS
  190. err := msgpack.Unmarshal(m.Data, &t_Req)
  191. if err != nil {
  192. logs.Error("Mats", lib.FuncName(), err)
  193. t_R.Code = 202
  194. t_R.Msg = err.Error()
  195. b, _ := msgpack.Marshal(&t_R)
  196. _ = lib.Nats.Publish(m.Reply, b)
  197. return
  198. }
  199. fmt.Printf("ColdVerify_Server_Add_DeviceClassList message: %+v\n", t_Req)
  200. Task_r, is := Task.Read_Task(t_Req.T_task_id)
  201. if !is {
  202. logs.Error("Mats", lib.FuncName())
  203. t_R.Code = 202
  204. t_R.Msg = "T_task_id 错误!"
  205. b, _ := msgpack.Marshal(&t_R)
  206. _ = lib.Nats.Publish(m.Reply, b)
  207. return
  208. }
  209. // 判断是否已存在sn
  210. dc, is := Device.Read_DeviceClassList_T_class_T_sn(Task_r.T_class, t_Req.T_sn)
  211. // 添加的id和数据库已存在id相同
  212. if is && dc.T_id == t_Req.T_id {
  213. t_R.Code = 200
  214. t_R.Msg = "ok"
  215. b, _ := msgpack.Marshal(&t_R)
  216. _ = lib.Nats.Publish(m.Reply, b)
  217. return
  218. }
  219. // 相同sn 添加的id和数据库已存在id不同
  220. if is && dc.T_id != t_Req.T_id {
  221. _, is = Device.Read_DeviceClassList_T_class_T_id(Task_r.T_class, t_Req.T_id)
  222. if is {
  223. logs.Error("Mats", lib.FuncName())
  224. t_R.Code = 202
  225. t_R.Msg = t_Req.T_id + "编号已存在!"
  226. b, _ := msgpack.Marshal(&t_R)
  227. _ = lib.Nats.Publish(m.Reply, b)
  228. return
  229. }
  230. dc.T_id = t_Req.T_id
  231. if !Device.Update_DeviceClassList(dc, "T_id") {
  232. logs.Error("Mats", lib.FuncName())
  233. t_R.Code = 202
  234. t_R.Msg = "修改编号失败!"
  235. b, _ := msgpack.Marshal(&t_R)
  236. _ = lib.Nats.Publish(m.Reply, b)
  237. return
  238. }
  239. }
  240. var pdf Certificate.CertificatePdf
  241. //pdfList, _ := Certificate.Read_CertificatePdf_Newest(T_sn)
  242. pdfList, _ := Certificate.Read_CertificatePdf_T_layout_no(t_Req.T_id, "")
  243. if len(pdfList) > 0 {
  244. pdf = pdfList[0]
  245. }
  246. var_ := Device.DeviceClassList{
  247. T_class: Task_r.T_class,
  248. T_id: t_Req.T_id,
  249. T_sn: t_Req.T_sn,
  250. T_failure_time: pdf.T_failure_time,
  251. T_pdf: pdf.T_pdf,
  252. T_Certificate_sn: pdf.T_Certificate_sn,
  253. T_remark: "",
  254. T_State: 1,
  255. }
  256. _, is = Device.Add_DeviceClassList(var_)
  257. if !is {
  258. logs.Error("Mats", lib.FuncName(), err)
  259. t_R.Code = 202
  260. t_R.Msg = err.Error()
  261. b, _ := msgpack.Marshal(&t_R)
  262. _ = lib.Nats.Publish(m.Reply, b)
  263. return
  264. }
  265. t_R.Code = 200
  266. t_R.Msg = "ok"
  267. b, _ := msgpack.Marshal(&t_R)
  268. _ = lib.Nats.Publish(m.Reply, b)
  269. Task.Add_TaskLogs_T("nats", t_Req.T_task_id, "任务管理", "修改", t_Req)
  270. System.Add_UserLogs_T("nats", "任务管理", "修改", t_Req)
  271. })
  272. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Edit_DeviceClassList", "Edit_DeviceClassList", func(m *nats.Msg) {
  273. type T_Req struct {
  274. T_task_id string `xml:"T_task_id"` // 任务主键id
  275. T_sn string `xml:"T_sn"`
  276. T_id string `xml:"T_id"`
  277. }
  278. var t_Req T_Req
  279. var t_R lib.JSONS
  280. err := msgpack.Unmarshal(m.Data, &t_Req)
  281. if err != nil {
  282. logs.Error("Mats", lib.FuncName(), err)
  283. t_R.Code = 202
  284. t_R.Msg = err.Error()
  285. b, _ := msgpack.Marshal(&t_R)
  286. _ = lib.Nats.Publish(m.Reply, b)
  287. return
  288. }
  289. fmt.Printf("ColdVerify_Server_Edit_DeviceClassList message: %+v\n", t_Req)
  290. Task_r, is := Task.Read_Task(t_Req.T_task_id)
  291. if !is {
  292. logs.Error("Mats", lib.FuncName())
  293. t_R.Code = 202
  294. t_R.Msg = "T_task_id 错误!"
  295. b, _ := msgpack.Marshal(&t_R)
  296. _ = lib.Nats.Publish(m.Reply, b)
  297. return
  298. }
  299. // 判断是否已存在sn
  300. dc, is := Device.Read_DeviceClassList_T_class_T_sn(Task_r.T_class, t_Req.T_sn)
  301. // 添加的id和数据库已存在id相同
  302. if is && dc.T_id == t_Req.T_id {
  303. t_R.Code = 200
  304. t_R.Msg = "ok"
  305. b, _ := msgpack.Marshal(&t_R)
  306. _ = lib.Nats.Publish(m.Reply, b)
  307. return
  308. }
  309. // 相同sn 添加的id和数据库已存在id不同
  310. if is && dc.T_id != t_Req.T_id {
  311. _, is = Device.Read_DeviceClassList_T_class_T_id(Task_r.T_class, t_Req.T_id)
  312. if is {
  313. logs.Error("Mats", lib.FuncName())
  314. t_R.Code = 202
  315. t_R.Msg = t_Req.T_id + "编号已存在!"
  316. b, _ := msgpack.Marshal(&t_R)
  317. _ = lib.Nats.Publish(m.Reply, b)
  318. return
  319. }
  320. var pdf Certificate.CertificatePdf
  321. pdfList, _ := Certificate.Read_CertificatePdf_T_layout_no(t_Req.T_id, "")
  322. if len(pdfList) > 0 {
  323. pdf = pdfList[0]
  324. }
  325. dc.T_id = t_Req.T_id
  326. dc.T_failure_time = pdf.T_failure_time
  327. dc.T_pdf = pdf.T_pdf
  328. dc.T_Certificate_sn = pdf.T_Certificate_sn
  329. if !Device.Update_DeviceClassList(dc, "T_id", "T_failure_time", "T_pdf", "T_Certificate_sn") {
  330. logs.Error("Mats", lib.FuncName())
  331. t_R.Code = 202
  332. t_R.Msg = "修改编号失败!"
  333. b, _ := msgpack.Marshal(&t_R)
  334. _ = lib.Nats.Publish(m.Reply, b)
  335. return
  336. }
  337. }
  338. t_R.Code = 200
  339. t_R.Msg = "ok"
  340. b, _ := msgpack.Marshal(&t_R)
  341. _ = lib.Nats.Publish(m.Reply, b)
  342. Task.Add_TaskLogs_T("nats", t_Req.T_task_id, "任务管理", "修改设备列表", t_Req)
  343. System.Add_UserLogs_T("nats", "任务管理", "修改", t_Req)
  344. })
  345. }