package MqttServer import ( "fmt" beego "github.com/beego/beego/v2/server/web" "github.com/yosssi/gmq/mqtt" "github.com/yosssi/gmq/mqtt/client" "log" "math/rand" "strings" "time" ) var ( mqttSuffix string mqttPort string mqttUsername string mqttPassword string ) func init() { var err error mqttSuffix, err = beego.AppConfig.String("Mqtt_suffix") if err != nil { log.Fatalf("Failed to load Mqtt_suffix: %v", err) } mqttPort, err = beego.AppConfig.String("Mqtt_port") if err != nil { log.Fatalf("Failed to load Mqtt_port: %v", err) } mqttUsername, err = beego.AppConfig.String("Mqtt_username") if err != nil { log.Fatalf("Failed to load Mqtt_username: %v", err) } mqttPassword, err = beego.AppConfig.String("Mqtt_password") if err != nil { log.Fatalf("Failed to load Mqtt_password: %v", err) } } // GetMqttClient 获取MQTT客户端连接 func GetMqttClient(mqttAddrPrefix string) (*client.Client, error) { r := rand.New(rand.NewSource(time.Now().Unix())) options := client.ConnectOptions{ Network: "tcp", Address: fmt.Sprintf("%s.%s:%s", mqttAddrPrefix, mqttSuffix, mqttPort), UserName: []byte(mqttUsername), Password: []byte(mqttPassword), ClientID: []byte(fmt.Sprintf("ColdP_server_%d", r.Intn(9))), } cli := client.New(&client.Options{ ErrorHandler: func(e error) { log.Printf("MQTT Error: %v", e) }, }) if err := cli.Connect(&options); err != nil { log.Printf("MQTT 连接失败: %v", err) return nil, err } return cli, nil } // PubMqttMessage 发送信息 func PubMqttMessage(cli *client.Client, topic string, msg []byte) error { fmt.Printf("发布主题:%s -> 发布内容:%s\n", topic, string(msg)) opts := &client.PublishOptions{ QoS: mqtt.QoS0, TopicName: []byte(topic), Message: msg, } if err := cli.Publish(opts); err != nil { log.Printf("MQTT 发布失败: %v", err) return err } fmt.Println("MQTT发送成功!") return nil } // Subscript 订阅信息 func Subscript(cli *client.Client, topic string, msg chan string, isStr string) error { fmt.Println("连接成功", time.Now().Format("2006-01-02 15:04:05")) fmt.Printf("订阅主题:%s\n", topic) subOpts := &client.SubscribeOptions{ SubReqs: []*client.SubReq{ &client.SubReq{ TopicFilter: []byte(topic), QoS: mqtt.QoS0, Handler: func(topicName, message []byte) { fmt.Println("MQTT接收到信息:", string(message)) if strings.Contains(strings.ReplaceAll(string(message), " ", ""), isStr) { msg <- topic + "||" + string(message) if _, ok := <-msg; !ok { cli.Disconnect() cli.Terminate() } } }, }, }, } if err := cli.Subscribe(subOpts); err != nil { log.Printf("MQTT 订阅失败: %v", err) return err } return nil }