DeviceData.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. package Device
  2. import (
  3. "Cold_mqtt/conf"
  4. "Cold_mqtt/lib"
  5. "Cold_mqtt/logs"
  6. "encoding/json"
  7. "fmt"
  8. "github.com/astaxie/beego/cache"
  9. _ "github.com/astaxie/beego/cache/redis"
  10. "github.com/beego/beego/v2/adapter/orm"
  11. _ "github.com/go-sql-driver/mysql"
  12. "strconv"
  13. "strings"
  14. "time"
  15. )
  16. // 模板
  17. type DeviceData_R struct {
  18. T_t float32 // 温度
  19. T_rh float32 // 湿度
  20. T_Site string // GPS
  21. T_time time.Time // 采集时间
  22. T_sp int // 传感器参数ID
  23. //create_time
  24. }
  25. type DeviceData_old struct {
  26. T_t float32
  27. T_rh float32
  28. T_site string
  29. T_sp int
  30. }
  31. var redis_DeviceData cache.Cache
  32. func init() {
  33. config := fmt.Sprintf(`{"key":"%s","conn":"%s","dbNum":"%s","password":"%s"}`,
  34. "redis_DeviceData", conf.Redis_address, conf.Redis_dbNum, conf.Redis_password)
  35. logs.Println(config)
  36. var err error
  37. redis_DeviceData, err = cache.NewCache("redis", config)
  38. if err != nil || redis_DeviceData == nil {
  39. errMsg := "failed to init redis"
  40. logs.Println(errMsg, err)
  41. }
  42. }
  43. // ---------------- Redis -------------------
  44. // Redis_Set(m.T_sn,m) // Redis 更新缓存
  45. func RedisDeviceData_Set(T_sn string, T_id int, r DeviceData_R) (err error) {
  46. key := T_sn + "|" + strconv.Itoa(T_id)
  47. if redis_DeviceData.IsExist(key) {
  48. var t DeviceData_R
  49. v := redis_DeviceData.Get(key)
  50. json.Unmarshal(v.([]byte), &t)
  51. // 防止时间溢出
  52. if time.Now().Unix() <= r.T_time.Unix() {
  53. r.T_time = time.Now()
  54. }
  55. // 时间在当前时间的正负10分钟范围内
  56. if !lib.IsWithinMinutesRange(r.T_time, 600) {
  57. // 时间有问题,或者差距太大
  58. return
  59. }
  60. // 提前最新数据
  61. //if t.T_time.Unix() > r.T_time.Unix() {
  62. // if lib.IsWithinMinutesRange( t.T_time,600){
  63. // // 储存的 是最新数据
  64. // return
  65. // }
  66. //}
  67. }
  68. //json序列化
  69. str, err := json.Marshal(r)
  70. if err != nil {
  71. logs.PrintlnError("RedisDeviceData_Set", err)
  72. return
  73. }
  74. err = redis_DeviceData.Put(key, str, 1*time.Hour)
  75. if err != nil {
  76. logs.Println("set key:", key, ",value:", str, err)
  77. }
  78. return
  79. }
  80. func RedisDeviceData_Get(T_sn string, T_id int) (r DeviceData_R, is bool) {
  81. key := T_sn + "|" + strconv.Itoa(T_id)
  82. if redis_DeviceData.IsExist(key) {
  83. v := redis_DeviceData.Get(key)
  84. json.Unmarshal(v.([]byte), &r)
  85. return r, true
  86. }
  87. return DeviceData_R{}, false
  88. }
  89. // 添加 数据 返回 真:替换 假:第一条无替换
  90. func Add_DeviceData(T_sn string, T_id int, v DeviceData_R) (bool, DeviceData_old) {
  91. o := orm.NewOrm()
  92. // 开始插入数据
  93. sql := "INSERT INTO z_device_data_" + T_sn + " (`t_id`, `t_t`, `t_rh`, `t_site`, `t_time`, `t_sp`) " +
  94. "VALUES (" + strconv.Itoa(T_id) + " , " + lib.To_string(v.T_t) + ", " + lib.To_string(v.T_rh) + ", '" + v.T_Site + "', '" + v.T_time.Format("2006-01-02 15:04:05") + "', " + lib.To_string(v.T_sp) + ")"
  95. // 这里有时间优化 用于一次 prepare 多次 exec,以提高批量执行的速度
  96. logs.Println(sql)
  97. _, err := o.Raw(sql).Exec()
  98. if err != nil {
  99. //Error 1146: Table 'cold.z_device_data_202335objgv2pzk' doesn't exist
  100. //Error 1062: Duplicate entry '1111-11-11 00:00:00-1' for key 'z_device_data_202335objgv2pzk.PRIMARY'
  101. if strings.Contains(err.Error(), "Error 1062") {
  102. var DeviceData_r DeviceData_old
  103. err = o.Raw("SELECT * FROM z_device_data_" + T_sn + " WHERE `t_id` = " + strconv.Itoa(T_id) + " AND `t_time` = Cast('" + v.T_time.Format("2006-01-02 15:04:05") + "' AS Binary(19))").QueryRow(&DeviceData_r)
  104. if err != nil {
  105. logs.PrintlnError("SELECT_DeviceData:" + err.Error())
  106. return false, DeviceData_old{}
  107. }
  108. // 重复添加
  109. sql = "UPDATE z_device_data_" + T_sn + " SET `t_t` = " + lib.To_string(v.T_t) + ", `t_rh` = " + lib.To_string(v.T_rh) + ", `t_site` = '" + v.T_Site + "' WHERE `t_id` = " + strconv.Itoa(T_id) + " AND `t_time` = Cast('" + v.T_time.Format("2006-01-02 15:04:05") + "' AS Binary(19))"
  110. // 这里有时间优化 用于一次 prepare 多次 exec,以提高批量执行的速度
  111. logs.Println(sql)
  112. _, err = o.Raw(sql).Exec()
  113. if err != nil {
  114. logs.PrintlnError("UPDATE_DeviceData:" + sql + err.Error())
  115. }
  116. logs.PrintlnData(sql)
  117. return true, DeviceData_r
  118. } else {
  119. logs.PrintlnError("Add_DeviceData:" + sql + err.Error())
  120. return false, DeviceData_old{}
  121. }
  122. } else {
  123. logs.PrintlnData(sql)
  124. }
  125. //res.RowsAffected()
  126. // 添加缓存 sn id 最新数据
  127. RedisDeviceData_Set(T_sn, T_id, v)
  128. RedisDeviceData_Set(T_sn+"_Node", T_id, v)
  129. return false, DeviceData_old{}
  130. }
  131. func Ttt() {
  132. time.Sleep(time.Second)
  133. t1 := "2019-01-08 13:50:30" //外部传入的时间字符串
  134. timeTemplate1 := "2006-01-02 15:04:05" //常规类型
  135. stamp, _ := time.ParseInLocation(timeTemplate1, t1, time.Local) //使用parseInLocation将字符串格式化返回本地时区时间
  136. //// 更新记录 - 缓存
  137. DeviceData_t := DeviceData_R{
  138. T_t: 88,
  139. T_rh: 88,
  140. T_Site: "88",
  141. T_time: stamp,
  142. }
  143. if r_, DeviceData_old_r := Add_DeviceData("202335objgv2pzk1", 1, DeviceData_t); r_ {
  144. // 被替换
  145. Add_DeviceDataOld(DeviceDataOld{
  146. T_sn: "ffffff",
  147. T_id: 111,
  148. T_t: DeviceData_old_r.T_t,
  149. T_rh: DeviceData_old_r.T_rh,
  150. T_Site: DeviceData_old_r.T_site,
  151. T_time: DeviceData_t.T_time,
  152. T_operation: 2,
  153. T_uuid: "",
  154. })
  155. }
  156. }