123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
- package Device
- import (
- "Cold_mqtt/conf"
- "Cold_mqtt/lib"
- "Cold_mqtt/logs"
- "encoding/json"
- "fmt"
- "github.com/astaxie/beego/cache"
- _ "github.com/astaxie/beego/cache/redis"
- "github.com/beego/beego/v2/adapter/orm"
- _ "github.com/go-sql-driver/mysql"
- "strconv"
- "strings"
- "time"
- )
- // 模板
- type DeviceData_R struct {
- T_t float32 // 温度
- T_rh float32 // 湿度
- T_Site string // GPS
- T_time time.Time // 采集时间
- T_sp int // 传感器参数ID
- //create_time
- }
- type DeviceData_old struct {
- T_t float32
- T_rh float32
- T_site string
- T_sp int
- }
- var redis_DeviceData cache.Cache
- func init() {
- config := fmt.Sprintf(`{"key":"%s","conn":"%s","dbNum":"%s","password":"%s"}`,
- "redis_DeviceData", conf.Redis_address, conf.Redis_dbNum, conf.Redis_password)
- logs.Println(config)
- var err error
- redis_DeviceData, err = cache.NewCache("redis", config)
- if err != nil || redis_DeviceData == nil {
- errMsg := "failed to init redis"
- logs.Println(errMsg, err)
- }
- }
- // ---------------- Redis -------------------
- // Redis_Set(m.T_sn,m) // Redis 更新缓存
- func RedisDeviceData_Set(T_sn string, T_id int, r DeviceData_R) (err error) {
- key := T_sn + "|" + strconv.Itoa(T_id)
- if redis_DeviceData.IsExist(key) {
- var t DeviceData_R
- v := redis_DeviceData.Get(key)
- json.Unmarshal(v.([]byte), &t)
- // 防止时间溢出
- if time.Now().Unix() <= r.T_time.Unix() {
- r.T_time = time.Now()
- }
- // 时间在当前时间的正负10分钟范围内
- if !lib.IsWithinMinutesRange(r.T_time, 600) {
- // 时间有问题,或者差距太大
- return
- }
- // 提前最新数据
- //if t.T_time.Unix() > r.T_time.Unix() {
- // if lib.IsWithinMinutesRange( t.T_time,600){
- // // 储存的 是最新数据
- // return
- // }
- //}
- }
- //json序列化
- str, err := json.Marshal(r)
- if err != nil {
- logs.PrintlnError("RedisDeviceData_Set", err)
- return
- }
- err = redis_DeviceData.Put(key, str, 1*time.Hour)
- if err != nil {
- logs.Println("set key:", key, ",value:", str, err)
- }
- return
- }
- func RedisDeviceData_Get(T_sn string, T_id int) (r DeviceData_R, is bool) {
- key := T_sn + "|" + strconv.Itoa(T_id)
- if redis_DeviceData.IsExist(key) {
- v := redis_DeviceData.Get(key)
- json.Unmarshal(v.([]byte), &r)
- return r, true
- }
- return DeviceData_R{}, false
- }
- // 添加 数据 返回 真:替换 假:第一条无替换
- func Add_DeviceData(T_sn string, T_id int, v DeviceData_R) (bool, DeviceData_old) {
- o := orm.NewOrm()
- // 开始插入数据
- sql := "INSERT INTO z_device_data_" + T_sn + " (`t_id`, `t_t`, `t_rh`, `t_site`, `t_time`, `t_sp`) " +
- "VALUES (" + strconv.Itoa(T_id) + " , " + lib.To_string(v.T_t) + ", " + lib.To_string(v.T_rh) + ", '" + v.T_Site + "', '" + v.T_time.Format("2006-01-02 15:04:05") + "', " + lib.To_string(v.T_sp) + ")"
- // 这里有时间优化 用于一次 prepare 多次 exec,以提高批量执行的速度
- logs.Println(sql)
- _, err := o.Raw(sql).Exec()
- if err != nil {
- //Error 1146: Table 'cold.z_device_data_202335objgv2pzk' doesn't exist
- //Error 1062: Duplicate entry '1111-11-11 00:00:00-1' for key 'z_device_data_202335objgv2pzk.PRIMARY'
- if strings.Contains(err.Error(), "Error 1062") {
- var DeviceData_r DeviceData_old
- err = o.Raw("SELECT * FROM z_device_data_" + T_sn + " WHERE `t_id` = " + strconv.Itoa(T_id) + " AND `t_time` = Cast('" + v.T_time.Format("2006-01-02 15:04:05") + "' AS Binary(19))").QueryRow(&DeviceData_r)
- if err != nil {
- logs.PrintlnError("SELECT_DeviceData:" + err.Error())
- return false, DeviceData_old{}
- }
- // 重复添加
- sql = "UPDATE z_device_data_" + T_sn + " SET `t_t` = " + lib.To_string(v.T_t) + ", `t_rh` = " + lib.To_string(v.T_rh) + ", `t_site` = '" + v.T_Site + "' WHERE `t_id` = " + strconv.Itoa(T_id) + " AND `t_time` = Cast('" + v.T_time.Format("2006-01-02 15:04:05") + "' AS Binary(19))"
- // 这里有时间优化 用于一次 prepare 多次 exec,以提高批量执行的速度
- logs.Println(sql)
- _, err = o.Raw(sql).Exec()
- if err != nil {
- logs.PrintlnError("UPDATE_DeviceData:" + sql + err.Error())
- }
- logs.PrintlnData(sql)
- return true, DeviceData_r
- } else {
- logs.PrintlnError("Add_DeviceData:" + sql + err.Error())
- return false, DeviceData_old{}
- }
- } else {
- logs.PrintlnData(sql)
- }
- //res.RowsAffected()
- // 添加缓存 sn id 最新数据
- RedisDeviceData_Set(T_sn, T_id, v)
- RedisDeviceData_Set(T_sn+"_Node", T_id, v)
- return false, DeviceData_old{}
- }
- func Ttt() {
- time.Sleep(time.Second)
- t1 := "2019-01-08 13:50:30" //外部传入的时间字符串
- timeTemplate1 := "2006-01-02 15:04:05" //常规类型
- stamp, _ := time.ParseInLocation(timeTemplate1, t1, time.Local) //使用parseInLocation将字符串格式化返回本地时区时间
- //// 更新记录 - 缓存
- DeviceData_t := DeviceData_R{
- T_t: 88,
- T_rh: 88,
- T_Site: "88",
- T_time: stamp,
- }
- if r_, DeviceData_old_r := Add_DeviceData("202335objgv2pzk1", 1, DeviceData_t); r_ {
- // 被替换
- Add_DeviceDataOld(DeviceDataOld{
- T_sn: "ffffff",
- T_id: 111,
- T_t: DeviceData_old_r.T_t,
- T_rh: DeviceData_old_r.T_rh,
- T_Site: DeviceData_old_r.T_site,
- T_time: DeviceData_t.T_time,
- T_operation: 2,
- T_uuid: "",
- })
- }
- }
|