12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879 |
- package Nats
- import (
- "Cold_Data/StatisticsTask"
- "Cold_Data/conf"
- "Cold_Data/lib"
- "Cold_Data/logs"
- "Cold_Data/models/Account"
- "Cold_Data/models/Device"
- "encoding/json"
- "github.com/nats-io/nats.go"
- "strings"
- "time"
- )
// Ms2m_Project is the entity decoded from the JSON part of a
// "Mqtt_DeviceReal" message, i.e. the text that follows the "|+|" separator.
type Ms2m_Project struct {
	// Sn string `json:"sn"`
	// (disabled field; the subscriber in this file reads the serial number
	// from the part of the message that precedes the separator instead)

	// Type is read from the "type" key. The subscriber in this file only
	// processes messages whose Type is 2, which its comment describes as
	// alarm tasks.
	Type int `json:"type"`

	// Msid is read from the "mid" key (note: the key is "mid", not "msid").
	Msid int64 `json:"mid"`

	// Dut is read from the "dut" key.
	// NOTE(review): the meaning/unit of this value is not visible here — confirm.
	Dut int64 `json:"dut"`
}
- func NatsInit() {
- time.Sleep(time.Minute * 1)
- var err error
- // 连接Nats服务器
- lib.Nats, err = nats.Connect("nats://" + conf.NatsServer_Url)
- if err != nil {
- logs.Println("nats 连接失败!")
- panic(any(err))
- }
- logs.Println("nats OK!")
- // 发布-订阅 模式,异步订阅 test1
- _, _ = lib.Nats.Subscribe("Mqtt_DeviceReal", func(m *nats.Msg) {
- //logs.Println("Nats Mqtt_DeviceReal: %s\n", string(m.Data))
- DataList := strings.Split(string(m.Data), "|+|")
- if len(DataList) != 2 {
- return
- }
- if len(DataList[0]) == 0 {
- return
- }
- var Ms2_project Ms2m_Project
- err := json.Unmarshal([]byte(DataList[1]), &Ms2_project)
- if err != nil {
- logs.Println("MqttServer", "JSON反序列化失败[Ms2m_Project]", string(m.Data), err.Error())
- return
- }
- // 只处理报警任务
- if Ms2_project.Type != 2 {
- return
- }
- r_Device, err := Device.Read_Device_ByT_sn(DataList[0])
- if err != nil {
- //logs.Println("没找到SN!", DataList[0])
- return
- }
- // 内部测试
- if r_Device.T_pid <= 3 {
- return
- }
- err, Company_r := Account.Read_Company_id(r_Device.T_pid)
- if err != nil {
- //logs.Println("公司ID 不存在!", r_Device.T_pid)
- return
- }
- StatisticsTask.Company_Handle(Company_r)
- })
- }
|