Nats.go 21 KB


  1. package Nats
  2. import (
  3. "Cold_Api/conf"
  4. "Cold_Api/controllers/lib"
  5. "Cold_Api/models/Account"
  6. "Cold_Api/models/Device"
  7. "Cold_Api/models/System"
  8. "encoding/xml"
  9. "fmt"
  10. "github.com/astaxie/beego/cache"
  11. "github.com/beego/beego/v2/core/logs"
  12. "github.com/nats-io/nats.go"
  13. "github.com/vmihailenco/msgpack/v5"
  14. "strconv"
  15. "strings"
  16. )
  17. var redisCache_NatsServer cache.Cache
  18. func init() {
  19. logs.Info("============Nats init============")
  20. var err error
  21. // 连接Nats服务器
  22. lib.Nats, err = nats.Connect("nats://" + conf.NatsServer_Url)
  23. if err != nil {
  24. logs.Error("nats 连接失败!")
  25. panic(err)
  26. }
  27. logs.Info("nats OK!")
  28. if conf.RunMode == "prod" {
  29. go NatsInit()
  30. }
  31. config := fmt.Sprintf(`{"key":"%s","conn":"%s","dbNum":"%s","password":"%s"}`,
  32. "redisCache_NatsServer", conf.Redis_address, conf.Redis_dbNum, conf.Redis_password)
  33. logs.Info(config)
  34. redisCache_NatsServer, err = cache.NewCache("redis", config)
  35. if err != nil || redisCache_NatsServer == nil {
  36. errMsg := "failed to init redis"
  37. logs.Error(errMsg, err)
  38. panic(errMsg)
  39. }
  40. }
  41. func NatsInit() {
  42. // 获取微信二维码 返回结果
  43. _, _ = lib.Nats.QueueSubscribe("Wx_BasicMessage_Event_QRCode", "Wx_BasicMessage_Event_QRCode", func(m *nats.Msg) {
  44. //logs.Debug(fmt.Sprintf(" => Nats Wx_BasicMessage_Event_QRCode message: %s\n", string(m.Data)))
  45. type Person_QRCode struct {
  46. ToUserName string `xml:"ToUserName"` //注意这里有个反引号
  47. FromUserName string `xml:"FromUserName"` //注意这里有个反引号
  48. CreateTime string `xml:"CreateTime"` //注意这里有个反引号
  49. EventKey string `xml:"EventKey"`
  50. }
  51. var person_QRCode Person_QRCode
  52. err1 := xml.Unmarshal(m.Data, &person_QRCode)
  53. if err1 != nil {
  54. logs.Error("Unmarshal error")
  55. _ = lib.Nats.Publish(m.Reply, []byte(""))
  56. return
  57. }
  58. // 进入 二维码配对
  59. //logs.Info("FromUserName-", person_QRCode.FromUserName)
  60. //logs.Info("EventKey-", person_QRCode.EventKey)
  61. // 开始 处理消息
  62. if strings.Contains(person_QRCode.EventKey, "@宝智达冷链 微信公众号通知") {
  63. //"请将本内容发送到 深圳市宝智达科技有限公司 微信公众号-|"+lib.AesEncryptCBC(T_calss_id, "0123456789012345")+"|-",
  64. Content_r := lib.GetBetweenStr(person_QRCode.EventKey, "-|", "|-")
  65. decryptCode := lib.AesDecryptCBC(Content_r, "0123456789012345")
  66. T_uuid_name := strings.Split(decryptCode, "/")
  67. var uuid, name string
  68. if len(T_uuid_name) == 2 {
  69. uuid = strings.Split(decryptCode, "/")[0]
  70. name = strings.Split(decryptCode, "/")[1]
  71. }
  72. logs.Info("解密结果 UUID name:", decryptCode, " Content_r:", Content_r)
  73. Admin_r, err := Account.Read_Admin_ByUuid(uuid)
  74. if err != nil {
  75. _ = lib.Nats.Publish(m.Reply, []byte(""))
  76. return
  77. }
  78. T_wx := fmt.Sprintf("%s/%s", person_QRCode.FromUserName, name)
  79. // 已绑定 name相同 提示已绑定
  80. // 已绑定 name不同 在数据库修改name
  81. if strings.Contains(Admin_r.T_wx, person_QRCode.FromUserName) {
  82. list := strings.Split(Admin_r.T_wx, "|")
  83. for _, v := range list {
  84. if strings.Split(v, "/")[0] == person_QRCode.FromUserName {
  85. if v == T_wx {
  86. _ = lib.Nats.Publish(m.Reply, []byte(name+" 已绑定!"))
  87. return
  88. }
  89. Admin_r.T_wx = strings.Replace(Admin_r.T_wx, v, T_wx, 1)
  90. }
  91. }
  92. } else {
  93. Admin_r.T_wx += T_wx + "|"
  94. }
  95. Account.Update_Admin(Admin_r, "T_wx")
  96. _ = lib.Nats.Publish(m.Reply, []byte(name+" 绑定成功!"))
  97. return
  98. }
  99. _ = lib.Nats.Publish(m.Reply, []byte(""))
  100. })
  101. // 请求-响应 验证登录
  102. _, _ = lib.Nats.QueueSubscribe("Cold_User_verification", "Cold_User_verification", func(m *nats.Msg) {
  103. logs.Debug(fmt.Sprintf("Cold_User_verification message: %s\n", string(m.Data)))
  104. type T_R struct {
  105. Code int16 `xml:"Code"`
  106. Msg string `xml:"Msg"`
  107. Pid int `xml:"Pid"` // 公司id
  108. Data Account.Admin_R `xml:"Data"` // 泛型
  109. }
  110. var t_R T_R
  111. // 验证登录
  112. b_, admin_r := Account.Verification(string(m.Data), "")
  113. if !b_ {
  114. logs.Error("Unmarshal error")
  115. t_R.Code = 201
  116. t_R.Msg = "User_tokey Err!"
  117. b, _ := msgpack.Marshal(&t_R)
  118. _ = lib.Nats.Publish(m.Reply, b)
  119. return
  120. }
  121. t_R.Pid = admin_r.T_pid
  122. T_pid, _ := Account.Redis_Tokey_T_pid_Get(string(m.Data))
  123. if T_pid > 0 {
  124. t_R.Pid = T_pid
  125. }
  126. t_R.Code = 200
  127. t_R.Msg = "ok"
  128. t_R.Data = Account.AdminToAdmin_R(admin_r)
  129. b, _ := msgpack.Marshal(&t_R)
  130. _ = lib.Nats.Publish(m.Reply, b)
  131. })
  132. // 请求-响应 通过公司名获取所有公司信息
  133. _, _ = lib.Nats.QueueSubscribe("Cold_User_CompanyListAllByT_name", "Cold_User_CompanyListAllByT_name", func(m *nats.Msg) {
  134. logs.Debug(fmt.Sprintf("CompanyListAll message: %s\n", string(m.Data)))
  135. type T_R struct {
  136. Code int16 `xml:"Code"`
  137. Msg string `xml:"Msg"`
  138. Data []Account.Company `xml:"Data"` // 泛型
  139. }
  140. var t_R T_R
  141. t_R.Code = 200
  142. t_R.Msg = "ok"
  143. t_R.Data = Account.Read_Company_List_All_ByT_name(string(m.Data))
  144. b, _ := msgpack.Marshal(&t_R)
  145. _ = lib.Nats.Publish(m.Reply, b)
  146. })
  147. // 请求-响应 获取公司
  148. _, _ = lib.Nats.QueueSubscribe("Cold_ReadCompanyByT_id", "Cold_ReadCompanyByT_id", func(m *nats.Msg) {
  149. logs.Debug(fmt.Sprintf("Cold_ReadCompanyByT_id message: %s\n", string(m.Data)))
  150. type T_R struct {
  151. Code int16 `xml:"Code"`
  152. Msg string `xml:"Msg"`
  153. Data Account.Company `xml:"Data"` // 泛型
  154. }
  155. var t_R T_R
  156. id, _ := strconv.Atoi(string(m.Data))
  157. company, err := Account.Read_Company_ById(id)
  158. if err != nil {
  159. t_R.Code = 202
  160. t_R.Msg = err.Error()
  161. b, _ := msgpack.Marshal(&t_R)
  162. _ = lib.Nats.Publish(m.Reply, b)
  163. }
  164. t_R.Code = 200
  165. t_R.Msg = "ok"
  166. t_R.Data = company
  167. b, _ := msgpack.Marshal(&t_R)
  168. _ = lib.Nats.Publish(m.Reply, b)
  169. })
  170. // 请求-响应 获取公司 通过秘钥
  171. _, _ = lib.Nats.QueueSubscribe("Cold_ReadCompanyByT_key", "Cold_ReadCompanyByT_key", func(m *nats.Msg) {
  172. logs.Debug(fmt.Sprintf("Cold_ReadCompanyByT_key message: %s\n", string(m.Data)))
  173. type T_R struct {
  174. Code int16 `xml:"Code"`
  175. Msg string `xml:"Msg"`
  176. Data Account.Company `xml:"Data"` // 泛型
  177. }
  178. var t_R T_R
  179. company, err := Account.Read_Company_ByKey(string(m.Data))
  180. if err != nil {
  181. t_R.Code = 202
  182. t_R.Msg = err.Error()
  183. b, _ := msgpack.Marshal(&t_R)
  184. _ = lib.Nats.Publish(m.Reply, b)
  185. }
  186. t_R.Code = 200
  187. t_R.Msg = "ok"
  188. t_R.Data = company
  189. b, _ := msgpack.Marshal(&t_R)
  190. _ = lib.Nats.Publish(m.Reply, b)
  191. })
  192. // 请求-响应 获取所有用户列表
  193. _, _ = lib.Nats.QueueSubscribe("Cold_User_UserListAll", "Cold_User_UserListAll", func(m *nats.Msg) {
  194. type T_R struct {
  195. Code int16 `xml:"Code"`
  196. Msg string `xml:"Msg"`
  197. Data []Account.Admin_R `xml:"Data"` // 泛型
  198. }
  199. var t_R T_R
  200. t_R.Code = 200
  201. t_R.Msg = "ok"
  202. t_R.Data = Account.Read_User_List_All()
  203. b, _ := msgpack.Marshal(&t_R)
  204. _ = lib.Nats.Publish(m.Reply, b)
  205. })
  206. // 请求-响应 检查用户权限
  207. _, _ = lib.Nats.QueueSubscribe("Cold_User_CheckUserPermissions", "Cold_User_CheckUserPermissions", func(m *nats.Msg) {
  208. logs.Debug(fmt.Sprintf("Cold_User_CheckUserPermissions message: %s\n", string(m.Data)))
  209. type T_Req struct {
  210. Power_Id int `xml:"Power_Id"` // 权限id
  211. Req_Url string `xml:"Req_Url"` // 请求url
  212. }
  213. type T_R struct {
  214. Code int16 `xml:"Code"`
  215. Msg string `xml:"Msg"`
  216. Pass bool `xml:"Pass"` // 泛型
  217. }
  218. var t_Req T_Req
  219. var t_R T_R
  220. err := msgpack.Unmarshal(m.Data, &t_Req)
  221. if err != nil {
  222. t_R.Code = 202
  223. t_R.Msg = "Unmarshal error"
  224. b, _ := msgpack.Marshal(&t_R)
  225. _ = lib.Nats.Publish(m.Reply, b)
  226. return
  227. }
  228. power, err := Account.Read_Power_ById(t_Req.Power_Id)
  229. if err != nil {
  230. t_R.Code = 202
  231. t_R.Msg = err.Error()
  232. b, _ := msgpack.Marshal(&t_R)
  233. _ = lib.Nats.Publish(m.Reply, b)
  234. }
  235. flag := false
  236. if power.T_menu == "*" {
  237. flag = true
  238. }
  239. if !flag {
  240. api := Account.Read_API_List_ByPower_Id(power.Id, power.T_menu)
  241. for _, v := range api {
  242. if strings.Contains(conf.Version+v.T_uri, t_Req.Req_Url) {
  243. flag = true
  244. break
  245. }
  246. }
  247. }
  248. t_R.Code = 200
  249. t_R.Msg = "ok"
  250. t_R.Pass = flag
  251. b, _ := msgpack.Marshal(&t_R)
  252. _ = lib.Nats.Publish(m.Reply, b)
  253. })
  254. // 发布-订阅 模式,异步订阅 AddSysLogs
  255. _, _ = lib.Nats.QueueSubscribe("Cold_User_AddSysLogs", "Cold_User_AddSysLogs", func(m *nats.Msg) {
  256. logs.Debug(fmt.Sprintf("AddSysLogs message: %s\n", string(m.Data)))
  257. type T_S struct {
  258. T_class string
  259. T_title string
  260. T_txt interface{}
  261. }
  262. var t_S T_S
  263. err := msgpack.Unmarshal(m.Data, &t_S)
  264. if err != nil {
  265. System.Add_Logs("Nats", "Nats AddSysLogs 解析失败", string(m.Data))
  266. return
  267. }
  268. System.Add_Logs_T(t_S.T_class, t_S.T_title, t_S.T_txt)
  269. })
  270. // 发布-订阅 模式,异步订阅 AddUserLogs
  271. _, _ = lib.Nats.QueueSubscribe("Cold_User_AddUserLogs", "Cold_User_AddUserLogs", func(m *nats.Msg) {
  272. logs.Debug(fmt.Sprintf("AddUserLogs message: %s\n", string(m.Data)))
  273. type T_S struct {
  274. T_uuid string
  275. T_class string
  276. T_title string
  277. T_txt interface{}
  278. }
  279. var t_S T_S
  280. err := msgpack.Unmarshal(m.Data, &t_S)
  281. if err != nil {
  282. System.Add_Logs("Nats", "Nats AddUserLogs 解析失败", string(m.Data))
  283. return
  284. }
  285. System.Add_UserLogs_T(t_S.T_uuid, t_S.T_class, t_S.T_title, t_S.T_txt)
  286. })
  287. // 请求-响应 获取设备
  288. _, _ = lib.Nats.QueueSubscribe("Cold_ReadDeviceByT_sn", "Cold_ReadDeviceByT_sn", func(m *nats.Msg) {
  289. logs.Debug(fmt.Sprintf("CheckUserPermissions message: %s\n", string(m.Data)))
  290. type T_R struct {
  291. Code int16 `xml:"Code"`
  292. Msg string `xml:"Msg"`
  293. Data Device.Device `xml:"Data"` // 泛型
  294. }
  295. var t_R T_R
  296. device, err := Device.Read_Device_ByT_sn(string(m.Data))
  297. if err != nil {
  298. t_R.Code = 202
  299. t_R.Msg = "查询失败"
  300. b, _ := msgpack.Marshal(&t_R)
  301. _ = lib.Nats.Publish(m.Reply, b)
  302. }
  303. t_R.Code = 200
  304. t_R.Msg = "ok"
  305. t_R.Data = device
  306. b, _ := msgpack.Marshal(&t_R)
  307. _ = lib.Nats.Publish(m.Reply, b)
  308. })
  309. // 请求-响应 获取传感器列表
  310. _, _ = lib.Nats.QueueSubscribe("Cold_ReadDeviceSensorALLByT_sn", "Cold_ReadDeviceSensorALLByT_sn", func(m *nats.Msg) {
  311. logs.Debug(fmt.Sprintf("CheckUserPermissions message: %s\n", string(m.Data)))
  312. type T_R struct {
  313. Code int16 `xml:"Code"`
  314. Msg string `xml:"Msg"`
  315. Data []Device.DeviceSensor `xml:"Data"` // 泛型
  316. }
  317. var t_R T_R
  318. device := Device.Read_DeviceSensor_ALL_List_T_sn(string(m.Data))
  319. t_R.Code = 200
  320. t_R.Msg = "ok"
  321. t_R.Data = device
  322. b, _ := msgpack.Marshal(&t_R)
  323. _ = lib.Nats.Publish(m.Reply, b)
  324. })
  325. // 请求-响应 获取设备数据列表
  326. _, _ = lib.Nats.QueueSubscribe("Cold_ReadDeviceDataListBy_T_snid", "Cold_ReadDeviceDataListBy_T_snid", func(m *nats.Msg) {
  327. logs.Debug(fmt.Sprintf("Cold_ReadDeviceDataListBy_T_snid message: %s\n", string(m.Data)))
  328. type T_Req struct {
  329. T_snid string `xml:"T_snid"`
  330. Time_start string `xml:"Time_start"`
  331. Time_end string `xml:"Time_end"`
  332. Page int `xml:"Page"`
  333. Page_z int `xml:"Page_z"`
  334. }
  335. type T_R struct {
  336. Code int16 `xml:"Code"`
  337. Msg string `xml:"Msg"`
  338. Count int64 `xml:"Count"`
  339. Data []Device.DeviceData_R `xml:"Data"` // 泛型
  340. }
  341. var t_Req T_Req
  342. var t_R T_R
  343. err := msgpack.Unmarshal(m.Data, &t_Req)
  344. if err != nil {
  345. t_R.Code = 202
  346. t_R.Msg = "Unmarshal error"
  347. b, _ := msgpack.Marshal(&t_R)
  348. _ = lib.Nats.Publish(m.Reply, b)
  349. return
  350. }
  351. deviceData, cnt := Device.Read_DeviceData_By_T_snid_List(t_Req.T_snid, t_Req.Time_start, t_Req.Time_end, t_Req.Page, t_Req.Page_z)
  352. t_R.Code = 200
  353. t_R.Msg = "ok"
  354. t_R.Count = cnt
  355. t_R.Data = deviceData
  356. b, _ := msgpack.Marshal(&t_R)
  357. _ = lib.Nats.Publish(m.Reply, b)
  358. })
  359. // 请求-响应 通过key获取传感器列表
  360. _, _ = lib.Nats.QueueSubscribe("Cold_CompanyDeviceSensor_List_ByKey", "Cold_CompanyDeviceSensor_List_ByKey", func(m *nats.Msg) {
  361. logs.Debug(fmt.Sprintf("Cold_CompanyDeviceSensor_List_ByKey message: %s\n", string(m.Data)))
  362. type T_Req struct {
  363. T_sn string `xml:"T_sn"`
  364. T_key string `xml:"T_key"`
  365. }
  366. type T_R struct {
  367. Code int16 `xml:"Code"`
  368. Msg string `xml:"Msg"`
  369. Count int64 `xml:"Count"`
  370. Data []Device.DeviceSensor_R `xml:"Data"` // 泛型
  371. }
  372. var t_Req T_Req
  373. var t_R T_R
  374. err := msgpack.Unmarshal(m.Data, &t_Req)
  375. if err != nil {
  376. t_R.Code = 202
  377. t_R.Msg = "Unmarshal error"
  378. b, _ := msgpack.Marshal(&t_R)
  379. _ = lib.Nats.Publish(m.Reply, b)
  380. return
  381. }
  382. // 查询公司
  383. Company_r, err := Account.Read_Company_ByKey(t_Req.T_key)
  384. if err != nil {
  385. t_R.Code = 202
  386. t_R.Msg = "T_key error"
  387. b, _ := msgpack.Marshal(&t_R)
  388. _ = lib.Nats.Publish(m.Reply, b)
  389. return
  390. }
  391. // 查询公司下面所有子公司id
  392. T_pids := Account.ReadCompanyIds_T_path(Company_r.T_path)
  393. Account.Read_Company_All_Maps()
  394. deviceData, cnt := Device.Read_CompanyDeviceSensor_List_For_Data_ByKey(T_pids, t_Req.T_sn)
  395. t_R.Code = 200
  396. t_R.Msg = "ok"
  397. t_R.Count = cnt
  398. t_R.Data = deviceData
  399. b, _ := msgpack.Marshal(&t_R)
  400. _ = lib.Nats.Publish(m.Reply, b)
  401. })
  402. // 请求-响应 根据时间获取设备数据(单条)最新数据
  403. _, _ = lib.Nats.QueueSubscribe("Cold_ReadDeviceDataBy_T_snid_T_time", "Cold_ReadDeviceDataBy_T_snid_T_time", func(m *nats.Msg) {
  404. logs.Debug(fmt.Sprintf("Cold_ReadDeviceDataBy_T_snid_T_time message: %s\n", string(m.Data)))
  405. type T_Req struct {
  406. T_sn string `xml:"T_sn"`
  407. T_id int `xml:"T_id"`
  408. Time string `xml:"Time"`
  409. }
  410. type T_R struct {
  411. Code int16 `xml:"Code"`
  412. Msg string `xml:"Msg"`
  413. Data Device.DeviceData_ `xml:"Data"` // 泛型
  414. }
  415. var t_Req T_Req
  416. var t_R T_R
  417. err := msgpack.Unmarshal(m.Data, &t_Req)
  418. if err != nil {
  419. t_R.Code = 202
  420. t_R.Msg = "Unmarshal error"
  421. b, _ := msgpack.Marshal(&t_R)
  422. _ = lib.Nats.Publish(m.Reply, b)
  423. return
  424. }
  425. deviceData := Device.Read_DeviceData_By_Time(t_Req.T_sn, t_Req.T_id, t_Req.Time)
  426. t_R.Code = 200
  427. t_R.Msg = "ok"
  428. t_R.Data = deviceData
  429. b, _ := msgpack.Marshal(&t_R)
  430. _ = lib.Nats.Publish(m.Reply, b)
  431. })
  432. // 请求-响应 获取设备
  433. _, _ = lib.Nats.QueueSubscribe("Cold_ReadDevice_List", "Cold_ReadDevice_List", func(m *nats.Msg) {
  434. logs.Debug(fmt.Sprintf("Cold_ReadDevice_List message: %s\n", string(m.Data)))
  435. type T_Req struct {
  436. Key string `xml:"Key"`
  437. Name string `xml:"Name"`
  438. Page int `xml:"Page"`
  439. Page_z int `xml:"Page_z"`
  440. }
  441. var t_Req T_Req
  442. type T_R struct {
  443. Code int16 `xml:"Code"`
  444. Msg string `xml:"Msg"`
  445. Count int64 `xml:"Count"`
  446. Data []Device.Device_R `xml:"Data"` // 泛型
  447. }
  448. var t_R T_R
  449. err := msgpack.Unmarshal(m.Data, &t_Req)
  450. if err != nil {
  451. t_R.Code = 202
  452. t_R.Msg = "Unmarshal error"
  453. b, _ := msgpack.Marshal(&t_R)
  454. _ = lib.Nats.Publish(m.Reply, b)
  455. return
  456. }
  457. // 查询公司
  458. Company_r, err := Account.Read_Company_ByKey(t_Req.Key)
  459. if err != nil {
  460. t_R.Code = 202
  461. t_R.Msg = "T_key error"
  462. b, _ := msgpack.Marshal(&t_R)
  463. _ = lib.Nats.Publish(m.Reply, b)
  464. return
  465. }
  466. deviceList, count := Device.Read_Device_List(&Account.Admin{T_pid: Company_r.Id}, []string{}, Company_r.Id, t_Req.Name, "", "", 0, t_Req.Page, t_Req.Page_z)
  467. t_R.Code = 200
  468. t_R.Msg = "ok"
  469. t_R.Count = count
  470. t_R.Data = deviceList
  471. b, _ := msgpack.Marshal(&t_R)
  472. _ = lib.Nats.Publish(m.Reply, b)
  473. })
  474. // 发布-订阅 修改设备校准时间
  475. _, _ = lib.Nats.QueueSubscribe("Cold_UpdateDevice_CalibrationTime", "Cold_UpdateDevice_CalibrationTime", func(m *nats.Msg) {
  476. logs.Debug(fmt.Sprintf("Cold_UpdateDevice_CalibrationTime message: %s\n", string(m.Data)))
  477. type T_Req struct {
  478. T_sn string
  479. T_CalibrationTime string
  480. }
  481. type T_R struct {
  482. Code int16 `xml:"Code"`
  483. Msg string `xml:"Msg"`
  484. Data interface{} `xml:"Data"` // 泛型
  485. }
  486. var t_R T_R
  487. var t_req T_Req
  488. err := msgpack.Unmarshal(m.Data, &t_req)
  489. if err != nil {
  490. t_R.Code = 202
  491. t_R.Msg = "Unmarshal error"
  492. b, _ := msgpack.Marshal(&t_R)
  493. _ = lib.Nats.Publish(m.Reply, b)
  494. return
  495. }
  496. device, err := Device.Read_Device_ByT_sn(t_req.T_sn)
  497. if err != nil {
  498. if err.Error() == "record not found" {
  499. t_R.Code = 202
  500. t_R.Msg = "SN不存在"
  501. b, _ := msgpack.Marshal(&t_R)
  502. _ = lib.Nats.Publish(m.Reply, b)
  503. return
  504. }
  505. t_R.Code = 202
  506. t_R.Msg = "查询失败"
  507. b, _ := msgpack.Marshal(&t_R)
  508. _ = lib.Nats.Publish(m.Reply, b)
  509. return
  510. }
  511. // 校准时间
  512. CalibrationTime, CalibrationTime_is := lib.DateStrToDate(t_req.T_CalibrationTime)
  513. if CalibrationTime_is {
  514. device.T_CalibrationTime = CalibrationTime
  515. }
  516. if is := Device.Update_Device(device, "T_CalibrationTime"); !is {
  517. t_R.Code = 202
  518. t_R.Msg = "修改校准时间失败"
  519. b, _ := msgpack.Marshal(&t_R)
  520. _ = lib.Nats.Publish(m.Reply, b)
  521. return
  522. }
  523. t_R.Code = 200
  524. t_R.Msg = "ok"
  525. b, _ := msgpack.Marshal(&t_R)
  526. _ = lib.Nats.Publish(m.Reply, b)
  527. return
  528. })
  529. //通过sn获取设备最新开始监控时间以及结束时间
  530. _, _ = lib.Nats.QueueSubscribe("Read_DeviceTask_List_By_Condition", "Read_DeviceTask_List_By_Condition", func(m *nats.Msg) {
  531. logs.Debug(fmt.Sprintf("Read_DeviceTask_List_By_Condition message: %s\n", string(m.Data)))
  532. type T_R struct {
  533. Code int16 `xml:"Code"`
  534. Msg string `xml:"Msg"`
  535. Data Device.DeviceTask `xml:"Data"`
  536. }
  537. type T_Req struct {
  538. T_sn string `xml:"T_sn"`
  539. }
  540. var t_R T_R
  541. var t_Req T_Req
  542. err := msgpack.Unmarshal(m.Data, &t_Req)
  543. if err != nil {
  544. t_R.Code = 202
  545. t_R.Msg = "Unmarshal error"
  546. b, _ := msgpack.Marshal(&t_R)
  547. _ = lib.Nats.Publish(m.Reply, b)
  548. return
  549. }
  550. l, err := Device.Read_DeviceTask_List_By_Condition(t_Req.T_sn)
  551. if err == nil && l.Id != 0 {
  552. t_R.Code = 200
  553. t_R.Msg = "ok"
  554. t_R.Data = l
  555. b, _ := msgpack.Marshal(&t_R)
  556. _ = lib.Nats.Publish(m.Reply, b)
  557. return
  558. }
  559. t_R.Code = 202
  560. t_R.Msg = "查询失败当前数据为空"
  561. b, _ := msgpack.Marshal(&t_R)
  562. _ = lib.Nats.Publish(m.Reply, b)
  563. return
  564. })
  565. // 发布-订阅 通过sn_id 查询最新设备数据
  566. _, _ = lib.Nats.QueueSubscribe("Read_New_DeviceData", "Read_New_DeviceData", func(m *nats.Msg) {
  567. logs.Debug(fmt.Sprintf("Read_New_DeviceData message: %s\n", string(m.Data)))
  568. type T_R struct {
  569. Code int16 `xml:"Code"`
  570. Msg string `xml:"Msg"`
  571. Data Device.DeviceData_ `xml:"Data"`
  572. }
  573. type T_Req struct {
  574. T_sn string `xml:"T_sn"`
  575. T_id int `xml:"T_id"`
  576. }
  577. var t_R T_R
  578. var t_Req T_Req
  579. err := msgpack.Unmarshal(m.Data, &t_Req)
  580. if err != nil {
  581. t_R.Code = 202
  582. t_R.Msg = "Unmarshal error"
  583. b, _ := msgpack.Marshal(&t_R)
  584. _ = lib.Nats.Publish(m.Reply, b)
  585. return
  586. }
  587. r := Device.Read_DeviceData(t_Req.T_sn, t_Req.T_id)
  588. if r.T_id != 0 {
  589. t_R.Code = 200
  590. t_R.Msg = "ok"
  591. t_R.Data = r
  592. b, _ := msgpack.Marshal(&t_R)
  593. _ = lib.Nats.Publish(m.Reply, b)
  594. return
  595. }
  596. t_R.Data = r
  597. t_R.Code = 202
  598. t_R.Msg = "查询失败当前数据为空"
  599. b, _ := msgpack.Marshal(&t_R)
  600. _ = lib.Nats.Publish(m.Reply, b)
  601. return
  602. })
  603. // 查询大于开始时间的所有数据
  604. _, _ = lib.Nats.QueueSubscribe("Read_Start_Time_DeviceData", "Read_Start_Time_DeviceData", func(m *nats.Msg) {
  605. logs.Debug(fmt.Sprintf("Read_Start_Time_DeviceData message: %s\n", string(m.Data)))
  606. type T_R struct {
  607. Code int16 `xml:"Code"`
  608. Msg string `xml:"Msg"`
  609. Data []Device.DeviceData_ `xml:"Data"`
  610. }
  611. type T_Req struct {
  612. T_sn string `xml:"T_sn"`
  613. T_id int `xml:"T_id"`
  614. StartTime string `xml:"StartTime"`
  615. EndTime string `xml:"EndTime"`
  616. }
  617. var t_R T_R
  618. var t_Req T_Req
  619. err := msgpack.Unmarshal(m.Data, &t_Req)
  620. if err != nil {
  621. t_R.Code = 202
  622. t_R.Msg = "Unmarshal error"
  623. b, _ := msgpack.Marshal(&t_R)
  624. _ = lib.Nats.Publish(m.Reply, b)
  625. return
  626. }
  627. t, count := Device.Read_DeviceData_Start_Time(t_Req.T_sn, t_Req.T_id, t_Req.StartTime, t_Req.EndTime)
  628. if count > 0 {
  629. t_R.Code = 200
  630. t_R.Msg = "ok"
  631. t_R.Data = t
  632. b, _ := msgpack.Marshal(&t_R)
  633. _ = lib.Nats.Publish(m.Reply, b)
  634. return
  635. }
  636. t_R.Code = 202
  637. t_R.Msg = "查询失败当前数据为空"
  638. b, _ := msgpack.Marshal(&t_R)
  639. _ = lib.Nats.Publish(m.Reply, b)
  640. return
  641. })
  642. // 根据sn查询设备信息
  643. _, _ = lib.Nats.QueueSubscribe("Read_Device_ByT_sn", "Read_Device_ByT_sn", func(m *nats.Msg) {
  644. logs.Debug(fmt.Sprintf("Read_Device_ByT_sn message: %s\n", string(m.Data)))
  645. type T_R struct {
  646. Code int16 `xml:"Code"`
  647. Msg string `xml:"Msg"`
  648. Data Device.Device `xml:"Data"`
  649. }
  650. type T_Req struct {
  651. T_sn string `xml:"T_sn"`
  652. }
  653. var t_R T_R
  654. var t_Req T_Req
  655. err := msgpack.Unmarshal(m.Data, &t_Req)
  656. if err != nil {
  657. t_R.Code = 202
  658. t_R.Msg = "Unmarshal error"
  659. b, _ := msgpack.Marshal(&t_R)
  660. _ = lib.Nats.Publish(m.Reply, b)
  661. return
  662. }
  663. t, err := Device.Read_Device_ByT_sn(t_Req.T_sn)
  664. if err == nil {
  665. t_R.Code = 200
  666. t_R.Msg = "ok"
  667. t_R.Data = t
  668. b, _ := msgpack.Marshal(&t_R)
  669. _ = lib.Nats.Publish(m.Reply, b)
  670. return
  671. }
  672. t_R.Code = 202
  673. t_R.Msg = "查询失败当前数据为空"
  674. b, _ := msgpack.Marshal(&t_R)
  675. _ = lib.Nats.Publish(m.Reply, b)
  676. return
  677. })
  678. }