123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262 |
- package Handle
- import (
- "Yunlot/conf"
- "Yunlot/logs"
- "Yunlot/models/Device"
- "Yunlot/models/Product"
- "encoding/hex"
- "encoding/json"
- "errors"
- "fmt"
- "go.mongodb.org/mongo-driver/bson"
- "plugin"
- "reflect"
- "time"
- )
- // 设备->平台
- func PullHandle(Device_r *Device.Device, topicName string, message []byte) error {
- DeviceRealLogR_ := []string{}
- // 设备协议
- ProductProt_r := Product.ProductProt{Id: Device_r.T_ProductJson.T_prot}
- if !ProductProt_r.Read() {
- logs.Println("MqttServer", Device_r.T_sn+"|"+Device_r.T_ProductID+"-"+fmt.Sprintf("%d", Device_r.T_ProductJson.T_prot)+" 设备协议找不到!")
- return errors.New("T_prot E!")
- }
- // 加入设备日志 类型
- if message[0] == '{' {
- DeviceRealLogR_ = append(DeviceRealLogR_, "<-接收["+topicName+"]\\r\\n"+time.Now().Format("15:04:05")+"\\r\\n"+string(message)) // 加入设备日志
- } else {
- DeviceRealLogR_ = append(DeviceRealLogR_, "<-接收["+topicName+"]\\r\\n"+time.Now().Format("15:04:05")+"\\r\\n"+hex.EncodeToString(message)) // 加入设备日志
- }
- messagejson := string(message)
- // 是否加载转换协议
- if ProductProt_r.T_lang != 0 && len(ProductProt_r.T_analysis) != 0 {
- // 加载 SO 文件
- p, err := plugin.Open(conf.Analysis_Dir + ProductProt_r.T_analysis + ".so")
- if err != nil {
- logs.PrintlnError("打开 SO 失败:", err)
- return errors.New("T_analysis E!")
- }
- logs.Println("Plugin 地址:", &p)
- // 查找库导出信息
- s, err := p.Lookup("T")
- if err != nil {
- logs.PrintlnError("Plugin:", err)
- return errors.New("Plugin E!")
- }
- // 类型转换
- messagejson = s.(func(t string, b []byte) string)(topicName, message)
- // 开始处理
- logs.Println("协议后:", messagejson)
- DeviceRealLogR_ = append(DeviceRealLogR_, "<-转换["+topicName+"]\\r\\n"+time.Now().Format("15:04:05")+"\\r\\n"+messagejson) // 加入设备日志
- }
- //logs.Println("首字符:", string(f[0]))
- if messagejson[0] == '{' {
- //结构体
- var json_r map[string]interface{}
- err := json.Unmarshal([]byte(messagejson), &json_r)
- if err != nil {
- return errors.New("json E!")
- }
- json_r = AnalysisMap(Device_r, json_r, "")
- logs.Println("json_r:", json_r)
- // 合并json
- Device_r.Read_Tidy() // 提前最新的
- logs.Println("Device_r.T_data:", Device_r.T_data)
- var json_x map[string]interface{}
- if len(Device_r.T_data) > 5 {
- json.Unmarshal([]byte(Device_r.T_data), &json_x)
- json_x_b, _ := json.Marshal(json_r)
- json.Unmarshal(json_x_b, &json_x)
- json_r = json_x
- }
- logs.Println("Device_r.T_data 更新:", json_r)
- Device_r.T_dataJson = json_r
- Device_r.UpdateTime.NowDbTime()
- Device_r.Update("T_data", "UpdateTime")
- }
- //
- //else {
- // //列表
- // var json_lr []map[string]interface{}
- // err := json.Unmarshal([]byte(messagejson), &json_lr)
- // if err != nil {
- // Rt_r.Code = 204
- // Rt_r.Msg = "[]json E!"
- // return Rt_r
- // }
- // for _, value := range json_lr {
- // json_lr[len(json_lr)-1] = AnalysisMap(Device_r, value, "")
- // }
- //
- // // 合并json
- // Device_r.Read_Tidy() // 提前最新的
- // logs.Println("Device_r.T_data:", Device_r.T_data)
- // var json_x map[string]interface{}
- // if len(Device_r.T_data) > 5 {
- // json.Unmarshal([]byte(Device_r.T_data), &json_x)
- // json_x_b, _ := json.Marshal(json_lr[len(json_lr)-1])
- // json.Unmarshal(json_x_b, &json_x)
- // json_lr[len(json_lr)-1] = json_x
- // }
- //
- // Device_r.T_dataJson = json_lr[len(json_lr)-1]
- // Device_r.UpdateTime.NowDbTime()
- // Device_r.Update("T_data", "UpdateTime")
- //}
- //logs.Println("DeviceRealLogR_:",DeviceRealLogR_)
- // 更新设备记录日志
- v, is := logs.DeviceRealLogMap[Device_r.T_sn]
- if is {
- v.Data = append(v.Data, DeviceRealLogR_...)
- logs.DeviceRealLogMap[Device_r.T_sn] = v
- }
- return nil
- }
- // 平台->设备
- func PushHandle(Device_r *Device.Device, topicName string, message string) error {
- DeviceRealLogR_ := []string{}
- // 设备协议
- ProductProt_r := Product.ProductProt{Id: Device_r.T_ProductJson.T_prot}
- if !ProductProt_r.Read() {
- logs.Println("PushHandle", Device_r.T_sn+"|"+Device_r.T_ProductID+"-"+fmt.Sprintf("%d", Device_r.T_ProductJson.T_prot)+" 设备协议找不到!")
- return errors.New("设备协议找不到")
- }
- var byte_r []byte
- var topicName_r = topicName
- // 是否加载转换协议
- if ProductProt_r.T_lang != 0 && len(ProductProt_r.T_analysis) != 0 {
- // 根据库的存放路径加载库
- p, err := plugin.Open(conf.Analysis_Dir + ProductProt_r.T_analysis + ".so")
- if err != nil {
- logs.PrintlnError("PushHandle:", err)
- return err
- }
- // 查找库导出信息
- s, err := p.Lookup("R")
- if err != nil {
- logs.PrintlnError("PushHandle:", err)
- return err
- }
- DeviceRealLogR_ = append(DeviceRealLogR_, "->转换["+topicName+"]\\r\\n"+time.Now().Format("15:04:05")+"\\r\\n"+message) // 加入设备日志
- // 类型转换
- topicName_r, byte_r = s.(func(sn string, b string) (string, []byte))(topicName, message)
- // 开始处理
- logs.Println("协议后:", byte_r)
- }
- // 无效消息,不用推送
- if len(topicName_r) == 0 || len(byte_r) == 0 {
- logs.Println("无效消息,不用推送!", len(topicName_r), len(byte_r))
- return errors.New("无效消息,不用推送")
- }
- // 长连接 网关
- logs.Println("ProductProt_r.T_mode:", ProductProt_r.T_mode)
- ProductMode_r, is := Product.ProductModeMap[ProductProt_r.T_mode]
- if !is {
- logs.Println("没有找到网关!", ProductProt_r.T_mode)
- return errors.New("没有找到网关!")
- }
- //初始化插件
- FunPushHandle_r, err := ProductMode_r.T_Plugin.Lookup("FunPushHandle")
- if err != nil {
- logs.Println("网关 错误!", ProductProt_r.T_mode, err)
- return errors.New("网关 错误!")
- }
- // 初始化插件
- err = FunPushHandle_r.(func(T_topic string, T_data []byte) error)(topicName_r, byte_r)
- if err != nil {
- logs.Println("网关 推送失败!", ProductProt_r.T_mode, err)
- return errors.New("网关 推送失败!")
- }
- if byte_r[0] == '{' {
- DeviceRealLogR_ = append(DeviceRealLogR_, "->推送["+topicName_r+"]\\r\\n"+time.Now().Format("15:04:05")+"\\r\\n"+string(byte_r)) // 加入设备日志
- } else {
- DeviceRealLogR_ = append(DeviceRealLogR_, "->推送["+topicName_r+"]\\r\\n"+time.Now().Format("15:04:05")+"\\r\\n"+hex.EncodeToString(byte_r)) // 加入设备日志
- }
- // 更新设备记录日志
- v, is := logs.DeviceRealLogMap[Device_r.T_sn]
- if is {
- v.Data = append(v.Data, DeviceRealLogR_...)
- logs.DeviceRealLogMap[Device_r.T_sn] = v
- }
- return nil
- }
- // 处理数据
- func AnalysisMap(Device_r *Device.Device, ArticleSlide map[string]interface{}, JointTab string) (json_r map[string]interface{}) {
- //JointTab += "."
- T_r := make(map[string]interface{})
- json_r = make(map[string]interface{})
- for key, value := range ArticleSlide {
- //fmt.Println(reflect.TypeOf(value).String())
- switch reflect.TypeOf(value).String() {
- case "map[string]interface {}":
- json_r[key] = AnalysisMap(Device_r, value.(map[string]interface{}), JointTab+key+".")
- return json_r
- break
- case "[]interface {}":
- for _, valuex := range value.([]interface{}) {
- if reflect.TypeOf(valuex).String() == "map[string]interface {}" {
- json_r[key] = AnalysisMap(Device_r, valuex.(map[string]interface{}), JointTab+key+".")
- }
- }
- return json_r
- break
- default:
- T_r[key] = value
- // 根 TAB
- if len(JointTab) == 0 {
- //logs.Println("TAB.:",key)
- bson_r := bson.M{key: value}
- go Device.Data_Add(Device_r.T_sn+"_"+key, &bson_r)
- // 消息转发
- go Relay(Device_r, JointTab, &bson_r)
- }
- break
- }
- }
- if len(JointTab) == 0 {
- return
- }
- // 多级 TAB
- JointTab = JointTab[:len(JointTab)-1]
- //logs.Println("TAB-:",JointTab)
- bson_r := bson.M{}
- for key, value := range T_r {
- fmt.Println(key, "->:", value)
- bson_r[key] = value
- json_r[key] = value
- }
- // 数据存储
- go Device.Data_Add(Device_r.T_sn+"_"+JointTab, &bson_r)
- // 消息转发
- go Relay(Device_r, JointTab, &bson_r)
- return json_r
- }
|