Nats.go 14 KB


  1. package Nats
  2. import (
  3. "ColdVerify_server/Nats/NatsServer"
  4. "ColdVerify_server/conf"
  5. "ColdVerify_server/lib"
  6. "ColdVerify_server/logs"
  7. "ColdVerify_server/models/Account"
  8. "ColdVerify_server/models/Certificate"
  9. "ColdVerify_server/models/Device"
  10. "ColdVerify_server/models/System"
  11. "ColdVerify_server/models/Task"
  12. "fmt"
  13. "github.com/nats-io/nats.go"
  14. "github.com/vmihailenco/msgpack/v5"
  15. "strings"
  16. "sync"
  17. )
  18. func init() {
  19. logs.Println("============Nats init============")
  20. var err error
  21. // 连接Nats服务器
  22. lib.Nats, err = nats.Connect("nats://" + conf.NatsServer_Url)
  23. if err != nil {
  24. logs.Error("nats 连接失败!")
  25. panic(err)
  26. }
  27. logs.Println("nats OK!")
  28. // 本地测试,屏蔽本地nats
  29. if !conf.NatsForbidden {
  30. go NatsInit()
  31. }
  32. }
// Extract_TaskData_Back is the message body decoded by the
// "ColdVerify_Server_Extract_TaskData_Back" handler in NatsInit.
//
// NOTE(review): the handler decodes this struct with msgpack.Unmarshal, while
// the fields only carry `xml` tags; presumably msgpack falls back to the Go
// field names (which happen to equal the tag values) — confirm.
type Extract_TaskData_Back struct {
	// Original note: task primary key id. The handler forwards it as the first
	// argument of NatsServer.Import_TaskData and System.Add_UserLogs — TODO
	// confirm whether this is really a task id or a user uuid.
	T_uuid string `xml:"T_uuid"`
	// Start of the time window, forwarded to Task.Import_TaskData_Back.
	Time_start string `xml:"Time_start"`
	// End of the time window, forwarded to Task.Import_TaskData_Back.
	Time_end string `xml:"Time_end"`
	// Devices whose data is imported; entries with an empty T_sn or a T_sn
	// containing "-" are skipped by the handler.
	DeviceClassList []Device.DeviceClassList `xml:"DeviceClassList"`
	// Task the data is imported into; its T_task_id selects the task data.
	Task Task.Task `xml:"Task"`
}
  40. func NatsInit() {
  41. // 发布-订阅 模式,打包数据
  42. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Extract_TaskData_Back", "Extract_TaskData", func(m *nats.Msg) {
  43. logs.Debug("Extract_TaskData_Back message: \n", string(m.Data))
  44. var resp Extract_TaskData_Back
  45. err := msgpack.Unmarshal(m.Data, &resp)
  46. if err != nil {
  47. System.Add_Logs("Nats", "msgpack Unmarshal err", string(m.Data))
  48. return
  49. }
  50. Task_r := resp.Task
  51. // 清空表
  52. Task.Truncate_TaskData(Task_r.T_task_id)
  53. //失败重试5次
  54. DeviceClassList := new(sync.Map)
  55. var count int
  56. for _, v := range resp.DeviceClassList {
  57. if strings.Contains(v.T_sn, "-") || len(v.T_sn) == 0 {
  58. // 从3.0平台导入
  59. continue
  60. }
  61. DeviceClassList.Store(fmt.Sprintf("%s|%s", v.T_sn, v.T_id), 5)
  62. //err = Task.Import_TaskData_Back(v.T_sn, v.T_id, resp.Task.T_task_id, resp.Time_start, resp.Time_end)
  63. //count++
  64. //time.Sleep(5 * time.Second)
  65. }
  66. DeviceClassList.Range(func(k, v interface{}) bool {
  67. count++
  68. return true
  69. })
  70. for count > 0 {
  71. DeviceClassList.Range(func(k, v any) bool {
  72. T_snid := strings.Split(k.(string), "|")
  73. T_sn := T_snid[0]
  74. T_id := T_snid[1]
  75. temp := v.(int)
  76. temp--
  77. DeviceClassList.Store(k, temp)
  78. err = Task.Import_TaskData_Back(T_sn, T_id, resp.Task.T_task_id, resp.Time_start, resp.Time_end)
  79. if err == nil || strings.Contains(err.Error(), "doesn't exist") {
  80. DeviceClassList.Delete(k)
  81. count--
  82. } else {
  83. logs.Error("设备数据同步到任务数据失败", err)
  84. DeviceClassList.Delete(k)
  85. count--
  86. }
  87. return true
  88. })
  89. }
  90. TaskData_Num := Task.Read_TaskData_Count(Task_r.T_task_id)
  91. //if TaskData_Num == 0 {
  92. // Task_r.T_collection_num = 0
  93. // if !Task.Update_Task(Task_r, "T_collection_state") {
  94. // logs.Error(lib.FuncName(), "后台执行修改任务数据失败")
  95. // }
  96. // return
  97. //}
  98. // 导入到本地数据
  99. NatsServer.Import_TaskData(resp.T_uuid, Task_r.T_task_id, TaskData_Num)
  100. System.Add_UserLogs_T(resp.T_uuid, "任务", "修改", Task_r)
  101. System.Add_UserLogs(resp.T_uuid, "提取数据", "提取数据"+Task_r.T_name, Task_r.T_task_id+"|"+resp.Time_start+"|"+resp.Time_end)
  102. })
  103. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Update_Task", "Update_Task", func(m *nats.Msg) {
  104. var t_Req Task.Task
  105. var t_R lib.JSONS
  106. err := msgpack.Unmarshal(m.Data, &t_Req)
  107. if err != nil {
  108. logs.Error("Mats", lib.FuncName(), err)
  109. t_R.Code = 202
  110. t_R.Msg = err.Error()
  111. b, _ := msgpack.Marshal(&t_R)
  112. _ = lib.Nats.Publish(m.Reply, b)
  113. return
  114. }
  115. logs.Debug(fmt.Sprintf("ColdVerify_Server_Update_Task message: %+v\n", t_Req))
  116. col := []string{}
  117. if t_Req.T_delivery_state > 0 {
  118. col = append(col, "T_delivery_state")
  119. }
  120. if t_Req.T_collection_state > 0 {
  121. col = append(col, "T_collection_state")
  122. }
  123. if !Task.Update_Task(t_Req, col...) {
  124. logs.Error("Mats", lib.FuncName(), err)
  125. t_R.Code = 202
  126. t_R.Msg = err.Error()
  127. b, _ := msgpack.Marshal(&t_R)
  128. _ = lib.Nats.Publish(m.Reply, b)
  129. return
  130. }
  131. t_R.Code = 200
  132. t_R.Msg = "ok"
  133. b, _ := msgpack.Marshal(&t_R)
  134. _ = lib.Nats.Publish(m.Reply, b)
  135. Task.Add_TaskLogs_T("nats", t_Req.T_task_id, "任务管理", "修改", t_Req)
  136. System.Add_UserLogs_T("nats", "任务管理", "修改", t_Req)
  137. })
  138. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Read_Task", "Read_Task", func(m *nats.Msg) {
  139. logs.Println("ColdVerify_Server_Read_Task message: %+v\n", string(m.Data))
  140. var t_R lib.JSONS
  141. task, is := Task.Read_Task(string(m.Data))
  142. if !is {
  143. logs.Error("Mats", lib.FuncName())
  144. t_R.Code = 202
  145. t_R.Msg = "查询失败"
  146. b, _ := msgpack.Marshal(&t_R)
  147. _ = lib.Nats.Publish(m.Reply, b)
  148. return
  149. }
  150. t_R.Code = 200
  151. t_R.Msg = "ok"
  152. t_R.Data = task
  153. b, _ := msgpack.Marshal(&t_R)
  154. _ = lib.Nats.Publish(m.Reply, b)
  155. })
  156. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Device_Class_List", "Device_Class_List", func(m *nats.Msg) {
  157. logs.Println("ColdVerify_Server_Device_Class_List message: %+v\n", string(m.Data))
  158. var t_R lib.JSONS
  159. task, is := Task.Read_Task(string(m.Data))
  160. if !is {
  161. logs.Error("Mats", lib.FuncName())
  162. t_R.Code = 202
  163. t_R.Msg = "查询失败"
  164. b, _ := msgpack.Marshal(&t_R)
  165. _ = lib.Nats.Publish(m.Reply, b)
  166. return
  167. }
  168. List, _ := Device.Read_DeviceClassList_OrderList(task.T_class, "", "", "", 0, 9999)
  169. t_R.Code = 200
  170. t_R.Msg = "ok"
  171. t_R.Data = List
  172. b, _ := msgpack.Marshal(&t_R)
  173. _ = lib.Nats.Publish(m.Reply, b)
  174. })
  175. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Read_User", "Read_User", func(m *nats.Msg) {
  176. logs.Println("ColdVerify_Server_Read_User message: %+v\n", string(m.Data))
  177. var t_R lib.JSONS
  178. err, user := Account.Read_User_ByT_uuid(string(m.Data))
  179. if err != nil {
  180. logs.Error("Mats", lib.FuncName())
  181. t_R.Code = 202
  182. t_R.Msg = "查询失败"
  183. b, _ := msgpack.Marshal(&t_R)
  184. _ = lib.Nats.Publish(m.Reply, b)
  185. return
  186. }
  187. t_R.Code = 200
  188. t_R.Msg = "ok"
  189. t_R.Data = user
  190. b, _ := msgpack.Marshal(&t_R)
  191. _ = lib.Nats.Publish(m.Reply, b)
  192. })
  193. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Add_DeviceClassList", "Add_DeviceClassList", func(m *nats.Msg) {
  194. type T_Req struct {
  195. T_task_id string `xml:"T_task_id"` // 任务主键id
  196. T_sn string `xml:"T_sn"`
  197. T_id string `xml:"T_id"`
  198. }
  199. var t_Req T_Req
  200. var t_R lib.JSONS
  201. err := msgpack.Unmarshal(m.Data, &t_Req)
  202. if err != nil {
  203. logs.Error("Mats", lib.FuncName(), err)
  204. t_R.Code = 202
  205. t_R.Msg = err.Error()
  206. b, _ := msgpack.Marshal(&t_R)
  207. _ = lib.Nats.Publish(m.Reply, b)
  208. return
  209. }
  210. logs.Debug(fmt.Sprintf("ColdVerify_Server_Add_DeviceClassList message: %+v\n", t_Req))
  211. Task_r, is := Task.Read_Task(t_Req.T_task_id)
  212. if !is {
  213. logs.Error("Mats", lib.FuncName())
  214. t_R.Code = 202
  215. t_R.Msg = "T_task_id 错误!"
  216. b, _ := msgpack.Marshal(&t_R)
  217. _ = lib.Nats.Publish(m.Reply, b)
  218. return
  219. }
  220. // 判断是否已存在sn
  221. dc, is := Device.Read_DeviceClassList_T_class_T_sn(Task_r.T_class, t_Req.T_sn)
  222. // 添加的id和数据库已存在id相同
  223. if is && dc.T_id == t_Req.T_id {
  224. t_R.Code = 200
  225. t_R.Msg = "ok"
  226. b, _ := msgpack.Marshal(&t_R)
  227. _ = lib.Nats.Publish(m.Reply, b)
  228. return
  229. }
  230. var pdf Certificate.CertificatePdf
  231. //pdfList, _ := Certificate.Read_CertificatePdf_Newest(T_sn)
  232. pdfList, _ := Certificate.Read_CertificatePdf_T_layout_no(t_Req.T_id, "")
  233. if len(pdfList) > 0 {
  234. pdf = pdfList[0]
  235. }
  236. // 相同sn 添加的id和数据库已存在id不同
  237. if is && dc.T_id != t_Req.T_id {
  238. dc2, is := Device.Read_DeviceClassList_T_class_T_id(Task_r.T_class, t_Req.T_id)
  239. if is {
  240. logs.Error("Mats", lib.FuncName())
  241. t_R.Code = 202
  242. t_R.Msg = fmt.Sprintf("编号[%s]已被[%s]关联,请重试", t_Req.T_id, dc2.T_sn)
  243. b, _ := msgpack.Marshal(&t_R)
  244. _ = lib.Nats.Publish(m.Reply, b)
  245. return
  246. }
  247. dc.T_id = t_Req.T_id
  248. dc.T_failure_time = pdf.T_failure_time
  249. dc.T_pdf = pdf.T_pdf
  250. dc.T_Certificate_sn = pdf.T_Certificate_sn
  251. if !Device.Update_DeviceClassList(dc, "T_id", "T_failure_time", "T_pdf", "T_Certificate_sn") {
  252. logs.Error("Mats", lib.FuncName())
  253. t_R.Code = 202
  254. t_R.Msg = "修改编号失败!"
  255. b, _ := msgpack.Marshal(&t_R)
  256. _ = lib.Nats.Publish(m.Reply, b)
  257. return
  258. } else {
  259. t_R.Code = 200
  260. t_R.Msg = "ok"
  261. b, _ := msgpack.Marshal(&t_R)
  262. _ = lib.Nats.Publish(m.Reply, b)
  263. Task.Add_TaskLogs_T("nats", t_Req.T_task_id, "任务管理", "修改", t_Req)
  264. System.Add_UserLogs_T("nats", "任务管理", "修改", t_Req)
  265. return
  266. }
  267. }
  268. var_ := Device.DeviceClassList{
  269. T_class: Task_r.T_class,
  270. T_id: t_Req.T_id,
  271. T_sn: t_Req.T_sn,
  272. T_failure_time: pdf.T_failure_time,
  273. T_pdf: pdf.T_pdf,
  274. T_Certificate_sn: pdf.T_Certificate_sn,
  275. T_remark: "",
  276. T_State: 1,
  277. }
  278. _, is = Device.Add_DeviceClassList(var_)
  279. if !is {
  280. logs.Error("Mats", lib.FuncName(), err)
  281. t_R.Code = 202
  282. t_R.Msg = err.Error()
  283. b, _ := msgpack.Marshal(&t_R)
  284. _ = lib.Nats.Publish(m.Reply, b)
  285. return
  286. }
  287. t_R.Code = 200
  288. t_R.Msg = "ok"
  289. b, _ := msgpack.Marshal(&t_R)
  290. _ = lib.Nats.Publish(m.Reply, b)
  291. Task.Add_TaskLogs_T("nats", t_Req.T_task_id, "任务管理", "修改", t_Req)
  292. System.Add_UserLogs_T("nats", "任务管理", "修改", t_Req)
  293. })
  294. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Edit_DeviceClassList", "Edit_DeviceClassList", func(m *nats.Msg) {
  295. type T_Req struct {
  296. T_task_id string `xml:"T_task_id"` // 任务主键id
  297. T_sn string `xml:"T_sn"`
  298. T_id string `xml:"T_id"`
  299. }
  300. var t_Req T_Req
  301. var t_R lib.JSONS
  302. err := msgpack.Unmarshal(m.Data, &t_Req)
  303. if err != nil {
  304. logs.Error("Mats", lib.FuncName(), err)
  305. t_R.Code = 202
  306. t_R.Msg = err.Error()
  307. b, _ := msgpack.Marshal(&t_R)
  308. _ = lib.Nats.Publish(m.Reply, b)
  309. return
  310. }
  311. logs.Debug(fmt.Sprintf("ColdVerify_Server_Edit_DeviceClassList message: %+v\n", t_Req))
  312. Task_r, is := Task.Read_Task(t_Req.T_task_id)
  313. if !is {
  314. logs.Error("Mats", lib.FuncName())
  315. t_R.Code = 202
  316. t_R.Msg = "T_task_id 错误!"
  317. b, _ := msgpack.Marshal(&t_R)
  318. _ = lib.Nats.Publish(m.Reply, b)
  319. return
  320. }
  321. // 判断是否已存在sn
  322. dc, is := Device.Read_DeviceClassList_T_class_T_sn(Task_r.T_class, t_Req.T_sn)
  323. // 添加的id和数据库已存在id相同
  324. if is && dc.T_id == t_Req.T_id {
  325. t_R.Code = 200
  326. t_R.Msg = "ok"
  327. b, _ := msgpack.Marshal(&t_R)
  328. _ = lib.Nats.Publish(m.Reply, b)
  329. return
  330. }
  331. // 相同sn 添加的id和数据库已存在id不同
  332. if is && dc.T_id != t_Req.T_id {
  333. dc2, is := Device.Read_DeviceClassList_T_class_T_id(Task_r.T_class, t_Req.T_id)
  334. if is {
  335. logs.Error("Mats", lib.FuncName())
  336. t_R.Code = 202
  337. t_R.Msg = fmt.Sprintf("编号[%s]已被[%s]关联,请重试", t_Req.T_id, dc2.T_sn)
  338. b, _ := msgpack.Marshal(&t_R)
  339. _ = lib.Nats.Publish(m.Reply, b)
  340. return
  341. }
  342. var pdf Certificate.CertificatePdf
  343. pdfList, _ := Certificate.Read_CertificatePdf_T_layout_no(t_Req.T_id, "")
  344. if len(pdfList) > 0 {
  345. pdf = pdfList[0]
  346. }
  347. dc.T_id = t_Req.T_id
  348. dc.T_failure_time = pdf.T_failure_time
  349. dc.T_pdf = pdf.T_pdf
  350. dc.T_Certificate_sn = pdf.T_Certificate_sn
  351. if !Device.Update_DeviceClassList(dc, "T_id", "T_failure_time", "T_pdf", "T_Certificate_sn") {
  352. logs.Error("Mats", lib.FuncName())
  353. t_R.Code = 202
  354. t_R.Msg = "修改编号失败!"
  355. b, _ := msgpack.Marshal(&t_R)
  356. _ = lib.Nats.Publish(m.Reply, b)
  357. return
  358. }
  359. }
  360. t_R.Code = 200
  361. t_R.Msg = "ok"
  362. b, _ := msgpack.Marshal(&t_R)
  363. _ = lib.Nats.Publish(m.Reply, b)
  364. Task.Add_TaskLogs_T("nats", t_Req.T_task_id, "任务管理", "修改设备列表", t_Req)
  365. System.Add_UserLogs_T("nats", "任务管理", "修改", t_Req)
  366. })
  367. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Del_TaskData_ByT_BindDeviceDataTime", "Del_TaskData_ByT_BindDeviceDataTime", func(m *nats.Msg) {
  368. var t_R lib.JSONS
  369. task, is := Task.Read_Task(string(m.Data))
  370. if !is {
  371. logs.Error("Mats", lib.FuncName())
  372. t_R.Code = 202
  373. t_R.Msg = "查询失败"
  374. b, _ := msgpack.Marshal(&t_R)
  375. _ = lib.Nats.Publish(m.Reply, b)
  376. return
  377. }
  378. dcList, _ := Device.Read_DeviceClassList_OrderList(task.T_class, "", "", "", 0, 9999)
  379. for _, v := range dcList {
  380. if !lib.IsNumeric(v.T_id) {
  381. // 删除任务表指定时间数据
  382. Task.Del_TaskData_t_idByT_BindDeviceDataTime(task.T_task_id, v.T_id, task.T_BindDeviceDataStartTime, task.T_BindDeviceDataEndTime)
  383. } else {
  384. Task.Del_TaskData_t_idByT_BindDeviceDataTime(task.T_task_id, v.T_id, task.T_VerifyDeviceDataStartTime, task.T_VerifyDeviceDataEndTime)
  385. }
  386. }
  387. })
  388. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Update_Task_BySN", "Update_Task_BySN", func(m *nats.Msg) {
  389. type T_Req struct {
  390. T_sn string `xml:"T_sn"`
  391. T_CalibrationTime string `xml:"T_CalibrationTime"`
  392. }
  393. var t_Req T_Req
  394. var t_R lib.JSONS
  395. err := msgpack.Unmarshal(m.Data, &t_Req)
  396. if err != nil {
  397. logs.Error("Mats", lib.FuncName(), err)
  398. t_R.Code = 202
  399. t_R.Msg = err.Error()
  400. b, _ := msgpack.Marshal(&t_R)
  401. _ = lib.Nats.Publish(m.Reply, b)
  402. return
  403. }
  404. logs.Debug(fmt.Sprintf("ColdVerify_Server_Update_Task_BySN message: %+v\n", t_Req))
  405. task, err := Task.Read_Task_BySN(t_Req.T_sn)
  406. if err != nil && err.Error() != "record not found" {
  407. logs.Error("Mats", lib.FuncName())
  408. t_R.Code = 202
  409. t_R.Msg = "查询失败"
  410. b, _ := msgpack.Marshal(&t_R)
  411. _ = lib.Nats.Publish(m.Reply, b)
  412. return
  413. }
  414. col := []string{}
  415. if len(t_Req.T_CalibrationTime) > 0 {
  416. task.T_CalibrationExpirationTime = t_Req.T_CalibrationTime[0:10]
  417. col = append(col, "T_CalibrationExpirationTime")
  418. }
  419. if !Task.Update_Task(task, col...) {
  420. logs.Error("Mats", lib.FuncName(), err)
  421. t_R.Code = 202
  422. t_R.Msg = err.Error()
  423. b, _ := msgpack.Marshal(&t_R)
  424. _ = lib.Nats.Publish(m.Reply, b)
  425. return
  426. }
  427. t_R.Code = 200
  428. t_R.Msg = "ok"
  429. b, _ := msgpack.Marshal(&t_R)
  430. _ = lib.Nats.Publish(m.Reply, b)
  431. Task.Add_TaskLogs_T("nats", task.T_task_id, "任务管理", "修改校准时间", t_Req)
  432. System.Add_UserLogs_T("nats", "任务管理", "修改", t_Req)
  433. })
  434. }