|
@@ -7,11 +7,11 @@ import (
|
|
"ColdP_server/models/Company"
|
|
"ColdP_server/models/Company"
|
|
"ColdP_server/models/Device"
|
|
"ColdP_server/models/Device"
|
|
"ColdP_server/models/Warning"
|
|
"ColdP_server/models/Warning"
|
|
|
|
+ "context"
|
|
"encoding/json"
|
|
"encoding/json"
|
|
"fmt"
|
|
"fmt"
|
|
beego "github.com/beego/beego/v2/server/web"
|
|
beego "github.com/beego/beego/v2/server/web"
|
|
"io"
|
|
"io"
|
|
- "io/ioutil"
|
|
|
|
"log"
|
|
"log"
|
|
"math"
|
|
"math"
|
|
"strconv"
|
|
"strconv"
|
|
@@ -80,7 +80,6 @@ func (c *DeviceController) CompanyClass() {
|
|
|
|
|
|
// DataRepeat 数据重传
|
|
// DataRepeat 数据重传
|
|
func (c *DeviceController) DataRepeat() {
|
|
func (c *DeviceController) DataRepeat() {
|
|
-
|
|
|
|
b_, _ := lib.Verification(c.Ctx.GetCookie("User_tokey"), c.GetString("User_tokey"))
|
|
b_, _ := lib.Verification(c.Ctx.GetCookie("User_tokey"), c.GetString("User_tokey"))
|
|
if !b_ {
|
|
if !b_ {
|
|
c.Data["json"] = lib.JSONS{Code: 201, Msg: "User_tokey Err!"}
|
|
c.Data["json"] = lib.JSONS{Code: 201, Msg: "User_tokey Err!"}
|
|
@@ -89,34 +88,45 @@ func (c *DeviceController) DataRepeat() {
|
|
}
|
|
}
|
|
|
|
|
|
t := MqttServer.DataRepeat_C{}
|
|
t := MqttServer.DataRepeat_C{}
|
|
- bytes, _ := ioutil.ReadAll(c.Ctx.Request.Body)
|
|
|
|
|
|
+ bytes, _ := io.ReadAll(c.Ctx.Request.Body)
|
|
json.Unmarshal(bytes, &t)
|
|
json.Unmarshal(bytes, &t)
|
|
fmt.Println("浏览器接收数据:", t)
|
|
fmt.Println("浏览器接收数据:", t)
|
|
s, _ := time.Parse("2006-01-02 15:04:05", t.StartTime)
|
|
s, _ := time.Parse("2006-01-02 15:04:05", t.StartTime)
|
|
e, _ := time.Parse("2006-01-02 15:04:05", t.EndTime)
|
|
e, _ := time.Parse("2006-01-02 15:04:05", t.EndTime)
|
|
|
|
|
|
- //发送MQTT
|
|
|
|
|
|
+ // 发送MQTT
|
|
for k, v := range t.Sns {
|
|
for k, v := range t.Sns {
|
|
topic := fmt.Sprintf("/pub/%s", k)
|
|
topic := fmt.Sprintf("/pub/%s", k)
|
|
repeatPub := MqttServer.DataRepeat_Pub{Sn: k, Type: 9, Mid: time.Now().Unix(), Data: MqttServer.DataRepeat_Pub_Data{Start: s.Unix(), End: e.Unix(), Id: v}}
|
|
repeatPub := MqttServer.DataRepeat_Pub{Sn: k, Type: 9, Mid: time.Now().Unix(), Data: MqttServer.DataRepeat_Pub_Data{Start: s.Unix(), End: e.Unix(), Id: v}}
|
|
msg, _ := json.Marshal(repeatPub)
|
|
msg, _ := json.Marshal(repeatPub)
|
|
mqttId := strings.Split(Device.ReadDeviceMqttId(k), "\"")[0]
|
|
mqttId := strings.Split(Device.ReadDeviceMqttId(k), "\"")[0]
|
|
- client := MqttServer.GetMqttClient(mqttId)
|
|
|
|
|
|
+
|
|
|
|
+ client, err := MqttServer.GetMqttClient(mqttId)
|
|
|
|
+ if err != nil {
|
|
|
|
+ log.Printf("Error getting MQTT client for SN: %s, %v", k, err)
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+
|
|
for i := 0; i < 3; i++ {
|
|
for i := 0; i < 3; i++ {
|
|
time.Sleep(time.Second * time.Duration(i+1))
|
|
time.Sleep(time.Second * time.Duration(i+1))
|
|
- MqttServer.PubMqttMessage(client, topic, msg)
|
|
|
|
|
|
+ if err := MqttServer.PubMqttMessage(client, topic, msg); err != nil {
|
|
|
|
+ log.Printf("Error publishing MQTT message for SN: %s, %v", k, err)
|
|
|
|
+ c.Data["json"] = lib.JSONS{Code: 0, Msg: "设置失败", Data: k}
|
|
|
|
+ return
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
+
|
|
client.Disconnect()
|
|
client.Disconnect()
|
|
client.Terminate()
|
|
client.Terminate()
|
|
}
|
|
}
|
|
- c.Data["json"] = lib.JSONS{200, "数据重传成功", nil}
|
|
|
|
|
|
+
|
|
|
|
+ c.Data["json"] = lib.JSONS{Code: 200, Msg: "数据重传成功", Data: nil}
|
|
c.ServeJSON()
|
|
c.ServeJSON()
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
// ReadDeviation 读取偏差值
|
|
// ReadDeviation 读取偏差值
|
|
func (c *DeviceController) ReadDeviation() {
|
|
func (c *DeviceController) ReadDeviation() {
|
|
-
|
|
|
|
b_, _ := lib.Verification(c.Ctx.GetCookie("User_tokey"), c.GetString("User_tokey"))
|
|
b_, _ := lib.Verification(c.Ctx.GetCookie("User_tokey"), c.GetString("User_tokey"))
|
|
if !b_ {
|
|
if !b_ {
|
|
c.Data["json"] = lib.JSONS{Code: 201, Msg: "User_tokey Err!"}
|
|
c.Data["json"] = lib.JSONS{Code: 201, Msg: "User_tokey Err!"}
|
|
@@ -125,44 +135,74 @@ func (c *DeviceController) ReadDeviation() {
|
|
}
|
|
}
|
|
|
|
|
|
t := make(map[string][]int)
|
|
t := make(map[string][]int)
|
|
- bytes, _ := ioutil.ReadAll(c.Ctx.Request.Body)
|
|
|
|
- json.Unmarshal(bytes, &t)
|
|
|
|
|
|
+ bytes, _ := io.ReadAll(c.Ctx.Request.Body)
|
|
|
|
+ if err := json.Unmarshal(bytes, &t); err != nil {
|
|
|
|
+ log.Printf("Error unmarshalling JSON: %v", err)
|
|
|
|
+ c.Data["json"] = lib.JSONS{Code: 400, Msg: "Bad Request"}
|
|
|
|
+ c.ServeJSON()
|
|
|
|
+ return
|
|
|
|
+ }
|
|
fmt.Println("浏览器接收数据:", t)
|
|
fmt.Println("浏览器接收数据:", t)
|
|
|
|
|
|
- //MQTT发送
|
|
|
|
|
|
+ // MQTT发送
|
|
fmt.Println("发送MQTT t:", t)
|
|
fmt.Println("发送MQTT t:", t)
|
|
deviation := make(chan string, 10)
|
|
deviation := make(chan string, 10)
|
|
var count = 0
|
|
var count = 0
|
|
|
|
+
|
|
for k, v := range t {
|
|
for k, v := range t {
|
|
topicSub := fmt.Sprintf("/sub/%s", k)
|
|
topicSub := fmt.Sprintf("/sub/%s", k)
|
|
- fmt.Println(v)
|
|
|
|
topicPub := fmt.Sprintf("/pub/%s", k)
|
|
topicPub := fmt.Sprintf("/pub/%s", k)
|
|
mqttId := Device.ReadDeviceMqttId(k)
|
|
mqttId := Device.ReadDeviceMqttId(k)
|
|
- client := MqttServer.GetMqttClient(mqttId)
|
|
|
|
- MqttServer.Subscript(client, topicSub, deviation, "\"type\":7,")
|
|
|
|
|
|
+
|
|
|
|
+ client, err := MqttServer.GetMqttClient(mqttId)
|
|
|
|
+ if err != nil {
|
|
|
|
+ log.Printf("Error getting MQTT client for SN: %s, %v", k, err)
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 订阅主题
|
|
|
|
+ if err := MqttServer.Subscript(client, topicSub, deviation, "\"type\":7,"); err != nil {
|
|
|
|
+ log.Printf("Error subscribing to MQTT topic for SN: %s, %v", k, err)
|
|
|
|
+ c.Data["json"] = lib.JSONS{Code: 1, Msg: "mqtt订阅连接失败", Data: nil}
|
|
|
|
+ return
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+
|
|
pubData, _ := json.Marshal(MqttServer.Deviation_Pub{
|
|
pubData, _ := json.Marshal(MqttServer.Deviation_Pub{
|
|
Sn: k,
|
|
Sn: k,
|
|
Type: 7,
|
|
Type: 7,
|
|
Mid: time.Now().Unix(),
|
|
Mid: time.Now().Unix(),
|
|
Data: v,
|
|
Data: v,
|
|
})
|
|
})
|
|
- MqttServer.PubMqttMessage(client, topicPub, pubData)
|
|
|
|
|
|
+
|
|
|
|
+ if err := MqttServer.PubMqttMessage(client, topicPub, pubData); err != nil {
|
|
|
|
+ log.Printf("Error publishing MQTT message for SN: %s, %v", k, err)
|
|
|
|
+ c.Data["json"] = lib.JSONS{Code: 1, Msg: "mqtt发送失败", Data: k}
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
count++
|
|
count++
|
|
}
|
|
}
|
|
|
|
+
|
|
deviceRepeatData := make([]string, 0)
|
|
deviceRepeatData := make([]string, 0)
|
|
- select {
|
|
|
|
- case v := <-deviation:
|
|
|
|
- fmt.Println("channel收到数据:", v)
|
|
|
|
- fmt.Println("count:", count)
|
|
|
|
- deviceRepeatData = append(deviceRepeatData, v)
|
|
|
|
- count--
|
|
|
|
- if count <= 0 {
|
|
|
|
- close(deviation)
|
|
|
|
- break
|
|
|
|
|
|
+ timeout := time.After(10 * time.Second) // 设置超时时间
|
|
|
|
+ for count > 0 {
|
|
|
|
+ select {
|
|
|
|
+ case v := <-deviation:
|
|
|
|
+ fmt.Println("channel收到数据:", v)
|
|
|
|
+ fmt.Println("count:", count)
|
|
|
|
+ deviceRepeatData = append(deviceRepeatData, v)
|
|
|
|
+ count--
|
|
|
|
+ case <-timeout:
|
|
|
|
+ log.Println("Timeout waiting for MQTT responses")
|
|
|
|
+ c.Data["json"] = lib.JSONS{Code: 500, Msg: "Timeout waiting for MQTT responses"}
|
|
|
|
+ c.ServeJSON()
|
|
|
|
+ return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ close(deviation)
|
|
fmt.Println("响应数据:", deviceRepeatData)
|
|
fmt.Println("响应数据:", deviceRepeatData)
|
|
- c.Data["json"] = lib.JSONS{200, "偏差值上传成功", deviceRepeatData}
|
|
|
|
|
|
+ c.Data["json"] = lib.JSONS{Code: 200, Msg: "偏差值上传成功", Data: deviceRepeatData}
|
|
c.ServeJSON()
|
|
c.ServeJSON()
|
|
return
|
|
return
|
|
}
|
|
}
|
|
@@ -175,33 +215,60 @@ func (c *DeviceController) WriteDeviation() {
|
|
c.ServeJSON()
|
|
c.ServeJSON()
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
+
|
|
bytes, _ := io.ReadAll(c.Ctx.Request.Body)
|
|
bytes, _ := io.ReadAll(c.Ctx.Request.Body)
|
|
- data := make([]MqttServer.Deviation_Sub, 0)
|
|
|
|
fmt.Println("请求json:", string(bytes))
|
|
fmt.Println("请求json:", string(bytes))
|
|
- json.Unmarshal(bytes, &data)
|
|
|
|
|
|
+
|
|
|
|
+ data := make([]MqttServer.Deviation_Sub, 0)
|
|
|
|
+ if err := json.Unmarshal(bytes, &data); err != nil {
|
|
|
|
+ log.Printf("Error unmarshalling JSON: %v", err)
|
|
|
|
+ c.Data["json"] = lib.JSONS{Code: 400, Msg: "Bad Request"}
|
|
|
|
+ c.ServeJSON()
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 处理每个设备的数据
|
|
for _, v := range data {
|
|
for _, v := range data {
|
|
go func(v MqttServer.Deviation_Sub) {
|
|
go func(v MqttServer.Deviation_Sub) {
|
|
v.Type = 8
|
|
v.Type = 8
|
|
v.Mid = int(time.Now().Unix())
|
|
v.Mid = int(time.Now().Unix())
|
|
- mqttid := Device.ReadDeviceMqttId(v.Sn)
|
|
|
|
- cli := MqttServer.GetMqttClient(mqttid)
|
|
|
|
- bytes, _ := json.Marshal(v)
|
|
|
|
|
|
+ mqttId := Device.ReadDeviceMqttId(v.Sn)
|
|
|
|
+
|
|
|
|
+ client, err := MqttServer.GetMqttClient(mqttId)
|
|
|
|
+ if err != nil {
|
|
|
|
+ log.Printf("Error getting MQTT client for SN: %s, %v", v.Sn, err)
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ msgBytes, err := json.Marshal(v)
|
|
|
|
+ if err != nil {
|
|
|
|
+ log.Printf("Error marshalling JSON for SN: %s, %v", v.Sn, err)
|
|
|
|
+ client.Disconnect()
|
|
|
|
+ client.Terminate()
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+
|
|
for i := 0; i < 3; i++ {
|
|
for i := 0; i < 3; i++ {
|
|
time.Sleep(time.Second * time.Duration(i+1))
|
|
time.Sleep(time.Second * time.Duration(i+1))
|
|
- MqttServer.PubMqttMessage(cli, fmt.Sprintf("/pub/%s", v.Sn), bytes)
|
|
|
|
|
|
+ if err := MqttServer.PubMqttMessage(client, fmt.Sprintf("/pub/%s", v.Sn), msgBytes); err != nil {
|
|
|
|
+ log.Printf("Error publishing MQTT message for SN: %s, %v", v.Sn, err)
|
|
|
|
+ c.Data["json"] = lib.JSONS{Code: 0, Msg: "设置失败", Data: v.Sn}
|
|
|
|
+ return
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- cli.Disconnect()
|
|
|
|
- cli.Terminate()
|
|
|
|
|
|
+
|
|
|
|
+ client.Disconnect()
|
|
|
|
+ client.Terminate()
|
|
}(v)
|
|
}(v)
|
|
}
|
|
}
|
|
- c.Data["json"] = lib.JSONS{200, "设置偏差值成功!", nil}
|
|
|
|
|
|
+
|
|
|
|
+ c.Data["json"] = lib.JSONS{Code: 200, Msg: "设置偏差值成功!", Data: nil}
|
|
c.ServeJSON()
|
|
c.ServeJSON()
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
// ReadSensor 读取偏差值
|
|
// ReadSensor 读取偏差值
|
|
func (c *DeviceController) ReadSensor() {
|
|
func (c *DeviceController) ReadSensor() {
|
|
-
|
|
|
|
b_, _ := lib.Verification(c.Ctx.GetCookie("User_tokey"), c.GetString("User_tokey"))
|
|
b_, _ := lib.Verification(c.Ctx.GetCookie("User_tokey"), c.GetString("User_tokey"))
|
|
if !b_ {
|
|
if !b_ {
|
|
c.Data["json"] = lib.JSONS{Code: 201, Msg: "User_tokey Err!"}
|
|
c.Data["json"] = lib.JSONS{Code: 201, Msg: "User_tokey Err!"}
|
|
@@ -210,51 +277,78 @@ func (c *DeviceController) ReadSensor() {
|
|
}
|
|
}
|
|
|
|
|
|
t := make(map[string][]int)
|
|
t := make(map[string][]int)
|
|
- bytes, _ := ioutil.ReadAll(c.Ctx.Request.Body)
|
|
|
|
- json.Unmarshal(bytes, &t)
|
|
|
|
|
|
+ bytes, _ := io.ReadAll(c.Ctx.Request.Body)
|
|
|
|
+ if err := json.Unmarshal(bytes, &t); err != nil {
|
|
|
|
+ log.Printf("Error unmarshalling JSON: %v", err)
|
|
|
|
+ c.Data["json"] = lib.JSONS{Code: 400, Msg: "Bad Request"}
|
|
|
|
+ c.ServeJSON()
|
|
|
|
+ return
|
|
|
|
+ }
|
|
fmt.Println("浏览器接收数据:", t)
|
|
fmt.Println("浏览器接收数据:", t)
|
|
|
|
|
|
- //MQTT发送
|
|
|
|
|
|
+ // MQTT发送
|
|
fmt.Println("发送MQTT t:", t)
|
|
fmt.Println("发送MQTT t:", t)
|
|
deviation := make(chan string, 10)
|
|
deviation := make(chan string, 10)
|
|
var count = 0
|
|
var count = 0
|
|
|
|
+
|
|
for k, v := range t {
|
|
for k, v := range t {
|
|
topicSub := fmt.Sprintf("/sub/%s", k)
|
|
topicSub := fmt.Sprintf("/sub/%s", k)
|
|
- fmt.Println(v)
|
|
|
|
topicPub := fmt.Sprintf("/pub/%s", k)
|
|
topicPub := fmt.Sprintf("/pub/%s", k)
|
|
mqttId := Device.ReadDeviceMqttId(k)
|
|
mqttId := Device.ReadDeviceMqttId(k)
|
|
- client := MqttServer.GetMqttClient(mqttId)
|
|
|
|
- MqttServer.Subscript(client, topicSub, deviation, "\"type\":5,")
|
|
|
|
|
|
+
|
|
|
|
+ client, err := MqttServer.GetMqttClient(mqttId)
|
|
|
|
+ if err != nil {
|
|
|
|
+ log.Printf("Error getting MQTT client for SN: %s, %v", k, err)
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 订阅主题
|
|
|
|
+ if err := MqttServer.Subscript(client, topicSub, deviation, "\"type\":5,"); err != nil {
|
|
|
|
+ log.Printf("Error subscribing to MQTT topic for SN: %s, %v", k, err)
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+
|
|
pubData, _ := json.Marshal(MqttServer.Deviation_Pub{
|
|
pubData, _ := json.Marshal(MqttServer.Deviation_Pub{
|
|
Sn: k,
|
|
Sn: k,
|
|
Type: 5,
|
|
Type: 5,
|
|
Mid: time.Now().Unix(),
|
|
Mid: time.Now().Unix(),
|
|
Data: v,
|
|
Data: v,
|
|
})
|
|
})
|
|
- MqttServer.PubMqttMessage(client, topicPub, pubData)
|
|
|
|
|
|
+
|
|
|
|
+ if err := MqttServer.PubMqttMessage(client, topicPub, pubData); err != nil {
|
|
|
|
+ log.Printf("Error publishing MQTT message for SN: %s, %v", k, err)
|
|
|
|
+ c.Data["json"] = lib.JSONS{Code: 1, Msg: "mqtt发送失败", Data: k}
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+
|
|
count++
|
|
count++
|
|
}
|
|
}
|
|
|
|
+
|
|
deviceRepeatData := make([]string, 0)
|
|
deviceRepeatData := make([]string, 0)
|
|
- select {
|
|
|
|
- case v := <-deviation:
|
|
|
|
- fmt.Println("channel收到数据:", v)
|
|
|
|
- fmt.Println("count:", count)
|
|
|
|
-
|
|
|
|
- deviceRepeatData = append(deviceRepeatData, v)
|
|
|
|
- count--
|
|
|
|
- if count <= 0 {
|
|
|
|
- close(deviation)
|
|
|
|
- break
|
|
|
|
|
|
+ timeout := time.After(10 * time.Second) // 设置超时时间
|
|
|
|
+ for count > 0 {
|
|
|
|
+ select {
|
|
|
|
+ case v := <-deviation:
|
|
|
|
+ fmt.Println("channel收到数据:", v)
|
|
|
|
+ fmt.Println("count:", count)
|
|
|
|
+ deviceRepeatData = append(deviceRepeatData, v)
|
|
|
|
+ count--
|
|
|
|
+ case <-timeout:
|
|
|
|
+ log.Println("Timeout waiting for MQTT responses")
|
|
|
|
+ c.Data["json"] = lib.JSONS{Code: 500, Msg: "Timeout waiting for MQTT responses"}
|
|
|
|
+ c.ServeJSON()
|
|
|
|
+ return
|
|
}
|
|
}
|
|
-
|
|
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ close(deviation)
|
|
fmt.Println("响应数据:", deviceRepeatData)
|
|
fmt.Println("响应数据:", deviceRepeatData)
|
|
- c.Data["json"] = lib.JSONS{200, "偏差值上传成功", deviceRepeatData}
|
|
|
|
|
|
+ c.Data["json"] = lib.JSONS{Code: 200, Msg: "偏差值上传成功", Data: deviceRepeatData}
|
|
c.ServeJSON()
|
|
c.ServeJSON()
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
-// WriteSensor WriteDeviation 设置偏差值
|
|
|
|
|
|
+// WriteSensor 设置偏差值
|
|
func (c *DeviceController) WriteSensor() {
|
|
func (c *DeviceController) WriteSensor() {
|
|
b_, _ := lib.Verification(c.Ctx.GetCookie("User_tokey"), c.GetString("User_tokey"))
|
|
b_, _ := lib.Verification(c.Ctx.GetCookie("User_tokey"), c.GetString("User_tokey"))
|
|
if !b_ {
|
|
if !b_ {
|
|
@@ -262,26 +356,80 @@ func (c *DeviceController) WriteSensor() {
|
|
c.ServeJSON()
|
|
c.ServeJSON()
|
|
return
|
|
return
|
|
}
|
|
}
|
|
- bytes, _ := io.ReadAll(c.Ctx.Request.Body)
|
|
|
|
- data := make([]MqttServer.Sensor_Sub, 0)
|
|
|
|
|
|
+
|
|
|
|
+ bytes, err := io.ReadAll(c.Ctx.Request.Body)
|
|
|
|
+ if err != nil {
|
|
|
|
+ log.Printf("Error reading request body: %v", err)
|
|
|
|
+ c.Data["json"] = lib.JSONS{Code: 500, Msg: "Internal Server Error"}
|
|
|
|
+ c.ServeJSON()
|
|
|
|
+ return
|
|
|
|
+ }
|
|
fmt.Println("请求json:", string(bytes))
|
|
fmt.Println("请求json:", string(bytes))
|
|
- json.Unmarshal(bytes, &data)
|
|
|
|
|
|
+
|
|
|
|
+ data := make([]MqttServer.Sensor_Sub, 0)
|
|
|
|
+ if err := json.Unmarshal(bytes, &data); err != nil {
|
|
|
|
+ log.Printf("Error unmarshalling JSON: %v", err)
|
|
|
|
+ c.Data["json"] = lib.JSONS{Code: 400, Msg: "Bad Request"}
|
|
|
|
+ c.ServeJSON()
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 设置上下文超时
|
|
|
|
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
|
|
+ defer cancel()
|
|
|
|
+
|
|
|
|
+ // 处理每个设备的数据
|
|
for _, v := range data {
|
|
for _, v := range data {
|
|
go func(v MqttServer.Sensor_Sub) {
|
|
go func(v MqttServer.Sensor_Sub) {
|
|
|
|
+ select {
|
|
|
|
+ case <-ctx.Done():
|
|
|
|
+ log.Printf("Context canceled or timeout for SN: %s", v.Sn)
|
|
|
|
+ return
|
|
|
|
+ default:
|
|
|
|
+ }
|
|
|
|
+
|
|
v.Type = 6
|
|
v.Type = 6
|
|
v.Mid = int(time.Now().Unix())
|
|
v.Mid = int(time.Now().Unix())
|
|
- mqttid := Device.ReadDeviceMqttId(v.Sn)
|
|
|
|
- cli := MqttServer.GetMqttClient(mqttid)
|
|
|
|
- bytes, _ := json.Marshal(v)
|
|
|
|
|
|
+ mqttId := Device.ReadDeviceMqttId(v.Sn)
|
|
|
|
+
|
|
|
|
+ client, err := MqttServer.GetMqttClient(mqttId)
|
|
|
|
+ if err != nil {
|
|
|
|
+ log.Printf("Error getting MQTT client for SN: %s, %v", v.Sn, err)
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ msgBytes, err := json.Marshal(v)
|
|
|
|
+ if err != nil {
|
|
|
|
+ log.Printf("Error marshalling JSON for SN: %s, %v", v.Sn, err)
|
|
|
|
+ client.Disconnect()
|
|
|
|
+ client.Terminate()
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+
|
|
for i := 0; i < 3; i++ {
|
|
for i := 0; i < 3; i++ {
|
|
time.Sleep(time.Second * time.Duration(i+1))
|
|
time.Sleep(time.Second * time.Duration(i+1))
|
|
- MqttServer.PubMqttMessage(cli, fmt.Sprintf("/pub/%s", v.Sn), bytes)
|
|
|
|
|
|
+ select {
|
|
|
|
+ case <-ctx.Done():
|
|
|
|
+ log.Printf("Context canceled or timeout for SN: %s", v.Sn)
|
|
|
|
+ client.Disconnect()
|
|
|
|
+ client.Terminate()
|
|
|
|
+ return
|
|
|
|
+ default:
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if err := MqttServer.PubMqttMessage(client, fmt.Sprintf("/pub/%s", v.Sn), msgBytes); err != nil {
|
|
|
|
+ log.Printf("Error publishing MQTT message for SN: %s, %v", v.Sn, err)
|
|
|
|
+ c.Data["json"] = lib.JSONS{Code: 0, Msg: "设置失败", Data: v.Sn}
|
|
|
|
+ return
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- cli.Disconnect()
|
|
|
|
- cli.Terminate()
|
|
|
|
|
|
+
|
|
|
|
+ client.Disconnect()
|
|
|
|
+ client.Terminate()
|
|
}(v)
|
|
}(v)
|
|
}
|
|
}
|
|
- c.Data["json"] = lib.JSONS{200, "设置偏差值成功!", nil}
|
|
|
|
|
|
+
|
|
|
|
+ c.Data["json"] = lib.JSONS{Code: 200, Msg: "设置偏差值成功!", Data: nil}
|
|
c.ServeJSON()
|
|
c.ServeJSON()
|
|
return
|
|
return
|
|
}
|
|
}
|