package test import ( "Cold_mqtt/conf" "Cold_mqtt/lib" "Cold_mqtt/logs" "Cold_mqtt/models/Device" _ "Cold_mqtt/routers" "bufio" "encoding/json" "fmt" "github.com/beego/beego/v2/adapter/orm" _ "github.com/go-sql-driver/mysql" "github.com/yosssi/gmq/mqtt" "github.com/yosssi/gmq/mqtt/client" "math" "os" "regexp" "strconv" "strings" "testing" "time" ) var ( cli *client.Client ) func init() { orm.RegisterDriver("mysql", orm.DRMySQL) orm.RegisterDataBase("default", "mysql", conf.MysqlServer_Username+":"+conf.MysqlServer_Password+"@tcp("+conf.MysqlServer_UrlPort+")/"+conf.MysqlServer_Database+"?charset=utf8mb4&loc=Local&parseTime=True", conf.MysqlServer_MaxIdleConnections, conf.MysqlServer_MaxOpenConnections) orm.RunSyncdb("default", false, true) // 创建数据库 //logs.Println("MysqlServer:" + conf.MysqlServer_Username + ":" + conf.MysqlServer_Password + "@tcp(" + conf.MysqlServer_UrlPort + ")/" + conf.MysqlServer_Database + "?charset=utf8mb4&loc=Local&parseTime=True") } // 通过 MQTT 日志恢复 所有记录数据,直接插入数据库 过滤方式, 运行目录下 mqtt.log func Test_MQTTlog_iDB(t *testing.T) { // 打开文件 file, err := os.Open("logx.log") if err != nil { fmt.Println("无法打开文件:", err) return } defer file.Close() // 创建一个 Scanner 来读取文件内容 scanner := bufio.NewScanner(file) // 逐行读取文件内容 for scanner.Scan() { line := scanner.Text() if strings.Contains(line, "<-/sub/") && strings.Contains(line, "\"type\":1,") { //fmt.Println("包含字符串", "的行:", line) logLine := line // 定义正则表达式 re := regexp.MustCompile(`(?P{.*})`) // 在日志行中查找匹配的内容 T_json_match := re.FindStringSubmatch(logLine) // 定义正则表达式 re1 := regexp.MustCompile(`\/sub\/([A-Za-z0-9]+)`) // 在日志行中查找匹配的内容 T_sn_match := re1.FindStringSubmatch(logLine) T_sn := "" // 输出匹配结果 if len(T_sn_match) > 1 { T_sn = T_sn_match[1] //fmt.Println("T_sn:", T_sn) } else { fmt.Println("未找到匹配的字符串", logLine) continue } // 输出匹配结果 if len(T_json_match) > 0 { result := T_json_match[1] // 使用索引1获取命名捕获组"json"的内容 //fmt.Println("提取的字符串:", result) // 实体类-数值 type Ms2_Project_list struct { Type int `json:"type"` Msid int64 `json:"mid"` Data []map[string]interface{} `json:"data"` } var ms2_Project_list Ms2_Project_list err := json.Unmarshal([]byte(result), &ms2_Project_list) if err != nil { logs.Println("JSON反序列化失败[Ms_project_1],err=", err) break } for _, v := range ms2_Project_list.Data { int64_, _ := strconv.ParseInt(lib.To_string(v["ut"]), 10, 64) UT := time.Unix(int64_, 0) // 是否存在传感器 不存在 跳过 DeviceSensor_r, is := Device.Read_DeviceSensor_ByT_sn(T_sn, lib.To_int(v["id"])) if !is { logs.Println("MqttServer", "记录数据 传感器不存在 跳过处理", T_sn+lib.To_string(v["id"])) continue } // 获取传感器参数 DeviceSensorParameter_r, is := Device.Read_DeviceSensorParameter(T_sn, DeviceSensor_r.T_id) if !is { logs.Println("MqttServer", "记录数据 传感器参数不存在 跳过处理", T_sn+lib.To_string(v["id"])) continue } DeviceData_t := Device.DeviceData_R{ T_t: float32(math.Ceil(float64(lib.To_float32(v["t"])*10)) / 10), T_rh: float32(math.Ceil(float64(lib.To_float32(v["h"])*10)) / 10), T_Site: lib.To_string(v["s"]), T_time: UT, T_sp: DeviceSensorParameter_r.Id, } if r_, _ := Device.Add_DeviceData(T_sn, DeviceSensor_r.T_id, DeviceData_t, true); r_ { fmt.Println("--------- EE >", T_sn, DeviceSensor_r.T_id, DeviceData_t.T_time, DeviceData_t) // 被替换 //Device.Add_DeviceDataOld(Device.DeviceDataOld{ // T_sn: T_sn, // T_id: DeviceSensor_r.T_id, // T_t: DeviceData_old_r.T_t, // T_rh: DeviceData_old_r.T_rh, // T_Site: DeviceData_old_r.T_site, // T_time: UT, // T_operation: 2, // T_uuid: "test", //}) } else { fmt.Println("--------- OK >", T_sn, DeviceSensor_r.T_id, DeviceData_t.T_time, DeviceData_t) } } } else { fmt.Println("未找到匹配的字符串") } } } // 检查是否有错误发生 if err := scanner.Err(); err != nil { fmt.Println("读取文件时发生错误:", err) } } func TestLoge(t *testing.T) { time.Sleep(3 * time.Second) logs.Println("============Run_MqttServer=============", "") // Create an MQTT Client. cli = client.New(&client.Options{ // Define the processing of the error handler. ErrorHandler: func(err error) { logs.PrintlnError("err!!!!!! Run_MqttServer:", err.Error()) return }, }) // Terminate the Client. defer cli.Terminate() c := client.ConnectOptions{ Network: "tcp", Address: conf.MqttServer_Url, ClientID: []byte(conf.MqttServer_ClientID + "TestLoge"), UserName: []byte(conf.MqttServer_Username), Password: []byte(conf.MqttServer_Password), } logs.Println("Address:", c.Address) logs.Println("ClientID:", string(c.ClientID)) // Connect to the MQTT Server. err := cli.Connect(&c) if err != nil { logs.Println("MqttServer", "连接MQTT失败 [cli.Connect]", "") logs.Println("err!!!!!! Run_MqttServer:", "连接MQTT失败:", err) fmt.Println("err!!!!!! 连接MQTT失败:", err) cli.Terminate() time.Sleep(3 * time.Second) return } time.Sleep(30 * time.Second) } // 发送数据 func Mqtt_publish(topic string, text string) { // Publish a message. err := cli.Publish(&client.PublishOptions{ QoS: mqtt.QoS0, TopicName: []byte("/pub/" + topic), Message: []byte(text), }) logs.PrintlnMqtt("-> /pub/" + topic + " " + text) if err != nil { logs.PrintlnError("MqttServer", "发送消息失败 [cli.Publish]", text) } }