logrizhi_test.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. package test
  2. import (
  3. "Cold_mqtt/conf"
  4. "Cold_mqtt/lib"
  5. "Cold_mqtt/logs"
  6. "Cold_mqtt/models/Device"
  7. _ "Cold_mqtt/routers"
  8. "bufio"
  9. "encoding/json"
  10. "fmt"
  11. "github.com/beego/beego/v2/adapter/orm"
  12. _ "github.com/go-sql-driver/mysql"
  13. "github.com/yosssi/gmq/mqtt"
  14. "github.com/yosssi/gmq/mqtt/client"
  15. "math"
  16. "os"
  17. "regexp"
  18. "strconv"
  19. "strings"
  20. "testing"
  21. "time"
  22. )
  23. var (
  24. cli *client.Client
  25. )
  26. func init() {
  27. orm.RegisterDriver("mysql", orm.DRMySQL)
  28. orm.RegisterDataBase("default", "mysql",
  29. conf.MysqlServer_Username+":"+conf.MysqlServer_Password+"@tcp("+conf.MysqlServer_UrlPort+")/"+conf.MysqlServer_Database+"?charset=utf8mb4&loc=Local&parseTime=True",
  30. conf.MysqlServer_MaxIdleConnections, conf.MysqlServer_MaxOpenConnections)
  31. orm.RunSyncdb("default", false, true) // 创建数据库
  32. //logs.Println("MysqlServer:" + conf.MysqlServer_Username + ":" + conf.MysqlServer_Password + "@tcp(" + conf.MysqlServer_UrlPort + ")/" + conf.MysqlServer_Database + "?charset=utf8mb4&loc=Local&parseTime=True")
  33. }
  34. // 通过 MQTT 日志恢复 所有记录数据,直接插入数据库 过滤方式, 运行目录下 mqtt.log
  35. func Test_MQTTlog_iDB(t *testing.T) {
  36. // 打开文件
  37. file, err := os.Open("logx.log")
  38. if err != nil {
  39. fmt.Println("无法打开文件:", err)
  40. return
  41. }
  42. defer file.Close()
  43. // 创建一个 Scanner 来读取文件内容
  44. scanner := bufio.NewScanner(file)
  45. // 逐行读取文件内容
  46. for scanner.Scan() {
  47. line := scanner.Text()
  48. if strings.Contains(line, "<-/sub/") && strings.Contains(line, "\"type\":1,") {
  49. //fmt.Println("包含字符串", "的行:", line)
  50. logLine := line
  51. // 定义正则表达式
  52. re := regexp.MustCompile(`(?P<json>{.*})`)
  53. // 在日志行中查找匹配的内容
  54. T_json_match := re.FindStringSubmatch(logLine)
  55. // 定义正则表达式
  56. re1 := regexp.MustCompile(`\/sub\/([A-Za-z0-9]+)`)
  57. // 在日志行中查找匹配的内容
  58. T_sn_match := re1.FindStringSubmatch(logLine)
  59. T_sn := ""
  60. // 输出匹配结果
  61. if len(T_sn_match) > 1 {
  62. T_sn = T_sn_match[1]
  63. //fmt.Println("T_sn:", T_sn)
  64. } else {
  65. fmt.Println("未找到匹配的字符串", logLine)
  66. continue
  67. }
  68. // 输出匹配结果
  69. if len(T_json_match) > 0 {
  70. result := T_json_match[1] // 使用索引1获取命名捕获组"json"的内容
  71. //fmt.Println("提取的字符串:", result)
  72. // 实体类-数值
  73. type Ms2_Project_list struct {
  74. Type int `json:"type"`
  75. Msid int64 `json:"mid"`
  76. Data []map[string]interface{} `json:"data"`
  77. }
  78. var ms2_Project_list Ms2_Project_list
  79. err := json.Unmarshal([]byte(result), &ms2_Project_list)
  80. if err != nil {
  81. logs.Println("JSON反序列化失败[Ms_project_1],err=", err)
  82. break
  83. }
  84. for _, v := range ms2_Project_list.Data {
  85. int64_, _ := strconv.ParseInt(lib.To_string(v["ut"]), 10, 64)
  86. UT := time.Unix(int64_, 0)
  87. // 是否存在传感器 不存在 跳过
  88. DeviceSensor_r, is := Device.Read_DeviceSensor_ByT_sn(T_sn, lib.To_int(v["id"]))
  89. if !is {
  90. logs.Println("MqttServer", "记录数据 传感器不存在 跳过处理", T_sn+lib.To_string(v["id"]))
  91. continue
  92. }
  93. // 获取传感器参数
  94. DeviceSensorParameter_r, is := Device.Read_DeviceSensorParameter(T_sn, DeviceSensor_r.T_id)
  95. if !is {
  96. logs.Println("MqttServer", "记录数据 传感器参数不存在 跳过处理", T_sn+lib.To_string(v["id"]))
  97. continue
  98. }
  99. DeviceData_t := Device.DeviceData_R{
  100. T_t: float32(math.Ceil(float64(lib.To_float32(v["t"])*10)) / 10),
  101. T_rh: float32(math.Ceil(float64(lib.To_float32(v["h"])*10)) / 10),
  102. T_Site: lib.To_string(v["s"]),
  103. T_time: UT,
  104. T_sp: DeviceSensorParameter_r.Id,
  105. }
  106. if r_, _ := Device.Add_DeviceData(T_sn, DeviceSensor_r.T_id, DeviceData_t); r_ {
  107. fmt.Println("--------- EE >", T_sn, DeviceSensor_r.T_id, DeviceData_t.T_time, DeviceData_t)
  108. // 被替换
  109. //Device.Add_DeviceDataOld(Device.DeviceDataOld{
  110. // T_sn: T_sn,
  111. // T_id: DeviceSensor_r.T_id,
  112. // T_t: DeviceData_old_r.T_t,
  113. // T_rh: DeviceData_old_r.T_rh,
  114. // T_Site: DeviceData_old_r.T_site,
  115. // T_time: UT,
  116. // T_operation: 2,
  117. // T_uuid: "test",
  118. //})
  119. } else {
  120. fmt.Println("--------- OK >", T_sn, DeviceSensor_r.T_id, DeviceData_t.T_time, DeviceData_t)
  121. }
  122. }
  123. } else {
  124. fmt.Println("未找到匹配的字符串")
  125. }
  126. }
  127. }
  128. // 检查是否有错误发生
  129. if err := scanner.Err(); err != nil {
  130. fmt.Println("读取文件时发生错误:", err)
  131. }
  132. }
  133. func TestLoge(t *testing.T) {
  134. time.Sleep(3 * time.Second)
  135. logs.Println("============Run_MqttServer=============", "")
  136. // Create an MQTT Client.
  137. cli = client.New(&client.Options{
  138. // Define the processing of the error handler.
  139. ErrorHandler: func(err error) {
  140. logs.PrintlnError("err!!!!!! Run_MqttServer:", err.Error())
  141. return
  142. },
  143. })
  144. // Terminate the Client.
  145. defer cli.Terminate()
  146. c := client.ConnectOptions{
  147. Network: "tcp",
  148. Address: conf.MqttServer_Url,
  149. ClientID: []byte(conf.MqttServer_ClientID + "TestLoge"),
  150. UserName: []byte(conf.MqttServer_Username),
  151. Password: []byte(conf.MqttServer_Password),
  152. }
  153. logs.Println("Address:", c.Address)
  154. logs.Println("ClientID:", string(c.ClientID))
  155. // Connect to the MQTT Server.
  156. err := cli.Connect(&c)
  157. if err != nil {
  158. logs.Println("MqttServer", "连接MQTT失败 [cli.Connect]", "")
  159. logs.Println("err!!!!!! Run_MqttServer:", "连接MQTT失败:", err)
  160. fmt.Println("err!!!!!! 连接MQTT失败:", err)
  161. cli.Terminate()
  162. time.Sleep(3 * time.Second)
  163. return
  164. }
  165. time.Sleep(30 * time.Second)
  166. }
  167. // 发送数据
  168. func Mqtt_publish(topic string, text string) {
  169. // Publish a message.
  170. err := cli.Publish(&client.PublishOptions{
  171. QoS: mqtt.QoS0,
  172. TopicName: []byte("/pub/" + topic),
  173. Message: []byte(text),
  174. })
  175. logs.PrintlnMqtt("-> /pub/" + topic + " " + text)
  176. if err != nil {
  177. logs.PrintlnError("MqttServer", "发送消息失败 [cli.Publish]", text)
  178. }
  179. }