package controllers // //import ( // "ColdP_server/conf" // "ColdP_server/controllers/MqttServer" // "ColdP_server/controllers/lib" // "ColdP_server/logs" // "ColdP_server/models/Company" // "ColdP_server/models/Device" // "ColdP_server/models/Warning" // "context" // "encoding/json" // "fmt" // beego "github.com/beego/beego/v2/server/web" // "io" // "log" // "math" // "strconv" // "strings" // "sync" // "time" //) // //// DeviceController 设备管理页面 //type DeviceController struct { // beego.Controller //} // //// DeviceManagerHtml 返回页面 //func (c *DeviceController) DeviceManagerHtml() { // //验证是否登录 // is, admin := lib.VerificationController(&c.Controller) // if !is { // return // } // // //类名列表 // classList := Company.Read_CompanyClass_All(admin.T_pid, "") // c.Data["Class_List"] = classList // c.TplName = "Device/Device.html" //} // //func (c *DeviceController) DeviceList() { // tName := c.GetString("deviceName") // 设备名称 // page, _ := c.GetInt64("currentPage") // 当前页码 // tClassify, _ := c.GetInt("deviceClass") // 设备分类 // // is, admin := lib.VerificationController(&c.Controller) // fmt.Println("当前用户的PID为:", admin.T_pid) // if !is { // // 用户未登录 // c.Data["json"] = lib.JSONS{202, "身份认证失效", nil} // c.ServeJSON() // return // } // // device_list, count := Device.Read_DeviceSensor_List_T_ClassOr(admin.T_pid, tClassify, tName, tName, -1, int(page), conf.Page_size) // var pageCount int // if (int(count) % conf.Page_size) != 0 { // pageCount = int(count) / conf.Page_size // pageCount++ // } // // var wg sync.WaitGroup // for i := range device_list { // wg.Add(1) // go func(deviceList *Device.DeviceSensor_) { // defer func() { // if r := recover(); r != nil { // logs.Debug("协程出现异常: %v", r) // } // }() // defer wg.Done() // // t := Device.Read_DeviceData(deviceList.T_sn, deviceList.T_id) // err2, m := Device.ReadDeviceSensorParameter(deviceList.T_Sp) // if err2 != nil { // logs.Error("Error getting device sensor parameter for SN: %s, %v", deviceList.T_sn, err2) // } // topicSub := fmt.Sprintf("/sub/%s", deviceList.T_sn) // topicPub := fmt.Sprintf("/pub/%s", deviceList.T_sn) // mqttId := Device.ReadDeviceMqttId(deviceList.T_sn) // cli, err := MqttServer.GetMqttClient(mqttId) // if err != nil { // logs.Error("Error getting MQTT client for SN: %s, %v", deviceList.T_sn, err) // return // } // defer cli.Disconnect() // defer cli.Terminate() // // deviceList.T_tDeviation = 1201 // deviceList.T_RhDeviation = 1201 // // err = cli.Subscribe(&client.SubscribeOptions{ // SubReqs: []*client.SubReq{ // &client.SubReq{ // TopicFilter: []byte(topicSub), // QoS: mqtt.QoS0, // Handler: func(topicName, message []byte) { // log.Println("接收到数据", string(message)) // var devi Device.Deviation // err = json.Unmarshal(message, &devi) // if err != nil { // logs.Error("json序列化失败 SN: %s, %v", deviceList.T_sn, err) // } else { // if len(devi.Data) > 0 { // deviceList.T_tDeviation = float32(devi.Data[0].T) // deviceList.T_RhDeviation = float32(devi.Data[0].H) // } // } // }, // }, // }, // }) // if err != nil { // logs.Error("Error subscribing to topic for SN: %s, %v", deviceList.T_sn, err) // return // } // // ints := []int{deviceList.T_id} // pubData, err := json.Marshal(MqttServer.Deviation_Pub{ // Sn: deviceList.T_sn, // Type: 7, // Mid: time.Now().Unix(), // Data: ints, // }) // if err != nil { // logs.Error("Error marshalling JSON for SN: %s, %v", deviceList.T_sn, err) // return // } // fmt.Println("发送数据:", string(pubData)) // if err := MqttServer.PubMqttMessage(cli, topicPub, pubData); err != nil { // logs.Error("Error publishing MQTT message for SN: %s, %v", deviceList.T_sn, err) // return // } // // time.Sleep(2 * time.Second) // deviceList.T_Tlower = m.T_Tlower // deviceList.T_Tupper = m.T_Tupper // deviceList.T_RHlower = m.T_RHlower // deviceList.T_RHupper = m.T_RHupper // deviceList.T_t = t.T_t // deviceList.T_rh = t.T_rh // }(&device_list[i]) // } // // wg.Wait() // c.Data["json"] = lib.PageHelper{int(count), pageCount, int(page), int(page) >= pageCount, page <= 1, device_list} // c.ServeJSON() // return //} // ////func (c *DeviceController) SetDeviceDataZero() { //// is, admin := lib.VerificationController(&c.Controller) //// if !is { //// c.Data["json"] = lib.JSONS{Code: 202, Msg: "用户未登录", Data: nil} //// c.ServeJSON() //// return //// } //// //// id, err := Device.Read_DeviceSensor_ALL_T_sn_T_id(admin.T_pid) //// if err != nil { //// c.Data["json"] = lib.JSONS{Code: 201, Msg: "获取设备列表失败", Data: nil} //// c.ServeJSON() //// return //// } //// //// data := make([]MqttServer.Deviation_Sub, 0) //// for _, v := range id { //// data = append(data, MqttServer.Deviation_Sub{ //// Sn: v.T_sn, //// Data: []MqttServer.Deviation_Sub_Data{ //// {Id: v.T_id, H: 0, T: 0}, //// }, //// }) //// } //// //// var wg sync.WaitGroup //// successfulSns := make(map[string]bool) // 记录成功的SN //// mutex := &sync.Mutex{} // 保护对successfulSns的访问 //// responseChan := make(chan string, len(data)) //// failSn := make([]string, 0) //// defer close(responseChan) //// //// handleResult := func(sn string, success bool) { //// mutex.Lock() //// defer mutex.Unlock() //// if success { //// successfulSns[sn] = true //// } else { //// failSn = append(failSn, sn) //// } //// } //// //// for _, v := range data { //// wg.Add(1) //// go func(v MqttServer.Deviation_Sub) { //// defer wg.Done() //// v.Type = 8 //// v.Mid = int(time.Now().Unix()) //// mqttId := Device.ReadDeviceMqttId(v.Sn) //// client, err := MqttServer.GetMqttClient(mqttId) //// if err != nil { //// log.Printf("Error getting MQTT client for SN: %s, %v", v.Sn, err) //// handleResult(v.Sn, false) //// return //// } //// defer client.Disconnect() //// defer client.Terminate() //// msgBytes, err := json.Marshal(v) //// if err != nil { //// log.Printf("Error marshalling JSON for SN: %s, %v", v.Sn, err) //// handleResult(v.Sn, false) //// return //// } //// //// subTopic := fmt.Sprintf("/sub/%s", v.Sn) //// err = MqttServer.Subscript(client, subTopic, responseChan, nil) //// if err != nil { //// log.Printf("Error subscribing to topic for SN: %s, %v", v.Sn, err) //// handleResult(v.Sn, false) //// return //// } //// //// pubTopic := fmt.Sprintf("/pub/%s", v.Sn) //// for i := 0; i < 2; i++ { //// time.Sleep(time.Second * time.Duration(i+1)) //// if err := MqttServer.PubMqttMessage(client, pubTopic, msgBytes); err != nil { //// log.Printf("设备参数设置失败: %s, %v", v.Sn, err) //// continue //// } //// break // 成功后跳出循环 //// } //// //// select { //// case vs := <-responseChan: //// fmt.Println("channel收到数据:", vs) //// handleResult(v.Sn, true) //// case <-time.After(3 * time.Second): //// log.Printf("连接设备超时: %s", v.Sn) //// handleResult(v.Sn, false) //// } //// }(v) //// } //// //// wg.Wait() //// //// if len(failSn) > 0 { //// c.Data["json"] = lib.JSONS{Code: 1201, Msg: "部分设置失败", Data: failSn} //// } else { //// c.Data["json"] = lib.JSONS{Code: 200, Msg: "设置偏差值成功!", Data: nil} //// } //// c.ServeJSON() ////} // //// 辅助函数用于处理成功的情况 //func handleSuccess(sn string, mutex *sync.Mutex, successfulSns map[string]bool) { // mutex.Lock() // defer mutex.Unlock() // successfulSns[sn] = true // 标记为成功 //} // //// 辅助函数用于处理失败的情况 //func handleFailure(sn string, failSn *[]string, mutex *sync.Mutex, successfulSns map[string]bool) { // mutex.Lock() // defer mutex.Unlock() // if !successfulSns[sn] { // *failSn = append(*failSn, sn) // } //} // //// CompanyClass 获取公司设备类目 //func (c *DeviceController) CompanyClass() { // is, admin := lib.VerificationController(&c.Controller) // if !is { // c.Data["json"] = lib.JSONS{202, "用户未登录", nil} // c.ServeJSON() // return // } // // //类名列表 // classList := Company.Read_CompanyClass_All(admin.T_pid, "") // c.Data["json"] = classList // c.ServeJSON() //} // //// DataRepeat 数据重传 //func (c *DeviceController) DataRepeat() { // b_, _ := lib.Verification(c.Ctx.GetCookie("User_tokey"), c.GetString("User_tokey")) // if !b_ { // c.Data["json"] = lib.JSONS{Code: 201, Msg: "User_tokey Err!"} // c.ServeJSON() // return // } // // t := MqttServer.DataRepeat_C{} // bytes, _ := io.ReadAll(c.Ctx.Request.Body) // json.Unmarshal(bytes, &t) // fmt.Println("浏览器接收数据:", t) // s, _ := time.Parse("2006-01-02 15:04:05", t.StartTime) // e, _ := time.Parse("2006-01-02 15:04:05", t.EndTime) // var wg sync.WaitGroup // successfulSns := make(map[string]bool) // 记录成功的SN // mutex := &sync.Mutex{} // 保护对successfulSns的访问 // responseChan := make(chan string, len(t.Sns)) // failSn := make([]string, 0) // done := make(chan struct{}) // defer close(responseChan) // defer close(done) // handleResult := func(sn string, success bool) { // mutex.Lock() // defer mutex.Unlock() // if success { // successfulSns[sn] = true // } else { // failSn = append(failSn, sn) // } // } // // 发送MQTT // for k, v := range t.Sns { // wg.Add(1) // go func(k string, v []int) { // defer wg.Done() // topic := fmt.Sprintf("/pub/%s", k) // repeatPub := MqttServer.DataRepeat_Pub{Sn: k, Type: 9, Mid: time.Now().Unix(), Data: MqttServer.DataRepeat_Pub_Data{Start: s.Unix(), End: e.Unix(), Id: v}} // msg, _ := json.Marshal(repeatPub) // mqttId := strings.Split(Device.ReadDeviceMqttId(k), "\"")[0] // // cli, err := MqttServer.GetMqttClient(mqttId) // if err != nil { // log.Printf("Error getting MQTT client for SN: %s, %v", k, err) // handleResult(k, false) // return // } // defer cli.Disconnect() // defer cli.Terminate() // topicSub := fmt.Sprintf("/sub/%s", k) // //err = MqttServer.Subscript(cli, subTopic, responseChan, done) // //if err != nil { // // log.Printf("Error subscribing to topic for SN: %s, %v", k, err) // // handleResult(k, false) // // return // //} // err = cli.Subscribe(&client.SubscribeOptions{ // SubReqs: []*client.SubReq{ // &client.SubReq{ // TopicFilter: []byte(topicSub), // QoS: mqtt.QoS1, // Handler: func(topicName, message []byte) { // log.Println("接收到数据", string(message)) // var devi Device.Deviation // err = json.Unmarshal(message, &devi) // if err != nil { // logs.Error("json序列化失败") // } else { // handleResult(k, true) // } // }, // }, // }, // }) // // if err := MqttServer.PubMqttMessage(cli, topic, msg); err != nil { // log.Printf("Error publishing MQTT message for SN: %s, %v", k, err) // //c.Data["json"] = lib.JSONS{Code: 0, Msg: "设置失败", Data: k} // return // } // time.Sleep(2 * time.Second) // }(k, v) // } // wg.Wait() // if len(failSn) > 0 { // c.Data["json"] = lib.JSONS{Code: 1201, Msg: "部分重传失败", Data: failSn} // } else { // c.Data["json"] = lib.JSONS{Code: 200, Msg: "重传数据成功!", Data: nil} // } // c.ServeJSON() //} // //// ReadDeviation 读取偏差值 //// //// func (c *DeviceController) ReadDeviation() { //// b_, _ := lib.Verification(c.Ctx.GetCookie("User_tokey"), c.GetString("User_tokey")) //// if !b_ { //// c.Data["json"] = lib.JSONS{Code: 201, Msg: "User_tokey Err!"} //// c.ServeJSON() //// return //// } //// //// t := make(map[string][]int) //// bytes, err := io.ReadAll(c.Ctx.Request.Body) //// if err != nil { //// log.Printf("Error reading request body: %v", err) //// c.Data["json"] = lib.JSONS{Code: 400, Msg: "Bad Request"} //// c.ServeJSON() //// return //// } //// if err := json.Unmarshal(bytes, &t); err != nil { //// log.Printf("Error unmarshalling JSON: %v", err) //// c.Data["json"] = lib.JSONS{Code: 400, Msg: "Bad Request"} //// c.ServeJSON() //// return //// } //// fmt.Println("浏览器接收数据:", t) //// //// // MQTT发送 //// fmt.Println("发送MQTT t:", t) //// //// var deviceRepeatData []string //// var errors []string //// for k, v := range t { //// topicSub := fmt.Sprintf("/sub/%s", k) //// topicPub := fmt.Sprintf("/pub/%s", k) //// mqttId := Device.ReadDeviceMqttId(k) //// //// cli, err := MqttServer.GetMqttClient(mqttId) //// if err != nil { //// log.Printf("Error getting MQTT client for SN: %s, %v", k, err) //// errors = append(errors, fmt.Sprintf("设备: %s 获取MQTT客户端失败 %v", k, err)) //// continue //// } //// defer cli.Disconnect() //// defer cli.Terminate() //// err = cli.Subscribe(&client.SubscribeOptions{ //// SubReqs: []*client.SubReq{ //// &client.SubReq{ //// TopicFilter: []byte(topicSub), //// QoS: mqtt.QoS1, //// Handler: func(topicName, message []byte) { //// log.Println("接收到数据", string(message)) //// var devi Device.Deviation //// err = json.Unmarshal(message, &devi) //// if err != nil { //// logs.Error("json序列化失败") //// } else { //// deviceRepeatData = append(deviceRepeatData, string(message)) //// } //// }, //// }, //// }, //// }) //// pubData, err := json.Marshal(MqttServer.Deviation_Pub{ //// Sn: k, //// Type: 7, //// Mid: time.Now().Unix(), //// Data: v, //// }) //// if err != nil { //// log.Printf("Error marshalling JSON for SN: %s, %v", k, err) //// errors = append(errors, fmt.Sprintf("设备: %s 序列化消息失败 %v", k, err)) //// continue //// } //// //// if err := MqttServer.PubMqttMessage(cli, topicPub, pubData); err != nil { //// log.Printf("Error publishing MQTT message for SN: %s, %v", k, err) //// errors = append(errors, fmt.Sprintf("设备: %s 发布消息失败 %v", k, err)) //// continue //// } //// } //// time.Sleep(2 * time.Second) //// if len(errors) > 0 { //// c.Data["json"] = lib.JSONS{Code: 500, Msg: "部分设备操作失败", Data: errors} //// } else { //// c.Data["json"] = lib.JSONS{Code: 200, Msg: "偏差值上传成功", Data: deviceRepeatData} //// } //// c.ServeJSON() //// } //func (c *DeviceController) ReadDeviation() { // // 用户令牌验证 // b_, _ := lib.Verification(c.Ctx.GetCookie("User_tokey"), c.GetString("User_tokey")) // if !b_ { // c.Data["json"] = lib.JSONS{Code: 201, Msg: "User_tokey Err!"} // c.ServeJSON() // return // } // // t := make(map[string][]int) // bytes, err := io.ReadAll(c.Ctx.Request.Body) // if err != nil { // log.Printf("Error reading request body: %v", err) // c.Data["json"] = lib.JSONS{Code: 400, Msg: "Bad Request"} // c.ServeJSON() // return // } // if err := json.Unmarshal(bytes, &t); err != nil { // log.Printf("Error unmarshalling JSON: %v", err) // c.Data["json"] = lib.JSONS{Code: 400, Msg: "Bad Request"} // c.ServeJSON() // return // } // fmt.Println("浏览器接收数据:", t) // // // 设置整体操作的超时时间,例如设置为5秒,可根据实际情况调整 // ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) // defer cancel() // // var deviceRepeatData []string // var errors []string // for k, v := range t { // topicSub := fmt.Sprintf("/sub/%s", k) // topicPub := fmt.Sprintf("/pub/%s", k) // mqttId := Device.ReadDeviceMqttId(k) // // // 获取MQTT客户端实例,设置超时控制 // _, cancelGetClient := context.WithTimeout(ctx, 2*time.Second) // cli, err := MqttServer.GetMqttClient(mqttId) // if err != nil { // log.Printf("Error getting MQTT client for SN: %s, %v", k, err) // errors = append(errors, fmt.Sprintf("设备: %s 获取MQTT客户端失败 %v", k, err)) // continue // } // defer cli.Disconnect() // defer cli.Terminate() // cancelGetClient() // // // 订阅主题操作,设置超时控制 // _, cancelSubscribe := context.WithTimeout(ctx, 3*time.Second) // err = cli.Subscribe(&client.SubscribeOptions{ // SubReqs: []*client.SubReq{ // &client.SubReq{ // TopicFilter: []byte(topicSub), // QoS: mqtt.QoS1, // Handler: func(topicName, message []byte) { // log.Println("接收到数据", string(message)) // var devi Device.Deviation // err = json.Unmarshal(message, &devi) // if err != nil { // logs.Error("json序列化失败") // } else { // if devi.Type != 7 { // fmt.Println("devi", devi) // errors = append(errors, fmt.Sprintf("设备: %s 订阅主题失败 %v", k, err)) // } else { // deviceRepeatData = append(deviceRepeatData, string(message)) // } // } // }, // }, // }, // }) // if err != nil { // log.Printf("Error subscribing MQTT topic for SN: %s, %v", k, err) // errors = append(errors, fmt.Sprintf("设备: %s 订阅主题失败 %v", k, err)) // continue // } // cancelSubscribe() // // // 发布消息操作,设置超时控制 // _, cancelPublish := context.WithTimeout(ctx, 3*time.Second) // pubData, err := json.Marshal(MqttServer.Deviation_Pub{ // Sn: k, // Type: 7, // Mid: time.Now().Unix(), // Data: v, // }) // if err != nil { // log.Printf("Error marshalling JSON for SN: %s, %v", k, err) // errors = append(errors, fmt.Sprintf("设备: %s 序列化消息失败 %v", k, err)) // continue // } // // if err := MqttServer.PubMqttMessage(cli, topicPub, pubData); err != nil { // log.Printf("Error publishing MQTT message for SN: %s, %v", k, err) // errors = append(errors, fmt.Sprintf("设备: %s 发布消息失败 %v", k, err)) // continue // } // cancelPublish() // } // time.Sleep(2 * time.Second) // fmt.Println("deviceRepeatData", deviceRepeatData) // // 根据是否有错误情况返回相应结果 // if len(errors) > 0 || len(deviceRepeatData) == 0 { // c.Data["json"] = lib.JSONS{Code: 500, Msg: "读取参数失败-无法通信", Data: errors} // c.ServeJSON() // return // } else { // c.Data["json"] = lib.JSONS{Code: 200, Msg: "读取参数成功", Data: deviceRepeatData} // c.ServeJSON() // } //} // //// WriteDeviation 设置偏差值 //func (c *DeviceController) WriteDeviation() { // b_, _ := lib.Verification(c.Ctx.GetCookie("User_tokey"), c.GetString("User_tokey")) // if !b_ { // c.Data["json"] = lib.JSONS{Code: 201, Msg: "User_tokey Err!"} // c.ServeJSON() // return // } // // bytes, _ := io.ReadAll(c.Ctx.Request.Body) // fmt.Println("请求json:", string(bytes)) // // data := make([]MqttServer.Deviation_Sub, 0) // if err := json.Unmarshal(bytes, &data); err != nil { // log.Printf("Error unmarshalling JSON: %v", err) // c.Data["json"] = lib.JSONS{Code: 400, Msg: "mqtt请求失败"} // c.ServeJSON() // return // } // var wg sync.WaitGroup // successfulSns := make(map[string]bool) // 记录成功的SN // mutex := &sync.Mutex{} // 保护对successfulSns的访问 // failSn := make([]string, 0) // handleResult := func(sn string, success bool) { // mutex.Lock() // defer mutex.Unlock() // if success { // successfulSns[sn] = true // } else { // failSn = append(failSn, sn) // } // } // // 处理每个设备的数据 // for _, v := range data { // wg.Add(1) // go func(v MqttServer.Deviation_Sub) { // defer wg.Done() // v.Type = 8 // v.Mid = int(time.Now().Unix()) // var devi Device.Deviation // mqttId := Device.ReadDeviceMqttId(v.Sn) // cli, err := MqttServer.GetMqttClient(mqttId) // if err != nil { // log.Printf("Error getting MQTT client for SN: %s, %v", v.Sn, err) // handleResult(v.Sn, false) // return // } // defer cli.Disconnect() // defer cli.Terminate() // msgBytes, err := json.Marshal(v) // if err != nil { // log.Printf("Error marshalling JSON for SN: %s, %v", v.Sn, err) // handleResult(v.Sn, false) // return // } // topicSub := fmt.Sprintf("/sub/%s", v.Sn) // err = cli.Subscribe(&client.SubscribeOptions{ // SubReqs: []*client.SubReq{ // &client.SubReq{ // TopicFilter: []byte(topicSub), // QoS: mqtt.QoS1, // Handler: func(topicName, message []byte) { // log.Println("接收到数据", string(message)) // // err = json.Unmarshal(message, &devi) // if err != nil { // logs.Error("json序列化失败") // } else { // handleResult(v.Sn, true) // } // }, // }, // }, // }) // if err := MqttServer.PubMqttMessage(cli, fmt.Sprintf("/pub/%s", v.Sn), msgBytes); err != nil { // log.Printf("设备参数设置失败: %s, %v", v.Sn, err) // failSn = append(failSn, v.Sn) // } // time.Sleep(3 * time.Second) // if len(devi.Data) == 0 { // failSn = append(failSn, v.Sn) // } // }(v) // } // wg.Wait() // if len(failSn) > 0 { // c.Data["json"] = lib.JSONS{Code: 1201, Msg: "部分设置失败", Data: failSn} // } else { // c.Data["json"] = lib.JSONS{Code: 200, Msg: "设置偏差值成功!", Data: nil} // } // c.ServeJSON() //} // //// WriteDeviationAll 批量设置偏差值 //func (c *DeviceController) WriteDeviationAll() { // b_, _ := lib.Verification(c.Ctx.GetCookie("User_tokey"), c.GetString("User_tokey")) // if !b_ { // c.Data["json"] = lib.JSONS{Code: 201, Msg: "User_tokey Err!"} // c.ServeJSON() // return // } // // bytes, _ := io.ReadAll(c.Ctx.Request.Body) // fmt.Println("请求json:", string(bytes)) // // var data MqttServer.Deviation_SubAll // if err := json.Unmarshal(bytes, &data); err != nil { // log.Printf("Error unmarshalling JSON: %v", err) // c.Data["json"] = lib.JSONS{Code: 400, Msg: "mqtt请求失败"} // c.ServeJSON() // return // } // var wg sync.WaitGroup // successfulSns := make(map[string]bool) // 记录成功的SN // mutex := &sync.Mutex{} // 保护对successfulSns的访问 // failSn := make([]string, 0) // handleResult := func(sn string, success bool) { // mutex.Lock() // defer mutex.Unlock() // if success { // successfulSns[sn] = true // } else { // failSn = append(failSn, sn) // } // } // // 处理每个设备的数据 // for i, sn := range data.Sn { // subData := data.Data[i] // wg.Add(1) // go func(sn string, subData MqttServer.Deviation_Sub_Data) { // defer wg.Done() // sub_data := make([]MqttServer.Deviation_Sub_Data, 0) // sub_data = append(sub_data, subData) // var devi Device.Deviation // v := MqttServer.Deviation_Sub{ // Type: 8, // Sn: sn, // Mid: int(time.Now().Unix()), // Data: sub_data, // } // mqttId := Device.ReadDeviceMqttId(v.Sn) // cli, err := MqttServer.GetMqttClient(mqttId) // if err != nil { // log.Printf("Error getting MQTT client for SN: %s, %v", v.Sn, err) // handleResult(v.Sn, false) // return // } // defer cli.Disconnect() // defer cli.Terminate() // // msgBytes, err := json.Marshal(v) // if err != nil { // log.Printf("Error marshalling JSON for SN: %s, %v", v.Sn, err) // handleResult(v.Sn, false) // return // } // topicSub := fmt.Sprintf("/sub/%s", v.Sn) // err = cli.Subscribe(&client.SubscribeOptions{ // SubReqs: []*client.SubReq{ // &client.SubReq{ // TopicFilter: []byte(topicSub), // QoS: mqtt.QoS1, // Handler: func(topicName, message []byte) { // log.Println("批量接收到值", string(message)) // err = json.Unmarshal(message, &devi) // if err != nil { // logs.Error("json序列化失败") // } else { // // handleResult(v.Sn, true) // } // }, // }, // }, // }) // if err := MqttServer.PubMqttMessage(cli, fmt.Sprintf("/pub/%s", v.Sn), msgBytes); err != nil { // log.Printf("设备参数设置失败: %s, %v", v.Sn, err) // failSn = append(failSn, v.Sn) // } // time.Sleep(3 * time.Second) // if len(devi.Data) == 0 { // failSn = append(failSn, v.Sn) // } // }(sn, subData) // } // wg.Wait() // if len(failSn) > 0 { // c.Data["json"] = lib.JSONS{Code: 1201, Msg: "部分设置失败", Data: failSn} // } else { // c.Data["json"] = lib.JSONS{Code: 200, Msg: "设置偏差值成功!", Data: nil} // } // c.ServeJSON() //} // //// ReadSensor 读取偏差值 //func (c *DeviceController) ReadSensor() { // b_, _ := lib.Verification(c.Ctx.GetCookie("User_tokey"), c.GetString("User_tokey")) // if !b_ { // c.Data["json"] = lib.JSONS{Code: 201, Msg: "User_tokey Err!"} // c.ServeJSON() // return // } // // t := make(map[string][]int) // bytes, _ := io.ReadAll(c.Ctx.Request.Body) // if err := json.Unmarshal(bytes, &t); err != nil { // log.Printf("Error unmarshalling JSON: %v", err) // c.Data["json"] = lib.JSONS{Code: 400, Msg: "Bad Request"} // c.ServeJSON() // return // } // fmt.Println("浏览器接收数据:", t) // // // MQTT发送 // fmt.Println("发送MQTT t:", t) // deviation := make(chan string, 10) // done := make(chan struct{}) // var count = 0 // // for k, v := range t { // topicSub := fmt.Sprintf("/sub/%s", k) // topicPub := fmt.Sprintf("/pub/%s", k) // mqttId := Device.ReadDeviceMqttId(k) // // client, err := MqttServer.GetMqttClient(mqttId) // if err != nil { // log.Printf("Error getting MQTT client for SN: %s, %v", k, err) // continue // } // // // 订阅主题 // if err := MqttServer.Subscript(client, topicSub, deviation, done); err != nil { // log.Printf("Error subscribing to MQTT topic for SN: %s, %v", k, err) // continue // } // // pubData, _ := json.Marshal(MqttServer.Deviation_Pub{ // Sn: k, // Type: 5, // Mid: time.Now().Unix(), // Data: v, // }) // // if err := MqttServer.PubMqttMessage(client, topicPub, pubData); err != nil { // log.Printf("Error publishing MQTT message for SN: %s, %v", k, err) // c.Data["json"] = lib.JSONS{Code: 1, Msg: "mqtt发送失败", Data: k} // continue // } // // count++ // } // // deviceRepeatData := make([]string, 0) // timeout := time.After(10 * time.Second) // 设置超时时间 // for count > 0 { // select { // case v := <-deviation: // fmt.Println("channel收到数据:", v) // fmt.Println("count:", count) // deviceRepeatData = append(deviceRepeatData, v) // count-- // case <-timeout: // log.Println("Timeout waiting for MQTT responses") // c.Data["json"] = lib.JSONS{Code: 500, Msg: "Timeout waiting for MQTT responses"} // c.ServeJSON() // return // } // } // // close(deviation) // close(done) // fmt.Println("响应数据:", deviceRepeatData) // c.Data["json"] = lib.JSONS{Code: 200, Msg: "偏差值上传成功", Data: deviceRepeatData} // c.ServeJSON() // return //} // //// WriteSensor 设置偏差值 //func (c *DeviceController) WriteSensor() { // b_, _ := lib.Verification(c.Ctx.GetCookie("User_tokey"), c.GetString("User_tokey")) // if !b_ { // c.Data["json"] = lib.JSONS{Code: 201, Msg: "User_tokey Err!"} // c.ServeJSON() // return // } // // bytes, err := io.ReadAll(c.Ctx.Request.Body) // if err != nil { // log.Printf("Error reading request body: %v", err) // c.Data["json"] = lib.JSONS{Code: 500, Msg: "Internal Server Error"} // c.ServeJSON() // return // } // fmt.Println("请求json:", string(bytes)) // // data := make([]MqttServer.Sensor_Sub, 0) // if err := json.Unmarshal(bytes, &data); err != nil { // log.Printf("Error unmarshalling JSON: %v", err) // c.Data["json"] = lib.JSONS{Code: 400, Msg: "Bad Request"} // c.ServeJSON() // return // } // // // 设置上下文超时 // ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) // defer cancel() // // // 处理每个设备的数据 // for _, v := range data { // go func(v MqttServer.Sensor_Sub) { // select { // case <-ctx.Done(): // log.Printf("Context canceled or timeout for SN: %s", v.Sn) // return // default: // } // // v.Type = 6 // v.Mid = int(time.Now().Unix()) // mqttId := Device.ReadDeviceMqttId(v.Sn) // // client, err := MqttServer.GetMqttClient(mqttId) // if err != nil { // log.Printf("Error getting MQTT client for SN: %s, %v", v.Sn, err) // return // } // // msgBytes, err := json.Marshal(v) // if err != nil { // log.Printf("Error marshalling JSON for SN: %s, %v", v.Sn, err) // client.Disconnect() // client.Terminate() // return // } // // for i := 0; i < 3; i++ { // time.Sleep(time.Second * time.Duration(i+1)) // select { // case <-ctx.Done(): // log.Printf("Context canceled or timeout for SN: %s", v.Sn) // client.Disconnect() // client.Terminate() // return // default: // } // // if err := MqttServer.PubMqttMessage(client, fmt.Sprintf("/pub/%s", v.Sn), msgBytes); err != nil { // log.Printf("Error publishing MQTT message for SN: %s, %v", v.Sn, err) // sprintf := fmt.Sprintf("设备参数设置失败: %s, %v", v.Sn, err) // c.Data["json"] = lib.JSONS{Code: 0, Msg: sprintf, Data: v.Sn} // return // } // } // // client.Disconnect() // client.Terminate() // }(v) // } // // c.Data["json"] = lib.JSONS{Code: 200, Msg: "设置偏差值成功!", Data: nil} // c.ServeJSON() // return //} // //// 列表 - //func (c *DeviceController) DeviceWarning_List_html() { // // 验证登录 // b_, admin_r := lib.Verification(c.Ctx.GetCookie("User_tokey"), c.GetString("User_tokey")) // if !b_ { // c.Data["json"] = lib.JSONS{Code: 201, Msg: "User_tokey Err!"} // c.ServeJSON() // return // } // c.Data["Admin_r"] = admin_r // page, _ := c.GetInt("page") // println(page) // if page < 1 { // page = 1 // } // c.Data["Admin_r"] = admin_r // // bindSN := c.GetStrings("bindSN") // tpList := c.GetStrings("tpList") // // T_sn := c.GetString("T_sn") // if len(T_sn) != 0 { // bindSN = append(bindSN, T_sn) // c.Data["T_sn"] = T_sn // } // Time_start := c.GetString("Time_start") // Time_end := c.GetString("Time_end") // // if len(Time_start) == 0 && len(Time_end) == 0 { // Time_start = time.Now().Format("2006-01-02") + " 00:00:00" // Time_end = time.Now().Format("2006-01-02") + " 23:59:59" // } // c.Data["Time_start"] = Time_start // c.Data["Time_end"] = Time_end // // //c.Data["Class_List"] = Device.Read_DeviceWarningList_All_1() // //T_Title := "" // //if Class_1 > 0 { // // T_Title = Device.Read_DeviceWarningList_ById(Class_1).T_name // //} // pageSize := c.GetString("pageSize", "100") // pageSizeInt, _ := strconv.Atoi(pageSize) // //atoi, _ := strconv.Atoi(pageSizes.PageSize) // getString := c.GetString("t_tp") // t_tp, _ := strconv.Atoi(getString) // var cnt int64 // DeviceWarning_List, cnt := Warning.Read_Warning_List(admin_r.T_pid, bindSN, tpList, Time_start, Time_end, t_tp, page, pageSizeInt) // c.Data["List"] = DeviceWarning_List // page_size := math.Ceil(float64(cnt) / float64(pageSizeInt)) // c.Data["Page"] = page // c.Data["Page_size"] = page_size // c.Data["Pages"] = lib.Func_page(int64(page), int64(page_size)) // c.Data["cnt"] = cnt // // 将sync.Map中的数据转存到切片中 // // c.TplName = "Device/DeviceWarning.html" //} // //// GetWarningtype 获取报警类型 //func (c *DeviceController) GetWarningtype() { // var results []struct { // Key int // Value string // } // Warning.WarningType_list.Range(func(key, value any) bool { // // 确保类型断言成功 // if k, ok := key.(int); ok { // if v, ok := value.(string); ok { // // 创建匿名结构体实例并添加到切片 // results = append(results, struct { // Key int // Value string // }{Key: k, Value: v}) // } else { // fmt.Println("Value is not of type string") // } // } else { // fmt.Println("Key is not of type int") // } // return true // 继续遍历 // }) // c.Data["json"] = lib.JSONS{Code: 200, Msg: "ok!", Data: results} // c.ServeJSON() // log.Println(results) //} //func (c *DeviceController) DeviceWarning_() { // // 验证登录 // //b_, admin_r := lib.Verification(c.Ctx.GetCookie("User_tokey"), c.GetString("User_tokey")) // //if !b_ { // // c.Data["json"] = lib.JSONS{Code: 201, Msg: "User_tokey Err!"} // // c.ServeJSON() // // return // //} // // c.Data["WarningType"] = Warning.Read_WarningType_All() // c.TplName = "Device/DeviceWarning-.html" //} // //// DeviceWarning_Post 添加报警 //func (c *DeviceController) DeviceWarning_Post() { // // 验证登录 // b_, admin_r := lib.Verification(c.Ctx.GetCookie("User_tokey"), c.GetString("User_tokey")) // if !b_ { // c.Ctx.Redirect(302, "Login") // return // } // // T_tp, _ := c.GetInt("T_tp") // T_sn := c.GetString("T_sn") // T_id, _ := c.GetInt("T_id") // T_Ut := c.GetString("T_Ut") // T_Remark := c.GetString("T_Remark") // r_Device, err := Device.Read_Device_ByT_sn(T_sn) // if err != nil { // c.Data["json"] = lib.JSONS{Code: 201, Msg: "E!", Data: "SN 错误!!!"} // c.ServeJSON() // return // } // // 获取 传感器 参数 // DeviceSensor_r, is := Device.Read_DeviceSensor_ByT_sn(r_Device.T_sn, T_id) // if !is { // c.Data["json"] = lib.JSONS{Code: 201, Msg: "E!", Data: "编号 错误!!!"} // c.ServeJSON() // return // } // // t1, _ := time.ParseInLocation("2006-01-02 15:04:05", T_Ut, time.Local) // +8 时差 // t_c := Warning.Warning{ // T_pid: admin_r.T_pid, // T_tp: T_tp, // T_sn: T_sn, // T_D_name: r_Device.T_devName, // T_id: T_id, // T_DS_name: DeviceSensor_r.T_name, // T_Ut: t1, // T_State: 1, // T_Remark: T_Remark, // } // Warning.Add_Warning(t_c) // // c.Data["json"] = lib.JSONS{Code: 200, Msg: "ok!"} // c.ServeJSON() // return //} // //// DeviceWarning_Del 删除报警 //func (c *DeviceController) DeviceWarning_Del() { // // 验证登录 // b_, admin_r := lib.Verification(c.Ctx.GetCookie("User_tokey"), c.GetString("User_tokey")) // if !b_ { // c.Ctx.Redirect(302, "Login") // return // } // // Id, _ := c.GetInt("Id") // // if Id > 1000 { // Warning.Delete_Warning(admin_r.T_pid, Id) // } // // c.Data["json"] = lib.JSONS{Code: 200, Msg: "ok!"} // c.ServeJSON() // return //} // //// DeviceWarning_DelS 批量删除 //func (c *DeviceController) DeviceWarning_DelS() { // // 验证登录 // b_, admin_r := lib.Verification(c.Ctx.GetCookie("User_tokey"), c.GetString("User_tokey")) // if !b_ { // c.Ctx.Redirect(302, "Login") // return // } // type SelectedItem struct { // Id string `json:"id"` // Ut string `json:"ut"` // 假设"ut"字段是时间戳字符串 // } // // type SelectedItems struct { // SelectedIds []SelectedItem `json:"selectedIds"` // } // var SelectedIds SelectedItems // //var selectedIds map[string]any // // err := json.Unmarshal(c.Ctx.Input.RequestBody, &SelectedIds) // log.Println(SelectedIds.SelectedIds) // for _, v := range SelectedIds.SelectedIds { // Warning.Delete_Warning_List(v.Id, v.Ut, admin_r.T_pid) // } // if err != nil { // c.Data["json"] = lib.JSONS{Code: 201, Msg: "E!", Data: "数据格式错误!!!"} // c.ServeJSON() // } else { // c.Data["json"] = lib.JSONS{Code: 200, Msg: "ok!"} // c.ServeJSON() // } //} // //// Device_Copy 复制并添加 //func (c *DeviceController) Device_Copy() { // // 验证登录 // b_, _ := lib.Verification(c.Ctx.GetCookie("User_tokey"), c.GetString("User_tokey")) // if !b_ { // c.Ctx.Redirect(302, "Login") // return // } // type T_data struct { // T_sn string `json:"t_sn"` // T_id int `json:"t_id"` // T_Rh any `json:"t_rh"` // T_Site string `json:"t_site"` // T_T any `json:"t_t"` // CreateTime string `json:"createTime"` // } // var data T_data // json.Unmarshal(c.Ctx.Input.RequestBody, &data) // r := Device.Read_DeviceParameter_SNNo(data.T_sn) // parse, _ := time.Parse("2006-01-02 15:04:05", data.CreateTime) // data.CreateTime = parse.Add(time.Duration(r[0].T_saveT) * time.Second).Format("2006-01-02 15:04:05") // itoa := strconv.Itoa(data.T_id) // Device.Copy_DeviceData(data.T_sn, itoa, data.T_Rh, data.T_T, data.T_Site, data.CreateTime) // c.Data["json"] = lib.JSONS{Code: 200, Msg: "设置成功", Data: data} // c.ServeJSON() //} // //// DeviceWarningUpdate 修改报警 //func (c *DeviceController) DeviceWarningUpdate() { // // 验证登录 // b_, admin_r := lib.Verification(c.Ctx.GetCookie("User_tokey"), c.GetString("User_tokey")) // if !b_ { // c.Ctx.Redirect(302, "Login") // return // } // var data struct { // ColumnName string `json:"columnName"` // NewValue string `json:"newValue"` // RowId string `json:"rowId"` // T_Ut string `json:"T_Ut"` // SN string `json:"sn"` // } // json.Unmarshal(c.Ctx.Input.RequestBody, &data) // log.Println(admin_r.T_pid) // Warning.Update_DeviceParameter_Warning(data.ColumnName, data.NewValue, data.RowId, data.T_Ut) // c.Data["json"] = lib.JSONS{Code: 200, Msg: "设置成功", Data: data} // c.ServeJSON() //} // //// GetDeviceALLSN 根据pid获取所有sn //func (c *DeviceController) GetDeviceALLSN() { // // 验证登录 // b_, admin_r := lib.Verification(c.Ctx.GetCookie("User_tokey"), c.GetString("User_tokey")) // if !b_ { // c.Ctx.Redirect(302, "Login") // return // } // r := Device.Read_Device_All_SN(admin_r.T_pid) // log.Println(r) // c.Data["json"] = lib.JSONS{Code: 200, Msg: "ok!", Data: r} // c.ServeJSON() //} // //// GetDeviceALLTID 根据SN获取所有探头 //func (c *DeviceController) GetDeviceALLTID() { // // 验证登录 // b_, _ := lib.Verification(c.Ctx.GetCookie("User_tokey"), c.GetString("User_tokey")) // if !b_ { // c.Ctx.Redirect(302, "Login") // return // } // getString := c.GetString("sn") // r := Device.Get_DeviceSensor_Tid_ByT_sn(getString) // c.Data["json"] = lib.JSONS{Code: 200, Msg: "设置成功", Data: r} // c.ServeJSON() //} // //// DeviceWarningAdd 复制添加报警 //func (c *DeviceController) DeviceWarningAdd() { // // 验证登录 // b_, admin_r := lib.Verification(c.Ctx.GetCookie("User_tokey"), c.GetString("User_tokey")) // if !b_ { // c.Ctx.Redirect(302, "Login") // return // } // var data struct { // T_tp_name string `json:"column1"` // T__d_name string `json:"column2"` // T_DS_name string `json:"column3"` // T_Remark string `json:"column4"` // T_State string `json:"column5"` // T_Ut string `json:"column6"` // RowId string `json:"rowId"` // SN string `json:"sn"` // T_id string `json:"t_id"` // } // // json.Unmarshal(c.Ctx.Input.RequestBody, &data) // atoi, _ := strconv.Atoi(data.T_id) // r_Device, err := Device.Read_Device_ByT_sn(data.SN) // if err != nil { // c.Data["json"] = lib.JSONS{Code: 201, Msg: "E!", Data: "SN 错误!!!"} // c.ServeJSON() // return // } // T_tp := Warning.Read_WarningType(data.T_tp_name) // t1, _ := time.ParseInLocation("2006-01-02 15:04:05", data.T_Ut, time.Local) // +8 时差 // t_c := Warning.Warning{ // T_pid: admin_r.T_pid, // T_tp: T_tp, // T_sn: data.SN, // T_D_name: r_Device.T_devName, // T_id: atoi, // T_DS_name: data.T_DS_name, // T_Ut: t1, // T_State: 1, // T_Remark: data.T_Remark, // } // Warning.Add_Warning(t_c) // c.Data["json"] = lib.JSONS{Code: 200, Msg: "ok!"} // c.ServeJSON() // return //} // //// SiftWarningType 根据报警类型筛选 //func (c *DeviceController) SiftWarningType() { // // 验证登录 // b_, _ := lib.Verification(c.Ctx.GetCookie("User_tokey"), c.GetString("User_tokey")) // if !b_ { // c.Ctx.Redirect(302, "Login") // return // } // //}