123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 |
- package MqttServer
- import (
- "fmt"
- beego "github.com/beego/beego/v2/server/web"
- "github.com/yosssi/gmq/mqtt"
- "github.com/yosssi/gmq/mqtt/client"
- "log"
- "math/rand"
- "strings"
- "time"
- )
// MQTT settings read from the beego application configuration by init, plus
// the package-level error variable that init assigns to.
var (
	// mqtt_suffix holds the value of the "Mqtt_suffix" configuration key.
	mqtt_suffix string
	// mqtt_port holds the value of the "Mqtt_port" configuration key.
	mqtt_port string
	// mqtt_username holds the value of the "Mqtt_username" configuration key.
	mqtt_username string
	// mqtt_password holds the value of the "Mqtt_password" configuration key.
	mqtt_password string

	// err is a package-level error slot; init stores the result of each
	// configuration read in it.
	// NOTE(review): shared mutable state — not safe for concurrent writers.
	err error
)
- func init() {
- mqtt_suffix, err = beego.AppConfig.String("Mqtt_suffix")
- mqtt_port, err = beego.AppConfig.String("Mqtt_port")
- mqtt_username, err = beego.AppConfig.String("Mqtt_username")
- mqtt_password, err = beego.AppConfig.String("Mqtt_password")
- }
- // GetConnectionOptions 获取链接参数
- func GetMqttClient(mqttAddrPrefix string) *client.Client {
- r := rand.New(rand.NewSource(time.Now().Unix()))
- options := client.ConnectOptions{
- Network: "tcp",
- Address: fmt.Sprintf("%s.coldbaozhida.com:1883", mqttAddrPrefix),
- UserName: []byte(mqtt_username),
- Password: []byte(mqtt_password),
- ClientID: []byte(fmt.Sprintf("ColdP_server_%d", r.Intn(9))),
- }
- cli := client.New(&client.Options{ErrorHandler: func(e error) {
- fmt.Println("PubMqtt Error 发布信息错误:", e.Error())
- }})
- err = cli.Connect(&options)
- if err != nil {
- fmt.Println("MQTT 连接失败", err.Error())
- }
- return cli
- }
- // PubMqttMessage 发送信息
- func PubMqttMessage(cli *client.Client, topic string, msg []byte) {
- fmt.Printf("发布主题:%s -> 发布内容:%s\n", topic, string(msg))
- time.Sleep(time.Second)
- err = cli.Publish(&client.PublishOptions{
- QoS: mqtt.QoS0,
- TopicName: []byte(topic),
- Message: msg,
- })
- if err != nil {
- log.Panicln("Mqtt发布失败", err.Error())
- return
- }
- fmt.Println("MQTT发送成功!")
- }
- // Subscript 订阅信息
- func Subscript(cli *client.Client, topic string, msg chan string, is_str string) {
- fmt.Println("连接成功", time.Now().Format("2006-01-02 15:04:05"))
- fmt.Printf("订阅主题:%s\n", topic)
- cli.Subscribe(&client.SubscribeOptions{
- SubReqs: []*client.SubReq{
- &client.SubReq{
- TopicFilter: []byte(topic),
- QoS: mqtt.QoS0,
- Handler: func(topicName, message []byte) {
- //如果管道已经关闭,表示已经超时
- fmt.Println("MQTT接收到信息:", string(message))
- if strings.Contains(strings.Replace(string(message), " ", "", -1), is_str) {
- msg <- topic + "||" + string(message)
- if _, ok := <-msg; !ok {
- cli.Disconnect()
- cli.Terminate()
- }
- }
- },
- },
- },
- })
- }
|