Browse Source

2023-05-31 1.0数据复制到2.0

zoie 1 year ago
parent
commit
3e725c26fe
6 changed files with 203 additions and 11 deletions
  1. 5 1
      ColdVerify_local.go
  2. 124 2
      Nats/Nats.go
  3. 9 0
      conf/app.conf
  4. 8 0
      conf/config.go
  5. 2 5
      controllers/TaskData.go
  6. 55 3
      models/Task/TaskData.go

+ 5 - 1
ColdVerify_local.go

@@ -29,9 +29,13 @@ func init() {
 	orm.RegisterDataBase(conf.Server_AliasName, "mysql",
 		conf.MysqlServer2_Username+":"+conf.MysqlServer2_Password+"@tcp("+conf.MysqlServer2_UrlPort+")/"+conf.MysqlServer2_Database+"?charset=utf8mb4&loc=Local&parseTime=True",
 		conf.MysqlServer2_MaxIdleConnections, conf.MysqlServer2_MaxOpenConnections)
-	orm.RunSyncdb(conf.Server_AliasName, false, true) // 创建线上数据库
 	println(conf.MysqlServer2_Username + ":" + conf.MysqlServer2_Password + "@tcp(" + conf.MysqlServer2_UrlPort + ")/" + conf.MysqlServer2_Database + "?charset=utf8mb4&loc=Local&parseTime=True")
 
+	orm.RegisterDataBase(conf.Verify1_AliasName, "mysql",
+		conf.MysqlVerify_Username+":"+conf.MysqlVerify_Password+"@tcp("+conf.MysqlVerify_UrlPort+")/"+conf.MysqlVerify_Database+"?charset=utf8mb4&loc=Local&parseTime=True",
+		conf.MysqlServer2_MaxIdleConnections, conf.MysqlServer2_MaxOpenConnections)
+	println(conf.MysqlVerify_Username + ":" + conf.MysqlVerify_Password + "@tcp(" + conf.MysqlVerify_UrlPort + ")/" + conf.MysqlVerify_Database + "?charset=utf8mb4&loc=Local&parseTime=True")
+
 }
 
 func main() {

+ 124 - 2
Nats/Nats.go

@@ -11,6 +11,8 @@ import (
 	"github.com/nats-io/nats.go"
 	"github.com/vmihailenco/msgpack/v5"
 	"os"
+	"strings"
+	"time"
 )
 
 func init() {
@@ -95,9 +97,12 @@ func NatsInit() {
 		logs.Info("--------开始导入数据到【本地】---------")
 		j := 0
 		flag := false
-		Task.CREATE_TaskData(conf.Local_AliasName, T_task_id)
+		// 清空表
+		//Task.CREATE_TaskData(conf.Local_AliasName, T_task_id)
+		//time.Sleep(2 * time.Second)
 		for j < 10 {
 			Task.Truncate_TaskData(conf.Local_AliasName, T_task_id)
+			time.Sleep(2 * time.Second)
 			org, err = Task.Insert_TaskData(conf.MysqlServer_Username, conf.MysqlServer_Password, conf.MysqlServer_UrlPort, conf.MysqlServer_Database, sql_file)
 			if err != nil {
 				logs.Error("任务数据-打包本地数据", "z_task_data_"+T_task_id, err.Error())
@@ -115,6 +120,7 @@ func NatsInit() {
 		if !flag {
 			// 清空本地数据
 			Task.Truncate_TaskData(conf.Local_AliasName, T_task_id)
+			time.Sleep(2 * time.Second)
 			Task_r.T_collection_state = 0
 			err = NatsServer.Update_Task(Task_r)
 			if err != nil {
@@ -215,6 +221,7 @@ func NatsInit() {
 		flag := false
 		for i < 10 {
 			Task.Truncate_TaskData(conf.Server_AliasName, T_task_id)
+			time.Sleep(2 * time.Second)
 			org, err = Task.Insert_TaskData(conf.MysqlServer2_Username, conf.MysqlServer2_Password, conf.MysqlServer2_UrlPort, conf.MysqlServer2_Database, sql_file)
 			if err != nil {
 				logs.Println(T_uuid, "任务数据-更新线上数据", "z_task_data_"+T_task_id, err.Error())
@@ -230,6 +237,7 @@ func NatsInit() {
 		// 重试10次后仍然没有成功导入数据
 		if !flag {
 			Task.Truncate_TaskData(conf.Server_AliasName, T_task_id)
+			time.Sleep(2 * time.Second)
 			// 线上数据更新后 将当前任务 交付审核 标志 为 1
 			Task_r.T_delivery_state = 0
 			err = NatsServer.Update_Task(Task_r)
@@ -258,10 +266,124 @@ func NatsInit() {
 	// 创建本地任务表
 	_, _ = lib.Nats.QueueSubscribe("ColdVerify_Local_Create_Table", "Create_Table", func(m *nats.Msg) {
 		logs.Debug("ColdVerify_Local_Up_TaskData message: ", string(m.Data))
-		logs.Info(lib.FuncName(), "任务数据-创建数据库表 更新线上数据...!")
+		logs.Info(lib.FuncName(), "任务数据-创建数据库表!")
 
 		Task.CREATE_TaskData(conf.Local_AliasName, string(m.Data))
 
 	})
 
+	// 1.0数据同步到2。0
+	_, _ = lib.Nats.QueueSubscribe("ColdVerify_Local_Sync1_TaskData", "Sync1_TaskData", func(m *nats.Msg) {
+		logs.Debug("ColdVerify_Local_Sync1_TaskData message: ", string(m.Data))
+		logs.Info(lib.FuncName(), "1.0数据同步到2。0!")
+		T_uuid := "9b6b9f9d-f8f2-46fb-82c8-101d4a309c34"
+		T_task_id := string(m.Data)
+
+		Task_r, err := NatsServer.Read_Task(T_task_id)
+		if err != nil {
+			logs.Error(lib.FuncName(), err)
+			return
+		}
+
+		if err = lib.Create_Dir(conf.Sql_Temp_Dir); err != nil {
+			logs.Println("创建sql临时文件失败")
+		}
+		sql_file := fmt.Sprintf("%sz_task_data_%s.sql", conf.Sql_Temp_Dir, T_task_id)
+
+		var org string
+
+		//导出1.0数据
+		logs.Info("--------开始导出1.0数据---------")
+		i := 0
+		for i < 10 {
+			org, err = Task.Dump_TaskData_Verify(T_task_id, conf.MysqlVerify_Username, conf.MysqlVerify_Password, conf.MysqlVerify_UrlPort, conf.MysqlVerify_Database, sql_file)
+			if err != nil {
+				logs.Error(lib.FuncName(), "开始导出1.0数据失败", err)
+			} else {
+				System.Add_UserLogs_T(T_uuid, "任务数据-导出线上数据", "z_task_data_"+T_task_id, org)
+				break
+			}
+			i++
+		}
+
+		ChangeTableName(sql_file, T_task_id)
+
+		logs.Info("--------开始导入数据到【本地】---------")
+		j := 0
+		flag := false
+		// 清空表
+		//Task.CREATE_TaskData(conf.Local_AliasName, T_task_id)
+		//time.Sleep(2 * time.Second)
+		for j < 10 {
+			Task.Truncate_TaskData(conf.Local_AliasName, T_task_id)
+			time.Sleep(2 * time.Second)
+			org, err = Task.Insert_TaskData(conf.MysqlServer_Username, conf.MysqlServer_Password, conf.MysqlServer_UrlPort, conf.MysqlServer_Database, sql_file)
+			if err != nil {
+				logs.Error("任务数据-打包本地数据", "z_task_data_"+T_task_id, err.Error())
+			} else {
+				if Task.Check_TaskData_Num_Verify1(T_task_id) {
+					System.Add_UserLogs_T(T_uuid, "任务数据-打包本地数据", "z_task_data_"+T_task_id, org)
+					flag = true
+					break
+				}
+			}
+			j++
+		}
+		// 重试10次后仍然没有成功导入数据
+
+		if !flag {
+			// 清空本地数据
+			Task.Truncate_TaskData(conf.Local_AliasName, T_task_id)
+			time.Sleep(2 * time.Second)
+			Task_r.T_collection_state = 0
+			err = NatsServer.Update_Task(Task_r)
+			if err != nil {
+				logs.Error(lib.FuncName(), err)
+			}
+			System.Add_UserLogs_T(T_uuid, "任务数据-打包本地数据-失败", "线下导入数据z_task_data_"+T_task_id, "重试10次失败")
+			return
+		}
+
+		//删除导出的sql文件
+		//_ = os.Remove(sql_file)
+
+		Task_r.T_collection_state = 1
+		err = NatsServer.Update_Task(Task_r)
+		if err != nil {
+			logs.Error(lib.FuncName(), err)
+			return
+		}
+		System.Add_UserLogs_T(T_uuid, "任务数据-打包本地数据", "z_task_data_"+T_task_id, "成功")
+		logs.Info("--------导入数据到本地【成功】!---------")
+
+	})
+}
+
+func ChangeTableName(sql_file, T_task_id string) {
+	// 定义命令行参数
+	sqlFile := sql_file
+	oldTableName := "Z_TaskData_" + T_task_id
+	newTableName := strings.ToLower("z_task_data_" + T_task_id)
+
+	// 读取SQL文件内容
+	sqlBytes, err := os.ReadFile(sqlFile)
+	if err != nil {
+		fmt.Println("读取SQL文件失败:", err)
+		os.Exit(1)
+	}
+
+	// 将SQL文件内容转换为字符串
+	sqlStr := string(sqlBytes)
+
+	// 替换旧表名为新表名
+	newSqlStr := strings.ReplaceAll(sqlStr, oldTableName, newTableName)
+
+	// 将新的SQL字符串写入原文件
+	err = os.WriteFile(sqlFile, []byte(newSqlStr), 0644)
+	if err != nil {
+		fmt.Println("写入SQL文件失败:", err)
+		os.Exit(1)
+	}
+
+	fmt.Printf("已将SQL文件 %s 中的表名 %s 替换为 %s\n", sqlFile, oldTableName, newTableName)
 }

+ 9 - 0
conf/app.conf

@@ -34,6 +34,15 @@ MysqlServer2_Password = "Bd3d34yJ7aibiEi!"
 MysqlServer2_MaxIdleConnections = 100
 MysqlServer2_MaxOpenConnections = 200
 
+# Mysql 1.0
+Verify1_AliasName = "Verify1"
+MysqlVerify_UrlPort = "127.0.0.1:3300"
+MysqlVerify_Database = "cold_verify"
+MysqlVerify_Username = "cold_verify"
+MysqlVerify_Password = "Cdac2yEfzNKKMJwy"
+MysqlVerify_MaxIdleConnections = 100
+MysqlVerify_MaxOpenConnections = 200
+
 # 静态资源
 Qiniu_AccessKey = "-8ezB_d-8-eUFTMvhOGbGzgeQRPeKQnaQ3DBcUxo"
 Qiniu_SecretKey = "KFhkYxTAJ2ZPN3ZS3euTsfWk8-C92rKgkhAMkDRN"

+ 8 - 0
conf/config.go

@@ -31,6 +31,14 @@ var MysqlServer2_Password, _ = beego.AppConfig.String("MysqlServer2_Password")
 var MysqlServer2_MaxIdleConnections, _ = beego.AppConfig.Int("MysqlServer2_MaxIdleConnections")
 var MysqlServer2_MaxOpenConnections, _ = beego.AppConfig.Int("MysqlServer2_MaxOpenConnections")
 
+var Verify1_AliasName, _ = beego.AppConfig.String("Verify1_AliasName")
+var MysqlVerify_UrlPort, _ = beego.AppConfig.String("MysqlVerify_UrlPort")
+var MysqlVerify_Database, _ = beego.AppConfig.String("MysqlVerify_Database")
+var MysqlVerify_Username, _ = beego.AppConfig.String("MysqlVerify_Username")
+var MysqlVerify_Password, _ = beego.AppConfig.String("MysqlVerify_Password")
+var MysqlVerify_MaxIdleConnections, _ = beego.AppConfig.Int("MysqlVerify_MaxIdleConnections")
+var MysqlVerify_MaxOpenConnections, _ = beego.AppConfig.Int("MysqlVerify_MaxOpenConnections")
+
 var Sql_Temp_Dir, _ = beego.AppConfig.String("Sql_Temp_Dir")
 
 // Redis

+ 2 - 5
controllers/TaskData.go

@@ -40,10 +40,7 @@ func (c *TaskDataController) TaskData_List() {
 	Time_start := c.GetString("Time_start")
 	Time_end := c.GetString("Time_end")
 	T_sn := c.GetString("T_sn")
-	T_id, err := c.GetInt("T_id")
-	if err != nil {
-		T_id = -1
-	}
+	T_id := c.GetString("T_id")
 
 	T_task_id := c.GetString("T_task_id")
 	Task_r, err := NatsServer.Read_Task(T_task_id)
@@ -375,7 +372,7 @@ func (c *TaskDataController) Export_Data_Excel() {
 	T_sn_list := strings.Split(T_sn_str, ",")
 	DeviceSensor_data_list := []Task.TaskData_{}
 	for _, v := range T_sn_list {
-		DeviceSensor_data, _ := Task.Read_TaskData_ById_List(Task_r.T_task_id, v, -1, Time_start, Time_end, 1, 9999)
+		DeviceSensor_data, _ := Task.Read_TaskData_ById_List(Task_r.T_task_id, v, "", Time_start, Time_end, 1, 9999)
 		DeviceSensor_data_list = append(DeviceSensor_data_list, DeviceSensor_data...)
 	}
 

+ 55 - 3
models/Task/TaskData.go

@@ -94,7 +94,7 @@ type TaskDataClass_ struct {
 	T_id string `orm:"column(t_id);size(256);null"` // 名称
 }
 
-func Read_TaskData_ById_List(T_task_id string, SN string, T_id int, Time_start_ string, Time_end_ string, page int, page_z int) ([]TaskData_, int64) {
+func Read_TaskData_ById_List(T_task_id string, SN string, T_id string, Time_start_ string, Time_end_ string, page int, page_z int) ([]TaskData_, int64) {
 	o := orm.NewOrm()
 	var maps []TaskData_
 	var maps_z []orm2.ParamsList
@@ -117,8 +117,8 @@ func Read_TaskData_ById_List(T_task_id string, SN string, T_id int, Time_start_
 		sql_condition += " AND t_time <= '" + Time_end_ + "'"
 	}
 
-	if T_id != -1 {
-		sql_condition += " AND t_id = '" + strconv.Itoa(T_id) + "'"
+	if len(T_id) > 0 {
+		sql_condition += " AND t_id = '" + T_id + "'"
 	}
 	if len(SN) > 0 {
 		sql_condition += " AND t_sn = '" + SN + "'"
@@ -441,6 +441,30 @@ func Check_TaskData_Num(T_task_id string) bool {
 	return true
 }
 
+// 检查导出表总数
+func Check_TaskData_Num_Verify1(T_task_id string) bool {
+	o1 := orm2.NewOrmUsingDB(conf.Local_AliasName)
+	o2 := orm2.NewOrmUsingDB(conf.Verify1_AliasName)
+
+	var maps_z1, maps_z2 []orm2.ParamsList
+	sql := "SELECT COUNT(ID) FROM z_task_data_" + strings.ToLower(T_task_id)
+	fmt.Println(sql)
+	_, err := o1.Raw(sql).ValuesList(&maps_z1)
+	if err != nil {
+		return false
+	}
+	sql2 := "SELECT COUNT(ID) FROM Z_TaskData_" + T_task_id
+	_, err = o2.Raw(sql2).ValuesList(&maps_z2)
+	if err != nil {
+		return false
+	}
+	if len(maps_z1) != len(maps_z2) {
+		return false
+	}
+
+	return true
+}
+
 func Read_TaskData_Num(T_task_id string) int {
 	o1 := orm2.NewOrmUsingDB(conf.Local_AliasName)
 
@@ -484,6 +508,34 @@ func Dump_TaskData(T_task_id, root, password, url_port, database, sql_file strin
 	return org, err
 }
 
+func Dump_TaskData_Verify(T_task_id, root, password, url_port, database, sql_file string) (string, error) {
+	// url_port 127.0.0.1:3306
+	// mysql8.0 以上加 --column-statistics=0
+	// mysqldump --column-statistics=0 -uroot -proot -h127.0.0.1 -P3306 cold_verify Z_TaskData_ixEfo5zk2Oeb > /Data/Z_TaskData_ixEfo5zk2Oeb.sql
+	v := Read_Local_Mysql_Version()
+	host_port := strings.Split(url_port, ":")
+	table_name := "Z_TaskData_" + T_task_id
+	org := "mysqldump "
+	if v >= 8 {
+		org += "--column-statistics=0 "
+	}
+	//--no-create-info 只导出数据,而不添加 CREATE TABLE 语句。
+	//--single-transaction 在备份库的时候并不锁定数据表
+	//--add-locks:在每个表导出之前增加LOCK TABLES并且之后UNLOCK TABLE。(默认为打开状态,使用--skip-add-locks取消选项)
+	//--compact:压缩模式,产生更少的输出;
+
+	org = org + fmt.Sprintf("-u%s -p%s -h%s -P%s --no-create-info --set-gtid-purged=OFF --skip-add-locks --compact --quick %s %s > %s ",
+		root, password, host_port[0], host_port[1], database, table_name, sql_file)
+
+	logs.Println(org)
+	_, err := lib.Command(org)
+	if err != nil {
+		logs.Error(lib.FuncName(), err)
+	}
+
+	return org, err
+}
+
 func Insert_TaskData(root, password, url_port, database, sql_file string) (string, error) {
 	// url_port 127.0.0.1:3306
 	// mysql -u root -p root -h127.0.0.1 -P3306 cold_verify_local < /data/Z_TaskData_ixEfo5zk2Oeb.sql