123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- package main
- import (
- "context"
- pb "emqx.io/grpc/exhook/protobuf"
- "encoding/json"
- "fmt"
- "github.com/patrickmn/go-cache"
- "github.com/pkg/errors"
- "google.golang.org/grpc"
- "log"
- "net"
- "strings"
- "time"
- )
// port is the TCP address the gRPC hook server listens on.
const port = ":9000"
- var cacheInstance = cache.New(5*time.Minute, 10*time.Minute)
- type server struct {
- pb.UnimplementedHookProviderServer
- }
- func (s *server) OnProviderLoaded(ctx context.Context, in *pb.ProviderLoadedRequest) (*pb.LoadedResponse, error) {
- hooks := []*pb.HookSpec{
- {Name: "message.publish"},
- }
- return &pb.LoadedResponse{Hooks: hooks}, nil
- }
- func (s *server) OnMessagePublish(ctx context.Context, in *pb.MessagePublishRequest) (*pb.ValuedResponse, error) {
- log.Printf("[DEBUG] OnMessagePublish: %s", in.Message.Topic)
- topic := strings.TrimSuffix(in.GetMessage().GetTopic(), "/")
- payload := in.GetMessage().GetPayload()
- if payload == nil || len(payload) >= 1000 {
- return nil, errors.New("消息体为空或大于等于1000字节")
- }
- var jsonPayload map[string]interface{}
- if err := json.Unmarshal(payload, &jsonPayload); err != nil {
- return nil, errors.Wrap(err, "json解析失败")
- }
- typeVal, ok := jsonPayload["type"].(float64)
- if !ok {
- return nil, errors.New("json中'type'字段解析失败")
- }
- if int(typeVal) == 2 {
- key := fmt.Sprintf("%s-%v", topic, jsonPayload["data"])
- log.Printf("缓存中键的值: %s", key)
- if _, found := cacheInstance.Get(key); found {
- return discardMessagePublish(ctx, in, func(response *pb.ValuedResponse) error {
- log.Printf("丢弃重复消息")
- return nil
- })
- }
- cacheInstance.Set(key, "alarm", cache.DefaultExpiration)
- }
- // 正常发送消息
- return &pb.ValuedResponse{
- Type: pb.ValuedResponse_STOP_AND_RETURN,
- Value: &pb.ValuedResponse_Message{
- Message: in.GetMessage(),
- },
- }, nil
- }
- func discardMessagePublish(ctx context.Context, in *pb.MessagePublishRequest, responseWriter func(*pb.ValuedResponse) error) (*pb.ValuedResponse, error) {
- emptyPayload := []byte{}
- newMsg := &pb.Message{
- Id: in.Message.Id,
- Node: in.Message.Node,
- From: in.Message.From,
- Topic: in.Message.Topic,
- Payload: emptyPayload,
- Headers: map[string]string{"allow_publish": "false"},
- }
- reply := &pb.ValuedResponse{
- Type: pb.ValuedResponse_STOP_AND_RETURN,
- Value: &pb.ValuedResponse_Message{
- Message: newMsg,
- },
- }
- if err := responseWriter(reply); err != nil {
- return nil, errors.Wrap(err, "发送响应时出错")
- }
- return reply, nil
- }
- func main() {
- lis, err := net.Listen("tcp", port)
- if err != nil {
- log.Fatalf("failed to listen: %v", err)
- }
- s := grpc.NewServer()
- pb.RegisterHookProviderServer(s, &server{})
- log.Println("Started gRPC server on", port)
- if err := s.Serve(lis); err != nil {
- log.Fatalf("failed to serve: %v", err)
- }
- }
|