Nats.go 20 KB


  1. package Nats
  2. import (
  3. "Cold_Api/conf"
  4. "Cold_Api/controllers/lib"
  5. "Cold_Api/models/Account"
  6. "Cold_Api/models/Device"
  7. "Cold_Api/models/System"
  8. "encoding/xml"
  9. "fmt"
  10. "github.com/astaxie/beego/cache"
  11. "github.com/beego/beego/v2/core/logs"
  12. "github.com/nats-io/nats.go"
  13. "github.com/vmihailenco/msgpack/v5"
  14. "strconv"
  15. "strings"
  16. )
  17. var redisCache_NatsServer cache.Cache
  18. func init() {
  19. logs.Info("============Nats init============")
  20. var err error
  21. // 连接Nats服务器
  22. lib.Nats, err = nats.Connect("nats://" + conf.NatsServer_Url)
  23. if err != nil {
  24. logs.Error("nats 连接失败!")
  25. panic(err)
  26. }
  27. logs.Info("nats OK!")
  28. if conf.RunMode == "prod" {
  29. go NatsInit()
  30. }
  31. config := fmt.Sprintf(`{"key":"%s","conn":"%s","dbNum":"%s","password":"%s"}`,
  32. "redisCache_NatsServer", conf.Redis_address, conf.Redis_dbNum, conf.Redis_password)
  33. logs.Info(config)
  34. redisCache_NatsServer, err = cache.NewCache("redis", config)
  35. if err != nil || redisCache_NatsServer == nil {
  36. errMsg := "failed to init redis"
  37. logs.Error(errMsg, err)
  38. panic(errMsg)
  39. }
  40. }
  41. func NatsInit() {
  42. // 获取微信二维码 返回结果
  43. _, _ = lib.Nats.QueueSubscribe("Wx_BasicMessage_Event_QRCode", "Wx_BasicMessage_Event_QRCode", func(m *nats.Msg) {
  44. //logs.Debug(fmt.Sprintf(" => Nats Wx_BasicMessage_Event_QRCode message: %s\n", string(m.Data)))
  45. type Person_QRCode struct {
  46. ToUserName string `xml:"ToUserName"` //注意这里有个反引号
  47. FromUserName string `xml:"FromUserName"` //注意这里有个反引号
  48. CreateTime string `xml:"CreateTime"` //注意这里有个反引号
  49. EventKey string `xml:"EventKey"`
  50. }
  51. var person_QRCode Person_QRCode
  52. err1 := xml.Unmarshal(m.Data, &person_QRCode)
  53. if err1 != nil {
  54. logs.Error("Unmarshal error")
  55. _ = lib.Nats.Publish(m.Reply, []byte(""))
  56. return
  57. }
  58. // 进入 二维码配对
  59. //logs.Info("FromUserName-", person_QRCode.FromUserName)
  60. //logs.Info("EventKey-", person_QRCode.EventKey)
  61. // 开始 处理消息
  62. if strings.Contains(person_QRCode.EventKey, "@宝智达冷链 微信公众号通知") {
  63. //"请将本内容发送到 深圳市宝智达科技有限公司 微信公众号-|"+lib.AesEncryptCBC(T_calss_id, "0123456789012345")+"|-",
  64. Content_r := lib.GetBetweenStr(person_QRCode.EventKey, "-|", "|-")
  65. decryptCode := lib.AesDecryptCBC(Content_r, "0123456789012345")
  66. T_uuid_name := strings.Split(decryptCode, "/")
  67. var uuid, name string
  68. if len(T_uuid_name) == 2 {
  69. uuid = strings.Split(decryptCode, "/")[0]
  70. name = strings.Split(decryptCode, "/")[1]
  71. }
  72. logs.Info("解密结果 UUID name:", decryptCode, " Content_r:", Content_r)
  73. Admin_r, err := Account.Read_Admin_ByUuid(uuid)
  74. if err != nil {
  75. _ = lib.Nats.Publish(m.Reply, []byte(""))
  76. return
  77. }
  78. T_wx := fmt.Sprintf("%s/%s", person_QRCode.FromUserName, name)
  79. // 已绑定 name相同 提示已绑定
  80. // 已绑定 name不同 在数据库修改name
  81. if strings.Contains(Admin_r.T_wx, person_QRCode.FromUserName) {
  82. list := strings.Split(Admin_r.T_wx, "|")
  83. for _, v := range list {
  84. if strings.Split(v, "/")[0] == person_QRCode.FromUserName {
  85. if v == T_wx {
  86. _ = lib.Nats.Publish(m.Reply, []byte(name+" 已绑定!"))
  87. return
  88. }
  89. Admin_r.T_wx = strings.Replace(Admin_r.T_wx, v, T_wx, 1)
  90. }
  91. }
  92. } else {
  93. Admin_r.T_wx += T_wx + "|"
  94. }
  95. Account.Update_Admin(Admin_r, "T_wx")
  96. _ = lib.Nats.Publish(m.Reply, []byte(name+" 绑定成功!"))
  97. return
  98. }
  99. _ = lib.Nats.Publish(m.Reply, []byte(""))
  100. })
  101. // 请求-响应 验证登录
  102. _, _ = lib.Nats.QueueSubscribe("Cold_User_verification", "Cold_User_verification", func(m *nats.Msg) {
  103. logs.Debug(fmt.Sprintf("Cold_User_verification message: %s\n", string(m.Data)))
  104. type T_R struct {
  105. Code int16 `xml:"Code"`
  106. Msg string `xml:"Msg"`
  107. Pid int `xml:"Pid"` // 公司id
  108. Data Account.Admin_R `xml:"Data"` // 泛型
  109. }
  110. var t_R T_R
  111. // 验证登录
  112. b_, admin_r := Account.Verification(string(m.Data), "")
  113. if !b_ {
  114. logs.Error("Unmarshal error")
  115. t_R.Code = 201
  116. t_R.Msg = "User_tokey Err!"
  117. b, _ := msgpack.Marshal(&t_R)
  118. _ = lib.Nats.Publish(m.Reply, b)
  119. return
  120. }
  121. T_pid, is := Account.Redis_Tokey_T_pid_Get(string(m.Data))
  122. if !is {
  123. t_R.Code = 201
  124. t_R.Msg = "User_tokey Err!"
  125. b, _ := msgpack.Marshal(&t_R)
  126. _ = lib.Nats.Publish(m.Reply, b)
  127. return
  128. }
  129. t_R.Pid = T_pid
  130. t_R.Code = 200
  131. t_R.Msg = "ok"
  132. t_R.Data = Account.AdminToAdmin_R(admin_r)
  133. b, _ := msgpack.Marshal(&t_R)
  134. _ = lib.Nats.Publish(m.Reply, b)
  135. })
  136. // 请求-响应 通过公司名获取所有公司信息
  137. _, _ = lib.Nats.QueueSubscribe("Cold_User_CompanyListAllByT_name", "Cold_User_CompanyListAllByT_name", func(m *nats.Msg) {
  138. logs.Debug(fmt.Sprintf("CompanyListAll message: %s\n", string(m.Data)))
  139. type T_R struct {
  140. Code int16 `xml:"Code"`
  141. Msg string `xml:"Msg"`
  142. Data []Account.Company `xml:"Data"` // 泛型
  143. }
  144. var t_R T_R
  145. t_R.Code = 200
  146. t_R.Msg = "ok"
  147. t_R.Data = Account.Read_Company_List_All_ByT_name(string(m.Data))
  148. b, _ := msgpack.Marshal(&t_R)
  149. _ = lib.Nats.Publish(m.Reply, b)
  150. })
  151. // 请求-响应 获取公司
  152. _, _ = lib.Nats.QueueSubscribe("Cold_ReadCompanyByT_id", "Cold_ReadCompanyByT_id", func(m *nats.Msg) {
  153. logs.Debug(fmt.Sprintf("Cold_ReadCompanyByT_id message: %s\n", string(m.Data)))
  154. type T_R struct {
  155. Code int16 `xml:"Code"`
  156. Msg string `xml:"Msg"`
  157. Data Account.Company `xml:"Data"` // 泛型
  158. }
  159. var t_R T_R
  160. id, _ := strconv.Atoi(string(m.Data))
  161. company, err := Account.Read_Company_ById(id)
  162. if err != nil {
  163. t_R.Code = 202
  164. t_R.Msg = err.Error()
  165. b, _ := msgpack.Marshal(&t_R)
  166. _ = lib.Nats.Publish(m.Reply, b)
  167. }
  168. t_R.Code = 200
  169. t_R.Msg = "ok"
  170. t_R.Data = company
  171. b, _ := msgpack.Marshal(&t_R)
  172. _ = lib.Nats.Publish(m.Reply, b)
  173. })
  174. // 请求-响应 获取公司 通过秘钥
  175. _, _ = lib.Nats.QueueSubscribe("Cold_ReadCompanyByT_key", "Cold_ReadCompanyByT_key", func(m *nats.Msg) {
  176. logs.Debug(fmt.Sprintf("Cold_ReadCompanyByT_key message: %s\n", string(m.Data)))
  177. type T_R struct {
  178. Code int16 `xml:"Code"`
  179. Msg string `xml:"Msg"`
  180. Data Account.Company `xml:"Data"` // 泛型
  181. }
  182. var t_R T_R
  183. company, err := Account.Read_Company_ByKey(string(m.Data))
  184. if err != nil {
  185. t_R.Code = 202
  186. t_R.Msg = err.Error()
  187. b, _ := msgpack.Marshal(&t_R)
  188. _ = lib.Nats.Publish(m.Reply, b)
  189. }
  190. t_R.Code = 200
  191. t_R.Msg = "ok"
  192. t_R.Data = company
  193. b, _ := msgpack.Marshal(&t_R)
  194. _ = lib.Nats.Publish(m.Reply, b)
  195. })
  196. // 请求-响应 获取所有用户列表
  197. _, _ = lib.Nats.QueueSubscribe("Cold_User_UserListAll", "Cold_User_UserListAll", func(m *nats.Msg) {
  198. type T_R struct {
  199. Code int16 `xml:"Code"`
  200. Msg string `xml:"Msg"`
  201. Data []Account.Admin_R `xml:"Data"` // 泛型
  202. }
  203. var t_R T_R
  204. t_R.Code = 200
  205. t_R.Msg = "ok"
  206. t_R.Data = Account.Read_User_List_All()
  207. b, _ := msgpack.Marshal(&t_R)
  208. _ = lib.Nats.Publish(m.Reply, b)
  209. })
  210. // 请求-响应 检查用户权限
  211. _, _ = lib.Nats.QueueSubscribe("Cold_User_CheckUserPermissions", "Cold_User_CheckUserPermissions", func(m *nats.Msg) {
  212. logs.Debug(fmt.Sprintf("Cold_User_CheckUserPermissions message: %s\n", string(m.Data)))
  213. type T_Req struct {
  214. Power_Id int `xml:"Power_Id"` // 权限id
  215. Req_Url string `xml:"Req_Url"` // 请求url
  216. }
  217. type T_R struct {
  218. Code int16 `xml:"Code"`
  219. Msg string `xml:"Msg"`
  220. Pass bool `xml:"Pass"` // 泛型
  221. }
  222. var t_Req T_Req
  223. var t_R T_R
  224. err := msgpack.Unmarshal(m.Data, &t_Req)
  225. if err != nil {
  226. t_R.Code = 202
  227. t_R.Msg = "Unmarshal error"
  228. b, _ := msgpack.Marshal(&t_R)
  229. _ = lib.Nats.Publish(m.Reply, b)
  230. return
  231. }
  232. power, err := Account.Read_Power_ById(t_Req.Power_Id)
  233. if err != nil {
  234. t_R.Code = 202
  235. t_R.Msg = err.Error()
  236. b, _ := msgpack.Marshal(&t_R)
  237. _ = lib.Nats.Publish(m.Reply, b)
  238. }
  239. flag := false
  240. if power.T_menu == "*" {
  241. flag = true
  242. }
  243. if !flag {
  244. api := Account.Read_API_List_ByPower_Id(power.Id, power.T_menu)
  245. for _, v := range api {
  246. if strings.Contains(conf.Version+v.T_uri, t_Req.Req_Url) {
  247. flag = true
  248. break
  249. }
  250. }
  251. }
  252. t_R.Code = 200
  253. t_R.Msg = "ok"
  254. t_R.Pass = flag
  255. b, _ := msgpack.Marshal(&t_R)
  256. _ = lib.Nats.Publish(m.Reply, b)
  257. })
  258. // 发布-订阅 模式,异步订阅 AddSysLogs
  259. _, _ = lib.Nats.QueueSubscribe("Cold_User_AddSysLogs", "Cold_User_AddSysLogs", func(m *nats.Msg) {
  260. logs.Debug(fmt.Sprintf("AddSysLogs message: %s\n", string(m.Data)))
  261. type T_S struct {
  262. T_class string
  263. T_title string
  264. T_txt interface{}
  265. }
  266. var t_S T_S
  267. err := msgpack.Unmarshal(m.Data, &t_S)
  268. if err != nil {
  269. System.Add_Logs("Nats", "Nats AddSysLogs 解析失败", string(m.Data))
  270. return
  271. }
  272. System.Add_Logs_T(t_S.T_class, t_S.T_title, t_S.T_txt)
  273. })
  274. // 发布-订阅 模式,异步订阅 AddUserLogs
  275. _, _ = lib.Nats.QueueSubscribe("Cold_User_AddUserLogs", "Cold_User_AddUserLogs", func(m *nats.Msg) {
  276. logs.Debug(fmt.Sprintf("AddUserLogs message: %s\n", string(m.Data)))
  277. type T_S struct {
  278. T_uuid string
  279. T_class string
  280. T_title string
  281. T_txt interface{}
  282. }
  283. var t_S T_S
  284. err := msgpack.Unmarshal(m.Data, &t_S)
  285. if err != nil {
  286. System.Add_Logs("Nats", "Nats AddUserLogs 解析失败", string(m.Data))
  287. return
  288. }
  289. System.Add_UserLogs_T(t_S.T_uuid, t_S.T_class, t_S.T_title, t_S.T_txt)
  290. })
  291. // 请求-响应 获取设备
  292. _, _ = lib.Nats.QueueSubscribe("Cold_ReadDeviceByT_sn", "Cold_ReadDeviceByT_sn", func(m *nats.Msg) {
  293. logs.Debug(fmt.Sprintf("CheckUserPermissions message: %s\n", string(m.Data)))
  294. type T_R struct {
  295. Code int16 `xml:"Code"`
  296. Msg string `xml:"Msg"`
  297. Data Device.Device `xml:"Data"` // 泛型
  298. }
  299. var t_R T_R
  300. device, err := Device.Read_Device_ByT_sn(string(m.Data))
  301. if err != nil {
  302. t_R.Code = 202
  303. t_R.Msg = "查询失败"
  304. b, _ := msgpack.Marshal(&t_R)
  305. _ = lib.Nats.Publish(m.Reply, b)
  306. }
  307. t_R.Code = 200
  308. t_R.Msg = "ok"
  309. t_R.Data = device
  310. b, _ := msgpack.Marshal(&t_R)
  311. _ = lib.Nats.Publish(m.Reply, b)
  312. })
  313. // 请求-响应 获取传感器列表
  314. _, _ = lib.Nats.QueueSubscribe("Cold_ReadDeviceSensorALLByT_sn", "Cold_ReadDeviceSensorALLByT_sn", func(m *nats.Msg) {
  315. logs.Debug(fmt.Sprintf("CheckUserPermissions message: %s\n", string(m.Data)))
  316. type T_R struct {
  317. Code int16 `xml:"Code"`
  318. Msg string `xml:"Msg"`
  319. Data []Device.DeviceSensor `xml:"Data"` // 泛型
  320. }
  321. var t_R T_R
  322. device := Device.Read_DeviceSensor_ALL_List_T_sn(string(m.Data))
  323. t_R.Code = 200
  324. t_R.Msg = "ok"
  325. t_R.Data = device
  326. b, _ := msgpack.Marshal(&t_R)
  327. _ = lib.Nats.Publish(m.Reply, b)
  328. })
  329. // 请求-响应 获取设备数据列表
  330. _, _ = lib.Nats.QueueSubscribe("Cold_ReadDeviceDataListBy_T_snid", "Cold_ReadDeviceDataListBy_T_snid", func(m *nats.Msg) {
  331. logs.Debug(fmt.Sprintf("Cold_ReadDeviceDataListBy_T_snid message: %s\n", string(m.Data)))
  332. type T_Req struct {
  333. T_snid string `xml:"T_snid"`
  334. Time_start string `xml:"Time_start"`
  335. Time_end string `xml:"Time_end"`
  336. Page int `xml:"Page"`
  337. Page_z int `xml:"Page_z"`
  338. }
  339. type T_R struct {
  340. Code int16 `xml:"Code"`
  341. Msg string `xml:"Msg"`
  342. Count int64 `xml:"Count"`
  343. Data []Device.DeviceData_R `xml:"Data"` // 泛型
  344. }
  345. var t_Req T_Req
  346. var t_R T_R
  347. err := msgpack.Unmarshal(m.Data, &t_Req)
  348. if err != nil {
  349. t_R.Code = 202
  350. t_R.Msg = "Unmarshal error"
  351. b, _ := msgpack.Marshal(&t_R)
  352. _ = lib.Nats.Publish(m.Reply, b)
  353. return
  354. }
  355. deviceData, cnt := Device.Read_DeviceData_By_T_snid_List(t_Req.T_snid, t_Req.Time_start, t_Req.Time_end, t_Req.Page, t_Req.Page_z)
  356. t_R.Code = 200
  357. t_R.Msg = "ok"
  358. t_R.Count = cnt
  359. t_R.Data = deviceData
  360. b, _ := msgpack.Marshal(&t_R)
  361. _ = lib.Nats.Publish(m.Reply, b)
  362. })
  363. // 请求-响应 通过key获取传感器列表
  364. _, _ = lib.Nats.QueueSubscribe("Cold_CompanyDeviceSensor_List_ByKey", "Cold_CompanyDeviceSensor_List_ByKey", func(m *nats.Msg) {
  365. logs.Debug(fmt.Sprintf("Cold_CompanyDeviceSensor_List_ByKey message: %s\n", string(m.Data)))
  366. type T_Req struct {
  367. T_sn string `xml:"T_sn"`
  368. T_key string `xml:"T_key"`
  369. }
  370. type T_R struct {
  371. Code int16 `xml:"Code"`
  372. Msg string `xml:"Msg"`
  373. Count int64 `xml:"Count"`
  374. Data []Device.DeviceSensor_R `xml:"Data"` // 泛型
  375. }
  376. var t_Req T_Req
  377. var t_R T_R
  378. err := msgpack.Unmarshal(m.Data, &t_Req)
  379. if err != nil {
  380. t_R.Code = 202
  381. t_R.Msg = "Unmarshal error"
  382. b, _ := msgpack.Marshal(&t_R)
  383. _ = lib.Nats.Publish(m.Reply, b)
  384. return
  385. }
  386. // 查询公司
  387. Company_r, err := Account.Read_Company_ByKey(t_Req.T_key)
  388. if err != nil {
  389. t_R.Code = 202
  390. t_R.Msg = "T_key error"
  391. b, _ := msgpack.Marshal(&t_R)
  392. _ = lib.Nats.Publish(m.Reply, b)
  393. return
  394. }
  395. // 查询公司下面所有子公司id
  396. T_pids := Account.ReadCompanyIds_T_path(Company_r.T_path)
  397. Account.Read_Company_All_Maps()
  398. deviceData, cnt := Device.Read_CompanyDeviceSensor_List_For_Data_ByKey(T_pids, t_Req.T_sn)
  399. t_R.Code = 200
  400. t_R.Msg = "ok"
  401. t_R.Count = cnt
  402. t_R.Data = deviceData
  403. b, _ := msgpack.Marshal(&t_R)
  404. _ = lib.Nats.Publish(m.Reply, b)
  405. })
  406. // 请求-响应 根据时间获取设备数据(单条)最新数据
  407. _, _ = lib.Nats.QueueSubscribe("Cold_ReadDeviceDataBy_T_snid_T_time", "Cold_ReadDeviceDataBy_T_snid_T_time", func(m *nats.Msg) {
  408. logs.Debug(fmt.Sprintf("Cold_ReadDeviceDataBy_T_snid_T_time message: %s\n", string(m.Data)))
  409. type T_Req struct {
  410. T_sn string `xml:"T_sn"`
  411. T_id int `xml:"T_id"`
  412. Time string `xml:"Time"`
  413. }
  414. type T_R struct {
  415. Code int16 `xml:"Code"`
  416. Msg string `xml:"Msg"`
  417. Data Device.DeviceData_ `xml:"Data"` // 泛型
  418. }
  419. var t_Req T_Req
  420. var t_R T_R
  421. err := msgpack.Unmarshal(m.Data, &t_Req)
  422. if err != nil {
  423. t_R.Code = 202
  424. t_R.Msg = "Unmarshal error"
  425. b, _ := msgpack.Marshal(&t_R)
  426. _ = lib.Nats.Publish(m.Reply, b)
  427. return
  428. }
  429. deviceData := Device.Read_DeviceData_By_Time(t_Req.T_sn, t_Req.T_id, t_Req.Time)
  430. t_R.Code = 200
  431. t_R.Msg = "ok"
  432. t_R.Data = deviceData
  433. b, _ := msgpack.Marshal(&t_R)
  434. _ = lib.Nats.Publish(m.Reply, b)
  435. })
  436. // 请求-响应 获取设备
  437. _, _ = lib.Nats.QueueSubscribe("Cold_ReadDevice_List", "Cold_ReadDevice_List", func(m *nats.Msg) {
  438. logs.Debug(fmt.Sprintf("Cold_ReadDevice_List message: %s\n", string(m.Data)))
  439. type T_Req struct {
  440. Key string `xml:"Key"`
  441. Name string `xml:"Name"`
  442. Page int `xml:"Page"`
  443. Page_z int `xml:"Page_z"`
  444. }
  445. var t_Req T_Req
  446. type T_R struct {
  447. Code int16 `xml:"Code"`
  448. Msg string `xml:"Msg"`
  449. Count int64 `xml:"Count"`
  450. Data []Device.Device_R `xml:"Data"` // 泛型
  451. }
  452. var t_R T_R
  453. err := msgpack.Unmarshal(m.Data, &t_Req)
  454. if err != nil {
  455. t_R.Code = 202
  456. t_R.Msg = "Unmarshal error"
  457. b, _ := msgpack.Marshal(&t_R)
  458. _ = lib.Nats.Publish(m.Reply, b)
  459. return
  460. }
  461. // 查询公司
  462. Company_r, err := Account.Read_Company_ByKey(t_Req.Key)
  463. if err != nil {
  464. t_R.Code = 202
  465. t_R.Msg = "T_key error"
  466. b, _ := msgpack.Marshal(&t_R)
  467. _ = lib.Nats.Publish(m.Reply, b)
  468. return
  469. }
  470. deviceList, count := Device.Read_Device_List(&Account.Admin{T_pid: Company_r.Id}, []string{}, Company_r.Id, t_Req.Name, "", "", 0, t_Req.Page, t_Req.Page_z)
  471. t_R.Code = 200
  472. t_R.Msg = "ok"
  473. t_R.Count = count
  474. t_R.Data = deviceList
  475. b, _ := msgpack.Marshal(&t_R)
  476. _ = lib.Nats.Publish(m.Reply, b)
  477. })
  478. // 发布-订阅 修改设备校准时间
  479. _, _ = lib.Nats.QueueSubscribe("Cold_UpdateDevice_CalibrationTime", "Cold_UpdateDevice_CalibrationTime", func(m *nats.Msg) {
  480. logs.Debug(fmt.Sprintf("Cold_UpdateDevice_CalibrationTime message: %s\n", string(m.Data)))
  481. type T_Req struct {
  482. T_sn string
  483. T_CalibrationTime string
  484. }
  485. type T_R struct {
  486. Code int16 `xml:"Code"`
  487. Msg string `xml:"Msg"`
  488. Data interface{} `xml:"Data"` // 泛型
  489. }
  490. var t_R T_R
  491. var t_req T_Req
  492. err := msgpack.Unmarshal(m.Data, &t_req)
  493. if err != nil {
  494. t_R.Code = 202
  495. t_R.Msg = "Unmarshal error"
  496. b, _ := msgpack.Marshal(&t_R)
  497. _ = lib.Nats.Publish(m.Reply, b)
  498. return
  499. }
  500. device, err := Device.Read_Device_ByT_sn(t_req.T_sn)
  501. if err != nil {
  502. if err.Error() == "record not found" {
  503. t_R.Code = 202
  504. t_R.Msg = "SN不存在"
  505. b, _ := msgpack.Marshal(&t_R)
  506. _ = lib.Nats.Publish(m.Reply, b)
  507. return
  508. }
  509. t_R.Code = 202
  510. t_R.Msg = "查询失败"
  511. b, _ := msgpack.Marshal(&t_R)
  512. _ = lib.Nats.Publish(m.Reply, b)
  513. return
  514. }
  515. // 校准时间
  516. CalibrationTime, CalibrationTime_is := lib.DateStrToDate(t_req.T_CalibrationTime)
  517. if CalibrationTime_is {
  518. device.T_CalibrationTime = CalibrationTime
  519. }
  520. if is := Device.Update_Device(device, "T_CalibrationTime"); !is {
  521. t_R.Code = 202
  522. t_R.Msg = "修改校准时间失败"
  523. b, _ := msgpack.Marshal(&t_R)
  524. _ = lib.Nats.Publish(m.Reply, b)
  525. return
  526. }
  527. t_R.Code = 200
  528. t_R.Msg = "ok"
  529. b, _ := msgpack.Marshal(&t_R)
  530. _ = lib.Nats.Publish(m.Reply, b)
  531. return
  532. })
  533. //通过sn获取设备最新开始监控时间以及结束时间
  534. _, _ = lib.Nats.QueueSubscribe("Read_DeviceTask_List_By_Condition", "Read_DeviceTask_List_By_Condition", func(m *nats.Msg) {
  535. logs.Debug(fmt.Sprintf("Read_DeviceTask_List_By_Condition message: %s\n", string(m.Data)))
  536. type T_R struct {
  537. Code int16 `xml:"Code"`
  538. Msg string `xml:"Msg"`
  539. Data Device.DeviceTask `xml:"Data"`
  540. }
  541. type T_Req struct {
  542. T_sn string `xml:"T_sn"`
  543. }
  544. var t_R T_R
  545. var t_Req T_Req
  546. err := msgpack.Unmarshal(m.Data, &t_Req)
  547. if err != nil {
  548. t_R.Code = 202
  549. t_R.Msg = "Unmarshal error"
  550. b, _ := msgpack.Marshal(&t_R)
  551. _ = lib.Nats.Publish(m.Reply, b)
  552. return
  553. }
  554. l, err := Device.Read_DeviceTask_List_By_Condition(t_Req.T_sn)
  555. if err == nil && l.Id != 0 {
  556. t_R.Code = 200
  557. t_R.Msg = "ok"
  558. t_R.Data = l
  559. b, _ := msgpack.Marshal(&t_R)
  560. _ = lib.Nats.Publish(m.Reply, b)
  561. return
  562. }
  563. t_R.Code = 202
  564. t_R.Msg = "查询失败当前数据为空"
  565. b, _ := msgpack.Marshal(&t_R)
  566. _ = lib.Nats.Publish(m.Reply, b)
  567. return
  568. })
  569. // 发布-订阅 通过sn_id 查询最新设备数据
  570. _, _ = lib.Nats.QueueSubscribe("Read_New_DeviceData", "Read_New_DeviceData", func(m *nats.Msg) {
  571. logs.Debug(fmt.Sprintf("Read_New_DeviceData message: %s\n", string(m.Data)))
  572. type T_R struct {
  573. Code int16 `xml:"Code"`
  574. Msg string `xml:"Msg"`
  575. Data Device.DeviceData_ `xml:"Data"`
  576. }
  577. type T_Req struct {
  578. T_sn string `xml:"T_sn"`
  579. T_id int `xml:"T_id"`
  580. }
  581. var t_R T_R
  582. var t_Req T_Req
  583. err := msgpack.Unmarshal(m.Data, &t_Req)
  584. if err != nil {
  585. t_R.Code = 202
  586. t_R.Msg = "Unmarshal error"
  587. b, _ := msgpack.Marshal(&t_R)
  588. _ = lib.Nats.Publish(m.Reply, b)
  589. return
  590. }
  591. r := Device.Read_DeviceData(t_Req.T_sn, t_Req.T_id)
  592. if r.T_id != 0 {
  593. t_R.Code = 200
  594. t_R.Msg = "ok"
  595. t_R.Data = r
  596. b, _ := msgpack.Marshal(&t_R)
  597. _ = lib.Nats.Publish(m.Reply, b)
  598. return
  599. }
  600. t_R.Code = 202
  601. t_R.Msg = "查询失败当前数据为空"
  602. b, _ := msgpack.Marshal(&t_R)
  603. _ = lib.Nats.Publish(m.Reply, b)
  604. return
  605. })
  606. // 查询大于开始时间的所有数据
  607. _, _ = lib.Nats.QueueSubscribe("Read_Start_Time_DeviceData", "Read_Start_Time_DeviceData", func(m *nats.Msg) {
  608. logs.Debug(fmt.Sprintf("Read_Start_Time_DeviceData message: %s\n", string(m.Data)))
  609. type T_R struct {
  610. Code int16 `xml:"Code"`
  611. Msg string `xml:"Msg"`
  612. Data []Device.DeviceData_ `xml:"Data"`
  613. }
  614. type T_Req struct {
  615. T_sn string `xml:"T_sn"`
  616. T_id int `xml:"T_id"`
  617. StartTime string `xml:"StartTime"`
  618. EndTime string `xml:"EndTime"`
  619. }
  620. var t_R T_R
  621. var t_Req T_Req
  622. err := msgpack.Unmarshal(m.Data, &t_Req)
  623. if err != nil {
  624. t_R.Code = 202
  625. t_R.Msg = "Unmarshal error"
  626. b, _ := msgpack.Marshal(&t_R)
  627. _ = lib.Nats.Publish(m.Reply, b)
  628. return
  629. }
  630. t, count := Device.Read_DeviceData_Start_Time(t_Req.T_sn, t_Req.T_id, t_Req.StartTime, t_Req.EndTime)
  631. if count > 0 {
  632. t_R.Code = 200
  633. t_R.Msg = "ok"
  634. t_R.Data = t
  635. b, _ := msgpack.Marshal(&t_R)
  636. _ = lib.Nats.Publish(m.Reply, b)
  637. return
  638. }
  639. t_R.Code = 202
  640. t_R.Msg = "查询失败当前数据为空"
  641. b, _ := msgpack.Marshal(&t_R)
  642. _ = lib.Nats.Publish(m.Reply, b)
  643. return
  644. })
  645. }