|
@@ -1,35 +1,212 @@
|
|
|
package Handle
|
|
|
|
|
|
import (
|
|
|
+ "Yunlot/conf"
|
|
|
+ "Yunlot/lib"
|
|
|
"Yunlot/logs"
|
|
|
"Yunlot/models/Device"
|
|
|
"Yunlot/models/Product"
|
|
|
+ "encoding/json"
|
|
|
"fmt"
|
|
|
"go.mongodb.org/mongo-driver/bson"
|
|
|
+ "plugin"
|
|
|
"reflect"
|
|
|
)
|
|
|
|
|
|
-func AnalysisMap(Device_r *Device.Device, ProductType_r Product.ProductType, ArticleSlide map[string]interface{}, JointTab string) {
|
|
|
+// 设备->平台
|
|
|
+func PullHandle(Device_r *Device.Device, topicName string, message []byte) lib.JSONR {
|
|
|
+ var Rt_r = lib.JSONR{Code: 200, Msg: "ok"}
|
|
|
+
|
|
|
+ // 设备协议
|
|
|
+ ProductProt_r := Product.ProductProt{Id: Device_r.T_ProductJson.T_prot}
|
|
|
+ if !ProductProt_r.Read() {
|
|
|
+ logs.Println("MqttServer", Device_r.T_sn+"|"+Device_r.T_ProductID+"-"+fmt.Sprintf("%d", Device_r.T_ProductJson.T_prot)+" 设备协议找不到!")
|
|
|
+ Rt_r.Code = 203
|
|
|
+ Rt_r.Msg = "T_prot E!"
|
|
|
+ return Rt_r
|
|
|
+ }
|
|
|
+
|
|
|
+ messagejson := string(message)
|
|
|
+ // 是否加载转换协议
|
|
|
+ if ProductProt_r.T_lang != 0 && len(ProductProt_r.T_analysis) != 0 {
|
|
|
+ // 加载 SO 文件
|
|
|
+ p, err := plugin.Open(conf.Analysis_Dir + ProductProt_r.T_analysis + ".so")
|
|
|
+ if err != nil {
|
|
|
+ logs.PrintlnError("打开 SO 失败:", err)
|
|
|
+ Rt_r.Code = 203
|
|
|
+ Rt_r.Msg = "T_analysis E!"
|
|
|
+ return Rt_r
|
|
|
+ }
|
|
|
+ logs.Println("Plugin 地址:", &p)
|
|
|
+
|
|
|
+ // 查找库导出信息
|
|
|
+ s, err := p.Lookup("T")
|
|
|
+ if err != nil {
|
|
|
+ logs.Println("", err)
|
|
|
+ panic(any(err))
|
|
|
+ }
|
|
|
+ // 类型转换
|
|
|
+ messagejson = s.(func(t string, b []byte) string)(topicName, message)
|
|
|
+ // 开始处理
|
|
|
+ logs.Println("协议后:", messagejson)
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ //logs.Println("首字符:", string(f[0]))
|
|
|
+ if messagejson[0] == '{' {
|
|
|
+ //结构体
|
|
|
+ var json_r map[string]interface{}
|
|
|
+ err := json.Unmarshal([]byte(messagejson), &json_r)
|
|
|
+ if err != nil {
|
|
|
+ Rt_r.Code = 203
|
|
|
+ Rt_r.Msg = "json E!"
|
|
|
+ return Rt_r
|
|
|
+ }
|
|
|
+
|
|
|
+ json_r = AnalysisMap(Device_r, json_r, "")
|
|
|
+ logs.Println("json_r:", json_r)
|
|
|
+
|
|
|
+ // 合并json
|
|
|
+ Device_r.Read_Tidy() // 提前最新的
|
|
|
+ logs.Println("Device_r.T_data:", Device_r.T_data)
|
|
|
+ var json_x map[string]interface{}
|
|
|
+ if len(Device_r.T_data) > 5 {
|
|
|
+ json.Unmarshal([]byte(Device_r.T_data), &json_x)
|
|
|
+ json_x_b, _ := json.Marshal(json_r)
|
|
|
+ json.Unmarshal(json_x_b, &json_x)
|
|
|
+ json_r = json_x
|
|
|
+ }
|
|
|
+
|
|
|
+ logs.Println("Device_r.T_data 更新:", json_r)
|
|
|
+ Device_r.T_dataJson = json_r
|
|
|
+ Device_r.UpdateTime.NowDbTime()
|
|
|
+ Device_r.Update("T_data", "UpdateTime")
|
|
|
+
|
|
|
+ }
|
|
|
+ //
|
|
|
+ //else {
|
|
|
+ // //列表
|
|
|
+ // var json_lr []map[string]interface{}
|
|
|
+ // err := json.Unmarshal([]byte(messagejson), &json_lr)
|
|
|
+ // if err != nil {
|
|
|
+ // Rt_r.Code = 204
|
|
|
+ // Rt_r.Msg = "[]json E!"
|
|
|
+ // return Rt_r
|
|
|
+ // }
|
|
|
+ // for _, value := range json_lr {
|
|
|
+ // json_lr[len(json_lr)-1] = AnalysisMap(Device_r, value, "")
|
|
|
+ // }
|
|
|
+ //
|
|
|
+ // // 合并json
|
|
|
+ // Device_r.Read_Tidy() // 提前最新的
|
|
|
+ // logs.Println("Device_r.T_data:", Device_r.T_data)
|
|
|
+ // var json_x map[string]interface{}
|
|
|
+ // if len(Device_r.T_data) > 5 {
|
|
|
+ // json.Unmarshal([]byte(Device_r.T_data), &json_x)
|
|
|
+ // json_x_b, _ := json.Marshal(json_lr[len(json_lr)-1])
|
|
|
+ // json.Unmarshal(json_x_b, &json_x)
|
|
|
+ // json_lr[len(json_lr)-1] = json_x
|
|
|
+ // }
|
|
|
+ //
|
|
|
+ // Device_r.T_dataJson = json_lr[len(json_lr)-1]
|
|
|
+ // Device_r.UpdateTime.NowDbTime()
|
|
|
+ // Device_r.Update("T_data", "UpdateTime")
|
|
|
+ //}
|
|
|
+
|
|
|
+ return Rt_r
|
|
|
+}
|
|
|
+
|
|
|
+// 平台->设备
|
|
|
+func PushHandle(Device_r *Device.Device, topicName string, message string) (string, []byte) {
|
|
|
+
|
|
|
+ // 设备协议
|
|
|
+ ProductProt_r := Product.ProductProt{Id: Device_r.T_ProductJson.T_prot}
|
|
|
+ if !ProductProt_r.Read() {
|
|
|
+ logs.Println("PushHandle", Device_r.T_sn+"|"+Device_r.T_ProductID+"-"+fmt.Sprintf("%d", Device_r.T_ProductJson.T_prot)+" 设备协议找不到!")
|
|
|
+ return "", []byte{}
|
|
|
+ }
|
|
|
+
|
|
|
+ var byte_r []byte
|
|
|
+ var topicName_r = topicName
|
|
|
+ // 是否加载转换协议
|
|
|
+ if ProductProt_r.T_lang != 0 && len(ProductProt_r.T_analysis) != 0 {
|
|
|
+ // 根据库的存放路径加载库
|
|
|
+ p, err := plugin.Open(conf.Analysis_Dir + ProductProt_r.T_analysis + ".so")
|
|
|
+ if err != nil {
|
|
|
+ logs.PrintlnError("PushHandle:", err)
|
|
|
+ return "", []byte{}
|
|
|
+ }
|
|
|
+
|
|
|
+ // 查找库导出信息
|
|
|
+ s, err := p.Lookup("R")
|
|
|
+ if err != nil {
|
|
|
+ logs.PrintlnError("PushHandle:", err)
|
|
|
+ return "", []byte{}
|
|
|
+ }
|
|
|
+ // 类型转换
|
|
|
+ topicName_r, byte_r = s.(func(sn string, b string) (string, []byte))(topicName, message)
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ // 无效消息,不用推送
|
|
|
+ if len(topicName_r) == 0 || len(byte_r) == 0 {
|
|
|
+ return "", []byte{}
|
|
|
+ }
|
|
|
+
|
|
|
+ // 长连接 网关
|
|
|
+ switch ProductProt_r.T_mode {
|
|
|
+ case 1: //mqtt
|
|
|
+ // 如果 订阅地址与发布相同,在后面强行加 _reply,避免发布后无法收到消息
|
|
|
+ if topicName_r == topicName {
|
|
|
+ topicName_r += "_reply"
|
|
|
+ }
|
|
|
+ lib.Mqtt_publish(topicName_r, byte_r) // 返回数据
|
|
|
+ break
|
|
|
+ //case 2: //tcp
|
|
|
+ //
|
|
|
+ // break
|
|
|
+ //case 3: //CoAP
|
|
|
+ //
|
|
|
+ // break
|
|
|
+ //case 4: //websocket
|
|
|
+ //
|
|
|
+ // break
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ return topicName_r, byte_r
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+// 处理数据
|
|
|
+func AnalysisMap(Device_r *Device.Device, ArticleSlide map[string]interface{}, JointTab string) (json_r map[string]interface{}) {
|
|
|
//JointTab += "."
|
|
|
T_r := make(map[string]interface{})
|
|
|
+ json_r = make(map[string]interface{})
|
|
|
for key, value := range ArticleSlide {
|
|
|
//fmt.Println(reflect.TypeOf(value).String())
|
|
|
switch reflect.TypeOf(value).String() {
|
|
|
case "map[string]interface {}":
|
|
|
- AnalysisMap(Device_r, ProductType_r, value.(map[string]interface{}), JointTab+key+".")
|
|
|
+ json_r[key] = AnalysisMap(Device_r, value.(map[string]interface{}), JointTab+key+".")
|
|
|
+ return json_r
|
|
|
break
|
|
|
case "[]interface {}":
|
|
|
for _, valuex := range value.([]interface{}) {
|
|
|
if reflect.TypeOf(valuex).String() == "map[string]interface {}" {
|
|
|
- AnalysisMap(Device_r, ProductType_r, valuex.(map[string]interface{}), JointTab+key+".")
|
|
|
+ json_r[key] = AnalysisMap(Device_r, valuex.(map[string]interface{}), JointTab+key+".")
|
|
|
}
|
|
|
}
|
|
|
+ return json_r
|
|
|
break
|
|
|
default:
|
|
|
T_r[key] = value
|
|
|
// 根 TAB
|
|
|
if len(JointTab) == 0 {
|
|
|
- Device.Data_Add(Device_r.T_sn+"_"+key, &bson.M{key: value})
|
|
|
+ //logs.Println("TAB.:",key)
|
|
|
+ bson_r := bson.M{key: value}
|
|
|
+ go Device.Data_Add(Device_r.T_sn+"_"+key, &bson_r)
|
|
|
+ // 消息转发
|
|
|
+ go Relay(Device_r, JointTab, &bson_r)
|
|
|
}
|
|
|
|
|
|
break
|
|
@@ -40,73 +217,18 @@ func AnalysisMap(Device_r *Device.Device, ProductType_r Product.ProductType, Art
|
|
|
}
|
|
|
// 多级 TAB
|
|
|
JointTab = JointTab[:len(JointTab)-1]
|
|
|
- fmt.Println(JointTab, "-------")
|
|
|
+ //logs.Println("TAB-:",JointTab)
|
|
|
bson_r := bson.M{}
|
|
|
for key, value := range T_r {
|
|
|
fmt.Println(key, "->:", value)
|
|
|
bson_r[key] = value
|
|
|
+ json_r[key] = value
|
|
|
}
|
|
|
|
|
|
- // 消息转发
|
|
|
- //Relay(Device_r, ProductType_r, JointTab, ArticleSlide)
|
|
|
-
|
|
|
// 数据存储
|
|
|
- Device.Data_Add(Device_r.T_sn+"_"+JointTab, &bson_r)
|
|
|
-
|
|
|
-}
|
|
|
-
|
|
|
-// 设备->平台
|
|
|
-func T(Device_r Device.Device, topic string, message []byte) {
|
|
|
- // 设备类型
|
|
|
- ProductType_r := Product.ProductType{T_ProductID: Device_r.T_ProductID}
|
|
|
- if !ProductType_r.Read() {
|
|
|
- logs.Println("MqttServer", Device_r.T_sn+"|"+Device_r.T_ProductID+" 设备类型找不到!")
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- // 设备协议
|
|
|
- ProductProt_r := Product.ProductProt{Id: ProductType_r.T_prot}
|
|
|
- if !ProductProt_r.Read() {
|
|
|
- logs.Println("MqttServer", Device_r.T_sn+"|"+Device_r.T_ProductID+"-"+fmt.Sprintf("%d", ProductType_r.T_prot)+" 设备协议找不到!")
|
|
|
- return
|
|
|
- }
|
|
|
- //
|
|
|
- //// 根据库的存放路径加载库
|
|
|
- //p, err := plugin.Open(conf.Analysis_Dir + ProductProt_r.T_analysis)
|
|
|
- //if err != nil {
|
|
|
- // println(err)
|
|
|
- // panic(any(err))
|
|
|
- //}
|
|
|
- //
|
|
|
- //// 查找库导出信息
|
|
|
- //s, err := p.Lookup("T")
|
|
|
- //if err != nil {
|
|
|
- // println(err)
|
|
|
- // panic(any(err))
|
|
|
- //}
|
|
|
- //// 类型转换
|
|
|
- //f := s.(func(b []byte) []byte)(message)
|
|
|
-
|
|
|
- ////0:Mqtt 1:http 2:tcp 3:CoAP 4:websocket
|
|
|
- //switch ProductProt_r.T_mode {
|
|
|
- //case 0: //Mqtt
|
|
|
- // MqttServer.Mqtt_publish(topic, f) // 返回数据
|
|
|
- // break
|
|
|
- //case 1: //http
|
|
|
- //
|
|
|
- // break
|
|
|
- //case 2: //tcp
|
|
|
- //
|
|
|
- // break
|
|
|
- //case 3: //CoAP
|
|
|
- //
|
|
|
- // break
|
|
|
- //case 4: //websocket
|
|
|
- //
|
|
|
- // break
|
|
|
- //
|
|
|
- //}
|
|
|
-
|
|
|
- return
|
|
|
+ go Device.Data_Add(Device_r.T_sn+"_"+JointTab, &bson_r)
|
|
|
|
|
|
+ // 消息转发
|
|
|
+ go Relay(Device_r, JointTab, &bson_r)
|
|
|
+ return json_r
|
|
|
}
|