|
@@ -30,13 +30,12 @@ func init() {
|
|
|
|
|
|
nats.RetryOnFailedConnect(true)
|
|
|
if err != nil {
|
|
|
- fmt.Println("nats 连接失败!")
|
|
|
- panic(err)
|
|
|
+ logs.Error("nats 断开连接", err.Error())
|
|
|
}
|
|
|
logs.Println("nats OK!")
|
|
|
|
|
|
// 本地测试,屏蔽本地nats
|
|
|
- if !conf.NatsForbidden {
|
|
|
+ if !conf.NatsForbidden && lib.Nats != nil {
|
|
|
go NatsInit()
|
|
|
}
|
|
|
}
|
|
@@ -49,7 +48,7 @@ func ReconnectNATS() {
|
|
|
time.Sleep(1 * time.Minute)
|
|
|
//time.Sleep(1 * time.Minute)
|
|
|
|
|
|
- if !lib.Nats.IsConnected() {
|
|
|
+ if (lib.Nats != nil && !lib.Nats.IsConnected()) || lib.Nats == nil {
|
|
|
lib.Nats, err = nats.Connect("nats://" + conf.NatsServer_Url)
|
|
|
if err != nil {
|
|
|
logs.Println("无法重新连接到 NATS:", err)
|
|
@@ -175,53 +174,6 @@ func NatsInit() {
|
|
|
|
|
|
})
|
|
|
|
|
|
- //_, _ = lib.Nats.Subscribe("ColdVerify_Local_Import_TaskData2", func(m *nats.Msg) {
|
|
|
- // type T_R struct {
|
|
|
- // T_uuid string `xml:"T_uuid"`
|
|
|
- // T_task_id string `xml:"T_task_id"`
|
|
|
- // TaskData_Num int `xml:"TaskData_Num"`
|
|
|
- // }
|
|
|
- // var t_r T_R
|
|
|
- // err := msgpack.Unmarshal(m.Data, &t_r)
|
|
|
- // if err != nil {
|
|
|
- // System.Add_Logs("Nats", "msgpack Unmarshal err", string(m.Data))
|
|
|
- // return
|
|
|
- // }
|
|
|
- // logs.Debug("ColdVerify_Local_Import_TaskData message: %+v\n", t_r)
|
|
|
- //
|
|
|
- // T_uuid, T_task_id := t_r.T_uuid, t_r.T_task_id
|
|
|
- // Task.Truncate_TaskData(conf.Local_AliasName, T_task_id)
|
|
|
- //
|
|
|
- // var limit = 50000
|
|
|
- // var offset = 0
|
|
|
- // var limitMaxNum = 10
|
|
|
- // var chData = make(chan int, limitMaxNum)
|
|
|
- // var jobGroup sync.WaitGroup
|
|
|
- // var tasknum = t_r.TaskData_Num / limit
|
|
|
- // if t_r.TaskData_Num%limit != 0 {
|
|
|
- // tasknum += 1
|
|
|
- // }
|
|
|
- //
|
|
|
- // //处理任务,最多同时有10个协程
|
|
|
- // for i := 0; i < tasknum; i++ {
|
|
|
- // chData <- 1
|
|
|
- // go func(index int) {
|
|
|
- // defer jobGroup.Done()
|
|
|
- // jobGroup.Add(1)
|
|
|
- // is := Task.Import_TaskData(T_task_id, limit*index, limit)
|
|
|
- // if !is {
|
|
|
- // logs.Error(lib.FuncName(), "导入数据失败", fmt.Sprintf("%s:%d-%d", T_task_id, offset, limit))
|
|
|
- // }
|
|
|
- // <-chData
|
|
|
- // }(i)
|
|
|
- // }
|
|
|
- // //使用Wait等待所有任务执行完毕
|
|
|
- // jobGroup.Wait()
|
|
|
- //
|
|
|
- // System.Add_UserLogs(T_uuid, "任务数据-打包本地数据", "从线上导入数据Z_TaskData_"+T_task_id, "")
|
|
|
- //
|
|
|
- //})
|
|
|
-
|
|
|
// 更新线上数据
|
|
|
_, _ = lib.Nats.QueueSubscribe("ColdVerify_Local_Up_TaskData", "Up_TaskData", func(m *nats.Msg) {
|
|
|
logs.Debug("ColdVerify_Local_Up_TaskData message: ", string(m.Data))
|