DeviceData.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package Device
  2. import (
  3. "Cold_mqtt/conf"
  4. "Cold_mqtt/lib"
  5. "Cold_mqtt/logs"
  6. "encoding/json"
  7. "fmt"
  8. "github.com/astaxie/beego/cache"
  9. _ "github.com/astaxie/beego/cache/redis"
  10. "github.com/beego/beego/v2/adapter/orm"
  11. _ "github.com/go-sql-driver/mysql"
  12. "strconv"
  13. "strings"
  14. "time"
  15. )
  // DeviceData_R is the template for one sensor reading. It is the value that
  // RedisDeviceData_Set JSON-encodes into the cache and that Add_DeviceData
  // writes to the database.
  type DeviceData_R struct {
  	T_t, T_rh float32   // temperature, then humidity
  	T_Site    string    // GPS
  	T_time    time.Time // acquisition time
  	T_sp      int       // sensor parameter ID
  	// create_time
  }
  // DeviceData_old carries the values a row held before it was overwritten.
  // Add_DeviceData fills it from a SELECT when an insert hits a duplicate key.
  type DeviceData_old struct {
  	T_t, T_rh float32
  	T_site    string
  	T_sp      int
  }
  31. var redis_DeviceData cache.Cache
  32. func init() {
  33. config := fmt.Sprintf(`{"key":"%s","conn":"%s","dbNum":"%s","password":"%s"}`,
  34. "redis_DeviceData", conf.Redis_address, conf.Redis_dbNum, conf.Redis_password)
  35. logs.Println(config)
  36. var err error
  37. redis_DeviceData, err = cache.NewCache("redis", config)
  38. if err != nil || redis_DeviceData == nil {
  39. errMsg := "failed to init redis"
  40. logs.Println(errMsg, err)
  41. }
  42. }
  43. // ---------------- Redis -------------------
  44. // Redis_Set(m.T_sn,m) // Redis 更新缓存
  45. func RedisDeviceData_Set(T_sn string, T_id int, r DeviceData_R) (err error) {
  46. key := T_sn + "|" + strconv.Itoa(T_id)
  47. if redis_DeviceData.IsExist(key) {
  48. var t DeviceData_R
  49. v := redis_DeviceData.Get(key)
  50. json.Unmarshal(v.([]byte), &t)
  51. // 防止时间溢出
  52. if time.Now().Unix() <= r.T_time.Unix() {
  53. r.T_time = time.Now()
  54. }
  55. // 提前最新数据
  56. if t.T_time.Unix() > r.T_time.Unix() {
  57. // 储存的 是最新数据
  58. return
  59. }
  60. }
  61. //json序列化
  62. str, err := json.Marshal(r)
  63. if err != nil {
  64. logs.PrintlnError("RedisDeviceData_Set", err)
  65. return
  66. }
  67. err = redis_DeviceData.Put(key, str, 1*time.Hour)
  68. if err != nil {
  69. logs.Println("set key:", key, ",value:", str, err)
  70. }
  71. return
  72. }
  73. func RedisDeviceData_Get(T_sn string, T_id int) (r DeviceData_R, is bool) {
  74. key := T_sn + "|" + strconv.Itoa(T_id)
  75. if redis_DeviceData.IsExist(key) {
  76. v := redis_DeviceData.Get(key)
  77. json.Unmarshal(v.([]byte), &r)
  78. return r, true
  79. }
  80. return DeviceData_R{}, false
  81. }
  82. // 添加 数据 返回 真:替换 假:第一条无替换
  83. func Add_DeviceData(T_sn string, T_id int, v DeviceData_R) (bool, DeviceData_old) {
  84. o := orm.NewOrm()
  85. // 开始插入数据
  86. sql := "INSERT INTO z_device_data_" + T_sn + " (`t_id`, `t_t`, `t_rh`, `t_site`, `t_time`, `t_sp`) " +
  87. "VALUES (" + strconv.Itoa(T_id) + " , " + lib.To_string(v.T_t) + ", " + lib.To_string(v.T_rh) + ", '" + v.T_Site + "', '" + v.T_time.Format("2006-01-02 15:04:05") + "', " + lib.To_string(v.T_sp) + ")"
  88. // 这里有时间优化 用于一次 prepare 多次 exec,以提高批量执行的速度
  89. logs.Println(sql)
  90. _, err := o.Raw(sql).Exec()
  91. if err != nil {
  92. //Error 1146: Table 'cold.z_device_data_202335objgv2pzk' doesn't exist
  93. //Error 1062: Duplicate entry '1111-11-11 00:00:00-1' for key 'z_device_data_202335objgv2pzk.PRIMARY'
  94. if strings.Contains(err.Error(), "Error 1062") {
  95. var DeviceData_r DeviceData_old
  96. err = o.Raw("SELECT * FROM z_device_data_" + T_sn + " WHERE `t_id` = " + strconv.Itoa(T_id) + " AND `t_time` = Cast('" + v.T_time.Format("2006-01-02 15:04:05") + "' AS Binary(19))").QueryRow(&DeviceData_r)
  97. if err != nil {
  98. logs.PrintlnError("SELECT_DeviceData:" + err.Error())
  99. return false, DeviceData_old{}
  100. }
  101. // 重复添加
  102. sql = "UPDATE z_device_data_" + T_sn + " SET `t_t` = " + lib.To_string(v.T_t) + ", `t_rh` = " + lib.To_string(v.T_rh) + ", `t_site` = '" + v.T_Site + "' WHERE `t_id` = " + strconv.Itoa(T_id) + " AND `t_time` = Cast('" + v.T_time.Format("2006-01-02 15:04:05") + "' AS Binary(19))"
  103. // 这里有时间优化 用于一次 prepare 多次 exec,以提高批量执行的速度
  104. logs.Println(sql)
  105. _, err = o.Raw(sql).Exec()
  106. if err != nil {
  107. logs.PrintlnError("UPDATE_DeviceData:" + sql + err.Error())
  108. }
  109. logs.PrintlnData(sql)
  110. return true, DeviceData_r
  111. } else {
  112. logs.PrintlnError("Add_DeviceData:" + sql + err.Error())
  113. return false, DeviceData_old{}
  114. }
  115. } else {
  116. logs.PrintlnData(sql)
  117. }
  118. //res.RowsAffected()
  119. // 添加缓存 sn id 最新数据
  120. RedisDeviceData_Set(T_sn, T_id, v)
  121. return false, DeviceData_old{}
  122. }
  123. func Ttt() {
  124. time.Sleep(time.Second)
  125. t1 := "2019-01-08 13:50:30" //外部传入的时间字符串
  126. timeTemplate1 := "2006-01-02 15:04:05" //常规类型
  127. stamp, _ := time.ParseInLocation(timeTemplate1, t1, time.Local) //使用parseInLocation将字符串格式化返回本地时区时间
  128. //// 更新记录 - 缓存
  129. DeviceData_t := DeviceData_R{
  130. T_t: 88,
  131. T_rh: 88,
  132. T_Site: "88",
  133. T_time: stamp,
  134. }
  135. if r_, DeviceData_old_r := Add_DeviceData("202335objgv2pzk1", 1, DeviceData_t); r_ {
  136. // 被替换
  137. Add_DeviceDataOld(DeviceDataOld{
  138. T_sn: "ffffff",
  139. T_id: 111,
  140. T_t: DeviceData_old_r.T_t,
  141. T_rh: DeviceData_old_r.T_rh,
  142. T_Site: DeviceData_old_r.T_site,
  143. T_time: DeviceData_t.T_time,
  144. T_operation: 2,
  145. T_uuid: "",
  146. })
  147. }
  148. }