|
- package Nats
- import (
- "ColdVerify_server/Nats/NatsServer"
- "ColdVerify_server/conf"
- "ColdVerify_server/lib"
- "ColdVerify_server/logs"
- "ColdVerify_server/models/Account"
- "ColdVerify_server/models/Certificate"
- "ColdVerify_server/models/Device"
- "ColdVerify_server/models/System"
- "ColdVerify_server/models/Task"
- "fmt"
- "github.com/nats-io/nats.go"
- "github.com/vmihailenco/msgpack/v5"
- "strings"
- "sync"
- )
- func init() {
- logs.Println("============Nats init============")
- var err error
- // 连接Nats服务器
- lib.Nats, err = nats.Connect("nats://" + conf.NatsServer_Url)
- if err != nil {
- logs.Error("nats 连接失败!")
- panic(err)
- }
- logs.Println("nats OK!")
- // 本地测试,屏蔽本地nats
- if !conf.NatsForbidden {
- go NatsInit()
- }
- }
- type Extract_TaskData_Back struct {
- T_uuid string `xml:"T_uuid"` // 任务主键id
- Time_start string `xml:"Time_start"`
- Time_end string `xml:"Time_end"`
- DeviceClassList []Device.DeviceClassList `xml:"DeviceClassList"` // 泛型
- Task Task.Task `xml:"Task"` // 泛型
- }
- func NatsInit() {
- // 发布-订阅 模式,打包数据
- _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Extract_TaskData_Back", "Extract_TaskData", func(m *nats.Msg) {
- logs.Debug("Extract_TaskData_Back message: \n", string(m.Data))
- var resp Extract_TaskData_Back
- err := msgpack.Unmarshal(m.Data, &resp)
- if err != nil {
- System.Add_Logs("Nats", "msgpack Unmarshal err", string(m.Data))
- return
- }
- Task_r := resp.Task
- // 清空表
- Task.Truncate_TaskData(Task_r.T_task_id)
- //失败重试5次
- DeviceClassList := new(sync.Map)
- var count int
- for _, v := range resp.DeviceClassList {
- if strings.Contains(v.T_sn, "-") || len(v.T_sn) == 0 {
- // 从3.0平台导入
- continue
- }
- DeviceClassList.Store(fmt.Sprintf("%s|%s", v.T_sn, v.T_id), 5)
- //err = Task.Import_TaskData_Back(v.T_sn, v.T_id, resp.Task.T_task_id, resp.Time_start, resp.Time_end)
- //count++
- //time.Sleep(5 * time.Second)
- }
- DeviceClassList.Range(func(k, v interface{}) bool {
- count++
- return true
- })
- for count > 0 {
- DeviceClassList.Range(func(k, v any) bool {
- T_snid := strings.Split(k.(string), "|")
- T_sn := T_snid[0]
- T_id := T_snid[1]
- temp := v.(int)
- temp--
- DeviceClassList.Store(k, temp)
- err = Task.Import_TaskData_Back(T_sn, T_id, resp.Task.T_task_id, resp.Time_start, resp.Time_end)
- if err == nil || strings.Contains(err.Error(), "doesn't exist") {
- DeviceClassList.Delete(k)
- count--
- } else {
- logs.Error("设备数据同步到任务数据失败", err)
- DeviceClassList.Delete(k)
- count--
- }
- return true
- })
- }
- TaskData_Num := Task.Read_TaskData_Count(Task_r.T_task_id)
- //if TaskData_Num == 0 {
- // Task_r.T_collection_num = 0
- // if !Task.Update_Task(Task_r, "T_collection_state") {
- // logs.Error(lib.FuncName(), "后台执行修改任务数据失败")
- // }
- // return
- //}
- // 导入到本地数据
- NatsServer.Import_TaskData(resp.T_uuid, Task_r.T_task_id, TaskData_Num)
- System.Add_UserLogs_T(resp.T_uuid, "任务", "修改", Task_r)
- System.Add_UserLogs(resp.T_uuid, "提取数据", "提取数据"+Task_r.T_name, Task_r.T_task_id+"|"+resp.Time_start+"|"+resp.Time_end)
- })
- _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Update_Task", "Update_Task", func(m *nats.Msg) {
- var t_Req Task.Task
- var t_R lib.JSONS
- err := msgpack.Unmarshal(m.Data, &t_Req)
- if err != nil {
- logs.Error("Mats", lib.FuncName(), err)
- t_R.Code = 202
- t_R.Msg = err.Error()
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- logs.Debug(fmt.Sprintf("ColdVerify_Server_Update_Task message: %+v\n", t_Req))
- col := []string{}
- if t_Req.T_delivery_state > 0 {
- col = append(col, "T_delivery_state")
- }
- if t_Req.T_collection_state > 0 {
- col = append(col, "T_collection_state")
- }
- if !Task.Update_Task(t_Req, col...) {
- logs.Error("Mats", lib.FuncName(), err)
- t_R.Code = 202
- t_R.Msg = err.Error()
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- t_R.Code = 200
- t_R.Msg = "ok"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- Task.Add_TaskLogs_T("nats", t_Req.T_task_id, "任务管理", "修改", t_Req)
- System.Add_UserLogs_T("nats", "任务管理", "修改", t_Req)
- })
- _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Read_Task", "Read_Task", func(m *nats.Msg) {
- logs.Println("ColdVerify_Server_Read_Task message: %+v\n", string(m.Data))
- var t_R lib.JSONS
- task, is := Task.Read_Task(string(m.Data))
- if !is {
- logs.Error("Mats", lib.FuncName())
- t_R.Code = 202
- t_R.Msg = "查询失败"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- t_R.Code = 200
- t_R.Msg = "ok"
- t_R.Data = task
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- })
- _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Device_Class_List", "Device_Class_List", func(m *nats.Msg) {
- logs.Println("ColdVerify_Server_Device_Class_List message: %+v\n", string(m.Data))
- var t_R lib.JSONS
- task, is := Task.Read_Task(string(m.Data))
- if !is {
- logs.Error("Mats", lib.FuncName())
- t_R.Code = 202
- t_R.Msg = "查询失败"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- List, _ := Device.Read_DeviceClassList_OrderList(task.T_class, "", "", "", 0, 9999)
- t_R.Code = 200
- t_R.Msg = "ok"
- t_R.Data = List
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- })
- _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Read_User", "Read_User", func(m *nats.Msg) {
- logs.Println("ColdVerify_Server_Read_User message: %+v\n", string(m.Data))
- var t_R lib.JSONS
- err, user := Account.Read_User_ByT_uuid(string(m.Data))
- if err != nil {
- logs.Error("Mats", lib.FuncName())
- t_R.Code = 202
- t_R.Msg = "查询失败"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- t_R.Code = 200
- t_R.Msg = "ok"
- t_R.Data = user
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- })
- _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Add_DeviceClassList", "Add_DeviceClassList", func(m *nats.Msg) {
- type T_Req struct {
- T_task_id string `xml:"T_task_id"` // 任务主键id
- T_sn string `xml:"T_sn"`
- T_id string `xml:"T_id"`
- }
- var t_Req T_Req
- var t_R lib.JSONS
- err := msgpack.Unmarshal(m.Data, &t_Req)
- if err != nil {
- logs.Error("Mats", lib.FuncName(), err)
- t_R.Code = 202
- t_R.Msg = err.Error()
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- logs.Debug(fmt.Sprintf("ColdVerify_Server_Add_DeviceClassList message: %+v\n", t_Req))
- Task_r, is := Task.Read_Task(t_Req.T_task_id)
- if !is {
- logs.Error("Mats", lib.FuncName())
- t_R.Code = 202
- t_R.Msg = "T_task_id 错误!"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- // 判断是否已存在sn
- dc, is := Device.Read_DeviceClassList_T_class_T_sn(Task_r.T_class, t_Req.T_sn)
- // 添加的id和数据库已存在id相同
- if is && dc.T_id == t_Req.T_id {
- t_R.Code = 200
- t_R.Msg = "ok"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- var pdf Certificate.CertificatePdf
- //pdfList, _ := Certificate.Read_CertificatePdf_Newest(T_sn)
- pdfList, _ := Certificate.Read_CertificatePdf_T_layout_no(t_Req.T_id, "")
- if len(pdfList) > 0 {
- pdf = pdfList[0]
- }
- // 相同sn 添加的id和数据库已存在id不同
- if is && dc.T_id != t_Req.T_id {
- dc2, is := Device.Read_DeviceClassList_T_class_T_id(Task_r.T_class, t_Req.T_id)
- if is {
- logs.Error("Mats", lib.FuncName())
- t_R.Code = 202
- t_R.Msg = fmt.Sprintf("编号[%s]已被[%s]关联,请重试", t_Req.T_id, dc2.T_sn)
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- dc.T_id = t_Req.T_id
- dc.T_failure_time = pdf.T_failure_time
- dc.T_pdf = pdf.T_pdf
- dc.T_Certificate_sn = pdf.T_Certificate_sn
- if !Device.Update_DeviceClassList(dc, "T_id", "T_failure_time", "T_pdf", "T_Certificate_sn") {
- logs.Error("Mats", lib.FuncName())
- t_R.Code = 202
- t_R.Msg = "修改编号失败!"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- } else {
- t_R.Code = 200
- t_R.Msg = "ok"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- Task.Add_TaskLogs_T("nats", t_Req.T_task_id, "任务管理", "修改", t_Req)
- System.Add_UserLogs_T("nats", "任务管理", "修改", t_Req)
- return
- }
- }
- var_ := Device.DeviceClassList{
- T_class: Task_r.T_class,
- T_id: t_Req.T_id,
- T_sn: t_Req.T_sn,
- T_failure_time: pdf.T_failure_time,
- T_pdf: pdf.T_pdf,
- T_Certificate_sn: pdf.T_Certificate_sn,
- T_remark: "",
- T_State: 1,
- }
- _, is = Device.Add_DeviceClassList(var_)
- if !is {
- logs.Error("Mats", lib.FuncName(), err)
- t_R.Code = 202
- t_R.Msg = err.Error()
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- t_R.Code = 200
- t_R.Msg = "ok"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- Task.Add_TaskLogs_T("nats", t_Req.T_task_id, "任务管理", "修改", t_Req)
- System.Add_UserLogs_T("nats", "任务管理", "修改", t_Req)
- })
- _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Edit_DeviceClassList", "Edit_DeviceClassList", func(m *nats.Msg) {
- type T_Req struct {
- T_task_id string `xml:"T_task_id"` // 任务主键id
- T_sn string `xml:"T_sn"`
- T_id string `xml:"T_id"`
- }
- var t_Req T_Req
- var t_R lib.JSONS
- err := msgpack.Unmarshal(m.Data, &t_Req)
- if err != nil {
- logs.Error("Mats", lib.FuncName(), err)
- t_R.Code = 202
- t_R.Msg = err.Error()
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- logs.Debug(fmt.Sprintf("ColdVerify_Server_Edit_DeviceClassList message: %+v\n", t_Req))
- Task_r, is := Task.Read_Task(t_Req.T_task_id)
- if !is {
- logs.Error("Mats", lib.FuncName())
- t_R.Code = 202
- t_R.Msg = "T_task_id 错误!"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- // 判断是否已存在sn
- dc, is := Device.Read_DeviceClassList_T_class_T_sn(Task_r.T_class, t_Req.T_sn)
- // 添加的id和数据库已存在id相同
- if is && dc.T_id == t_Req.T_id {
- t_R.Code = 200
- t_R.Msg = "ok"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- // 相同sn 添加的id和数据库已存在id不同
- if is && dc.T_id != t_Req.T_id {
- dc2, is := Device.Read_DeviceClassList_T_class_T_id(Task_r.T_class, t_Req.T_id)
- if is {
- logs.Error("Mats", lib.FuncName())
- t_R.Code = 202
- t_R.Msg = fmt.Sprintf("编号[%s]已被[%s]关联,请重试", t_Req.T_id, dc2.T_sn)
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- var pdf Certificate.CertificatePdf
- pdfList, _ := Certificate.Read_CertificatePdf_T_layout_no(t_Req.T_id, "")
- if len(pdfList) > 0 {
- pdf = pdfList[0]
- }
- dc.T_id = t_Req.T_id
- dc.T_failure_time = pdf.T_failure_time
- dc.T_pdf = pdf.T_pdf
- dc.T_Certificate_sn = pdf.T_Certificate_sn
- if !Device.Update_DeviceClassList(dc, "T_id", "T_failure_time", "T_pdf", "T_Certificate_sn") {
- logs.Error("Mats", lib.FuncName())
- t_R.Code = 202
- t_R.Msg = "修改编号失败!"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- }
- t_R.Code = 200
- t_R.Msg = "ok"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- Task.Add_TaskLogs_T("nats", t_Req.T_task_id, "任务管理", "修改设备列表", t_Req)
- System.Add_UserLogs_T("nats", "任务管理", "修改", t_Req)
- })
- _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Del_TaskData_ByT_BindDeviceDataTime", "Del_TaskData_ByT_BindDeviceDataTime", func(m *nats.Msg) {
- var t_R lib.JSONS
- task, is := Task.Read_Task(string(m.Data))
- if !is {
- logs.Error("Mats", lib.FuncName())
- t_R.Code = 202
- t_R.Msg = "查询失败"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- dcList, _ := Device.Read_DeviceClassList_OrderList(task.T_class, "", "", "", 0, 9999)
- for _, v := range dcList {
- if !lib.IsNumeric(v.T_id) {
- // 删除任务表指定时间数据
- Task.Del_TaskData_t_idByT_BindDeviceDataTime(task.T_task_id, v.T_id, task.T_BindDeviceDataStartTime, task.T_BindDeviceDataEndTime)
- } else {
- Task.Del_TaskData_t_idByT_BindDeviceDataTime(task.T_task_id, v.T_id, task.T_VerifyDeviceDataStartTime, task.T_VerifyDeviceDataEndTime)
- }
- }
- })
- _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Update_Task_BySN", "Update_Task_BySN", func(m *nats.Msg) {
- type T_Req struct {
- T_sn string `xml:"T_sn"`
- T_CalibrationTime string `xml:"T_CalibrationTime"`
- }
- var t_Req T_Req
- var t_R lib.JSONS
- err := msgpack.Unmarshal(m.Data, &t_Req)
- if err != nil {
- logs.Error("Mats", lib.FuncName(), err)
- t_R.Code = 202
- t_R.Msg = err.Error()
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- logs.Debug(fmt.Sprintf("ColdVerify_Server_Update_Task_BySN message: %+v\n", t_Req))
- task, err := Task.Read_Task_BySN(t_Req.T_sn)
- if err != nil && err.Error() != "record not found" {
- logs.Error("Mats", lib.FuncName())
- t_R.Code = 202
- t_R.Msg = "查询失败"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- col := []string{}
- if len(t_Req.T_CalibrationTime) > 0 {
- task.T_CalibrationExpirationTime = t_Req.T_CalibrationTime[0:10]
- col = append(col, "T_CalibrationExpirationTime")
- }
- if !Task.Update_Task(task, col...) {
- logs.Error("Mats", lib.FuncName(), err)
- t_R.Code = 202
- t_R.Msg = err.Error()
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- t_R.Code = 200
- t_R.Msg = "ok"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- Task.Add_TaskLogs_T("nats", task.T_task_id, "任务管理", "修改校准时间", t_Req)
- System.Add_UserLogs_T("nats", "任务管理", "修改", t_Req)
- })
- }
|