123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- package MqttServer
- import (
- "fmt"
- beego "github.com/beego/beego/v2/server/web"
- "github.com/yosssi/gmq/mqtt"
- "github.com/yosssi/gmq/mqtt/client"
- "log"
- "math/rand"
- "strings"
- "time"
- )
- var (
- mqttSuffix string
- mqttPort string
- mqttUsername string
- mqttPassword string
- )
- func init() {
- var err error
- mqttSuffix, err = beego.AppConfig.String("Mqtt_suffix")
- if err != nil {
- log.Fatalf("Failed to load Mqtt_suffix: %v", err)
- }
- mqttPort, err = beego.AppConfig.String("Mqtt_port")
- if err != nil {
- log.Fatalf("Failed to load Mqtt_port: %v", err)
- }
- mqttUsername, err = beego.AppConfig.String("Mqtt_username")
- if err != nil {
- log.Fatalf("Failed to load Mqtt_username: %v", err)
- }
- mqttPassword, err = beego.AppConfig.String("Mqtt_password")
- if err != nil {
- log.Fatalf("Failed to load Mqtt_password: %v", err)
- }
- }
- // GetMqttClient 获取MQTT客户端连接
- func GetMqttClient(mqttAddrPrefix string) (*client.Client, error) {
- r := rand.New(rand.NewSource(time.Now().Unix()))
- options := client.ConnectOptions{
- Network: "tcp",
- Address: fmt.Sprintf("%s.%s:%s", mqttAddrPrefix, mqttSuffix, mqttPort),
- UserName: []byte(mqttUsername),
- Password: []byte(mqttPassword),
- ClientID: []byte(fmt.Sprintf("ColdP_server_%d", r.Intn(9))),
- }
- cli := client.New(&client.Options{
- ErrorHandler: func(e error) {
- log.Printf("MQTT Error: %v", e)
- },
- })
- if err := cli.Connect(&options); err != nil {
- log.Printf("MQTT 连接失败: %v", err)
- return nil, err
- }
- return cli, nil
- }
- // PubMqttMessage 发送信息
- func PubMqttMessage(cli *client.Client, topic string, msg []byte) error {
- fmt.Printf("发布主题:%s -> 发布内容:%s\n", topic, string(msg))
- opts := &client.PublishOptions{
- QoS: mqtt.QoS0,
- TopicName: []byte(topic),
- Message: msg,
- }
- if err := cli.Publish(opts); err != nil {
- log.Printf("MQTT 发布失败: %v", err)
- return err
- }
- fmt.Println("MQTT发送成功!")
- return nil
- }
- // Subscript 订阅信息
- func Subscript(cli *client.Client, topic string, msg chan string, isStr string) error {
- fmt.Println("连接成功", time.Now().Format("2006-01-02 15:04:05"))
- fmt.Printf("订阅主题:%s\n", topic)
- subOpts := &client.SubscribeOptions{
- SubReqs: []*client.SubReq{
- &client.SubReq{
- TopicFilter: []byte(topic),
- QoS: mqtt.QoS0,
- Handler: func(topicName, message []byte) {
- fmt.Println("MQTT接收到信息:", string(message))
- if strings.Contains(strings.ReplaceAll(string(message), " ", ""), isStr) {
- msg <- topic + "||" + string(message)
- if _, ok := <-msg; !ok {
- cli.Disconnect()
- cli.Terminate()
- }
- }
- },
- },
- },
- }
- if err := cli.Subscribe(subOpts); err != nil {
- log.Printf("MQTT 订阅失败: %v", err)
- return err
- }
- return nil
- }
|