package Handle import ( "AIOTCOER/conf" "AIOTCOER/lib" "AIOTCOER/logs" "AIOTCOER/models/Device" "AIOTCOER/models/Product" "AIOTCOER/models/StatisticalData" "encoding/hex" "encoding/json" "errors" "fmt" "go.mongodb.org/mongo-driver/bson" "plugin" "reflect" "sync" "time" ) var Mu_PullHandle sync.Mutex // Mutex for synchronizing access to the map // var Mu_PushHandle sync.Mutex // Mutex for synchronizing access to the map // 设备->平台 func PullHandle(Device_r *Device.Device, topicName string, message []byte) error { Msid := fmt.Sprintf("Msid:%d", uint8(time.Now().UnixNano())) // 通讯处理 唯一ID //Msidstart := time.Now() defer lib.PullHandleTime.TimeCost(time.Now()) //流量统计 go func(num int64) { Mu_PullHandle.Lock() sd, sdis := StatisticalData.FlowCountMap[Device_r.T_ProductID] if !sdis { sd = &StatisticalData.FlowCount_T{} StatisticalData.FlowCountMap[Device_r.T_ProductID] = sd } sd.T_send += num Mu_PullHandle.Unlock() }(int64(len(message))) DeviceRealLogR_ := []string{} // 加入设备日志 类型 Data_log := "" if message[0] == '{' { Data_log = "<-接收[" + topicName + "]" + time.Now().Format("15:04:05") + "\\r\\n" + string(message) } else { Data_log = "<-接收[" + topicName + "]" + time.Now().Format("15:04:05") + "\\r\\n" + hex.EncodeToString(message) // 加入设备日志 } // 加入设备日志 DeviceRealLogR_ = append(DeviceRealLogR_, Data_log) // 加入设备日志 logs.PrintlnData(Data_log) messagejson := string(message) // 是否加载转换协议 if Device_r.T_ProductJson.T_prot != 0 { // 设备协议 ProductProt_r := Product.ProductProt{Id: Device_r.T_ProductJson.T_prot} if !ProductProt_r.Read() { logs.Println(Msid, "MqttServer", Device_r.T_sn+"|"+Device_r.T_ProductID+"-"+fmt.Sprintf("%d", Device_r.T_ProductJson.T_prot)+" 设备协议找不到!") return errors.New("T_prot E!") } // 加载 SO 文件 p, err := plugin.Open(conf.Analysis_Dir + ProductProt_r.T_analysis + ".so") if err != nil { logs.PrintlnError(Msid, "打开 SO 失败:", err) return errors.New("T_analysis E!") } //logs.Println("Plugin 地址:", &p) // 查找库导出信息 s, err := p.Lookup("T") if err != nil { logs.PrintlnError(Msid, "Plugin:", err) return errors.New("Plugin E!") } // 类型转换 messagejson = s.(func(t string, b []byte) string)(topicName, message) // 开始处理 //logs.Println("协议后:", messagejson) DeviceRealLogR_ = append(DeviceRealLogR_, "<-转换["+topicName+"]\\r\\n"+time.Now().Format("15:04:05")+"\\r\\n"+messagejson) // 加入设备日志 } //logs.Println("首字符:", string(f[0])) if messagejson[0] != '{' { logs.Println(Msid, "data jsonE {}!") return errors.New("data jsonE {}!") } //结构体 var json_r map[string]interface{} err := json.Unmarshal([]byte(messagejson), &json_r) if err != nil { logs.Println(Msid, "data jsonE !") return errors.New("data jsonE!") } // 数据 SSE 实时推送 go lib.SseSubscribeSendAll(Device_r.T_sn, json_r) // 数据 SSELog 实时推送 go lib.SseLogSubscribeSendAll(Device_r.T_sn, DeviceRealLogR_) // 数据 Websocket 实时推送 go lib.WebsocketSubscribeSendAll(Device_r.T_sn, json_r) // 详细处理 json_r = AnalysisMap(Msid, Device_r, json_r, "") logs.Println("json_r:", json_r) // 合并json Device_r.Read_Tidy() // 提前最新的 //logs.Println(Msid,"Device_r.T_dataJson:", Device_r.T_dataJson) //logs.Println(Msid,"json_r:", json_r) //var json_x map[string]interface{} if Device_r.T_dataJson != nil { // 托管平台 参数 for _, field := range Device_r.T_ProductJson.T_renew { value := lib.JsonGetField(Device_r.T_dataJson, field) if value != nil { //logs.Println(Msid,"JsonSetField:", field,value) lib.JsonSetField(json_r, field, value) } } //logs.Println(Msid,"json_r:", json_r) // 合并 json_str, _ := json.Marshal(json_r) //logs.Println(Msid,"json_x_b:", json_str) json.Unmarshal(json_str, &Device_r.T_dataJson) //json_r = json_x } else { Device_r.T_dataJson = json_r } logs.Println(Msid, "Device_r.T_dataJson_New:", Device_r.T_dataJson) //logs.Println(Msid,"Device_r.T_data 更新:", json_r) //Device_r.T_dataJson = json_r Device_r.Update("T_data") //logs.Println("DeviceRealLogR_:",DeviceRealLogR_) // 更新设备记录日志 //v, is := logs.DeviceRealLogMap[Device_r.T_sn] //if is { // v.Data = append(v.Data, DeviceRealLogR_...) // logs.DeviceRealLogMap[Device_r.T_sn] = v //} return nil } // 平台->设备 func PushHandle(Device_r *Device.Device, topicName string, message string) error { Msid := fmt.Sprintf("Msid:%d", uint8(time.Now().UnixNano())) // 通讯处理 唯一ID //Msidstart := time.Now() // 流量统计 Mu_PullHandle.Lock() sd, sdis := StatisticalData.FlowCountMap[Device_r.T_ProductID] if !sdis { sd = &StatisticalData.FlowCount_T{} StatisticalData.FlowCountMap[Device_r.T_ProductID] = sd } sd.T_receive += int64(len(message)) Mu_PullHandle.Unlock() // 加入设备日志 DeviceRealLogR_ := []string{} logs.Println(Msid, "PushHandle ", topicName, message) // 加入设备日志 // 设备协议 ProductProt_r := Product.ProductProt{Id: Device_r.T_ProductJson.T_prot} if Device_r.T_ProductJson.T_prot != 0 { if !ProductProt_r.Read() { logs.Println(Msid, "PushHandle", Device_r.T_sn+"|"+Device_r.T_ProductID+"-"+fmt.Sprintf("%d", Device_r.T_ProductJson.T_prot)+" 设备协议找不到!") return errors.New("设备协议找不到") } } var byte_r []byte byte_r = []byte(message) var topicName_r = topicName // 是否加载转换协议 if ProductProt_r.T_lang != 0 && len(ProductProt_r.T_analysis) != 0 { // 根据库的存放路径加载库 p, err := plugin.Open(conf.Analysis_Dir + ProductProt_r.T_analysis + ".so") if err != nil { logs.PrintlnError(Msid, "PushHandle:", err) return err } // 查找库导出信息 s, err := p.Lookup("R") if err != nil { logs.PrintlnError(Msid, "PushHandle:", err) return err } DeviceRealLogR_ = append(DeviceRealLogR_, "->转换["+topicName+"]"+time.Now().Format("15:04:05")+"\\r\\n"+message) // 加入设备日志 // 类型转换 topicName_r, byte_r = s.(func(sn string, b string) (string, []byte))(topicName, message) // 开始处理 //logs.Println(Msid,"协议后:", byte_r) } else { topicName_r = "/" + topicName_r } // 无效消息,不用推送 if len(topicName_r) == 0 || len(byte_r) == 0 { logs.Println(Msid, "无效消息,不用推送!", len(topicName_r), len(byte_r)) return errors.New("无效消息,不用推送") } // 长连接 网关 //logs.Println("ProductProt_r.T_mode:", ProductProt_r.T_mode) ProductMode_r, is := Product.ProductModeMap[Device_r.T_ProductJson.T_mode] if !is { logs.Println(Msid, "没有找到网关!", Device_r.T_ProductJson.T_mode) return errors.New("没有找到网关!") } //初始化插件 FunPushHandle_r, err := ProductMode_r.T_Plugin.Lookup("FunPushHandle") if err != nil { logs.PrintlnError(Msid, "网关 错误!", Device_r.T_ProductJson.T_mode, err) return errors.New("网关 错误!") } // 初始化插件 err = FunPushHandle_r.(func(T_topic string, T_data []byte) error)(topicName_r, byte_r) if err != nil { logs.PrintlnError(Msid, "网关 推送失败!", Device_r.T_ProductJson.T_mode, err) return errors.New("网关 推送失败!") } // 加入设备日志 类型 Data_log := "" if byte_r[0] == '{' { Data_log = "->推送[" + topicName_r + "]" + time.Now().Format("15:04:05") + "\\r\\n" + string(byte_r) // 加入设备日志 } else { Data_log = "->推送[" + topicName_r + "]" + time.Now().Format("15:04:05") + "\\r\\n" + hex.EncodeToString(byte_r) // 加入设备日志 } // 加入设备日志 DeviceRealLogR_ = append(DeviceRealLogR_, Data_log) // 加入设备日志 logs.PrintlnData(Data_log) //// 更新设备记录日志 //v, is := logs.DeviceRealLogMap[Device_r.T_sn] //if is { // v.Data = append(v.Data, DeviceRealLogR_...) // logs.DeviceRealLogMap[Device_r.T_sn] = v //} // 数据 SSELog 实时推送 go lib.SseLogSubscribeSendAll(Device_r.T_sn, DeviceRealLogR_) // 托管平台 参数 if len(Device_r.T_ProductJson.T_renew) == 0 { return nil // 没有 托管平台 } logs.Println(Msid, "托管平台:", len(Device_r.T_ProductJson.T_renew), Device_r.T_ProductJson.T_renew) var articleSlide map[string]interface{} err = json.Unmarshal([]byte(message), &articleSlide) if err != nil { logs.Println(Msid, "参数 托管平台 json 解析错误!", err.Error()) } Device_r.Read_Tidy() // 提前最新的 for _, field := range Device_r.T_ProductJson.T_renew { value := lib.JsonGetField(articleSlide, field) if value != nil { lib.JsonSetField(Device_r.T_dataJson, field, value) } } logs.Println(Msid, "Device_r.T_data 更新:", Device_r.T_dataJson) Device_r.Update("T_data") return nil } // 处理数据 func AnalysisMap(Msid string, Device_r *Device.Device, ArticleSlide map[string]interface{}, JointTab string) (json_r map[string]interface{}) { //JointTab += "." //T_r := make(map[string]interface{}) json_r = make(map[string]interface{}) for key, value := range ArticleSlide { //fmt.Println(reflect.TypeOf(value).String()) switch reflect.TypeOf(value).String() { case "map[string]interface {}": json_r[key] = AnalysisMap(Msid, Device_r, value.(map[string]interface{}), JointTab+key+".") //return json_r break case "[]interface {}": for _, valuex := range value.([]interface{}) { if reflect.TypeOf(valuex).String() == "map[string]interface {}" { json_r[key] = AnalysisMap(Msid, Device_r, valuex.(map[string]interface{}), JointTab+key+".") } } //return json_r break default: json_r[key] = value // 根 TAB if len(JointTab) == 0 { //logs.Println("TAB.:",key) bson_r := bson.M{key: value} // 数据存储 //T_filter := Device_r.T_ProductJson.T_filter[key] if lib.StringExistsInSlice(key, Device_r.T_ProductJson.T_filter) { T_filter_v := lib.GetMapRecursion(key, Device_r.T_dataJson) //logs.Println("T_filter:",key,value,T_filter_v) if value != T_filter_v { logs.Println(Msid, "T_filter 不同:", key) Device.Data_Add(Device_r.T_sn+"_"+key, &bson_r) } } else { Device.Data_Add(Device_r.T_sn+"_"+key, &bson_r) } // 消息转发 Relay(Msid, Device_r, JointTab, &bson_r) } break } } if len(JointTab) == 0 { return } // 多级 TAB JointTab = JointTab[:len(JointTab)-1] //logs.Println("TAB-:",JointTab) bson_r := bson.M{} for key, value := range json_r { //fmt.Println(key, "->:", value) bson_r[key] = value } // 数据存储 if lib.StringExistsInSlice(JointTab, Device_r.T_ProductJson.T_filter) { T_filter_v := lib.GetMapRecursion(JointTab, Device_r.T_dataJson) //logs.Println("T_filter:",JointTab,json_r,T_filter_v) if !reflect.DeepEqual(json_r, T_filter_v) { //logs.Println("T_filter 不同",JointTab) Device.Data_Add(Device_r.T_sn+"_"+JointTab, &bson_r) } } else { Device.Data_Add(Device_r.T_sn+"_"+JointTab, &bson_r) } // 消息转发 Relay(Msid, Device_r, JointTab, &bson_r) return json_r }