// Package Nats connects to the NATS server when the package is loaded and
// registers the queue subscriptions this service answers.
package Nats

import (
	"ColdVerify_server/Nats/NatsServer"
	"ColdVerify_server/conf"
	"ColdVerify_server/lib"
	"ColdVerify_server/logs"
	"ColdVerify_server/models/Account"
	"ColdVerify_server/models/Certificate"
	"ColdVerify_server/models/Device"
	"ColdVerify_server/models/System"
	"ColdVerify_server/models/Task"
	"fmt"
	"github.com/nats-io/nats.go"
	"github.com/vmihailenco/msgpack/v5"
	"strings"
	"sync"
)

// init connects lib.Nats to the server named by conf.NatsServer_Url and,
// unless conf.NatsForbidden is set, starts NatsInit in its own goroutine.
// It panics when the connection cannot be established.
func init() {
	logs.Println("============Nats init============")
	var err error
	// Connect to the NATS server.
	lib.Nats, err = nats.Connect("nats://" + conf.NatsServer_Url)
	if err != nil {
		fmt.Println("nats 连接失败!")
		panic(err)
	}
	logs.Println("nats OK!")
	// Local testing: the subscriptions can be switched off through the config.
	if !conf.NatsForbidden {
		go NatsInit()
	}
}

// Extract_TaskData_Back is the msgpack payload decoded from a
// ColdVerify_Server_Extract_TaskData_Back message.
type Extract_TaskData_Back struct {
	T_uuid          string                   `xml:"T_uuid"` // task primary-key id (per the original comment); passed to the user-log calls below
	Time_start      string                   `xml:"Time_start"`
	Time_end        string                   `xml:"Time_end"`
	DeviceClassList []Device.DeviceClassList `xml:"DeviceClassList"` // generic
	Task            Task.Task                `xml:"Task"`            // generic
}

