瀏覽代碼

2023-07-10

zoie 1 年之前
父節點
當前提交
e7aec347ef
共有 3 個文件被更改,包括 53 次插入17 次删除
  1. 30 0
      Nats/Nats.go
  2. 6 2
      controllers/TaskData.go
  3. 17 15
      models/Task/TaskData.go

+ 30 - 0
Nats/Nats.go

@@ -8,6 +8,7 @@ import (
 	"ColdVerify_local/models/System"
 	"ColdVerify_local/models/Task"
 	"fmt"
+	orm2 "github.com/beego/beego/v2/client/orm"
 	"github.com/nats-io/nats.go"
 	"github.com/vmihailenco/msgpack/v5"
 	"os"
@@ -345,6 +346,8 @@ func NatsInit() {
 		//删除导出的sql文件
 		_ = os.Remove(sql_file)
 
+		DeleteDeduplicate(T_task_id)
+
 		Task_r.T_collection_state = 1
 		err = NatsServer.Update_Task(Task_r)
 		if err != nil {
@@ -385,3 +388,30 @@ func ChangeTableName(sql_file, T_task_id string) {
 
 	fmt.Printf("已将SQL文件 %s 中的表名 %s 替换为 %s\n", sqlFile, oldTableName, newTableName)
 }
+
+// 删除导入表的重复数据
+func DeleteDeduplicate(T_task_id string) error {
+	localOrm := orm2.NewOrmUsingDB(conf.Local_AliasName)
+	tb_name := "z_task_data_" + T_task_id
+
+	// 创建临时表
+	sqlCreate := "CREATE TABLE `tmp_table` AS (SELECT MAX(`ID`) AS `max_id` FROM " + tb_name + " GROUP BY `t_sn`,`t_id`,`t_time`);"
+	_, err := localOrm.Raw(sqlCreate).Exec()
+	if err != nil {
+		logs.Error(lib.FuncName(), err)
+		return err
+	}
+	sqlDelete := "DELETE FROM " + tb_name + "  WHERE `ID` NOT IN (SELECT `max_id` FROM `tmp_table`);"
+	_, err = localOrm.Raw(sqlDelete).Exec()
+	if err != nil {
+		logs.Error(lib.FuncName(), err)
+		return err
+	}
+	sqlDrop := "DROP TABLE `tmp_table`;"
+	_, err = localOrm.Raw(sqlDrop).Exec()
+	if err != nil {
+		logs.Error(lib.FuncName(), err)
+		return err
+	}
+	return nil
+}

+ 6 - 2
controllers/TaskData.go

@@ -1331,8 +1331,11 @@ func (c *TaskDataController) CopyFromPositionAverageSN() {
 	ct := copyTime
 	for i := 0; i < num; i++ {
 		if List1[i].T_time != List2[i].T_time {
-			ct = ct.Add(time.Second * time.Duration(T_saveT))
-			continue
+			c.Data["json"] = lib.JSONS{202, fmt.Sprintf("%s【%s】、%s【%s】时间不一致", List1[i].T_id, List1[i].T_time, List2[i].T_id, List2[i].T_time), nil}
+			c.ServeJSON()
+			return
+			//ct = ct.Add(time.Second * time.Duration(T_saveT))
+			//continue
 		}
 		T_t := (List1[i].T_t + List2[i].T_t) / 2
 		T_rh := (List1[i].T_rh + List2[i].T_rh) / 2
@@ -1345,6 +1348,7 @@ func (c *TaskDataController) CopyFromPositionAverageSN() {
 		})
 		ct = ct.Add(time.Second * time.Duration(T_saveT))
 	}
+
 	Task.DeleteTaskDataByTimeRange(Task_r.T_task_id, CopySN, CopyID, CopyTime, CopyEndTime)
 
 	go func(TaskDataList []Task.TaskData_, task_id string) {

+ 17 - 15
models/Task/TaskData.go

@@ -290,7 +290,7 @@ func Update_TaskData_ByT_sn(T_task_id, T_sn, T_id string) error {
 func Delete_TaskData_ByT_sn(T_task_id, T_sn string) error {
 	o := orm.NewOrm()
 	// 修改id
-	sql := "DELETE FROM z_task_data_" + T_task_id + " WHERE `t_sn` = " + T_sn
+	sql := "DELETE FROM z_task_data_" + T_task_id + " WHERE `t_sn` = '" + T_sn + "'"
 	logs.Println(sql)
 	_, err := o.Raw(sql).Exec()
 	if err != nil {
@@ -305,6 +305,7 @@ func Delete_TaskData_ByT_sn(T_task_id, T_sn string) error {
 func Add_TaskData(T_task_id string, T_sn string, T_id string, T_t string, T_rh string, T_time string) bool {
 
 	o := orm.NewOrm()
+	o.Begin()
 
 	// 开始插入数据
 	//
@@ -313,18 +314,18 @@ func Add_TaskData(T_task_id string, T_sn string, T_id string, T_t string, T_rh s
 	//	"ON DUPLICATE KEY UPDATE t_t=" + T_t + ", t_rh="+ T_rh +";"
 
 	// 去重复数据
-	//sql := "DELETE FROM z_task_data_" + T_task_id + "  WHERE " + " t_id = " + T_id + " AND "+ " t_sn = '"+T_sn+"' " + "AND t_time = '" + T_time + "'  "
-	//
-	////  这里有时间优化  用于一次 prepare 多次 exec,以提高批量执行的速度
+	sql := "DELETE FROM z_task_data_" + T_task_id + "  WHERE " + " t_id = '" + T_id + "' AND " + " t_sn = '" + T_sn + "' " + "AND t_time = '" + T_time + "'  "
+	//  这里有时间优化  用于一次 prepare 多次 exec,以提高批量执行的速度
 	//fmt.Println(sql)
-	//res, err := o.Raw(sql).Exec()
-	//if err != nil {
-	//	fmt.Println(err)
-	//	return false
-	//}
-	//res.RowsAffected()
-
-	sql := "INSERT INTO z_task_data_" + T_task_id + " (`t_sn`, `t_id`, `t_t`, `t_rh`, `t_time`) " +
+	res, err := o.Raw(sql).Exec()
+	if err != nil {
+		o.Rollback()
+		logs.Error(lib.FuncName(), err)
+		return false
+	}
+	res.RowsAffected()
+
+	sql = "INSERT INTO z_task_data_" + T_task_id + " (`t_sn`, `t_id`, `t_t`, `t_rh`, `t_time`) " +
 		"VALUES ('" + T_sn + "', " + T_id + ", " + T_t + "," + T_rh + ", '" + T_time + "')"
 
 	// 更新数据
@@ -332,13 +333,14 @@ func Add_TaskData(T_task_id string, T_sn string, T_id string, T_t string, T_rh s
 	//	"VALUES ('" + T_sn + "', " + T_id + ", " + T_t + "," + T_rh + ", '" + T_time + "')" +
 	//	"on duplicate key update `t_t`=" + T_t + ",`t_rh`=" + T_rh
 
-	fmt.Println(sql)
-	res, err := o.Raw(sql).Exec()
+	//fmt.Println(sql)
+	res, err = o.Raw(sql).Exec()
 	if err != nil {
+		o.Rollback()
 		logs.Error(lib.FuncName(), err)
 		return false
 	}
-	res.RowsAffected()
+	o.Commit()
 
 	//fmt.Println("mysql row affected nums: ", num)
 	return true