|
@@ -0,0 +1,152 @@
|
|
|
+package Nats
|
|
|
+
|
|
|
+import (
|
|
|
+ "ColdVerify_server/Nats/NatsServer"
|
|
|
+ "ColdVerify_server/conf"
|
|
|
+ "ColdVerify_server/lib"
|
|
|
+ "ColdVerify_server/logs"
|
|
|
+ "ColdVerify_server/models/Device"
|
|
|
+ "ColdVerify_server/models/System"
|
|
|
+ "ColdVerify_server/models/Task"
|
|
|
+ "fmt"
|
|
|
+ "github.com/nats-io/nats.go"
|
|
|
+ "github.com/vmihailenco/msgpack/v5"
|
|
|
+ "sync"
|
|
|
+)
|
|
|
+
|
|
|
+func init() {
|
|
|
+
|
|
|
+ logs.Println("============Nats init============")
|
|
|
+ var err error
|
|
|
+ // 连接Nats服务器
|
|
|
+ lib.Nats, err = nats.Connect("nats://" + conf.NatsServer_Url)
|
|
|
+ if err != nil {
|
|
|
+ fmt.Println("nats 连接失败!")
|
|
|
+ panic(err)
|
|
|
+ }
|
|
|
+ logs.Println("nats OK!")
|
|
|
+
|
|
|
+ go NatsInit()
|
|
|
+}
|
|
|
+
|
|
|
+type Extract_TaskData_Back struct {
|
|
|
+ T_uuid string `xml:"T_uuid"` // 任务主键id
|
|
|
+ Time_start string `xml:"Time_start"`
|
|
|
+ Time_end string `xml:"Time_end"`
|
|
|
+ DeviceClassList []Device.DeviceClassList `xml:"DeviceClassList"` // 泛型
|
|
|
+ Task Task.Task `xml:"Task"` // 泛型
|
|
|
+}
|
|
|
+
|
|
|
+func NatsInit() {
|
|
|
+
|
|
|
+ // 发布-订阅 模式,打包数据
|
|
|
+ _, _ = lib.Nats.Subscribe("ColdVerify_Server_Extract_TaskData_Back", func(m *nats.Msg) {
|
|
|
+ logs.Debug("Extract_TaskData_Back message: %v\n", string(m.Data))
|
|
|
+
|
|
|
+ var resp Extract_TaskData_Back
|
|
|
+
|
|
|
+ err := msgpack.Unmarshal(m.Data, &resp)
|
|
|
+ if err != nil {
|
|
|
+ System.Add_Logs("Nats", "msgpack Unmarshal err", string(m.Data))
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ Task_r := resp.Task
|
|
|
+ // 清空表
|
|
|
+ Task.Truncate_TaskData(Task_r.T_task_id)
|
|
|
+
|
|
|
+ DeviceClassList := new(sync.Map)
|
|
|
+ for _, v := range resp.DeviceClassList {
|
|
|
+ DeviceClassList.Store(v.Id, v)
|
|
|
+ }
|
|
|
+
|
|
|
+ DeviceClassList.Range(func(k, v any) bool {
|
|
|
+ c := v.(Device.DeviceClassList)
|
|
|
+ logs.Println("DeviceClass ---- ", c.T_sn, resp.Task.T_task_id, resp.Time_start, resp.Time_end)
|
|
|
+ err = Task.Import_TaskData_Back(c.T_sn, c.T_id, resp.Task.T_task_id, resp.Time_start, resp.Time_end)
|
|
|
+ if err == nil {
|
|
|
+ DeviceClassList.Delete(k)
|
|
|
+ }
|
|
|
+ return true
|
|
|
+ })
|
|
|
+
|
|
|
+ TaskData_Num := Task.Read_TaskData_Count(Task_r.T_task_id)
|
|
|
+
|
|
|
+ if TaskData_Num == 0 {
|
|
|
+ Task_r.T_collection_state = 0
|
|
|
+ if !Task.Update_Task(Task_r, "T_collection_state") {
|
|
|
+ logs.Error(lib.FuncName(), "后台执行修改任务数据失败")
|
|
|
+ }
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ // 导入到本地数据
|
|
|
+ NatsServer.Import_TaskData(resp.T_uuid, Task_r.T_task_id, TaskData_Num)
|
|
|
+
|
|
|
+ System.Add_UserLogs_T(resp.T_uuid, "任务", "修改", Task_r)
|
|
|
+
|
|
|
+ System.Add_UserLogs(resp.T_uuid, "提取数据", "提取数据"+Task_r.T_name, Task_r.T_task_id+"|"+resp.Time_start+"|"+resp.Time_end)
|
|
|
+
|
|
|
+ })
|
|
|
+
|
|
|
+ _, _ = lib.Nats.Subscribe("ColdVerify_Server_Update_Task", func(m *nats.Msg) {
|
|
|
+
|
|
|
+ var t_Req Task.Task
|
|
|
+ var t_R lib.JSONS
|
|
|
+ err := msgpack.Unmarshal(m.Data, &t_Req)
|
|
|
+ if err != nil {
|
|
|
+ logs.Error("Mats", lib.FuncName(), err)
|
|
|
+ t_R.Code = 202
|
|
|
+ t_R.Msg = err.Error()
|
|
|
+ b, _ := msgpack.Marshal(&t_R)
|
|
|
+ _ = lib.Nats.Publish(m.Reply, b)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ fmt.Printf("ColdVerify_Server_Update_Task message: %+v\n", t_Req)
|
|
|
+
|
|
|
+ col := []string{}
|
|
|
+ if t_Req.T_delivery_state > 0 {
|
|
|
+ col = append(col, "T_delivery_state")
|
|
|
+ }
|
|
|
+
|
|
|
+ if !Task.Update_Task(t_Req, "T_collection_state", "T_delivery_state") {
|
|
|
+ logs.Error("Mats", lib.FuncName(), err)
|
|
|
+ t_R.Code = 202
|
|
|
+ t_R.Msg = err.Error()
|
|
|
+ b, _ := msgpack.Marshal(&t_R)
|
|
|
+ _ = lib.Nats.Publish(m.Reply, b)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ t_R.Code = 200
|
|
|
+ t_R.Msg = "ok"
|
|
|
+
|
|
|
+ b, _ := msgpack.Marshal(&t_R)
|
|
|
+ _ = lib.Nats.Publish(m.Reply, b)
|
|
|
+
|
|
|
+ })
|
|
|
+
|
|
|
+ _, _ = lib.Nats.Subscribe("ColdVerify_Server_Read_Task", func(m *nats.Msg) {
|
|
|
+ fmt.Printf("ColdVerify_Server_Read_Task message: %+v\n", string(m.Data))
|
|
|
+
|
|
|
+ var t_R lib.JSONS
|
|
|
+ task, is := Task.Read_Task(string(m.Data))
|
|
|
+ if !is {
|
|
|
+ logs.Error("Mats", lib.FuncName())
|
|
|
+ t_R.Code = 202
|
|
|
+ t_R.Msg = "查询失败"
|
|
|
+ b, _ := msgpack.Marshal(&t_R)
|
|
|
+ _ = lib.Nats.Publish(m.Reply, b)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ t_R.Code = 200
|
|
|
+ t_R.Msg = "ok"
|
|
|
+ t_R.Data = task
|
|
|
+
|
|
|
+ b, _ := msgpack.Marshal(&t_R)
|
|
|
+ _ = lib.Nats.Publish(m.Reply, b)
|
|
|
+
|
|
|
+ })
|
|
|
+
|
|
|
+}
|