Forráskód Böngészése

排查数据缺失问题及修复数据(问题 处在于之前打的补丁 记录数据与实时数据缓存)

siked 10 hónapja
szülő
commit
42aad232c2

+ 20 - 18
MqttServer/V3MqttHandle.go

@@ -131,26 +131,27 @@ func AsyncFuncV3(r_Device *Device.Device, Ms_project Ms2m_Project, msg []byte) {
 			}
 
 			// 验证 数据是否安装规定上传间隔
-			DeviceData_Previous_r, is := Device.RedisDeviceData_Get(r_Device.T_sn, DeviceSensor_r.T_id)
+			DeviceData_Previous_r, is := Device.RedisDeviceData_Get(r_Device.T_sn+"_Node", DeviceSensor_r.T_id)
 			if is {
-				DeviceParameter_r, is := Device.Read_DeviceParameter(r_Device.T_sn)
-				if is {
-					// 计算时间差
-					duration := DeviceData_Previous_r.T_time.Sub(DeviceData_t.T_time)
-					// 将时间差表示为秒
-					seconds := int(math.Abs(duration.Seconds()))
-					// 记录间隔 超过 保存间隔(+-10s)范围
-					difference := int(math.Abs(float64(seconds - DeviceParameter_r.T_overA)))
-					if difference > 10 {
-						logs.Println(r_Device.T_sn+" 没有达到数据保存间隔,间隔:", seconds, DeviceParameter_r.T_overA, " 时间差 > 10", difference)
-
-						if F_T_Exceeding(r_Device, DeviceData_t, DeviceSensorParameter_r) && F_H_Exceeding(r_Device, DeviceData_t, DeviceSensorParameter_r) {
-							logs.Println(r_Device.T_sn + " 跳过记录数据")
-							continue
+				//最新的时间 要大
+				if DeviceData_t.T_time.Unix() > DeviceData_Previous_r.T_time.Unix() {
+					DeviceParameter_r, is := Device.Read_DeviceParameter(r_Device.T_sn)
+					if is {
+						// 计算时间差
+						duration := DeviceData_Previous_r.T_time.Sub(DeviceData_t.T_time)
+						// 将时间差表示为秒
+						seconds := int(math.Abs(duration.Seconds()))
+						// 记录间隔 超过 保存间隔(+-10s)范围
+						//difference := int(math.Abs(float64(seconds - DeviceParameter_r.T_overA)))
+						if seconds < (DeviceParameter_r.T_saveT - 10) {
+							logs.Println(r_Device.T_sn, " 没有达到数据保存间隔,间隔:", seconds, " < ", DeviceParameter_r.T_saveT, "- 10", " 最新记录数据时间:", DeviceData_Previous_r.T_time)
+							if F_T_Exceeding(r_Device, DeviceData_t, DeviceSensorParameter_r) && F_H_Exceeding(r_Device, DeviceData_t, DeviceSensorParameter_r) {
+								logs.Println("!!! 跳过记录数据", "["+r_Device.T_sn+"]", string(msg))
+								continue
+							}
 						}
 					}
 				}
-
 			}
 
 			if r_, DeviceData_old_r := Device.Add_DeviceData(r_Device.T_sn, DeviceSensor_r.T_id, DeviceData_t); r_ {
@@ -275,8 +276,9 @@ func AsyncFuncV3(r_Device *Device.Device, Ms_project Ms2m_Project, msg []byte) {
 
 					// 将时间差表示为秒
 					seconds := int(duration.Seconds())
-					if seconds < (DeviceParameter_r.T_outA - 3) {
-						logs.Println(r_Device.T_sn+" 没有达到设置报警间隔,跳过,间隔:", DeviceParameter_r.T_outA, " 时间差", seconds)
+					if seconds < (DeviceParameter_r.T_overA - 3) {
+						logs.Println(r_Device.T_sn+" 没有达到设置报警间隔,跳过,间隔:", DeviceParameter_r.T_overA, " 时间差", seconds, "  New Warningr.Id ", Warningr.Id)
+						logs.Println("!!! 跳过没有达到设置报警间隔", "["+r_Device.T_sn+"]", string(msg))
 						continue
 					}
 				}

+ 2 - 2
models/Device/Device.go

@@ -189,8 +189,8 @@ func Read_获取BX100W离线设备() []Device {
 
 	qs := o.QueryTable(new(Device))
 	var r []Device
-	qs.Filter("T_model", "BX100W").Filter("T_State", 1).Filter("T_online", 2).All(&r)
-	//qs.Filter("T_mqttid", conf.MqttServer_id).Filter("T_model", "BX100W").Filter("T_State", 1).Filter("T_online", 2).All(&r)
+	//qs.Filter("T_model", "BX100W").Filter("T_State", 1).Filter("T_online", 2).All(&r)
+	qs.Filter("T_mqttid", conf.MqttServer_id).Filter("T_model", "BX100W").Filter("T_State", 1).Filter("T_online", 2).All(&r)
 	//qs.Filter("T_pid", 242).Filter("T_State", 1).Exclude("T_online", 1).All(&r)
 
 	return r

+ 2 - 0
models/Device/DeviceData.go

@@ -92,6 +92,7 @@ func RedisDeviceData_Get(T_sn string, T_id int) (r DeviceData_R, is bool) {
 // 添加 数据   返回   真:替换   假:第一条无替换
 func Add_DeviceData(T_sn string, T_id int, v DeviceData_R) (bool, DeviceData_old) {
 	o := orm.NewOrm()
+
 	// 开始插入数据
 	sql := "INSERT INTO z_device_data_" + T_sn + " (`t_id`, `t_t`, `t_rh`, `t_site`, `t_time`, `t_sp`) " +
 		"VALUES (" + strconv.Itoa(T_id) + " , " + lib.To_string(v.T_t) + ", " + lib.To_string(v.T_rh) + ", '" + v.T_Site + "', '" + v.T_time.Format("2006-01-02 15:04:05") + "', " + lib.To_string(v.T_sp) + ")"
@@ -134,6 +135,7 @@ func Add_DeviceData(T_sn string, T_id int, v DeviceData_R) (bool, DeviceData_old
 
 	// 添加缓存 sn id 最新数据
 	RedisDeviceData_Set(T_sn, T_id, v)
+	RedisDeviceData_Set(T_sn+"_Node", T_id, v)
 
 	return false, DeviceData_old{}
 }

+ 1 - 0
models/Device/DeviceDataOld.go

@@ -63,6 +63,7 @@ func DeviceDataOldToDeviceDataOld_R(r DeviceDataOld) (t DeviceDataOld_R) {
 // 添加
 func Add_DeviceDataOld(m DeviceDataOld) (id int64, err error) {
 	o := orm.NewOrm()
+
 	id, err = o.Insert(&m)
 	if err != nil {
 		logs.Println("Add_DeviceDataOld", err)

+ 1 - 0
models/Device/DeviceSensor.go

@@ -149,6 +149,7 @@ func Read_DeviceSensor_ByT_sn(T_sn string, T_id int) (r DeviceSensor, is bool) {
 		return r, true
 	}
 	o := orm.NewOrm()
+	o.Using("default")
 	r = DeviceSensor{T_sn: T_sn, T_id: T_id}
 	err := o.Read(&r, "T_sn", "T_id") // o.Read(&r,"Tokey") 如果不是 主键 就得指定字段名
 	if err != nil {

+ 1 - 0
models/Device/DeviceSensorParameter.go

@@ -165,6 +165,7 @@ func Read_DeviceSensorParameter(T_sn string, T_id int) (t DeviceSensorParameter,
 	}
 
 	o := orm.NewOrm()
+
 	qs := o.QueryTable(new(DeviceSensorParameter))
 
 	cond := orm.NewCondition()

+ 151 - 150
models/Warning/WarningDB.go

@@ -1,153 +1,154 @@
 package Warning
 
-import (
-	"database/sql"
-	"fmt"
-	"gorm.io/driver/mysql"
-	"gorm.io/gorm"
-	"strings"
-
-	"time"
-)
-
-var db *gorm.DB
-var sqlDB *sql.DB
-
-type WarningDB struct {
-	Id         int64     `gorm:"column:ID;type:bigint;primaryKey"`
-	T_pid      int32     `gorm:"column:t_pid;index:merge"`                        // Account.Company 绑定公司
-	T_tp       int       `gorm:"column:t_tp;index:merge"`                         // 报警类型   ->WarningList
-	T_sn       string    `gorm:"column:t_sn;index:tsn;size:256;"`                 // 设备序列号
-	T_D_name   string    `gorm:"column:t__d_name;size:256;"`                      // 设备名称
-	T_id       int32     `gorm:"column:t_id;"`                                    // 传感器 ID
-	T_DS_name  string    `gorm:"column:t__d_s_name;size:256;"`                    // 传感器名称
-	T_Remark   string    `gorm:"column:t__remark;type:longtext;"`                 // 采集内容
-	T_Ut       time.Time `gorm:"column:t__ut;type:datetime;"`                     // 采集时间
-	T_fUt      time.Time `gorm:"column:t_f_ut;type:datetime;"`                    // 首次采集时间
-	T_Text     string    `gorm:"column:t__text;type:longtext;"`                   // 处理备注
-	T_Log      string    `gorm:"column:t__log;type:longtext;"`                    // 通知日志
-	T_Msid     int64     `gorm:"column:t__msid;"`                                 // 消息ID
-	T_State    int       `gorm:"column:t__state;size:2;default:2;index:merge"`    // 0 删除   1 不处理   2 已处理   3 未处理
-	CreateTime time.Time `gorm:"column:create_time;type:datetime;autoUpdateTime"` //auto_now_add 第一次保存时才设置时间
-	UpdateTime time.Time `gorm:"column:update_time;type:datetime;autoCreateTime"` //auto_now 每次 model 保存时都会对时间自动更新
-}
-
-var MysqlServer_UrlPort = "127.0.0.1:40306"
-var MysqlServer_Database = "cold"
-var MysqlServer_Username = "cold"
-var MysqlServer_Password = "yjwyEckZS7rE5H!"
-
-func init() {
-	// 连接数据库
-	var err error
-
-	dsn := MysqlServer_Username + ":" + MysqlServer_Password + "@tcp(" + MysqlServer_UrlPort + ")/" + MysqlServer_Database + "?charset=utf8mb4&loc=Local&parseTime=True"
-	db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{})
-	if err != nil {
-
-		fmt.Println(err)
-
-	}
-	sqlDB, err = db.DB()
-	// SetMaxIdleConns 设置空闲连接池中的最大连接数。
-	sqlDB.SetMaxIdleConns(1)
-
-	// SetMaxOpenConns 设置数据库连接最大打开数。
-	sqlDB.SetMaxOpenConns(10)
-
-	// SetConnMaxLifetime 设置可重用连接的最长时间
-	sqlDB.SetConnMaxLifetime(time.Hour)
-	//db.AutoMigrate(&DeviceWarning{})
-
-	//db.SetLogger(log.New(logs.LogxOrm, "\r\nWarningBack:", 0))
-	fmt.Println("db init")
-}
-
-/*
-time_a  : 开始时间  "2024-01-24 08:21:40"
-time_b  : 结束时间  "2024-03-24 08:21:40"
-WHERE_t_pid  : 公司ID   > 0
-WHERE_t_tp  : 报警类型  > 0
-WHERE_t__state  :   状态   -1 无限制  0 删除   1 不处理   2 已处理   3 未处理
-WHERE_t_sn  : 设备序列号
-PageIndex  : 页码值
-PageSize  : 每页多少
-*/
-
-func Handy(time_a, time_b string, WHERE_t_pid int, WHERE_t_tp int, WHERE_t__state int, WHERE_t_sn string, PageIndex int, PageSize int) ([]WarningDB, int) {
-	var offset int // 数据起点
-	if PageIndex <= 1 {
-		offset = 0
-	} else {
-		offset = (PageIndex - 1) * PageSize
-	}
-	var TABLE = "warning_"
-	monthsArray := generateMonthArray(time_a, time_b)
-
-	var Warning_List []WarningDB
-	var Count_List []int
-	var Count_Num int // 累积数量
-	for _, v := range monthsArray {
-		var count int64
-		var countoffset int // 当前数据起点
-
-		db.Table(TABLE+v).Where("t_tp = ?", 110).Count(&count)
-		Count_List = append(Count_List, int(count))
-		fmt.Println("TABLE:", TABLE+v, count)
-		countoffset = offset - Count_Num // 当前数据起点
-		if countoffset < 0 {
-			countoffset = 0
-		}
-		Count_Num += int(count)
-		Limit := PageSize - len(Warning_List) // 还缺多少数量
-		if offset <= Count_Num && Limit > 0 {
-			var warninglist []WarningDB
-			dbW := db.Table(TABLE + v)
-			if WHERE_t_pid > 0 {
-				dbW.Where("t_tp = ? ", WHERE_t_pid)
-			}
-			if WHERE_t_tp > 0 {
-				dbW.Where("t_pid = ? ", WHERE_t_pid)
-			}
-			if WHERE_t__state != -1 {
-				dbW.Where("t__state = ? ", WHERE_t__state)
-			}
-			if len(WHERE_t_sn) > 0 {
-				dbW.Where("t__state LIKE ? ", WHERE_t_sn)
-			}
-			dbW.Order("t__ut").Offset(countoffset).Limit(Limit).Find(&warninglist)
-			Warning_List = append(Warning_List, warninglist...)
-		}
-
-	}
-	return Warning_List, Count_Num
-}
-
-func (t *WarningDB) Create(TableName string) {
-CreateSQL:
-	result := db.Table(TableName).Create(t) // 插入数据
-	if result.Error != nil {
-		if strings.Compare(result.Error.Error(), "doesn't exist") != 0 {
-			db.Table(TableName).AutoMigrate(&WarningDB{}) // 建表
-			goto CreateSQL                                // 返回 重新插入
-		}
-	}
-}
-
-//func UPDATE(name string, ID int64, t__log string) {
-//	logs.Println("修改数据:", name, " ID:", ID)
-//	db.Exec("UPDATE "+name+" SET t__log = ?  WHERE ID = ? ;", t__log, ID)
+//
+//import (
+//	"database/sql"
+//	"fmt"
+//	"gorm.io/driver/mysql"
+//	"gorm.io/gorm"
+//	"strings"
+//
+//	"time"
+//)
+//
+//var db *gorm.DB
+//var sqlDB *sql.DB
+//
+//type WarningDB struct {
+//	Id         int64     `gorm:"column:ID;type:bigint;primaryKey"`
+//	T_pid      int32     `gorm:"column:t_pid;index:merge"`                        // Account.Company 绑定公司
+//	T_tp       int       `gorm:"column:t_tp;index:merge"`                         // 报警类型   ->WarningList
+//	T_sn       string    `gorm:"column:t_sn;index:tsn;size:256;"`                 // 设备序列号
+//	T_D_name   string    `gorm:"column:t__d_name;size:256;"`                      // 设备名称
+//	T_id       int32     `gorm:"column:t_id;"`                                    // 传感器 ID
+//	T_DS_name  string    `gorm:"column:t__d_s_name;size:256;"`                    // 传感器名称
+//	T_Remark   string    `gorm:"column:t__remark;type:longtext;"`                 // 采集内容
+//	T_Ut       time.Time `gorm:"column:t__ut;type:datetime;"`                     // 采集时间
+//	T_fUt      time.Time `gorm:"column:t_f_ut;type:datetime;"`                    // 首次采集时间
+//	T_Text     string    `gorm:"column:t__text;type:longtext;"`                   // 处理备注
+//	T_Log      string    `gorm:"column:t__log;type:longtext;"`                    // 通知日志
+//	T_Msid     int64     `gorm:"column:t__msid;"`                                 // 消息ID
+//	T_State    int       `gorm:"column:t__state;size:2;default:2;index:merge"`    // 0 删除   1 不处理   2 已处理   3 未处理
+//	CreateTime time.Time `gorm:"column:create_time;type:datetime;autoUpdateTime"` //auto_now_add 第一次保存时才设置时间
+//	UpdateTime time.Time `gorm:"column:update_time;type:datetime;autoCreateTime"` //auto_now 每次 model 保存时都会对时间自动更新
+//}
+//
+//var MysqlServer_UrlPort = "127.0.0.1:40306"
+//var MysqlServer_Database = "cold"
+//var MysqlServer_Username = "cold"
+//var MysqlServer_Password = "yjwyEckZS7rE5H!"
+//
+//func init() {
+//	// 连接数据库
+//	var err error
+//
+//	dsn := MysqlServer_Username + ":" + MysqlServer_Password + "@tcp(" + MysqlServer_UrlPort + ")/" + MysqlServer_Database + "?charset=utf8mb4&loc=Local&parseTime=True"
+//	db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{})
+//	if err != nil {
+//
+//		fmt.Println(err)
+//
+//	}
+//	sqlDB, err = db.DB()
+//	// SetMaxIdleConns 设置空闲连接池中的最大连接数。
+//	sqlDB.SetMaxIdleConns(1)
+//
+//	// SetMaxOpenConns 设置数据库连接最大打开数。
+//	sqlDB.SetMaxOpenConns(10)
+//
+//	// SetConnMaxLifetime 设置可重用连接的最长时间
+//	sqlDB.SetConnMaxLifetime(time.Hour)
+//	//db.AutoMigrate(&DeviceWarning{})
+//
+//	//db.SetLogger(log.New(logs.LogxOrm, "\r\nWarningBack:", 0))
+//	fmt.Println("db init")
+//}
+//
+///*
+//time_a  : 开始时间  "2024-01-24 08:21:40"
+//time_b  : 结束时间  "2024-03-24 08:21:40"
+//WHERE_t_pid  : 公司ID   > 0
+//WHERE_t_tp  : 报警类型  > 0
+//WHERE_t__state  :   状态   -1 无限制  0 删除   1 不处理   2 已处理   3 未处理
+//WHERE_t_sn  : 设备序列号
+//PageIndex  : 页码值
+//PageSize  : 每页多少
+//*/
+//
+//func Handy(time_a, time_b string, WHERE_t_pid int, WHERE_t_tp int, WHERE_t__state int, WHERE_t_sn string, PageIndex int, PageSize int) ([]WarningDB, int) {
+//	var offset int // 数据起点
+//	if PageIndex <= 1 {
+//		offset = 0
+//	} else {
+//		offset = (PageIndex - 1) * PageSize
+//	}
+//	var TABLE = "warning_"
+//	monthsArray := generateMonthArray(time_a, time_b)
+//
+//	var Warning_List []WarningDB
+//	var Count_List []int
+//	var Count_Num int // 累积数量
+//	for _, v := range monthsArray {
+//		var count int64
+//		var countoffset int // 当前数据起点
+//
+//		db.Table(TABLE+v).Where("t_tp = ?", 110).Count(&count)
+//		Count_List = append(Count_List, int(count))
+//		fmt.Println("TABLE:", TABLE+v, count)
+//		countoffset = offset - Count_Num // 当前数据起点
+//		if countoffset < 0 {
+//			countoffset = 0
+//		}
+//		Count_Num += int(count)
+//		Limit := PageSize - len(Warning_List) // 还缺多少数量
+//		if offset <= Count_Num && Limit > 0 {
+//			var warninglist []WarningDB
+//			dbW := db.Table(TABLE + v)
+//			if WHERE_t_pid > 0 {
+//				dbW.Where("t_tp = ? ", WHERE_t_pid)
+//			}
+//			if WHERE_t_tp > 0 {
+//				dbW.Where("t_pid = ? ", WHERE_t_pid)
+//			}
+//			if WHERE_t__state != -1 {
+//				dbW.Where("t__state = ? ", WHERE_t__state)
+//			}
+//			if len(WHERE_t_sn) > 0 {
+//				dbW.Where("t__state LIKE ? ", WHERE_t_sn)
+//			}
+//			dbW.Order("t__ut").Offset(countoffset).Limit(Limit).Find(&warninglist)
+//			Warning_List = append(Warning_List, warninglist...)
+//		}
+//
+//	}
+//	return Warning_List, Count_Num
+//}
+//
+//func (t *WarningDB) Create(TableName string) {
+//CreateSQL:
+//	result := db.Table(TableName).Create(t) // 插入数据
+//	if result.Error != nil {
+//		if strings.Compare(result.Error.Error(), "doesn't exist") != 0 {
+//			db.Table(TableName).AutoMigrate(&WarningDB{}) // 建表
+//			goto CreateSQL                                // 返回 重新插入
+//		}
+//	}
+//}
+//
+////func UPDATE(name string, ID int64, t__log string) {
+////	logs.Println("修改数据:", name, " ID:", ID)
+////	db.Exec("UPDATE "+name+" SET t__log = ?  WHERE ID = ? ;", t__log, ID)
+////}
+//
+//func generateMonthArray(startDateStr string, endDateStr string) []string {
+//	startDate, _ := time.Parse("2006-01-02 15:04:05", startDateStr)
+//	endDate, _ := time.Parse("2006-01-02 15:04:05", endDateStr)
+//
+//	monthsArray := []string{}
+//	for startDate.Before(endDate) || startDate.Equal(endDate) {
+//		monthsArray = append(monthsArray, fmt.Sprintf("%d_%02d", startDate.Year(), startDate.Month()))
+//		startDate = startDate.AddDate(0, 1, 0) // Add one month
+//	}
+//
+//	return monthsArray
 //}
-
-func generateMonthArray(startDateStr string, endDateStr string) []string {
-	startDate, _ := time.Parse("2006-01-02 15:04:05", startDateStr)
-	endDate, _ := time.Parse("2006-01-02 15:04:05", endDateStr)
-
-	monthsArray := []string{}
-	for startDate.Before(endDate) || startDate.Equal(endDate) {
-		monthsArray = append(monthsArray, fmt.Sprintf("%d_%02d", startDate.Year(), startDate.Month()))
-		startDate = startDate.AddDate(0, 1, 0) // Add one month
-	}
-
-	return monthsArray
-}

+ 39 - 0
tests/conf/app.conf

@@ -0,0 +1,39 @@
+appname = Cold_mqtt
+httpport = 6202
+runmode = dev
+
+
+# Nats
+NatsServer_Url = "127.0.0.1:43422"
+
+
+# Mysql
+
+MysqlServer_UrlPort = "127.0.0.1:40306"
+MysqlServer_Database = "cold"
+MysqlServer_Username = "cold"
+MysqlServer_Password = "yjwyEckZS7rE5H!"
+MysqlServer_MaxIdleConnections = 100
+MysqlServer_MaxOpenConnections = 200
+
+
+
+# Redis
+Redis_address = "127.0.0.1:43379"
+Redis_password = ""
+Redis_dbNum = "1"
+
+# Mqtt
+# 192.168.0.7
+# Mqtt
+# 192.168.0.7
+ # mqttjxit 140.246.233.197   mqttlodr 203.195.71.200  mqttyuht 203.57.71.139
+MqttServer_id = "text_cold_mqtt"
+MqttServer_Url = "203.57.71.139:1883"
+MqttServer_ClientID = "text_cold_mqtt"
+MqttServer_Username = "admin"
+MqttServer_Password = "8f9qRNixEMhCVrF"
+
+
+
+

+ 216 - 0
tests/logrizhi_test.go

@@ -0,0 +1,216 @@
+package test
+
+import (
+	"Cold_mqtt/conf"
+	"Cold_mqtt/lib"
+	"Cold_mqtt/logs"
+	"Cold_mqtt/models/Device"
+	_ "Cold_mqtt/routers"
+	"bufio"
+	"encoding/json"
+	"fmt"
+	"github.com/beego/beego/v2/adapter/orm"
+	_ "github.com/go-sql-driver/mysql"
+	"github.com/yosssi/gmq/mqtt"
+	"github.com/yosssi/gmq/mqtt/client"
+	"math"
+	"os"
+	"regexp"
+	"strconv"
+	"strings"
+	"testing"
+	"time"
+)
+
+var (
+	cli *client.Client
+)
+
+func init() {
+
+	orm.RegisterDriver("mysql", orm.DRMySQL)
+	orm.RegisterDataBase("default", "mysql",
+		conf.MysqlServer_Username+":"+conf.MysqlServer_Password+"@tcp("+conf.MysqlServer_UrlPort+")/"+conf.MysqlServer_Database+"?charset=utf8mb4&loc=Local&parseTime=True",
+		conf.MysqlServer_MaxIdleConnections, conf.MysqlServer_MaxOpenConnections)
+	orm.RunSyncdb("default", false, true) // 创建数据库
+
+	//logs.Println("MysqlServer:" + conf.MysqlServer_Username + ":" + conf.MysqlServer_Password + "@tcp(" + conf.MysqlServer_UrlPort + ")/" + conf.MysqlServer_Database + "?charset=utf8mb4&loc=Local&parseTime=True")
+
+}
+
+// 通过 MQTT 日志恢复 所有记录数据,直接插入数据库 过滤方式, 运行目录下 mqtt.log
+func Test_MQTTlog_iDB(t *testing.T) {
+	// 打开文件
+	file, err := os.Open("logx.log")
+	if err != nil {
+		fmt.Println("无法打开文件:", err)
+		return
+	}
+	defer file.Close()
+
+	// 创建一个 Scanner 来读取文件内容
+	scanner := bufio.NewScanner(file)
+
+	// 逐行读取文件内容
+	for scanner.Scan() {
+		line := scanner.Text()
+		if strings.Contains(line, "<-/sub/") && strings.Contains(line, "\"type\":1,") {
+			//fmt.Println("包含字符串", "的行:", line)
+			logLine := line
+
+			// 定义正则表达式
+			re := regexp.MustCompile(`(?P<json>{.*})`)
+
+			// 在日志行中查找匹配的内容
+			T_json_match := re.FindStringSubmatch(logLine)
+
+			// 定义正则表达式
+			re1 := regexp.MustCompile(`\/sub\/([A-Za-z0-9]+)`)
+
+			// 在日志行中查找匹配的内容
+			T_sn_match := re1.FindStringSubmatch(logLine)
+			T_sn := ""
+			// 输出匹配结果
+			if len(T_sn_match) > 1 {
+				T_sn = T_sn_match[1]
+				//fmt.Println("T_sn:", T_sn)
+
+			} else {
+				fmt.Println("未找到匹配的字符串", logLine)
+				continue
+			}
+
+			// 输出匹配结果
+			if len(T_json_match) > 0 {
+				result := T_json_match[1] // 使用索引1获取命名捕获组"json"的内容
+				//fmt.Println("提取的字符串:", result)
+				// 实体类-数值
+				type Ms2_Project_list struct {
+					Type int                      `json:"type"`
+					Msid int64                    `json:"mid"`
+					Data []map[string]interface{} `json:"data"`
+				}
+
+				var ms2_Project_list Ms2_Project_list
+
+				err := json.Unmarshal([]byte(result), &ms2_Project_list)
+				if err != nil {
+					logs.Println("JSON反序列化失败[Ms_project_1],err=", err)
+					break
+				}
+
+				for _, v := range ms2_Project_list.Data {
+
+					int64_, _ := strconv.ParseInt(lib.To_string(v["ut"]), 10, 64)
+					UT := time.Unix(int64_, 0)
+
+					// 是否存在传感器  不存在 跳过
+					DeviceSensor_r, is := Device.Read_DeviceSensor_ByT_sn(T_sn, lib.To_int(v["id"]))
+					if !is {
+						logs.Println("MqttServer", "记录数据 传感器不存在 跳过处理", T_sn+lib.To_string(v["id"]))
+						continue
+					}
+					// 获取传感器参数
+					DeviceSensorParameter_r, is := Device.Read_DeviceSensorParameter(T_sn, DeviceSensor_r.T_id)
+					if !is {
+						logs.Println("MqttServer", "记录数据 传感器参数不存在 跳过处理", T_sn+lib.To_string(v["id"]))
+						continue
+					}
+
+					DeviceData_t := Device.DeviceData_R{
+						T_t:    float32(math.Ceil(float64(lib.To_float32(v["t"])*10)) / 10),
+						T_rh:   float32(math.Ceil(float64(lib.To_float32(v["h"])*10)) / 10),
+						T_Site: lib.To_string(v["s"]),
+						T_time: UT,
+						T_sp:   DeviceSensorParameter_r.Id,
+					}
+
+					if r_, _ := Device.Add_DeviceData(T_sn, DeviceSensor_r.T_id, DeviceData_t); r_ {
+						fmt.Println("--------- EE >", T_sn, DeviceSensor_r.T_id, DeviceData_t.T_time, DeviceData_t)
+						// 被替换
+						//Device.Add_DeviceDataOld(Device.DeviceDataOld{
+						//	T_sn:        T_sn,
+						//	T_id:        DeviceSensor_r.T_id,
+						//	T_t:         DeviceData_old_r.T_t,
+						//	T_rh:        DeviceData_old_r.T_rh,
+						//	T_Site:      DeviceData_old_r.T_site,
+						//	T_time:      UT,
+						//	T_operation: 2,
+						//	T_uuid:      "test",
+						//})
+					} else {
+						fmt.Println("--------- OK >", T_sn, DeviceSensor_r.T_id, DeviceData_t.T_time, DeviceData_t)
+					}
+
+				}
+			} else {
+				fmt.Println("未找到匹配的字符串")
+			}
+		}
+
+	}
+
+	// 检查是否有错误发生
+	if err := scanner.Err(); err != nil {
+		fmt.Println("读取文件时发生错误:", err)
+	}
+}
+
+func TestLoge(t *testing.T) {
+	time.Sleep(3 * time.Second)
+	logs.Println("============Run_MqttServer=============", "")
+
+	// Create an MQTT Client.
+	cli = client.New(&client.Options{
+		// Define the processing of the error handler.
+		ErrorHandler: func(err error) {
+			logs.PrintlnError("err!!!!!! Run_MqttServer:", err.Error())
+			return
+		},
+	})
+
+	// Terminate the Client.
+	defer cli.Terminate()
+
+	c := client.ConnectOptions{
+		Network:  "tcp",
+		Address:  conf.MqttServer_Url,
+		ClientID: []byte(conf.MqttServer_ClientID + "TestLoge"),
+		UserName: []byte(conf.MqttServer_Username),
+		Password: []byte(conf.MqttServer_Password),
+	}
+	logs.Println("Address:", c.Address)
+	logs.Println("ClientID:", string(c.ClientID))
+
+	// Connect to the MQTT Server.
+	err := cli.Connect(&c)
+
+	if err != nil {
+		logs.Println("MqttServer", "连接MQTT失败 [cli.Connect]", "")
+		logs.Println("err!!!!!! Run_MqttServer:", "连接MQTT失败:", err)
+		fmt.Println("err!!!!!! 连接MQTT失败:", err)
+		cli.Terminate()
+		time.Sleep(3 * time.Second)
+		return
+	}
+
+	time.Sleep(30 * time.Second)
+
+}
+
+// 发送数据
+func Mqtt_publish(topic string, text string) {
+
+	// Publish a message.
+	err := cli.Publish(&client.PublishOptions{
+		QoS:       mqtt.QoS0,
+		TopicName: []byte("/pub/" + topic),
+		Message:   []byte(text),
+	})
+
+	logs.PrintlnMqtt("-> /pub/" + topic + " " + text)
+	if err != nil {
+		logs.PrintlnError("MqttServer", "发送消息失败 [cli.Publish]", text)
+	}
+
+}