|
- package MqttServer
- //
- //import (
- // "ColdP_server/models/Device"
- // "encoding/json"
- // "errors"
- // "fmt"
- // "github.com/astaxie/beego/logs"
- // beego "github.com/beego/beego/v2/server/web"
- // "github.com/yosssi/gmq/mqtt"
- // "github.com/yosssi/gmq/mqtt/client"
- // "log"
- // "os"
- // "sync"
- // "sync/atomic"
- // "time"
- //)
- //
- //var (
- // mqttSuffix string
- // mqttPort string
- // mqttUsername string
- // mqttPassword string
- // mqttUrl string
- // subscribedTopics map[string]bool
- // topicMutex sync.Mutex
- // publishQueue chan *PublishTask
- //)
- //
- //type PublishTask struct {
- // Cli *client.Client
- // Topic string
- // Msg []byte
- //}
- //
- //func init() {
- // var err error
- // mqttSuffix, err = beego.AppConfig.String("Mqtt_suffix")
- // if err != nil {
- // log.Fatalf("Failed to load Mqtt_suffix: %v", err)
- // }
- //
- // mqttPort, err = beego.AppConfig.String("Mqtt_port")
- // if err != nil {
- // log.Fatalf("Failed to load Mqtt_port: %v", err)
- // }
- //
- // mqttUsername, err = beego.AppConfig.String("Mqtt_username")
- // if err != nil {
- // log.Fatalf("Failed to load Mqtt_username: %v", err)
- // }
- //
- // mqttPassword, err = beego.AppConfig.String("Mqtt_password")
- // if err != nil {
- // log.Fatalf("Failed to load Mqtt_password: %v", err)
- // }
- //
- // subscribedTopics = make(map[string]bool)
- // publishQueue = make(chan *PublishTask, 100) // 根据预估消息量设置队列大小
- // go func() {
- // for task := range publishQueue {
- // err := PubMqttMessage(task.Cli, task.Topic, task.Msg)
- // if err != nil {
- // log.Printf("发布消息失败,主题: %s,错误: %v", task.Topic, err)
- // }
- // }
- // }()
- //}
- //
- //// GetMqttClient 获取MQTT客户端连接
- //func GetMqttClient(mqttAddrPrefix string) (*client.Client, error) {
- // //r := rand.New(rand.NewSource(time.Now().Unix()))
- // var clientIdCounter int32
- // pid := os.Getpid()
- // // 使用时间戳和进程ID以及一个自增的计数器(简单示例,实际可更完善)
- // uniqueId := fmt.Sprintf("%d_%d_%d", time.Now().UnixNano(), pid, atomic.AddInt32(&clientIdCounter, 1))
- // options := client.ConnectOptions{
- // Network: "tcp",
- // Address: fmt.Sprintf("%s.%s:%s", mqttAddrPrefix, mqttSuffix, mqttPort),
- // UserName: []byte(mqttUsername),
- // Password: []byte(mqttPassword),
- // ClientID: []byte(uniqueId),
- // CleanSession: false, // 保持会话,有助于自动重连
- // }
- // cli := client.New(&client.Options{
- // ErrorHandler: func(e error) {
- // logs.Error("MQTT Error: %v", e)
- // },
- // })
- // options.PINGRESPTimeout = time.Second * 5
- //
- // // 尝试连接并处理重试逻辑
- // var err error
- // for i := 0; i <= 3; i++ {
- // if err = cli.Connect(&options); err == nil {
- // fmt.Println("MQTT 连接成功")
- // break
- // }
- // logs.Error("MQTT 连接失败: %v,正在重试... (尝试次数: %d/%d)", err, i+1, 3)
- //
- // // 如果不是最后一次尝试,则等待一段时间再重试
- // if i < 3 {
- // time.Sleep(time.Second * time.Duration(i+1))
- // }
- // }
- //
- // if err != nil {
- // logs.Error("MQTT 连接失败,已达到最大重试次数")
- // return nil, fmt.Errorf("MQTT 连接失败: %w", err)
- // }
- //
- // return cli, nil
- //}
- //
- //// PubMqttMessage 发送信息
- //func PubMqttMessage(cli *client.Client, topic string, msg []byte) error {
- // opts := &client.PublishOptions{
- // QoS: mqtt.QoS0, // 根据需求调整QoS级别
- // TopicName: []byte(topic),
- // Message: msg,
- // }
- // for i := 0; i <= 3; i++ {
- // if err := cli.Publish(opts); err == nil {
- // log.Printf("MQTT 发送成功, 主题: %s", topic)
- // return nil
- // }
- // log.Printf("MQTT 发布失败, 主题: %s, 错误: %v,正在重试... (尝试次数: %d)", topic, i+1, 3)
- // // 如果不是最后一次尝试,则等待一段时间再重试
- // if i < 3 {
- // time.Sleep(time.Second * time.Duration(i+1))
- // }
- // }
- // // 如果所有重试都失败了,返回错误
- // return fmt.Errorf("MQTT 发布失败, 主题: %s, 最大重试次数已达到", topic)
- //}
- //func PubMqttMessageAndSub(cli *client.Client, topic, topicSub string, msg []byte) (devi Device.Deviation, e error) {
- // opts := &client.PublishOptions{
- // QoS: mqtt.QoS1, // 根据需求调整QoS级别
- // TopicName: []byte(topic),
- // Message: msg,
- // }
- // var publishErr error
- // err := cli.Subscribe(&client.SubscribeOptions{
- // SubReqs: []*client.SubReq{
- // &client.SubReq{
- // TopicFilter: []byte(topicSub),
- // QoS: mqtt.QoS1,
- // Handler: func(topicName, message []byte) {
- // log.Println("接收到数据", string(message))
- // err := json.Unmarshal(message, &devi)
- // if err != nil {
- // log.Printf("JSON解析消息失败: %v", err)
- // }
- // },
- // },
- // },
- // })
- // if err != nil {
- // log.Printf("MQTT 订阅失败: %v", err)
- // return devi, err
- // }
- // var done chan struct{}
- // go func() {
- // for i := 0; i <= 3; i++ {
- // if err := cli.Publish(opts); err == nil {
- // log.Printf("MQTT 发送成功, 主题: %s", topic)
- // close(done)
- // return
- // }
- // if i < 3 {
- // time.Sleep(time.Second * time.Duration(i+1))
- // }
- // }
- // publishErr = fmt.Errorf("MQTT 发布失败, 主题: %s, 最大重试次数已达到", topic)
- // close(done)
- // }()
- //
- // // 等待发布操作完成或者超时处理(这里简单等待一定时间,可根据实际情况优化超时逻辑)
- // select {
- // case <-done:
- // case <-time.After(time.Second * 10):
- // publishErr = fmt.Errorf("MQTT 发布超时, 主题: %s", topic)
- // }
- // // 如果所有重试都失败了,返回错误
- // return Device.Deviation{}, publishErr
- //}
- //
- //func Subscript(cli *client.Client, topic string, msg chan string, done chan struct{}) error {
- // fmt.Printf("订阅主题:%s\n", topic)
- // subOpts := &client.SubscribeOptions{
- // SubReqs: []*client.SubReq{
- // {
- // TopicFilter: []byte(topic),
- // QoS: mqtt.QoS0,
- // Handler: func(topicName, message []byte) {
- // fmt.Println("MQTT接收到信息:", string(message))
- // select {
- // case msg <- topic + "||" + string(message):
- // fmt.Println("接收到数据", topic, string(message))
- // case <-done:
- // fmt.Println("通道已关闭,无法发送消息")
- // return
- // case <-time.After(time.Millisecond * 100): // 防止阻塞太久
- // fmt.Println("通道已满,无法发送消息")
- // }
- // },
- // },
- // },
- // }
- // if err := cli.Subscribe(subOpts); err != nil {
- // log.Printf("MQTT 订阅失败: %v", err)
- // cli.Terminate()
- // return errors.New("订阅消息失败")
- // }
- // return nil
- //}
|