Handle.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. package Handle
  2. import (
  3. "AIOTCOER/conf"
  4. "AIOTCOER/lib"
  5. "AIOTCOER/logs"
  6. "AIOTCOER/models/Device"
  7. "AIOTCOER/models/Product"
  8. "AIOTCOER/models/StatisticalData"
  9. "encoding/hex"
  10. "encoding/json"
  11. "errors"
  12. "fmt"
  13. "go.mongodb.org/mongo-driver/bson"
  14. "plugin"
  15. "reflect"
  16. "sync"
  17. "time"
  18. )
  19. var Mu_PullHandle sync.Mutex // Mutex for synchronizing access to the map
  20. // var Mu_PushHandle sync.Mutex // Mutex for synchronizing access to the map
  21. // 设备->平台
  22. func PullHandle(Device_r *Device.Device, topicName string, message []byte) error {
  23. Msid := fmt.Sprintf("Msid:%d", uint8(time.Now().UnixNano())) // 通讯处理 唯一ID
  24. //Msidstart := time.Now()
  25. defer lib.PullHandleTime.TimeCost(time.Now())
  26. //流量统计
  27. go func(num int64) {
  28. Mu_PullHandle.Lock()
  29. sd, sdis := StatisticalData.FlowCountMap[Device_r.T_ProductID]
  30. if !sdis {
  31. sd = &StatisticalData.FlowCount_T{}
  32. StatisticalData.FlowCountMap[Device_r.T_ProductID] = sd
  33. }
  34. sd.T_send += num
  35. Mu_PullHandle.Unlock()
  36. }(int64(len(message)))
  37. DeviceRealLogR_ := []string{}
  38. // 加入设备日志 类型
  39. Data_log := ""
  40. if message[0] == '{' {
  41. Data_log = "<-接收[" + topicName + "]" + time.Now().Format("15:04:05") + "\\r\\n" + string(message)
  42. } else {
  43. Data_log = "<-接收[" + topicName + "]" + time.Now().Format("15:04:05") + "\\r\\n" + hex.EncodeToString(message) // 加入设备日志
  44. }
  45. // 加入设备日志
  46. DeviceRealLogR_ = append(DeviceRealLogR_, Data_log) // 加入设备日志
  47. logs.PrintlnData(Data_log)
  48. messagejson := string(message)
  49. // 是否加载转换协议
  50. if Device_r.T_ProductJson.T_prot != 0 {
  51. // 设备协议
  52. ProductProt_r := Product.ProductProt{Id: Device_r.T_ProductJson.T_prot}
  53. if !ProductProt_r.Read() {
  54. logs.Println(Msid, "MqttServer", Device_r.T_sn+"|"+Device_r.T_ProductID+"-"+fmt.Sprintf("%d", Device_r.T_ProductJson.T_prot)+" 设备协议找不到!")
  55. return errors.New("T_prot E!")
  56. }
  57. // 加载 SO 文件
  58. p, err := plugin.Open(conf.Analysis_Dir + ProductProt_r.T_analysis + ".so")
  59. if err != nil {
  60. logs.PrintlnError(Msid, "打开 SO 失败:", err)
  61. return errors.New("T_analysis E!")
  62. }
  63. //logs.Println("Plugin 地址:", &p)
  64. // 查找库导出信息
  65. s, err := p.Lookup("T")
  66. if err != nil {
  67. logs.PrintlnError(Msid, "Plugin:", err)
  68. return errors.New("Plugin E!")
  69. }
  70. // 类型转换
  71. messagejson = s.(func(t string, b []byte) string)(topicName, message)
  72. // 开始处理
  73. //logs.Println("协议后:", messagejson)
  74. DeviceRealLogR_ = append(DeviceRealLogR_, "<-转换["+topicName+"]\\r\\n"+time.Now().Format("15:04:05")+"\\r\\n"+messagejson) // 加入设备日志
  75. }
  76. //logs.Println("首字符:", string(f[0]))
  77. if messagejson[0] != '{' {
  78. logs.Println(Msid, "data jsonE {}!")
  79. return errors.New("data jsonE {}!")
  80. }
  81. //结构体
  82. var json_r map[string]interface{}
  83. err := json.Unmarshal([]byte(messagejson), &json_r)
  84. if err != nil {
  85. logs.Println(Msid, "data jsonE !")
  86. return errors.New("data jsonE!")
  87. }
  88. // 数据 SSE 实时推送
  89. go lib.SseSubscribeSendAll(Device_r.T_sn, json_r)
  90. // 数据 SSELog 实时推送
  91. go lib.SseLogSubscribeSendAll(Device_r.T_sn, DeviceRealLogR_)
  92. // 数据 Websocket 实时推送
  93. go lib.WebsocketSubscribeSendAll(Device_r.T_sn, json_r)
  94. // 详细处理
  95. json_r = AnalysisMap(Msid, Device_r, json_r, "")
  96. logs.Println("json_r:", json_r)
  97. // 合并json
  98. Device_r.Read_Tidy() // 提前最新的
  99. //logs.Println(Msid,"Device_r.T_dataJson:", Device_r.T_dataJson)
  100. //logs.Println(Msid,"json_r:", json_r)
  101. //var json_x map[string]interface{}
  102. if Device_r.T_dataJson != nil {
  103. // 托管平台 参数
  104. for _, field := range Device_r.T_ProductJson.T_renew {
  105. value := lib.JsonGetField(Device_r.T_dataJson, field)
  106. if value != nil {
  107. //logs.Println(Msid,"JsonSetField:", field,value)
  108. lib.JsonSetField(json_r, field, value)
  109. }
  110. }
  111. //logs.Println(Msid,"json_r:", json_r)
  112. // 合并
  113. json_str, _ := json.Marshal(json_r)
  114. //logs.Println(Msid,"json_x_b:", json_str)
  115. json.Unmarshal(json_str, &Device_r.T_dataJson)
  116. //json_r = json_x
  117. } else {
  118. Device_r.T_dataJson = json_r
  119. }
  120. logs.Println(Msid, "Device_r.T_dataJson_New:", Device_r.T_dataJson)
  121. //logs.Println(Msid,"Device_r.T_data 更新:", json_r)
  122. //Device_r.T_dataJson = json_r
  123. Device_r.Update("T_data")
  124. //logs.Println("DeviceRealLogR_:",DeviceRealLogR_)
  125. // 更新设备记录日志
  126. //v, is := logs.DeviceRealLogMap[Device_r.T_sn]
  127. //if is {
  128. // v.Data = append(v.Data, DeviceRealLogR_...)
  129. // logs.DeviceRealLogMap[Device_r.T_sn] = v
  130. //}
  131. return nil
  132. }
  133. // 平台->设备
  134. func PushHandle(Device_r *Device.Device, topicName string, message string) error {
  135. Msid := fmt.Sprintf("Msid:%d", uint8(time.Now().UnixNano())) // 通讯处理 唯一ID
  136. //Msidstart := time.Now()
  137. // 流量统计
  138. Mu_PullHandle.Lock()
  139. sd, sdis := StatisticalData.FlowCountMap[Device_r.T_ProductID]
  140. if !sdis {
  141. sd = &StatisticalData.FlowCount_T{}
  142. StatisticalData.FlowCountMap[Device_r.T_ProductID] = sd
  143. }
  144. sd.T_receive += int64(len(message))
  145. Mu_PullHandle.Unlock()
  146. // 加入设备日志
  147. DeviceRealLogR_ := []string{}
  148. logs.Println(Msid, "PushHandle ", topicName, message) // 加入设备日志
  149. // 设备协议
  150. ProductProt_r := Product.ProductProt{Id: Device_r.T_ProductJson.T_prot}
  151. if Device_r.T_ProductJson.T_prot != 0 {
  152. if !ProductProt_r.Read() {
  153. logs.Println(Msid, "PushHandle", Device_r.T_sn+"|"+Device_r.T_ProductID+"-"+fmt.Sprintf("%d", Device_r.T_ProductJson.T_prot)+" 设备协议找不到!")
  154. return errors.New("设备协议找不到")
  155. }
  156. }
  157. var byte_r []byte
  158. byte_r = []byte(message)
  159. var topicName_r = topicName
  160. // 是否加载转换协议
  161. if ProductProt_r.T_lang != 0 && len(ProductProt_r.T_analysis) != 0 {
  162. // 根据库的存放路径加载库
  163. p, err := plugin.Open(conf.Analysis_Dir + ProductProt_r.T_analysis + ".so")
  164. if err != nil {
  165. logs.PrintlnError(Msid, "PushHandle:", err)
  166. return err
  167. }
  168. // 查找库导出信息
  169. s, err := p.Lookup("R")
  170. if err != nil {
  171. logs.PrintlnError(Msid, "PushHandle:", err)
  172. return err
  173. }
  174. DeviceRealLogR_ = append(DeviceRealLogR_, "->转换["+topicName+"]"+time.Now().Format("15:04:05")+"\\r\\n"+message) // 加入设备日志
  175. // 类型转换
  176. topicName_r, byte_r = s.(func(sn string, b string) (string, []byte))(topicName, message)
  177. // 开始处理
  178. //logs.Println(Msid,"协议后:", byte_r)
  179. } else {
  180. topicName_r = "/" + topicName_r
  181. }
  182. // 无效消息,不用推送
  183. if len(topicName_r) == 0 || len(byte_r) == 0 {
  184. logs.Println(Msid, "无效消息,不用推送!", len(topicName_r), len(byte_r))
  185. return errors.New("无效消息,不用推送")
  186. }
  187. // 长连接 网关
  188. //logs.Println("ProductProt_r.T_mode:", ProductProt_r.T_mode)
  189. ProductMode_r, is := Product.ProductModeMap[Device_r.T_ProductJson.T_mode]
  190. if !is {
  191. logs.Println(Msid, "没有找到网关!", Device_r.T_ProductJson.T_mode)
  192. return errors.New("没有找到网关!")
  193. }
  194. //初始化插件
  195. FunPushHandle_r, err := ProductMode_r.T_Plugin.Lookup("FunPushHandle")
  196. if err != nil {
  197. logs.PrintlnError(Msid, "网关 错误!", Device_r.T_ProductJson.T_mode, err)
  198. return errors.New("网关 错误!")
  199. }
  200. // 初始化插件
  201. err = FunPushHandle_r.(func(T_topic string, T_data []byte) error)(topicName_r, byte_r)
  202. if err != nil {
  203. logs.PrintlnError(Msid, "网关 推送失败!", Device_r.T_ProductJson.T_mode, err)
  204. return errors.New("网关 推送失败!")
  205. }
  206. // 加入设备日志 类型
  207. Data_log := ""
  208. if byte_r[0] == '{' {
  209. Data_log = "->推送[" + topicName_r + "]" + time.Now().Format("15:04:05") + "\\r\\n" + string(byte_r) // 加入设备日志
  210. } else {
  211. Data_log = "->推送[" + topicName_r + "]" + time.Now().Format("15:04:05") + "\\r\\n" + hex.EncodeToString(byte_r) // 加入设备日志
  212. }
  213. // 加入设备日志
  214. DeviceRealLogR_ = append(DeviceRealLogR_, Data_log) // 加入设备日志
  215. logs.PrintlnData(Data_log)
  216. //// 更新设备记录日志
  217. //v, is := logs.DeviceRealLogMap[Device_r.T_sn]
  218. //if is {
  219. // v.Data = append(v.Data, DeviceRealLogR_...)
  220. // logs.DeviceRealLogMap[Device_r.T_sn] = v
  221. //}
  222. // 数据 SSELog 实时推送
  223. go lib.SseLogSubscribeSendAll(Device_r.T_sn, DeviceRealLogR_)
  224. // 托管平台 参数
  225. if len(Device_r.T_ProductJson.T_renew) == 0 {
  226. return nil // 没有 托管平台
  227. }
  228. logs.Println(Msid, "托管平台:", len(Device_r.T_ProductJson.T_renew), Device_r.T_ProductJson.T_renew)
  229. var articleSlide map[string]interface{}
  230. err = json.Unmarshal([]byte(message), &articleSlide)
  231. if err != nil {
  232. logs.Println(Msid, "参数 托管平台 json 解析错误!", err.Error())
  233. }
  234. Device_r.Read_Tidy() // 提前最新的
  235. for _, field := range Device_r.T_ProductJson.T_renew {
  236. value := lib.JsonGetField(articleSlide, field)
  237. if value != nil {
  238. lib.JsonSetField(Device_r.T_dataJson, field, value)
  239. }
  240. }
  241. logs.Println(Msid, "Device_r.T_data 更新:", Device_r.T_dataJson)
  242. Device_r.Update("T_data")
  243. return nil
  244. }
  245. // 处理数据
  246. func AnalysisMap(Msid string, Device_r *Device.Device, ArticleSlide map[string]interface{}, JointTab string) (json_r map[string]interface{}) {
  247. //JointTab += "."
  248. //T_r := make(map[string]interface{})
  249. json_r = make(map[string]interface{})
  250. for key, value := range ArticleSlide {
  251. //fmt.Println(reflect.TypeOf(value).String())
  252. switch reflect.TypeOf(value).String() {
  253. case "map[string]interface {}":
  254. json_r[key] = AnalysisMap(Msid, Device_r, value.(map[string]interface{}), JointTab+key+".")
  255. //return json_r
  256. break
  257. case "[]interface {}":
  258. for _, valuex := range value.([]interface{}) {
  259. if reflect.TypeOf(valuex).String() == "map[string]interface {}" {
  260. json_r[key] = AnalysisMap(Msid, Device_r, valuex.(map[string]interface{}), JointTab+key+".")
  261. }
  262. }
  263. //return json_r
  264. break
  265. default:
  266. json_r[key] = value
  267. // 根 TAB
  268. if len(JointTab) == 0 {
  269. //logs.Println("TAB.:",key)
  270. bson_r := bson.M{key: value}
  271. // 数据存储
  272. //T_filter := Device_r.T_ProductJson.T_filter[key]
  273. if lib.StringExistsInSlice(key, Device_r.T_ProductJson.T_filter) {
  274. T_filter_v := lib.GetMapRecursion(key, Device_r.T_dataJson)
  275. //logs.Println("T_filter:",key,value,T_filter_v)
  276. if value != T_filter_v {
  277. logs.Println(Msid, "T_filter 不同:", key)
  278. Device.Data_Add(Device_r.T_sn+"_"+key, &bson_r)
  279. }
  280. } else {
  281. Device.Data_Add(Device_r.T_sn+"_"+key, &bson_r)
  282. }
  283. // 消息转发
  284. Relay(Msid, Device_r, JointTab, &bson_r)
  285. }
  286. break
  287. }
  288. }
  289. if len(JointTab) == 0 {
  290. return
  291. }
  292. // 多级 TAB
  293. JointTab = JointTab[:len(JointTab)-1]
  294. //logs.Println("TAB-:",JointTab)
  295. bson_r := bson.M{}
  296. for key, value := range json_r {
  297. //fmt.Println(key, "->:", value)
  298. bson_r[key] = value
  299. }
  300. // 数据存储
  301. if lib.StringExistsInSlice(JointTab, Device_r.T_ProductJson.T_filter) {
  302. T_filter_v := lib.GetMapRecursion(JointTab, Device_r.T_dataJson)
  303. //logs.Println("T_filter:",JointTab,json_r,T_filter_v)
  304. if !reflect.DeepEqual(json_r, T_filter_v) {
  305. //logs.Println("T_filter 不同",JointTab)
  306. Device.Data_Add(Device_r.T_sn+"_"+JointTab, &bson_r)
  307. }
  308. } else {
  309. Device.Data_Add(Device_r.T_sn+"_"+JointTab, &bson_r)
  310. }
  311. // 消息转发
  312. Relay(Msid, Device_r, JointTab, &bson_r)
  313. return json_r
  314. }