DeviceData.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. package Device
  2. import (
  3. "Cold_mqtt/conf"
  4. "Cold_mqtt/lib"
  5. "Cold_mqtt/logs"
  6. "encoding/json"
  7. "fmt"
  8. "github.com/astaxie/beego/cache"
  9. _ "github.com/astaxie/beego/cache/redis"
  10. "github.com/beego/beego/v2/adapter/orm"
  11. _ "github.com/go-sql-driver/mysql"
  12. "strconv"
  13. "strings"
  14. "time"
  15. )
  16. // 模板
  17. type DeviceData_R struct {
  18. T_t float32 // 温度
  19. T_rh float32 // 湿度
  20. T_Site string // GPS
  21. T_time time.Time // 采集时间
  22. T_sp int // 传感器参数ID
  23. //create_time
  24. }
  25. type DeviceData_old struct {
  26. T_t float32
  27. T_rh float32
  28. T_site string
  29. T_sp int
  30. }
  31. var redis_DeviceData cache.Cache
  32. func init() {
  33. config := fmt.Sprintf(`{"key":"%s","conn":"%s","dbNum":"%s","password":"%s"}`,
  34. "redis_DeviceData", conf.Redis_address, conf.Redis_dbNum, conf.Redis_password)
  35. logs.Println(config)
  36. var err error
  37. redis_DeviceData, err = cache.NewCache("redis", config)
  38. if err != nil || redis_DeviceData == nil {
  39. errMsg := "failed to init redis"
  40. logs.Println(errMsg, err)
  41. }
  42. }
  43. // ---------------- Redis -------------------
  44. // Redis_Set(m.T_sn,m) // Redis 更新缓存
  45. func RedisDeviceData_Set(T_sn string, T_id int, r DeviceData_R) (err error) {
  46. key := T_sn + "|" + strconv.Itoa(T_id)
  47. if redis_DeviceData.IsExist(key) {
  48. var t DeviceData_R
  49. v := redis_DeviceData.Get(key)
  50. json.Unmarshal(v.([]byte), &t)
  51. // 防止时间溢出
  52. if time.Now().Unix() <= r.T_time.Unix() {
  53. r.T_time = time.Now()
  54. }
  55. // 时间在当前时间的正负10分钟范围内
  56. if !lib.IsWithinMinutesRange(r.T_time, 600) {
  57. // 时间有问题,或者差距太大
  58. return
  59. }
  60. // 提前最新数据
  61. //if t.T_time.Unix() > r.T_time.Unix() {
  62. // if lib.IsWithinMinutesRange( t.T_time,600){
  63. // // 储存的 是最新数据
  64. // return
  65. // }
  66. //}
  67. }
  68. //json序列化
  69. str, err := json.Marshal(r)
  70. if err != nil {
  71. logs.PrintlnError("RedisDeviceData_Set", err)
  72. return
  73. }
  74. err = redis_DeviceData.Put(key, str, 24*time.Hour)
  75. if err != nil {
  76. logs.Println("set key:", key, ",value:", str, err)
  77. }
  78. return
  79. }
  80. func RedisDeviceData_Get(T_sn string, T_id int) (r DeviceData_R, is bool) {
  81. key := T_sn + "|" + strconv.Itoa(T_id)
  82. if redis_DeviceData.IsExist(key) {
  83. v := redis_DeviceData.Get(key)
  84. json.Unmarshal(v.([]byte), &r)
  85. return r, true
  86. }
  87. return DeviceData_R{}, false
  88. }
  89. func RedisDeviceData_Del(T_sn string, T_id int) {
  90. key := T_sn + "|" + strconv.Itoa(T_id)
  91. if redis_DeviceData.IsExist(key) {
  92. redis_DeviceData.Delete(key)
  93. logs.Println("RedisDeviceData_Del 清除标志位:", key)
  94. }
  95. }
  96. // 添加 数据 返回 真:替换 假:第一条无替换 ignore 重复是否忽略
  97. func Add_DeviceData(T_sn string, T_id int, v DeviceData_R, ignore bool) (bool, DeviceData_old) {
  98. o := orm.NewOrm()
  99. // 开始插入数据
  100. sql := "INSERT INTO z_device_data_" + T_sn + " (`t_id`, `t_t`, `t_rh`, `t_site`, `t_time`, `t_sp`) " +
  101. "VALUES (" + strconv.Itoa(T_id) + " , " + lib.To_string(v.T_t) + ", " + lib.To_string(v.T_rh) + ", '" + v.T_Site + "', '" + v.T_time.Format("2006-01-02 15:04:05") + "', " + lib.To_string(v.T_sp) + ")"
  102. // 这里有时间优化 用于一次 prepare 多次 exec,以提高批量执行的速度
  103. logs.Println(sql)
  104. _, err := o.Raw(sql).Exec()
  105. if err != nil {
  106. //Error 1146: Table 'cold.z_device_data_202335objgv2pzk' doesn't exist
  107. //Error 1062: Duplicate entry '1111-11-11 00:00:00-1' for key 'z_device_data_202335objgv2pzk.PRIMARY'
  108. if strings.Contains(err.Error(), "Error 1062") {
  109. if ignore { // 重复是否忽略
  110. logs.Println("重复忽略数据:", T_sn, T_id, v.T_time.Format("2006-01-02 15:04:05"))
  111. return true, DeviceData_old{}
  112. }
  113. var DeviceData_r DeviceData_old
  114. err = o.Raw("SELECT * FROM z_device_data_" + T_sn + " WHERE `t_id` = " + strconv.Itoa(T_id) + " AND `t_time` = Cast('" + v.T_time.Format("2006-01-02 15:04:05") + "' AS Binary(19))").QueryRow(&DeviceData_r)
  115. if err != nil {
  116. logs.PrintlnError("SELECT_DeviceData:" + err.Error())
  117. return false, DeviceData_old{}
  118. }
  119. // 重复添加
  120. sql = "UPDATE z_device_data_" + T_sn + " SET `t_t` = " + lib.To_string(v.T_t) + ", `t_rh` = " + lib.To_string(v.T_rh) + ", `t_site` = '" + v.T_Site + "' WHERE `t_id` = " + strconv.Itoa(T_id) + " AND `t_time` = Cast('" + v.T_time.Format("2006-01-02 15:04:05") + "' AS Binary(19))"
  121. // 这里有时间优化 用于一次 prepare 多次 exec,以提高批量执行的速度
  122. logs.Println(sql)
  123. _, err = o.Raw(sql).Exec()
  124. if err != nil {
  125. logs.PrintlnError("UPDATE_DeviceData:" + sql + err.Error())
  126. }
  127. logs.PrintlnData(sql)
  128. return true, DeviceData_r
  129. } else {
  130. logs.PrintlnError("Add_DeviceData:" + sql + err.Error())
  131. return false, DeviceData_old{}
  132. }
  133. } else {
  134. logs.PrintlnData(sql)
  135. }
  136. //res.RowsAffected()
  137. // 添加缓存 sn id 最新数据
  138. RedisDeviceData_Set(T_sn, T_id, v)
  139. //RedisDeviceData_Set(T_sn+"_Node", T_id, v)
  140. return false, DeviceData_old{}
  141. }
  142. func Ttt() {
  143. time.Sleep(time.Second)
  144. t1 := "2019-01-08 13:50:30" //外部传入的时间字符串
  145. timeTemplate1 := "2006-01-02 15:04:05" //常规类型
  146. stamp, _ := time.ParseInLocation(timeTemplate1, t1, time.Local) //使用parseInLocation将字符串格式化返回本地时区时间
  147. //// 更新记录 - 缓存
  148. DeviceData_t := DeviceData_R{
  149. T_t: 88,
  150. T_rh: 88,
  151. T_Site: "88",
  152. T_time: stamp,
  153. }
  154. if r_, DeviceData_old_r := Add_DeviceData("202335objgv2pzk1", 1, DeviceData_t, true); r_ {
  155. // 被替换
  156. Add_DeviceDataOld(DeviceDataOld{
  157. T_sn: "ffffff",
  158. T_id: 111,
  159. T_t: DeviceData_old_r.T_t,
  160. T_rh: DeviceData_old_r.T_rh,
  161. T_Site: DeviceData_old_r.T_site,
  162. T_time: DeviceData_t.T_time,
  163. T_operation: 2,
  164. T_uuid: "",
  165. })
  166. }
  167. }