123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- package Nats
- import (
- "ColdVerify_server/Nats/NatsServer"
- "ColdVerify_server/conf"
- "ColdVerify_server/lib"
- "ColdVerify_server/logs"
- "ColdVerify_server/models/Device"
- "ColdVerify_server/models/System"
- "ColdVerify_server/models/Task"
- "fmt"
- "github.com/nats-io/nats.go"
- "github.com/vmihailenco/msgpack/v5"
- "strings"
- "sync"
- )
- func init() {
- logs.Println("============Nats init============")
- var err error
- // 连接Nats服务器
- lib.Nats, err = nats.Connect("nats://" + conf.NatsServer_Url)
- if err != nil {
- fmt.Println("nats 连接失败!")
- panic(err)
- }
- logs.Println("nats OK!")
- // 本地测试,屏蔽本地nats
- if !conf.NatsForbidden {
- go NatsInit()
- }
- }
- type Extract_TaskData_Back struct {
- T_uuid string `xml:"T_uuid"` // 任务主键id
- Time_start string `xml:"Time_start"`
- Time_end string `xml:"Time_end"`
- DeviceClassList []Device.DeviceClassList `xml:"DeviceClassList"` // 泛型
- Task Task.Task `xml:"Task"` // 泛型
- }
- func NatsInit() {
- // 发布-订阅 模式,打包数据
- _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Extract_TaskData_Back", "Extract_TaskData", func(m *nats.Msg) {
- logs.Debug("Extract_TaskData_Back message: \n", string(m.Data))
- var resp Extract_TaskData_Back
- err := msgpack.Unmarshal(m.Data, &resp)
- if err != nil {
- System.Add_Logs("Nats", "msgpack Unmarshal err", string(m.Data))
- return
- }
- Task_r := resp.Task
- // 清空表
- Task.Truncate_TaskData(Task_r.T_task_id)
- //失败重试5次
- DeviceClassList := new(sync.Map)
- var count int
- for _, v := range resp.DeviceClassList {
- DeviceClassList.Store(fmt.Sprintf("%s|%s", v.T_sn, v.T_id), 5)
- //err = Task.Import_TaskData_Back(v.T_sn, v.T_id, resp.Task.T_task_id, resp.Time_start, resp.Time_end)
- count++
- //time.Sleep(5 * time.Second)
- }
- for count > 0 {
- DeviceClassList.Range(func(k, v any) bool {
- T_snid := strings.Split(k.(string), "|")
- T_sn := T_snid[0]
- T_id := T_snid[1]
- temp := v.(int)
- temp--
- DeviceClassList.Store(k, temp)
- err = Task.Import_TaskData_Back(T_sn, T_id, resp.Task.T_task_id, resp.Time_start, resp.Time_end)
- if err == nil || strings.Contains(err.Error(), "doesn't exist") {
- DeviceClassList.Delete(k)
- count--
- }
- return true
- })
- }
- TaskData_Num := Task.Read_TaskData_Count(Task_r.T_task_id)
- //if TaskData_Num == 0 {
- // Task_r.T_collection_num = 0
- // if !Task.Update_Task(Task_r, "T_collection_state") {
- // logs.Error(lib.FuncName(), "后台执行修改任务数据失败")
- // }
- // return
- //}
- // 导入到本地数据
- NatsServer.Import_TaskData(resp.T_uuid, Task_r.T_task_id, TaskData_Num)
- System.Add_UserLogs_T(resp.T_uuid, "任务", "修改", Task_r)
- System.Add_UserLogs(resp.T_uuid, "提取数据", "提取数据"+Task_r.T_name, Task_r.T_task_id+"|"+resp.Time_start+"|"+resp.Time_end)
- })
- _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Update_Task", "Update_Task", func(m *nats.Msg) {
- var t_Req Task.Task
- var t_R lib.JSONS
- err := msgpack.Unmarshal(m.Data, &t_Req)
- if err != nil {
- logs.Error("Mats", lib.FuncName(), err)
- t_R.Code = 202
- t_R.Msg = err.Error()
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- fmt.Printf("ColdVerify_Server_Update_Task message: %+v\n", t_Req)
- col := []string{}
- if t_Req.T_delivery_state > 0 {
- col = append(col, "T_delivery_state")
- }
- if !Task.Update_Task(t_Req, col...) {
- logs.Error("Mats", lib.FuncName(), err)
- t_R.Code = 202
- t_R.Msg = err.Error()
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- Task.Add_TaskLogs_T("nats", t_Req.T_task_id, "任务管理", "修改", t_R)
- System.Add_UserLogs_T("nats", "任务管理", "修改", t_R)
- t_R.Code = 200
- t_R.Msg = "ok"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- })
- _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Read_Task", "Read_Task", func(m *nats.Msg) {
- logs.Println("ColdVerify_Server_Read_Task message: %+v\n", string(m.Data))
- var t_R lib.JSONS
- task, is := Task.Read_Task(string(m.Data))
- if !is {
- logs.Error("Mats", lib.FuncName())
- t_R.Code = 202
- t_R.Msg = "查询失败"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- t_R.Code = 200
- t_R.Msg = "ok"
- t_R.Data = task
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- })
- }
|