package Nats import ( "ColdVerify_local/Nats/NatsServer" "ColdVerify_local/conf" "ColdVerify_local/lib" "ColdVerify_local/logs" "ColdVerify_local/models/System" "ColdVerify_local/models/Task" "fmt" "github.com/nats-io/nats.go" "github.com/vmihailenco/msgpack/v5" "os" "strings" "time" ) func init() { logs.Println("============Nats init============") var err error // 连接Nats服务器 lib.Nats, err = nats.Connect("nats://"+conf.NatsServer_Url, nats.MaxReconnects(-1), nats.DisconnectErrHandler(func(nc *nats.Conn, err error) { logs.Error("nats 断开连接", err.Error()) }), nats.ReconnectHandler(func(nc *nats.Conn) { logs.Println("nats 重连...") })) nats.RetryOnFailedConnect(true) if err != nil { logs.Error("nats 断开连接", err.Error()) } logs.Println("nats OK!") // 本地测试,屏蔽本地nats if !conf.NatsForbidden && lib.Nats != nil { go NatsInit() } } // 每10分钟重新连接nats func ReconnectNATS() { // 循环监测连接状态并重新连接 var err error for { time.Sleep(1 * time.Minute) //time.Sleep(1 * time.Minute) if (lib.Nats != nil && !lib.Nats.IsConnected()) || lib.Nats == nil { lib.Nats, err = nats.Connect("nats://" + conf.NatsServer_Url) if err != nil { logs.Println("无法重新连接到 NATS:", err) continue } logs.Println("成功重新连接到 NATS...") } } } type Up_TaskData_Back struct { T_uuid string `xml:"T_uuid"` Task Task.Task `xml:"Task_r"` } func NatsInit() { // 打包本地数据 _, _ = lib.Nats.QueueSubscribe("ColdVerify_Local_Import_TaskData", "Import_TaskData", func(m *nats.Msg) { logs.Debug("ColdVerify_Local_Import_TaskData message: ", string(m.Data)) logs.Info(lib.FuncName(), "任务数据-打包本地数据 开始打包数据...!") type T_R struct { T_uuid string `xml:"T_uuid"` T_task_id string `xml:"T_task_id"` TaskData_Num int `xml:"TaskData_Num"` } var t_r T_R err := msgpack.Unmarshal(m.Data, &t_r) if err != nil { logs.Error("Nats", "msgpack Unmarshal err", string(m.Data)) return } T_uuid, T_task_id := t_r.T_uuid, t_r.T_task_id Task_r, err := NatsServer.Read_Task(T_task_id) if err != nil { logs.Error(lib.FuncName(), err) return } if t_r.TaskData_Num == 0 { // 已采集 无数据 Task.CREATE_TaskData(conf.Local_AliasName, T_task_id) //Task.Truncate_TaskData(conf.Local_AliasName, T_task_id) Task_r.T_collection_state = 3 err = NatsServer.Update_Task(Task_r) if err != nil { logs.Error(lib.FuncName(), err) } return } if err = lib.Create_Dir(conf.Sql_Temp_Dir); err != nil { logs.Println("创建sql临时文件失败") } sql_file := fmt.Sprintf("%sz_task_data_%s.sql", conf.Sql_Temp_Dir, T_task_id) var org string //导出线上数据 logs.Info("--------开始导出线上数据---------") i := 0 for i < 10 { org, err = Task.Dump_TaskData(T_task_id, conf.MysqlServer2_Username, conf.MysqlServer2_Password, conf.MysqlServer2_UrlPort, conf.MysqlServer2_Database, sql_file) if err != nil { logs.Error(lib.FuncName(), "导出线上数据失败", err) } else { System.Add_UserLogs_T(T_uuid, "任务数据-导出线上数据", "z_task_data_"+T_task_id, org) break } i++ } logs.Info("--------开始导入数据到【本地】---------") j := 0 flag := false // 清空表 //Task.CREATE_TaskData(conf.Local_AliasName, T_task_id) //time.Sleep(2 * time.Second) for j < 10 { Task.Truncate_TaskData(conf.Local_AliasName, T_task_id) time.Sleep(2 * time.Second) org, err = Task.Insert_TaskData(conf.MysqlServer_Username, conf.MysqlServer_Password, conf.MysqlServer_UrlPort, conf.MysqlServer_Database, sql_file) if err != nil { logs.Error("任务数据-打包本地数据", "z_task_data_"+T_task_id, err.Error()) } else { if Task.Check_TaskData_Num(T_task_id) { System.Add_UserLogs_T(T_uuid, "任务数据-打包本地数据", "z_task_data_"+T_task_id, org) flag = true break } } j++ } // 重试10次后仍然没有成功导入数据 if !flag { // 清空本地数据 Task.Truncate_TaskData(conf.Local_AliasName, T_task_id) time.Sleep(2 * time.Second) Task_r.T_collection_state = 0 err = NatsServer.Update_Task(Task_r) if err != nil { logs.Error(lib.FuncName(), err) } System.Add_UserLogs_T(T_uuid, "任务数据-打包本地数据-失败", "线下导入数据z_task_data_"+T_task_id, "重试10次失败") return } //删除导出的sql文件 _ = os.Remove(sql_file) // 删除重复数据 _, err = Task.DeleteDeduplicate(T_task_id) if err != nil { logs.Error(lib.FuncName(), err) return } Task_r.T_collection_state = 1 err = NatsServer.Update_Task(Task_r) if err != nil { logs.Error(lib.FuncName(), err) return } System.Add_UserLogs_T(T_uuid, "任务数据-打包本地数据", "z_task_data_"+T_task_id, "成功") logs.Info("--------导入数据到本地【成功】!---------") }) // 更新线上数据 _, _ = lib.Nats.QueueSubscribe("ColdVerify_Local_Up_TaskData", "Up_TaskData", func(m *nats.Msg) { logs.Debug("ColdVerify_Local_Up_TaskData message: ", string(m.Data)) logs.Info(lib.FuncName(), "任务数据-打包本地数据 更新线上数据...!") var t_r Up_TaskData_Back err := msgpack.Unmarshal(m.Data, &t_r) if err != nil { System.Add_Logs("Nats", "msgpack Unmarshal err", string(m.Data)) return } Task_r := t_r.Task T_uuid, T_task_id := t_r.T_uuid, Task_r.T_task_id if err = lib.Create_Dir(conf.Sql_Temp_Dir); err != nil { logs.Println("创建sql临时文件失败") } sql_file := fmt.Sprintf("%sz_task_data_%s.sql", conf.Sql_Temp_Dir, T_task_id) var org string //导出线上数据 logs.Info("--------开始导出本地数据---------") i := 0 for i < 10 { org, err = Task.Dump_TaskData(T_task_id, conf.MysqlServer_Username, conf.MysqlServer_Password, conf.MysqlServer_UrlPort, conf.MysqlServer_Database, sql_file) if err != nil { logs.Error(lib.FuncName(), "导出本地数据失败", err) } else { System.Add_UserLogs_T(T_uuid, "任务数据-导出本地数据失败", "z_task_data_"+T_task_id, org) break } i++ } logs.Info("--------开始导入数据到【线上】---------") j := 0 flag := false for j < 10 { Task.Truncate_TaskData(conf.Server_AliasName, T_task_id) time.Sleep(2 * time.Second) org, err = Task.Insert_TaskData(conf.MysqlServer2_Username, conf.MysqlServer2_Password, conf.MysqlServer2_UrlPort, conf.MysqlServer2_Database, sql_file) if err != nil { logs.Println(T_uuid, "任务数据-更新线上数据", "z_task_data_"+T_task_id, err.Error()) } else { if Task.Check_TaskData_Num(T_task_id) { System.Add_UserLogs_T(T_uuid, "任务数据-更新线上数据", "z_task_data_"+T_task_id, org) flag = true break } } j++ } // 重试10次后仍然没有成功导入数据 if !flag { Task.Truncate_TaskData(conf.Server_AliasName, T_task_id) time.Sleep(2 * time.Second) // 线上数据更新后 将当前任务 交付审核 标志 为 1 Task_r.T_delivery_state = 0 err = NatsServer.Update_Task(Task_r) if err != nil { logs.Error(lib.FuncName(), err) } System.Add_UserLogs_T(T_uuid, "任务数据-更新线上数据-失败", "z_task_data_"+T_task_id, "重试10次失败") return } //删除导出的sql文件 _ = os.Remove(sql_file) // 线上数据更新后 将当前任务 交付审核 标志 为 1 Task_r.T_delivery_state = 1 err = NatsServer.Update_Task(Task_r) if err != nil { logs.Error(lib.FuncName(), err) } // 删除绑定数据区间外的数据 NatsServer.Del_TaskData_ByT_BindDeviceDataTime(T_task_id) System.Add_UserLogs_T(T_uuid, "任务数据-更新线上数据", "z_task_data_"+T_task_id, "成功") logs.Info("--------导入数据到【线上】成功!---------") }) // 创建本地任务表 _, _ = lib.Nats.QueueSubscribe("ColdVerify_Local_Create_Table", "Create_Table", func(m *nats.Msg) { logs.Debug("ColdVerify_Local_Up_TaskData message: ", string(m.Data)) logs.Info(lib.FuncName(), "任务数据-创建数据库表!") Task.CREATE_TaskData(conf.Local_AliasName, string(m.Data)) }) // 1.0数据同步到2。0 _, _ = lib.Nats.QueueSubscribe("ColdVerify_Local_Sync1_TaskData", "Sync1_TaskData", func(m *nats.Msg) { logs.Debug("ColdVerify_Local_Sync1_TaskData message: ", string(m.Data)) logs.Info(lib.FuncName(), "1.0数据同步到2。0!") T_uuid := "9b6b9f9d-f8f2-46fb-82c8-101d4a309c34" T_task_id := string(m.Data) Task_r, err := NatsServer.Read_Task(T_task_id) if err != nil { logs.Error(lib.FuncName(), err) return } if err = lib.Create_Dir(conf.Sql_Temp_Dir); err != nil { logs.Println("创建sql临时文件失败") } sql_file := fmt.Sprintf("%sz_task_data_%s.sql", conf.Sql_Temp_Dir, T_task_id) var org string //导出1.0数据 logs.Info("--------开始导出1.0数据---------") i := 0 for i < 10 { org, err = Task.Dump_TaskData_Verify(T_task_id, conf.MysqlVerify_Username, conf.MysqlVerify_Password, conf.MysqlVerify_UrlPort, conf.MysqlVerify_Database, sql_file) if err != nil { logs.Error(lib.FuncName(), "开始导出1.0数据失败", err) } else { System.Add_UserLogs_T(T_uuid, "任务数据-导出1.0数据失败", "z_task_data_"+T_task_id, org) break } i++ } ChangeTableName(sql_file, T_task_id) logs.Info("--------开始导入数据到2.0【本地】---------") j := 0 flag := false for j < 10 { // 清空表 Task.Truncate_TaskData(conf.Local_AliasName, T_task_id) time.Sleep(2 * time.Second) org, err = Task.Insert_TaskData(conf.MysqlServer_Username, conf.MysqlServer_Password, conf.MysqlServer_UrlPort, conf.MysqlServer_Database, sql_file) if err != nil { logs.Error("任务数据-导入1.0数据到2.0失败", "z_task_data_"+T_task_id, err.Error()) } else { if Task.Check_TaskData_Num_Verify1(T_task_id) { System.Add_UserLogs_T(T_uuid, "任务数据-导入1.0数据到2.0", "z_task_data_"+T_task_id, org) flag = true break } } j++ } // 重试10次后仍然没有成功导入数据 if !flag { // 清空本地数据 Task.Truncate_TaskData(conf.Local_AliasName, T_task_id) time.Sleep(2 * time.Second) Task_r.T_collection_state = 0 err = NatsServer.Update_Task(Task_r) if err != nil { logs.Error(lib.FuncName(), err) } System.Add_UserLogs_T(T_uuid, "任务数据-导入1.0数据到2.0-失败", "线下导入数据z_task_data_"+T_task_id, "重试10次失败") return } //删除导出的sql文件 _ = os.Remove(sql_file) Task.DeleteDeduplicate(T_task_id) Task_r.T_collection_state = 1 err = NatsServer.Update_Task(Task_r) if err != nil { logs.Error(lib.FuncName(), err) return } System.Add_UserLogs_T(T_uuid, "任务数据-导入1.0数据到2.0", "z_task_data_"+T_task_id, "成功") logs.Info("--------导入1.0数据到2.0【成功】---------") }) } func ChangeTableName(sql_file, T_task_id string) { // 定义命令行参数 sqlFile := sql_file oldTableName := "Z_TaskData_" + T_task_id newTableName := strings.ToLower("z_task_data_" + T_task_id) // 读取SQL文件内容 sqlBytes, err := os.ReadFile(sqlFile) if err != nil { fmt.Println("读取SQL文件失败:", err) os.Exit(1) } // 将SQL文件内容转换为字符串 sqlStr := string(sqlBytes) // 替换旧表名为新表名 newSqlStr := strings.ReplaceAll(sqlStr, oldTableName, newTableName) // 将新的SQL字符串写入原文件 err = os.WriteFile(sqlFile, []byte(newSqlStr), 0644) if err != nil { fmt.Println("写入SQL文件失败:", err) os.Exit(1) } fmt.Printf("已将SQL文件 %s 中的表名 %s 替换为 %s\n", sqlFile, oldTableName, newTableName) }