Handle.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. package Handle
  2. import (
  3. "Yunlot/conf"
  4. "Yunlot/lib"
  5. "Yunlot/logs"
  6. "Yunlot/models/Device"
  7. "Yunlot/models/Product"
  8. "encoding/hex"
  9. "encoding/json"
  10. "fmt"
  11. "go.mongodb.org/mongo-driver/bson"
  12. "plugin"
  13. "reflect"
  14. "time"
  15. )
  16. // 设备->平台
  17. func PullHandle(Device_r *Device.Device, topicName string, message []byte) lib.JSONR {
  18. DeviceRealLogR_ := []string{}
  19. var Rt_r = lib.JSONR{Code: 200, Msg: "ok"}
  20. // 设备协议
  21. ProductProt_r := Product.ProductProt{Id: Device_r.T_ProductJson.T_prot}
  22. if !ProductProt_r.Read() {
  23. logs.Println("MqttServer", Device_r.T_sn+"|"+Device_r.T_ProductID+"-"+fmt.Sprintf("%d", Device_r.T_ProductJson.T_prot)+" 设备协议找不到!")
  24. Rt_r.Code = 203
  25. Rt_r.Msg = "T_prot E!"
  26. return Rt_r
  27. }
  28. // 加入设备日志 类型
  29. if message[0] == '{' {
  30. DeviceRealLogR_ = append(DeviceRealLogR_, "<-接收["+topicName+"]\\r\\n"+time.Now().Format("15:04:05")+"\\r\\n"+string(message)) // 加入设备日志
  31. } else {
  32. DeviceRealLogR_ = append(DeviceRealLogR_, "<-接收["+topicName+"]\\r\\n"+time.Now().Format("15:04:05")+"\\r\\n"+hex.EncodeToString(message)) // 加入设备日志
  33. }
  34. messagejson := string(message)
  35. // 是否加载转换协议
  36. if ProductProt_r.T_lang != 0 && len(ProductProt_r.T_analysis) != 0 {
  37. // 加载 SO 文件
  38. p, err := plugin.Open(conf.Analysis_Dir + ProductProt_r.T_analysis + ".so")
  39. if err != nil {
  40. logs.PrintlnError("打开 SO 失败:", err)
  41. Rt_r.Code = 203
  42. Rt_r.Msg = "T_analysis E!"
  43. return Rt_r
  44. }
  45. logs.Println("Plugin 地址:", &p)
  46. // 查找库导出信息
  47. s, err := p.Lookup("T")
  48. if err != nil {
  49. logs.Println("", err)
  50. panic(any(err))
  51. }
  52. // 类型转换
  53. messagejson = s.(func(t string, b []byte) string)(topicName, message)
  54. // 开始处理
  55. logs.Println("协议后:", messagejson)
  56. DeviceRealLogR_ = append(DeviceRealLogR_, "<-转换["+topicName+"]\\r\\n"+time.Now().Format("15:04:05")+"\\r\\n"+messagejson) // 加入设备日志
  57. }
  58. //logs.Println("首字符:", string(f[0]))
  59. if messagejson[0] == '{' {
  60. //结构体
  61. var json_r map[string]interface{}
  62. err := json.Unmarshal([]byte(messagejson), &json_r)
  63. if err != nil {
  64. Rt_r.Code = 203
  65. Rt_r.Msg = "json E!"
  66. return Rt_r
  67. }
  68. json_r = AnalysisMap(Device_r, json_r, "")
  69. logs.Println("json_r:", json_r)
  70. // 合并json
  71. Device_r.Read_Tidy() // 提前最新的
  72. logs.Println("Device_r.T_data:", Device_r.T_data)
  73. var json_x map[string]interface{}
  74. if len(Device_r.T_data) > 5 {
  75. json.Unmarshal([]byte(Device_r.T_data), &json_x)
  76. json_x_b, _ := json.Marshal(json_r)
  77. json.Unmarshal(json_x_b, &json_x)
  78. json_r = json_x
  79. }
  80. logs.Println("Device_r.T_data 更新:", json_r)
  81. Device_r.T_dataJson = json_r
  82. Device_r.UpdateTime.NowDbTime()
  83. Device_r.Update("T_data", "UpdateTime")
  84. }
  85. //
  86. //else {
  87. // //列表
  88. // var json_lr []map[string]interface{}
  89. // err := json.Unmarshal([]byte(messagejson), &json_lr)
  90. // if err != nil {
  91. // Rt_r.Code = 204
  92. // Rt_r.Msg = "[]json E!"
  93. // return Rt_r
  94. // }
  95. // for _, value := range json_lr {
  96. // json_lr[len(json_lr)-1] = AnalysisMap(Device_r, value, "")
  97. // }
  98. //
  99. // // 合并json
  100. // Device_r.Read_Tidy() // 提前最新的
  101. // logs.Println("Device_r.T_data:", Device_r.T_data)
  102. // var json_x map[string]interface{}
  103. // if len(Device_r.T_data) > 5 {
  104. // json.Unmarshal([]byte(Device_r.T_data), &json_x)
  105. // json_x_b, _ := json.Marshal(json_lr[len(json_lr)-1])
  106. // json.Unmarshal(json_x_b, &json_x)
  107. // json_lr[len(json_lr)-1] = json_x
  108. // }
  109. //
  110. // Device_r.T_dataJson = json_lr[len(json_lr)-1]
  111. // Device_r.UpdateTime.NowDbTime()
  112. // Device_r.Update("T_data", "UpdateTime")
  113. //}
  114. //logs.Println("DeviceRealLogR_:",DeviceRealLogR_)
  115. // 更新设备记录日志
  116. v, is := logs.DeviceRealLogMap[Device_r.T_sn]
  117. if is {
  118. v.Data = append(v.Data, DeviceRealLogR_...)
  119. logs.DeviceRealLogMap[Device_r.T_sn] = v
  120. }
  121. return Rt_r
  122. }
  123. // 平台->设备
  124. func PushHandle(Device_r *Device.Device, topicName string, message string) (string, []byte) {
  125. DeviceRealLogR_ := []string{}
  126. // 设备协议
  127. ProductProt_r := Product.ProductProt{Id: Device_r.T_ProductJson.T_prot}
  128. if !ProductProt_r.Read() {
  129. logs.Println("PushHandle", Device_r.T_sn+"|"+Device_r.T_ProductID+"-"+fmt.Sprintf("%d", Device_r.T_ProductJson.T_prot)+" 设备协议找不到!")
  130. return "", []byte{}
  131. }
  132. var byte_r []byte
  133. var topicName_r = topicName
  134. // 是否加载转换协议
  135. if ProductProt_r.T_lang != 0 && len(ProductProt_r.T_analysis) != 0 {
  136. // 根据库的存放路径加载库
  137. p, err := plugin.Open(conf.Analysis_Dir + ProductProt_r.T_analysis + ".so")
  138. if err != nil {
  139. logs.PrintlnError("PushHandle:", err)
  140. return "", []byte{}
  141. }
  142. // 查找库导出信息
  143. s, err := p.Lookup("R")
  144. if err != nil {
  145. logs.PrintlnError("PushHandle:", err)
  146. return "", []byte{}
  147. }
  148. DeviceRealLogR_ = append(DeviceRealLogR_, "->转换["+topicName+"]\\r\\n"+time.Now().Format("15:04:05")+"\\r\\n"+message) // 加入设备日志
  149. // 类型转换
  150. topicName_r, byte_r = s.(func(sn string, b string) (string, []byte))(topicName, message)
  151. // 开始处理
  152. logs.Println("协议后:", byte_r)
  153. }
  154. // 无效消息,不用推送
  155. if len(topicName_r) == 0 || len(byte_r) == 0 {
  156. logs.Println("无效消息,不用推送!", len(topicName_r), len(byte_r))
  157. return "", []byte{}
  158. }
  159. // 长连接 网关
  160. logs.Println("ProductProt_r.T_mode:", ProductProt_r.T_mode)
  161. switch ProductProt_r.T_mode {
  162. case 1: //mqtt
  163. // 如果 订阅地址与发布相同,在后面强行加 _reply,避免发布后无法收到消息
  164. if topicName_r == topicName {
  165. topicName_r += "_reply"
  166. }
  167. lib.Mqtt_publish(topicName_r, byte_r) // 返回数据
  168. break
  169. case 2: //tcp
  170. lib.TCP_publish(Device_r.T_sn, byte_r)
  171. break
  172. }
  173. if byte_r[0] == '{' {
  174. DeviceRealLogR_ = append(DeviceRealLogR_, "->推送["+topicName_r+"]\\r\\n"+time.Now().Format("15:04:05")+"\\r\\n"+string(byte_r)) // 加入设备日志
  175. } else {
  176. DeviceRealLogR_ = append(DeviceRealLogR_, "->推送["+topicName_r+"]\\r\\n"+time.Now().Format("15:04:05")+"\\r\\n"+hex.EncodeToString(byte_r)) // 加入设备日志
  177. }
  178. // 更新设备记录日志
  179. v, is := logs.DeviceRealLogMap[Device_r.T_sn]
  180. if is {
  181. v.Data = append(v.Data, DeviceRealLogR_...)
  182. logs.DeviceRealLogMap[Device_r.T_sn] = v
  183. }
  184. return topicName_r, byte_r
  185. }
  186. // 处理数据
  187. func AnalysisMap(Device_r *Device.Device, ArticleSlide map[string]interface{}, JointTab string) (json_r map[string]interface{}) {
  188. //JointTab += "."
  189. T_r := make(map[string]interface{})
  190. json_r = make(map[string]interface{})
  191. for key, value := range ArticleSlide {
  192. //fmt.Println(reflect.TypeOf(value).String())
  193. switch reflect.TypeOf(value).String() {
  194. case "map[string]interface {}":
  195. json_r[key] = AnalysisMap(Device_r, value.(map[string]interface{}), JointTab+key+".")
  196. return json_r
  197. break
  198. case "[]interface {}":
  199. for _, valuex := range value.([]interface{}) {
  200. if reflect.TypeOf(valuex).String() == "map[string]interface {}" {
  201. json_r[key] = AnalysisMap(Device_r, valuex.(map[string]interface{}), JointTab+key+".")
  202. }
  203. }
  204. return json_r
  205. break
  206. default:
  207. T_r[key] = value
  208. // 根 TAB
  209. if len(JointTab) == 0 {
  210. //logs.Println("TAB.:",key)
  211. bson_r := bson.M{key: value}
  212. go Device.Data_Add(Device_r.T_sn+"_"+key, &bson_r)
  213. // 消息转发
  214. go Relay(Device_r, JointTab, &bson_r)
  215. }
  216. break
  217. }
  218. }
  219. if len(JointTab) == 0 {
  220. return
  221. }
  222. // 多级 TAB
  223. JointTab = JointTab[:len(JointTab)-1]
  224. //logs.Println("TAB-:",JointTab)
  225. bson_r := bson.M{}
  226. for key, value := range T_r {
  227. fmt.Println(key, "->:", value)
  228. bson_r[key] = value
  229. json_r[key] = value
  230. }
  231. // 数据存储
  232. go Device.Data_Add(Device_r.T_sn+"_"+JointTab, &bson_r)
  233. // 消息转发
  234. go Relay(Device_r, JointTab, &bson_r)
  235. return json_r
  236. }