|
@@ -41,8 +41,8 @@ type Extract_TaskData_Back struct {
|
|
|
func NatsInit() {
|
|
|
|
|
|
// 发布-订阅 模式,打包数据
|
|
|
- _, _ = lib.Nats.Subscribe("ColdVerify_Server_Extract_TaskData_Back", func(m *nats.Msg) {
|
|
|
- logs.Debug("Extract_TaskData_Back message: %v\n", string(m.Data))
|
|
|
+ _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Extract_TaskData_Back", "Extract_TaskData", func(m *nats.Msg) {
|
|
|
+ logs.Debug("Extract_TaskData_Back message: \n", string(m.Data))
|
|
|
|
|
|
var resp Extract_TaskData_Back
|
|
|
|
|
@@ -56,27 +56,32 @@ func NatsInit() {
|
|
|
// 清空表
|
|
|
Task.Truncate_TaskData(Task_r.T_task_id)
|
|
|
|
|
|
- // 失败重试5次
|
|
|
+ //失败重试5次
|
|
|
DeviceClassList := new(sync.Map)
|
|
|
+ var count int
|
|
|
for _, v := range resp.DeviceClassList {
|
|
|
DeviceClassList.Store(fmt.Sprintf("%s|%s", v.T_sn, v.T_id), 5)
|
|
|
+ //err = Task.Import_TaskData_Back(v.T_sn, v.T_id, resp.Task.T_task_id, resp.Time_start, resp.Time_end)
|
|
|
+ count++
|
|
|
+ //time.Sleep(5 * time.Second)
|
|
|
}
|
|
|
-
|
|
|
- DeviceClassList.Range(func(k, v any) bool {
|
|
|
- T_snid := strings.Split(k.(string), "|")
|
|
|
- T_sn := T_snid[0]
|
|
|
- T_id := T_snid[1]
|
|
|
- temp := v.(int)
|
|
|
- temp--
|
|
|
- DeviceClassList.Store(k, temp)
|
|
|
-
|
|
|
- err = Task.Import_TaskData_Back(T_sn, T_id, resp.Task.T_task_id, resp.Time_start, resp.Time_end)
|
|
|
- if err == nil || strings.Contains(err.Error(), "doesn't exist") {
|
|
|
- DeviceClassList.Delete(k)
|
|
|
+ for count > 0 {
|
|
|
+ DeviceClassList.Range(func(k, v any) bool {
|
|
|
+ T_snid := strings.Split(k.(string), "|")
|
|
|
+ T_sn := T_snid[0]
|
|
|
+ T_id := T_snid[1]
|
|
|
+ temp := v.(int)
|
|
|
+ temp--
|
|
|
+ DeviceClassList.Store(k, temp)
|
|
|
+
|
|
|
+ err = Task.Import_TaskData_Back(T_sn, T_id, resp.Task.T_task_id, resp.Time_start, resp.Time_end)
|
|
|
+ if err == nil || strings.Contains(err.Error(), "doesn't exist") {
|
|
|
+ DeviceClassList.Delete(k)
|
|
|
+ count--
|
|
|
+ }
|
|
|
return true
|
|
|
- }
|
|
|
- return true
|
|
|
- })
|
|
|
+ })
|
|
|
+ }
|
|
|
|
|
|
TaskData_Num := Task.Read_TaskData_Count(Task_r.T_task_id)
|
|
|
|
|
@@ -97,7 +102,7 @@ func NatsInit() {
|
|
|
|
|
|
})
|
|
|
|
|
|
- _, _ = lib.Nats.Subscribe("ColdVerify_Server_Update_Task", func(m *nats.Msg) {
|
|
|
+ _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Update_Task", "Update_Task", func(m *nats.Msg) {
|
|
|
|
|
|
var t_Req Task.Task
|
|
|
var t_R lib.JSONS
|
|
@@ -134,7 +139,7 @@ func NatsInit() {
|
|
|
|
|
|
})
|
|
|
|
|
|
- _, _ = lib.Nats.Subscribe("ColdVerify_Server_Read_Task", func(m *nats.Msg) {
|
|
|
+ _, _ = lib.Nats.QueueSubscribe("ColdVerify_Server_Read_Task", "Read_Task", func(m *nats.Msg) {
|
|
|
fmt.Printf("ColdVerify_Server_Read_Task message: %+v\n", string(m.Data))
|
|
|
|
|
|
var t_R lib.JSONS
|