123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216 |
- package test
- import (
- "Cold_mqtt/conf"
- "Cold_mqtt/lib"
- "Cold_mqtt/logs"
- "Cold_mqtt/models/Device"
- _ "Cold_mqtt/routers"
- "bufio"
- "encoding/json"
- "fmt"
- "github.com/beego/beego/v2/adapter/orm"
- _ "github.com/go-sql-driver/mysql"
- "github.com/yosssi/gmq/mqtt"
- "github.com/yosssi/gmq/mqtt/client"
- "math"
- "os"
- "regexp"
- "strconv"
- "strings"
- "testing"
- "time"
- )
- var (
- cli *client.Client
- )
- func init() {
- orm.RegisterDriver("mysql", orm.DRMySQL)
- orm.RegisterDataBase("default", "mysql",
- conf.MysqlServer_Username+":"+conf.MysqlServer_Password+"@tcp("+conf.MysqlServer_UrlPort+")/"+conf.MysqlServer_Database+"?charset=utf8mb4&loc=Local&parseTime=True",
- conf.MysqlServer_MaxIdleConnections, conf.MysqlServer_MaxOpenConnections)
- orm.RunSyncdb("default", false, true) // 创建数据库
- //logs.Println("MysqlServer:" + conf.MysqlServer_Username + ":" + conf.MysqlServer_Password + "@tcp(" + conf.MysqlServer_UrlPort + ")/" + conf.MysqlServer_Database + "?charset=utf8mb4&loc=Local&parseTime=True")
- }
- // 通过 MQTT 日志恢复 所有记录数据,直接插入数据库 过滤方式, 运行目录下 mqtt.log
- func Test_MQTTlog_iDB(t *testing.T) {
- // 打开文件
- file, err := os.Open("logx.log")
- if err != nil {
- fmt.Println("无法打开文件:", err)
- return
- }
- defer file.Close()
- // 创建一个 Scanner 来读取文件内容
- scanner := bufio.NewScanner(file)
- // 逐行读取文件内容
- for scanner.Scan() {
- line := scanner.Text()
- if strings.Contains(line, "<-/sub/") && strings.Contains(line, "\"type\":1,") {
- //fmt.Println("包含字符串", "的行:", line)
- logLine := line
- // 定义正则表达式
- re := regexp.MustCompile(`(?P<json>{.*})`)
- // 在日志行中查找匹配的内容
- T_json_match := re.FindStringSubmatch(logLine)
- // 定义正则表达式
- re1 := regexp.MustCompile(`\/sub\/([A-Za-z0-9]+)`)
- // 在日志行中查找匹配的内容
- T_sn_match := re1.FindStringSubmatch(logLine)
- T_sn := ""
- // 输出匹配结果
- if len(T_sn_match) > 1 {
- T_sn = T_sn_match[1]
- //fmt.Println("T_sn:", T_sn)
- } else {
- fmt.Println("未找到匹配的字符串", logLine)
- continue
- }
- // 输出匹配结果
- if len(T_json_match) > 0 {
- result := T_json_match[1] // 使用索引1获取命名捕获组"json"的内容
- //fmt.Println("提取的字符串:", result)
- // 实体类-数值
- type Ms2_Project_list struct {
- Type int `json:"type"`
- Msid int64 `json:"mid"`
- Data []map[string]interface{} `json:"data"`
- }
- var ms2_Project_list Ms2_Project_list
- err := json.Unmarshal([]byte(result), &ms2_Project_list)
- if err != nil {
- logs.Println("JSON反序列化失败[Ms_project_1],err=", err)
- break
- }
- for _, v := range ms2_Project_list.Data {
- int64_, _ := strconv.ParseInt(lib.To_string(v["ut"]), 10, 64)
- UT := time.Unix(int64_, 0)
- // 是否存在传感器 不存在 跳过
- DeviceSensor_r, is := Device.Read_DeviceSensor_ByT_sn(T_sn, lib.To_int(v["id"]))
- if !is {
- logs.Println("MqttServer", "记录数据 传感器不存在 跳过处理", T_sn+lib.To_string(v["id"]))
- continue
- }
- // 获取传感器参数
- DeviceSensorParameter_r, is := Device.Read_DeviceSensorParameter(T_sn, DeviceSensor_r.T_id)
- if !is {
- logs.Println("MqttServer", "记录数据 传感器参数不存在 跳过处理", T_sn+lib.To_string(v["id"]))
- continue
- }
- DeviceData_t := Device.DeviceData_R{
- T_t: float32(math.Ceil(float64(lib.To_float32(v["t"])*10)) / 10),
- T_rh: float32(math.Ceil(float64(lib.To_float32(v["h"])*10)) / 10),
- T_Site: lib.To_string(v["s"]),
- T_time: UT,
- T_sp: DeviceSensorParameter_r.Id,
- }
- if r_, _ := Device.Add_DeviceData(T_sn, DeviceSensor_r.T_id, DeviceData_t); r_ {
- fmt.Println("--------- EE >", T_sn, DeviceSensor_r.T_id, DeviceData_t.T_time, DeviceData_t)
- // 被替换
- //Device.Add_DeviceDataOld(Device.DeviceDataOld{
- // T_sn: T_sn,
- // T_id: DeviceSensor_r.T_id,
- // T_t: DeviceData_old_r.T_t,
- // T_rh: DeviceData_old_r.T_rh,
- // T_Site: DeviceData_old_r.T_site,
- // T_time: UT,
- // T_operation: 2,
- // T_uuid: "test",
- //})
- } else {
- fmt.Println("--------- OK >", T_sn, DeviceSensor_r.T_id, DeviceData_t.T_time, DeviceData_t)
- }
- }
- } else {
- fmt.Println("未找到匹配的字符串")
- }
- }
- }
- // 检查是否有错误发生
- if err := scanner.Err(); err != nil {
- fmt.Println("读取文件时发生错误:", err)
- }
- }
- func TestLoge(t *testing.T) {
- time.Sleep(3 * time.Second)
- logs.Println("============Run_MqttServer=============", "")
- // Create an MQTT Client.
- cli = client.New(&client.Options{
- // Define the processing of the error handler.
- ErrorHandler: func(err error) {
- logs.PrintlnError("err!!!!!! Run_MqttServer:", err.Error())
- return
- },
- })
- // Terminate the Client.
- defer cli.Terminate()
- c := client.ConnectOptions{
- Network: "tcp",
- Address: conf.MqttServer_Url,
- ClientID: []byte(conf.MqttServer_ClientID + "TestLoge"),
- UserName: []byte(conf.MqttServer_Username),
- Password: []byte(conf.MqttServer_Password),
- }
- logs.Println("Address:", c.Address)
- logs.Println("ClientID:", string(c.ClientID))
- // Connect to the MQTT Server.
- err := cli.Connect(&c)
- if err != nil {
- logs.Println("MqttServer", "连接MQTT失败 [cli.Connect]", "")
- logs.Println("err!!!!!! Run_MqttServer:", "连接MQTT失败:", err)
- fmt.Println("err!!!!!! 连接MQTT失败:", err)
- cli.Terminate()
- time.Sleep(3 * time.Second)
- return
- }
- time.Sleep(30 * time.Second)
- }
- // 发送数据
- func Mqtt_publish(topic string, text string) {
- // Publish a message.
- err := cli.Publish(&client.PublishOptions{
- QoS: mqtt.QoS0,
- TopicName: []byte("/pub/" + topic),
- Message: []byte(text),
- })
- logs.PrintlnMqtt("-> /pub/" + topic + " " + text)
- if err != nil {
- logs.PrintlnError("MqttServer", "发送消息失败 [cli.Publish]", text)
- }
- }
|