DeviceData.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. package Device
  2. import (
  3. "Cold_mqtt/conf"
  4. "Cold_mqtt/lib"
  5. "Cold_mqtt/logs"
  6. "encoding/json"
  7. "fmt"
  8. "github.com/astaxie/beego/cache"
  9. _ "github.com/astaxie/beego/cache/redis"
  10. "github.com/beego/beego/v2/adapter/orm"
  11. _ "github.com/go-sql-driver/mysql"
  12. "strconv"
  13. "strings"
  14. "time"
  15. )
  // DeviceData_R is the template for a single device reading. It is what
  // Add_DeviceData writes to the per-device data table and what
  // RedisDeviceData_Set stores (JSON-encoded) as the latest cached reading.
  type DeviceData_R struct {
  	T_t, T_rh float32   // temperature, humidity
  	T_Site    string    // GPS
  	T_time    time.Time // acquisition time
  	T_sp      int       // sensor parameter ID
  	// create_time
  }
  // DeviceData_old carries the values of an already stored row. Add_DeviceData
  // fills it from the existing record when an insert collides with a duplicate
  // key and returns it to the caller alongside the "replaced" flag.
  type DeviceData_old struct {
  	T_t, T_rh float32 // previously stored temperature and humidity
  	T_site    string  // previously stored site value
  	T_sp      int     // previously stored sensor parameter ID
  }
  31. var redis_DeviceData cache.Cache
  32. func init() {
  33. config := fmt.Sprintf(`{"key":"%s","conn":"%s","dbNum":"%s","password":"%s"}`,
  34. "redis_DeviceData", conf.Redis_address, conf.Redis_dbNum, conf.Redis_password)
  35. logs.Println(config)
  36. var err error
  37. redis_DeviceData, err = cache.NewCache("redis", config)
  38. if err != nil || redis_DeviceData == nil {
  39. errMsg := "failed to init redis"
  40. logs.Println(errMsg, err)
  41. }
  42. }
  43. // ---------------- Redis -------------------
  44. // Redis_Set(m.T_sn,m) // Redis 更新缓存
  45. func RedisDeviceData_Set(T_sn string, T_id int, r DeviceData_R) (err error) {
  46. key := T_sn + "|" + strconv.Itoa(T_id)
  47. if redis_DeviceData.IsExist(key) {
  48. var t DeviceData_R
  49. v := redis_DeviceData.Get(key)
  50. json.Unmarshal(v.([]byte), &t)
  51. // 防止时间溢出
  52. if time.Now().Unix() <= r.T_time.Unix() {
  53. r.T_time = time.Now()
  54. }
  55. // 提前最新数据
  56. if t.T_time.Unix() > r.T_time.Unix() {
  57. // 储存的 是最新数据
  58. return
  59. }
  60. }
  61. //json序列化
  62. str, err := json.Marshal(r)
  63. if err != nil {
  64. logs.PrintlnError("RedisDeviceData_Set", err)
  65. return
  66. }
  67. err = redis_DeviceData.Put(key, str, 1*time.Hour)
  68. if err != nil {
  69. logs.Println("set key:", key, ",value:", str, err)
  70. }
  71. return
  72. }
  73. func RedisDeviceData_Get(key string) (r DeviceData_R, is bool) {
  74. if redis_DeviceData.IsExist(key) {
  75. v := redis_DeviceData.Get(key)
  76. json.Unmarshal(v.([]byte), &r)
  77. return r, true
  78. }
  79. return DeviceData_R{}, false
  80. }
  81. // 添加 数据 返回 真:替换 假:第一条无替换
  82. func Add_DeviceData(T_sn string, T_id int, v DeviceData_R) (bool, DeviceData_old) {
  83. o := orm.NewOrm()
  84. // 开始插入数据
  85. sql := "INSERT INTO z_device_data_" + T_sn + " (`t_id`, `t_t`, `t_rh`, `t_site`, `t_time`, `t_sp`) " +
  86. "VALUES (" + strconv.Itoa(T_id) + " , " + lib.To_string(v.T_t) + ", " + lib.To_string(v.T_rh) + ", '" + v.T_Site + "', '" + v.T_time.Format("2006-01-02 15:04:05") + "', " + lib.To_string(v.T_sp) + ")"
  87. // 这里有时间优化 用于一次 prepare 多次 exec,以提高批量执行的速度
  88. logs.Println(sql)
  89. _, err := o.Raw(sql).Exec()
  90. if err != nil {
  91. //Error 1146: Table 'cold.z_device_data_202335objgv2pzk' doesn't exist
  92. //Error 1062: Duplicate entry '1111-11-11 00:00:00-1' for key 'z_device_data_202335objgv2pzk.PRIMARY'
  93. if strings.Contains(err.Error(), "Error 1062") {
  94. var DeviceData_r DeviceData_old
  95. err = o.Raw("SELECT * FROM z_device_data_" + T_sn + " WHERE `t_id` = " + strconv.Itoa(T_id) + " AND `t_time` = Cast('" + v.T_time.Format("2006-01-02 15:04:05") + "' AS Binary(19))").QueryRow(&DeviceData_r)
  96. if err != nil {
  97. logs.PrintlnError("SELECT_DeviceData:" + err.Error())
  98. return false, DeviceData_old{}
  99. }
  100. // 重复添加
  101. sql = "UPDATE z_device_data_" + T_sn + " SET `t_t` = " + lib.To_string(v.T_t) + ", `t_rh` = " + lib.To_string(v.T_rh) + ", `t_site` = '" + v.T_Site + "' WHERE `t_id` = " + strconv.Itoa(T_id) + " AND `t_time` = Cast('" + v.T_time.Format("2006-01-02 15:04:05") + "' AS Binary(19))"
  102. // 这里有时间优化 用于一次 prepare 多次 exec,以提高批量执行的速度
  103. logs.Println(sql)
  104. _, err = o.Raw(sql).Exec()
  105. if err != nil {
  106. logs.PrintlnError("UPDATE_DeviceData:" + sql + err.Error())
  107. }
  108. logs.PrintlnData(sql)
  109. return true, DeviceData_r
  110. } else {
  111. logs.PrintlnError("Add_DeviceData:" + sql + err.Error())
  112. return false, DeviceData_old{}
  113. }
  114. } else {
  115. logs.PrintlnData(sql)
  116. }
  117. //res.RowsAffected()
  118. // 添加缓存 sn id 最新数据
  119. RedisDeviceData_Set(T_sn, T_id, v)
  120. return false, DeviceData_old{}
  121. }
  122. func Ttt() {
  123. time.Sleep(time.Second)
  124. t1 := "2019-01-08 13:50:30" //外部传入的时间字符串
  125. timeTemplate1 := "2006-01-02 15:04:05" //常规类型
  126. stamp, _ := time.ParseInLocation(timeTemplate1, t1, time.Local) //使用parseInLocation将字符串格式化返回本地时区时间
  127. //// 更新记录 - 缓存
  128. DeviceData_t := DeviceData_R{
  129. T_t: 88,
  130. T_rh: 88,
  131. T_Site: "88",
  132. T_time: stamp,
  133. }
  134. if r_, DeviceData_old_r := Add_DeviceData("202335objgv2pzk1", 1, DeviceData_t); r_ {
  135. // 被替换
  136. Add_DeviceDataOld(DeviceDataOld{
  137. T_sn: "ffffff",
  138. T_id: 111,
  139. T_t: DeviceData_old_r.T_t,
  140. T_rh: DeviceData_old_r.T_rh,
  141. T_Site: DeviceData_old_r.T_site,
  142. T_time: DeviceData_t.T_time,
  143. T_operation: 2,
  144. T_uuid: "",
  145. })
  146. }
  147. }