Handle.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. package Handle
  2. import (
  3. "Yunlot/conf"
  4. "Yunlot/lib"
  5. "Yunlot/logs"
  6. "Yunlot/models/Device"
  7. "Yunlot/models/Product"
  8. "encoding/json"
  9. "fmt"
  10. "go.mongodb.org/mongo-driver/bson"
  11. "plugin"
  12. "reflect"
  13. )
  14. // 设备->平台
  15. func PullHandle(Device_r *Device.Device, topicName string, message []byte) lib.JSONR {
  16. var Rt_r = lib.JSONR{Code: 200, Msg: "ok"}
  17. // 设备协议
  18. ProductProt_r := Product.ProductProt{Id: Device_r.T_ProductJson.T_prot}
  19. if !ProductProt_r.Read() {
  20. logs.Println("MqttServer", Device_r.T_sn+"|"+Device_r.T_ProductID+"-"+fmt.Sprintf("%d", Device_r.T_ProductJson.T_prot)+" 设备协议找不到!")
  21. Rt_r.Code = 203
  22. Rt_r.Msg = "T_prot E!"
  23. return Rt_r
  24. }
  25. messagejson := string(message)
  26. // 是否加载转换协议
  27. if ProductProt_r.T_lang != 0 && len(ProductProt_r.T_analysis) != 0 {
  28. // 加载 SO 文件
  29. p, err := plugin.Open(conf.Analysis_Dir + ProductProt_r.T_analysis + ".so")
  30. if err != nil {
  31. logs.PrintlnError("打开 SO 失败:", err)
  32. Rt_r.Code = 203
  33. Rt_r.Msg = "T_analysis E!"
  34. return Rt_r
  35. }
  36. logs.Println("Plugin 地址:", &p)
  37. // 查找库导出信息
  38. s, err := p.Lookup("T")
  39. if err != nil {
  40. logs.Println("", err)
  41. panic(any(err))
  42. }
  43. // 类型转换
  44. messagejson = s.(func(t string, b []byte) string)(topicName, message)
  45. // 开始处理
  46. logs.Println("协议后:", messagejson)
  47. }
  48. //logs.Println("首字符:", string(f[0]))
  49. if messagejson[0] == '{' {
  50. //结构体
  51. var json_r map[string]interface{}
  52. err := json.Unmarshal([]byte(messagejson), &json_r)
  53. if err != nil {
  54. Rt_r.Code = 203
  55. Rt_r.Msg = "json E!"
  56. return Rt_r
  57. }
  58. json_r = AnalysisMap(Device_r, json_r, "")
  59. logs.Println("json_r:", json_r)
  60. // 合并json
  61. Device_r.Read_Tidy() // 提前最新的
  62. logs.Println("Device_r.T_data:", Device_r.T_data)
  63. var json_x map[string]interface{}
  64. if len(Device_r.T_data) > 5 {
  65. json.Unmarshal([]byte(Device_r.T_data), &json_x)
  66. json_x_b, _ := json.Marshal(json_r)
  67. json.Unmarshal(json_x_b, &json_x)
  68. json_r = json_x
  69. }
  70. logs.Println("Device_r.T_data 更新:", json_r)
  71. Device_r.T_dataJson = json_r
  72. Device_r.UpdateTime.NowDbTime()
  73. Device_r.Update("T_data", "UpdateTime")
  74. }
  75. //
  76. //else {
  77. // //列表
  78. // var json_lr []map[string]interface{}
  79. // err := json.Unmarshal([]byte(messagejson), &json_lr)
  80. // if err != nil {
  81. // Rt_r.Code = 204
  82. // Rt_r.Msg = "[]json E!"
  83. // return Rt_r
  84. // }
  85. // for _, value := range json_lr {
  86. // json_lr[len(json_lr)-1] = AnalysisMap(Device_r, value, "")
  87. // }
  88. //
  89. // // 合并json
  90. // Device_r.Read_Tidy() // 提前最新的
  91. // logs.Println("Device_r.T_data:", Device_r.T_data)
  92. // var json_x map[string]interface{}
  93. // if len(Device_r.T_data) > 5 {
  94. // json.Unmarshal([]byte(Device_r.T_data), &json_x)
  95. // json_x_b, _ := json.Marshal(json_lr[len(json_lr)-1])
  96. // json.Unmarshal(json_x_b, &json_x)
  97. // json_lr[len(json_lr)-1] = json_x
  98. // }
  99. //
  100. // Device_r.T_dataJson = json_lr[len(json_lr)-1]
  101. // Device_r.UpdateTime.NowDbTime()
  102. // Device_r.Update("T_data", "UpdateTime")
  103. //}
  104. return Rt_r
  105. }
  106. // 平台->设备
  107. func PushHandle(Device_r *Device.Device, topicName string, message string) (string, []byte) {
  108. // 设备协议
  109. ProductProt_r := Product.ProductProt{Id: Device_r.T_ProductJson.T_prot}
  110. if !ProductProt_r.Read() {
  111. logs.Println("PushHandle", Device_r.T_sn+"|"+Device_r.T_ProductID+"-"+fmt.Sprintf("%d", Device_r.T_ProductJson.T_prot)+" 设备协议找不到!")
  112. return "", []byte{}
  113. }
  114. var byte_r []byte
  115. var topicName_r = topicName
  116. // 是否加载转换协议
  117. if ProductProt_r.T_lang != 0 && len(ProductProt_r.T_analysis) != 0 {
  118. // 根据库的存放路径加载库
  119. p, err := plugin.Open(conf.Analysis_Dir + ProductProt_r.T_analysis + ".so")
  120. if err != nil {
  121. logs.PrintlnError("PushHandle:", err)
  122. return "", []byte{}
  123. }
  124. // 查找库导出信息
  125. s, err := p.Lookup("R")
  126. if err != nil {
  127. logs.PrintlnError("PushHandle:", err)
  128. return "", []byte{}
  129. }
  130. // 类型转换
  131. topicName_r, byte_r = s.(func(sn string, b string) (string, []byte))(topicName, message)
  132. }
  133. // 无效消息,不用推送
  134. if len(topicName_r) == 0 || len(byte_r) == 0 {
  135. return "", []byte{}
  136. }
  137. // 长连接 网关
  138. switch ProductProt_r.T_mode {
  139. case 1: //mqtt
  140. // 如果 订阅地址与发布相同,在后面强行加 _reply,避免发布后无法收到消息
  141. if topicName_r == topicName {
  142. topicName_r += "_reply"
  143. }
  144. lib.Mqtt_publish(topicName_r, byte_r) // 返回数据
  145. break
  146. //case 2: //tcp
  147. //
  148. // break
  149. //case 3: //CoAP
  150. //
  151. // break
  152. //case 4: //websocket
  153. //
  154. // break
  155. }
  156. return topicName_r, byte_r
  157. }
  158. // 处理数据
  159. func AnalysisMap(Device_r *Device.Device, ArticleSlide map[string]interface{}, JointTab string) (json_r map[string]interface{}) {
  160. //JointTab += "."
  161. T_r := make(map[string]interface{})
  162. json_r = make(map[string]interface{})
  163. for key, value := range ArticleSlide {
  164. //fmt.Println(reflect.TypeOf(value).String())
  165. switch reflect.TypeOf(value).String() {
  166. case "map[string]interface {}":
  167. json_r[key] = AnalysisMap(Device_r, value.(map[string]interface{}), JointTab+key+".")
  168. return json_r
  169. break
  170. case "[]interface {}":
  171. for _, valuex := range value.([]interface{}) {
  172. if reflect.TypeOf(valuex).String() == "map[string]interface {}" {
  173. json_r[key] = AnalysisMap(Device_r, valuex.(map[string]interface{}), JointTab+key+".")
  174. }
  175. }
  176. return json_r
  177. break
  178. default:
  179. T_r[key] = value
  180. // 根 TAB
  181. if len(JointTab) == 0 {
  182. //logs.Println("TAB.:",key)
  183. bson_r := bson.M{key: value}
  184. go Device.Data_Add(Device_r.T_sn+"_"+key, &bson_r)
  185. // 消息转发
  186. go Relay(Device_r, JointTab, &bson_r)
  187. }
  188. break
  189. }
  190. }
  191. if len(JointTab) == 0 {
  192. return
  193. }
  194. // 多级 TAB
  195. JointTab = JointTab[:len(JointTab)-1]
  196. //logs.Println("TAB-:",JointTab)
  197. bson_r := bson.M{}
  198. for key, value := range T_r {
  199. fmt.Println(key, "->:", value)
  200. bson_r[key] = value
  201. json_r[key] = value
  202. }
  203. // 数据存储
  204. go Device.Data_Add(Device_r.T_sn+"_"+JointTab, &bson_r)
  205. // 消息转发
  206. go Relay(Device_r, JointTab, &bson_r)
  207. return json_r
  208. }