package Nats import ( "ColdVerify_local/Nats/NatsServer" "ColdVerify_local/conf" "ColdVerify_local/lib" "ColdVerify_local/logs" "ColdVerify_local/models/System" "ColdVerify_local/models/Task" "fmt" "github.com/nats-io/nats.go" "github.com/vmihailenco/msgpack/v5" "os" ) func init() { logs.Println("============Nats init============") var err error // 连接Nats服务器 lib.Nats, err = nats.Connect("nats://" + conf.NatsServer_Url) if err != nil { fmt.Println("nats 连接失败!") panic(err) } logs.Println("nats OK!") go NatsInit() } type Up_TaskData_Back struct { T_uuid string `xml:"T_uuid"` Task Task.Task `xml:"Task_r"` } func NatsInit() { // 打包本地数据 _, _ = lib.Nats.QueueSubscribe("ColdVerify_Local_Import_TaskData", "Import_TaskData", func(m *nats.Msg) { logs.Debug("ColdVerify_Local_Import_TaskData message: ", string(m.Data)) logs.Info(lib.FuncName(), "任务数据-打包本地数据 开始打包数据...!") type T_R struct { T_uuid string `xml:"T_uuid"` T_task_id string `xml:"T_task_id"` TaskData_Num int `xml:"TaskData_Num"` } var t_r T_R err := msgpack.Unmarshal(m.Data, &t_r) if err != nil { logs.Error("Nats", "msgpack Unmarshal err", string(m.Data)) return } T_uuid, T_task_id := t_r.T_uuid, t_r.T_task_id Task_r, err := NatsServer.Read_Task(T_task_id) if err != nil { logs.Error(lib.FuncName(), err) return } if t_r.TaskData_Num == 0 { // 已采集 无数据 Task.CREATE_TaskData(conf.Local_AliasName, T_task_id) //Task.Truncate_TaskData(conf.Local_AliasName, T_task_id) Task_r.T_collection_state = 3 err = NatsServer.Update_Task(Task_r) if err != nil { logs.Error(lib.FuncName(), err) } return } if err = lib.Create_Dir(conf.Sql_Temp_Dir); err != nil { logs.Println("创建sql临时文件失败") } sql_file := fmt.Sprintf("%sz_task_data_%s.sql", conf.Sql_Temp_Dir, T_task_id) var org string //导出线上数据 logs.Info("--------开始导出线上数据---------") i := 0 for i < 10 { org, err = Task.Dump_TaskData(T_task_id, conf.MysqlServer2_Username, conf.MysqlServer2_Password, conf.MysqlServer2_UrlPort, conf.MysqlServer2_Database, sql_file) if err != nil { logs.Error(lib.FuncName(), "导出线上数据失败", err) } else { System.Add_UserLogs_T(T_uuid, "任务数据-导出线上数据", "z_task_data_"+T_task_id, org) break } i++ } logs.Info("--------开始导入数据到【本地】---------") j := 0 flag := false Task.CREATE_TaskData(conf.Local_AliasName, T_task_id) for j < 10 { Task.Truncate_TaskData(conf.Local_AliasName, T_task_id) org, err = Task.Insert_TaskData(conf.MysqlServer_Username, conf.MysqlServer_Password, conf.MysqlServer_UrlPort, conf.MysqlServer_Database, sql_file) if err != nil { logs.Error("任务数据-打包本地数据", "z_task_data_"+T_task_id, err.Error()) } else { if Task.Check_TaskData_Num(T_task_id) { System.Add_UserLogs_T(T_uuid, "任务数据-打包本地数据", "z_task_data_"+T_task_id, org) flag = true break } } j++ } // 重试10次后仍然没有成功导入数据 if !flag { // 清空本地数据 Task.Truncate_TaskData(conf.Local_AliasName, T_task_id) Task_r.T_collection_state = 0 err = NatsServer.Update_Task(Task_r) if err != nil { logs.Error(lib.FuncName(), err) } System.Add_UserLogs_T(T_uuid, "任务数据-打包本地数据-失败", "线下导入数据z_task_data_"+T_task_id, "重试10次失败") return } //删除导出的sql文件 _ = os.Remove(sql_file) Task_r.T_collection_state = 1 err = NatsServer.Update_Task(Task_r) if err != nil { logs.Error(lib.FuncName(), err) return } System.Add_UserLogs_T(T_uuid, "任务数据-打包本地数据", "z_task_data_"+T_task_id, "成功") logs.Info("--------导入数据到本地【成功】!---------") }) //_, _ = lib.Nats.Subscribe("ColdVerify_Local_Import_TaskData2", func(m *nats.Msg) { // type T_R struct { // T_uuid string `xml:"T_uuid"` // T_task_id string `xml:"T_task_id"` // TaskData_Num int `xml:"TaskData_Num"` // } // var t_r T_R // err := msgpack.Unmarshal(m.Data, &t_r) // if err != nil { // System.Add_Logs("Nats", "msgpack Unmarshal err", string(m.Data)) // return // } // logs.Debug("ColdVerify_Local_Import_TaskData message: %+v\n", t_r) // // T_uuid, T_task_id := t_r.T_uuid, t_r.T_task_id // Task.Truncate_TaskData(conf.Local_AliasName, T_task_id) // // var limit = 50000 // var offset = 0 // var limitMaxNum = 10 // var chData = make(chan int, limitMaxNum) // var jobGroup sync.WaitGroup // var tasknum = t_r.TaskData_Num / limit // if t_r.TaskData_Num%limit != 0 { // tasknum += 1 // } // // //处理任务,最多同时有10个协程 // for i := 0; i < tasknum; i++ { // chData <- 1 // go func(index int) { // defer jobGroup.Done() // jobGroup.Add(1) // is := Task.Import_TaskData(T_task_id, limit*index, limit) // if !is { // logs.Error(lib.FuncName(), "导入数据失败", fmt.Sprintf("%s:%d-%d", T_task_id, offset, limit)) // } // <-chData // }(i) // } // //使用Wait等待所有任务执行完毕 // jobGroup.Wait() // // System.Add_UserLogs(T_uuid, "任务数据-打包本地数据", "从线上导入数据Z_TaskData_"+T_task_id, "") // //}) // 更新线上数据 _, _ = lib.Nats.QueueSubscribe("ColdVerify_Local_Up_TaskData", "Up_TaskData", func(m *nats.Msg) { logs.Debug("ColdVerify_Local_Up_TaskData message: ", string(m.Data)) logs.Info(lib.FuncName(), "任务数据-打包本地数据 更新线上数据...!") var t_r Up_TaskData_Back err := msgpack.Unmarshal(m.Data, &t_r) if err != nil { System.Add_Logs("Nats", "msgpack Unmarshal err", string(m.Data)) return } Task_r := t_r.Task T_uuid, T_task_id := t_r.T_uuid, Task_r.T_task_id if err = lib.Create_Dir(conf.Sql_Temp_Dir); err != nil { logs.Println("创建sql临时文件失败") } sql_file := fmt.Sprintf("%sz_task_data_%s.sql", conf.Sql_Temp_Dir, T_task_id) org, err := Task.Dump_TaskData(T_task_id, conf.MysqlServer_Username, conf.MysqlServer_Password, conf.MysqlServer_UrlPort, conf.MysqlServer_Database, sql_file) if err != nil { logs.Error(lib.FuncName(), "导出本地数据失败") return } System.Add_UserLogs_T(T_uuid, "任务数据-更新线上数据", "z_task_data_"+T_task_id, org) logs.Info("--------开始导入数据到【线上】---------") i := 0 flag := false for i < 10 { Task.Truncate_TaskData(conf.Server_AliasName, T_task_id) org, err = Task.Insert_TaskData(conf.MysqlServer2_Username, conf.MysqlServer2_Password, conf.MysqlServer2_UrlPort, conf.MysqlServer2_Database, sql_file) if err != nil { logs.Println(T_uuid, "任务数据-更新线上数据", "z_task_data_"+T_task_id, err.Error()) } else { if Task.Check_TaskData_Num(T_task_id) { System.Add_UserLogs_T(T_uuid, "任务数据-更新线上数据", "z_task_data_"+T_task_id, org) flag = true break } } i++ } // 重试10次后仍然没有成功导入数据 if !flag { Task.Truncate_TaskData(conf.Server_AliasName, T_task_id) // 线上数据更新后 将当前任务 交付审核 标志 为 1 Task_r.T_delivery_state = 0 err = NatsServer.Update_Task(Task_r) if err != nil { logs.Error(lib.FuncName(), err) } System.Add_UserLogs_T(T_uuid, "任务数据-更新线上数据-失败", "z_task_data_"+T_task_id, "重试10次失败") return } //删除导出的sql文件 _ = os.Remove(sql_file) // 线上数据更新后 将当前任务 交付审核 标志 为 1 Task_r.T_delivery_state = 1 err = NatsServer.Update_Task(Task_r) if err != nil { logs.Error(lib.FuncName(), err) } System.Add_UserLogs_T(T_uuid, "任务数据-更新线上数据", "z_task_data_"+T_task_id, "成功") logs.Info("--------导入数据到【线上】成功!---------") }) // 创建本地任务表 _, _ = lib.Nats.QueueSubscribe("ColdVerify_Local_Create_Table", "Create_Table", func(m *nats.Msg) { logs.Debug("ColdVerify_Local_Up_TaskData message: ", string(m.Data)) logs.Info(lib.FuncName(), "任务数据-创建数据库表 更新线上数据...!") Task.CREATE_TaskData(conf.Local_AliasName, string(m.Data)) }) }