Handle.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package Handle
  2. import (
  3. "Yunlot/Handle/MqttServer"
  4. "Yunlot/conf"
  5. "Yunlot/logs"
  6. "Yunlot/models/Device"
  7. "Yunlot/models/Product"
  8. "fmt"
  9. "go.mongodb.org/mongo-driver/bson"
  10. "plugin"
  11. "reflect"
  12. )
  13. func AnalysisMap(Device_r Device.Device, ProductType_r Product.ProductType, ArticleSlide map[string]interface{}, JointTab string) {
  14. //JointTab += "."
  15. T_r := make(map[string]interface{})
  16. for key, value := range ArticleSlide {
  17. //fmt.Println(reflect.TypeOf(value).String())
  18. switch reflect.TypeOf(value).String() {
  19. case "map[string]interface {}":
  20. AnalysisMap(Device_r, ProductType_r, value.(map[string]interface{}), JointTab+key+"_")
  21. break
  22. case "[]interface {}":
  23. for _, valuex := range value.([]interface{}) {
  24. if reflect.TypeOf(valuex).String() == "map[string]interface {}" {
  25. AnalysisMap(Device_r, ProductType_r, valuex.(map[string]interface{}), JointTab+key+"_")
  26. }
  27. }
  28. break
  29. default:
  30. T_r[key] = value
  31. if len(JointTab) == 0 {
  32. Device.Data_Add("202300000001_"+key, &bson.M{key: value})
  33. }
  34. break
  35. }
  36. }
  37. if len(JointTab) == 0 {
  38. return
  39. }
  40. JointTab = JointTab[:len(JointTab)-1]
  41. fmt.Println(JointTab, "-------")
  42. bson_r := bson.M{}
  43. for key, value := range T_r {
  44. fmt.Println(key, "->:", value)
  45. bson_r[key] = value
  46. }
  47. // 消息转发
  48. Relay(Device_r, ProductType_r, JointTab, ArticleSlide)
  49. // 数据存储
  50. Device.Data_Add("202300000001_"+JointTab, &bson_r)
  51. }
  52. // 设备->平台
  53. func T(Device_r Device.Device, topic string, message []byte) {
  54. // 设备类型
  55. ProductType_r := Product.ProductType{T_ProductID: Device_r.T_ProductID}
  56. if !ProductType_r.Read() {
  57. logs.Println("MqttServer", Device_r.T_sn+"|"+Device_r.T_ProductID+" 设备类型找不到!")
  58. return
  59. }
  60. // 设备协议
  61. ProductProt_r := Product.ProductProt{Id: ProductType_r.T_prot}
  62. if !ProductProt_r.Read() {
  63. logs.Println("MqttServer", Device_r.T_sn+"|"+Device_r.T_ProductID+"-"+ fmt.Sprintf("%d",ProductType_r.T_prot)+" 设备协议找不到!")
  64. return
  65. }
  66. // 根据库的存放路径加载库
  67. p, err := plugin.Open(conf.Analysis_Dir + ProductProt_r.T_analysis)
  68. if err != nil {
  69. println(err)
  70. panic(any(err))
  71. }
  72. // 查找库导出信息
  73. s, err := p.Lookup("T")
  74. if err != nil {
  75. println(err)
  76. panic(any(err))
  77. }
  78. // 类型转换
  79. f := s.(func(b []byte) []byte)(message)
  80. //0:Mqtt 1:http 2:tcp 3:CoAP 4:websocket
  81. switch ProductProt_r.T_mode {
  82. case 0: //Mqtt
  83. MqttServer.Mqtt_publish(topic, f) // 返回数据
  84. break
  85. case 1: //http
  86. break
  87. case 2: //tcp
  88. break
  89. case 3: //CoAP
  90. break
  91. case 4: //websocket
  92. break
  93. }
  94. return
  95. }