Nats.go 7.7 KB


  1. package Nats
  2. import (
  3. "ColdVerify_local/Nats/NatsServer"
  4. "ColdVerify_local/conf"
  5. "ColdVerify_local/lib"
  6. "ColdVerify_local/logs"
  7. "ColdVerify_local/models/System"
  8. "ColdVerify_local/models/Task"
  9. "fmt"
  10. "github.com/nats-io/nats.go"
  11. "github.com/vmihailenco/msgpack/v5"
  12. "os"
  13. )
  14. func init() {
  15. logs.Println("============Nats init============")
  16. var err error
  17. // 连接Nats服务器
  18. lib.Nats, err = nats.Connect("nats://" + conf.NatsServer_Url)
  19. if err != nil {
  20. fmt.Println("nats 连接失败!")
  21. panic(err)
  22. }
  23. logs.Println("nats OK!")
  24. go NatsInit()
  25. }
  26. type Up_TaskData_Back struct {
  27. T_uuid string `xml:"T_uuid"`
  28. Task Task.Task `xml:"Task_r"`
  29. }
  30. func NatsInit() {
  31. // 获取微信二维码 打包本地数据
  32. _, _ = lib.Nats.Subscribe("ColdVerify_Local_Import_TaskData", func(m *nats.Msg) {
  33. logs.Debug("ColdVerify_Local_Import_TaskData message: ", string(m.Data))
  34. logs.Info(lib.FuncName(), "任务数据-打包本地数据 开始打包数据...!")
  35. type T_R struct {
  36. T_uuid string `xml:"T_uuid"`
  37. T_task_id string `xml:"T_task_id"`
  38. TaskData_Num int `xml:"TaskData_Num"`
  39. }
  40. var t_r T_R
  41. err := msgpack.Unmarshal(m.Data, &t_r)
  42. if err != nil {
  43. logs.Error("Nats", "msgpack Unmarshal err", string(m.Data))
  44. return
  45. }
  46. T_uuid, T_task_id := t_r.T_uuid, t_r.T_task_id
  47. Task_r, err := NatsServer.Read_Task(T_task_id)
  48. if err != nil {
  49. logs.Error(lib.FuncName(), err)
  50. }
  51. if t_r.TaskData_Num == 0 {
  52. // 已采集 无数据
  53. Task.CREATE_TaskData(conf.Local_AliasName, T_task_id)
  54. Task.Truncate_TaskData(conf.Local_AliasName, T_task_id)
  55. Task_r.T_collection_state = 3
  56. err = NatsServer.Update_Task(Task_r)
  57. if err != nil {
  58. logs.Error(lib.FuncName(), err)
  59. }
  60. return
  61. }
  62. if err = lib.Create_Dir(conf.Sql_Temp_Dir); err != nil {
  63. logs.Println("创建sql临时文件失败")
  64. }
  65. sql_file := fmt.Sprintf("%sz_task_data_%s.sql", conf.Sql_Temp_Dir, T_task_id)
  66. var org string
  67. //导出线上数据
  68. logs.Info("--------开始导出线上数据---------")
  69. i := 0
  70. for i < 10 {
  71. org, err = Task.Dump_TaskData(T_task_id, conf.MysqlServer2_Username, conf.MysqlServer2_Password, conf.MysqlServer2_UrlPort, conf.MysqlServer2_Database, sql_file)
  72. if err != nil {
  73. logs.Error(lib.FuncName(), "导出线上数据失败", err)
  74. } else {
  75. System.Add_UserLogs_T(T_uuid, "任务数据-导出线上数据", "z_task_data_"+T_task_id, org)
  76. break
  77. }
  78. i++
  79. }
  80. logs.Info("--------开始导入数据到本地---------")
  81. j := 0
  82. flag := false
  83. Task.CREATE_TaskData(conf.Local_AliasName, T_task_id)
  84. for j < 10 {
  85. Task.Truncate_TaskData(conf.Local_AliasName, T_task_id)
  86. org, err = Task.Insert_TaskData(conf.MysqlServer_Username, conf.MysqlServer_Password, conf.MysqlServer_UrlPort, conf.MysqlServer_Database, sql_file)
  87. if err != nil {
  88. logs.Error("任务数据-打包本地数据", "z_task_data_"+T_task_id, err.Error())
  89. } else {
  90. if Task.Check_TaskData_Num(T_task_id) {
  91. System.Add_UserLogs_T(T_uuid, "任务数据-打包本地数据", "z_task_data_"+T_task_id, org)
  92. flag = true
  93. break
  94. }
  95. }
  96. j++
  97. }
  98. // 重试10次后仍然没有成功导入数据
  99. if !flag {
  100. Task.Truncate_TaskData(conf.Local_AliasName, T_task_id)
  101. Task_r.T_collection_state = 0
  102. err = NatsServer.Update_Task(Task_r)
  103. if err != nil {
  104. logs.Error(lib.FuncName(), err)
  105. }
  106. System.Add_UserLogs_T(T_uuid, "任务数据-打包本地数据-失败", "线下导入数据z_task_data_"+T_task_id, "重试10次失败")
  107. return
  108. }
  109. //删除导出的sql文件
  110. _ = os.Remove(sql_file)
  111. Task_r.T_collection_state = 1
  112. err = NatsServer.Update_Task(Task_r)
  113. if err != nil {
  114. logs.Error(lib.FuncName(), err)
  115. }
  116. System.Add_UserLogs_T(T_uuid, "任务数据-打包本地数据", "z_task_data_"+T_task_id, "成功")
  117. logs.Info(lib.FuncName(), "任务数据-打包本地数据 成功!")
  118. })
  119. //_, _ = lib.Nats.Subscribe("ColdVerify_Local_Import_TaskData2", func(m *nats.Msg) {
  120. // type T_R struct {
  121. // T_uuid string `xml:"T_uuid"`
  122. // T_task_id string `xml:"T_task_id"`
  123. // TaskData_Num int `xml:"TaskData_Num"`
  124. // }
  125. // var t_r T_R
  126. // err := msgpack.Unmarshal(m.Data, &t_r)
  127. // if err != nil {
  128. // System.Add_Logs("Nats", "msgpack Unmarshal err", string(m.Data))
  129. // return
  130. // }
  131. // logs.Debug("ColdVerify_Local_Import_TaskData message: %+v\n", t_r)
  132. //
  133. // T_uuid, T_task_id := t_r.T_uuid, t_r.T_task_id
  134. // Task.Truncate_TaskData(conf.Local_AliasName, T_task_id)
  135. //
  136. // var limit = 50000
  137. // var offset = 0
  138. // var limitMaxNum = 10
  139. // var chData = make(chan int, limitMaxNum)
  140. // var jobGroup sync.WaitGroup
  141. // var tasknum = t_r.TaskData_Num / limit
  142. // if t_r.TaskData_Num%limit != 0 {
  143. // tasknum += 1
  144. // }
  145. //
  146. // //处理任务,最多同时有10个协程
  147. // for i := 0; i < tasknum; i++ {
  148. // chData <- 1
  149. // go func(index int) {
  150. // defer jobGroup.Done()
  151. // jobGroup.Add(1)
  152. // is := Task.Import_TaskData(T_task_id, limit*index, limit)
  153. // if !is {
  154. // logs.Error(lib.FuncName(), "导入数据失败", fmt.Sprintf("%s:%d-%d", T_task_id, offset, limit))
  155. // }
  156. // <-chData
  157. // }(i)
  158. // }
  159. // //使用Wait等待所有任务执行完毕
  160. // jobGroup.Wait()
  161. //
  162. // System.Add_UserLogs(T_uuid, "任务数据-打包本地数据", "从线上导入数据Z_TaskData_"+T_task_id, "")
  163. //
  164. //})
  165. // 获取微信二维码 更新线上数据
  166. _, _ = lib.Nats.Subscribe("ColdVerify_Local_Up_TaskData", func(m *nats.Msg) {
  167. logs.Debug("ColdVerify_Local_Up_TaskData message: ", string(m.Data))
  168. logs.Info(lib.FuncName(), "任务数据-打包本地数据 更新线上数据...!")
  169. var t_r Up_TaskData_Back
  170. err := msgpack.Unmarshal(m.Data, &t_r)
  171. if err != nil {
  172. System.Add_Logs("Nats", "msgpack Unmarshal err", string(m.Data))
  173. return
  174. }
  175. Task_r := t_r.Task
  176. T_uuid, T_task_id := t_r.T_uuid, Task_r.T_task_id
  177. if err = lib.Create_Dir(conf.Sql_Temp_Dir); err != nil {
  178. logs.Println("创建sql临时文件失败")
  179. }
  180. sql_file := fmt.Sprintf("%sz_task_data_%s.sql", conf.Sql_Temp_Dir, T_task_id)
  181. org, err := Task.Dump_TaskData(T_task_id, conf.MysqlServer_Username, conf.MysqlServer_Password, conf.MysqlServer_UrlPort, conf.MysqlServer_Database, sql_file)
  182. if err != nil {
  183. logs.Error(lib.FuncName(), "导出本地数据失败")
  184. }
  185. System.Add_UserLogs_T(T_uuid, "任务数据-更新线上数据", "z_task_data_"+T_task_id, org)
  186. i := 0
  187. flag := false
  188. for i < 10 {
  189. Task.Truncate_TaskData(conf.Server_AliasName, T_task_id)
  190. org, err = Task.Insert_TaskData(conf.MysqlServer2_Username, conf.MysqlServer2_Password, conf.MysqlServer2_UrlPort, conf.MysqlServer2_Database, sql_file)
  191. if err != nil {
  192. logs.Println(T_uuid, "任务数据-更新线上数据", "z_task_data_"+T_task_id, err.Error())
  193. } else {
  194. if Task.Check_TaskData_Num(T_task_id) {
  195. System.Add_UserLogs_T(T_uuid, "任务数据-更新线上数据", "z_task_data_"+T_task_id, org)
  196. flag = true
  197. break
  198. }
  199. }
  200. i++
  201. }
  202. // 重试10次后仍然没有成功导入数据
  203. if !flag {
  204. Task.Truncate_TaskData(conf.Server_AliasName, T_task_id)
  205. // 线上数据更新后 将当前任务 交付审核 标志 为 1
  206. Task_r.T_delivery_state = 0
  207. err = NatsServer.Update_Task(Task_r)
  208. if err != nil {
  209. logs.Error(lib.FuncName(), err)
  210. }
  211. System.Add_UserLogs_T(T_uuid, "任务数据-更新线上数据-失败", "z_task_data_"+T_task_id, "重试10次失败")
  212. return
  213. }
  214. //删除导出的sql文件
  215. _ = os.Remove(sql_file)
  216. // 线上数据更新后 将当前任务 交付审核 标志 为 1
  217. Task_r.T_delivery_state = 1
  218. err = NatsServer.Update_Task(Task_r)
  219. if err != nil {
  220. logs.Error(lib.FuncName(), err)
  221. }
  222. System.Add_UserLogs_T(T_uuid, "任务数据-更新线上数据", "z_task_data_"+T_task_id, "成功")
  223. logs.Info(lib.FuncName(), "任务数据-更新线上数据 成功!")
  224. })
  225. }