Nats.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427
  1. package Nats
  2. import (
  3. "ColdVerify_server/Nats/NatsServer"
  4. "ColdVerify_server/conf"
  5. "ColdVerify_server/lib"
  6. "ColdVerify_server/logs"
  7. "ColdVerify_server/models/Account"
  8. "ColdVerify_server/models/Certificate"
  9. "ColdVerify_server/models/Device"
  10. "ColdVerify_server/models/System"
  11. "ColdVerify_server/models/Task"
  12. "fmt"
  13. "github.com/nats-io/nats.go"
  14. "github.com/vmihailenco/msgpack/v5"
  15. "strings"
  16. "sync"
  17. )
  18. func init() {
  19. logs.Println("============Nats init============")
  20. var err error
  21. // 连接Nats服务器
  22. lib.Nats, err = nats.Connect("nats://" + conf.NatsServer_Url)
  23. if err != nil {
  24. fmt.Println("nats 连接失败!")
  25. panic(err)
  26. }
  27. logs.Println("nats OK!")
  28. // 本地测试,屏蔽本地nats
  29. if !conf.NatsForbidden {
  30. go NatsInit()
  31. }
  32. }
// Extract_TaskData_Back is the message body that NatsInit decodes with msgpack
// from the "ColdVerify_Server_Extract_TaskData_Back" subject.
//
// NOTE(review): the fields carry xml tags although the value is decoded with
// msgpack; presumably the decoder matches on the Go field names — confirm
// before renaming any field.
type Extract_TaskData_Back struct {
	// Originally annotated "task primary key id".
	// NOTE(review): NatsInit passes this value as the first argument of
	// System.Add_UserLogs / System.Add_UserLogs_T, so it may actually be a
	// user UUID — confirm against the publisher.
	T_uuid string `xml:"T_uuid"`
	// Start of the extraction window, forwarded unchanged to Task.Import_TaskData_Back.
	Time_start string `xml:"Time_start"`
	// End of the extraction window, forwarded unchanged to Task.Import_TaskData_Back.
	Time_end string `xml:"Time_end"`
	// Devices whose T_sn / T_id pairs are imported (originally annotated "generic").
	DeviceClassList []Device.DeviceClassList `xml:"DeviceClassList"`
	// Task whose T_task_id selects the task data to rebuild (originally annotated "generic").
	Task Task.Task `xml:"Task"`
}
  40. func NatsInit() {
  41. // 发布-订阅 模式,打包数据
  42. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Extract_TaskData_Back", "Extract_TaskData", func(m *nats.Msg) {
  43. logs.Debug("Extract_TaskData_Back message: \n", string(m.Data))
  44. var resp Extract_TaskData_Back
  45. err := msgpack.Unmarshal(m.Data, &resp)
  46. if err != nil {
  47. System.Add_Logs("Nats", "msgpack Unmarshal err", string(m.Data))
  48. return
  49. }
  50. Task_r := resp.Task
  51. // 清空表
  52. Task.Truncate_TaskData(Task_r.T_task_id)
  53. //失败重试5次
  54. DeviceClassList := new(sync.Map)
  55. var count int
  56. for _, v := range resp.DeviceClassList {
  57. if strings.Contains(v.T_sn, "-") || len(v.T_sn) == 0 {
  58. // 从3.0平台导入
  59. continue
  60. }
  61. DeviceClassList.Store(fmt.Sprintf("%s|%s", v.T_sn, v.T_id), 5)
  62. //err = Task.Import_TaskData_Back(v.T_sn, v.T_id, resp.Task.T_task_id, resp.Time_start, resp.Time_end)
  63. //count++
  64. //time.Sleep(5 * time.Second)
  65. }
  66. DeviceClassList.Range(func(k, v interface{}) bool {
  67. count++
  68. return true
  69. })
  70. for count > 0 {
  71. DeviceClassList.Range(func(k, v any) bool {
  72. T_snid := strings.Split(k.(string), "|")
  73. T_sn := T_snid[0]
  74. T_id := T_snid[1]
  75. temp := v.(int)
  76. temp--
  77. DeviceClassList.Store(k, temp)
  78. err = Task.Import_TaskData_Back(T_sn, T_id, resp.Task.T_task_id, resp.Time_start, resp.Time_end)
  79. if err == nil || strings.Contains(err.Error(), "doesn't exist") {
  80. DeviceClassList.Delete(k)
  81. count--
  82. } else {
  83. logs.Error("设备数据同步到任务数据失败", err)
  84. DeviceClassList.Delete(k)
  85. count--
  86. }
  87. return true
  88. })
  89. }
  90. TaskData_Num := Task.Read_TaskData_Count(Task_r.T_task_id)
  91. //if TaskData_Num == 0 {
  92. // Task_r.T_collection_num = 0
  93. // if !Task.Update_Task(Task_r, "T_collection_state") {
  94. // logs.Error(lib.FuncName(), "后台执行修改任务数据失败")
  95. // }
  96. // return
  97. //}
  98. // 导入到本地数据
  99. NatsServer.Import_TaskData(resp.T_uuid, Task_r.T_task_id, TaskData_Num)
  100. System.Add_UserLogs_T(resp.T_uuid, "任务", "修改", Task_r)
  101. System.Add_UserLogs(resp.T_uuid, "提取数据", "提取数据"+Task_r.T_name, Task_r.T_task_id+"|"+resp.Time_start+"|"+resp.Time_end)
  102. })
  103. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Update_Task", "Update_Task", func(m *nats.Msg) {
  104. var t_Req Task.Task
  105. var t_R lib.JSONS
  106. err := msgpack.Unmarshal(m.Data, &t_Req)
  107. if err != nil {
  108. logs.Error("Mats", lib.FuncName(), err)
  109. t_R.Code = 202
  110. t_R.Msg = err.Error()
  111. b, _ := msgpack.Marshal(&t_R)
  112. _ = lib.Nats.Publish(m.Reply, b)
  113. return
  114. }
  115. fmt.Printf("ColdVerify_Server_Update_Task message: %+v\n", t_Req)
  116. col := []string{}
  117. if t_Req.T_delivery_state > 0 {
  118. col = append(col, "T_delivery_state")
  119. }
  120. if !Task.Update_Task(t_Req, col...) {
  121. logs.Error("Mats", lib.FuncName(), err)
  122. t_R.Code = 202
  123. t_R.Msg = err.Error()
  124. b, _ := msgpack.Marshal(&t_R)
  125. _ = lib.Nats.Publish(m.Reply, b)
  126. return
  127. }
  128. t_R.Code = 200
  129. t_R.Msg = "ok"
  130. b, _ := msgpack.Marshal(&t_R)
  131. _ = lib.Nats.Publish(m.Reply, b)
  132. Task.Add_TaskLogs_T("nats", t_Req.T_task_id, "任务管理", "修改", t_Req)
  133. System.Add_UserLogs_T("nats", "任务管理", "修改", t_Req)
  134. })
  135. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Read_Task", "Read_Task", func(m *nats.Msg) {
  136. logs.Println("ColdVerify_Server_Read_Task message: %+v\n", string(m.Data))
  137. var t_R lib.JSONS
  138. task, is := Task.Read_Task(string(m.Data))
  139. if !is {
  140. logs.Error("Mats", lib.FuncName())
  141. t_R.Code = 202
  142. t_R.Msg = "查询失败"
  143. b, _ := msgpack.Marshal(&t_R)
  144. _ = lib.Nats.Publish(m.Reply, b)
  145. return
  146. }
  147. t_R.Code = 200
  148. t_R.Msg = "ok"
  149. t_R.Data = task
  150. b, _ := msgpack.Marshal(&t_R)
  151. _ = lib.Nats.Publish(m.Reply, b)
  152. })
  153. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Device_Class_List", "Device_Class_List", func(m *nats.Msg) {
  154. logs.Println("ColdVerify_Server_Device_Class_List message: %+v\n", string(m.Data))
  155. var t_R lib.JSONS
  156. task, is := Task.Read_Task(string(m.Data))
  157. if !is {
  158. logs.Error("Mats", lib.FuncName())
  159. t_R.Code = 202
  160. t_R.Msg = "查询失败"
  161. b, _ := msgpack.Marshal(&t_R)
  162. _ = lib.Nats.Publish(m.Reply, b)
  163. return
  164. }
  165. List, _ := Device.Read_DeviceClassList_OrderList(task.T_class, "", "", 0, 9999)
  166. t_R.Code = 200
  167. t_R.Msg = "ok"
  168. t_R.Data = List
  169. b, _ := msgpack.Marshal(&t_R)
  170. _ = lib.Nats.Publish(m.Reply, b)
  171. })
  172. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Read_User", "Read_User", func(m *nats.Msg) {
  173. logs.Println("ColdVerify_Server_Read_User message: %+v\n", string(m.Data))
  174. var t_R lib.JSONS
  175. err, user := Account.Read_User_ByT_uuid(string(m.Data))
  176. if err != nil {
  177. logs.Error("Mats", lib.FuncName())
  178. t_R.Code = 202
  179. t_R.Msg = "查询失败"
  180. b, _ := msgpack.Marshal(&t_R)
  181. _ = lib.Nats.Publish(m.Reply, b)
  182. return
  183. }
  184. t_R.Code = 200
  185. t_R.Msg = "ok"
  186. t_R.Data = user
  187. b, _ := msgpack.Marshal(&t_R)
  188. _ = lib.Nats.Publish(m.Reply, b)
  189. })
  190. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Add_DeviceClassList", "Add_DeviceClassList", func(m *nats.Msg) {
  191. type T_Req struct {
  192. T_task_id string `xml:"T_task_id"` // 任务主键id
  193. T_sn string `xml:"T_sn"`
  194. T_id string `xml:"T_id"`
  195. }
  196. var t_Req T_Req
  197. var t_R lib.JSONS
  198. err := msgpack.Unmarshal(m.Data, &t_Req)
  199. if err != nil {
  200. logs.Error("Mats", lib.FuncName(), err)
  201. t_R.Code = 202
  202. t_R.Msg = err.Error()
  203. b, _ := msgpack.Marshal(&t_R)
  204. _ = lib.Nats.Publish(m.Reply, b)
  205. return
  206. }
  207. fmt.Printf("ColdVerify_Server_Add_DeviceClassList message: %+v\n", t_Req)
  208. Task_r, is := Task.Read_Task(t_Req.T_task_id)
  209. if !is {
  210. logs.Error("Mats", lib.FuncName())
  211. t_R.Code = 202
  212. t_R.Msg = "T_task_id 错误!"
  213. b, _ := msgpack.Marshal(&t_R)
  214. _ = lib.Nats.Publish(m.Reply, b)
  215. return
  216. }
  217. // 判断是否已存在sn
  218. dc, is := Device.Read_DeviceClassList_T_class_T_sn(Task_r.T_class, t_Req.T_sn)
  219. // 添加的id和数据库已存在id相同
  220. if is && dc.T_id == t_Req.T_id {
  221. t_R.Code = 200
  222. t_R.Msg = "ok"
  223. b, _ := msgpack.Marshal(&t_R)
  224. _ = lib.Nats.Publish(m.Reply, b)
  225. return
  226. }
  227. var pdf Certificate.CertificatePdf
  228. //pdfList, _ := Certificate.Read_CertificatePdf_Newest(T_sn)
  229. pdfList, _ := Certificate.Read_CertificatePdf_T_layout_no(t_Req.T_id, "")
  230. if len(pdfList) > 0 {
  231. pdf = pdfList[0]
  232. }
  233. // 相同sn 添加的id和数据库已存在id不同
  234. if is && dc.T_id != t_Req.T_id {
  235. dc2, is := Device.Read_DeviceClassList_T_class_T_id(Task_r.T_class, t_Req.T_id)
  236. if is {
  237. logs.Error("Mats", lib.FuncName())
  238. t_R.Code = 202
  239. t_R.Msg = fmt.Sprintf("编号[%s]已被[%s]关联,请重试", t_Req.T_id, dc2.T_sn)
  240. b, _ := msgpack.Marshal(&t_R)
  241. _ = lib.Nats.Publish(m.Reply, b)
  242. return
  243. }
  244. dc.T_id = t_Req.T_id
  245. dc.T_failure_time = pdf.T_failure_time
  246. dc.T_pdf = pdf.T_pdf
  247. dc.T_Certificate_sn = pdf.T_Certificate_sn
  248. if !Device.Update_DeviceClassList(dc, "T_id", "T_failure_time", "T_pdf", "T_Certificate_sn") {
  249. logs.Error("Mats", lib.FuncName())
  250. t_R.Code = 202
  251. t_R.Msg = "修改编号失败!"
  252. b, _ := msgpack.Marshal(&t_R)
  253. _ = lib.Nats.Publish(m.Reply, b)
  254. return
  255. } else {
  256. t_R.Code = 200
  257. t_R.Msg = "ok"
  258. b, _ := msgpack.Marshal(&t_R)
  259. _ = lib.Nats.Publish(m.Reply, b)
  260. Task.Add_TaskLogs_T("nats", t_Req.T_task_id, "任务管理", "修改", t_Req)
  261. System.Add_UserLogs_T("nats", "任务管理", "修改", t_Req)
  262. return
  263. }
  264. }
  265. var_ := Device.DeviceClassList{
  266. T_class: Task_r.T_class,
  267. T_id: t_Req.T_id,
  268. T_sn: t_Req.T_sn,
  269. T_failure_time: pdf.T_failure_time,
  270. T_pdf: pdf.T_pdf,
  271. T_Certificate_sn: pdf.T_Certificate_sn,
  272. T_remark: "",
  273. T_State: 1,
  274. }
  275. _, is = Device.Add_DeviceClassList(var_)
  276. if !is {
  277. logs.Error("Mats", lib.FuncName(), err)
  278. t_R.Code = 202
  279. t_R.Msg = err.Error()
  280. b, _ := msgpack.Marshal(&t_R)
  281. _ = lib.Nats.Publish(m.Reply, b)
  282. return
  283. }
  284. t_R.Code = 200
  285. t_R.Msg = "ok"
  286. b, _ := msgpack.Marshal(&t_R)
  287. _ = lib.Nats.Publish(m.Reply, b)
  288. Task.Add_TaskLogs_T("nats", t_Req.T_task_id, "任务管理", "修改", t_Req)
  289. System.Add_UserLogs_T("nats", "任务管理", "修改", t_Req)
  290. })
  291. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Edit_DeviceClassList", "Edit_DeviceClassList", func(m *nats.Msg) {
  292. type T_Req struct {
  293. T_task_id string `xml:"T_task_id"` // 任务主键id
  294. T_sn string `xml:"T_sn"`
  295. T_id string `xml:"T_id"`
  296. }
  297. var t_Req T_Req
  298. var t_R lib.JSONS
  299. err := msgpack.Unmarshal(m.Data, &t_Req)
  300. if err != nil {
  301. logs.Error("Mats", lib.FuncName(), err)
  302. t_R.Code = 202
  303. t_R.Msg = err.Error()
  304. b, _ := msgpack.Marshal(&t_R)
  305. _ = lib.Nats.Publish(m.Reply, b)
  306. return
  307. }
  308. fmt.Printf("ColdVerify_Server_Edit_DeviceClassList message: %+v\n", t_Req)
  309. Task_r, is := Task.Read_Task(t_Req.T_task_id)
  310. if !is {
  311. logs.Error("Mats", lib.FuncName())
  312. t_R.Code = 202
  313. t_R.Msg = "T_task_id 错误!"
  314. b, _ := msgpack.Marshal(&t_R)
  315. _ = lib.Nats.Publish(m.Reply, b)
  316. return
  317. }
  318. // 判断是否已存在sn
  319. dc, is := Device.Read_DeviceClassList_T_class_T_sn(Task_r.T_class, t_Req.T_sn)
  320. // 添加的id和数据库已存在id相同
  321. if is && dc.T_id == t_Req.T_id {
  322. t_R.Code = 200
  323. t_R.Msg = "ok"
  324. b, _ := msgpack.Marshal(&t_R)
  325. _ = lib.Nats.Publish(m.Reply, b)
  326. return
  327. }
  328. // 相同sn 添加的id和数据库已存在id不同
  329. if is && dc.T_id != t_Req.T_id {
  330. dc2, is := Device.Read_DeviceClassList_T_class_T_id(Task_r.T_class, t_Req.T_id)
  331. if is {
  332. logs.Error("Mats", lib.FuncName())
  333. t_R.Code = 202
  334. t_R.Msg = fmt.Sprintf("编号[%s]已被[%s]关联,请重试", t_Req.T_id, dc2.T_sn)
  335. b, _ := msgpack.Marshal(&t_R)
  336. _ = lib.Nats.Publish(m.Reply, b)
  337. return
  338. }
  339. var pdf Certificate.CertificatePdf
  340. pdfList, _ := Certificate.Read_CertificatePdf_T_layout_no(t_Req.T_id, "")
  341. if len(pdfList) > 0 {
  342. pdf = pdfList[0]
  343. }
  344. dc.T_id = t_Req.T_id
  345. dc.T_failure_time = pdf.T_failure_time
  346. dc.T_pdf = pdf.T_pdf
  347. dc.T_Certificate_sn = pdf.T_Certificate_sn
  348. if !Device.Update_DeviceClassList(dc, "T_id", "T_failure_time", "T_pdf", "T_Certificate_sn") {
  349. logs.Error("Mats", lib.FuncName())
  350. t_R.Code = 202
  351. t_R.Msg = "修改编号失败!"
  352. b, _ := msgpack.Marshal(&t_R)
  353. _ = lib.Nats.Publish(m.Reply, b)
  354. return
  355. }
  356. }
  357. t_R.Code = 200
  358. t_R.Msg = "ok"
  359. b, _ := msgpack.Marshal(&t_R)
  360. _ = lib.Nats.Publish(m.Reply, b)
  361. Task.Add_TaskLogs_T("nats", t_Req.T_task_id, "任务管理", "修改设备列表", t_Req)
  362. System.Add_UserLogs_T("nats", "任务管理", "修改", t_Req)
  363. })
  364. }