// Package MqttServer wraps the gmq MQTT client with small helpers to connect
// to a broker, publish a message and subscribe to a topic.
package MqttServer

import (
	"fmt"
	beego "github.com/beego/beego/v2/server/web"
	"github.com/yosssi/gmq/mqtt"
	"github.com/yosssi/gmq/mqtt/client"
	"log"
	"math/rand"
	"strings"
	"time"
)

// MQTT settings loaded from the beego application config in init.
var (
	mqtt_suffix, mqtt_port, mqtt_username, mqtt_password string
	// err is no longer written by the functions in this file (they use local
	// errors so concurrent calls do not race on it). The declaration is kept
	// in case other files of this package reference it — TODO confirm and
	// remove if unused.
	err error
)

// init loads the MQTT settings from the beego application config.
//
// Every key is read independently and a failed read is logged; the variable
// still receives whatever value the config call returned, as before.
func init() {
	settings := []struct {
		key string
		dst *string
	}{
		{"Mqtt_suffix", &mqtt_suffix},
		{"Mqtt_port", &mqtt_port},
		{"Mqtt_username", &mqtt_username},
		{"Mqtt_password", &mqtt_password},
	}
	for _, s := range settings {
		value, err := beego.AppConfig.String(s.key)
		if err != nil {
			log.Printf("MQTT config %q could not be read: %v", s.key, err)
		}
		*s.dst = value
	}
}

// GetMqttClient creates an MQTT client and connects it over TCP to
// "<mqttAddrPrefix>.coldbaozhida.com:1883" using the configured user name and
// password.
//
// The client is returned even when the connection attempt fails; the failure
// is only printed, so callers cannot distinguish the two cases from the
// return value.
//
// NOTE(review): mqtt_suffix and mqtt_port are loaded from the config but the
// host suffix and port are hard-coded here — confirm which one is intended.
func GetMqttClient(mqttAddrPrefix string) *client.Client {
	// Seed with nanosecond resolution: with a per-second seed, two clients
	// created within the same second always drew the same client ID.
	// NOTE(review): only 9 distinct IDs exist, so collisions remain possible.
	r := rand.New(rand.NewSource(time.Now().UnixNano()))

	options := client.ConnectOptions{
		Network:  "tcp",
		Address:  fmt.Sprintf("%s.coldbaozhida.com:1883", mqttAddrPrefix),
		UserName: []byte(mqtt_username),
		Password: []byte(mqtt_password),
		ClientID: []byte(fmt.Sprintf("ColdP_server_%d", r.Intn(9))),
	}

	cli := client.New(&client.Options{
		ErrorHandler: func(e error) {
			fmt.Println("PubMqtt Error 发布信息错误:", e.Error())
		},
	})

	if err := cli.Connect(&options); err != nil {
		fmt.Println("MQTT 连接失败", err.Error())
	}
	return cli
}

// PubMqttMessage publishes msg to topic with QoS 0 using cli.
//
// It sleeps for one second before publishing and panics (via log.Panicln)
// when the publish call returns an error.
func PubMqttMessage(cli *client.Client, topic string, msg []byte) {
	fmt.Printf("发布主题:%s -> 发布内容:%s\n", topic, string(msg))
	time.Sleep(time.Second)

	err := cli.Publish(&client.PublishOptions{
		QoS:       mqtt.QoS0,
		TopicName: []byte(topic),
		Message:   msg,
	})
	if err != nil {
		// NOTE(review): panicking on a publish failure is kept for
		// compatibility with existing callers; consider reporting the error
		// to the caller instead.
		log.Panicln("Mqtt发布失败", err.Error())
	}
	fmt.Println("MQTT发送成功!")
}

// Subscript subscribes cli to topic with QoS 0.
//
// For every received message whose payload, with spaces removed, contains
// is_str, the handler sends "<topic>||<payload>" on msg and then receives
// from msg once. If msg turns out to be closed — either at the send or at
// that receive — the client is disconnected and terminated.
func Subscript(cli *client.Client, topic string, msg chan string, is_str string) {
	fmt.Println("连接成功", time.Now().Format("2006-01-02 15:04:05"))
	fmt.Printf("订阅主题:%s\n", topic)

	handler := func(topicName, message []byte) {
		fmt.Println("MQTT接收到信息:", string(message))
		if !strings.Contains(strings.ReplaceAll(string(message), " ", ""), is_str) {
			return
		}

		// A closed channel means the waiting side has already timed out.
		// Sending on a closed channel panics, so the send is guarded and a
		// panic is treated as "channel closed".
		delivered := func() (ok bool) {
			defer func() {
				if recover() != nil {
					ok = false
				}
			}()
			msg <- topic + "||" + string(message)
			return true
		}()

		if delivered {
			// Receive once from msg; a successful receive leaves the
			// subscription running.
			if _, open := <-msg; open {
				return
			}
		}

		// The channel is closed: shut the client down.
		if err := cli.Disconnect(); err != nil {
			fmt.Println("MQTT disconnect failed:", err.Error())
		}
		cli.Terminate()
	}

	err := cli.Subscribe(&client.SubscribeOptions{
		SubReqs: []*client.SubReq{
			{
				TopicFilter: []byte(topic),
				QoS:         mqtt.QoS0,
				Handler:     handler,
			},
		},
	})
	if err != nil {
		// Previously this error was dropped, hiding failed subscriptions.
		fmt.Println("MQTT subscribe failed:", err.Error())
	}
}