package Device import ( "Cold_mqtt/conf" "Cold_mqtt/lib" "Cold_mqtt/logs" "encoding/json" "fmt" "github.com/astaxie/beego/cache" _ "github.com/astaxie/beego/cache/redis" "github.com/beego/beego/v2/adapter/orm" _ "github.com/go-sql-driver/mysql" "strconv" "strings" "time" ) // 模板 type DeviceData_R struct { T_t float32 // 温度 T_rh float32 // 湿度 T_Site string // GPS T_time time.Time // 采集时间 T_sp int // 传感器参数ID //create_time } type DeviceData_old struct { T_t float32 T_rh float32 T_site string T_sp int } var redis_DeviceData cache.Cache func init() { config := fmt.Sprintf(`{"key":"%s","conn":"%s","dbNum":"%s","password":"%s"}`, "redis_DeviceData", conf.Redis_address, conf.Redis_dbNum, conf.Redis_password) logs.Println(config) var err error redis_DeviceData, err = cache.NewCache("redis", config) if err != nil || redis_DeviceData == nil { errMsg := "failed to init redis" logs.Println(errMsg, err) } } // ---------------- Redis ------------------- // Redis_Set(m.T_sn,m) // Redis 更新缓存 func RedisDeviceData_Set(T_sn string, T_id int, r DeviceData_R) (err error) { key := T_sn + "|" + strconv.Itoa(T_id) if redis_DeviceData.IsExist(key) { var t DeviceData_R v := redis_DeviceData.Get(key) json.Unmarshal(v.([]byte), &t) // 防止时间溢出 if time.Now().Unix() <= r.T_time.Unix() { r.T_time = time.Now() } // 时间在当前时间的正负10分钟范围内 if !lib.IsWithinMinutesRange(r.T_time, 600) { // 时间有问题,或者差距太大 return } // 提前最新数据 //if t.T_time.Unix() > r.T_time.Unix() { // if lib.IsWithinMinutesRange( t.T_time,600){ // // 储存的 是最新数据 // return // } //} } //json序列化 str, err := json.Marshal(r) if err != nil { logs.PrintlnError("RedisDeviceData_Set", err) return } err = redis_DeviceData.Put(key, str, 1*time.Hour) if err != nil { logs.Println("set key:", key, ",value:", str, err) } return } func RedisDeviceData_Get(T_sn string, T_id int) (r DeviceData_R, is bool) { key := T_sn + "|" + strconv.Itoa(T_id) if redis_DeviceData.IsExist(key) { v := redis_DeviceData.Get(key) json.Unmarshal(v.([]byte), &r) return r, true } return DeviceData_R{}, false } // 添加 数据 返回 真:替换 假:第一条无替换 func Add_DeviceData(T_sn string, T_id int, v DeviceData_R) (bool, DeviceData_old) { o := orm.NewOrm() // 开始插入数据 sql := "INSERT INTO z_device_data_" + T_sn + " (`t_id`, `t_t`, `t_rh`, `t_site`, `t_time`, `t_sp`) " + "VALUES (" + strconv.Itoa(T_id) + " , " + lib.To_string(v.T_t) + ", " + lib.To_string(v.T_rh) + ", '" + v.T_Site + "', '" + v.T_time.Format("2006-01-02 15:04:05") + "', " + lib.To_string(v.T_sp) + ")" // 这里有时间优化 用于一次 prepare 多次 exec,以提高批量执行的速度 logs.Println(sql) _, err := o.Raw(sql).Exec() if err != nil { //Error 1146: Table 'cold.z_device_data_202335objgv2pzk' doesn't exist //Error 1062: Duplicate entry '1111-11-11 00:00:00-1' for key 'z_device_data_202335objgv2pzk.PRIMARY' if strings.Contains(err.Error(), "Error 1062") { var DeviceData_r DeviceData_old err = o.Raw("SELECT * FROM z_device_data_" + T_sn + " WHERE `t_id` = " + strconv.Itoa(T_id) + " AND `t_time` = Cast('" + v.T_time.Format("2006-01-02 15:04:05") + "' AS Binary(19))").QueryRow(&DeviceData_r) if err != nil { logs.PrintlnError("SELECT_DeviceData:" + err.Error()) return false, DeviceData_old{} } // 重复添加 sql = "UPDATE z_device_data_" + T_sn + " SET `t_t` = " + lib.To_string(v.T_t) + ", `t_rh` = " + lib.To_string(v.T_rh) + ", `t_site` = '" + v.T_Site + "' WHERE `t_id` = " + strconv.Itoa(T_id) + " AND `t_time` = Cast('" + v.T_time.Format("2006-01-02 15:04:05") + "' AS Binary(19))" // 这里有时间优化 用于一次 prepare 多次 exec,以提高批量执行的速度 logs.Println(sql) _, err = o.Raw(sql).Exec() if err != nil { logs.PrintlnError("UPDATE_DeviceData:" + sql + err.Error()) } logs.PrintlnData(sql) return true, DeviceData_r } else { logs.PrintlnError("Add_DeviceData:" + sql + err.Error()) return false, DeviceData_old{} } } else { logs.PrintlnData(sql) } //res.RowsAffected() // 添加缓存 sn id 最新数据 RedisDeviceData_Set(T_sn, T_id, v) RedisDeviceData_Set(T_sn+"_Node", T_id, v) return false, DeviceData_old{} } func Ttt() { time.Sleep(time.Second) t1 := "2019-01-08 13:50:30" //外部传入的时间字符串 timeTemplate1 := "2006-01-02 15:04:05" //常规类型 stamp, _ := time.ParseInLocation(timeTemplate1, t1, time.Local) //使用parseInLocation将字符串格式化返回本地时区时间 //// 更新记录 - 缓存 DeviceData_t := DeviceData_R{ T_t: 88, T_rh: 88, T_Site: "88", T_time: stamp, } if r_, DeviceData_old_r := Add_DeviceData("202335objgv2pzk1", 1, DeviceData_t); r_ { // 被替换 Add_DeviceDataOld(DeviceDataOld{ T_sn: "ffffff", T_id: 111, T_t: DeviceData_old_r.T_t, T_rh: DeviceData_old_r.T_rh, T_Site: DeviceData_old_r.T_site, T_time: DeviceData_t.T_time, T_operation: 2, T_uuid: "", }) } }