package controllers import ( "AIOTCOER/Handle" "AIOTCOER/lib" "AIOTCOER/logs" "AIOTCOER/models/Device" "AIOTCOER/models/Product" "encoding/json" "fmt" beego "github.com/beego/beego/v2/server/web" "github.com/beego/beego/v2/server/web/context" "github.com/gorilla/websocket" "net/http" "strings" "time" ) type DeviceController struct { beego.Controller } func (c *DeviceController) WS() { ws, err := upgrader.Upgrade(c.Ctx.ResponseWriter, c.Ctx.Request, nil) if err != nil { logs.Println("MySocketController:", err) } // 退出 defer func() { lib.ClearSubscription(ws) // 清楚订阅 ws.CloseHandler() }() // 这里是我用户的结构体,你可以按自己需要定义 logs.Println("ws上线:", fmt.Sprintf("%p", ws)) for { var msg RecvWs err := ws.ReadJSON(&msg) if err != nil { logs.Println("ReadJSON err:", err) if strings.Contains(err.Error(), "websocket: close 1005") { ws.Close() return } lib.WebsocketWriteJSON(ws, lib.JSONR{Code: lib.Error, Msg: "数据异常!"}) continue } Device_r_ := Device.Device{T_sn: msg.Sn} if !Device_r_.Read_Tidy() { lib.WebsocketWriteJSON(ws, lib.JSONR{Code: lib.Error, Msg: "验证失败!"}) continue } if len(Device_r_.T_ckey) != 0 { if Device_r_.T_ckey != msg.Password { lib.WebsocketWriteJSON(ws, lib.JSONR{Code: lib.Error, Msg: "验证失败 !"}) continue } } // 1 订阅SN数据 2 解绑SN数据 3 项设备发送数据 switch msg.Type { case 0: // 0 心跳 lib.WebsocketWriteJSON(ws, lib.JSONR{Code: lib.Success, Msg: "ok!"}) break case 1: // 1 订阅SN数据 SubConn := []*websocket.Conn{} // 从map中读取数据 val, ok := lib.WsSubClinets.Load(msg.Sn) if ok { SubConn = val.([]*websocket.Conn) } SubConn_is := true // 重复订阅 标签 for _, v := range SubConn { if v == ws { fmt.Println("重复订阅:", fmt.Sprintf("%p", v), msg.Sn) SubConn_is = false lib.WebsocketWriteJSON(ws, lib.JSONR{Code: lib.Error, Msg: "重复订阅!"}) break } } if SubConn_is { // 防止重复订阅 SubConn = append(SubConn, ws) // 写入map lib.WsSubClinets.Store(msg.Sn, SubConn) } lib.WebsocketWriteJSON(ws, lib.JSONR{Code: lib.Success, Msg: "ok!"}) case 2: // 2 解绑SN数据 // 从map中读取数据 val, ok := lib.WsSubClinets.Load(msg.Sn) if ok { SubConn := val.([]*websocket.Conn) for i, v := range SubConn { if v == ws { fmt.Println("删除订阅:", fmt.Sprintf("%p", v), msg.Sn) SubConn = append(SubConn[:i], SubConn[i+1:]...) lib.WsSubClinets.Store(msg.Sn, SubConn) break } } } lib.WebsocketWriteJSON(ws, lib.JSONR{Code: lib.Success, Msg: "ok!"}) case 3: // 3 项设备发送数据 json_str, _ := json.Marshal(msg.Json) err = Handle.PushHandle(&Device_r_, Device_r_.T_sn, string(json_str)) if err != nil { lib.WebsocketWriteJSON(ws, lib.JSONR{Code: lib.Error, Msg: "e!", Data: err.Error()}) break } default: lib.WebsocketWriteJSON(ws, lib.JSONR{Code: lib.Error, Msg: "Type 错误!"}) break } } } func (c *DeviceController) SSE() { T_sn := c.GetString("T_sn") T_ckey := c.GetString("T_ckey") if len(T_sn) == 0 { c.Data["json"] = lib.JSONR{Code: lib.Error, Msg: "T_sn!"} c.ServeJSON() return } Device_r := Device.Device{T_sn: T_sn} if !Device_r.Read_Tidy() { c.Data["json"] = lib.JSONR{Code: lib.Error, Msg: "验证失败!"} c.ServeJSON() return } if len(Device_r.T_ckey) != 0 { if Device_r.T_ckey != T_ckey { c.Data["json"] = lib.JSONR{Code: lib.Error, Msg: "验证失败 !"} c.ServeJSON() return } } c.Ctx.ResponseWriter.Header().Set("Content-Type", "text/event-stream") c.Ctx.ResponseWriter.Header().Set("Cache-Control", "no-cache") c.Ctx.ResponseWriter.Header().Set("Connection", "keep-alive") // SubConn := []*context.Response{} // 从map中读取数据 val, ok := lib.SseSubClinets.Load(Device_r.T_sn) if ok { SubConn = val.([]*context.Response) } SubConn = append(SubConn, c.Ctx.ResponseWriter) // 写入map lib.SseSubClinets.Store(Device_r.T_sn, SubConn) // 退出 删除订阅 defer func() { // 从map中读取数据 val, ok = lib.SseSubClinets.Load(Device_r.T_sn) if ok { SubConn = val.([]*context.Response) for i, v := range SubConn { if v == c.Ctx.ResponseWriter { fmt.Println("删除订阅:", fmt.Sprintf("%p", v), Device_r.T_sn) SubConn = append(SubConn[:i], SubConn[i+1:]...) lib.SseSubClinets.Store(Device_r.T_sn, SubConn) break } } } }() //data, _ := json.Marshal(Device_r.T_dataJson) //fmt.Println(string(data)) lib.SseWriteJSON(c.Ctx.ResponseWriter,[]byte(Device_r.T_data)) for true { // Sleep for 1 second before sending the next update time.Sleep(1 * time.Second) } // Close the connection c.Ctx.ResponseWriter.WriteHeader(http.StatusOK) } func (c *DeviceController) List() { PageIndex, _ := c.GetInt("PageIndex", 0) PageSize, _ := c.GetInt("PageSize", 10) Devicer := Device.Device{} c.ParseForm(&Devicer) ProductTyper := Product.ProductType{T_ProductID: Devicer.T_ProductID} if !ProductTyper.Read() { c.Data["json"] = lib.JSONR{Code: lib.Error, Msg: "ProductID E!"} c.ServeJSON() return } T_uuid := c.GetString("T_uuid") if T_uuid != ProductTyper.T_uuid { c.Data["json"] = lib.JSONR{Code: lib.Error, Msg: "T_uuid!"} c.ServeJSON() return } Device_r, Total := Devicer.Lists(PageIndex, PageSize) c.Data["json"] = lib.JSONR{Code: lib.Success, Msg: "ok!", Data: lib.C_Page(Device_r, PageIndex, PageSize, Total)} c.ServeJSON() return } func (c *DeviceController) Add() { // 验证秘钥 ProductType_r := Product.ProductType{} c.ParseForm(&ProductType_r) ProductKey := ProductType_r.T_ProductKey fmt.Println(ProductType_r) fmt.Println("T_ProductID:",c.Ctx.Input.Query("T_ProductID")) if len(ProductType_r.T_ProductID) != 8 { c.Data["json"] = lib.JSONR{Code: lib.Error, Msg: "T_ProductID!"} c.ServeJSON() return } if !ProductType_r.Read() { c.Data["json"] = lib.JSONR{Code: lib.Error, Msg: "T_ProductID !"} c.ServeJSON() return } if ProductType_r.T_ProductKey != ProductKey { c.Data["json"] = lib.JSONR{Code: lib.Error, Msg: "T_ProductKey!"} c.ServeJSON() return } // 创建sn T_sn := c.GetString("T_sn") Device_r := Device.Device{T_ProductID: ProductType_r.T_ProductID, T_sn: T_sn} // CreateNum, _ := c.GetInt("T_CreateNum", 1) if CreateNum == 1 || len(Device_r.T_sn) >= 2 { // 单个 if !Device_r.CreateSn(1) { c.Data["json"] = lib.JSONR{Code: lib.Error, Msg: "T_sn 重复!"} c.ServeJSON() return } c.Data["json"] = lib.JSONR{Code: lib.Success, Msg: "ok!", Data: map[string]string{"T_sn": Device_r.T_sn, "T_password": Device_r.T_password}} c.ServeJSON() return } else { // 多个 var Device_List []map[string]string for i := 1; i <= CreateNum; i++ { Device_rr := Device_r Device_rr.CreateSn(i) Device_List = append(Device_List, map[string]string{"T_sn": Device_rr.T_sn, "T_password": Device_rr.T_password}) } c.Data["json"] = lib.JSONR{Code: lib.Success, Msg: "ok!", Data: Device_List} c.ServeJSON() return } } func (c *DeviceController) Update() { Devicer := Device.Device{T_sn: c.GetString("T_sn")} if !Devicer.Read_Tidy() { c.Data["json"] = lib.JSONR{Code: lib.Error, Msg: "SN E!"} c.ServeJSON() return } if len(Devicer.T_ckey) != 0 { if Devicer.T_ckey != c.GetString("T_ckey") { c.Data["json"] = lib.JSONR{Code: lib.Error, Msg: "T_ckey E!"} c.ServeJSON() return } } if len(Devicer.T_ckey) != 0 { Devicer.T_RelayData = c.GetString("T_RelayData") if !Devicer.Update("T_RelayData") { c.Data["json"] = lib.JSONR{Code: lib.Error, Msg: "E!"} c.ServeJSON() return } } println("T_ckey:",Devicer.T_ckey) if len(Devicer.T_ckey) == 0 && len(c.GetString("T_ckey")) != 0{ Devicer.T_ckey = c.GetString("T_ckey") Devicer.Update("T_ckey") } c.Data["json"] = lib.JSONR{Code: lib.Success, Msg: "ok!"} c.ServeJSON() return } //func (c *DeviceController) Activation() { // // if len(c.GetString("T_sn")) == 0 { // c.Data["json"] = lib.JSONR{Code: lib.Error, Msg: "SN E!"} // c.ServeJSON() // return // } // // Devicer := Device.Device{T_sn: c.GetString("T_sn")} // // if !Devicer.Read_Tidy() { // c.Data["json"] = lib.JSONR{Code: lib.Error, Msg: "SN E!"} // c.ServeJSON() // return // } // // fmt.Println("T_ckey:",len(Devicer.T_ckey)) // if len(Devicer.T_ckey) != 0 { // c.Data["json"] = lib.JSONR{Code: lib.Error, Msg: "设备已激活!"} // c.ServeJSON() // return // } // // fmt.Println(Devicer) // Devicer.T_ckey = lib.GetRandstring(32, "", 1) // // if !Devicer.Update("T_ckey") { // c.Data["json"] = lib.JSONR{Code: lib.Error, Msg: "E!"} // c.ServeJSON() // return // } // // c.Data["json"] = lib.JSONR{Code: lib.Success, Msg: "ok!",Data: Devicer.T_ckey} // c.ServeJSON() // return //} func (c *DeviceController) Get() { Devicer := Device.Device{} c.ParseForm(&Devicer) if !Devicer.Read_Tidy() { c.Data["json"] = lib.JSONR{Code: lib.Error, Msg: "SN E!"} c.ServeJSON() return } if len(Devicer.T_ckey) != 0 { if Devicer.T_ckey != c.GetString("T_ckey") { c.Data["json"] = lib.JSONR{Code: lib.Error, Msg: "T_ckey E!"} c.ServeJSON() return } } c.Data["json"] = lib.JSONR{Code: lib.Success, Msg: "ok!", Data: Devicer} c.ServeJSON() return } func (c *DeviceController) Push() { Devicer := Device.Device{} c.ParseForm(&Devicer) T_data := c.GetString("T_data") if len(T_data) == 0 { c.Data["json"] = lib.JSONR{Code: lib.Error, Msg: "T_data E!"} c.ServeJSON() return } if !Devicer.Read_Tidy() { c.Data["json"] = lib.JSONR{Code: lib.Error, Msg: "SN E!"} c.ServeJSON() return } if len(Devicer.T_ckey) != 0 { if Devicer.T_ckey != c.GetString("T_ckey") { c.Data["json"] = lib.JSONR{Code: lib.Error, Msg: "T_ckey E!"} c.ServeJSON() return } } err := Handle.PushHandle(&Devicer, Devicer.T_sn, T_data) if err != nil { c.Data["json"] = lib.JSONR{Code: lib.Error, Msg: "e!", Data: err.Error()} c.ServeJSON() return } c.Data["json"] = lib.JSONR{Code: lib.Success, Msg: "ok!"} c.ServeJSON() return } func (c *DeviceController) DataList() { Devicer := Device.Device{T_sn: c.GetString("T_sn")} if !Devicer.Read_Tidy() { c.Data["json"] = lib.JSONR{Code: lib.Error, Msg: "SN E!"} c.ServeJSON() return } if len(Devicer.T_ckey) != 0 { if Devicer.T_ckey != c.GetString("T_ckey") { c.Data["json"] = lib.JSONR{Code: lib.Error, Msg: "T_ckey E!"} c.ServeJSON() return } } DeviceData_Form := Device.DeviceData_Form{} c.ParseForm(&DeviceData_Form) if len(DeviceData_Form.T_jointTab) < 1 { c.Data["json"] = lib.JSONS{Code: lib.Error, Msg: "T_jointTab !"} c.ServeJSON() return } JSONR_r := Device.Data_List(DeviceData_Form.T_sn+"_"+DeviceData_Form.T_jointTab, DeviceData_Form.T_jsonFind, DeviceData_Form.T_jsonSort, DeviceData_Form.PageIndex, DeviceData_Form.PageSize) c.Data["json"] = JSONR_r c.ServeJSON() return } func (c *DeviceController) SSELog() { T_sn := c.GetString("T_sn") if len(T_sn) == 0 { c.Data["json"] = lib.JSONR{Code: lib.Error, Msg: "T_sn!"} c.ServeJSON() return } Device_r := Device.Device{T_sn: T_sn} if !Device_r.Read_Tidy() { c.Data["json"] = lib.JSONR{Code: lib.Error, Msg: "验证失败!"} c.ServeJSON() return } if len(Device_r.T_ckey) != 0 { if Device_r.T_ckey != c.GetString("T_ckey") { c.Data["json"] = lib.JSONR{Code: lib.Error, Msg: "验证失败 !"} c.ServeJSON() return } } c.Ctx.ResponseWriter.Header().Set("Content-Type", "text/event-stream") c.Ctx.ResponseWriter.Header().Set("Cache-Control", "no-cache") c.Ctx.ResponseWriter.Header().Set("Connection", "keep-alive") // SubConn := []*context.Response{} // 从map中读取数据 val, ok := lib.SseLogSubClinets.Load(Device_r.T_sn) if ok { SubConn = val.([]*context.Response) } SubConn = append(SubConn, c.Ctx.ResponseWriter) // 写入map lib.SseLogSubClinets.Store(Device_r.T_sn, SubConn) // 退出 删除订阅 defer func() { // 从map中读取数据 val, ok = lib.SseLogSubClinets.Load(Device_r.T_sn) if ok { SubConn = val.([]*context.Response) for i, v := range SubConn { if v == c.Ctx.ResponseWriter { fmt.Println("删除订阅:", fmt.Sprintf("%p", v), Device_r.T_sn) SubConn = append(SubConn[:i], SubConn[i+1:]...) lib.SseLogSubClinets.Store(Device_r.T_sn, SubConn) break } } } }() //data, _ := json.Marshal(Device_r.T_dataJson) //fmt.Println(string(data)) lib.SseWriteJSON(c.Ctx.ResponseWriter,[]byte("OK")) for true { // Sleep for 1 second before sending the next update time.Sleep(1 * time.Second) } // Close the connection c.Ctx.ResponseWriter.WriteHeader(http.StatusOK) }