// Nats.go (14 KB)

  1. package Nats
  2. import (
  3. "ColdVerify_server/conf"
  4. "ColdVerify_server/lib"
  5. "ColdVerify_server/logs"
  6. "ColdVerify_server/models/Account"
  7. "ColdVerify_server/models/Certificate"
  8. "ColdVerify_server/models/Device"
  9. "ColdVerify_server/models/System"
  10. "ColdVerify_server/models/Task"
  11. "fmt"
  12. "github.com/nats-io/nats.go"
  13. "github.com/vmihailenco/msgpack/v5"
  14. "strings"
  15. "sync"
  16. )
  17. func init() {
  18. logs.Println("============Nats init============")
  19. var err error
  20. // 连接Nats服务器
  21. lib.Nats, err = nats.Connect("nats://" + conf.NatsServer_Url)
  22. if err != nil {
  23. logs.Error("nats 连接失败!")
  24. panic(any(err))
  25. }
  26. logs.Println("nats OK!")
  27. // 本地测试,屏蔽本地nats
  28. if !conf.NatsForbidden {
  29. go NatsInit()
  30. }
  31. }
  32. type Extract_TaskData_Back struct {
  33. T_uuid string `xml:"T_uuid"` // 任务主键id
  34. Time_start string `xml:"Time_start"`
  35. Time_end string `xml:"Time_end"`
  36. DeviceClassList []Device.DeviceClassList `xml:"DeviceClassList"` // 泛型
  37. Task Task.Task `xml:"Task"` // 泛型
  38. }
  39. func NatsInit() {
  40. // 发布-订阅 模式,打包数据
  41. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Extract_TaskData_Back", "Extract_TaskData", func(m *nats.Msg) {
  42. logs.Debug("Extract_TaskData_Back message: \n", string(m.Data))
  43. var resp Extract_TaskData_Back
  44. err := msgpack.Unmarshal(m.Data, &resp)
  45. if err != nil {
  46. System.Add_Logs("Nats", "msgpack Unmarshal err", string(m.Data))
  47. return
  48. }
  49. Task_r := resp.Task
  50. // 清空表
  51. Task.Truncate_TaskData(Task_r.T_task_id)
  52. //失败重试5次
  53. DeviceClassList := new(sync.Map)
  54. var count int
  55. for _, v := range resp.DeviceClassList {
  56. if strings.Contains(v.T_sn, "-") || len(v.T_sn) == 0 {
  57. // 从3.0平台导入
  58. continue
  59. }
  60. DeviceClassList.Store(fmt.Sprintf("%s|%s", v.T_sn, v.T_id), 5)
  61. //err = Task.Import_TaskData_Back(v.T_sn, v.T_id, resp.Task.T_task_id, resp.Time_start, resp.Time_end)
  62. //count++
  63. //time.Sleep(5 * time.Second)
  64. }
  65. DeviceClassList.Range(func(k, v interface{}) bool {
  66. count++
  67. return true
  68. })
  69. for count > 0 {
  70. DeviceClassList.Range(func(k, v any) bool {
  71. T_snid := strings.Split(k.(string), "|")
  72. T_sn := T_snid[0]
  73. T_id := T_snid[1]
  74. temp := v.(int)
  75. temp--
  76. DeviceClassList.Store(k, temp)
  77. err = Task.Import_TaskData_Back(T_sn, T_id, resp.Task.T_task_id, resp.Time_start, resp.Time_end)
  78. if err == nil || strings.Contains(err.Error(), "doesn't exist") {
  79. DeviceClassList.Delete(k)
  80. count--
  81. } else {
  82. logs.Error("设备数据同步到任务数据失败", err)
  83. DeviceClassList.Delete(k)
  84. count--
  85. }
  86. return true
  87. })
  88. }
  89. //TaskData_Num := Task.Read_TaskData_Count(Task_r.T_task_id)
  90. //if TaskData_Num == 0 {
  91. // Task_r.T_collection_num = 0
  92. // if !Task.Update_Task(Task_r, "T_collection_state") {
  93. // logs.Error(lib.FuncName(), "后台执行修改任务数据失败")
  94. // }
  95. // return
  96. //}
  97. // 导入到本地数据---
  98. //NatsServer.Import_TaskData(resp.T_uuid, Task_r.T_task_id, TaskData_Num)
  99. System.Add_UserLogs_T(resp.T_uuid, "任务", "修改", Task_r)
  100. System.Add_UserLogs(resp.T_uuid, "提取数据", "提取数据"+Task_r.T_name, Task_r.T_task_id+"|"+resp.Time_start+"|"+resp.Time_end)
  101. })
  102. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Update_Task", "Update_Task", func(m *nats.Msg) {
  103. var t_Req Task.Task
  104. var t_R lib.JSONS
  105. err := msgpack.Unmarshal(m.Data, &t_Req)
  106. if err != nil {
  107. logs.Error("Mats", lib.FuncName(), err)
  108. t_R.Code = 202
  109. t_R.Msg = err.Error()
  110. b, _ := msgpack.Marshal(&t_R)
  111. _ = lib.Nats.Publish(m.Reply, b)
  112. return
  113. }
  114. logs.Debug(fmt.Sprintf("ColdVerify_Server_Update_Task message: %+v\n", t_Req))
  115. col := []string{}
  116. if t_Req.T_delivery_state > 0 {
  117. col = append(col, "T_delivery_state")
  118. }
  119. if t_Req.T_collection_state > 0 {
  120. col = append(col, "T_collection_state")
  121. }
  122. if !Task.Update_Task(t_Req, col...) {
  123. logs.Error("Nats", lib.FuncName(), err)
  124. t_R.Code = 202
  125. t_R.Msg = err.Error()
  126. b, _ := msgpack.Marshal(&t_R)
  127. _ = lib.Nats.Publish(m.Reply, b)
  128. return
  129. }
  130. t_R.Code = 200
  131. t_R.Msg = "ok"
  132. b, _ := msgpack.Marshal(&t_R)
  133. _ = lib.Nats.Publish(m.Reply, b)
  134. Task.Add_TaskLogs_T("nats", t_Req.T_task_id, "任务管理", "修改", t_Req)
  135. System.Add_UserLogs_T("nats", "任务管理", "修改", t_Req)
  136. })
  137. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Read_Task", "Read_Task", func(m *nats.Msg) {
  138. logs.Println("ColdVerify_Server_Read_Task message: %+v\n", string(m.Data))
  139. var t_R lib.JSONS
  140. task, is := Task.Read_Task(string(m.Data))
  141. if !is {
  142. logs.Error("Mats", lib.FuncName())
  143. t_R.Code = 202
  144. t_R.Msg = "查询失败"
  145. b, _ := msgpack.Marshal(&t_R)
  146. _ = lib.Nats.Publish(m.Reply, b)
  147. return
  148. }
  149. t_R.Code = 200
  150. t_R.Msg = "ok"
  151. t_R.Data = task
  152. b, _ := msgpack.Marshal(&t_R)
  153. _ = lib.Nats.Publish(m.Reply, b)
  154. })
  155. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Device_Class_List", "Device_Class_List", func(m *nats.Msg) {
  156. logs.Println("ColdVerify_Server_Device_Class_List message: %+v\n", string(m.Data))
  157. var t_R lib.JSONS
  158. task, is := Task.Read_Task(string(m.Data))
  159. if !is {
  160. logs.Error("Mats", lib.FuncName())
  161. t_R.Code = 202
  162. t_R.Msg = "查询失败"
  163. b, _ := msgpack.Marshal(&t_R)
  164. _ = lib.Nats.Publish(m.Reply, b)
  165. return
  166. }
  167. List, _ := Device.Read_DeviceClassList_OrderList(task.T_class, "", "", "", 0, 9999)
  168. t_R.Code = 200
  169. t_R.Msg = "ok"
  170. t_R.Data = List
  171. b, _ := msgpack.Marshal(&t_R)
  172. _ = lib.Nats.Publish(m.Reply, b)
  173. })
  174. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Read_User", "Read_User", func(m *nats.Msg) {
  175. logs.Println("ColdVerify_Server_Read_User message: %+v\n", string(m.Data))
  176. var t_R lib.JSONS
  177. err, user := Account.Read_User_ByT_uuid(string(m.Data))
  178. if err != nil {
  179. logs.Error("Mats", lib.FuncName())
  180. t_R.Code = 202
  181. t_R.Msg = "查询失败"
  182. b, _ := msgpack.Marshal(&t_R)
  183. _ = lib.Nats.Publish(m.Reply, b)
  184. return
  185. }
  186. t_R.Code = 200
  187. t_R.Msg = "ok"
  188. t_R.Data = user
  189. b, _ := msgpack.Marshal(&t_R)
  190. _ = lib.Nats.Publish(m.Reply, b)
  191. })
  192. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Add_DeviceClassList", "Add_DeviceClassList", func(m *nats.Msg) {
  193. type T_Req struct {
  194. T_task_id string `xml:"T_task_id"` // 任务主键id
  195. T_sn string `xml:"T_sn"`
  196. T_id string `xml:"T_id"`
  197. }
  198. var t_Req T_Req
  199. var t_R lib.JSONS
  200. err := msgpack.Unmarshal(m.Data, &t_Req)
  201. if err != nil {
  202. logs.Error("Mats", lib.FuncName(), err)
  203. t_R.Code = 202
  204. t_R.Msg = err.Error()
  205. b, _ := msgpack.Marshal(&t_R)
  206. _ = lib.Nats.Publish(m.Reply, b)
  207. return
  208. }
  209. logs.Debug(fmt.Sprintf("ColdVerify_Server_Add_DeviceClassList message: %+v\n", t_Req))
  210. Task_r, is := Task.Read_Task(t_Req.T_task_id)
  211. if !is {
  212. logs.Error("Mats", lib.FuncName())
  213. t_R.Code = 202
  214. t_R.Msg = "T_task_id 错误!"
  215. b, _ := msgpack.Marshal(&t_R)
  216. _ = lib.Nats.Publish(m.Reply, b)
  217. return
  218. }
  219. // 判断是否已存在sn
  220. dc, is := Device.Read_DeviceClassList_T_class_T_sn(Task_r.T_class, t_Req.T_sn)
  221. // 添加的id和数据库已存在id相同
  222. if is && dc.T_id == t_Req.T_id {
  223. t_R.Code = 200
  224. t_R.Msg = "ok"
  225. b, _ := msgpack.Marshal(&t_R)
  226. _ = lib.Nats.Publish(m.Reply, b)
  227. return
  228. }
  229. var pdf Certificate.CertificatePdf
  230. //pdfList, _ := Certificate.Read_CertificatePdf_Newest(T_sn)
  231. pdfList, _ := Certificate.Read_CertificatePdf_T_layout_no(t_Req.T_id, "")
  232. if len(pdfList) > 0 {
  233. pdf = pdfList[0]
  234. }
  235. // 相同sn 添加的id和数据库已存在id不同
  236. if is && dc.T_id != t_Req.T_id {
  237. dc2, is := Device.Read_DeviceClassList_T_class_T_id(Task_r.T_class, t_Req.T_id)
  238. if is {
  239. logs.Error("Mats", lib.FuncName())
  240. t_R.Code = 202
  241. t_R.Msg = fmt.Sprintf("编号[%s]已被[%s]关联,请重试", t_Req.T_id, dc2.T_sn)
  242. b, _ := msgpack.Marshal(&t_R)
  243. _ = lib.Nats.Publish(m.Reply, b)
  244. return
  245. }
  246. dc.T_id = t_Req.T_id
  247. dc.T_failure_time = pdf.T_failure_time
  248. dc.T_pdf = pdf.T_pdf
  249. dc.T_Certificate_sn = pdf.T_Certificate_sn
  250. if !Device.Update_DeviceClassList(dc, "T_id", "T_failure_time", "T_pdf", "T_Certificate_sn") {
  251. logs.Error("Mats", lib.FuncName())
  252. t_R.Code = 202
  253. t_R.Msg = "修改编号失败!"
  254. b, _ := msgpack.Marshal(&t_R)
  255. _ = lib.Nats.Publish(m.Reply, b)
  256. return
  257. } else {
  258. t_R.Code = 200
  259. t_R.Msg = "ok"
  260. b, _ := msgpack.Marshal(&t_R)
  261. _ = lib.Nats.Publish(m.Reply, b)
  262. Task.Add_TaskLogs_T("nats", t_Req.T_task_id, "任务管理", "修改", t_Req)
  263. System.Add_UserLogs_T("nats", "任务管理", "修改", t_Req)
  264. return
  265. }
  266. }
  267. var_ := Device.DeviceClassList{
  268. T_class: Task_r.T_class,
  269. T_id: t_Req.T_id,
  270. T_sn: t_Req.T_sn,
  271. T_failure_time: pdf.T_failure_time,
  272. T_pdf: pdf.T_pdf,
  273. T_Certificate_sn: pdf.T_Certificate_sn,
  274. T_remark: "",
  275. T_State: 1,
  276. }
  277. _, is = Device.Add_DeviceClassList(var_)
  278. if !is {
  279. logs.Error("Mats", lib.FuncName(), err)
  280. t_R.Code = 202
  281. t_R.Msg = err.Error()
  282. b, _ := msgpack.Marshal(&t_R)
  283. _ = lib.Nats.Publish(m.Reply, b)
  284. return
  285. }
  286. t_R.Code = 200
  287. t_R.Msg = "ok"
  288. b, _ := msgpack.Marshal(&t_R)
  289. _ = lib.Nats.Publish(m.Reply, b)
  290. Task.Add_TaskLogs_T("nats", t_Req.T_task_id, "任务管理", "修改", t_Req)
  291. System.Add_UserLogs_T("nats", "任务管理", "修改", t_Req)
  292. })
  293. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Edit_DeviceClassList", "Edit_DeviceClassList", func(m *nats.Msg) {
  294. type T_Req struct {
  295. T_task_id string `xml:"T_task_id"` // 任务主键id
  296. T_sn string `xml:"T_sn"`
  297. T_id string `xml:"T_id"`
  298. }
  299. var t_Req T_Req
  300. var t_R lib.JSONS
  301. err := msgpack.Unmarshal(m.Data, &t_Req)
  302. if err != nil {
  303. logs.Error("Mats", lib.FuncName(), err)
  304. t_R.Code = 202
  305. t_R.Msg = err.Error()
  306. b, _ := msgpack.Marshal(&t_R)
  307. _ = lib.Nats.Publish(m.Reply, b)
  308. return
  309. }
  310. logs.Debug(fmt.Sprintf("ColdVerify_Server_Edit_DeviceClassList message: %+v\n", t_Req))
  311. Task_r, is := Task.Read_Task(t_Req.T_task_id)
  312. if !is {
  313. logs.Error("Mats", lib.FuncName())
  314. t_R.Code = 202
  315. t_R.Msg = "T_task_id 错误!"
  316. b, _ := msgpack.Marshal(&t_R)
  317. _ = lib.Nats.Publish(m.Reply, b)
  318. return
  319. }
  320. // 判断是否已存在sn
  321. dc, is := Device.Read_DeviceClassList_T_class_T_sn(Task_r.T_class, t_Req.T_sn)
  322. // 添加的id和数据库已存在id相同
  323. if is && dc.T_id == t_Req.T_id {
  324. t_R.Code = 200
  325. t_R.Msg = "ok"
  326. b, _ := msgpack.Marshal(&t_R)
  327. _ = lib.Nats.Publish(m.Reply, b)
  328. return
  329. }
  330. // 相同sn 添加的id和数据库已存在id不同
  331. if is && dc.T_id != t_Req.T_id {
  332. dc2, is := Device.Read_DeviceClassList_T_class_T_id(Task_r.T_class, t_Req.T_id)
  333. if is {
  334. logs.Error("Mats", lib.FuncName())
  335. t_R.Code = 202
  336. t_R.Msg = fmt.Sprintf("编号[%s]已被[%s]关联,请重试", t_Req.T_id, dc2.T_sn)
  337. b, _ := msgpack.Marshal(&t_R)
  338. _ = lib.Nats.Publish(m.Reply, b)
  339. return
  340. }
  341. var pdf Certificate.CertificatePdf
  342. pdfList, _ := Certificate.Read_CertificatePdf_T_layout_no(t_Req.T_id, "")
  343. if len(pdfList) > 0 {
  344. pdf = pdfList[0]
  345. }
  346. dc.T_id = t_Req.T_id
  347. dc.T_failure_time = pdf.T_failure_time
  348. dc.T_pdf = pdf.T_pdf
  349. dc.T_Certificate_sn = pdf.T_Certificate_sn
  350. if !Device.Update_DeviceClassList(dc, "T_id", "T_failure_time", "T_pdf", "T_Certificate_sn") {
  351. logs.Error("Mats", lib.FuncName())
  352. t_R.Code = 202
  353. t_R.Msg = "修改编号失败!"
  354. b, _ := msgpack.Marshal(&t_R)
  355. _ = lib.Nats.Publish(m.Reply, b)
  356. return
  357. }
  358. }
  359. t_R.Code = 200
  360. t_R.Msg = "ok"
  361. b, _ := msgpack.Marshal(&t_R)
  362. _ = lib.Nats.Publish(m.Reply, b)
  363. Task.Add_TaskLogs_T("nats", t_Req.T_task_id, "任务管理", "修改设备列表", t_Req)
  364. System.Add_UserLogs_T("nats", "任务管理", "修改", t_Req)
  365. })
  366. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Del_TaskData_ByT_BindDeviceDataTime", "Del_TaskData_ByT_BindDeviceDataTime", func(m *nats.Msg) {
  367. var t_R lib.JSONS
  368. task, is := Task.Read_Task(string(m.Data))
  369. if !is {
  370. logs.Error("Mats", lib.FuncName())
  371. t_R.Code = 202
  372. t_R.Msg = "查询失败"
  373. b, _ := msgpack.Marshal(&t_R)
  374. _ = lib.Nats.Publish(m.Reply, b)
  375. return
  376. }
  377. dcList, _ := Device.Read_DeviceClassList_OrderList(task.T_class, "", "", "", 0, 9999)
  378. for _, v := range dcList {
  379. if !lib.IsNumeric(v.T_id) {
  380. // 删除任务表指定时间数据
  381. Task.Del_TaskData_t_idByT_BindDeviceDataTime(task.T_task_id, v.T_id, task.T_BindDeviceDataStartTime, task.T_BindDeviceDataEndTime)
  382. } else {
  383. Task.Del_TaskData_t_idByT_BindDeviceDataTime(task.T_task_id, v.T_id, task.T_VerifyDeviceDataStartTime, task.T_VerifyDeviceDataEndTime)
  384. }
  385. }
  386. })
  387. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Update_Task_BySN", "Update_Task_BySN", func(m *nats.Msg) {
  388. type T_Req struct {
  389. T_sn string `xml:"T_sn"`
  390. T_CalibrationTime string `xml:"T_CalibrationTime"`
  391. }
  392. var t_Req T_Req
  393. var t_R lib.JSONS
  394. err := msgpack.Unmarshal(m.Data, &t_Req)
  395. if err != nil {
  396. logs.Error("Mats", lib.FuncName(), err)
  397. t_R.Code = 202
  398. t_R.Msg = err.Error()
  399. b, _ := msgpack.Marshal(&t_R)
  400. _ = lib.Nats.Publish(m.Reply, b)
  401. return
  402. }
  403. logs.Debug(fmt.Sprintf("ColdVerify_Server_Update_Task_BySN message: %+v\n", t_Req))
  404. task, err := Task.Read_Task_BySN(t_Req.T_sn)
  405. if err != nil && err.Error() != "record not found" {
  406. logs.Error("Mats", lib.FuncName())
  407. t_R.Code = 202
  408. t_R.Msg = "查询失败"
  409. b, _ := msgpack.Marshal(&t_R)
  410. _ = lib.Nats.Publish(m.Reply, b)
  411. return
  412. }
  413. col := []string{}
  414. if len(t_Req.T_CalibrationTime) > 0 {
  415. task.T_CalibrationExpirationTime = t_Req.T_CalibrationTime[0:10]
  416. col = append(col, "T_CalibrationExpirationTime")
  417. }
  418. if !Task.Update_Task(task, col...) {
  419. logs.Error("Mats", lib.FuncName(), err)
  420. t_R.Code = 202
  421. t_R.Msg = err.Error()
  422. b, _ := msgpack.Marshal(&t_R)
  423. _ = lib.Nats.Publish(m.Reply, b)
  424. return
  425. }
  426. t_R.Code = 200
  427. t_R.Msg = "ok"
  428. b, _ := msgpack.Marshal(&t_R)
  429. _ = lib.Nats.Publish(m.Reply, b)
  430. Task.Add_TaskLogs_T("nats", task.T_task_id, "任务管理", "修改校准时间", t_Req)
  431. System.Add_UserLogs_T("nats", "任务管理", "修改", t_Req)
  432. })
  433. }