123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366 |
- package Handle
- import (
- "AIOTCOER/conf"
- "AIOTCOER/lib"
- "AIOTCOER/logs"
- "AIOTCOER/models/Device"
- "AIOTCOER/models/Product"
- "AIOTCOER/models/StatisticalData"
- "encoding/hex"
- "encoding/json"
- "errors"
- "fmt"
- "go.mongodb.org/mongo-driver/bson"
- "plugin"
- "reflect"
- "sync"
- "time"
- )
- var Mu_PullHandle sync.Mutex // Mutex for synchronizing access to the map
- // var Mu_PushHandle sync.Mutex // Mutex for synchronizing access to the map
- // 设备->平台
- func PullHandle(Device_r *Device.Device, topicName string, message []byte) error {
- Msid := fmt.Sprintf("Msid:%d", uint8(time.Now().UnixNano())) // 通讯处理 唯一ID
- //Msidstart := time.Now()
- defer lib.PullHandleTime.TimeCost(time.Now())
- //流量统计
- go func(num int64) {
- Mu_PullHandle.Lock()
- sd, sdis := StatisticalData.FlowCountMap[Device_r.T_ProductID]
- if !sdis {
- sd = &StatisticalData.FlowCount_T{}
- StatisticalData.FlowCountMap[Device_r.T_ProductID] = sd
- }
- sd.T_send += num
- Mu_PullHandle.Unlock()
- }(int64(len(message)))
- DeviceRealLogR_ := []string{}
- // 加入设备日志 类型
- Data_log := ""
- if message[0] == '{' {
- Data_log = "<-接收[" + topicName + "]" + time.Now().Format("15:04:05") + "\\r\\n" + string(message)
- } else {
- Data_log = "<-接收[" + topicName + "]" + time.Now().Format("15:04:05") + "\\r\\n" + hex.EncodeToString(message) // 加入设备日志
- }
- // 加入设备日志
- DeviceRealLogR_ = append(DeviceRealLogR_, Data_log) // 加入设备日志
- logs.PrintlnData(Data_log)
- messagejson := string(message)
- // 是否加载转换协议
- if Device_r.T_ProductJson.T_prot != 0 {
- // 设备协议
- ProductProt_r := Product.ProductProt{Id: Device_r.T_ProductJson.T_prot}
- if !ProductProt_r.Read() {
- logs.Println(Msid, "MqttServer", Device_r.T_sn+"|"+Device_r.T_ProductID+"-"+fmt.Sprintf("%d", Device_r.T_ProductJson.T_prot)+" 设备协议找不到!")
- return errors.New("T_prot E!")
- }
- // 加载 SO 文件
- p, err := plugin.Open(conf.Analysis_Dir + ProductProt_r.T_analysis + ".so")
- if err != nil {
- logs.PrintlnError(Msid, "打开 SO 失败:", err)
- return errors.New("T_analysis E!")
- }
- //logs.Println("Plugin 地址:", &p)
- // 查找库导出信息
- s, err := p.Lookup("T")
- if err != nil {
- logs.PrintlnError(Msid, "Plugin:", err)
- return errors.New("Plugin E!")
- }
- // 类型转换
- messagejson = s.(func(t string, b []byte) string)(topicName, message)
- // 开始处理
- //logs.Println("协议后:", messagejson)
- DeviceRealLogR_ = append(DeviceRealLogR_, "<-转换["+topicName+"]\\r\\n"+time.Now().Format("15:04:05")+"\\r\\n"+messagejson) // 加入设备日志
- }
- //logs.Println("首字符:", string(f[0]))
- if messagejson[0] != '{' {
- logs.Println(Msid, "data jsonE {}!")
- return errors.New("data jsonE {}!")
- }
- //结构体
- var json_r map[string]interface{}
- err := json.Unmarshal([]byte(messagejson), &json_r)
- if err != nil {
- logs.Println(Msid, "data jsonE !")
- return errors.New("data jsonE!")
- }
- // 数据 SSE 实时推送
- go lib.SseSubscribeSendAll(Device_r.T_sn, json_r)
- // 数据 SSELog 实时推送
- go lib.SseLogSubscribeSendAll(Device_r.T_sn, DeviceRealLogR_)
- // 数据 Websocket 实时推送
- go lib.WebsocketSubscribeSendAll(Device_r.T_sn, json_r)
- // 详细处理
- json_r = AnalysisMap(Msid, Device_r, json_r, "")
- logs.Println("json_r:", json_r)
- // 合并json
- Device_r.Read_Tidy() // 提前最新的
- //logs.Println(Msid,"Device_r.T_dataJson:", Device_r.T_dataJson)
- //logs.Println(Msid,"json_r:", json_r)
- //var json_x map[string]interface{}
- if Device_r.T_dataJson != nil {
- // 托管平台 参数
- for _, field := range Device_r.T_ProductJson.T_renew {
- value := lib.JsonGetField(Device_r.T_dataJson, field)
- if value != nil {
- //logs.Println(Msid,"JsonSetField:", field,value)
- lib.JsonSetField(json_r, field, value)
- }
- }
- //logs.Println(Msid,"json_r:", json_r)
- // 合并
- json_str, _ := json.Marshal(json_r)
- //logs.Println(Msid,"json_x_b:", json_str)
- json.Unmarshal(json_str, &Device_r.T_dataJson)
- //json_r = json_x
- } else {
- Device_r.T_dataJson = json_r
- }
- logs.Println(Msid, "Device_r.T_dataJson_New:", Device_r.T_dataJson)
- //logs.Println(Msid,"Device_r.T_data 更新:", json_r)
- //Device_r.T_dataJson = json_r
- Device_r.Update("T_data")
- //logs.Println("DeviceRealLogR_:",DeviceRealLogR_)
- // 更新设备记录日志
- //v, is := logs.DeviceRealLogMap[Device_r.T_sn]
- //if is {
- // v.Data = append(v.Data, DeviceRealLogR_...)
- // logs.DeviceRealLogMap[Device_r.T_sn] = v
- //}
- return nil
- }
- // 平台->设备
- func PushHandle(Device_r *Device.Device, topicName string, message string) error {
- Msid := fmt.Sprintf("Msid:%d", uint8(time.Now().UnixNano())) // 通讯处理 唯一ID
- //Msidstart := time.Now()
- // 流量统计
- Mu_PullHandle.Lock()
- sd, sdis := StatisticalData.FlowCountMap[Device_r.T_ProductID]
- if !sdis {
- sd = &StatisticalData.FlowCount_T{}
- StatisticalData.FlowCountMap[Device_r.T_ProductID] = sd
- }
- sd.T_receive += int64(len(message))
- Mu_PullHandle.Unlock()
- // 加入设备日志
- DeviceRealLogR_ := []string{}
- logs.Println(Msid, "PushHandle ", topicName, message) // 加入设备日志
- // 设备协议
- ProductProt_r := Product.ProductProt{Id: Device_r.T_ProductJson.T_prot}
- if Device_r.T_ProductJson.T_prot != 0 {
- if !ProductProt_r.Read() {
- logs.Println(Msid, "PushHandle", Device_r.T_sn+"|"+Device_r.T_ProductID+"-"+fmt.Sprintf("%d", Device_r.T_ProductJson.T_prot)+" 设备协议找不到!")
- return errors.New("设备协议找不到")
- }
- }
- var byte_r []byte
- byte_r = []byte(message)
- var topicName_r = topicName
- // 是否加载转换协议
- if ProductProt_r.T_lang != 0 && len(ProductProt_r.T_analysis) != 0 {
- // 根据库的存放路径加载库
- p, err := plugin.Open(conf.Analysis_Dir + ProductProt_r.T_analysis + ".so")
- if err != nil {
- logs.PrintlnError(Msid, "PushHandle:", err)
- return err
- }
- // 查找库导出信息
- s, err := p.Lookup("R")
- if err != nil {
- logs.PrintlnError(Msid, "PushHandle:", err)
- return err
- }
- DeviceRealLogR_ = append(DeviceRealLogR_, "->转换["+topicName+"]"+time.Now().Format("15:04:05")+"\\r\\n"+message) // 加入设备日志
- // 类型转换
- topicName_r, byte_r = s.(func(sn string, b string) (string, []byte))(topicName, message)
- // 开始处理
- //logs.Println(Msid,"协议后:", byte_r)
- } else {
- topicName_r = "/" + topicName_r
- }
- // 无效消息,不用推送
- if len(topicName_r) == 0 || len(byte_r) == 0 {
- logs.Println(Msid, "无效消息,不用推送!", len(topicName_r), len(byte_r))
- return errors.New("无效消息,不用推送")
- }
- // 长连接 网关
- //logs.Println("ProductProt_r.T_mode:", ProductProt_r.T_mode)
- ProductMode_r, is := Product.ProductModeMap[Device_r.T_ProductJson.T_mode]
- if !is {
- logs.Println(Msid, "没有找到网关!", Device_r.T_ProductJson.T_mode)
- return errors.New("没有找到网关!")
- }
- //初始化插件
- FunPushHandle_r, err := ProductMode_r.T_Plugin.Lookup("FunPushHandle")
- if err != nil {
- logs.PrintlnError(Msid, "网关 错误!", Device_r.T_ProductJson.T_mode, err)
- return errors.New("网关 错误!")
- }
- // 初始化插件
- err = FunPushHandle_r.(func(T_topic string, T_data []byte) error)(topicName_r, byte_r)
- if err != nil {
- logs.PrintlnError(Msid, "网关 推送失败!", Device_r.T_ProductJson.T_mode, err)
- return errors.New("网关 推送失败!")
- }
- // 加入设备日志 类型
- Data_log := ""
- if byte_r[0] == '{' {
- Data_log = "->推送[" + topicName_r + "]" + time.Now().Format("15:04:05") + "\\r\\n" + string(byte_r) // 加入设备日志
- } else {
- Data_log = "->推送[" + topicName_r + "]" + time.Now().Format("15:04:05") + "\\r\\n" + hex.EncodeToString(byte_r) // 加入设备日志
- }
- // 加入设备日志
- DeviceRealLogR_ = append(DeviceRealLogR_, Data_log) // 加入设备日志
- logs.PrintlnData(Data_log)
- //// 更新设备记录日志
- //v, is := logs.DeviceRealLogMap[Device_r.T_sn]
- //if is {
- // v.Data = append(v.Data, DeviceRealLogR_...)
- // logs.DeviceRealLogMap[Device_r.T_sn] = v
- //}
- // 数据 SSELog 实时推送
- go lib.SseLogSubscribeSendAll(Device_r.T_sn, DeviceRealLogR_)
- // 托管平台 参数
- if len(Device_r.T_ProductJson.T_renew) == 0 {
- return nil // 没有 托管平台
- }
- logs.Println(Msid, "托管平台:", len(Device_r.T_ProductJson.T_renew), Device_r.T_ProductJson.T_renew)
- var articleSlide map[string]interface{}
- err = json.Unmarshal([]byte(message), &articleSlide)
- if err != nil {
- logs.Println(Msid, "参数 托管平台 json 解析错误!", err.Error())
- }
- Device_r.Read_Tidy() // 提前最新的
- for _, field := range Device_r.T_ProductJson.T_renew {
- value := lib.JsonGetField(articleSlide, field)
- if value != nil {
- lib.JsonSetField(Device_r.T_dataJson, field, value)
- }
- }
- logs.Println(Msid, "Device_r.T_data 更新:", Device_r.T_dataJson)
- Device_r.Update("T_data")
- return nil
- }
- // 处理数据
- func AnalysisMap(Msid string, Device_r *Device.Device, ArticleSlide map[string]interface{}, JointTab string) (json_r map[string]interface{}) {
- //JointTab += "."
- //T_r := make(map[string]interface{})
- json_r = make(map[string]interface{})
- for key, value := range ArticleSlide {
- //fmt.Println(reflect.TypeOf(value).String())
- switch reflect.TypeOf(value).String() {
- case "map[string]interface {}":
- json_r[key] = AnalysisMap(Msid, Device_r, value.(map[string]interface{}), JointTab+key+".")
- //return json_r
- break
- case "[]interface {}":
- for _, valuex := range value.([]interface{}) {
- if reflect.TypeOf(valuex).String() == "map[string]interface {}" {
- json_r[key] = AnalysisMap(Msid, Device_r, valuex.(map[string]interface{}), JointTab+key+".")
- }
- }
- //return json_r
- break
- default:
- json_r[key] = value
- // 根 TAB
- if len(JointTab) == 0 {
- //logs.Println("TAB.:",key)
- bson_r := bson.M{key: value}
- // 数据存储
- //T_filter := Device_r.T_ProductJson.T_filter[key]
- if lib.StringExistsInSlice(key, Device_r.T_ProductJson.T_filter) {
- T_filter_v := lib.GetMapRecursion(key, Device_r.T_dataJson)
- //logs.Println("T_filter:",key,value,T_filter_v)
- if value != T_filter_v {
- logs.Println(Msid, "T_filter 不同:", key)
- Device.Data_Add(Device_r.T_sn+"_"+key, &bson_r)
- }
- } else {
- Device.Data_Add(Device_r.T_sn+"_"+key, &bson_r)
- }
- // 消息转发
- Relay(Msid, Device_r, JointTab, &bson_r)
- }
- break
- }
- }
- if len(JointTab) == 0 {
- return
- }
- // 多级 TAB
- JointTab = JointTab[:len(JointTab)-1]
- //logs.Println("TAB-:",JointTab)
- bson_r := bson.M{}
- for key, value := range json_r {
- //fmt.Println(key, "->:", value)
- bson_r[key] = value
- }
- // 数据存储
- if lib.StringExistsInSlice(JointTab, Device_r.T_ProductJson.T_filter) {
- T_filter_v := lib.GetMapRecursion(JointTab, Device_r.T_dataJson)
- //logs.Println("T_filter:",JointTab,json_r,T_filter_v)
- if !reflect.DeepEqual(json_r, T_filter_v) {
- //logs.Println("T_filter 不同",JointTab)
- Device.Data_Add(Device_r.T_sn+"_"+JointTab, &bson_r)
- }
- } else {
- Device.Data_Add(Device_r.T_sn+"_"+JointTab, &bson_r)
- }
- // 消息转发
- Relay(Msid, Device_r, JointTab, &bson_r)
- return json_r
- }
|