// NatsInit registers every queue subscription of this service on lib.Nats.
//
// Each handler decodes its request, calls into the model packages and, for
// the request/reply subjects, publishes a msgpack-encoded lib.JSONS to
// m.Reply (Code 200 on success, 202 on failure). A subscription that cannot
// be registered is logged; the remaining ones are still attempted.
func NatsInit() {
	// reply encodes r and publishes it to the reply subject of m. Failures are
	// logged because a handler has no other channel to report them.
	reply := func(m *nats.Msg, r lib.JSONS) {
		b, err := msgpack.Marshal(&r)
		if err != nil {
			logs.Error("Nats", "msgpack Marshal err", err)
		}
		if err := lib.Nats.Publish(m.Reply, b); err != nil {
			logs.Error("Nats", "Publish reply err", err)
		}
	}

	// subscribe registers h on subject within the given queue group and logs
	// a registration failure instead of silently dropping it.
	subscribe := func(subject, queue string, h nats.MsgHandler) {
		if _, err := lib.Nats.QueueSubscribe(subject, queue, h); err != nil {
			logs.Error("Nats", "QueueSubscribe "+subject, err)
		}
	}

	// Publish/subscribe mode: rebuild the task data from device data.
	subscribe("ColdVerify_Server_Extract_TaskData_Back", "Extract_TaskData", func(m *nats.Msg) {
		logs.Debug("Extract_TaskData_Back message: \n", string(m.Data))
		var resp Extract_TaskData_Back
		if err := msgpack.Unmarshal(m.Data, &resp); err != nil {
			System.Add_Logs("Nats", "msgpack Unmarshal err", string(m.Data))
			return
		}
		Task_r := resp.Task
		// Empty the task's data table before importing again.
		Task.Truncate_TaskData(Task_r.T_task_id)

		// Collect the distinct "sn|id" pairs to import. The stored value is
		// unused: a pair is removed after its single attempt whether or not
		// the import succeeded, so no retry takes place.
		DeviceClassList := new(sync.Map)
		for _, v := range resp.DeviceClassList {
			if strings.Contains(v.T_sn, "-") || len(v.T_sn) == 0 {
				// Imported from the 3.0 platform (or no serial number): skip.
				continue
			}
			DeviceClassList.Store(fmt.Sprintf("%s|%s", v.T_sn, v.T_id), 5)
		}

		var count int
		DeviceClassList.Range(func(k, v any) bool {
			count++
			return true
		})

		for count > 0 {
			DeviceClassList.Range(func(k, v any) bool {
				// Cut at the first separator so that an id which itself
				// contains "|" is passed on unchanged.
				T_sn, T_id, _ := strings.Cut(k.(string), "|")
				err := Task.Import_TaskData_Back(T_sn, T_id, resp.Task.T_task_id, resp.Time_start, resp.Time_end)
				// An error mentioning "doesn't exist" is treated like success
				// and not reported.
				if err != nil && !strings.Contains(err.Error(), "doesn't exist") {
					logs.Error("设备数据同步到任务数据失败", err)
				}
				DeviceClassList.Delete(k)
				count--
				return true
			})
		}

		TaskData_Num := Task.Read_TaskData_Count(Task_r.T_task_id)
		// NOTE: an earlier version returned here when TaskData_Num was 0
		// after resetting the task's collection state; that branch is disabled.

		// Import into the local data.
		NatsServer.Import_TaskData(resp.T_uuid, Task_r.T_task_id, TaskData_Num)
		System.Add_UserLogs_T(resp.T_uuid, "任务", "修改", Task_r)
		System.Add_UserLogs(resp.T_uuid, "提取数据", "提取数据"+Task_r.T_name, Task_r.T_task_id+"|"+resp.Time_start+"|"+resp.Time_end)
	})

	// Update a task; only T_delivery_state is written, and only when it is > 0.
	subscribe("ColdVerify_Server_Update_Task", "Update_Task", func(m *nats.Msg) {
		var t_Req Task.Task
		if err := msgpack.Unmarshal(m.Data, &t_Req); err != nil {
			logs.Error("Mats", lib.FuncName(), err)
			reply(m, lib.JSONS{Code: 202, Msg: err.Error()})
			return
		}
		fmt.Printf("ColdVerify_Server_Update_Task message: %+v\n", t_Req)

		col := []string{}
		if t_Req.T_delivery_state > 0 {
			col = append(col, "T_delivery_state")
		}
		if !Task.Update_Task(t_Req, col...) {
			// Update_Task reports failure as a bool only, so there is no
			// error value to forward; reply with a fixed message.
			logs.Error("Mats", lib.FuncName())
			reply(m, lib.JSONS{Code: 202, Msg: "修改失败!"})
			return
		}

		reply(m, lib.JSONS{Code: 200, Msg: "ok"})
		Task.Add_TaskLogs_T("nats", t_Req.T_task_id, "任务管理", "修改", t_Req)
		System.Add_UserLogs_T("nats", "任务管理", "修改", t_Req)
	})

	// Read one task; the request body is passed to Task.Read_Task as-is.
	subscribe("ColdVerify_Server_Read_Task", "Read_Task", func(m *nats.Msg) {
		logs.Println("ColdVerify_Server_Read_Task message: %+v\n", string(m.Data))
		task, is := Task.Read_Task(string(m.Data))
		if !is {
			logs.Error("Mats", lib.FuncName())
			reply(m, lib.JSONS{Code: 202, Msg: "查询失败"})
			return
		}
		reply(m, lib.JSONS{Code: 200, Msg: "ok", Data: task})
	})

	// List the device-class entries of the task named in the request body.
	subscribe("ColdVerify_Server_Device_Class_List", "Device_Class_List", func(m *nats.Msg) {
		logs.Println("ColdVerify_Server_Device_Class_List message: %+v\n", string(m.Data))
		task, is := Task.Read_Task(string(m.Data))
		if !is {
			logs.Error("Mats", lib.FuncName())
			reply(m, lib.JSONS{Code: 202, Msg: "查询失败"})
			return
		}
		List, _ := Device.Read_DeviceClassList_OrderList(task.T_class, "", "", 0, 9999)
		reply(m, lib.JSONS{Code: 200, Msg: "ok", Data: List})
	})

	// Read one user; the request body is passed to Account.Read_User_ByT_uuid.
	subscribe("ColdVerify_Server_Read_User", "Read_User", func(m *nats.Msg) {
		logs.Println("ColdVerify_Server_Read_User message: %+v\n", string(m.Data))
		err, user := Account.Read_User_ByT_uuid(string(m.Data))
		if err != nil {
			logs.Error("Mats", lib.FuncName())
			reply(m, lib.JSONS{Code: 202, Msg: "查询失败"})
			return
		}
		reply(m, lib.JSONS{Code: 200, Msg: "ok", Data: user})
	})

	// Add a device (sn + id) to a task's device class, or re-number an
	// existing sn when it is already present with a different id.
	subscribe("ColdVerify_Server_Add_DeviceClassList", "Add_DeviceClassList", func(m *nats.Msg) {
		type T_Req struct {
			T_task_id string `xml:"T_task_id"` // task primary-key id
			T_sn      string `xml:"T_sn"`
			T_id      string `xml:"T_id"`
		}
		var t_Req T_Req
		if err := msgpack.Unmarshal(m.Data, &t_Req); err != nil {
			logs.Error("Mats", lib.FuncName(), err)
			reply(m, lib.JSONS{Code: 202, Msg: err.Error()})
			return
		}
		fmt.Printf("ColdVerify_Server_Add_DeviceClassList message: %+v\n", t_Req)

		Task_r, is := Task.Read_Task(t_Req.T_task_id)
		if !is {
			logs.Error("Mats", lib.FuncName())
			reply(m, lib.JSONS{Code: 202, Msg: "T_task_id 错误!"})
			return
		}

		// Check whether the sn already exists in this class.
		dc, is := Device.Read_DeviceClassList_T_class_T_sn(Task_r.T_class, t_Req.T_sn)
		// The requested id equals the stored one: nothing to do.
		if is && dc.T_id == t_Req.T_id {
			reply(m, lib.JSONS{Code: 200, Msg: "ok"})
			return
		}

		// Use the first certificate returned for the id, if any; otherwise
		// the zero value is written.
		var pdf Certificate.CertificatePdf
		pdfList, _ := Certificate.Read_CertificatePdf_T_layout_no(t_Req.T_id, "")
		if len(pdfList) > 0 {
			pdf = pdfList[0]
		}

		// Same sn, but the requested id differs from the stored one.
		if is && dc.T_id != t_Req.T_id {
			taken, exists := Device.Read_DeviceClassList_T_class_T_id(Task_r.T_class, t_Req.T_id)
			if exists {
				logs.Error("Mats", lib.FuncName())
				reply(m, lib.JSONS{Code: 202, Msg: fmt.Sprintf("编号[%s]已被[%s]关联,请重试", t_Req.T_id, taken.T_sn)})
				return
			}
			dc.T_id = t_Req.T_id
			dc.T_failure_time = pdf.T_failure_time
			dc.T_pdf = pdf.T_pdf
			dc.T_Certificate_sn = pdf.T_Certificate_sn
			if !Device.Update_DeviceClassList(dc, "T_id", "T_failure_time", "T_pdf", "T_Certificate_sn") {
				logs.Error("Mats", lib.FuncName())
				reply(m, lib.JSONS{Code: 202, Msg: "修改编号失败!"})
				return
			}
			reply(m, lib.JSONS{Code: 200, Msg: "ok"})
			Task.Add_TaskLogs_T("nats", t_Req.T_task_id, "任务管理", "修改", t_Req)
			System.Add_UserLogs_T("nats", "任务管理", "修改", t_Req)
			return
		}

		// The sn is not in the class yet: create the entry.
		var_ := Device.DeviceClassList{
			T_class:          Task_r.T_class,
			T_id:             t_Req.T_id,
			T_sn:             t_Req.T_sn,
			T_failure_time:   pdf.T_failure_time,
			T_pdf:            pdf.T_pdf,
			T_Certificate_sn: pdf.T_Certificate_sn,
			T_remark:         "",
			T_State:          1,
		}
		if _, added := Device.Add_DeviceClassList(var_); !added {
			// Failure is reported as a bool only, so there is no error value
			// to forward; reply with a fixed message.
			logs.Error("Mats", lib.FuncName())
			reply(m, lib.JSONS{Code: 202, Msg: "添加失败!"})
			return
		}

		reply(m, lib.JSONS{Code: 200, Msg: "ok"})
		Task.Add_TaskLogs_T("nats", t_Req.T_task_id, "任务管理", "修改", t_Req)
		System.Add_UserLogs_T("nats", "任务管理", "修改", t_Req)
	})

	// Re-number an existing sn in a task's device class. When the sn is not
	// present nothing is written and the reply is still "ok".
	subscribe("ColdVerify_Server_Edit_DeviceClassList", "Edit_DeviceClassList", func(m *nats.Msg) {
		type T_Req struct {
			T_task_id string `xml:"T_task_id"` // task primary-key id
			T_sn      string `xml:"T_sn"`
			T_id      string `xml:"T_id"`
		}
		var t_Req T_Req
		if err := msgpack.Unmarshal(m.Data, &t_Req); err != nil {
			logs.Error("Mats", lib.FuncName(), err)
			reply(m, lib.JSONS{Code: 202, Msg: err.Error()})
			return
		}
		fmt.Printf("ColdVerify_Server_Edit_DeviceClassList message: %+v\n", t_Req)

		Task_r, is := Task.Read_Task(t_Req.T_task_id)
		if !is {
			logs.Error("Mats", lib.FuncName())
			reply(m, lib.JSONS{Code: 202, Msg: "T_task_id 错误!"})
			return
		}

		// Check whether the sn already exists in this class.
		dc, is := Device.Read_DeviceClassList_T_class_T_sn(Task_r.T_class, t_Req.T_sn)
		// The requested id equals the stored one: nothing to do.
		if is && dc.T_id == t_Req.T_id {
			reply(m, lib.JSONS{Code: 200, Msg: "ok"})
			return
		}

		// Same sn, but the requested id differs from the stored one.
		if is && dc.T_id != t_Req.T_id {
			taken, exists := Device.Read_DeviceClassList_T_class_T_id(Task_r.T_class, t_Req.T_id)
			if exists {
				logs.Error("Mats", lib.FuncName())
				reply(m, lib.JSONS{Code: 202, Msg: fmt.Sprintf("编号[%s]已被[%s]关联,请重试", t_Req.T_id, taken.T_sn)})
				return
			}

			// Use the first certificate returned for the id, if any.
			var pdf Certificate.CertificatePdf
			pdfList, _ := Certificate.Read_CertificatePdf_T_layout_no(t_Req.T_id, "")
			if len(pdfList) > 0 {
				pdf = pdfList[0]
			}

			dc.T_id = t_Req.T_id
			dc.T_failure_time = pdf.T_failure_time
			dc.T_pdf = pdf.T_pdf
			dc.T_Certificate_sn = pdf.T_Certificate_sn
			if !Device.Update_DeviceClassList(dc, "T_id", "T_failure_time", "T_pdf", "T_Certificate_sn") {
				logs.Error("Mats", lib.FuncName())
				reply(m, lib.JSONS{Code: 202, Msg: "修改编号失败!"})
				return
			}
		}

		reply(m, lib.JSONS{Code: 200, Msg: "ok"})
		Task.Add_TaskLogs_T("nats", t_Req.T_task_id, "任务管理", "修改设备列表", t_Req)
		System.Add_UserLogs_T("nats", "任务管理", "修改", t_Req)
	})
}