package MqttServer // //import ( // "ColdP_server/models/Device" // "encoding/json" // "errors" // "fmt" // "github.com/astaxie/beego/logs" // beego "github.com/beego/beego/v2/server/web" // "github.com/yosssi/gmq/mqtt" // "github.com/yosssi/gmq/mqtt/client" // "log" // "os" // "sync" // "sync/atomic" // "time" //) // //var ( // mqttSuffix string // mqttPort string // mqttUsername string // mqttPassword string // mqttUrl string // subscribedTopics map[string]bool // topicMutex sync.Mutex // publishQueue chan *PublishTask //) // //type PublishTask struct { // Cli *client.Client // Topic string // Msg []byte //} // //func init() { // var err error // mqttSuffix, err = beego.AppConfig.String("Mqtt_suffix") // if err != nil { // log.Fatalf("Failed to load Mqtt_suffix: %v", err) // } // // mqttPort, err = beego.AppConfig.String("Mqtt_port") // if err != nil { // log.Fatalf("Failed to load Mqtt_port: %v", err) // } // // mqttUsername, err = beego.AppConfig.String("Mqtt_username") // if err != nil { // log.Fatalf("Failed to load Mqtt_username: %v", err) // } // // mqttPassword, err = beego.AppConfig.String("Mqtt_password") // if err != nil { // log.Fatalf("Failed to load Mqtt_password: %v", err) // } // // subscribedTopics = make(map[string]bool) // publishQueue = make(chan *PublishTask, 100) // 根据预估消息量设置队列大小 // go func() { // for task := range publishQueue { // err := PubMqttMessage(task.Cli, task.Topic, task.Msg) // if err != nil { // log.Printf("发布消息失败,主题: %s,错误: %v", task.Topic, err) // } // } // }() //} // //// GetMqttClient 获取MQTT客户端连接 //func GetMqttClient(mqttAddrPrefix string) (*client.Client, error) { // //r := rand.New(rand.NewSource(time.Now().Unix())) // var clientIdCounter int32 // pid := os.Getpid() // // 使用时间戳和进程ID以及一个自增的计数器(简单示例,实际可更完善) // uniqueId := fmt.Sprintf("%d_%d_%d", time.Now().UnixNano(), pid, atomic.AddInt32(&clientIdCounter, 1)) // options := client.ConnectOptions{ // Network: "tcp", // Address: fmt.Sprintf("%s.%s:%s", mqttAddrPrefix, mqttSuffix, mqttPort), // UserName: []byte(mqttUsername), // Password: []byte(mqttPassword), // ClientID: []byte(uniqueId), // CleanSession: false, // 保持会话,有助于自动重连 // } // cli := client.New(&client.Options{ // ErrorHandler: func(e error) { // logs.Error("MQTT Error: %v", e) // }, // }) // options.PINGRESPTimeout = time.Second * 5 // // // 尝试连接并处理重试逻辑 // var err error // for i := 0; i <= 3; i++ { // if err = cli.Connect(&options); err == nil { // fmt.Println("MQTT 连接成功") // break // } // logs.Error("MQTT 连接失败: %v,正在重试... (尝试次数: %d/%d)", err, i+1, 3) // // // 如果不是最后一次尝试,则等待一段时间再重试 // if i < 3 { // time.Sleep(time.Second * time.Duration(i+1)) // } // } // // if err != nil { // logs.Error("MQTT 连接失败,已达到最大重试次数") // return nil, fmt.Errorf("MQTT 连接失败: %w", err) // } // // return cli, nil //} // //// PubMqttMessage 发送信息 //func PubMqttMessage(cli *client.Client, topic string, msg []byte) error { // opts := &client.PublishOptions{ // QoS: mqtt.QoS0, // 根据需求调整QoS级别 // TopicName: []byte(topic), // Message: msg, // } // for i := 0; i <= 3; i++ { // if err := cli.Publish(opts); err == nil { // log.Printf("MQTT 发送成功, 主题: %s", topic) // return nil // } // log.Printf("MQTT 发布失败, 主题: %s, 错误: %v,正在重试... (尝试次数: %d)", topic, i+1, 3) // // 如果不是最后一次尝试,则等待一段时间再重试 // if i < 3 { // time.Sleep(time.Second * time.Duration(i+1)) // } // } // // 如果所有重试都失败了,返回错误 // return fmt.Errorf("MQTT 发布失败, 主题: %s, 最大重试次数已达到", topic) //} //func PubMqttMessageAndSub(cli *client.Client, topic, topicSub string, msg []byte) (devi Device.Deviation, e error) { // opts := &client.PublishOptions{ // QoS: mqtt.QoS1, // 根据需求调整QoS级别 // TopicName: []byte(topic), // Message: msg, // } // var publishErr error // err := cli.Subscribe(&client.SubscribeOptions{ // SubReqs: []*client.SubReq{ // &client.SubReq{ // TopicFilter: []byte(topicSub), // QoS: mqtt.QoS1, // Handler: func(topicName, message []byte) { // log.Println("接收到数据", string(message)) // err := json.Unmarshal(message, &devi) // if err != nil { // log.Printf("JSON解析消息失败: %v", err) // } // }, // }, // }, // }) // if err != nil { // log.Printf("MQTT 订阅失败: %v", err) // return devi, err // } // var done chan struct{} // go func() { // for i := 0; i <= 3; i++ { // if err := cli.Publish(opts); err == nil { // log.Printf("MQTT 发送成功, 主题: %s", topic) // close(done) // return // } // if i < 3 { // time.Sleep(time.Second * time.Duration(i+1)) // } // } // publishErr = fmt.Errorf("MQTT 发布失败, 主题: %s, 最大重试次数已达到", topic) // close(done) // }() // // // 等待发布操作完成或者超时处理(这里简单等待一定时间,可根据实际情况优化超时逻辑) // select { // case <-done: // case <-time.After(time.Second * 10): // publishErr = fmt.Errorf("MQTT 发布超时, 主题: %s", topic) // } // // 如果所有重试都失败了,返回错误 // return Device.Deviation{}, publishErr //} // //func Subscript(cli *client.Client, topic string, msg chan string, done chan struct{}) error { // fmt.Printf("订阅主题:%s\n", topic) // subOpts := &client.SubscribeOptions{ // SubReqs: []*client.SubReq{ // { // TopicFilter: []byte(topic), // QoS: mqtt.QoS0, // Handler: func(topicName, message []byte) { // fmt.Println("MQTT接收到信息:", string(message)) // select { // case msg <- topic + "||" + string(message): // fmt.Println("接收到数据", topic, string(message)) // case <-done: // fmt.Println("通道已关闭,无法发送消息") // return // case <-time.After(time.Millisecond * 100): // 防止阻塞太久 // fmt.Println("通道已满,无法发送消息") // } // }, // }, // }, // } // if err := cli.Subscribe(subOpts); err != nil { // log.Printf("MQTT 订阅失败: %v", err) // cli.Terminate() // return errors.New("订阅消息失败") // } // return nil //}