package Handle import ( "Yunlot/conf" "Yunlot/logs" "Yunlot/models/Device" "Yunlot/models/Product" "encoding/hex" "encoding/json" "errors" "fmt" "go.mongodb.org/mongo-driver/bson" "plugin" "reflect" "time" ) // 设备->平台 func PullHandle(Device_r *Device.Device, topicName string, message []byte) error { DeviceRealLogR_ := []string{} // 设备协议 ProductProt_r := Product.ProductProt{Id: Device_r.T_ProductJson.T_prot} if !ProductProt_r.Read() { logs.Println("MqttServer", Device_r.T_sn+"|"+Device_r.T_ProductID+"-"+fmt.Sprintf("%d", Device_r.T_ProductJson.T_prot)+" 设备协议找不到!") return errors.New("T_prot E!") } // 加入设备日志 类型 if message[0] == '{' { DeviceRealLogR_ = append(DeviceRealLogR_, "<-接收["+topicName+"]\\r\\n"+time.Now().Format("15:04:05")+"\\r\\n"+string(message)) // 加入设备日志 } else { DeviceRealLogR_ = append(DeviceRealLogR_, "<-接收["+topicName+"]\\r\\n"+time.Now().Format("15:04:05")+"\\r\\n"+hex.EncodeToString(message)) // 加入设备日志 } messagejson := string(message) // 是否加载转换协议 if ProductProt_r.T_lang != 0 && len(ProductProt_r.T_analysis) != 0 { // 加载 SO 文件 p, err := plugin.Open(conf.Analysis_Dir + ProductProt_r.T_analysis + ".so") if err != nil { logs.PrintlnError("打开 SO 失败:", err) return errors.New("T_analysis E!") } logs.Println("Plugin 地址:", &p) // 查找库导出信息 s, err := p.Lookup("T") if err != nil { logs.PrintlnError("Plugin:", err) return errors.New("Plugin E!") } // 类型转换 messagejson = s.(func(t string, b []byte) string)(topicName, message) // 开始处理 logs.Println("协议后:", messagejson) DeviceRealLogR_ = append(DeviceRealLogR_, "<-转换["+topicName+"]\\r\\n"+time.Now().Format("15:04:05")+"\\r\\n"+messagejson) // 加入设备日志 } //logs.Println("首字符:", string(f[0])) if messagejson[0] == '{' { //结构体 var json_r map[string]interface{} err := json.Unmarshal([]byte(messagejson), &json_r) if err != nil { return errors.New("json E!") } json_r = AnalysisMap(Device_r, json_r, "") logs.Println("json_r:", json_r) // 合并json Device_r.Read_Tidy() // 提前最新的 logs.Println("Device_r.T_data:", Device_r.T_data) var json_x map[string]interface{} if len(Device_r.T_data) > 5 { json.Unmarshal([]byte(Device_r.T_data), &json_x) json_x_b, _ := json.Marshal(json_r) json.Unmarshal(json_x_b, &json_x) json_r = json_x } logs.Println("Device_r.T_data 更新:", json_r) Device_r.T_dataJson = json_r Device_r.UpdateTime.NowDbTime() Device_r.Update("T_data", "UpdateTime") } // //else { // //列表 // var json_lr []map[string]interface{} // err := json.Unmarshal([]byte(messagejson), &json_lr) // if err != nil { // Rt_r.Code = 204 // Rt_r.Msg = "[]json E!" // return Rt_r // } // for _, value := range json_lr { // json_lr[len(json_lr)-1] = AnalysisMap(Device_r, value, "") // } // // // 合并json // Device_r.Read_Tidy() // 提前最新的 // logs.Println("Device_r.T_data:", Device_r.T_data) // var json_x map[string]interface{} // if len(Device_r.T_data) > 5 { // json.Unmarshal([]byte(Device_r.T_data), &json_x) // json_x_b, _ := json.Marshal(json_lr[len(json_lr)-1]) // json.Unmarshal(json_x_b, &json_x) // json_lr[len(json_lr)-1] = json_x // } // // Device_r.T_dataJson = json_lr[len(json_lr)-1] // Device_r.UpdateTime.NowDbTime() // Device_r.Update("T_data", "UpdateTime") //} //logs.Println("DeviceRealLogR_:",DeviceRealLogR_) // 更新设备记录日志 v, is := logs.DeviceRealLogMap[Device_r.T_sn] if is { v.Data = append(v.Data, DeviceRealLogR_...) logs.DeviceRealLogMap[Device_r.T_sn] = v } return nil } // 平台->设备 func PushHandle(Device_r *Device.Device, topicName string, message string) error { DeviceRealLogR_ := []string{} // 设备协议 ProductProt_r := Product.ProductProt{Id: Device_r.T_ProductJson.T_prot} if !ProductProt_r.Read() { logs.Println("PushHandle", Device_r.T_sn+"|"+Device_r.T_ProductID+"-"+fmt.Sprintf("%d", Device_r.T_ProductJson.T_prot)+" 设备协议找不到!") return errors.New("设备协议找不到") } var byte_r []byte var topicName_r = topicName // 是否加载转换协议 if ProductProt_r.T_lang != 0 && len(ProductProt_r.T_analysis) != 0 { // 根据库的存放路径加载库 p, err := plugin.Open(conf.Analysis_Dir + ProductProt_r.T_analysis + ".so") if err != nil { logs.PrintlnError("PushHandle:", err) return err } // 查找库导出信息 s, err := p.Lookup("R") if err != nil { logs.PrintlnError("PushHandle:", err) return err } DeviceRealLogR_ = append(DeviceRealLogR_, "->转换["+topicName+"]\\r\\n"+time.Now().Format("15:04:05")+"\\r\\n"+message) // 加入设备日志 // 类型转换 topicName_r, byte_r = s.(func(sn string, b string) (string, []byte))(topicName, message) // 开始处理 logs.Println("协议后:", byte_r) } // 无效消息,不用推送 if len(topicName_r) == 0 || len(byte_r) == 0 { logs.Println("无效消息,不用推送!", len(topicName_r), len(byte_r)) return errors.New("无效消息,不用推送") } // 长连接 网关 logs.Println("ProductProt_r.T_mode:", ProductProt_r.T_mode) ProductMode_r, is := Product.ProductModeMap[ProductProt_r.T_mode] if !is { logs.Println("没有找到网关!", ProductProt_r.T_mode) return errors.New("没有找到网关!") } //初始化插件 FunPushHandle_r, err := ProductMode_r.T_Plugin.Lookup("FunPushHandle") if err != nil { logs.Println("网关 错误!", ProductProt_r.T_mode, err) return errors.New("网关 错误!") } // 初始化插件 err = FunPushHandle_r.(func(T_topic string, T_data []byte) error)(topicName_r, byte_r) if err != nil { logs.Println("网关 推送失败!", ProductProt_r.T_mode, err) return errors.New("网关 推送失败!") } if byte_r[0] == '{' { DeviceRealLogR_ = append(DeviceRealLogR_, "->推送["+topicName_r+"]\\r\\n"+time.Now().Format("15:04:05")+"\\r\\n"+string(byte_r)) // 加入设备日志 } else { DeviceRealLogR_ = append(DeviceRealLogR_, "->推送["+topicName_r+"]\\r\\n"+time.Now().Format("15:04:05")+"\\r\\n"+hex.EncodeToString(byte_r)) // 加入设备日志 } // 更新设备记录日志 v, is := logs.DeviceRealLogMap[Device_r.T_sn] if is { v.Data = append(v.Data, DeviceRealLogR_...) logs.DeviceRealLogMap[Device_r.T_sn] = v } return nil } // 处理数据 func AnalysisMap(Device_r *Device.Device, ArticleSlide map[string]interface{}, JointTab string) (json_r map[string]interface{}) { //JointTab += "." T_r := make(map[string]interface{}) json_r = make(map[string]interface{}) for key, value := range ArticleSlide { //fmt.Println(reflect.TypeOf(value).String()) switch reflect.TypeOf(value).String() { case "map[string]interface {}": json_r[key] = AnalysisMap(Device_r, value.(map[string]interface{}), JointTab+key+".") return json_r break case "[]interface {}": for _, valuex := range value.([]interface{}) { if reflect.TypeOf(valuex).String() == "map[string]interface {}" { json_r[key] = AnalysisMap(Device_r, valuex.(map[string]interface{}), JointTab+key+".") } } return json_r break default: T_r[key] = value // 根 TAB if len(JointTab) == 0 { //logs.Println("TAB.:",key) bson_r := bson.M{key: value} go Device.Data_Add(Device_r.T_sn+"_"+key, &bson_r) // 消息转发 go Relay(Device_r, JointTab, &bson_r) } break } } if len(JointTab) == 0 { return } // 多级 TAB JointTab = JointTab[:len(JointTab)-1] //logs.Println("TAB-:",JointTab) bson_r := bson.M{} for key, value := range T_r { fmt.Println(key, "->:", value) bson_r[key] = value json_r[key] = value } // 数据存储 go Device.Data_Add(Device_r.T_sn+"_"+JointTab, &bson_r) // 消息转发 go Relay(Device_r, JointTab, &bson_r) return json_r }