Nats.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. package Nats
  2. import (
  3. "ColdVerify_local/Nats/NatsServer"
  4. "ColdVerify_local/conf"
  5. "ColdVerify_local/lib"
  6. "ColdVerify_local/logs"
  7. "ColdVerify_local/models/System"
  8. "ColdVerify_local/models/Task"
  9. "fmt"
  10. "github.com/nats-io/nats.go"
  11. "github.com/vmihailenco/msgpack/v5"
  12. "os"
  13. )
  14. func init() {
  15. logs.Println("============Nats init============")
  16. var err error
  17. // 连接Nats服务器
  18. lib.Nats, err = nats.Connect("nats://" + conf.NatsServer_Url)
  19. if err != nil {
  20. fmt.Println("nats 连接失败!")
  21. panic(err)
  22. }
  23. logs.Println("nats OK!")
  24. go NatsInit()
  25. }
  26. type Up_TaskData_Back struct {
  27. T_uuid string `xml:"T_uuid"`
  28. Task Task.Task `xml:"Task_r"`
  29. }
  30. func NatsInit() {
  31. // 打包本地数据
  32. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Local_Import_TaskData", "Import_TaskData", func(m *nats.Msg) {
  33. logs.Debug("ColdVerify_Local_Import_TaskData message: ", string(m.Data))
  34. logs.Info(lib.FuncName(), "任务数据-打包本地数据 开始打包数据...!")
  35. type T_R struct {
  36. T_uuid string `xml:"T_uuid"`
  37. T_task_id string `xml:"T_task_id"`
  38. TaskData_Num int `xml:"TaskData_Num"`
  39. }
  40. var t_r T_R
  41. err := msgpack.Unmarshal(m.Data, &t_r)
  42. if err != nil {
  43. logs.Error("Nats", "msgpack Unmarshal err", string(m.Data))
  44. return
  45. }
  46. T_uuid, T_task_id := t_r.T_uuid, t_r.T_task_id
  47. Task_r, err := NatsServer.Read_Task(T_task_id)
  48. if err != nil {
  49. logs.Error(lib.FuncName(), err)
  50. return
  51. }
  52. if t_r.TaskData_Num == 0 {
  53. // 已采集 无数据
  54. Task.CREATE_TaskData(conf.Local_AliasName, T_task_id)
  55. //Task.Truncate_TaskData(conf.Local_AliasName, T_task_id)
  56. Task_r.T_collection_state = 3
  57. err = NatsServer.Update_Task(Task_r)
  58. if err != nil {
  59. logs.Error(lib.FuncName(), err)
  60. }
  61. return
  62. }
  63. if err = lib.Create_Dir(conf.Sql_Temp_Dir); err != nil {
  64. logs.Println("创建sql临时文件失败")
  65. }
  66. sql_file := fmt.Sprintf("%sz_task_data_%s.sql", conf.Sql_Temp_Dir, T_task_id)
  67. var org string
  68. //导出线上数据
  69. logs.Info("--------开始导出线上数据---------")
  70. i := 0
  71. for i < 10 {
  72. org, err = Task.Dump_TaskData(T_task_id, conf.MysqlServer2_Username, conf.MysqlServer2_Password, conf.MysqlServer2_UrlPort, conf.MysqlServer2_Database, sql_file)
  73. if err != nil {
  74. logs.Error(lib.FuncName(), "导出线上数据失败", err)
  75. } else {
  76. System.Add_UserLogs_T(T_uuid, "任务数据-导出线上数据", "z_task_data_"+T_task_id, org)
  77. break
  78. }
  79. i++
  80. }
  81. logs.Info("--------开始导入数据到【本地】---------")
  82. j := 0
  83. flag := false
  84. Task.CREATE_TaskData(conf.Local_AliasName, T_task_id)
  85. for j < 10 {
  86. Task.Truncate_TaskData(conf.Local_AliasName, T_task_id)
  87. org, err = Task.Insert_TaskData(conf.MysqlServer_Username, conf.MysqlServer_Password, conf.MysqlServer_UrlPort, conf.MysqlServer_Database, sql_file)
  88. if err != nil {
  89. logs.Error("任务数据-打包本地数据", "z_task_data_"+T_task_id, err.Error())
  90. } else {
  91. if Task.Check_TaskData_Num(T_task_id) {
  92. System.Add_UserLogs_T(T_uuid, "任务数据-打包本地数据", "z_task_data_"+T_task_id, org)
  93. flag = true
  94. break
  95. }
  96. }
  97. j++
  98. }
  99. // 重试10次后仍然没有成功导入数据
  100. if !flag {
  101. // 清空本地数据
  102. Task.Truncate_TaskData(conf.Local_AliasName, T_task_id)
  103. Task_r.T_collection_state = 0
  104. err = NatsServer.Update_Task(Task_r)
  105. if err != nil {
  106. logs.Error(lib.FuncName(), err)
  107. }
  108. System.Add_UserLogs_T(T_uuid, "任务数据-打包本地数据-失败", "线下导入数据z_task_data_"+T_task_id, "重试10次失败")
  109. return
  110. }
  111. //删除导出的sql文件
  112. _ = os.Remove(sql_file)
  113. Task_r.T_collection_state = 1
  114. err = NatsServer.Update_Task(Task_r)
  115. if err != nil {
  116. logs.Error(lib.FuncName(), err)
  117. return
  118. }
  119. System.Add_UserLogs_T(T_uuid, "任务数据-打包本地数据", "z_task_data_"+T_task_id, "成功")
  120. logs.Info("--------导入数据到本地【成功】!---------")
  121. })
  122. //_, _ = lib.Nats.Subscribe("ColdVerify_Local_Import_TaskData2", func(m *nats.Msg) {
  123. // type T_R struct {
  124. // T_uuid string `xml:"T_uuid"`
  125. // T_task_id string `xml:"T_task_id"`
  126. // TaskData_Num int `xml:"TaskData_Num"`
  127. // }
  128. // var t_r T_R
  129. // err := msgpack.Unmarshal(m.Data, &t_r)
  130. // if err != nil {
  131. // System.Add_Logs("Nats", "msgpack Unmarshal err", string(m.Data))
  132. // return
  133. // }
  134. // logs.Debug("ColdVerify_Local_Import_TaskData message: %+v\n", t_r)
  135. //
  136. // T_uuid, T_task_id := t_r.T_uuid, t_r.T_task_id
  137. // Task.Truncate_TaskData(conf.Local_AliasName, T_task_id)
  138. //
  139. // var limit = 50000
  140. // var offset = 0
  141. // var limitMaxNum = 10
  142. // var chData = make(chan int, limitMaxNum)
  143. // var jobGroup sync.WaitGroup
  144. // var tasknum = t_r.TaskData_Num / limit
  145. // if t_r.TaskData_Num%limit != 0 {
  146. // tasknum += 1
  147. // }
  148. //
  149. // //处理任务,最多同时有10个协程
  150. // for i := 0; i < tasknum; i++ {
  151. // chData <- 1
  152. // go func(index int) {
  153. // defer jobGroup.Done()
  154. // jobGroup.Add(1)
  155. // is := Task.Import_TaskData(T_task_id, limit*index, limit)
  156. // if !is {
  157. // logs.Error(lib.FuncName(), "导入数据失败", fmt.Sprintf("%s:%d-%d", T_task_id, offset, limit))
  158. // }
  159. // <-chData
  160. // }(i)
  161. // }
  162. // //使用Wait等待所有任务执行完毕
  163. // jobGroup.Wait()
  164. //
  165. // System.Add_UserLogs(T_uuid, "任务数据-打包本地数据", "从线上导入数据Z_TaskData_"+T_task_id, "")
  166. //
  167. //})
  168. // 更新线上数据
  169. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Local_Up_TaskData", "Up_TaskData", func(m *nats.Msg) {
  170. logs.Debug("ColdVerify_Local_Up_TaskData message: ", string(m.Data))
  171. logs.Info(lib.FuncName(), "任务数据-打包本地数据 更新线上数据...!")
  172. var t_r Up_TaskData_Back
  173. err := msgpack.Unmarshal(m.Data, &t_r)
  174. if err != nil {
  175. System.Add_Logs("Nats", "msgpack Unmarshal err", string(m.Data))
  176. return
  177. }
  178. Task_r := t_r.Task
  179. T_uuid, T_task_id := t_r.T_uuid, Task_r.T_task_id
  180. if err = lib.Create_Dir(conf.Sql_Temp_Dir); err != nil {
  181. logs.Println("创建sql临时文件失败")
  182. }
  183. sql_file := fmt.Sprintf("%sz_task_data_%s.sql", conf.Sql_Temp_Dir, T_task_id)
  184. org, err := Task.Dump_TaskData(T_task_id, conf.MysqlServer_Username, conf.MysqlServer_Password, conf.MysqlServer_UrlPort, conf.MysqlServer_Database, sql_file)
  185. if err != nil {
  186. logs.Error(lib.FuncName(), "导出本地数据失败")
  187. return
  188. }
  189. System.Add_UserLogs_T(T_uuid, "任务数据-更新线上数据", "z_task_data_"+T_task_id, org)
  190. logs.Info("--------开始导入数据到【线上】---------")
  191. i := 0
  192. flag := false
  193. for i < 10 {
  194. Task.Truncate_TaskData(conf.Server_AliasName, T_task_id)
  195. org, err = Task.Insert_TaskData(conf.MysqlServer2_Username, conf.MysqlServer2_Password, conf.MysqlServer2_UrlPort, conf.MysqlServer2_Database, sql_file)
  196. if err != nil {
  197. logs.Println(T_uuid, "任务数据-更新线上数据", "z_task_data_"+T_task_id, err.Error())
  198. } else {
  199. if Task.Check_TaskData_Num(T_task_id) {
  200. System.Add_UserLogs_T(T_uuid, "任务数据-更新线上数据", "z_task_data_"+T_task_id, org)
  201. flag = true
  202. break
  203. }
  204. }
  205. i++
  206. }
  207. // 重试10次后仍然没有成功导入数据
  208. if !flag {
  209. Task.Truncate_TaskData(conf.Server_AliasName, T_task_id)
  210. // 线上数据更新后 将当前任务 交付审核 标志 为 1
  211. Task_r.T_delivery_state = 0
  212. err = NatsServer.Update_Task(Task_r)
  213. if err != nil {
  214. logs.Error(lib.FuncName(), err)
  215. }
  216. System.Add_UserLogs_T(T_uuid, "任务数据-更新线上数据-失败", "z_task_data_"+T_task_id, "重试10次失败")
  217. return
  218. }
  219. //删除导出的sql文件
  220. _ = os.Remove(sql_file)
  221. // 线上数据更新后 将当前任务 交付审核 标志 为 1
  222. Task_r.T_delivery_state = 1
  223. err = NatsServer.Update_Task(Task_r)
  224. if err != nil {
  225. logs.Error(lib.FuncName(), err)
  226. }
  227. System.Add_UserLogs_T(T_uuid, "任务数据-更新线上数据", "z_task_data_"+T_task_id, "成功")
  228. logs.Info("--------导入数据到【线上】成功!---------")
  229. })
  230. // 创建本地任务表
  231. _, _ = lib.Nats.QueueSubscribe("ColdVerify_Local_Create_Table", "Create_Table", func(m *nats.Msg) {
  232. logs.Debug("ColdVerify_Local_Up_TaskData message: ", string(m.Data))
  233. logs.Info(lib.FuncName(), "任务数据-创建数据库表 更新线上数据...!")
  234. Task.CREATE_TaskData(conf.Local_AliasName, string(m.Data))
  235. })
  236. }