Преглед изворни кода

异常离线“BX100W”设备改成“可能市电断电,请注意查看”

siked пре 1 година
родитељ
комит
50108ca47d

+ 7 - 2
MqttServer/MqttClients.go

@@ -150,15 +150,20 @@ func MessageDisconnected(topicName string, message []byte) {
 		Reason = "主网络|" + Reason
 	}
 
+	if !strings.Contains(Reason, "主动断开") && r_Device.T_model == "BX100W" {
+		Reason = "可能市电断电,请注意查看"
+	}
+
 	Warning_r := Warning.Add_DeviceLogs(1001, r_Device, Reason)
 	Warning_r.T_fUt = time.Now()
-	if !strings.Contains(Reason, "主动断开") {
+	if !strings.Contains(Reason, "主动断开") { // 异常断开
 		//WarningNotice.WarningToAdminId(&Warning_r, []int{}) // 发送给 绑定公司管理员
 		//// 报警处理
 		//Warning.Add_Warning_Log(&Warning_r, "====== 报警策略 ===== \n")
 		WarningNotice.WarningCompanyNotice(&Warning_r, 0, 0)
+	} else { // 正常关机
+		r_Device.T_monitor = 0
 	}
-	r_Device.T_monitor = 0
 
 	// 同步参数
 	Device.Update_Device(r_Device, "T_online", "T_online_s", "T_monitor")

+ 53 - 13
MqttServer/V3MqttHandle.go

@@ -129,6 +129,30 @@ func AsyncFuncV3(r_Device *Device.Device, Ms_project Ms2m_Project, msg []byte) {
 				T_time: UT,
 				T_sp:   DeviceSensorParameter_r.Id,
 			}
+
+			// 验证 数据是否安装规定上传间隔
+			DeviceData_Previous_r, is := Device.RedisDeviceData_Get(r_Device.T_sn, DeviceSensor_r.T_id)
+			if is {
+				DeviceParameter_r, is := Device.Read_DeviceParameter(r_Device.T_sn)
+				if is {
+					// 计算时间差
+					duration := DeviceData_Previous_r.T_time.Sub(DeviceData_t.T_time)
+					// 将时间差表示为秒
+					seconds := int(math.Abs(duration.Seconds()))
+					// 记录间隔 超过 保存间隔(+-10s)范围
+					difference := int(math.Abs(float64(seconds - DeviceParameter_r.T_overA)))
+					if difference > 10 {
+						logs.Println(r_Device.T_sn+" 没有达到数据保存间隔,间隔:", seconds, DeviceParameter_r.T_overA, " 时间差 > 10", difference)
+
+						if F_T_Exceeding(r_Device, DeviceData_t, DeviceSensorParameter_r) && F_H_Exceeding(r_Device, DeviceData_t, DeviceSensorParameter_r) {
+							logs.Println(r_Device.T_sn + " 跳过记录数据")
+							continue
+						}
+					}
+				}
+
+			}
+
 			if r_, DeviceData_old_r := Device.Add_DeviceData(r_Device.T_sn, DeviceSensor_r.T_id, DeviceData_t); r_ {
 				// 被替换
 				Device.Add_DeviceDataOld(Device.DeviceDataOld{
@@ -245,19 +269,16 @@ func AsyncFuncV3(r_Device *Device.Device, Ms_project Ms2m_Project, msg []byte) {
 					continue
 				}
 				Warningr := Warning.Read_T_snT_idT_tp(Warning_r.T_sn, Warning_r.T_id, Warning_r.T_tp)
-				if Warningr.Id == 0 {
-					Warningr.T_Ut = time.Now()
-				}
-
-				// 计算时间差
-				duration := time.Now().Sub(Warningr.T_Ut)
-
-				// 将时间差表示为秒
-				seconds := int(duration.Seconds())
-
-				if seconds < (DeviceParameter_r.T_outA - 3) {
-					logs.Println(r_Device.T_sn+" 没有达到设置报警间隔,跳过,间隔:", DeviceParameter_r.T_outA, " 时间差", seconds)
-					continue
+				if Warningr.Id != 0 {
+					// 计算时间差
+					duration := time.Now().Sub(Warningr.T_Ut)
+
+					// 将时间差表示为秒
+					seconds := int(duration.Seconds())
+					if seconds < (DeviceParameter_r.T_outA - 3) {
+						logs.Println(r_Device.T_sn+" 没有达到设置报警间隔,跳过,间隔:", DeviceParameter_r.T_outA, " 时间差", seconds)
+						continue
+					}
 				}
 
 			}
@@ -857,3 +878,22 @@ func AsyncFuncV3(r_Device *Device.Device, Ms_project Ms2m_Project, msg []byte) {
 	Mqtt_publish(r_Device.T_sn, string(jsonStu))
 
 }
+
+func F_T_Exceeding(r_Device *Device.Device, DeviceData_t Device.DeviceData_R, DeviceSensorParameter_r Device.DeviceSensorParameter) bool {
+	if r_Device.T_ist == 1 { // 温度   1开启   2关闭
+		if !(DeviceSensorParameter_r.T_Tupper < DeviceData_t.T_t || DeviceData_t.T_t < DeviceSensorParameter_r.T_Tlower) {
+			logs.Println(r_Device.T_sn+" 温度没有超标,且不在保存间隔范围 :", DeviceSensorParameter_r.T_Tupper, DeviceData_t.T_t, DeviceSensorParameter_r.T_Tlower)
+			return true
+		}
+	}
+	return false
+}
+func F_H_Exceeding(r_Device *Device.Device, DeviceData_t Device.DeviceData_R, DeviceSensorParameter_r Device.DeviceSensorParameter) bool {
+	if r_Device.T_ish == 1 { // 湿度   1开启   2关闭
+		if !(DeviceSensorParameter_r.T_RHupper < DeviceData_t.T_rh || DeviceData_t.T_rh < DeviceSensorParameter_r.T_RHlower) {
+			logs.Println(r_Device.T_sn+" 湿度没有超标,且不在保存间隔范围 :", DeviceSensorParameter_r.T_RHupper, DeviceData_t.T_rh, DeviceSensorParameter_r.T_RHlower)
+			return true
+		}
+	}
+	return false
+}

+ 6 - 6
MqttServer/WarningNotice/WarningNotice.go

@@ -367,13 +367,13 @@ func WarningCompanyNotice_CompanyNotice_Send(Warning_r *Warning.Warning, Company
 						continue
 					}
 					// 语音实体
-					type V_struct struct {
-						Corporate string `json:"company"`
-						Device    string `json:"device"`
-						Name      string `json:"name"`
-					}
-					Hw_VoiceNotice_is := NatsServer.Hw_VoiceNotice(Company_Charging_r, Warning_r, v_c_l[0], []byte(fmt.Sprintf(`{"displayNbr":"+862022509874","calleeNbr":"%s","playInfoList":[{"templateId":"f70c83b230dd4794860a383c38abdefa","templateParas":["%s"]}],"userData":"testUserData"}`, v_c_l[0], Warning.Read_WarningType_Get(Warning_r.T_tp))))
+					Hw_VoiceNotice_is := NatsServer.Hw_VoiceNotice(Company_Charging_r, Warning_r, v_c_l[0], []byte(fmt.Sprintf(`{"displayNbr":"+862022509874","calleeNbr":"%s","playInfoList":[{"templateId":"03bf51a5d8094db8ab1001aaa426ac60","templateParas":["%s","%s","%s"]}],"userData":"testUserData"}`, v_c_l[0], lib.Limit_len(Company_r.T_name, 29), lib.Limit_len(Warning_r.T_DS_name, 29), Warning.Read_WarningType_Get(Warning_r.T_tp))))
 					if Hw_VoiceNotice_is {
+						type V_struct struct {
+							Corporate string `json:"company"`
+							Device    string `json:"device"`
+							Name      string `json:"name"`
+						}
 						Warning.Add_Warning_Log(Warning_r, "--!电话-> 华为云 通知失败,切换为 赛邮云"+"\n")
 						V_struct_r := V_struct{Corporate: Company_r.T_name, Device: Warning_r.T_DS_name, Name: Warning.Read_WarningType_Get(Warning_r.T_tp)}
 						V_struct_str, _ := json.Marshal(V_struct_r)

+ 1 - 2
lib/MinuteToDataTime.go

@@ -1,7 +1,6 @@
 package lib
 
 import (
-	"fmt"
 	"strconv"
 	"time"
 )
@@ -69,7 +68,7 @@ func TimeSinceToString(startTime, endTime time.Time) string {
 	minutes := int(duration.Minutes()) % 60
 	seconds := int(duration.Seconds()) % 60
 
-	fmt.Printf("时间差为 %d 天 %d 小时 %d 分钟\n", days, hours, minutes)
+	//fmt.Printf("时间差为 %d 天 %d 小时 %d 分钟\n", days, hours, minutes)
 	if days > 0 {
 		str += strconv.Itoa(days) + "天 "
 	}

+ 5 - 3
main.go

@@ -1,6 +1,8 @@
 package main
 
 import (
+	"Cold_mqtt/MqttServer"
+	"Cold_mqtt/Nats"
 	"Cold_mqtt/Plugins"
 	"Cold_mqtt/TimeTask"
 	"Cold_mqtt/conf"
@@ -32,10 +34,10 @@ func main() {
 	beego.BConfig.Listen.HTTPPort = conf.HTTPPort                               //监听端口  本地:8518  线上:8528
 
 	logs.Println("MqttServer", " ======= 项目启动 ========")
-	//go MqttServer.Run_MqttServer() // MQTT 通讯
+	go MqttServer.Run_MqttServer() // MQTT 通讯
 	////go MqttServer.MqttPolling() // MQTT 设备轮询
-	//go MqttServer.DeviceMqttMap_go()                  //// 缓存数据发送-确保设备在休眠后 能收到数据
-	//go Nats.NatsInit()                                // Nats 通讯
+	go MqttServer.DeviceMqttMap_go()                  //// 缓存数据发送-确保设备在休眠后 能收到数据
+	go Nats.NatsInit()                                // Nats 通讯
 	go TimeTask.Init()                                // 时间任务
 	go Plugins.ContinuousPushOfOfflineDeviceWarning() //设备离线报警连续推送
 

+ 3 - 2
models/Device/Device.go

@@ -189,8 +189,9 @@ func Read_获取BX100W离线设备() []Device {
 
 	qs := o.QueryTable(new(Device))
 	var r []Device
-	//qs.Filter("T_mqttid", conf.MqttServer_id).Filter("T_model", "BX100W").Filter("T_State", 1).Filter("T_online", 0).All(&r)
-	qs.Filter("T_pid", 242).Filter("T_State", 1).Exclude("T_online", 1).All(&r)
+	qs.Filter("T_model", "BX100W").Filter("T_State", 1).Filter("T_online", 2).All(&r)
+	//qs.Filter("T_mqttid", conf.MqttServer_id).Filter("T_model", "BX100W").Filter("T_State", 1).Filter("T_online", 2).All(&r)
+	//qs.Filter("T_pid", 242).Filter("T_State", 1).Exclude("T_online", 1).All(&r)
 
 	return r
 }

+ 2 - 1
models/Device/DeviceData.go

@@ -79,7 +79,8 @@ func RedisDeviceData_Set(T_sn string, T_id int, r DeviceData_R) (err error) {
 	return
 }
 
-func RedisDeviceData_Get(key string) (r DeviceData_R, is bool) {
+func RedisDeviceData_Get(T_sn string, T_id int) (r DeviceData_R, is bool) {
+	key := T_sn + "|" + strconv.Itoa(T_id)
 	if redis_DeviceData.IsExist(key) {
 		v := redis_DeviceData.Get(key)
 		json.Unmarshal(v.([]byte), &r)

+ 120 - 0
tests/device/MqttServer.go

@@ -0,0 +1,120 @@
+package MqttServer
+
+import (
+	"fmt"
+	"github.com/yosssi/gmq/mqtt"
+	"github.com/yosssi/gmq/mqtt/client"
+	"time"
+)
+
+var (
+	cli    *client.Client
+	T_sn   string
+	T_pass string
+)
+
+func init() {
+	T_sn = "2024168563127390"
+	T_pass = "qMua4PZyl2vwchNL"
+	Run_MqttServer()
+}
+
+/*
+{
+	"type": 203,
+	"data": {
+		"sn": "2024666666666666",
+		"mqtt1": "mqttyuht.coldbaozhida.com",
+		"mqtt2": "mqttlodr.coldbaozhida.com",
+		"mqtt3": "mqttyuht.coldbaozhida.com",
+		"mqtt4": "",
+		"mqtt5": "",
+		"port": 1883,
+		"user": "2024666666666666",
+		"pass": "qMua4PZyl2vwchNL"
+	}
+}
+*/
+
+func Run_MqttServer() {
+	time.Sleep(3 * time.Second)
+	fmt.Println("============Run_MqttServer=============", "")
+
+	// Create an MQTT Client.
+	cli = client.New(&client.Options{
+		// Define the processing of the error handler.
+		ErrorHandler: func(err error) {
+			fmt.Println("err!!!!!! Run_MqttServer:", err.Error())
+
+			time.Sleep(3 * time.Second)
+			go Run_MqttServer() // MQTT 通讯
+			return
+		},
+	})
+
+	// Terminate the Client.
+	defer cli.Terminate()
+
+	c := client.ConnectOptions{
+		Network:  "tcp",
+		Address:  "mqttyuht.coldbaozhida.com",
+		ClientID: []byte(T_sn + "_test"),
+		UserName: []byte(T_sn),
+		Password: []byte(T_pass),
+	}
+	fmt.Println("Address:", c.Address)
+	fmt.Println("ClientID:", string(c.ClientID))
+
+	// Connect to the MQTT Server.
+	err := cli.Connect(&c)
+
+	if err != nil {
+		fmt.Println("MqttServer", "连接MQTT失败 [cli.Connect]", "")
+		fmt.Println("err!!!!!! Run_MqttServer:", "连接MQTT失败:", err)
+		fmt.Println("err!!!!!! 连接MQTT失败:", err)
+		cli.Terminate()
+		time.Sleep(3 * time.Second)
+		go Run_MqttServer() // MQTT 通讯
+
+		return
+	}
+
+	// Subscribe to topics.
+	err = cli.Subscribe(&client.SubscribeOptions{
+		SubReqs: []*client.SubReq{
+			&client.SubReq{
+				TopicFilter: []byte("/pub/2024168563127390"),
+				QoS:         mqtt.QoS0,
+				// Define the processing of the message handler.
+				Handler: func(topicName, message []byte) {
+					fmt.Println(">-" + string(topicName) + " " + string(message))
+
+				},
+			},
+		},
+	})
+	if err != nil {
+		fmt.Println("MqttServer", "订阅消息 [cli.Subscribe]", "")
+		fmt.Println("err!!!!!! Run_MqttServer:", "连接MQTT失败:", err)
+
+	}
+
+	fmt.Println("MQTT ok!")
+}
+
+// 发送数据
+func Mqtt_publish(topic string, text string) {
+
+	// Publish a message.
+	err := cli.Publish(&client.PublishOptions{
+		QoS:       mqtt.QoS0,
+		TopicName: []byte("/pub/" + topic),
+		Message:   []byte(text),
+	})
+
+	fmt.Println("-> /pub/" + topic + " " + text)
+	if err != nil {
+		fmt.Println("MqttServer", "发送消息失败 [cli.Publish]", text)
+	}
+
+}