Browse Source

update: 更新打包本地数据,更新线上数据为后台运行

zoie 2 years ago
parent
commit
aaebd39dd6
14 changed files with 604 additions and 369 deletions
  1. 4 0
      ColdVerify_local.go
  2. 238 0
      Nats/Nats.go
  3. 69 0
      Nats/NatsServer/NatsMqtt.go
  4. 19 23
      conf/app.conf
  5. 3 10
      conf/config.go
  6. 76 32
      controllers/TaskData.go
  7. 5 0
      go.mod
  8. 11 0
      go.sum
  9. 10 8
      lib/Qiniu.go
  10. 11 14
      lib/lib.go
  11. 44 21
      logs/LogPrintln.go
  12. 25 157
      models/Task/Task.go
  13. 87 103
      models/Task/TaskData.go
  14. 2 1
      routers/TaskData.go

+ 4 - 0
ColdVerify_local.go

@@ -1,15 +1,18 @@
 package main
 
 import (
+	_ "ColdVerify_local/Nats"
 	"ColdVerify_local/conf"
 	_ "ColdVerify_local/routers"
 	"fmt"
 	"github.com/beego/beego/v2/adapter/orm"
+	orm2 "github.com/beego/beego/v2/client/orm"
 	beego "github.com/beego/beego/v2/server/web"
 	"github.com/beego/beego/v2/server/web/filter/cors"
 	_ "github.com/go-sql-driver/mysql"
 	"runtime"
 	"strconv"
+	"time"
 )
 
 func init() {
@@ -20,6 +23,7 @@ func init() {
 		conf.MysqlServer_Username+":"+conf.MysqlServer_Password+"@tcp("+conf.MysqlServer_UrlPort+")/"+conf.MysqlServer_Database+"?charset=utf8mb4&loc=Local&parseTime=True",
 		conf.MysqlServer_MaxIdleConnections, conf.MysqlServer_MaxOpenConnections)
 	orm.RunSyncdb(conf.Local_AliasName, false, true) // 创建本地数据库
+	orm2.ConnMaxLifetime(30 * time.Minute)
 	println(conf.MysqlServer_Username + ":" + conf.MysqlServer_Password + "@tcp(" + conf.MysqlServer_UrlPort + ")/" + conf.MysqlServer_Database + "?charset=utf8mb4&loc=Local&parseTime=True")
 
 	orm.RegisterDataBase(conf.Server_AliasName, "mysql",

+ 238 - 0
Nats/Nats.go

@@ -0,0 +1,238 @@
+package Nats
+
+import (
+	"ColdVerify_local/Nats/NatsServer"
+	"ColdVerify_local/conf"
+	"ColdVerify_local/lib"
+	"ColdVerify_local/logs"
+	"ColdVerify_local/models/System"
+	"ColdVerify_local/models/Task"
+	"fmt"
+	"github.com/nats-io/nats.go"
+	"github.com/vmihailenco/msgpack/v5"
+	"os"
+)
+
+func init() {
+
+	logs.Println("============Nats init============")
+	var err error
+	// 连接Nats服务器
+	lib.Nats, err = nats.Connect("nats://" + conf.NatsServer_Url)
+	if err != nil {
+		fmt.Println("nats 连接失败!")
+		panic(err)
+	}
+	logs.Println("nats OK!")
+
+	go NatsInit()
+}
+
+type Up_TaskData_Back struct {
+	T_uuid string    `xml:"T_uuid"`
+	Task   Task.Task `xml:"Task_r"`
+}
+
+func NatsInit() {
+
+	// 获取微信二维码 打包本地数据
+	_, _ = lib.Nats.Subscribe("ColdVerify_Local_Import_TaskData", func(m *nats.Msg) {
+		logs.Debug("ColdVerify_Local_Import_TaskData message: ", string(m.Data))
+		logs.Info(lib.FuncName(), "任务数据-打包本地数据 开始打包数据...!")
+
+		type T_R struct {
+			T_uuid    string `xml:"T_uuid"`
+			T_task_id string `xml:"T_task_id"`
+		}
+		var t_r T_R
+		err := msgpack.Unmarshal(m.Data, &t_r)
+		if err != nil {
+			System.Add_Logs("Nats", "msgpack Unmarshal err", string(m.Data))
+			return
+		}
+		T_uuid, T_task_id := t_r.T_uuid, t_r.T_task_id
+
+		if err = lib.Create_Dir(conf.Sql_Temp_Dir); err != nil {
+			logs.Println("创建sql临时文件失败")
+		}
+		sql_file := fmt.Sprintf("%sz_task_data_%s.sql", conf.Sql_Temp_Dir, T_task_id)
+
+		var org string
+
+		//导出线上数据
+		i := 0
+		for i < 10 {
+			org, err = Task.Dump_TaskData(T_task_id, conf.MysqlServer2_Username, conf.MysqlServer2_Password, conf.MysqlServer2_UrlPort, conf.MysqlServer2_Database, sql_file)
+			if err != nil {
+				logs.Error(lib.FuncName(), "导出线上数据失败")
+			} else {
+				System.Add_UserLogs_T(T_uuid, "任务数据-导出线上数据", "z_task_data_"+T_task_id, org)
+				break
+			}
+			i++
+		}
+
+		j := 0
+		flag := false
+		Task.CREATE_TaskData(conf.Local_AliasName, T_task_id)
+		for j < 10 {
+			Task.Truncate_TaskData(conf.Local_AliasName, T_task_id)
+			org, err = Task.Insert_TaskData(conf.MysqlServer_Username, conf.MysqlServer_Password, conf.MysqlServer_UrlPort, conf.MysqlServer_Database, sql_file)
+			if err != nil {
+				logs.Println("任务数据-打包本地数据", "z_task_data_"+T_task_id, err.Error())
+			} else {
+				if Task.Check_TaskData_Num(T_task_id) {
+					System.Add_UserLogs_T(T_uuid, "任务数据-打包本地数据", "z_task_data_"+T_task_id, org)
+					flag = true
+					break
+				}
+			}
+			j++
+		}
+		// 重试10次后仍然没有成功导入数据
+		Task_r, err := NatsServer.Read_Task(T_task_id)
+		if err != nil {
+			logs.Error(lib.FuncName(), err)
+		}
+		if !flag {
+			Task.Truncate_TaskData(conf.Local_AliasName, T_task_id)
+			Task_r.T_collection_state = 0
+			err = NatsServer.Update_Task(Task_r)
+			if err != nil {
+				logs.Error(lib.FuncName(), err)
+			}
+			System.Add_UserLogs_T(T_uuid, "任务数据-打包本地数据-失败", "线下导入数据z_task_data_"+T_task_id, "重试10次失败")
+			return
+		}
+
+		//删除导出的sql文件
+		_ = os.Remove(sql_file)
+
+		Task_r.T_collection_state = 1
+		err = NatsServer.Update_Task(Task_r)
+		if err != nil {
+			logs.Error(lib.FuncName(), err)
+		}
+		System.Add_UserLogs_T(T_uuid, "任务数据-打包本地数据", "z_task_data_"+T_task_id, "成功")
+
+		logs.Info(lib.FuncName(), "任务数据-打包本地数据 成功!")
+
+	})
+
+	//_, _ = lib.Nats.Subscribe("ColdVerify_Local_Import_TaskData2", func(m *nats.Msg) {
+	//	type T_R struct {
+	//		T_uuid       string `xml:"T_uuid"`
+	//		T_task_id    string `xml:"T_task_id"`
+	//		TaskData_Num int    `xml:"TaskData_Num"`
+	//	}
+	//	var t_r T_R
+	//	err := msgpack.Unmarshal(m.Data, &t_r)
+	//	if err != nil {
+	//		System.Add_Logs("Nats", "msgpack Unmarshal err", string(m.Data))
+	//		return
+	//	}
+	//	logs.Debug("ColdVerify_Local_Import_TaskData message: %+v\n", t_r)
+	//
+	//	T_uuid, T_task_id := t_r.T_uuid, t_r.T_task_id
+	//	Task.Truncate_TaskData(conf.Local_AliasName, T_task_id)
+	//
+	//	var limit = 50000
+	//	var offset = 0
+	//	var limitMaxNum = 10
+	//	var chData = make(chan int, limitMaxNum)
+	//	var jobGroup sync.WaitGroup
+	//	var tasknum = t_r.TaskData_Num / limit
+	//	if t_r.TaskData_Num%limit != 0 {
+	//		tasknum += 1
+	//	}
+	//
+	//	//处理任务,最多同时有10个协程
+	//	for i := 0; i < tasknum; i++ {
+	//		chData <- 1
+	//		go func(index int) {
+	//			defer jobGroup.Done()
+	//			jobGroup.Add(1)
+	//			is := Task.Import_TaskData(T_task_id, limit*index, limit)
+	//			if !is {
+	//				logs.Error(lib.FuncName(), "导入数据失败", fmt.Sprintf("%s:%d-%d", T_task_id, offset, limit))
+	//			}
+	//			<-chData
+	//		}(i)
+	//	}
+	//	//使用Wait等待所有任务执行完毕
+	//	jobGroup.Wait()
+	//
+	//	System.Add_UserLogs(T_uuid, "任务数据-打包本地数据", "从线上导入数据Z_TaskData_"+T_task_id, "")
+	//
+	//})
+
+	// 获取微信二维码 更新线上数据
+	_, _ = lib.Nats.Subscribe("ColdVerify_Local_Up_TaskData", func(m *nats.Msg) {
+		logs.Debug("ColdVerify_Local_Up_TaskData message: ", string(m.Data))
+		logs.Info(lib.FuncName(), "任务数据-打包本地数据 更新线上数据...!")
+
+		var t_r Up_TaskData_Back
+		err := msgpack.Unmarshal(m.Data, &t_r)
+		if err != nil {
+			System.Add_Logs("Nats", "msgpack Unmarshal err", string(m.Data))
+			return
+		}
+		Task_r := t_r.Task
+		T_uuid, T_task_id := t_r.T_uuid, Task_r.T_task_id
+
+		if err = lib.Create_Dir(conf.Sql_Temp_Dir); err != nil {
+			logs.Println("创建sql临时文件失败")
+		}
+		sql_file := fmt.Sprintf("%sz_task_data_%s.sql", conf.Sql_Temp_Dir, T_task_id)
+		org, err := Task.Dump_TaskData(T_task_id, conf.MysqlServer_Username, conf.MysqlServer_Password, conf.MysqlServer_UrlPort, conf.MysqlServer_Database, sql_file)
+		if err != nil {
+			logs.Error(lib.FuncName(), "导出本地数据失败")
+		}
+		System.Add_UserLogs_T(T_uuid, "任务数据-更新线上数据", "z_task_data_"+T_task_id, org)
+
+		i := 0
+		flag := false
+		for i < 10 {
+			Task.Truncate_TaskData(conf.Server_AliasName, T_task_id)
+			org, err = Task.Insert_TaskData(conf.MysqlServer2_Username, conf.MysqlServer2_Password, conf.MysqlServer2_UrlPort, conf.MysqlServer2_Database, sql_file)
+			if err != nil {
+				logs.Println(T_uuid, "任务数据-更新线上数据", "z_task_data_"+T_task_id, err.Error())
+			} else {
+				if Task.Check_TaskData_Num(T_task_id) {
+					System.Add_UserLogs_T(T_uuid, "任务数据-更新线上数据", "z_task_data_"+T_task_id, org)
+					flag = true
+					break
+				}
+			}
+			i++
+		}
+		// 重试10次后仍然没有成功导入数据
+		if !flag {
+			Task.Truncate_TaskData(conf.Server_AliasName, T_task_id)
+			// 线上数据更新后 将当前任务 交付审核 标志 为 1
+			Task_r.T_delivery_state = 0
+			err = NatsServer.Update_Task(Task_r)
+			if err != nil {
+				logs.Error(lib.FuncName(), err)
+			}
+			System.Add_UserLogs_T(T_uuid, "任务数据-更新线上数据-失败", "z_task_data_"+T_task_id, "重试10次失败")
+			return
+		}
+
+		//删除导出的sql文件
+		_ = os.Remove(sql_file)
+
+		// 线上数据更新后 将当前任务 交付审核 标志 为 1
+		Task_r.T_delivery_state = 1
+		err = NatsServer.Update_Task(Task_r)
+		if err != nil {
+			logs.Error(lib.FuncName(), err)
+		}
+
+		System.Add_UserLogs_T(T_uuid, "任务数据-更新线上数据", "z_task_data_"+T_task_id, "成功")
+
+		logs.Info(lib.FuncName(), "任务数据-更新线上数据 成功!")
+
+	})
+
+}

+ 69 - 0
Nats/NatsServer/NatsMqtt.go

@@ -0,0 +1,69 @@
+package NatsServer
+
+import (
+	"ColdVerify_local/lib"
+	"ColdVerify_local/logs"
+	"ColdVerify_local/models/Task"
+	"errors"
+	"github.com/vmihailenco/msgpack/v5"
+	"time"
+)
+
+func Update_Task(v Task.Task) error {
+	logs.Debug("Nats =>", lib.FuncName(), v)
+
+	b, err := msgpack.Marshal(&v)
+	if err != nil {
+		panic(err)
+	}
+
+	msg, err := lib.Nats.Request("ColdVerify_Server_Update_Task", b, 3*time.Second)
+	if err != nil {
+		return err
+	}
+
+	type T_R struct {
+		Code int16     `xml:"Code"`
+		Msg  string    `xml:"Msg"`
+		Data Task.Task `xml:"Data"`
+	}
+	var t_R T_R
+
+	err = msgpack.Unmarshal(msg.Data, &t_R)
+	if err != nil {
+		return err
+	}
+	if t_R.Code != 200 {
+		return errors.New(t_R.Msg)
+	}
+
+	return nil
+
+}
+
+func Read_Task(T_task_id string) (task Task.Task, err error) {
+	logs.Println("Nats =>", lib.FuncName(), T_task_id)
+
+	msg, err := lib.Nats.Request("ColdVerify_Server_Read_Task", []byte(T_task_id), 3*time.Second)
+	if err != nil {
+		return task, err
+	}
+
+	type T_R struct {
+		Code int16     `xml:"Code"`
+		Msg  string    `xml:"Msg"`
+		Data Task.Task `xml:"Data"`
+	}
+	var t_R T_R
+
+	err = msgpack.Unmarshal(msg.Data, &t_R)
+	if err != nil {
+		return task, err
+	}
+	if t_R.Code != 200 {
+		return task, errors.New(t_R.Msg)
+	}
+
+	return t_R.Data, nil
+
+}

+ 19 - 23
conf/app.conf

@@ -5,38 +5,34 @@ Graceful = true
 EnableDocs = true
 copyrequestbody = true
 
-
-# Mysql 本地
-# MysqlServer_UrlPort = "47.111.15.17:3306"
-# MysqlServer_Database = "cold_verify"
-# MysqlServer_Username = "cold_verify"
-# MysqlServer_Password = "kzFbKseJm5GFpPFS"
-Local_AliasName = "default"
-MysqlServer_UrlPort = "127.0.0.1:3306"
-MysqlServer_Database = "cold_verify_local"
-MysqlServer_Username = "root"
-MysqlServer_Password = "12345678"
-MysqlServer_MaxIdleConnections = 100
-MysqlServer_MaxOpenConnections = 200
-
-# Mysql 线上
-Server_AliasName = "server"
-MysqlServer2_UrlPort = "175.178.229.79:30306"
-MysqlServer2_Database = "cold_verify"
-MysqlServer2_Username = "root"
-MysqlServer2_Password = "root"
-MysqlServer2_MaxIdleConnections = 100
-MysqlServer2_MaxOpenConnections = 200
+# Nats
+NatsServer_Url = "127.0.0.1:43422"
 
 # 存放sql临时文件目录
 Sql_Temp_Dir = "./temp/"
 
 # Redis
-Redis_address = "175.178.229.79:30379"
+Redis_address = "127.0.0.1:43379"
 Redis_password = ""
 Redis_dbNum = "2"
 
+# Mysql 本地
+Local_AliasName = "default"
+MysqlServer_UrlPort = "192.168.1.23:3306"
+MysqlServer_Database = "coldverify_local"
+MysqlServer_Username = "coldverify_local"
+MysqlServer_Password = "hmZbXNWdpXKB3K8x"
+MysqlServer_MaxIdleConnections = 100
+MysqlServer_MaxOpenConnections = 200
 
+# # Mysql 线上
+Server_AliasName = "server"
+MysqlServer2_UrlPort = "127.0.0.1:40306"
+MysqlServer2_Database = "coldverify"
+MysqlServer2_Username = "coldverify"
+MysqlServer2_Password = "Bd3d34yJ7aibiEi!"
+MysqlServer2_MaxIdleConnections = 100
+MysqlServer2_MaxOpenConnections = 200
 
 # 静态资源
 Qiniu_AccessKey = "-8ezB_d-8-eUFTMvhOGbGzgeQRPeKQnaQ3DBcUxo"

+ 3 - 10
conf/config.go

@@ -2,21 +2,14 @@ package conf
 
 import (
 	beego "github.com/beego/beego/v2/server/web"
-	"runtime"
 )
 
-func init() {
-	if runtime.GOOS == "windows" {
-		Server_test = true
-	} else {
-		Server_test = false
-	}
-}
-
-var Server_test = false
+// Nats
+var NatsServer_Url, _ = beego.AppConfig.String("NatsServer_Url")
 
 var HTTPPort, _ = beego.AppConfig.String("HTTPPort")
 var AppName, _ = beego.AppConfig.String("appname")
+var RunMode, _ = beego.AppConfig.String("runmode")
 
 var Page_size = 10
 

+ 76 - 32
controllers/TaskData.go

@@ -1,6 +1,8 @@
 package controllers
 
 import (
+	"ColdVerify_local/Nats"
+	"ColdVerify_local/Nats/NatsServer"
 	"ColdVerify_local/conf"
 	"ColdVerify_local/lib"
 	"ColdVerify_local/logs"
@@ -8,6 +10,7 @@ import (
 	"ColdVerify_local/models/Task"
 	"fmt"
 	beego "github.com/beego/beego/v2/server/web"
+	"github.com/vmihailenco/msgpack/v5"
 	"github.com/xuri/excelize/v2"
 	"math"
 	"os"
@@ -39,12 +42,11 @@ func (c *TaskDataController) TaskData_List() {
 	T_id, err := c.GetInt("T_id")
 	if err != nil {
 		T_id = -1
-
 	}
 
 	T_task_id := c.GetString("T_task_id")
-	Task_r, is := Task.Read_Task(T_task_id)
-	if !is {
+	Task_r, err := NatsServer.Read_Task(T_task_id)
+	if err != nil {
 		c.Data["json"] = lib.JSONS{Code: 202, Msg: "T_task_id 错误!"}
 		c.ServeJSON()
 		return
@@ -66,10 +68,17 @@ func (c *TaskDataController) TaskData_List() {
 
 // 列表 -
 func (c *TaskDataController) TaskDataClass_List() {
-
+	page, _ := c.GetInt("page")
+	if page < 1 {
+		page = 1
+	}
+	page_z, _ := c.GetInt("page_z")
+	if page_z < 1 {
+		page_z = conf.Page_size
+	}
 	T_task_id := c.GetString("T_task_id")
-	Task_r, is := Task.Read_Task(T_task_id)
-	if !is {
+	Task_r, err := NatsServer.Read_Task(T_task_id)
+	if err != nil {
 		c.Data["json"] = lib.JSONS{Code: 202, Msg: "T_task_id 错误!"}
 		c.ServeJSON()
 		return
@@ -88,8 +97,8 @@ func (c *TaskDataController) TaskData_AddS() {
 	T_uuid, _ := lib.GetAdminT_Uuid(c.Ctx.GetCookie("User_tokey"), c.GetString("User_tokey"))
 
 	T_task_id := c.GetString("T_task_id")
-	Task_r, is := Task.Read_Task(T_task_id)
-	if !is {
+	Task_r, err := NatsServer.Read_Task(T_task_id)
+	if err != nil {
 		c.Data["json"] = lib.JSONS{Code: 202, Msg: "T_task_id 错误!"}
 		c.ServeJSON()
 		return
@@ -114,7 +123,7 @@ func (c *TaskDataController) TaskData_AddS() {
 			continue
 		}
 		v_list := strings.Split(v, "|")
-		is = Task.Add_TaskData(Task_r.T_task_id, v_list[0], v_list[1], v_list[2], v_list[3], v_list[4])
+		is := Task.Add_TaskData(Task_r.T_task_id, v_list[0], v_list[1], v_list[2], v_list[3], v_list[4])
 		if is {
 			T_Data_list_x += 1
 		}
@@ -139,14 +148,14 @@ func (c *TaskDataController) TaskData_Add() {
 	T_time := c.GetString("T_time")
 
 	T_task_id := c.GetString("T_task_id")
-	Task_r, is := Task.Read_Task(T_task_id)
-	if !is {
+	Task_r, err := NatsServer.Read_Task(T_task_id)
+	if err != nil {
 		c.Data["json"] = lib.JSONS{Code: 202, Msg: "T_task_id 错误!"}
 		c.ServeJSON()
 		return
 	}
 
-	is = Task.Add_TaskData(Task_r.T_task_id, T_sn, strconv.Itoa(T_id), fmt.Sprintf("%.2f", T_t), fmt.Sprintf("%.2f", T_rh), T_time)
+	is := Task.Add_TaskData(Task_r.T_task_id, T_sn, strconv.Itoa(T_id), fmt.Sprintf("%.2f", T_t), fmt.Sprintf("%.2f", T_rh), T_time)
 	if !is {
 		c.Data["json"] = lib.JSONS{Code: 202, Msg: "添加失败!"}
 		c.ServeJSON()
@@ -186,14 +195,14 @@ func (c *TaskDataController) TaskData_Up() {
 	T_time := c.GetString("T_time")
 
 	T_task_id := c.GetString("T_task_id")
-	Task_r, is := Task.Read_Task(T_task_id)
-	if !is {
+	Task_r, err := NatsServer.Read_Task(T_task_id)
+	if err != nil {
 		c.Data["json"] = lib.JSONS{Code: 202, Msg: "T_task_id 错误!"}
 		c.ServeJSON()
 		return
 	}
 
-	is = Task.Up_TaskData(Task_r.T_task_id, strconv.Itoa(Id), fmt.Sprintf("%.2f", T_t), fmt.Sprintf("%.2f", T_rh), T_time)
+	is := Task.Up_TaskData(Task_r.T_task_id, strconv.Itoa(Id), fmt.Sprintf("%.2f", T_t), fmt.Sprintf("%.2f", T_rh), T_time)
 	if !is {
 		c.Data["json"] = lib.JSONS{Code: 202, Msg: "修改失败!"}
 		c.ServeJSON()
@@ -220,14 +229,14 @@ func (c *TaskDataController) TaskData_Del() {
 	}
 
 	T_task_id := c.GetString("T_task_id")
-	Task_r, is := Task.Read_Task(T_task_id)
-	if !is {
+	Task_r, err := NatsServer.Read_Task(T_task_id)
+	if err != nil {
 		c.Data["json"] = lib.JSONS{Code: 202, Msg: "T_task_id 错误!"}
 		c.ServeJSON()
 		return
 	}
 
-	is = Task.Del_TaskData(Task_r.T_task_id, strconv.Itoa(Id))
+	is := Task.Del_TaskData(Task_r.T_task_id, strconv.Itoa(Id))
 	if !is {
 		c.Data["json"] = lib.JSONS{Code: 202, Msg: "删除失败!"}
 		c.ServeJSON()
@@ -253,14 +262,14 @@ func (c *TaskDataController) TaskData_Del_t_id() {
 	}
 
 	T_task_id := c.GetString("T_task_id")
-	Task_r, is := Task.Read_Task(T_task_id)
-	if !is {
+	Task_r, err := NatsServer.Read_Task(T_task_id)
+	if err != nil {
 		c.Data["json"] = lib.JSONS{Code: 202, Msg: "T_task_id 错误!"}
 		c.ServeJSON()
 		return
 	}
 
-	is = Task.Del_TaskData_t_id(Task_r.T_task_id, strconv.Itoa(Id))
+	is := Task.Del_TaskData_t_id(Task_r.T_task_id, strconv.Itoa(Id))
 	if !is {
 		c.Data["json"] = lib.JSONS{Code: 202, Msg: "删除失败!"}
 		c.ServeJSON()
@@ -286,8 +295,8 @@ func (c *TaskDataController) Export_Data_Excel() {
 	//}
 
 	T_task_id := c.GetString("T_task_id")
-	Task_r, is := Task.Read_Task(T_task_id)
-	if !is {
+	Task_r, err := NatsServer.Read_Task(T_task_id)
+	if err != nil {
 		c.Data["json"] = lib.JSONS{Code: 202, Msg: "T_task_id 错误!"}
 		c.ServeJSON()
 		return
@@ -354,8 +363,8 @@ func (c *TaskDataController) Export_Data_Excel() {
 
 func (c *TaskDataController) Check() {
 	T_task_id := c.GetString("T_task_id")
-	Task_r, is := Task.Read_Task(T_task_id)
-	if !is {
+	Task_r, err := NatsServer.Read_Task(T_task_id)
+	if err != nil {
 		c.Data["json"] = lib.JSONS{Code: 202, Msg: "T_task_id 错误!"}
 		c.ServeJSON()
 		return
@@ -444,13 +453,13 @@ func (c *TaskDataController) TaskData_Import_TaskData() {
 	T_uuid, _ := lib.GetAdminT_Uuid(c.Ctx.GetCookie("User_tokey"), c.GetString("User_tokey"))
 
 	T_task_id := c.GetString("T_task_id")
-	_, is := Task.Read_Task(T_task_id)
-	if !is {
+	_, err := NatsServer.Read_Task(T_task_id)
+	if err != nil {
 		c.Data["json"] = lib.JSONS{Code: 202, Msg: "T_task_id 错误!"}
 		c.ServeJSON()
 		return
 	}
-	if err := lib.Create_Dir(conf.Sql_Temp_Dir); err != nil {
+	if err = lib.Create_Dir(conf.Sql_Temp_Dir); err != nil {
 		logs.Println("创建sql临时文件失败")
 	}
 	sql_file := fmt.Sprintf("%sZ_TaskData_%s.sql", conf.Sql_Temp_Dir, T_task_id)
@@ -501,13 +510,13 @@ func (c *TaskDataController) TaskData_Up_TaskData() {
 
 	T_task_id := c.GetString("T_task_id")
 
-	Task_r, is := Task.Read_Task(T_task_id)
-	if !is {
+	Task_r, err := NatsServer.Read_Task(T_task_id)
+	if err != nil {
 		c.Data["json"] = lib.JSONS{Code: 202, Msg: "T_task_id 错误!"}
 		c.ServeJSON()
 		return
 	}
-	if err := lib.Create_Dir(conf.Sql_Temp_Dir); err != nil {
+	if err = lib.Create_Dir(conf.Sql_Temp_Dir); err != nil {
 		logs.Println("创建sql临时文件失败")
 	}
 	sql_file := fmt.Sprintf("%sZ_TaskData_%s.sql", conf.Sql_Temp_Dir, T_task_id)
@@ -548,7 +557,8 @@ func (c *TaskDataController) TaskData_Up_TaskData() {
 
 	// 线上数据更新后 将当前任务 交付审核 标志 为 1
 	Task_r.T_delivery_state = 1
-	if !Task.Update_Task(Task_r, "T_delivery_state") {
+	err = NatsServer.Update_Task(Task_r)
+	if err != nil {
 		c.Data["json"] = lib.JSONS{Code: 202, Msg: "修改失败!"}
 		c.ServeJSON()
 		return
@@ -560,3 +570,37 @@ func (c *TaskDataController) TaskData_Up_TaskData() {
 	c.ServeJSON()
 
 }
+
+// 更新线上数据后台执行
+func (c *TaskDataController) TaskData_Up_TaskData_Back() {
+	// 获取登录用户的uuid
+	T_uuid, _ := lib.GetAdminT_Uuid(c.Ctx.GetCookie("User_tokey"), c.GetString("User_tokey"))
+
+	T_task_id := c.GetString("T_task_id")
+
+	Task_r, err := NatsServer.Read_Task(T_task_id)
+	if err != nil {
+		c.Data["json"] = lib.JSONS{Code: 202, Msg: "T_task_id 错误!"}
+		c.ServeJSON()
+		return
+	}
+	// 采集中
+	Task_r.T_delivery_state = 2
+	err = NatsServer.Update_Task(Task_r)
+	if err != nil {
+		c.Data["json"] = lib.JSONS{Code: 202, Msg: "提交失败!"}
+		c.ServeJSON()
+		return
+	}
+
+	data := Nats.Up_TaskData_Back{
+		T_uuid: T_uuid,
+		Task:   Task_r,
+	}
+	b, _ := msgpack.Marshal(&data)
+	_ = lib.Nats.Publish("ColdVerify_Local_Up_TaskData", b)
+
+	c.Data["json"] = lib.JSONS{Code: 200, Msg: "ok!"}
+	c.ServeJSON()
+
+}

+ 5 - 0
go.mod

@@ -21,6 +21,9 @@ require (
 	github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
 	github.com/mitchellh/mapstructure v1.5.0 // indirect
 	github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect
+	github.com/nats-io/nats.go v1.24.0 // indirect
+	github.com/nats-io/nkeys v0.3.0 // indirect
+	github.com/nats-io/nuid v1.0.1 // indirect
 	github.com/phpdave11/gofpdi v1.0.11 // indirect
 	github.com/pkg/errors v0.9.1 // indirect
 	github.com/prometheus/client_golang v1.14.0 // indirect
@@ -30,6 +33,8 @@ require (
 	github.com/richardlehane/mscfb v1.0.4 // indirect
 	github.com/richardlehane/msoleps v1.0.3 // indirect
 	github.com/shiena/ansicolor v0.0.0-20200904210342-c7312218db18 // indirect
+	github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect
+	github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
 	github.com/xuri/efp v0.0.0-20220603152613-6918739fd470 // indirect
 	github.com/xuri/nfp v0.0.0-20220409054826-5e722a1d9e22 // indirect
 	golang.org/x/crypto v0.5.0 // indirect

+ 11 - 0
go.sum

@@ -207,6 +207,12 @@ github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 h1:RWengNIwukTxcDr9
 github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826/go.mod h1:TaXosZuwdSHYgviHp1DAtfrULt5eUgsSMsZf+YrPgl8=
 github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
 github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
+github.com/nats-io/nats.go v1.24.0 h1:CRiD8L5GOQu/DcfkmgBcTTIQORMwizF+rPk6T0RaHVQ=
+github.com/nats-io/nats.go v1.24.0/go.mod h1:dVQF+BK3SzUZpwyzHedXsvH3EO38aVKuOPkkHlv5hXA=
+github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
+github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
+github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
+github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
 github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
 github.com/onsi/ginkgo v1.12.0/go.mod h1:oUhWkIvk5aDxtKvDDuw8gItl8pKl42LzjC9KZE0HfGg=
@@ -290,6 +296,10 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
 github.com/syndtr/goleveldb v0.0.0-20160425020131-cfa635847112/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0=
 github.com/syndtr/goleveldb v0.0.0-20181127023241-353a9fca669c/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0=
 github.com/ugorji/go v0.0.0-20171122102828-84cb69a8af83/go.mod h1:hnLbHMwcvSihnDhEfx2/BzKp2xb0Y+ErdfYcrs9tkJQ=
+github.com/vmihailenco/msgpack/v5 v5.3.5 h1:5gO0H1iULLWGhs2H5tbAHIZTV8/cYafcFOr9znI5mJU=
+github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc=
+github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
+github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
 github.com/wendal/errors v0.0.0-20130201093226-f66c77a7882b/go.mod h1:Q12BUT7DqIlHRmgv3RskH+UCM/4eqVMgI0EMmlSpAXc=
 github.com/xuri/efp v0.0.0-20220603152613-6918739fd470 h1:6932x8ltq1w4utjmfMPVj09jdMlkY0aiA6+Skbtl3/c=
 github.com/xuri/efp v0.0.0-20220603152613-6918739fd470/go.mod h1:ybY/Jr0T0GTCnYjKqmdwxyxn2BQf2RcQIIvex5QldPI=
@@ -313,6 +323,7 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U
 golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
+golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
 golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
 golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
 golang.org/x/crypto v0.5.0 h1:U/0M97KRkSFvyD/3FSmdP5W5swImpNgle/EHFhOsQPE=

+ 10 - 8
lib/Qiniu.go

@@ -17,21 +17,23 @@ import (
 
 var Qiniu *qbox.Mac
 
-//var (
+// var (
+//
 //	//BUCKET是你在存储空间的名称
 //	accessKey = "-8ezB_d-8-eUFTMvhOGbGzgeQRPeKQnaQ3DBcUxo"
 //	secretKey = "KFhkYxTAJ2ZPN3ZS3euTsfWk8-C92rKgkhAMkDRN"
 //	BUCKET    = "bzdcoldoss"
-//)
+//
+// )
 func init() {
 	Qiniu = qbox.NewMac(conf.Qiniu_AccessKey, conf.Qiniu_SecretKey)
 }
 
-//if !lib.Pload_qiniu("ofile/"+timeStr+".xlsx","ofile/"+timeStr+".xlsx"){
-//c.Data["json"] = lib.JSONS{Code: 203, Msg: "oss!"}
-//c.ServeJSON()
-//return
-//}
+// if !lib.Pload_qiniu("ofile/"+timeStr+".xlsx","ofile/"+timeStr+".xlsx"){
+// c.Data["json"] = lib.JSONS{Code: 203, Msg: "oss!"}
+// c.ServeJSON()
+// return
+// }
 func Pload_qiniu(localFile string, name string) bool {
 	//localFile := "C:\\Users\\Administrator\\Downloads\\kodo-browser-Windows-x64-v1.0.15.zip"
 	//key := "kodo-browser-Windows-x64-v1.0.15.zip"
@@ -62,7 +64,7 @@ func Pload_qiniu(localFile string, name string) bool {
 	}
 	err := formUploader.PutFile(context.Background(), &ret, upToken, name, localFile, &putExtra)
 	if err != nil {
-		logs.Println(err)
+		logs.Println(FuncName(), err)
 		System.Add_Logs("七牛云", "上传文件失败"+localFile, err.Error())
 		return false
 	}

+ 11 - 14
lib/lib.go

@@ -7,6 +7,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"github.com/nats-io/nats.go"
 	"github.com/signintech/gopdf"
 	"io/ioutil"
 	"log"
@@ -20,19 +21,7 @@ import (
 	"time"
 )
 
-var Run_My_Server = false // 运行当期服务
-
-type Cl_ struct {
-	Uuid_list map[string]string // 泛型
-}
-
-var CountrySnMap map[string]Cl_                        /*创建集合 */
-var CountryRead_DeviceParameterSnMap map[string]string /*创建集合 */
-func init() {
-	CountrySnMap = make(map[string]Cl_)
-	CountryRead_DeviceParameterSnMap = make(map[string]string)
-
-}
+var Nats *nats.Conn
 
 type JSONS struct {
 	//必须的大写开头
@@ -338,7 +327,7 @@ func Random(min, max int) int {
 	return rand.Intn(max-min) + min
 }
 
-//取文本(字符串)中间
+// 取文本(字符串)中间
 func GetBetweenStr(str, start, end string) string {
 	n := strings.Index(str, start)
 	if n == -1 {
@@ -491,3 +480,11 @@ func Create_Dir(path string) error {
 	}
 	return nil
 }
+
+// 获取正在运行的函数名
+func FuncName() string {
+	pc := make([]uintptr, 1)
+	runtime.Callers(2, pc)
+	f := runtime.FuncForPC(pc[0])
+	return f.Name()
+}

+ 44 - 21
logs/LogPrintln.go

@@ -1,36 +1,59 @@
 package logs
 
 import (
-	"fmt"
+	"ColdVerify_local/conf"
 	"github.com/astaxie/beego/logs"
-	"runtime"
-	"time"
 )
 
-var logx *logs.BeeLogger
-var Test = true
+var Logx *logs.BeeLogger
 
 func init() {
-	logx = logs.NewLogger()
-	logx.SetLogger(logs.AdapterFile, `{"filename":"logs/logx/logx.log"}`)
+	Logx = logs.NewLogger()
+	Logx.SetLogger(logs.AdapterFile, `{"filename":"logs/Logx/Logx.log"}`)
+	Logx.SetLevel(logs.LevelDebug)
+	Logx.EnableFuncCallDepth(true)
+	Logx.SetLogFuncCallDepth(3)
+	if conf.RunMode == "dev" {
+		Logx.SetLogger(logs.AdapterConsole, `{"filename":"logs/console/console.log"}`)
+	}
+}
+
+func Println(format string, v ...interface{}) {
+	for _, _ = range v {
+		format += " %v"
+	}
+	Logx.Info(format, v...)
+}
 
-	if runtime.GOOS == "windows" {
-		Test = true
-	} else {
-		Test = false
+// Debug Log DEBUG level message.
+func Debug(format string, v ...interface{}) {
+	for _, _ = range v {
+		format += " %v"
 	}
+	Logx.Debug(format, v...)
 }
 
-func Println(a ...interface{}) {
-	if Test {
-		fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "=>", a)
+// Info Log ERROR level message.
+func Info(format string, v ...interface{}) {
+	for _, _ = range v {
+		format += " %v"
 	}
-	logx.Info(time.Now().Format("2006-01-02 15:04:05")+">", a)
+	Logx.Info(format, v...)
 }
 
-//func Println(a ...interface{}) {
-//	if test {
-//		fmt.Printf("=>", a)
-//	}
-//	logx.Info(">", a)
-//}
+// Error Log ERROR level message.
+func Error(format string, v ...interface{}) {
+	for _, _ = range v {
+		format += " %v"
+	}
+
+	Logx.Error(format, v...)
+}
+
+// Warning Log WARNING level message.
+func Warning(format string, v ...interface{}) {
+	for _, _ = range v {
+		format += " %v"
+	}
+	Logx.Warning(format, v...)
+}

+ 25 - 157
models/Task/Task.go

@@ -1,13 +1,6 @@
 package Task
 
 import (
-	"ColdVerify_local/conf"
-	"ColdVerify_local/logs"
-	"encoding/json"
-	"fmt"
-	"github.com/astaxie/beego/cache"
-	"github.com/beego/beego/v2/adapter/orm"
-	orm2 "github.com/beego/beego/v2/client/orm"
 	_ "github.com/go-sql-driver/mysql"
 	"time"
 )
@@ -15,156 +8,31 @@ import (
 // 模板
 type Task struct {
 	Id                  int    `orm:"column(ID);size(11);auto;pk"`
-	T_task_id           string `orm:"size(256);null"` // 任务ID
-	T_uuid              string `orm:"size(256);null"` // 用户 UUID
-	T_name              string `orm:"size(256);null"` // 标题
-	T_VerifyTemplate_id string `orm:"size(256);null"` // 模板id
-	T_deadline          string `orm:"size(256);null"` // 截止时间
-	T_collection        string `orm:"size(256);null"` // 数据采集 负责人UUID
-	T_reporting         string `orm:"size(256);null"` // 报告编写 负责人UUID
-	T_delivery          string `orm:"size(256);null"` // 交付审核 负责人UUID
-	T_collection_state  int    `orm:"size(2);0"`      // 数据采集 状态 0 未完成 1 已完成
-	T_reporting_state   int    `orm:"size(2);0"`      // 报告编写 状态 0 未完成 1 已完成
-	T_delivery_state    int    `orm:"size(2);0"`      // 交付审核 状态 0 未完成 1 已完成
-
-	T_doc1 string `orm:"size(256);null"` // 封面
-	T_pdf1 string `orm:"size(256);null"` // 封面
-	T_doc2 string `orm:"size(256);null"` // 报告
-	T_pdf2 string `orm:"size(256);null"` // 报告
-	T_doc3 string `orm:"size(256);null"` // 证书
-	T_pdf3 string `orm:"size(256);null"` // 证书
-
-	T_Show     int       `orm:"size(2);1"`                                             // 0 隐藏   1 公开
-	T_Visit    int       `orm:"size(200);0"`                                           // 浏览量
-	T_State    int       `orm:"size(2);1"`                                             // 0 删除   1 正常
+	T_class             int    `orm:"size(200);default(0)"` // 分类id
+	T_task_id           string `orm:"size(256);null"`       // 任务ID
+	T_uuid              string `orm:"size(256);null"`       // 用户 UUID
+	T_name              string `orm:"size(256);null"`       // 标题
+	T_VerifyTemplate_id string `orm:"size(256);null"`       // 模板id
+	T_deadline          string `orm:"size(256);null"`       // 截止时间
+	T_scheme            string `orm:"size(256);null"`       // 实施方案 负责人UUID
+	T_collection        string `orm:"size(256);null"`       // 数据采集 负责人UUID
+	T_reporting         string `orm:"size(256);null"`       // 报告编写 负责人UUID
+	T_delivery          string `orm:"size(256);null"`       // 交付审核 负责人UUID
+	T_scheme_state      int    `orm:"size(2);default(0)"`   // 实施方案 状态 0 未完成 1 已完成
+	T_collection_state  int    `orm:"size(2);default(0)"`   // 数据采集 状态 0 未完成 1 已完成 2 采集中
+	T_reporting_state   int    `orm:"size(2);default(0)"`   // 报告编写 状态 0 未完成 1 已完成
+	T_delivery_state    int    `orm:"size(2);default(0)"`   // 交付审核 状态 0 未完成 1 已完成 2 采集中
+
+	T_doc1 string `orm:"type(text);null"` // 封面
+	T_pdf1 string `orm:"type(text);null"` // 封面
+	T_doc2 string `orm:"type(text);null"` // 报告
+	T_pdf2 string `orm:"type(text);null"` // 报告
+	T_doc3 string `orm:"type(text);null"` // 证书
+	T_pdf3 string `orm:"type(text);null"` // 证书
+
+	T_Show     int       `orm:"size(2);default(1)"`                                    // 0 隐藏   1 公开
+	T_Visit    int       `orm:"size(200);default(0)"`                                  // 浏览量
+	T_State    int       `orm:"size(2);default(1)"`                                    // 0 删除   1 正常
 	CreateTime time.Time `orm:"column(create_time);type(timestamp);null;auto_now_add"` //auto_now_add 第一次保存时才设置时间
 	UpdateTime time.Time `orm:"column(update_time);type(timestamp);null;auto_now"`     //auto_now 每次 model 保存时都会对时间自动更新
 }
-
-type Task_R struct {
-	T_task_id  string // 标题
-	T_name     string // 标题
-	T_doc1     string // 封面
-	T_pdf1     string // 封面
-	T_doc2     string // 报告
-	T_pdf2     string // 报告
-	T_doc3     string // 证书
-	T_pdf3     string // 证书
-	T_Show     int    // 公开/隐藏
-	T_Visit    int    // 浏览量
-	T_State    int    // 0    1
-	CreateTime time.Time
-	UpdateTime time.Time
-}
-
-type Task_ struct {
-	T_task_id           string // 任务ID
-	T_uuid              string // 用户 UUID
-	T_name              string // 标题
-	T_VerifyTemplate_id string // 任务模板id
-	T_deadline          string // 截止时间
-	T_collection        string // 数据采集 负责人UUID
-	T_reporting         string // 报告编写 负责人UUID
-	T_delivery          string // 交付审核 负责人UUID
-	T_collection_state  int    // 数据采集 状态 0 未完成 1 已完成
-	T_reporting_state   int    // 报告编写 状态 0 未完成 1 已完成
-	T_delivery_state    int    // 交付审核 状态 0 未完成 1 已完成
-	T_collection_name   string // 数据采集 负责人姓名
-	T_reporting_name    string // 报告编写 负责人姓名
-	T_delivery_name     string // 交付审核 负责人姓名
-	T_doc1              string // 封面
-	T_pdf1              string // 封面
-	T_doc2              string // 报告
-	T_pdf2              string // 报告
-	T_doc3              string // 证书
-	T_pdf3              string // 证书
-	T_Show              int    // 0 公开 1 隐藏
-	T_Visit             int    // 浏览量
-	T_State             int    // 0 删除 1 正常
-}
-
-func (t *Task) TableName() string {
-	return "task" // 数据库名称   // ************** 替换 FormulaList **************
-}
-
-var redisCache_Task cache.Cache
-
-func init() {
-
-	//注册模型
-	orm.RegisterModel(new(Task))
-
-	config := fmt.Sprintf(`{"key":"%s","conn":"%s","dbNum":"%s","password":"%s"}`,
-		"redis_"+"Task", conf.Redis_address, conf.Redis_dbNum, conf.Redis_password)
-	logs.Println(config)
-	var err error
-	redisCache_Task, err = cache.NewCache("redis", config)
-	if err != nil || redisCache_Task == nil {
-		errMsg := "failed to init redis"
-		logs.Println(errMsg, err)
-	}
-}
-
-// ---------------- Redis -------------------
-// Redis_Set(m.T_sn,m) // Redis 更新缓存
-func Redis_Task_Set(key string, r Task) (err error) {
-	//json序列化
-	str, err := json.Marshal(r)
-	if err != nil {
-		logs.Println(err)
-		return
-	}
-	err = redisCache_Task.Put(key, str, 24*time.Hour)
-	if err != nil {
-		logs.Println("set key:", key, ",value:", str, err)
-	}
-	return
-}
-
-// if r,is :=Redis_Get(T_sn);is{
-// return r,nil
-// }
-func Redis_Task_Get(key string) (r Task, is bool) {
-	if redisCache_Task.IsExist(key) {
-		logs.Println("找到key:", key)
-		v := redisCache_Task.Get(key)
-		json.Unmarshal(v.([]byte), &r)
-		return r, true
-	}
-	logs.Println("没有 找到key:", key)
-	return Task{}, false
-}
-func Redis_Task_DelK(key string) (err error) {
-	err = redisCache_Task.Delete(key)
-	return
-}
-
-// ---------------- 特殊方法 -------------------
-
-// 获取 线上任务
-func Read_Task(T_task_id string) (r Task, is bool) {
-	if r, is = Redis_Task_Get(T_task_id); is == true {
-		return r, true
-	}
-	o := orm2.NewOrmUsingDB(conf.Server_AliasName)
-	qs := o.QueryTable(new(Task))
-	//err := qs.Filter("T_task_id", T_task_id).Filter("T_State", 1).One(&r)
-	err := qs.Filter("T_task_id", T_task_id).One(&r)
-	if err != nil {
-		return r, false
-	}
-
-	Redis_Task_Set(T_task_id, r)
-	return r, true
-}
-
-// 修改 线上任务
-func Update_Task(m Task, cols ...string) bool {
-	o := orm2.NewOrmUsingDB(conf.Server_AliasName)
-	if num, err := o.Update(&m, cols...); err == nil {
-		fmt.Println("Number of records updated in database:", num)
-		Redis_Task_Set(m.T_task_id, m)
-		return true
-	}
-	return false
-}

+ 87 - 103
models/Task/TaskData.go

@@ -11,7 +11,6 @@ import (
 	_ "github.com/go-sql-driver/mysql"
 	"strconv"
 	"strings"
-	"sync"
 	"time"
 )
 
@@ -50,21 +49,19 @@ func CREATE_TaskData(alias_name, T_task_id string) bool {
 		"	`t_time` datetime(0) NULL DEFAULT NULL," +
 		"	PRIMARY KEY (`ID`) USING BTREE" +
 		") ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;"
+
 	_, err := o.Raw(sql).Exec()
 	if err != nil {
 		return false
 	}
+
+	//sqlIndex := "ALTER TABLE `z_task_data_" + T_task_id + "` add unique index(`t_sn`,`t_id`,`t_time`);"
+	//logs.Debug(sqlIndex)
+	//o.Raw(sqlIndex).Exec()
 	return true
 }
 
 // ---------------- 特殊方法 -------------------
-// 获取 ById
-//func Read_List_ById(id int) (r TaskData) {
-//	o := orm.NewOrm()
-//
-//	return r
-//}
-
 // 清空
 func Truncate_TaskData(alias_name, T_task_id string) bool {
 
@@ -72,37 +69,11 @@ func Truncate_TaskData(alias_name, T_task_id string) bool {
 
 	sql := "truncate table z_task_data_" + T_task_id
 	logs.Println(sql)
-	o.Raw(sql).Exec()
-
-	return true
-}
-
-// 添加
-func Import_TaskData(Sn string, T_id int, T_task_id string, Time_start string, Time_end string) bool {
-	//if conf.Server_test {
-	//	return true
-	//}
-	o := orm.NewOrm()
-
-	// 开始插入数据
-	sql := "insert into z_task_data_" + T_task_id + "(t_id,t_t,t_rh,t_time) select t_id,t_t,t_rh,t_time from Z_DeviceData_" + Sn
-	sql = sql + " WHERE t_time >= '" + Time_start + "' AND t_time <= '" + Time_end + "'"
-	//  这里有时间优化  用于一次 prepare 多次 exec,以提高批量执行的速度
-	logs.Println(sql)
-	res, err := o.Raw(sql).Exec()
+	_, err := o.Raw(sql).Exec()
 	if err != nil {
-		fmt.Println(err)
+		logs.Error(lib.FuncName(), err)
 		return false
 	}
-
-	// 强行替换  ID
-	sql = "UPDATE z_task_data_" + T_task_id + " SET `t_sn` = '" + Sn + "',`t_id` = " + strconv.Itoa(T_id) + " WHERE `t_sn` IS NULL"
-	// 设备 ID
-	//sql = "UPDATE z_task_data_"+T_task_id+" SET `t_sn` = '"+Sn+"' WHERE `t_sn` IS NULL"
-	logs.Println(sql)
-	o.Raw(sql).Exec()
-
-	res.RowsAffected()
 	return true
 }
 
@@ -148,8 +119,11 @@ func Read_TaskData_ById_List(T_task_id string, SN string, T_id int, Time_start_
 	if len(SN) > 0 {
 		sql_condition += " AND t_sn = '" + SN + "'"
 	}
+	if len(sql_condition) > 0 {
+		sql_condition = " WHERE " + strings.TrimLeft(sql_condition, " AND ")
+	}
 
-	sql := "SELECT COUNT(ID) FROM z_task_data_" + T_task_id + " WHERE 1=1" + sql_condition
+	sql := "SELECT COUNT(ID) FROM z_task_data_" + T_task_id + sql_condition
 
 	fmt.Println(sql)
 	_, err := o.Raw(sql).ValuesList(&maps_z)
@@ -161,13 +135,16 @@ func Read_TaskData_ById_List(T_task_id string, SN string, T_id int, Time_start_
 	}
 
 	//fmt.Println("maps_z;",maps_z[0][0])
-	sql = "SELECT ID,t_sn,t_id,t_t,t_rh,DATE_FORMAT(t_time,'%Y-%m-%d %H:%i:%s') AS t_times,t_time FROM z_task_data_" + T_task_id + " WHERE 1=1" + sql_condition + " ORDER BY t_time DESC "
+	sql = "SELECT ID,t_sn,t_id,t_t,t_rh,DATE_FORMAT(t_time,'%Y-%m-%d %H:%i:%s') AS t_times,t_time FROM z_task_data_" + T_task_id + sql_condition + " ORDER BY t_time DESC"
 	if page_z != 9999 {
 		sql = sql + " LIMIT " + strconv.Itoa(offset) + "," + strconv.Itoa(pagez)
 	}
 
 	fmt.Println(sql)
 	_, err = o.Raw(sql).QueryRows(&maps)
+	if err != nil {
+		logs.Error(lib.FuncName(), err)
+	}
 
 	//value, _ := strconv.ParseFloat(fmt.Sprintf("%.2f", cnt), 64)
 	key, _ := strconv.Atoi(maps_z[0][0].(string))
@@ -187,7 +164,11 @@ func Read_TaskData_ById_List_(T_task_id string, SN string, T_id int) []TaskData_
 	sql := "SELECT ID,t_sn,t_id,t_t,t_rh,DATE_FORMAT(t_time,'%Y-%c-%d %H:%i') AS t_times,t_time  FROM z_task_data_" + T_task_id + " WHERE " + sql_condition + " ORDER BY t_time "
 
 	fmt.Println(sql)
-	o.Raw(sql).QueryRows(&maps)
+	_, err := o.Raw(sql).QueryRows(&maps)
+	if err != nil {
+		logs.Error(lib.FuncName(), err)
+		return maps
+	}
 
 	return maps
 }
@@ -200,7 +181,12 @@ func Read_TaskData_ById_ClassList(T_task_id string) []TaskDataClass_ {
 	sql := "SELECT DISTINCT t_sn,t_id FROM z_task_data_" + T_task_id + " ORDER BY t_id "
 
 	fmt.Println(sql)
-	o.Raw(sql).QueryRows(&maps)
+	_, err := o.Raw(sql).QueryRows(&maps)
+
+	if err != nil {
+		logs.Error(lib.FuncName(), err)
+		return maps
+	}
 
 	return maps
 }
@@ -231,10 +217,15 @@ func Add_TaskData(T_task_id string, T_sn string, T_id string, T_t string, T_rh s
 	sql := "INSERT INTO z_task_data_" + T_task_id + " (`t_sn`, `t_id`, `t_t`, `t_rh`, `t_time`) " +
 		"VALUES ('" + T_sn + "', " + T_id + ", " + T_t + "," + T_rh + ", '" + T_time + "')"
 
+	// 更新数据
+	//sql := "INSERT  INTO z_task_data_" + T_task_id + " (`t_sn`, `t_id`, `t_t`, `t_rh`, `t_time`) " +
+	//	"VALUES ('" + T_sn + "', " + T_id + ", " + T_t + "," + T_rh + ", '" + T_time + "')" +
+	//	"on duplicate key update `t_t`=" + T_t + ",`t_rh`=" + T_rh
+
 	fmt.Println(sql)
 	res, err := o.Raw(sql).Exec()
 	if err != nil {
-		fmt.Println(err)
+		logs.Error(lib.FuncName(), err)
 		return false
 	}
 	res.RowsAffected()
@@ -268,7 +259,7 @@ func Up_TaskData(T_task_id string, Id string, T_t string, T_rh string, T_time st
 	//fmt.Println(sql)
 	res, err := o.Raw(sql).Exec()
 	if err != nil {
-		fmt.Println(err)
+		logs.Error(lib.FuncName(), err)
 		return false
 	}
 	res.RowsAffected()
@@ -288,7 +279,7 @@ func Del_TaskData(T_task_id string, Id string) bool {
 	logs.Println(sql)
 	res, err := o.Raw(sql).Exec()
 	if err != nil {
-		fmt.Println(err)
+		logs.Error(lib.FuncName(), err)
 		return false
 	}
 	res.RowsAffected()
@@ -308,7 +299,7 @@ func Del_TaskData_t_id(T_task_id string, Id string) bool {
 	logs.Println(sql)
 	res, err := o.Raw(sql).Exec()
 	if err != nil {
-		fmt.Println(err)
+		logs.Error(lib.FuncName(), err)
 		return false
 	}
 	res.RowsAffected()
@@ -340,72 +331,25 @@ func Check_TaskData_Num(T_task_id string) bool {
 	return true
 }
 
-func Read_TaskData_List(alias_name, T_task_id string, page int, page_z int) []*TaskData_ {
-	o := orm2.NewOrmUsingDB(alias_name)
-	var maps []*TaskData_
-	pagez := page_z
-
-	var offset int
-	if page <= 1 {
-		offset = 0
-	} else {
-		page -= 1
-		offset = page * pagez
-	}
-
-	sql := "SELECT ID,t_sn,t_id,t_t,t_rh,DATE_FORMAT(t_time,'%Y-%c-%d %H:%i:%s') AS t_times FROM z_task_data_" + T_task_id + " LIMIT " + strconv.Itoa(offset) + "," + strconv.Itoa(pagez)
+func Read_TaskData_Num(T_task_id string) int {
+	o1 := orm2.NewOrmUsingDB(conf.Local_AliasName)
 
+	var maps_z []orm2.ParamsList
+	sql := "SELECT COUNT(ID) FROM z_task_data_" + T_task_id
 	fmt.Println(sql)
-	_, err := o.Raw(sql).QueryRows(&maps)
+	_, err := o1.Raw(sql).ValuesList(&maps_z)
 	if err != nil {
-		logs.Println("查询失败 z_task_data_" + T_task_id)
+		logs.Error(lib.FuncName(), err)
+		return 0
 	}
-
-	return maps
-}
-
-func Add_TaskData_List(alias_name, T_task_id string, TaskList []*TaskData_) (int, int) {
-	var wg sync.WaitGroup
-	limiter := make(chan bool, 80)
-	succeeds := 0
-	fails := 0
-	o := orm2.NewOrmUsingDB(alias_name)
-
-	for _, val := range TaskList {
-		wg.Add(1)
-		limiter <- true
-
-		go func(TaskData *TaskData_) {
-			defer wg.Done()
-			//if Add_TaskData(T_task_id, TaskData.T_sn, strconv.Itoa(TaskData.T_id), fmt.Sprintf("%.2f", TaskData.T_t),
-			//	fmt.Sprintf("%.2f", TaskData.T_rh), TaskData.T_time) {
-			//	succeeds++
-			//} else {
-			//	fails++
-			//}
-
-			sql := "INSERT INTO z_task_data_" + T_task_id + " (`t_sn`, `t_id`, `t_t`, `t_rh`, `t_time`) " +
-				"VALUES ('" + TaskData.T_sn + "', " + strconv.Itoa(TaskData.T_id) + ", " + fmt.Sprintf("%.2f", TaskData.T_t) + "," +
-				fmt.Sprintf("%.2f", TaskData.T_rh) + ", '" + TaskData.T_time + "')"
-
-			//fmt.Println(sql)
-			_, err := o.Raw(sql).Exec()
-			if err != nil {
-				fails++
-			} else {
-				succeeds++
-			}
-			<-limiter
-		}(val)
-	}
-	wg.Wait()
-	return succeeds, fails
+	key, _ := strconv.Atoi(maps_z[0][0].(string))
+	return key
 }
 
 func Dump_TaskData(T_task_id, root, password, url_port, database, sql_file string) (string, error) {
 	// url_port 127.0.0.1:3306
 	// mysql8.0 以上加 --column-statistics=0
-	// mysqldump --column-statistics=0 -uroot -proot -h127.0.0.1 -P3306 cold_verify Z_TaskData_ixEfo5zk2Oeb> /Data/Z_TaskData_ixEfo5zk2Oeb.sql
+	// mysqldump --column-statistics=0 -uroot -proot -h127.0.0.1 -P3306 cold_verify Z_TaskData_ixEfo5zk2Oeb > /Data/Z_TaskData_ixEfo5zk2Oeb.sql
 	v := Read_Local_Mysql_Version()
 	host_port := strings.Split(url_port, ":")
 	table_name := "z_task_data_" + T_task_id
@@ -414,22 +358,29 @@ func Dump_TaskData(T_task_id, root, password, url_port, database, sql_file strin
 		org += "--column-statistics=0 "
 	}
 	//--no-create-info 只导出数据,而不添加 CREATE TABLE 语句。
-	org = org + fmt.Sprintf("-u%s -p%s -h%s -P%s %s %s> %s --no-create-info",
+	org = org + fmt.Sprintf("-u%s -p%s -h%s -P%s --no-create-info --set-gtid-purged=OFF --quick %s %s > %s ",
 		root, password, host_port[0], host_port[1], database, table_name, sql_file)
 
 	logs.Println(org)
 	_, err := lib.Command(org)
+	if err != nil {
+		logs.Error(lib.FuncName(), err)
+	}
+
 	return org, err
 }
 
 func Insert_TaskData(root, password, url_port, database, sql_file string) (string, error) {
 	// url_port 127.0.0.1:3306
-	// mysql -uroot -proot -h127.0.0.1 -P3306 cold_verify_local < /data/Z_TaskData_ixEfo5zk2Oeb.sql
+	// mysql -u root -p root -h127.0.0.1 -P3306 cold_verify_local < /data/Z_TaskData_ixEfo5zk2Oeb.sql
 	host_port := strings.Split(url_port, ":")
 	org := fmt.Sprintf("mysql -u%s -p%s -h%s -P%s %s < %s",
 		root, password, host_port[0], host_port[1], database, sql_file)
 	logs.Println(org)
 	_, err := lib.Command(org)
+	if err != nil {
+		logs.Error(lib.FuncName(), err)
+	}
 	return org, err
 }
 
@@ -440,3 +391,36 @@ func Read_Local_Mysql_Version() int {
 	version, _ := strconv.Atoi(params[0]["version()"].(string)[0:1])
 	return version
 }
+
+func Import_TaskData(T_task_id string, offset, pagez int) bool {
+	var maps []TaskData_
+	serverOrm := orm2.NewOrmUsingDB(conf.Server_AliasName)
+	localOrm := orm2.NewOrmUsingDB(conf.Local_AliasName)
+	tb_name := "z_task_data_" + T_task_id
+
+	// 开从先上版查询数据
+	sql := "select `ID`, `t_sn`, `t_id`, `t_t`, `t_rh`, DATE_FORMAT(t_time,'%Y-%m-%d %H:%i:%s') AS t_times from " + tb_name + " LIMIT " + strconv.Itoa(offset) + "," + strconv.Itoa(pagez)
+	logs.Println(sql)
+	_, err := serverOrm.Raw(sql).QueryRows(&maps)
+	if err != nil {
+		logs.Error(lib.FuncName(), err)
+		return false
+	}
+
+	values := []string{}
+	for _, row := range maps {
+		// 处理数据
+		values = append(values, fmt.Sprintf("('%s',%d,%v,%v,'%s')", row.T_sn, row.T_id, row.T_t, row.T_rh, row.T_time))
+	}
+	// 向本地版插入数据
+	sql = "INSERT IGNORE INTO " + tb_name + "( `t_sn`, `t_id`, `t_t`, `t_rh`, `t_time`) VALUES " + strings.Join(values, ",")
+
+	//logs.Println(sql)
+	_, err = localOrm.Raw(sql).Exec()
+	if err != nil {
+		logs.Error(lib.FuncName(), err)
+		return false
+	}
+
+	return true
+}

+ 2 - 1
routers/TaskData.go

@@ -22,5 +22,6 @@ func init() {
 	beego.Router("/TaskData/Check", &controllers.TaskDataController{}, "*:Check") // 设置 设备参数
 
 	beego.Router("/TaskData/Import_TaskData", &controllers.TaskDataController{}, "*:TaskData_Import_TaskData") // 打包本地数据
-	beego.Router("/TaskData/Up_TaskData", &controllers.TaskDataController{}, "*:TaskData_Up_TaskData")         // 更新线上数据
+	beego.Router("/TaskData/Up_TaskData", &controllers.TaskDataController{}, "*:TaskData_Up_TaskData_Back")    // 更新线上数据
+	//beego.Router("/TaskData/Up_TaskData_Back", &controllers.TaskDataController{}, "*:TaskData_Up_TaskData_Back") // 更新线上数据
 }