Handle.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. package Handle
  2. import (
  3. "Yunlot/conf"
  4. "Yunlot/logs"
  5. "Yunlot/models/Device"
  6. "Yunlot/models/Product"
  7. "encoding/hex"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "go.mongodb.org/mongo-driver/bson"
  12. "plugin"
  13. "reflect"
  14. "time"
  15. )
  16. // 设备->平台
  17. func PullHandle(Device_r *Device.Device, topicName string, message []byte) error {
  18. DeviceRealLogR_ := []string{}
  19. // 设备协议
  20. ProductProt_r := Product.ProductProt{Id: Device_r.T_ProductJson.T_prot}
  21. if !ProductProt_r.Read() {
  22. logs.Println("MqttServer", Device_r.T_sn+"|"+Device_r.T_ProductID+"-"+fmt.Sprintf("%d", Device_r.T_ProductJson.T_prot)+" 设备协议找不到!")
  23. return errors.New("T_prot E!")
  24. }
  25. // 加入设备日志 类型
  26. if message[0] == '{' {
  27. DeviceRealLogR_ = append(DeviceRealLogR_, "<-接收["+topicName+"]\\r\\n"+time.Now().Format("15:04:05")+"\\r\\n"+string(message)) // 加入设备日志
  28. } else {
  29. DeviceRealLogR_ = append(DeviceRealLogR_, "<-接收["+topicName+"]\\r\\n"+time.Now().Format("15:04:05")+"\\r\\n"+hex.EncodeToString(message)) // 加入设备日志
  30. }
  31. messagejson := string(message)
  32. // 是否加载转换协议
  33. if ProductProt_r.T_lang != 0 && len(ProductProt_r.T_analysis) != 0 {
  34. // 加载 SO 文件
  35. p, err := plugin.Open(conf.Analysis_Dir + ProductProt_r.T_analysis + ".so")
  36. if err != nil {
  37. logs.PrintlnError("打开 SO 失败:", err)
  38. return errors.New("T_analysis E!")
  39. }
  40. logs.Println("Plugin 地址:", &p)
  41. // 查找库导出信息
  42. s, err := p.Lookup("T")
  43. if err != nil {
  44. logs.PrintlnError("Plugin:", err)
  45. return errors.New("Plugin E!")
  46. }
  47. // 类型转换
  48. messagejson = s.(func(t string, b []byte) string)(topicName, message)
  49. // 开始处理
  50. logs.Println("协议后:", messagejson)
  51. DeviceRealLogR_ = append(DeviceRealLogR_, "<-转换["+topicName+"]\\r\\n"+time.Now().Format("15:04:05")+"\\r\\n"+messagejson) // 加入设备日志
  52. }
  53. //logs.Println("首字符:", string(f[0]))
  54. if messagejson[0] == '{' {
  55. //结构体
  56. var json_r map[string]interface{}
  57. err := json.Unmarshal([]byte(messagejson), &json_r)
  58. if err != nil {
  59. return errors.New("json E!")
  60. }
  61. json_r = AnalysisMap(Device_r, json_r, "")
  62. logs.Println("json_r:", json_r)
  63. // 合并json
  64. Device_r.Read_Tidy() // 提前最新的
  65. logs.Println("Device_r.T_data:", Device_r.T_data)
  66. var json_x map[string]interface{}
  67. if len(Device_r.T_data) > 5 {
  68. json.Unmarshal([]byte(Device_r.T_data), &json_x)
  69. json_x_b, _ := json.Marshal(json_r)
  70. json.Unmarshal(json_x_b, &json_x)
  71. json_r = json_x
  72. }
  73. logs.Println("Device_r.T_data 更新:", json_r)
  74. Device_r.T_dataJson = json_r
  75. Device_r.UpdateTime.NowDbTime()
  76. Device_r.Update("T_data", "UpdateTime")
  77. }
  78. //
  79. //else {
  80. // //列表
  81. // var json_lr []map[string]interface{}
  82. // err := json.Unmarshal([]byte(messagejson), &json_lr)
  83. // if err != nil {
  84. // Rt_r.Code = 204
  85. // Rt_r.Msg = "[]json E!"
  86. // return Rt_r
  87. // }
  88. // for _, value := range json_lr {
  89. // json_lr[len(json_lr)-1] = AnalysisMap(Device_r, value, "")
  90. // }
  91. //
  92. // // 合并json
  93. // Device_r.Read_Tidy() // 提前最新的
  94. // logs.Println("Device_r.T_data:", Device_r.T_data)
  95. // var json_x map[string]interface{}
  96. // if len(Device_r.T_data) > 5 {
  97. // json.Unmarshal([]byte(Device_r.T_data), &json_x)
  98. // json_x_b, _ := json.Marshal(json_lr[len(json_lr)-1])
  99. // json.Unmarshal(json_x_b, &json_x)
  100. // json_lr[len(json_lr)-1] = json_x
  101. // }
  102. //
  103. // Device_r.T_dataJson = json_lr[len(json_lr)-1]
  104. // Device_r.UpdateTime.NowDbTime()
  105. // Device_r.Update("T_data", "UpdateTime")
  106. //}
  107. //logs.Println("DeviceRealLogR_:",DeviceRealLogR_)
  108. // 更新设备记录日志
  109. v, is := logs.DeviceRealLogMap[Device_r.T_sn]
  110. if is {
  111. v.Data = append(v.Data, DeviceRealLogR_...)
  112. logs.DeviceRealLogMap[Device_r.T_sn] = v
  113. }
  114. return nil
  115. }
  116. // 平台->设备
  117. func PushHandle(Device_r *Device.Device, topicName string, message string) error {
  118. DeviceRealLogR_ := []string{}
  119. // 设备协议
  120. ProductProt_r := Product.ProductProt{Id: Device_r.T_ProductJson.T_prot}
  121. if !ProductProt_r.Read() {
  122. logs.Println("PushHandle", Device_r.T_sn+"|"+Device_r.T_ProductID+"-"+fmt.Sprintf("%d", Device_r.T_ProductJson.T_prot)+" 设备协议找不到!")
  123. return errors.New("设备协议找不到")
  124. }
  125. var byte_r []byte
  126. var topicName_r = topicName
  127. // 是否加载转换协议
  128. if ProductProt_r.T_lang != 0 && len(ProductProt_r.T_analysis) != 0 {
  129. // 根据库的存放路径加载库
  130. p, err := plugin.Open(conf.Analysis_Dir + ProductProt_r.T_analysis + ".so")
  131. if err != nil {
  132. logs.PrintlnError("PushHandle:", err)
  133. return err
  134. }
  135. // 查找库导出信息
  136. s, err := p.Lookup("R")
  137. if err != nil {
  138. logs.PrintlnError("PushHandle:", err)
  139. return err
  140. }
  141. DeviceRealLogR_ = append(DeviceRealLogR_, "->转换["+topicName+"]\\r\\n"+time.Now().Format("15:04:05")+"\\r\\n"+message) // 加入设备日志
  142. // 类型转换
  143. topicName_r, byte_r = s.(func(sn string, b string) (string, []byte))(topicName, message)
  144. // 开始处理
  145. logs.Println("协议后:", byte_r)
  146. }
  147. // 无效消息,不用推送
  148. if len(topicName_r) == 0 || len(byte_r) == 0 {
  149. logs.Println("无效消息,不用推送!", len(topicName_r), len(byte_r))
  150. return errors.New("无效消息,不用推送")
  151. }
  152. // 长连接 网关
  153. logs.Println("ProductProt_r.T_mode:", ProductProt_r.T_mode)
  154. ProductMode_r, is := Product.ProductModeMap[ProductProt_r.T_mode]
  155. if !is {
  156. logs.Println("没有找到网关!", ProductProt_r.T_mode)
  157. return errors.New("没有找到网关!")
  158. }
  159. //初始化插件
  160. FunPushHandle_r, err := ProductMode_r.T_Plugin.Lookup("FunPushHandle")
  161. if err != nil {
  162. logs.Println("网关 错误!", ProductProt_r.T_mode, err)
  163. return errors.New("网关 错误!")
  164. }
  165. // 初始化插件
  166. err = FunPushHandle_r.(func(T_topic string, T_data []byte) error)(topicName_r, byte_r)
  167. if err != nil {
  168. logs.Println("网关 推送失败!", ProductProt_r.T_mode, err)
  169. return errors.New("网关 推送失败!")
  170. }
  171. if byte_r[0] == '{' {
  172. DeviceRealLogR_ = append(DeviceRealLogR_, "->推送["+topicName_r+"]\\r\\n"+time.Now().Format("15:04:05")+"\\r\\n"+string(byte_r)) // 加入设备日志
  173. } else {
  174. DeviceRealLogR_ = append(DeviceRealLogR_, "->推送["+topicName_r+"]\\r\\n"+time.Now().Format("15:04:05")+"\\r\\n"+hex.EncodeToString(byte_r)) // 加入设备日志
  175. }
  176. // 更新设备记录日志
  177. v, is := logs.DeviceRealLogMap[Device_r.T_sn]
  178. if is {
  179. v.Data = append(v.Data, DeviceRealLogR_...)
  180. logs.DeviceRealLogMap[Device_r.T_sn] = v
  181. }
  182. return nil
  183. }
  184. // 处理数据
  185. func AnalysisMap(Device_r *Device.Device, ArticleSlide map[string]interface{}, JointTab string) (json_r map[string]interface{}) {
  186. //JointTab += "."
  187. T_r := make(map[string]interface{})
  188. json_r = make(map[string]interface{})
  189. for key, value := range ArticleSlide {
  190. //fmt.Println(reflect.TypeOf(value).String())
  191. switch reflect.TypeOf(value).String() {
  192. case "map[string]interface {}":
  193. json_r[key] = AnalysisMap(Device_r, value.(map[string]interface{}), JointTab+key+".")
  194. return json_r
  195. break
  196. case "[]interface {}":
  197. for _, valuex := range value.([]interface{}) {
  198. if reflect.TypeOf(valuex).String() == "map[string]interface {}" {
  199. json_r[key] = AnalysisMap(Device_r, valuex.(map[string]interface{}), JointTab+key+".")
  200. }
  201. }
  202. return json_r
  203. break
  204. default:
  205. T_r[key] = value
  206. // 根 TAB
  207. if len(JointTab) == 0 {
  208. //logs.Println("TAB.:",key)
  209. bson_r := bson.M{key: value}
  210. go Device.Data_Add(Device_r.T_sn+"_"+key, &bson_r)
  211. // 消息转发
  212. go Relay(Device_r, JointTab, &bson_r)
  213. }
  214. break
  215. }
  216. }
  217. if len(JointTab) == 0 {
  218. return
  219. }
  220. // 多级 TAB
  221. JointTab = JointTab[:len(JointTab)-1]
  222. //logs.Println("TAB-:",JointTab)
  223. bson_r := bson.M{}
  224. for key, value := range T_r {
  225. fmt.Println(key, "->:", value)
  226. bson_r[key] = value
  227. json_r[key] = value
  228. }
  229. // 数据存储
  230. go Device.Data_Add(Device_r.T_sn+"_"+JointTab, &bson_r)
  231. // 消息转发
  232. go Relay(Device_r, JointTab, &bson_r)
  233. return json_r
  234. }