AlexJeo пре 7 месеци
комит
dd5db7f3c8

+ 2 - 0
.gitignore

@@ -0,0 +1,2 @@
+target/
+.idea

+ 15 - 0
README.md

@@ -0,0 +1,15 @@
+# exhook-svr-java
+
+This is a demo server written in java for exhook
+
+## Prerequisites
+
+- JDK version 8 or higher
+- Maven
+
+## Run
+
+```
+mvn package
+java -jar target/exhook-svr-1.0-jar-with-dependencies.jar
+```

+ 2 - 0
exhook-svr-java.iml

@@ -0,0 +1,2 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module type="JAVA_MODULE" version="4" />

+ 138 - 0
pom.xml

@@ -0,0 +1,138 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>io.emqx</groupId>
+    <artifactId>exhook-svr</artifactId>
+    <version>1.0</version>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.79</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.12.0</version>
+        </dependency>
+        <dependency>
+            <groupId>cn.hutool</groupId>
+            <artifactId>hutool-all</artifactId>
+            <version>5.8.27</version>
+        </dependency>
+        <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcpkix-jdk18on</artifactId>
+            <version>1.78</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>mongodb-driver-sync</artifactId>
+            <version>5.0.1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>bson</artifactId>
+            <version>5.0.1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>mongodb-driver-core</artifactId>
+            <version>5.0.1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-netty-shaded</artifactId>
+            <version>1.36.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-protobuf</artifactId>
+            <version>1.46.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-stub</artifactId>
+            <version>1.42.1</version>
+        </dependency>
+        <dependency> <!-- necessary for Java 9+ -->
+            <groupId>org.apache.tomcat</groupId>
+            <artifactId>annotations-api</artifactId>
+            <version>6.0.53</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.18.28</version>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <extensions>
+            <extension>
+                <groupId>kr.motd.maven</groupId>
+                <artifactId>os-maven-plugin</artifactId>
+                <version>1.6.2</version>
+            </extension>
+        </extensions>
+        <plugins>
+            <plugin>
+                <groupId>org.xolstice.maven.plugins</groupId>
+                <artifactId>protobuf-maven-plugin</artifactId>
+                <version>0.6.1</version>
+                <configuration>
+                    <protocArtifact>com.google.protobuf:protoc:3.18.1:exe:${os.detected.classifier}</protocArtifact>
+                    <pluginId>grpc-java</pluginId>
+                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.42.1:exe:${os.detected.classifier}</pluginArtifact>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>compile-custom</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            <mainClass>io.emqx.exhook.ExServer</mainClass>
+                        </manifest>
+                    </archive>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>assemble-all</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

+ 449 - 0
src/main/java/io/emqx/exhook/ExServer.java

@@ -0,0 +1,449 @@
+/*
+ * Copyright 2015 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.emqx.exhook;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import io.emqx.exhook.MongoDB.MongoUtils;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.grpc.stub.StreamObserver;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.Filters;
+import org.bson.Document;
+
+public class ExServer {
+    private static final Logger logger = Logger.getLogger(ExServer.class.getName());
+    private static SimpleDateFormat simpleDateFormat =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS" );
+    private Server server;
+
+    private void start() throws IOException {
+        /* The port on which the server should run */
+        int port = 9000;
+
+        server = ServerBuilder.forPort(port)
+                .addService(new HookProviderImpl())
+                .build()
+                .start();
+        logger.info("Server started, listening on " + port);
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                // Use stderr here since the logger may have been reset by its JVM shutdown hook.
+                System.err.println("*** shutting down gRPC server since JVM is shutting down");
+                try {
+                    ExServer.this.stop();
+                } catch (InterruptedException e) {
+                    e.printStackTrace(System.err);
+                }
+                System.err.println("*** server shut down");
+            }
+        });
+    }
+
+    private void stop() throws InterruptedException {
+        if (server != null) {
+            server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
+        }
+    }
+
+    /**
+     * Await termination on the main thread since the grpc library uses daemon threads.
+     */
+    private void blockUntilShutdown() throws InterruptedException {
+        if (server != null) {
+            server.awaitTermination();
+        }
+    }
+
+    /**
+     * Main launches the server from the command line.
+    */
+    public static void main(String[] args) throws IOException, InterruptedException {
+        final ExServer server = new ExServer();
+        server.start();
+        server.blockUntilShutdown();
+    }
+
+    static class HookProviderImpl extends HookProviderGrpc.HookProviderImplBase {
+
+        public void DEBUG(String fn, Object req) {
+            System.out.printf(fn + ", request: " + req);
+        }
+
+        @Override
+        public void onProviderLoaded(ProviderLoadedRequest request, StreamObserver<LoadedResponse> responseObserver) {
+            DEBUG("onProviderLoaded", request);
+            HookSpec[] specs = {
+                    HookSpec.newBuilder().setName("client.connect").build(),
+                    HookSpec.newBuilder().setName("client.connack").build(),
+                    HookSpec.newBuilder().setName("client.connected").build(),
+                    HookSpec.newBuilder().setName("client.disconnected").build(),
+                    HookSpec.newBuilder().setName("client.authenticate").build(),
+                    HookSpec.newBuilder().setName("client.authorize").build(),
+                    HookSpec.newBuilder().setName("client.subscribe").build(),
+                    HookSpec.newBuilder().setName("client.unsubscribe").build(),
+
+                    HookSpec.newBuilder().setName("session.created").build(),
+                    HookSpec.newBuilder().setName("session.subscribed").build(),
+                    HookSpec.newBuilder().setName("session.unsubscribed").build(),
+                    HookSpec.newBuilder().setName("session.resumed").build(),
+                    HookSpec.newBuilder().setName("session.discarded").build(),
+                    HookSpec.newBuilder().setName("session.takenover").build(),
+                    HookSpec.newBuilder().setName("session.terminated").build(),
+
+                    HookSpec.newBuilder().setName("message.publish").build(),
+                    HookSpec.newBuilder().setName("message.delivered").build(),
+                    HookSpec.newBuilder().setName("message.acked").build(),
+                    HookSpec.newBuilder().setName("message.dropped").build()
+            };
+            LoadedResponse reply = LoadedResponse.newBuilder().addAllHooks(Arrays.asList(specs)).build();
+            responseObserver.onNext(reply);
+            responseObserver.onCompleted();
+        }
+
+        @Override
+        public void onProviderUnloaded(ProviderUnloadedRequest request, StreamObserver<EmptySuccess> responseObserver) {
+            DEBUG("onProviderUnloaded", request);
+            EmptySuccess reply = EmptySuccess.newBuilder().build();
+            responseObserver.onNext(reply);
+            responseObserver.onCompleted();
+        }
+
+        @Override
+        public void onClientConnect(ClientConnectRequest request, StreamObserver<EmptySuccess> responseObserver) {
+            DEBUG("onClientConnect", request);
+            Thread MessageLogs = new Thread() {
+                public void run() {
+                    try {
+                        JSONObject parseObject = new JSONObject();
+                        parseObject.put("node",request.getConninfo().getNode());
+                        parseObject.put("clientid",request.getConninfo().getClientid());
+                        parseObject.put("peerhost",request.getConninfo().getClientid());
+                        parseObject.put("sockport",request.getConninfo().getSockport());
+                        parseObject.put("proto_name",request.getConninfo().getProtoName());
+                        parseObject.put("proto_ver",request.getConninfo().getProtoVer());
+                        parseObject.put("keepalive",request.getConninfo().getKeepalive());
+                        parseObject.put("timestamp",simpleDateFormat.format(new Date()));
+                        MongoUtils.save(parseObject,"MQTT_ClientConnect");
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+
+                }
+            };
+            MessageLogs.start();
+
+            EmptySuccess reply = EmptySuccess.newBuilder().build();
+            responseObserver.onNext(reply);
+            responseObserver.onCompleted();
+        }
+
+        @Override
+        public void onClientConnack(ClientConnackRequest request, StreamObserver<EmptySuccess> responseObserver) {
+            DEBUG("onClientConnack", request);
+            Thread MessageLogs = new Thread() {
+                public void run() {
+                    try {
+                        JSONObject parseObject = new JSONObject();
+                        parseObject.put("node",request.getConninfo().getNode());
+                        parseObject.put("clientid",request.getConninfo().getClientid());
+                        parseObject.put("peerhost",request.getConninfo().getClientid());
+                        parseObject.put("sockport",request.getConninfo().getSockport());
+                        parseObject.put("proto_name",request.getConninfo().getProtoName());
+                        parseObject.put("proto_ver",request.getConninfo().getProtoVer());
+                        parseObject.put("username",request.getConninfo().getUsername());
+                        parseObject.put("keepalive",request.getConninfo().getKeepalive());
+                        parseObject.put("timestamp",simpleDateFormat.format(new Date()));
+                        MongoUtils.save(parseObject,"MQTT_ClientConnack");
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            };
+            MessageLogs.start();
+
+            EmptySuccess reply = EmptySuccess.newBuilder().build();
+            responseObserver.onNext(reply);
+            responseObserver.onCompleted();
+        }
+
+        @Override
+        public void onClientConnected(ClientConnectedRequest request, StreamObserver<EmptySuccess> responseObserver) {
+            DEBUG("onClientConnected", request);
+            EmptySuccess reply = EmptySuccess.newBuilder().build();
+            responseObserver.onNext(reply);
+            responseObserver.onCompleted();
+        }
+
+        @Override
+        public void onClientDisconnected(ClientDisconnectedRequest request, StreamObserver<EmptySuccess> responseObserver) {
+            DEBUG("onClientDisconnected", request);
+            Thread MessageLogs = new Thread() {
+                public void run() {
+                    try {
+                        JSONObject parseObject = new JSONObject();
+                        parseObject.put("node",request.getClientinfo().getNode());
+                        parseObject.put("clientid",request.getClientinfo().getClientid());
+                        parseObject.put("peerhost",request.getClientinfo().getClientid());
+                        parseObject.put("sockport",request.getClientinfo().getSockport());
+                        parseObject.put("anonymous",request.getClientinfo().getAnonymous());
+                        parseObject.put("protocol",request.getClientinfo().getProtocol());
+                        parseObject.put("username",request.getClientinfo().getUsername());
+                        parseObject.put("reason",request.getReason());
+                        parseObject.put("timestamp",simpleDateFormat.format(new Date()));
+                        MongoUtils.save(parseObject,"MQTT_ClientDisconnected");
+
+                        //更新神的连接状态
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            };
+            MessageLogs.start();
+
+            EmptySuccess reply = EmptySuccess.newBuilder().build();
+            responseObserver.onNext(reply);
+            responseObserver.onCompleted();
+        }
+
+        @Override
+        public void onClientAuthenticate(ClientAuthenticateRequest request, StreamObserver<ValuedResponse> responseObserver) {
+            DEBUG("onClientAuthenticate", request);
+            Thread MessageLogs = new Thread() {
+                public void run() {
+                    try {
+                        JSONObject parseObject = new JSONObject();
+                        parseObject.put("node",request.getClientinfo().getNode());
+                        parseObject.put("clientid",request.getClientinfo().getClientid());
+                        parseObject.put("peerhost",request.getClientinfo().getClientid());
+                        parseObject.put("sockport",request.getClientinfo().getSockport());
+                        parseObject.put("protocol",request.getClientinfo().getProtocol());
+                        parseObject.put("anonymous",request.getClientinfo().getAnonymous());
+                        parseObject.put("username",request.getClientinfo().getUsername());
+                        parseObject.put("password",request.getClientinfo().getPassword());
+                        parseObject.put("timestamp",simpleDateFormat.format(new Date()));
+                        MongoUtils.save(parseObject,"MQTT_ClientAuthenticate");
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            };
+            MessageLogs.start();
+
+            ValuedResponse reply = ValuedResponse.newBuilder()
+                                                 .setBoolResult(true)
+                                                 .setType(ValuedResponse.ResponsedType.STOP_AND_RETURN)
+                                                 .build();
+            responseObserver.onNext(reply);
+            responseObserver.onCompleted();
+        }
+
+        @Override
+        public void onClientAuthorize(ClientAuthorizeRequest request, StreamObserver<ValuedResponse> responseObserver) {
+            DEBUG("onClientAuthorize", request);
+            ValuedResponse reply = ValuedResponse.newBuilder()
+                                                 .setBoolResult(true)
+                                                 .setType(ValuedResponse.ResponsedType.STOP_AND_RETURN)
+                                                 .build();
+
+            responseObserver.onNext(reply);
+            responseObserver.onCompleted();
+        }
+
+        @Override
+        public void onClientSubscribe(ClientSubscribeRequest request, StreamObserver<EmptySuccess> responseObserver) {
+            DEBUG("onClientSubscribe", request);
+            EmptySuccess reply = EmptySuccess.newBuilder().build();
+            responseObserver.onNext(reply);
+            responseObserver.onCompleted();
+        }
+
+        @Override
+        public void onClientUnsubscribe(ClientUnsubscribeRequest request, StreamObserver<EmptySuccess> responseObserver) {
+            DEBUG("onClientUnsubscribe", request);
+            EmptySuccess reply = EmptySuccess.newBuilder().build();
+            responseObserver.onNext(reply);
+            responseObserver.onCompleted();
+        }
+
+        @Override
+        public void onSessionCreated(SessionCreatedRequest request, StreamObserver<EmptySuccess> responseObserver) {
+            DEBUG("onSessionCreated", request);
+            EmptySuccess reply = EmptySuccess.newBuilder().build();
+            responseObserver.onNext(reply);
+            responseObserver.onCompleted();
+        }
+
+        @Override
+        public void onSessionSubscribed(SessionSubscribedRequest request, StreamObserver<EmptySuccess> responseObserver) {
+            DEBUG("onSessionSubscribed", request);
+            EmptySuccess reply = EmptySuccess.newBuilder().build();
+            responseObserver.onNext(reply);
+            responseObserver.onCompleted();
+        }
+
+        @Override
+        public void onSessionUnsubscribed(SessionUnsubscribedRequest request, StreamObserver<EmptySuccess> responseObserver) {
+            DEBUG("onSessionUnsubscribed", request);
+            EmptySuccess reply = EmptySuccess.newBuilder().build();
+            responseObserver.onNext(reply);
+            responseObserver.onCompleted();
+        }
+
+        @Override
+        public void onSessionResumed(SessionResumedRequest request, StreamObserver<EmptySuccess> responseObserver) {
+            DEBUG("onSessionResumed", request);
+            EmptySuccess reply = EmptySuccess.newBuilder().build();
+            responseObserver.onNext(reply);
+            responseObserver.onCompleted();
+        }
+
+        @Override
+        public void onSessionDiscarded(SessionDiscardedRequest request, StreamObserver<EmptySuccess> responseObserver) {
+            DEBUG("onSessionDdiscarded", request);
+            EmptySuccess reply = EmptySuccess.newBuilder().build();
+            responseObserver.onNext(reply);
+            responseObserver.onCompleted();
+        }
+
+        @Override
+        public void onSessionTakenover(SessionTakenoverRequest request, StreamObserver<EmptySuccess> responseObserver) {
+            DEBUG("onSessionTakenover", request);
+            EmptySuccess reply = EmptySuccess.newBuilder().build();
+            responseObserver.onNext(reply);
+            responseObserver.onCompleted();
+        }
+
+        @Override
+        public void onSessionTerminated(SessionTerminatedRequest request, StreamObserver<EmptySuccess> responseObserver) {
+            DEBUG("onSessionTerminated", request);
+            EmptySuccess reply = EmptySuccess.newBuilder().build();
+            responseObserver.onNext(reply);
+            responseObserver.onCompleted();
+        }
+
+        @Override
+        public void onMessagePublish(MessagePublishRequest request, StreamObserver<ValuedResponse> responseObserver) {
+            DEBUG("onMessagePublish", request);
+            MongoUtils.save(request);
+            Thread MessageLogs = new Thread() {
+                public void run() {
+                    try {
+                        JSONObject parseObject = new JSONObject();
+                        parseObject.put("node",request.getMessage().getNode());
+                        parseObject.put("message_id",request.getMessage().getId());
+                        parseObject.put("from",request.getMessage().getFrom());
+                        parseObject.put("payload",request.getMessage().getPayload().toString("UTF-8"));
+                        parseObject.put("topic",request.getMessage().getTopic());
+                        parseObject.put("headers",request.getMessage().getHeadersMap());
+                        parseObject.put("meta",request.getMeta().toString());
+                        parseObject.put("timestamp",simpleDateFormat.format(new Date(request.getMessage().getTimestamp())));
+                        MongoUtils.save(parseObject,"MQTT_MessagePublish");
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+
+                }
+            };
+            MessageLogs.start();
+            /* toDoList
+             * 1、解密消息体
+             * 2、判断消息是否是合法消息
+             * 3、判断消息是否重复
+             * 4、判断是否是告警
+             * 5、发送到内部MQ
+             *
+             */
+
+            ByteString bstr = ByteString.copyFromUtf8("hardcode payload by exhook-svr-java :)");
+
+            Message nmsg = Message.newBuilder()
+                                  .setId     (request.getMessage().getId())
+                                  .setNode   (request.getMessage().getNode())
+                                  .setFrom   (request.getMessage().getFrom())
+                                  .setTopic  (request.getMessage().getTopic())
+                                  .setPayload(bstr).build();
+
+
+            ValuedResponse reply = ValuedResponse.newBuilder()
+                                                 .setType(ValuedResponse.ResponsedType.STOP_AND_RETURN)
+                                                 .setMessage(nmsg).build();
+            responseObserver.onNext(reply);
+            responseObserver.onCompleted();
+        }
+
+// case2: stop publish the 't/d' messages
+//        @Override
+//        public void onMessagePublish(MessagePublishRequest request, StreamObserver<ValuedResponse> responseObserver) {
+//            DEBUG("onMessagePublish", request);
+//
+//            Message nmsg = request.getMessage();
+//            if ("t/d".equals(nmsg.getTopic())) {
+//                ByteString bstr = ByteString.copyFromUtf8("");
+//                nmsg = Message.newBuilder()
+//                              .setId     (request.getMessage().getId())
+//                              .setNode   (request.getMessage().getNode())
+//                              .setFrom   (request.getMessage().getFrom())
+//                              .setTopic  (request.getMessage().getTopic())
+//                              .setPayload(bstr)
+//                              .putHeaders("allow_publish", "false").build();
+//            }
+//
+//            ValuedResponse reply = ValuedResponse.newBuilder()
+//                                                 .setType(ValuedResponse.ResponsedType.STOP_AND_RETURN)
+//                                                 .setMessage(nmsg).build();
+//            responseObserver.onNext(reply);
+//            responseObserver.onCompleted();
+//        }
+
+        @Override
+        public void onMessageDelivered(MessageDeliveredRequest request, StreamObserver<EmptySuccess> responseObserver) {
+            DEBUG("onMessageDelivered", request);
+            EmptySuccess reply = EmptySuccess.newBuilder().build();
+            responseObserver.onNext(reply);
+            responseObserver.onCompleted();
+        }
+
+        @Override
+        public void onMessageAcked(MessageAckedRequest request, StreamObserver<EmptySuccess> responseObserver) {
+            DEBUG("onMessageAcked", request);
+            EmptySuccess reply = EmptySuccess.newBuilder().build();
+            responseObserver.onNext(reply);
+            responseObserver.onCompleted();
+        }
+
+        @Override
+        public void onMessageDropped(MessageDroppedRequest request, StreamObserver<EmptySuccess> responseObserver) {
+            DEBUG("onMessageDropped", request);
+            EmptySuccess reply = EmptySuccess.newBuilder().build();
+            responseObserver.onNext(reply);
+            responseObserver.onCompleted();
+        }
+    }
+}

+ 19 - 0
src/main/java/io/emqx/exhook/MongoDB/Collection.java

@@ -0,0 +1,19 @@
+package io.emqx.exhook.MongoDB;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * @name Collection
+ * @description	定义集合
+ */
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE})
+public @interface Collection {
+
+    String value() default "";
+
+}

+ 67 - 0
src/main/java/io/emqx/exhook/MongoDB/MongoEntity.java

@@ -0,0 +1,67 @@
+package io.emqx.exhook.MongoDB;
+
+import org.bson.types.ObjectId;
+import com.alibaba.fastjson.annotation.JSONField;
+
+/**
+ * @Description mongodb实体
+ */
+public class MongoEntity {
+    @JSONField(name="_id")
+    protected String id;
+
+    @JSONField(name="dataaepartid")
+    protected String dataDepartID;
+    @JSONField(name="datauserid")
+    protected String datauserid;
+
+    //查询处理部分,封装到bean中一起传递过来
+    public int pagesize;// 每页记录数
+    public int pageno;// 当前页页码,从1开始
+
+    /**
+     * @return the id
+     */
+    public String getId() {
+        return id;
+    }
+
+    /**
+     * @param id the id to set
+     */
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getDataDepartID() {
+        return dataDepartID;
+    }
+
+    public void setDataDepartID(String dataDepartID) {
+        this.dataDepartID = dataDepartID;
+    }
+
+    public String getDatauserid() {
+        return datauserid;
+    }
+
+    public void setDatauserid(String datauserid) {
+        this.datauserid = datauserid;
+    }
+
+    public int getPagesize() {
+        return pagesize;
+    }
+
+    public void setPagesize(int pagesize) {
+        this.pagesize = pagesize;
+    }
+
+    public int getPageno() {
+        return pageno;
+    }
+
+    public void setPageno(int pageno) {
+        this.pageno = pageno;
+    }
+}

+ 26 - 0
src/main/java/io/emqx/exhook/MongoDB/MongoLongConverter.java

@@ -0,0 +1,26 @@
+package io.emqx.exhook.MongoDB;
+
+import com.alibaba.fastjson.parser.DefaultJSONParser;
+import com.alibaba.fastjson.parser.deserializer.ObjectDeserializer;
+import java.lang.reflect.Type;
+
+/*
+ * 处理反序列化是numberlong的问题,对象的字段上添加即可
+ * @JSONField(deserializeUsing =MongoLongConverter.class )
+ */
+public class MongoLongConverter implements ObjectDeserializer {
+
+    @Override
+    public Long deserialze(DefaultJSONParser parser, Type type, Object fieldName) {
+        String longStr = parser.parseObject().getString("$numberLong");
+        if (longStr!=null && !longStr.isEmpty()){
+            return Long.parseLong(longStr);
+        }
+        return 0L;
+    }
+
+    @Override
+    public int getFastMatchToken() {
+        return 0;
+    }
+}

+ 897 - 0
src/main/java/io/emqx/exhook/MongoDB/MongoSupport.java

@@ -0,0 +1,897 @@
+package io.emqx.exhook.MongoDB;
+
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.annotation.JSONField;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import io.emqx.exhook.MongoDB.paging.Page;
+import io.emqx.exhook.MongoDB.reflect.ReflectUtils;
+import com.mongodb.BasicDBObject;
+import com.mongodb.client.*;
+import com.mongodb.client.model.Filters;
+import com.mongodb.client.result.DeleteResult;
+import com.mongodb.client.result.UpdateResult;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.bson.types.ObjectId;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.util.*;
+
+/**
+ * @Description mongodb工具类
+ */
+public class MongoSupport {
+    public final static String ID = "_id";
+    private final static Map<String, MongoCollection<Document>> collections = new HashMap<>();
+
+    protected MongoDatabase db;
+
+    public MongoSupport(MongoClient mongo, String dbname) {
+        db = mongo.getDatabase(dbname);
+        if (db == null) {
+            throw new IllegalArgumentException("获取数据库实例失败:" + dbname);
+        }
+
+    }
+
+    public MongoDatabase getDB() {
+        return db;
+    }
+
+    public List<String> getCollectionNames() {
+        List<String> list = new ArrayList<String>();
+        MongoCursor<String> cursor = db.listCollectionNames().iterator();
+        while (cursor.hasNext()) {
+            list.add(cursor.next());
+        }
+        cursor.close();
+        return list;
+    }
+
+    /**
+     * 获取collection对象 - 指定Collection
+     *
+     * @param collectionName
+     * @return
+     */
+    public MongoCollection<Document> getCollection(String collectionName) {
+        MongoCollection<Document> coll = collections.get(collectionName);
+        if (coll == null) {
+            coll = db.getCollection(collectionName);
+            collections.put(collectionName, coll);
+        }
+        return coll;
+    }
+
+    protected String getCollectionName(Class<?> clazz) {
+        String collectionName = clazz.getSimpleName();
+        Collection coll = clazz.getAnnotation(Collection.class);
+        if (coll != null && coll.value() != null && !coll.value().isEmpty()) {
+            collectionName = coll.value();
+        }
+        return collectionName;
+    }
+
+    public List<Document> findAll(String collectionName) {
+        return find(collectionName, null, null, 0, 0);
+    }
+
+    public List<Document> findAll(String collectionName, Bson orderBy) {
+        return find(collectionName, null, orderBy, 0, 0);
+    }
+
+    public List<Document> findAll(String collectionName, Bson orderBy, int limit) {
+        return find(collectionName, null, orderBy, limit, 0);
+    }
+
+    public List<Document> findAll(String collectionName, int limit) {
+        return find(collectionName, null, null, limit, 0);
+    }
+
+    public List<Document> findAll(String collectionName, int limit, int skip) {
+        return find(collectionName, null, null, limit, skip);
+    }
+
+    /**
+     * 查询所有数据列表, 返回Map结果集
+     *
+     * @param collectionName 集合名称
+     * @param orderBy        排序, 如: new BasicDBObject("key", OrderBy.ASC)
+     * @param limit          查询记录数(0-所有)
+     * @param skip           跳过记录数
+     * @return List<Map<String, Object>>
+     */
+    public List<Document> findAll(String collectionName, Bson orderBy, int limit, int skip) {
+        return find(collectionName, null, orderBy, limit, skip);
+    }
+
+    public <T> List<T> findAll(Class<T> clazz) {
+        String collectionName = getCollectionName(clazz);
+        return find(clazz, collectionName, null, null, 0, 0);
+    }
+
+    public <T> List<T> findAll(Class<T> clazz, String collectionName) {
+        return find(clazz, collectionName, null, null, 0, 0);
+    }
+
+    public <T> List<T> findAll(Class<T> clazz, String collectionName, Bson orderBy) {
+        return find(clazz, collectionName, null, orderBy, 0, 0);
+    }
+
+    public <T> List<T> findAll(Class<T> clazz, String collectionName, Bson orderBy, int limit) {
+        return find(clazz, collectionName, null, orderBy, limit, 0);
+    }
+
+    public <T> List<T> findAll(Class<T> clazz, String collectionName, int limit) {
+        return find(clazz, collectionName, null, null, limit, 0);
+    }
+
+    public <T> List<T> findAll(Class<T> clazz, String collectionName, int limit, int skip) {
+        return find(clazz, collectionName, null, null, limit, skip);
+    }
+
+    /**
+     * 查询所有数据列表, 返回Java实体对象结果集
+     *
+     * @param clazz          Java实体对象, 如: User.class
+     * @param collectionName 集合名称
+     * @param orderBy        排序, 如: new BasicDBObject("key", OrderBy.ASC)
+     * @param limit          查询记录数(0-所有)
+     * @param skip           跳过记录数
+     * @return List<T>
+     */
+    public <T> List<T> findAll(Class<T> clazz, String collectionName, Bson orderBy, int limit, int skip) {
+        return find(clazz, collectionName, null, orderBy, limit, skip);
+    }
+
+    public List<Document> find(String collectionName, Bson filter) {
+        return find(collectionName, filter, null, 0, 0);
+    }
+
+    public List<Document> find(String collectionName, Bson filter, Bson orderBy) {
+        return find(collectionName, filter, orderBy, 0, 0);
+    }
+
+    public List<Document> find(String collectionName, Bson filter, Bson orderBy, int limit) {
+        return find(collectionName, filter, orderBy, 0, 0);
+    }
+
+    public List<Document> find(String collectionName, Bson filter, int limit) {
+        return find(collectionName, filter, null, 0, 0);
+    }
+
+    public List<Document> find(String collectionName, Bson filter, int limit, int skip) {
+        return find(collectionName, filter, null, 0, 0);
+    }
+
+    /**
+     * 查询数据列表, 返回Map结果集
+     *
+     * @param collectionName 集合名称
+     * @param filter         查询条件, 如: Filters.eq("key", "value")
+     * @param orderBy        排序, 如: new BasicDBObject("key", OrderBy.ASC)
+     * @param limit          查询记录数(0-所有)
+     * @param skip           跳过记录数
+     * @return List<Map<String, Object>>
+     */
+    public List<Document> find(String collectionName, Bson filter, Bson orderBy, int limit, int skip) {
+        FindIterable<Document> find = null;
+        if (filter == null) {
+            find = getCollection(collectionName).find();
+        } else {
+            find = getCollection(collectionName).find(filter);
+        }
+        if (orderBy != null) {
+            find.sort(orderBy);
+        }
+        if (skip > 0) {
+            find.skip(skip);
+        }
+        if (limit > 0) {
+            find.limit(limit);
+        }
+        MongoCursor<Document> cursor = find.iterator();
+        List<Document> list = new ArrayList<>();
+        try {
+            while (cursor.hasNext()) {
+                list.add(cursor.next());
+            }
+        } finally {
+            cursor.close();
+        }
+        return list;
+    }
+
+    public <T> List<T> find(Class<T> clazz, Bson filter) {
+        String collectionName = getCollectionName(clazz);
+        return find(clazz, collectionName, filter, null, 0, 0);
+    }
+
+    public <T> List<T> find(Class<T> clazz, Bson filter, Bson orderBy) {
+        String collectionName = getCollectionName(clazz);
+        return find(clazz, collectionName, filter, orderBy, 0, 0);
+    }
+
+    public <T> List<T> find(Class<T> clazz, Bson filter, Bson orderBy, int limit) {
+        String collectionName = getCollectionName(clazz);
+        return find(clazz, collectionName, filter, orderBy, limit, 0);
+    }
+
+    public <T> List<T> find(Class<T> clazz, Bson filter, int limit) {
+        String collectionName = getCollectionName(clazz);
+        return find(clazz, collectionName, filter, null, limit, 0);
+    }
+
+    public <T> List<T> find(Class<T> clazz, Bson filter, int limit, int skip) {
+        String collectionName = getCollectionName(clazz);
+        return find(clazz, collectionName, filter, null, limit, skip);
+    }
+
+    public <T> List<T> find(Class<T> clazz, String collectionName, Bson filter) {
+        return find(clazz, collectionName, filter, null, 0, 0);
+    }
+
+    public <T> List<T> find(Class<T> clazz, String collectionName, Bson filter, Bson orderBy) {
+        return find(clazz, collectionName, filter, orderBy, 0, 0);
+    }
+
+    public <T> List<T> find(Class<T> clazz, String collectionName, Bson filter, Bson orderBy, int limit) {
+        return find(clazz, collectionName, filter, orderBy, limit, 0);
+    }
+
+    public <T> List<T> find(Class<T> clazz, String collectionName, Bson filter, int limit) {
+        return find(clazz, collectionName, filter, null, limit, 0);
+    }
+
+    public <T> List<T> find(Class<T> clazz, String collectionName, Bson filter, int limit, int skip) {
+        return find(clazz, collectionName, filter, null, limit, skip);
+    }
+
+    /**
+     * 查询数据列表, 返回Java实体对象结果集
+     *
+     * @param clazz          Java实体对象, 如: User.class
+     * @param collectionName 集合名称
+     * @param filter         查询条件, 如: Filters.eq("key", "value")
+     * @param orderBy        排序, 如: new BasicDBObject("key", OrderBy.ASC)
+     * @param limit          查询记录数(0-所有)
+     * @param skip           跳过记录数
+     * @return List<T>
+     */
+    public <T> List<T> find(Class<T> clazz, String collectionName, Bson filter, Bson orderBy, int limit, int skip) {
+        FindIterable<Document> find = null;
+        if (filter == null) {
+            find = getCollection(collectionName).find();
+        } else {
+            find = getCollection(collectionName).find(filter);
+        }
+        if (orderBy != null) {
+            find.sort(orderBy);
+        }
+        if (skip > 0) {
+            find.skip(skip);
+        }
+        if (limit > 0) {
+            find.limit(limit);
+        }
+        MongoCursor<Document> cursor = find.iterator();
+        List<T> list = new ArrayList<T>();
+        try {
+            while (cursor.hasNext()) {
+                Document doc = cursor.next();
+                ObjectId id = doc.getObjectId(ID);
+                if (id != null) {// 将id的值转成hexString
+                    doc.put(ID, id.toHexString());
+                }
+                String json = doc.toJson();
+                list.add(JSONObject.parseObject(json, clazz));
+            }
+        } finally {
+            cursor.close();
+        }
+        return list;
+    }
+
+    /**
+     * 查询数据列表(去重), 返回Map结果集
+     *
+     * @param collectionName 集合名称
+     * @param fieldName      要去重的字段
+     * @param filter         查询条件, 如: Filters.eq("key", "value")
+     * @return List<Map<String, Object>>
+     */
+    public List<Document> distinct(String collectionName, String fieldName, Bson filter) {
+        DistinctIterable<Document> find = null;
+        if (filter == null) {
+            find = getCollection(collectionName).distinct(fieldName, Document.class);
+        } else {
+            find = getCollection(collectionName).distinct(fieldName, filter, Document.class);
+        }
+        MongoCursor<Document> cursor = find.iterator();
+        List<Document> list = new ArrayList<>();
+        try {
+            while (cursor.hasNext()) {
+                list.add(cursor.next());
+            }
+        } finally {
+            cursor.close();
+        }
+        return list;
+    }
+
+    public Document findOne(String collectionName, Bson filter) {
+        MongoCursor<Document> cursor = getCollection(collectionName).find(filter).iterator();
+        try {
+            if (cursor.hasNext()) {
+                return cursor.next();
+            }
+        } finally {
+            cursor.close();
+        }
+        return null;
+    }
+
+    public <T> T findOne(Class<T> clazz, Bson filter) {
+        String collectionName = getCollectionName(clazz);
+        return findOne(clazz, collectionName, filter);
+    }
+
+    public <T> T findOne(Class<T> clazz, String collectionName, Bson filter) {
+        MongoCursor<Document> cursor = getCollection(collectionName).find(filter).iterator();
+        try {
+            if (cursor.hasNext()) {
+                Document doc = cursor.next();
+                ObjectId id = doc.getObjectId(ID);
+                if (id != null) {// 将id的值转成hexString
+                    doc.put(ID, id.toHexString());
+                }
+                String json = doc.toJson();
+                return JSONObject.parseObject(json, clazz);
+            }
+        } finally {
+            cursor.close();
+        }
+        return null;
+    }
+
+    public Page findPage(Page page) {
+        String collectionName = getCollectionName(page.getClazz());
+        return findPage(page, collectionName, null, null);
+    }
+
+    public Page findPage(Page page, String collectionName) {
+        return findPage(page, collectionName, null, null);
+    }
+
+    public Page findPage(Page page, Bson filter) {
+        String collectionName = getCollectionName(page.getClazz());
+        return findPage(page, collectionName, filter, null);
+    }
+
+    public Page findPage(Page page, String collectionName, Bson filter) {
+        return findPage(page, collectionName, filter, null);
+    }
+
+    public Page findPage(Page page, Bson filter, Bson orderBy) {
+        String collectionName = getCollectionName(page.getClazz());
+        return findPage(page, collectionName, filter, orderBy);
+    }
+
+    /**
+     * 分页查询
+     *
+     * @param page           分页设置对象
+     * @param collectionName 集合名称
+     * @param filter         查询条件, 如: Filters.eq("key", "value")
+     * @param orderBy        排序, 如: new BasicDBObject("key", OrderBy.ASC)
+     * @return
+     */
+    public Page findPage(Page page, String collectionName, Bson filter, Bson orderBy) {
+        if (page.getPageSize() <= 0) {
+            page.reset();
+            return page;
+        }
+
+        if (page.getTotalCount() == 0) {
+            long total = count(collectionName, filter);
+            if (total == 0) {
+                page.reset();
+                return page;
+            }
+            page.setTotalCount(total);
+        }
+
+        if (page.getPageNo() < 1) {
+            page.setPageNo(1);
+        } else if (page.getPageCount() > 0 && page.getPageNo() > page.getPageCount()) {
+            page.setPageNo(page.getPageCount());
+        }
+        FindIterable<Document> find = null;
+        if (filter == null) {
+            find = getCollection(collectionName).find();
+        } else {
+            find = getCollection(collectionName).find(filter);
+        }
+        if (orderBy != null) {
+            find.sort(orderBy);
+        }
+        find.skip((page.getPageNo() - 1) * page.getPageSize()).limit(page.getPageSize());
+        MongoCursor<Document> cursor = find.iterator();
+        List list = new ArrayList<>();
+        try {
+            boolean isDoc = page.getClazz().isAssignableFrom(Document.class);
+            while (cursor.hasNext()) {
+                Document doc = cursor.next();
+                if (isDoc) {
+                    list.add(doc);
+                } else {
+                    ObjectId id = doc.getObjectId(ID);
+                    if (id != null) {// 将id的值转成hexString
+                        doc.put(ID, id.toHexString());
+                    }
+                    String json = doc.toJson();
+                    list.add(JSONObject.parseObject(json, page.getClazz()));
+                }
+            }
+        } finally {
+            cursor.close();
+        }
+        page.setList(list);
+
+        if (page.getPageNo() == 1) {
+            page.setPrevPageNo(0);
+        } else {
+            page.setPrevPageNo(page.getPageNo() - 1);
+        }
+        if (page.getPageNo() == page.getPageCount()) {
+            page.setNextPageNo(0);
+        } else {
+            page.setNextPageNo(page.getPageNo() + 1);
+        }
+        return page;
+    }
+
+    public <T> T findById(Class<T> clazz, String id) {
+        String collectionName = getCollectionName(clazz);
+        return findById(clazz, collectionName, id);
+    }
+
+    public <T> T findById(Class<T> clazz, String collectionName, String id) {
+        Bson filter = Filters.eq(ID, new ObjectId(id));
+        MongoCursor<Document> cursor = getCollection(collectionName).find(filter).iterator();
+        try {
+            if (cursor.hasNext()) {
+                Document doc = cursor.next();
+                ObjectId _id = doc.getObjectId(ID);
+                if (_id != null) {// 将id的值转成hexString
+                    doc.put(ID, _id.toHexString());
+                }
+                String json = doc.toJson();
+                return JSONObject.parseObject(json, clazz);
+            }
+        } finally {
+            cursor.close();
+        }
+        return null;
+    }
+
+    public Document findById(String collectionName, String id) {
+        Bson filter = Filters.eq(ID, new ObjectId(id));
+        MongoCursor<Document> cursor = getCollection(collectionName).find(filter).iterator();
+        try {
+            if (cursor.hasNext()) {
+                return cursor.next();
+            }
+        } finally {
+            cursor.close();
+        }
+        return null;
+    }
+
+    public <T> long count(Class<T> clazz) {
+        String collectionName = getCollectionName(clazz);
+        return count(collectionName);
+    }
+
+    public long count(String collectionName) {
+        return getCollection(collectionName).countDocuments();
+    }
+
+    public <T> long count(Class<T> clazz, Bson filter) {
+        String collectionName = getCollectionName(clazz);
+        return count(collectionName, filter);
+    }
+
+    public long count(String collectionName, Bson filter) {
+        if (filter == null) {
+            return getCollection(collectionName).countDocuments();
+        }
+        return getCollection(collectionName).countDocuments(filter);
+    }
+
+    public List<Document> count(String collectionName, String[] groupBy) {
+        return count(collectionName, groupBy, null, 0);
+    }
+
+    public List<Document> count(String collectionName, String[] groupBy, Bson filter) {
+        return count(collectionName, groupBy, filter, 0);
+    }
+
+    public List<Document> count(String collectionName, String[] groupBy, Bson filter, int limit) {
+        StringBuilder mapFunction = new StringBuilder("function(){emit(");
+        int len = groupBy.length;
+        if (len == 1) {
+            mapFunction.append("this.").append(groupBy[0]);
+        } else {
+            mapFunction.append("{");
+            for (int i = 0; i < len; i++) {
+                if (i > 0) {
+                    mapFunction.append(",");
+                }
+                mapFunction.append(groupBy[i]).append(":this.").append(groupBy[i]);
+            }
+            mapFunction.append("}");
+        }
+        mapFunction.append(",1");
+        mapFunction.append(");}");
+        StringBuilder reduceFunction = new StringBuilder("function(key, values){");
+        reduceFunction.append("var total = 0;");
+        reduceFunction.append("values.forEach(function(val){total += val;});");
+        reduceFunction.append("return total;");
+        reduceFunction.append("}");
+        MapReduceIterable<Document> find = getCollection(collectionName).mapReduce(mapFunction.toString(), reduceFunction.toString());
+        if (filter != null) {
+            find.filter(filter);
+        }
+        if (limit > 0) {
+            find.limit(limit);
+        }
+        find.jsMode(true);
+        MongoCursor<Document> cursor = find.iterator();
+        List<Document> list = new ArrayList<Document>();
+        try {
+            while (cursor.hasNext()) {
+                Document doc = cursor.next();
+                if (len == 1) {
+                    doc.put(groupBy[0], doc.get("_id"));
+                } else {
+                    doc.putAll((Document) doc.get("_id"));
+                }
+                doc.remove("_id");
+
+                Object val = doc.get("value");
+                if (val instanceof List) {
+                    val = ((List) val).get(0);
+                }
+                long count = 0;
+                if (val instanceof Number) {
+                    count = ((Number)val).longValue();
+                } else {
+                    System.out.println("{"+val+"} is not a number!!! doc={"+doc+"}");
+                }
+                doc.remove("value");
+                doc.put("count", count);
+                list.add(doc);
+            }
+        } finally {
+            cursor.close();
+        }
+        return list;
+    }
+
+    public List<Document> mapReduce(String collectionName, String mapFunction, String reduceFunction) {
+        return mapReduce(collectionName, mapFunction, reduceFunction, null, null, null, 0);
+    }
+
+    public List<Document> mapReduce(String collectionName, String mapFunction, String reduceFunction, Bson filter) {
+        return mapReduce(collectionName, mapFunction, reduceFunction, null, filter, null, 0);
+    }
+
+    public List<Document> mapReduce(String collectionName, String mapFunction, String reduceFunction, Bson filter, Bson orderBy) {
+        return mapReduce(collectionName, mapFunction, reduceFunction, null, filter, orderBy, 0);
+    }
+
+    public List<Document> mapReduce(String collectionName, String mapFunction, String reduceFunction, Bson filter, int limit) {
+        return mapReduce(collectionName, mapFunction, reduceFunction, null, filter, null, limit);
+    }
+
+    public List<Document> mapReduce(String collectionName, String mapFunction, String reduceFunction, String finalizeFunction) {
+        return mapReduce(collectionName, mapFunction, reduceFunction, finalizeFunction, null, null, 0);
+    }
+
+    public List<Document> mapReduce(String collectionName, String mapFunction, String reduceFunction, String finalizeFunction, Bson filter) {
+        return mapReduce(collectionName, mapFunction, reduceFunction, finalizeFunction, filter, null, 0);
+    }
+
+    public List<Document> mapReduce(String collectionName, String mapFunction, String reduceFunction, String finalizeFunction, Bson filter, Bson orderBy) {
+        return mapReduce(collectionName, mapFunction, reduceFunction, finalizeFunction, filter, orderBy, 0);
+    }
+
+    public List<Document> mapReduce(String collectionName, String mapFunction, String reduceFunction, String finalizeFunction, Bson filter, int limit) {
+        return mapReduce(Document.class, collectionName, mapFunction, reduceFunction, finalizeFunction, filter, null, limit);
+    }
+
+    public List<Document> mapReduce(String collectionName, String mapFunction, String reduceFunction, String finalizeFunction, Bson filter, Bson orderBy, int
+            limit) {
+        return mapReduce(Document.class, collectionName, mapFunction, reduceFunction, finalizeFunction, filter, orderBy, limit);
+    }
+
+    public <T> List<T> mapReduce(Class<T> resultClass, String collectionName, String mapFunction, String reduceFunction, String finalizeFunction, Bson
+            filter, int limit) {
+        return mapReduce(resultClass, collectionName, mapFunction, reduceFunction, finalizeFunction, filter, null, limit);
+    }
+
+    public <T> List<T> mapReduce(Class<T> resultClass, String collectionName, String mapFunction, String reduceFunction, String finalizeFunction, Bson
+            filter, Bson orderBy, int limit) {
+        MapReduceIterable<T> find = getCollection(collectionName).mapReduce(mapFunction, reduceFunction, resultClass);
+        if (filter != null) {
+            find.filter(filter);
+        }
+        if (finalizeFunction != null && !finalizeFunction.isEmpty()) {
+            find.finalizeFunction(finalizeFunction);
+        }
+        if (orderBy != null) {
+            find.sort(orderBy);
+        }
+        if (limit > 0) {
+            find.limit(limit);
+        }
+        find.jsMode(true);
+        MongoCursor<T> cursor = find.iterator();
+        List<T> list = new ArrayList<T>();
+        try {
+            while (cursor.hasNext()) {
+                list.add(cursor.next());
+            }
+        } finally {
+            cursor.close();
+        }
+        return list;
+    }
+
+    public void save(Object entity) {
+        Class<?> clazz = entity.getClass();
+        String collectionName = getCollectionName(clazz);
+        save(entity, collectionName);
+    }
+
+    public void save(Object entity, String collectionName) {
+        if (entity instanceof Document) {
+            Document data = (Document) entity;
+            data.remove("pagesize");
+            data.remove("pageno");
+            getCollection(collectionName).insertOne(data);
+        } else if (entity instanceof Map) {
+            Map data = (Map<String, Object>) entity;
+            data.remove("pagesize");
+            data.remove("pageno");
+            getCollection(collectionName).insertOne(new Document(data));
+        } else {
+            JSONObject json = JSONObject.parseObject(toJson(entity));
+            json.remove("pagesize");
+            json.remove("pageno");
+            getCollection(collectionName).insertOne(Document.parse(json.toJSONString()));
+        }
+    }
+
+    public void saveAll(List<?> list) {
+        Class<?> clazz = list.get(0).getClass();
+        String collectionName = getCollectionName(clazz);
+        saveAll(list, collectionName);
+    }
+
+    public void saveAll(List<?> list, String collectionName) {
+        List<Document> docList = new ArrayList<Document>();
+        for (Object obj : list) {
+            if (obj instanceof Document) {
+                docList.add((Document) obj);
+            } else if (obj instanceof Map) {
+                docList.add(new Document((Map<String, Object>) obj));
+            } else {
+                String json = toJson(obj);
+                docList.add(Document.parse(json));
+            }
+        }
+        getCollection(collectionName).insertMany(docList);
+    }
+
+    public <T> boolean update(T entity) {
+        String collectionName = getCollectionName(entity.getClass());
+        return update(entity, collectionName);
+    }
+
+    public <T> boolean update(T entity, String collectionName) {
+        String id = null;
+        String methodName = "getId";//默认id字估getter方法名
+        try {
+            id = (String) (entity.getClass().getMethod(methodName).invoke(entity));
+        } catch (NoSuchMethodException ex) {
+            List<Field> fields = ReflectUtils.getFields(entity.getClass());
+            for (Field field : fields) {
+                JSONField jf = field.getAnnotation(JSONField.class);
+                if (jf != null && ID.equals(jf.name())) {
+                    methodName = "get" + field.getName().substring(0, 1).toUpperCase() + field.getName().substring(1);
+                    try {
+                        id = (String) (entity.getClass().getMethod(methodName).invoke(entity));
+                    } catch (IllegalAccessException| InvocationTargetException | NoSuchMethodException e) {
+                        e.printStackTrace();
+                        return false;
+                    }
+                    break;
+                }
+            }
+        } catch (IllegalAccessException | InvocationTargetException e) {
+            e.printStackTrace();
+            return false;
+        }
+        return updateById(entity, collectionName, id);
+    }
+
+    public <T> long update(T entity, Bson filter) {
+        String collectionName = getCollectionName(entity.getClass());
+        return update(entity, collectionName, filter, false);
+    }
+
+    public <T> long update(T entity, Bson filter, boolean mutil) {
+        String collectionName = getCollectionName(entity.getClass());
+        return update(entity, collectionName, filter, mutil);
+    }
+
+    public <T> long update(T entity, String collectionName, Bson filter, boolean mutil) {
+        String json = toJson(entity);
+        BasicDBObject update = BasicDBObject.parse(json);
+        return update(collectionName, filter, new BasicDBObject("$set", update), mutil);
+    }
+
+    /**
+     * 更新数据(updateOne)
+     *
+     * @param collectionName
+     * @param filter
+     * @param update
+     * @return
+     */
+    public long update(String collectionName, Bson filter, Update update) {
+        return update(collectionName, filter, update, false);
+    }
+
+    /**
+     * 更新集合
+     *
+     * @param collectionName 集合名称
+     * @param filter         更新条件,如:Filters.eq("key", "value")
+     * @param update         更新字段
+     *                       ,如:new Update().set("key", "value")
+     * @param mutil          true-更新所有
+     * @return
+     */
+    public long update(String collectionName, Bson filter, Update update, boolean mutil) {
+        BasicDBObject _update = new BasicDBObject();
+        if (!update.inc().isEmpty()) {
+            _update.append("$inc", update.inc());
+        }
+        if (!update.set().isEmpty()) {
+            _update.append("$set", update.set());
+        }
+        if (!update.unset().isEmpty()) {
+            _update.append("$unset", update.unset());
+        }
+        return update(collectionName, filter, _update, mutil);
+    }
+
+    public long update(String collectionName, Bson filter, Bson update, boolean mutil) {
+        UpdateResult result = null;
+        if (mutil) {
+            result = getCollection(collectionName).updateMany(filter, update);
+        } else {
+            result = getCollection(collectionName).updateOne(filter, update);
+        }
+        if (result.wasAcknowledged()) {
+            return result.getModifiedCount();
+        }
+        return -1;
+    }
+
+    public <T> boolean updateById(T entity, String id) {
+        String collectionName = getCollectionName(entity.getClass());
+        return updateById(entity, collectionName, id);
+    }
+
+    public <T> boolean updateById(T entity, String collectionName, String id) {
+        if (id == null || id.isEmpty()) {
+            throw new IllegalArgumentException("id不能为空");
+        }
+        JSONObject json = JSONObject.parseObject(toJson(entity));
+        json.put("pagesize",null);
+        json.put("pageno",null);
+        json.put("_id",null);
+        Bson filter = Filters.eq(ID, new ObjectId(id));
+        BasicDBObject update = BasicDBObject.parse(json.toJSONString());
+        UpdateResult result = getCollection(collectionName).updateOne(filter, new BasicDBObject("$set", update));
+        if (result.wasAcknowledged()) {
+            if (result.getModifiedCount() > 0) {
+                return true;
+            }
+        } else {
+            return true;
+        }
+        return false;
+    }
+
+    public boolean updateById(String collectionName, Update update, String id) {
+        BasicDBObject _update = new BasicDBObject();
+        if (!update.inc().isEmpty()) {
+            _update.append("$inc", update.inc());
+        }
+        if (!update.set().isEmpty()) {
+            _update.append("$set", update.set());
+        }
+        if (!update.unset().isEmpty()) {
+            _update.append("$unset", update.unset());
+        }
+        return updateById(collectionName, _update, id);
+    }
+
+    public boolean updateById(String collectionName, Bson update, String id) {
+        if (id == null || id.isEmpty()) {
+            throw new IllegalArgumentException("id不能为空");
+        }
+        Bson filter = Filters.eq(ID, new ObjectId(id));
+        UpdateResult result = getCollection(collectionName).updateOne(filter, update);
+        if (result.wasAcknowledged()) {
+            return (result.getModifiedCount() > 0);
+        }
+        return true;
+    }
+
+    public boolean deleteOne(String collectionName, Bson filter) {
+        DeleteResult result = getCollection(collectionName).deleteOne(filter);
+        if (result.wasAcknowledged()) {
+            return (result.getDeletedCount() > 0);
+        }
+        return true;
+    }
+
+    public long deleteAll(String collectionName, Bson filter) {
+        DeleteResult result = getCollection(collectionName).deleteMany(filter);
+        if (result.wasAcknowledged()) {
+            return result.getDeletedCount();
+        }
+        return 0;
+    }
+
+    public boolean deleteById(String collectionName, String id) {
+        return deleteOne(collectionName, Filters.eq(ID, new ObjectId(id)));
+    }
+
+    public void drop(String collectionName) {
+        getCollection(collectionName).drop();
+    }
+
+    public void createIndex(String collectionName, Bson keys) {
+        getCollection(collectionName).createIndex(keys);
+    }
+
+    public void dropIndexes(String collectionName) {
+        getCollection(collectionName).dropIndexes();
+    }
+
+    public void dropIndex(String collectionName, String indexName) {
+        getCollection(collectionName).dropIndex(indexName);
+    }
+
+    protected String toJson(Object obj) {
+        int features = 0;
+        features |= SerializerFeature.QuoteFieldNames.getMask();
+        features |= SerializerFeature.SkipTransientField.getMask();
+        features |= SerializerFeature.WriteEnumUsingName.getMask();
+        features |= SerializerFeature.SortField.getMask();
+        features |= SerializerFeature.IgnoreNonFieldGetter.getMask();
+        // features |= SerializerFeature.WriteMapNullValue.getMask();
+        // features |= SerializerFeature.WriteNullBooleanAsFalse.getMask();
+        // features |= SerializerFeature.WriteNullListAsEmpty.getMask();
+        // features |= SerializerFeature.WriteNullNumberAsZero.getMask();
+        // features |= SerializerFeature.WriteNullStringAsEmpty.getMask();
+        return JSONObject.toJSONString(obj, features);
+    }
+}

+ 948 - 0
src/main/java/io/emqx/exhook/MongoDB/MongoUtils.java

@@ -0,0 +1,948 @@
+package io.emqx.exhook.MongoDB;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.util.*;
+
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.annotation.JSONField;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import io.emqx.exhook.MongoDB.paging.Page;
+import io.emqx.exhook.MongoDB.reflect.ReflectUtils;
+import com.mongodb.*;
+import com.mongodb.client.model.Filters;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import com.mongodb.client.MongoCollection;
+import org.bson.types.ObjectId;
+
+/**
+ * @Description mongodb工具类
+ */
+public class MongoUtils {
+    private static MongoSupport mongoSupport;
+
+    private static MongoClient mongoClient;
+    private static String DBName = "test";
+
+    static {
+        //String mongoClientURI = "mongodb://ziyang:ziyang12345678@" + "124.221.81.119" + ":" + 27017 + "/" + DBName;
+        String mongoClientURI = "mongodb://" + "192.168.31.136" + ":" + 27017 + "/" + DBName;
+        ServerApi serverApi = ServerApi.builder()
+                .version(ServerApiVersion.V1)
+                .build();
+        MongoClientSettings settings = MongoClientSettings.builder()
+                .applyConnectionString(new ConnectionString(mongoClientURI))
+                .serverApi(serverApi)
+                .build();
+        mongoClient =  MongoClients.create(settings);
+    }
+
+
+
+    public final static MongoSupport getMongoSupport() {
+        if (mongoSupport == null) {
+            mongoSupport = new MongoSupport(mongoClient,DBName);
+        }
+        return mongoSupport;
+    }
+
+    /**
+     * 获取所有集合名称
+     *
+     * @return List<String>
+     */
+    public final static List<String> getCollectionNames() {
+        return getMongoSupport().getCollectionNames();
+    }
+
+    /**
+     * 获取指定集合
+     *
+     * @param collectionName
+     * @return
+     */
+    public final static MongoCollection<Document> getCollection(String collectionName) {
+        return getMongoSupport().getCollection(collectionName);
+    }
+
+    /**
+     * 查询所有数据, 返回Map结果集
+     *
+     * @param collectionName 集合名称
+     * @return List<Document>
+     */
+    public final static List<Document> findAll(String collectionName) {
+        return getMongoSupport().find(collectionName, null, null, 0, 0);
+    }
+
+    /**
+     * 查询所有数据, 返回Map结果集
+     *
+     * @param collectionName 集合名称
+     * @param orderBy        排序条件,如:new BasicDBObject("key", OrderBy.ASC)
+     * @return List<Document>
+     */
+    public final static List<Document> findAll(String collectionName, Bson orderBy) {
+        return getMongoSupport().find(collectionName, null, orderBy, 0, 0);
+    }
+
+    /**
+     * 查询指定数量数据, 返回Map结果集
+     *
+     * @param collectionName 集合名称
+     * @param orderBy        排序条件,如:new BasicDBObject("key", OrderBy.ASC)
+     * @param limit          查询记录数(0-所有)
+     * @return List<Document>
+     */
+    public final static List<Document> findAll(String collectionName, Bson orderBy, int limit) {
+        return getMongoSupport().find(collectionName, null, orderBy, limit, 0);
+    }
+
+    /**
+     * 查询指定数量数据, 返回Map结果集
+     *
+     * @param collectionName 集合名称
+     * @param limit          查询记录数(0-所有)
+     * @return List<Document>
+     */
+    public final static List<Document> findAll(String collectionName, int limit) {
+        return getMongoSupport().find(collectionName, null, null, limit, 0);
+    }
+
+    /**
+     * 查询指定数量数据, 返回Map结果集
+     *
+     * @param collectionName 集合名称
+     * @param limit          查询记录数(0-所有)
+     * @param skip           跳过记录数
+     * @return List<Document>
+     */
+    public final static List<Document> findAll(String collectionName, int limit, int skip) {
+        return getMongoSupport().find(collectionName, null, null, limit, skip);
+    }
+
+    /**
+     * 查询所有数据列表, 返回Map结果集
+     *
+     * @param collectionName 集合名称
+     * @param orderBy        排序, 如: new BasicDBObject("key", OrderBy.ASC)
+     * @param limit          查询记录数(0-所有)
+     * @param skip           跳过记录数
+     * @return List<Document>
+     */
+    public final static List<Document> findAll(String collectionName, Bson orderBy, int limit, int skip) {
+        return getMongoSupport().find(collectionName, null, orderBy, limit, skip);
+    }
+
+    /**
+     * 查询所有数据列表,返回指定实体类列表
+     *
+     * @param clazz 实体类,集合名与类名不相同时需要配置注解@Collection("collectionName")
+     * @return
+     */
+    public final static <T> List<T> findAll(Class<T> clazz) {
+        return getMongoSupport().findAll(clazz);
+    }
+
+    /**
+     * 查询所有数据列表,返回指定实体类列表
+     *
+     * @param clazz          实体类
+     * @param collectionName 集合名
+     * @return
+     */
+    public final static <T> List<T> findAll(Class<T> clazz, String collectionName) {
+        return getMongoSupport().find(clazz, collectionName, null, null, 0, 0);
+    }
+
+    /**
+     * 查询所有数据列表,返回指定实体类列表
+     *
+     * @param clazz          实体类
+     * @param collectionName 集合名
+     * @param orderBy        排序, 如: new BasicDBObject("key", OrderBy.ASC)
+     * @return
+     */
+    public final static <T> List<T> findAll(Class<T> clazz, String collectionName, Bson orderBy) {
+        return getMongoSupport().find(clazz, collectionName, null, orderBy, 0, 0);
+    }
+
+    /**
+     * 查询所有数据列表,返回指定实体类列表
+     *
+     * @param clazz          实体类
+     * @param collectionName 集合名
+     * @param orderBy        排序, 如: new BasicDBObject("key", OrderBy.ASC)
+     * @param limit          查询记录数(0-所有)
+     * @return
+     */
+    public final static <T> List<T> findAll(Class<T> clazz, String collectionName, Bson orderBy, int limit) {
+        return getMongoSupport().find(clazz, collectionName, null, orderBy, limit, 0);
+    }
+
+    /**
+     * 查询所有数据列表,返回指定实体类列表
+     *
+     * @param clazz          实体类
+     * @param collectionName 集合名
+     * @param limit          查询记录数(0-所有)
+     * @return
+     */
+    public final static <T> List<T> findAll(Class<T> clazz, String collectionName, int limit) {
+        return getMongoSupport().find(clazz, collectionName, null, null, limit, 0);
+    }
+
+    /**
+     * 查询所有数据列表,返回指定实体类列表
+     *
+     * @param clazz          实体类
+     * @param collectionName 集合名
+     * @param limit          查询记录数(0-所有)
+     * @param skip           跳过记录数
+     * @return
+     */
+    public final static <T> List<T> findAll(Class<T> clazz, String collectionName, int limit, int skip) {
+        return getMongoSupport().find(clazz, collectionName, null, null, limit, skip);
+    }
+
+    /**
+     * 查询所有数据列表, 返回Java实体对象结果集
+     *
+     * @param clazz          Java实体对象, 如: User.class
+     * @param collectionName 集合名称
+     * @param orderBy        排序, 如: new BasicDBObject("key", OrderBy.ASC)
+     * @param limit          查询记录数(0-所有)
+     * @param skip           跳过记录数
+     * @return List<T>
+     */
+    public final static <T> List<T> findAll(Class<T> clazz, String collectionName, Bson orderBy, int limit, int skip) {
+        return getMongoSupport().find(clazz, collectionName, null, orderBy, limit, skip);
+    }
+
+    /**
+     * 根据条件查询数据列表
+     *
+     * @param collectionName 集合名称
+     * @param filter         查询条件,如:Filters.eq("key","value")
+     * @return List<Document>
+     */
+    public final static List<Document> find(String collectionName, Bson filter) {
+        return getMongoSupport().find(collectionName, filter, null, 0, 0);
+    }
+
+    /**
+     * 根据条件查询数据列表
+     *
+     * @param collectionName 集合名称
+     * @param filter         查询条件,如:Filters.eq("key","value")
+     * @param orderBy        排序, 如: new BasicDBObject("key", OrderBy.ASC)
+     * @return List<Document>
+     */
+    public final static List<Document> find(String collectionName, Bson filter, Bson orderBy) {
+        return getMongoSupport().find(collectionName, filter, orderBy, 0, 0);
+    }
+
+    /**
+     * 根据条件查询数据列表
+     *
+     * @param collectionName 集合名称
+     * @param filter         查询条件,如:Filters.eq("key","value")
+     * @param orderBy        排序, 如: new BasicDBObject("key", OrderBy.ASC)
+     * @param limit          查询记录数(0-所有)
+     * @return List<Document>
+     */
+    public final static List<Document> find(String collectionName, Bson filter, Bson orderBy, int limit) {
+        return getMongoSupport().find(collectionName, filter, orderBy, 0, 0);
+    }
+
+    /**
+     * 根据条件查询数据列表
+     *
+     * @param collectionName 集合名称
+     * @param filter         查询条件,如:Filters.eq("key","value")
+     * @param limit          查询记录数(0-所有)
+     * @return List<Document>
+     */
+    public final static List<Document> find(String collectionName, Bson filter, int limit) {
+        return getMongoSupport().find(collectionName, filter, null, 0, 0);
+    }
+
+    /**
+     * 根据条件查询数据列表
+     *
+     * @param collectionName 集合名称
+     * @param filter         查询条件,如:Filters.eq("key","value")
+     * @param limit          查询记录数(0-所有)
+     * @param skip           跳过记录数
+     * @return List<Document>
+     */
+    public final static List<Document> find(String collectionName, Bson filter, int limit, int skip) {
+        return getMongoSupport().find(collectionName, filter, null, 0, 0);
+    }
+
+    /**
+     * 查询数据列表, 返回Map结果集
+     *
+     * @param collectionName 集合名称
+     * @param filter         查询条件, 如: Filters.eq("key", "value")
+     * @param orderBy        排序, 如: new BasicDBObject("key", OrderBy.ASC)
+     * @param limit          查询记录数(0-所有)
+     * @param skip           跳过记录数
+     * @return List<Document>
+     */
+    public final static List<Document> find(String collectionName, Bson filter, Bson orderBy, int limit, int skip) {
+        return getMongoSupport().find(collectionName, filter, orderBy, limit, skip);
+    }
+
+    /**
+     * 查询数据列表(去重), 返回Map结果集
+     *
+     * @param collectionName 集合名称
+     * @param fieldName      要去重的字段
+     * @param filter         查询条件, 如: Filters.eq("key", "value")
+     * @return List<Map<String, Object>>
+     */
+    public List<Document> distinct(String collectionName, String fieldName, Bson filter) {
+        return getMongoSupport().distinct(collectionName, fieldName, filter);
+    }
+
+    /**
+     * 查询数据列表, 返回Java实体对象结果集
+     *
+     * @param clazz  Java实体对象, 集合名与类名不相同时需要配置注解@Collection("collectionName")
+     * @param filter 查询条件, 如: Filters.eq("key", "value")
+     * @return List<T>
+     */
+    public final static <T> List<T> find(Class<T> clazz, Bson filter) {
+        return getMongoSupport().find(clazz, filter);
+    }
+
+    /**
+     * 查询数据列表, 返回Java实体对象结果集
+     *
+     * @param clazz   Java实体对象, 集合名与类名不相同时需要配置注解@Collection("collectionName")
+     * @param filter  查询条件, 如: Filters.eq("key", "value")
+     * @param orderBy 排序, 如: new BasicDBObject("key", OrderBy.ASC)
+     * @return List<T>
+     */
+    public final static <T> List<T> find(Class<T> clazz, Bson filter, Bson orderBy) {
+        return getMongoSupport().find(clazz, filter, orderBy);
+    }
+
+    /**
+     * 查询数据列表, 返回Java实体对象结果集
+     *
+     * @param clazz   Java实体对象, 集合名与类名不相同时需要配置注解@Collection("collectionName")
+     * @param filter  查询条件, 如: Filters.eq("key", "value")
+     * @param orderBy 排序, 如: new BasicDBObject("key", OrderBy.ASC)
+     * @param limit   查询记录数(0-所有)
+     * @return List<T>
+     */
+    public final static <T> List<T> find(Class<T> clazz, Bson filter, Bson orderBy, int limit) {
+        return getMongoSupport().find(clazz, filter, orderBy, limit);
+    }
+
+    /**
+     * 查询数据列表, 返回Java实体对象结果集
+     *
+     * @param clazz  Java实体对象, 集合名与类名不相同时需要配置注解@Collection("collectionName")
+     * @param filter 查询条件, 如: Filters.eq("key", "value")
+     * @param limit  查询记录数(0-所有)
+     * @return List<T>
+     */
+    public final static <T> List<T> find(Class<T> clazz, Bson filter, int limit) {
+        return getMongoSupport().find(clazz, filter, limit);
+    }
+
+    /**
+     * 查询数据列表, 返回Java实体对象结果集
+     *
+     * @param clazz  Java实体对象, 集合名与类名不相同时需要配置注解@Collection("collectionName")
+     * @param filter 查询条件, 如: Filters.eq("key", "value")
+     * @param limit  查询记录数(0-所有)
+     * @param skip   跳过记录数
+     * @return List<T>
+     */
+    public final static <T> List<T> find(Class<T> clazz, Bson filter, int limit, int skip) {
+        return getMongoSupport().find(clazz, filter, limit, skip);
+    }
+
+    /**
+     * 查询数据列表, 返回Java实体对象结果集
+     *
+     * @param clazz          Java实体对象, 如: User.class
+     * @param collectionName 集合名称
+     * @param filter         查询条件, 如: Filters.eq("key", "value")
+     * @return List<T>
+     */
+    public final static <T> List<T> find(Class<T> clazz, String collectionName, Bson filter) {
+        return getMongoSupport().find(clazz, collectionName, filter, null, 0, 0);
+    }
+
+    /**
+     * 查询数据列表, 返回Java实体对象结果集
+     *
+     * @param clazz          Java实体对象, 如: User.class
+     * @param collectionName 集合名称
+     * @param filter         查询条件, 如: Filters.eq("key", "value")
+     * @param orderBy        排序, 如: new BasicDBObject("key", OrderBy.ASC)
+     * @return List<T>
+     */
+    public final static <T> List<T> find(Class<T> clazz, String collectionName, Bson filter, Bson orderBy) {
+        return getMongoSupport().find(clazz, collectionName, filter, orderBy, 0, 0);
+    }
+
+    /**
+     * 查询数据列表, 返回Java实体对象结果集
+     *
+     * @param clazz          Java实体对象, 如: User.class
+     * @param collectionName 集合名称
+     * @param filter         查询条件, 如: Filters.eq("key", "value")
+     * @param orderBy        排序, 如: new BasicDBObject("key", OrderBy.ASC)
+     * @param limit          查询记录数(0-所有)
+     * @return List<T>
+     */
+    public final static <T> List<T> find(Class<T> clazz, String collectionName, Bson filter, Bson orderBy, int limit) {
+        return getMongoSupport().find(clazz, collectionName, filter, orderBy, limit, 0);
+    }
+
+    /**
+     * 查询数据列表, 返回Java实体对象结果集
+     *
+     * @param clazz          Java实体对象, 如: User.class
+     * @param collectionName 集合名称
+     * @param filter         查询条件, 如: Filters.eq("key", "value")
+     * @param limit          查询记录数(0-所有)
+     * @return List<T>
+     */
+    public final static <T> List<T> find(Class<T> clazz, String collectionName, Bson filter, int limit) {
+        return getMongoSupport().find(clazz, collectionName, filter, null, limit, 0);
+    }
+
+    /**
+     * 查询数据列表, 返回Java实体对象结果集
+     *
+     * @param clazz          Java实体对象, 如: User.class
+     * @param collectionName 集合名称
+     * @param filter         查询条件, 如: Filters.eq("key", "value")
+     * @param limit          查询记录数(0-所有)
+     * @param skip           跳过记录数
+     * @return List<T>
+     */
+    public final static <T> List<T> find(Class<T> clazz, String collectionName, Bson filter, int limit, int skip) {
+        return getMongoSupport().find(clazz, collectionName, filter, null, limit, skip);
+    }
+
+    /**
+     * 查询数据列表, 返回Java实体对象结果集
+     *
+     * @param clazz          Java实体对象, 如: User.class
+     * @param collectionName 集合名称
+     * @param filter         查询条件, 如: Filters.eq("key", "value")
+     * @param orderBy        排序, 如: new BasicDBObject("key", OrderBy.ASC)
+     * @param limit          查询记录数(0-所有)
+     * @param skip           跳过记录数
+     * @return List<T>
+     */
+    public final static <T> List<T> find(Class<T> clazz, String collectionName, Bson filter, Bson orderBy, int limit, int skip) {
+        return getMongoSupport().find(clazz, collectionName, filter, orderBy, limit, skip);
+    }
+
+    /**
+     * 查询单条记录, 返回Map对象
+     *
+     * @param collectionName 集合名称
+     * @param filter         查询条件, 如: Filters.eq("key", "value")
+     * @return Document
+     */
+    public final static Document findOne(String collectionName, Bson filter) {
+        return getMongoSupport().findOne(collectionName, filter);
+    }
+
+    /**
+     * 查询单条记录, 返回Java实体对象
+     *
+     * @param clazz  Java实体对象, 集合名与类名不相同时需要配置注解@Collection("collectionName")
+     * @param filter 查询条件, 如: Filters.eq("key", "value")
+     * @return <T>
+     */
+    public final static <T> T findOne(Class<T> clazz, Bson filter) {
+        return getMongoSupport().findOne(clazz, filter);
+    }
+
+    /**
+     * 查询单条记录, 返回Java实体对象
+     *
+     * @param clazz          Java实体对象
+     * @param collectionName 集合名称
+     * @param filter         查询条件, 如: Filters.eq("key", "value")
+     * @return <T>
+     */
+    public final static <T> T findOne(Class<T> clazz, String collectionName, Bson filter) {
+        return getMongoSupport().findOne(clazz, collectionName, filter);
+    }
+
+    /**
+     * 分页查询
+     *
+     * @param page 分页设置对象,必须设置clazz属性,集合名与clazz对象类名不相同时需要配置注解@Collection(
+     *             "collectionName")
+     * @return Page
+     */
+    public final static Page findPage(Page page) {
+        return getMongoSupport().findPage(page);
+    }
+
+    /**
+     * 分页查询
+     *
+     * @param page           分页设置对象
+     * @param collectionName 集合名称
+     * @return Page
+     */
+    public final static Page findPage(Page page, String collectionName) {
+        return getMongoSupport().findPage(page, collectionName, null, null);
+    }
+
+    /**
+     * 分页查询
+     *
+     * @param page   分页设置对象,必须设置clazz属性,集合名与clazz对象类名不相同时需要配置注解@Collection(
+     *               "collectionName")
+     * @param filter 查询条件, 如: Filters.eq("key", "value")
+     * @return Page
+     */
+    public final static Page findPage(Page page, Bson filter) {
+        return getMongoSupport().findPage(page, filter);
+    }
+
+    /**
+     * 分页查询
+     *
+     * @param page           分页设置对象
+     * @param collectionName 集合名称
+     * @param filter         查询条件, 如: Filters.eq("key", "value")
+     * @return Page
+     */
+    public final static Page findPage(Page page, String collectionName, Bson filter) {
+        return getMongoSupport().findPage(page, collectionName, filter, null);
+    }
+
+    /**
+     * 分页查询
+     *
+     * @param page    分页设置对象,必须设置clazz属性,集合名与clazz对象类名不相同时需要配置注解@Collection(
+     *                "collectionName")
+     * @param filter  查询条件, 如: Filters.eq("key", "value")
+     * @param orderBy 排序, 如: new BasicDBObject("key", OrderBy.ASC)
+     * @return Page
+     */
+    public final static Page findPage(Page page, Bson filter, Bson orderBy) {
+        return getMongoSupport().findPage(page, filter, orderBy);
+    }
+
+    /**
+     * 分页查询
+     *
+     * @param page           分页设置对象
+     * @param collectionName 集合名称
+     * @param filter         查询条件, 如: Filters.eq("key", "value")
+     * @param orderBy        排序, 如: new BasicDBObject("key", OrderBy.ASC)
+     * @return Page
+     */
+    public final static Page findPage(Page page, String collectionName, Bson filter, Bson orderBy) {
+        return getMongoSupport().findPage(page, collectionName, filter, orderBy);
+    }
+
+    /**
+     * 通过_id查询记录
+     *
+     * @param clazz 实体类,集合名与类名不相同时需要配置注解@Collection("collectionName")
+     * @param id
+     * @return <T>
+     */
+    public final static <T> T findById(Class<T> clazz, String id) {
+        return getMongoSupport().findById(clazz, id);
+    }
+
+    /**
+     * 通过_id查询记录
+     *
+     * @param clazz          实体类
+     * @param collectionName 集合名称
+     * @param id
+     * @return <T>
+     */
+    public final static <T> T findById(Class<T> clazz, String collectionName, String id) {
+        return getMongoSupport().findById(clazz, collectionName, id);
+    }
+
+    /**
+     * 通过_id查询记录,返回Map
+     *
+     * @param collectionName 集合名称
+     * @param id
+     * @return Document
+     */
+    public final static Document findById(String collectionName, String id) {
+        return getMongoSupport().findById(collectionName, id);
+    }
+
+    /**
+     * 查询集合记录数
+     *
+     * @param clazz 实体类,集合名与类名不相同时需要配置注解@Collection("collectionName")
+     * @return long
+     */
+    public final static <T> long count(Class<T> clazz) {
+        return getMongoSupport().count(clazz);
+    }
+
+    /**
+     * 查询集合记录数
+     *
+     * @param collectionName 集合名称
+     * @return long
+     */
+    public final static long count(String collectionName) {
+        return getMongoSupport().count(collectionName);
+    }
+
+    /**
+     * 查询集合记录数
+     *
+     * @param clazz  实体类,集合名与类名不相同时需要配置注解@Collection("collectionName")
+     * @param filter 查询条件, 如: Filters.eq("key", "value")
+     * @return long
+     */
+    public final static <T> long count(Class<T> clazz, Bson filter) {
+        return getMongoSupport().count(clazz, filter);
+    }
+
+    /**
+     * 查询集合记录数
+     *
+     * @param collectionName 集合名称
+     * @param filter         查询条件, 如: Filters.eq("key", "value")
+     * @return long
+     */
+    public final static long count(String collectionName, Bson filter) {
+        return getMongoSupport().count(collectionName, filter);
+    }
+
+    /**
+     * 分组统计记录数
+     *
+     * @param collectionName 集合名称
+     * @param groupBy        分组字段
+     * @return
+     */
+    public final static List<Document> count(String collectionName, String[] groupBy) {
+        return getMongoSupport().count(collectionName, groupBy, null, 0);
+    }
+
+    /**
+     * 分组统计记录数
+     *
+     * @param collectionName 集合名称
+     * @param groupBy        分组字段
+     * @param filter         查询条件, 如: Filters.eq("key", "value")
+     * @return
+     */
+    public final static List<Document> count(String collectionName, String[] groupBy, Bson filter) {
+        return getMongoSupport().count(collectionName, groupBy, filter, 0);
+    }
+
+    /**
+     * 分组统计记录数
+     *
+     * @param collectionName 集合名称
+     * @param groupBy        分组字段
+     * @param filter         查询条件, 如: Filters.eq("key", "value")
+     * @param limit          数据集合大小限制
+     * @return
+     */
+    public final static List<Document> count(String collectionName, String[] groupBy, Bson filter, int limit) {
+        return getMongoSupport().count(collectionName, groupBy, filter, limit);
+    }
+
+    /**
+     * 使用mapReduce进行统计
+     *
+     * @param collectionName
+     * @param mapFunction
+     * @param reduceFunction
+     * @return
+     */
+    public final static List<Document> mapReduce(String collectionName, String mapFunction, String reduceFunction) {
+        return getMongoSupport().mapReduce(Document.class, collectionName, mapFunction, reduceFunction, null, null, null, 0);
+    }
+
+    public final static List<Document> mapReduce(String collectionName, String mapFunction, String reduceFunction, Bson filter) {
+        return getMongoSupport().mapReduce(Document.class, collectionName, mapFunction, reduceFunction, null, filter, null, 0);
+    }
+
+    public final static List<Document> mapReduce(String collectionName, String mapFunction, String reduceFunction, Bson filter, Bson orderBy) {
+        return getMongoSupport().mapReduce(Document.class, collectionName, mapFunction, reduceFunction, null, filter, orderBy, 0);
+    }
+
+    public final static List<Document> mapReduce(String collectionName, String mapFunction, String reduceFunction, Bson filter, int limit) {
+        return getMongoSupport().mapReduce(Document.class, collectionName, mapFunction, reduceFunction, null, filter, null, limit);
+    }
+
+    public final static List<Document> mapReduce(String collectionName, String mapFunction, String reduceFunction, Bson filter, Bson orderBy, int limit) {
+        return getMongoSupport().mapReduce(Document.class, collectionName, mapFunction, reduceFunction, null, filter, orderBy, limit);
+    }
+
+    public final static List<Document> mapReduce(String collectionName, String mapFunction, String reduceFunction, String finalizeFunction) {
+        return getMongoSupport().mapReduce(Document.class, collectionName, mapFunction, reduceFunction, finalizeFunction, null, null, 0);
+    }
+
+    public final static List<Document> mapReduce(String collectionName, String mapFunction, String reduceFunction, String finalizeFunction, Bson filter) {
+        return getMongoSupport().mapReduce(Document.class, collectionName, mapFunction, reduceFunction, finalizeFunction, filter, null, 0);
+    }
+
+    public final static List<Document> mapReduce(String collectionName, String mapFunction, String reduceFunction, String finalizeFunction, Bson filter, int
+            limit) {
+        return getMongoSupport().mapReduce(Document.class, collectionName, mapFunction, reduceFunction, finalizeFunction, filter, null, limit);
+    }
+
+    /**
+     * 保存对象
+     *
+     * @param entity Map或实体类对象,集合名与类名不相同时需要配置注解@Collection("collectionName")
+     */
+    public final static void save(Object entity) {
+        getMongoSupport().save(entity);
+    }
+
+    /**
+     * 保存对象
+     *
+     * @param entity         Map或实体类对象
+     * @param collectionName 集合名称
+     */
+    public final static void save(Object entity, String collectionName) {
+        getMongoSupport().save(entity, collectionName);
+    }
+
+    /**
+     * 批量保存对象
+     *
+     * @param list Map或实体类对象列表,集合名与类名不相同时需要配置注解@Collection("collectionName")
+     */
+    public final static void saveAll(List<?> list) {
+        getMongoSupport().saveAll(list);
+    }
+
+    /**
+     * 批量保存对象
+     *
+     * @param list           Map或实体类对象列表
+     * @param collectionName 集合名称
+     */
+    public final static void saveAll(List<?> list, String collectionName) {
+        getMongoSupport().saveAll(list, collectionName);
+    }
+
+    /**
+     * 更新记录
+     *
+     * @param entity 实体类对象列表,必须设置_id属性的值,集合名与类名不相同时需要配置注解@Collection(
+     *               "collectionName")
+     * @return boolean
+     */
+    public final static <T> boolean update(T entity) {
+        return getMongoSupport().update(entity);
+    }
+
+    /**
+     * 更新记录
+     *
+     * @param entity         实体类对象列表,必须设置_id属性的值
+     * @param collectionName 集合名称
+     * @return boolean
+     */
+    public final static <T> boolean update(T entity, String collectionName) {
+        return getMongoSupport().update(entity, collectionName);
+    }
+
+    /**
+     * 更新单条记录(UpdateOne)
+     *
+     * @param entity 实体类对象
+     * @param filter 更新条件, 如: Filters.eq("key", "value")
+     * @return long 更新记录数
+     */
+    public final static <T> long update(T entity, Bson filter) {
+        return getMongoSupport().update(entity, filter, false);
+    }
+
+    /**
+     * 更新记录
+     *
+     * @param entity 实体类对象
+     * @param filter 更新条件, 如: Filters.eq("key", "value")
+     * @param mutil  true-更新所有,false-更新一条
+     * @return long 更新记录数
+     */
+    public final static <T> long update(T entity, Bson filter, boolean mutil) {
+        return getMongoSupport().update(entity, filter, mutil);
+    }
+
+    /**
+     * 更新单条记录(UpdateOne)
+     *
+     * @param collectionName 集合名称
+     * @param filter         更新条件, 如: Filters.eq("key", "value")
+     * @param update         更新数据
+     * @return long 更新记录数
+     */
+    public final static long update(String collectionName, Bson filter, Update update) {
+        return getMongoSupport().update(collectionName, filter, update, false);
+    }
+
+    /**
+     * 更新记录
+     *
+     * @param collectionName 集合名称
+     * @param filter         更新条件, 如: Filters.eq("key", "value")
+     * @param update         更新数据
+     * @param mutil          true-更新所有,false-更新一条
+     * @return long 更新记录数
+     */
+    public final static long update(String collectionName, Bson filter, Update update, boolean mutil) {
+        return getMongoSupport().update(collectionName, filter, update, mutil);
+    }
+
+    /**
+     * 通过_id更新记录
+     *
+     * @param entity 实体类对象,集合名与类名不相同时需要配置注解@Collection("collectionName")
+     * @param id
+     * @return boolean
+     */
+    public final static <T> boolean updateById(T entity, String id) {
+        return getMongoSupport().updateById(entity, id);
+    }
+
+    /**
+     * 通过_id更新记录
+     *
+     * @param entity         实体类对象
+     * @param collectionName 集合名称
+     * @param id
+     * @return boolean
+     */
+    public final static <T> boolean updateById(T entity, String collectionName, String id) {
+        return getMongoSupport().updateById(entity, collectionName, id);
+    }
+
+    /**
+     * 通过_id更新记录
+     *
+     * @param collectionName 集合名称
+     * @param update         更新数据
+     * @param id
+     * @return boolean
+     */
+    public final static boolean updateById(String collectionName, Update update, String id) {
+        return getMongoSupport().updateById(collectionName, update, id);
+    }
+
+    /**
+     * 通过_id更新记录
+     *
+     * @param collectionName 集合名称
+     * @param update         更新数据(Bson对象)
+     * @param id
+     * @return boolean
+     */
+    public final static boolean updateById(String collectionName, Bson update, String id) {
+        return getMongoSupport().updateById(collectionName, update, id);
+    }
+
+    /**
+     * 删除一条记录
+     *
+     * @param collectionName 集合名称
+     * @param filter         删除条件,如: Filters.eq("key", "value")
+     * @return boolean
+     */
+    public final static boolean deleteOne(String collectionName, Bson filter) {
+        return getMongoSupport().deleteOne(collectionName, filter);
+    }
+
+    /**
+     * 删除所有记录
+     *
+     * @param collectionName 集合名称
+     * @param filter         删除条件,如: Filters.eq("key", "value")
+     * @return boolean
+     */
+    public final static long deleteAll(String collectionName, Bson filter) {
+        return getMongoSupport().deleteAll(collectionName, filter);
+    }
+
+    /**
+     * 通过_id删除记录
+     *
+     * @param collectionName 集合名称
+     * @param id
+     * @return boolean
+     */
+    public final static boolean deleteById(String collectionName, String id) {
+        return getMongoSupport().deleteById(collectionName, id);
+    }
+
+    /**
+     * 通过_id删除记录
+     *
+     * @param clazz java实体
+     * @param id
+     * @return boolean
+     */
+    public final static boolean deleteById(Class clazz, String id) {
+        return getMongoSupport().deleteById(getMongoSupport().getCollectionName(clazz), id);
+    }
+    /**
+     * 删除集合
+     *
+     * @param collectionName 集合名称
+     */
+    public final static void drop(String collectionName) {
+        getMongoSupport().drop(collectionName);
+    }
+
+    /**
+     * 创建索引
+     *
+     * @param collectionName 集合名称
+     * @param keys
+     */
+    public final static void createIndex(String collectionName, Bson keys) {
+        getMongoSupport().createIndex(collectionName, keys);
+    }
+
+    /**
+     * 删除所有索引
+     *
+     * @param collectionName 集合名称
+     */
+    public final static void dropIndexes(String collectionName) {
+        getMongoSupport().dropIndexes(collectionName);
+    }
+
+    /**
+     * 删除指定索引
+     *
+     * @param collectionName 集合名称
+     * @param indexName      索引名称
+     */
+    public final static void dropIndex(String collectionName, String indexName) {
+        getMongoSupport().dropIndex(collectionName, indexName);
+    }
+
+}

+ 122 - 0
src/main/java/io/emqx/exhook/MongoDB/README.md

@@ -0,0 +1,122 @@
+#### 介绍
+mongodb操作工具类封装库,减少mongodb操作难度,支持直接save或update java对象,支持批量对象操作,支持分页查询,支持自定义复杂的函数。
+
+#### 使用说明
+1. 直接使用MongoUtils中的静态方法;
+2. mongodb配置文件祥见MongoUtils.java中的mongoClient部分;
+3. java对象与mongodb中表的字段映射关系或顺序,可通过fastjson中的@JSONField注解进行配置;
+
+#### 测试代码
+1.TestEntity请自行编写测试,类名即collection
+2.如果collection不存在,save时自行创建
+3.long处理见MongoLongConverter.java说明
+
+```java
+    @org.junit.Test
+    public void testSave() {
+        //保存实体对象
+        TestEntity entity = new TestEntity();
+        entity.setName("sugar");
+        entity.setStatus(2);
+        entity.setTestScore(87f);
+        MongoUtils.save(entity);
+
+        //保存map对象,需要指定集合名称
+        Map<String, Object> map = new HashMap<>();
+        map.put("name", "zhangshan");
+        map.put("status", 2);
+        map.put("test_score", 82.5f);
+        map.put("onTime", new Date().getTime());
+        MongoUtils.save(map, "test");
+
+        //保存Document对象,与map类似,需要指定集合名称
+        Document doc = new Document();
+        doc.put("name", "lishi");
+        doc.put("status", 2);
+        doc.put("test_score", 99f);
+        doc.put("onTime", new Date().getTime());
+        MongoUtils.save(doc, "test");
+    }
+
+    @org.junit.Test
+    public void testUpdate() {
+        //更新java对象,必须要指定id,如果字段值为null,不会更新旧的数据
+        TestEntity entity = new TestEntity();
+        entity.setId("5c343804fdfad4230852e1f5");
+        entity.setName("sugar2");
+        entity.setStatus(1);
+        MongoUtils.update(entity);
+
+        //自定义更新的集合名、条件、字段
+        String collectionName = "test";
+        Bson filter = Filters.eq("_id", new ObjectId("5c343804fdfad4230852e1f6"));
+        Update update = new Update();
+        update.set("name", "zhangshan2");
+        update.inc("status", 1);//相当于status += 1
+        MongoUtils.update(collectionName, filter, update);
+    }
+
+    @org.junit.Test
+    public void testQuery() {
+        //查询出实体列表
+        List<TestEntity> ll = MongoUtils.findAll(TestEntity.class);
+        System.out.println(ll);
+
+        //查询Document对象列表,需要指定集合名
+        List<Document> list = MongoUtils.findAll("test");
+        System.out.println(list);
+
+        //用Filters生成条件查询,查询名字以2结尾的数据
+        List<TestEntity> ll2 = MongoUtils.find(TestEntity.class, Filters.regex("name", ".*2"));
+        System.out.println(ll2);
+
+        //分页查询,查询分数大于90的数据,查询第1页,每页10条
+        Page page = new Page(10, 1);
+        page.setClazz(TestEntity.class);//指定列表中的对象类型
+        page = MongoUtils.findPage(page, Filters.gt("test_score", 90));
+        System.out.println(page.getList());
+    }
+
+    @org.junit.Test
+    public void testDelete() {
+        //根据ID删除
+        MongoUtils.deleteById("test", "587482defdfad41a9c94c9b6");
+
+        //删除一条数据
+        MongoUtils.deleteOne("test", Filters.eq("_id", new ObjectId("587482defdfad41a9c94c9b6")));
+
+        //批量删除
+        List<ObjectId> del = new ArrayList<ObjectId>();
+        del.add(new ObjectId("587482defdfad41a9c94c9b6"));
+        del.add(new ObjectId("58748350fdfad41a1c5fba14"));
+        del.add(new ObjectId("5874930ffdfad40df031215a"));
+        MongoUtils.deleteAll("test", Filters.in("_id", del));
+    }
+
+    @org.junit.Test
+    public void testCount() {
+        //统计test表数据总数
+        long count = MongoUtils.count("test");
+
+        //统计test表中status=2的数据总数
+        long count2 = MongoUtils.count("test", Filters.eq("status", 2));
+
+        //根据status进行分组统计
+        List<Document> list = MongoUtils.count("test", new String[]{"status"});
+        System.out.println(list);
+
+        //自定义mapReduce函数进行数据分析,按天统计数据总数和status=1的总数
+        StringBuilder mapFunction = new StringBuilder("function(){emit(");
+        mapFunction.append("new Date(this.onTime).toLocaleDateString()");
+        mapFunction.append(",{count:1, send:this.status==1?1:0}");
+        mapFunction.append(");}");
+        StringBuilder reduceFunction = new StringBuilder("function(key, values){");
+        reduceFunction.append("var _total = 0, _send = 0;");
+        reduceFunction.append("values.forEach(function(val){_total += val.count; _send += val.send;});");
+        reduceFunction.append("return {count:_total, send:_send};");
+        reduceFunction.append("}");
+        List<Document> list2 = MongoUtils.mapReduce("test", mapFunction.toString(), reduceFunction.toString());
+        System.out.println(list2);
+    }
+```
+

+ 52 - 0
src/main/java/io/emqx/exhook/MongoDB/Update.java

@@ -0,0 +1,52 @@
+package io.emqx.exhook.MongoDB;
+
+import com.mongodb.BasicDBObject;
+
+/**
+ * @Description 更新对象
+ */
+public class Update {
+    private BasicDBObject incObjects = new BasicDBObject();
+    private BasicDBObject setObjects = new BasicDBObject();
+    private BasicDBObject unsetObjects = new BasicDBObject();
+
+    public Update() {
+    }
+
+    public Update(BasicDBObject set) {
+        this.setObjects = set;
+    }
+
+    public Update set(String key, Object value) {
+        setObjects.append(key, value);
+        return this;
+    }
+
+    public Update unset(String key) {
+        unsetObjects.append(key, 0);
+        return this;
+    }
+
+    public Update inc(String key, int value) {
+        incObjects.append(key, value);
+        return this;
+    }
+
+    public BasicDBObject set() {
+        return setObjects;
+    }
+
+    public BasicDBObject unset() {
+        return unsetObjects;
+    }
+
+    public BasicDBObject inc() {
+        return incObjects;
+    }
+
+    public void reset() {
+        setObjects.clear();
+        incObjects.clear();
+        unsetObjects.clear();
+    }
+}

+ 10 - 0
src/main/java/io/emqx/exhook/MongoDB/paging/IPage.java

@@ -0,0 +1,10 @@
+package io.emqx.exhook.MongoDB.paging;
+
+import java.io.Serializable;
+import java.util.List;
+
+public interface IPage extends Serializable {
+    void setTotalCount(long totalCount);
+    <T> void setList(List<T> dataList);
+    <T> Class<T> getClazz();
+}

+ 191 - 0
src/main/java/io/emqx/exhook/MongoDB/paging/Page.java

@@ -0,0 +1,191 @@
+package io.emqx.exhook.MongoDB.paging;
+
+import java.util.List;
+
+/**
+ * 分页对象
+ *
+ * @Author Sugar
+ * @Version 2017年3月30日 下午8:08:58
+ */
+public class Page implements IPage {
+    private static final long serialVersionUID = 1L;
+
+    private long totalCount;// 总记录数
+    private int pageCount;// 总页数
+    private int pageSize;// 每页记录数
+    private int pageNo;// 当前页页码,从1开始
+    private int prevPageNo;// 上一页页码,为0时无上一页
+    private int nextPageNo;// 下一页页码,为0时无下一页
+    private List<?> list;// 本页数据集
+    private String orderBy;
+    private Class clazz;
+    private long maxSize;//最大显示记录数
+    private int endPageNo;//最后一页页码
+
+    public Page() {
+        pageNo = 1;
+        pageSize = 20;
+    }
+
+    public Page(int pageSize, int pageNo) {
+        this.pageSize = pageSize;
+        this.pageNo = (pageNo < 1 ? 1 : pageNo);
+    }
+
+
+    /**
+     * @param totalCount
+     *            数据总记录数
+     * @param pageSize
+     *            每页显示多少条数据
+     * @param pageNo
+     *            当前第几页,从1开始
+     * @param list 当前数据列表
+     */
+    public <T> Page(long totalCount, int pageSize, int pageNo, List<T> list) {
+        this.totalCount = totalCount;
+        this.pageSize = pageSize;
+        this.pageNo = (pageNo < 1 ? 1 : pageNo);
+        this.list = list;
+        init();
+    }
+
+    /**
+     *
+     * @param pageSize 每页显示多少条数据
+     * @param pageNo 当前第几页,从1开始
+     * @param dataList dataList 所有数据列表
+     */
+    public <T> Page(int pageSize, int pageNo, List<T> dataList) {
+        if (dataList == null) {
+            return;
+        }
+        this.totalCount = dataList.size();
+        this.pageSize = pageSize;
+        this.pageNo = (pageNo < 1 ? 1 : pageNo);
+        int start = pageSize * (this.pageNo - 1);
+        int end = start + pageSize;
+        if (end <= totalCount) {
+            this.list = dataList.subList(start, end);
+        } else if (start < totalCount) {
+            this.list = dataList.subList(start, (int) totalCount);
+        }
+        init();
+    }
+
+    public long getTotalCount() {
+        return totalCount;
+    }
+
+    public void setTotalCount(long totalCount) {
+        this.totalCount = totalCount;
+        init();
+    }
+
+    public int getPageCount() {
+        return pageCount;
+    }
+
+    public int getPageSize() {
+        return pageSize;
+    }
+
+    public void setPageSize(int pageSize) {
+        if (this.pageSize != pageSize) {
+            this.pageSize = pageSize;
+            init();
+        }
+    }
+
+    public int getPageNo() {
+        return pageNo;
+    }
+
+    public void setPageNo(int pageNo) {
+        this.pageNo = pageNo;
+    }
+
+    /**
+     * @return the orderBy
+     */
+    public String getOrderBy() {
+        return orderBy;
+    }
+
+    /**
+     * @param orderBy the orderBy to set
+     */
+    public void setOrderBy(String orderBy) {
+        this.orderBy = orderBy;
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> List<T> getList() {
+        return (List<T>) list;
+    }
+
+    public <T> void setList(List<T> list) {
+        this.list = list;
+    }
+
+    public int getPrevPageNo() {
+        return prevPageNo;
+    }
+
+    public void setPrevPageNo(int prevPageNo) {
+        this.prevPageNo = prevPageNo;
+    }
+
+    public int getNextPageNo() {
+        return nextPageNo;
+    }
+
+    public void setNextPageNo(int nextPageNo) {
+        this.nextPageNo = nextPageNo;
+    }
+
+    public void init() {
+        if (totalCount % this.pageSize == 0) {
+            this.pageCount = (int) (totalCount / this.pageSize);
+        } else {
+            this.pageCount = (int) (totalCount / this.pageSize + 1);
+        }
+        this.endPageNo = this.pageCount;
+        if (this.maxSize > 0 && this.maxSize < this.totalCount) {//如果限制了最大记录数,最大页数只能以最大记录数计算
+            this.endPageNo = (int)(this.maxSize / this.pageSize);
+        }
+        if (this.pageNo < 1) {
+            this.pageNo = 1;
+        } else if (this.endPageNo > 0 && this.pageNo > this.endPageNo) {
+            this.pageNo = this.endPageNo;
+        }
+        this.prevPageNo = this.pageNo - 1;
+        if (this.pageNo == this.endPageNo) {
+            this.nextPageNo = 0;
+        } else {
+            this.nextPageNo = this.pageNo + 1;
+        }
+    }
+
+    public <T> Class<T> getClazz() {
+        return clazz;
+    }
+
+    public void setClazz(Class clazz) {
+        this.clazz = clazz;
+    }
+
+    public void setMaxSize(long maxSize) {
+        this.maxSize = maxSize;
+    }
+
+    public void reset() {
+        this.pageNo = 1;
+        this.prevPageNo = 0;
+        this.nextPageNo = 0;
+        this.pageCount = 0;
+        this.list = null;
+        this.totalCount = 0;
+    }
+}

+ 22 - 0
src/main/java/io/emqx/exhook/MongoDB/paging/PageException.java

@@ -0,0 +1,22 @@
+package io.emqx.exhook.MongoDB.paging;
+
+/**
+ * 分页异常
+ */
+public class PageException extends RuntimeException {
+    public PageException() {
+        super();
+    }
+
+    public PageException(String message) {
+        super(message);
+    }
+
+    public PageException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public PageException(Throwable cause) {
+        super(cause);
+    }
+}

+ 50 - 0
src/main/java/io/emqx/exhook/MongoDB/reflect/BeanField.java

@@ -0,0 +1,50 @@
+package io.emqx.exhook.MongoDB.reflect;
+
+import lombok.Getter;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+
+@Getter
+public class BeanField implements IFieldWrapper {
+    private String name;
+    private Class<?> type;
+    public Field field;
+    public Method setter;
+    public Method getter;
+
+    public BeanField(Field field, Method setter, Method getter) {
+        this.field = field;
+        this.setter = setter;
+        this.getter = getter;
+        this.name = field.getName();
+        this.type = field.getType();
+    }
+
+    public <T extends Annotation> T getAnnotation(Class<T> annotationClass) {
+        return field.getAnnotation(annotationClass);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        BeanField beanField = (BeanField) o;
+        return name.equals(beanField.getName()) && type == beanField.getType();
+    }
+
+    public Method getGetterMethod() {
+        return getter;
+    }
+
+    public Method getSetterMethod() {
+        return setter;
+    }
+
+    @Override
+    public int hashCode() {
+        return field.hashCode();
+    }
+}

+ 7 - 0
src/main/java/io/emqx/exhook/MongoDB/reflect/IFieldConverter.java

@@ -0,0 +1,7 @@
+package io.emqx.exhook.MongoDB.reflect;
+
+import java.lang.reflect.Field;
+
+public interface IFieldConverter<T> {
+    T convert(Class clazz, Field field);
+}

+ 11 - 0
src/main/java/io/emqx/exhook/MongoDB/reflect/IFieldFilter.java

@@ -0,0 +1,11 @@
+package io.emqx.exhook.MongoDB.reflect;
+
+import java.lang.reflect.Field;
+
+public interface IFieldFilter {
+
+    String name();
+
+    boolean filter(Field field);
+
+}

+ 9 - 0
src/main/java/io/emqx/exhook/MongoDB/reflect/IFieldWrapper.java

@@ -0,0 +1,9 @@
+package io.emqx.exhook.MongoDB.reflect;
+
+import java.lang.reflect.Field;
+
+public interface IFieldWrapper {
+
+    Field getField();
+
+}

+ 628 - 0
src/main/java/io/emqx/exhook/MongoDB/reflect/ReflectUtils.java

@@ -0,0 +1,628 @@
+package io.emqx.exhook.MongoDB.reflect;
+
+import io.emqx.exhook.MongoDB.reflect.impl.BeanFieldConverter;
+import java.lang.reflect.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * java bean反射工具类
+ *
+ * @Author Sugar
+ * @Version 2018/6/1 10:48
+ */
+public class ReflectUtils {
+    protected static final Map<String, List<? extends IFieldWrapper>> BEAN_FIELD_CACHE = new HashMap<>();//属性缓存
+    protected static final Map<String, List<Field>> FIELD_CACHE = new HashMap<>();//属性缓存
+    protected static final Map<String, List<Method>> SETTER_CACHE = new HashMap<>();//setter方法缓存
+    protected static final Map<String, List<Method>> GETTER_CACHE = new HashMap<>();//getter方法缓存
+    private static final IFieldConverter<BeanField> beanFieldConverter =  new BeanFieldConverter();
+
+    /**
+     * 获取类中非final和static的属性(包括父类中的属性),带setter和getter方法
+     *
+     * @param clazz
+     * @return
+     */
+    public static final List<BeanField> getBeanFields(Class clazz) {
+        return getBeanFields(clazz, true, null, beanFieldConverter);
+    }
+
+    /**
+     * 获取类中非final和static的属性(包括父类中的属性),带setter和getter方法
+     *
+     * @param clazz
+     * @param filter
+     * @return
+     */
+    public static final List<BeanField> getBeanFields(Class clazz, IFieldFilter filter) {
+        return getBeanFields(clazz, true, filter, beanFieldConverter);
+    }
+
+    /**
+     * 获取类中非final和static的属性,带setter和getter方法
+     *
+     * @param clazz
+     * @param superClass 是否包含父类中的属性
+     * @return
+     */
+    public static final List<BeanField> getBeanFields(Class clazz, boolean superClass) {
+        return getBeanFields(clazz, superClass, null, beanFieldConverter);
+    }
+
+    /**
+     * 获取类中非final和static的属性,带setter和getter方法
+     *
+     * @param clazz
+     * @param superClass 是否包含父类中的属性
+     * @param filter
+     * @return
+     */
+    public static final <T extends IFieldWrapper> List<T> getBeanFields(Class clazz, boolean superClass, IFieldFilter filter, IFieldConverter<T> converter) {
+        String className = clazz.getName();
+        String cacheKey = className + ":" + superClass + (filter == null ? "" : ":" + filter.name());
+        List fields = BEAN_FIELD_CACHE.get(cacheKey);
+        if (fields != null) {
+            return fields;
+        }
+        synchronized (clazz) {
+            fields = BEAN_FIELD_CACHE.get(cacheKey);
+            if (fields != null) {
+                return fields;
+            }
+
+            fields = new ArrayList<>();
+            Field[] declared = clazz.getDeclaredFields();
+            for (Field field : declared) {
+                int mod = field.getModifiers();
+                if (Modifier.isFinal(mod) || Modifier.isStatic(mod)) {
+                    continue;
+                }
+                if (filter != null && !filter.filter(field)) {
+                    continue;
+                }
+                fields.add(converter.convert(clazz, field));
+            }
+            if (superClass) {//包含父类中的属性
+                Class<?> superClazz = clazz;
+                while ((superClazz = superClazz.getSuperclass()) != null) {
+                    Field[] lst = superClazz.getDeclaredFields();
+                    for (Field field : lst) {
+                        int mod = field.getModifiers();
+                        if (Modifier.isFinal(mod) || Modifier.isStatic(mod)) {
+                            continue;
+                        }
+                        if (filter != null && !filter.filter(field)) {
+                            continue;
+                        }
+                        boolean exists = false;
+                        for (Object o : fields) {
+                            IFieldWrapper f = (IFieldWrapper) o;
+                            if (f.getField().getName().equals(field.getName()) && f.getField().getType() == field.getType()) {
+                                exists = true;
+                                break;
+                            }
+                        }
+                        if (!exists) {
+                            fields.add(converter.convert(clazz, field));
+                        }
+                    }
+                }
+            }
+            BEAN_FIELD_CACHE.put(cacheKey, fields);
+        }
+        return fields;
+    }
+
+    /**
+     * 获取类或所有父类中非final和static的属性
+     *
+     * @param clazz
+     * @return
+     */
+    public static final List<Field> getFields(Class clazz) {
+        return getFields(clazz, true, null);
+    }
+
+    /**
+     * 获取类或所有父类中非final和static的属性
+     *
+     * @param clazz
+     * @param filter
+     * @return
+     */
+    public static final List<Field> getFields(Class clazz, IFieldFilter filter) {
+        return getFields(clazz, true, filter);
+    }
+
+    /**
+     * 获取类或所有父类中非final和static的属性
+     *
+     * @param clazz
+     * @param superClass
+     * @return
+     */
+    public static final List<Field> getFields(Class clazz, boolean superClass) {
+        return getFields(clazz, superClass, null);
+    }
+
+    /**
+     * 获取类中非final和static的属性
+     *
+     * @param clazz
+     * @param superClass 是否包含父类中的属性
+     * @param filter
+     * @return
+     */
+    public static final List<Field> getFields(Class clazz, boolean superClass, IFieldFilter filter) {
+        String className = clazz.getName();
+        String cacheKey = className + ":" + superClass + (filter == null ? "" : ":" + filter.name());
+        List<Field> fields = FIELD_CACHE.get(cacheKey);
+        if (fields != null) {
+            return fields;
+        }
+        synchronized (clazz) {
+            fields = FIELD_CACHE.get(cacheKey);
+            if (fields != null) {
+                return fields;
+            }
+            fields = new ArrayList<>();
+            Field[] declared = clazz.getDeclaredFields();
+            for (Field field : declared) {
+                int mod = field.getModifiers();
+                if (Modifier.isFinal(mod) || Modifier.isStatic(mod)) {
+                    continue;
+                }
+                if (filter != null && !filter.filter(field)) {
+                    continue;
+                }
+                fields.add(field);
+            }
+            if (superClass) {//包含父类中的属性
+                Class<?> superClazz = clazz;
+                while ((superClazz = superClazz.getSuperclass()) != null) {
+                    Field[] lst = superClazz.getDeclaredFields();
+                    for (Field field : lst) {
+                        int mod = field.getModifiers();
+                        if (Modifier.isFinal(mod) || Modifier.isStatic(mod)) {
+                            continue;
+                        }
+                        if (filter != null && !filter.filter(field)) {
+                            continue;
+                        }
+                        if (!contains(fields, field)) {
+                            fields.add(field);
+                        }
+                    }
+                }
+            }
+            FIELD_CACHE.put(cacheKey, fields);
+        }
+        return fields;
+    }
+
+    /**
+     * 获取类或所有父类中setter方法
+     *
+     * @param clazz
+     * @return
+     */
+    public static final List<Method> getSetterMethods(Class clazz) {
+        return getSetterMethods(clazz, true);
+    }
+
+    /**
+     * 获取类中setter方法
+     *
+     * @param clazz
+     * @param superClass 是否包含父类中的方法
+     * @return
+     */
+    public static final List<Method> getSetterMethods(Class clazz, boolean superClass) {
+        String className = clazz.getName();
+        List<Method> methods = SETTER_CACHE.get(className + ":" + superClass);
+        if (methods != null) {
+            return methods;
+        }
+        methods = new ArrayList<>();
+        Method[] declared = clazz.getDeclaredMethods();
+        for (Method method : declared) {
+            if (!method.getName().matches("set[A-Z]\\w+") || method.getParameterCount() != 1) {
+                continue;
+            }
+            int mod = method.getModifiers();
+            if (Modifier.isStatic(mod)) {
+                continue;
+            }
+            methods.add(method);
+        }
+        if (superClass) {//包含父类中的方法
+            Class<?> superClazz = clazz;
+            while ((superClazz = superClazz.getSuperclass()) != null) {
+                Method[] lst = superClazz.getDeclaredMethods();
+                for (Method method : lst) {
+                    if (!method.getName().matches("set[A-Z]\\w+") || method.getParameterCount() != 1) {
+                        continue;
+                    }
+                    int mod = method.getModifiers();
+                    if (Modifier.isStatic(mod)) {
+                        continue;
+                    }
+                    if (!contains(methods, method)) {
+                        methods.add(method);
+                    }
+                }
+            }
+        }
+        SETTER_CACHE.put(className + ":" + superClass, methods);
+        return methods;
+    }
+
+    /**
+     * 获取类或所有父类中getter方法
+     *
+     * @param clazz
+     * @return
+     */
+    public static final List<Method> getGetterMethods(Class clazz) {
+        return getGetterMethods(clazz, true);
+    }
+
+    /**
+     * 获取类中getter方法
+     *
+     * @param clazz
+     * @param superClass 是否包含父类中的方法
+     * @return
+     */
+    public static final List<Method> getGetterMethods(Class clazz, boolean superClass) {
+        String className = clazz.getName();
+        List<Method> methods = GETTER_CACHE.get(className + ":" + superClass);
+        if (methods != null) {
+            return methods;
+        }
+        methods = new ArrayList<>();
+        Method[] declared = clazz.getDeclaredMethods();
+        for (Method method : declared) {
+            if (!method.getName().matches("(get|is)[A-Z]\\w+") || method.getParameterCount() != 0) {
+                continue;
+            }
+            int mod = method.getModifiers();
+            if (Modifier.isStatic(mod)) {
+                continue;
+            }
+            methods.add(method);
+        }
+        if (superClass) {//包含父类中的方法
+            Class<?> superClazz = clazz;
+            while ((superClazz = superClazz.getSuperclass()) != null) {
+                Method[] lst = superClazz.getDeclaredMethods();
+                for (Method method : lst) {
+                    if (!method.getName().matches("(get|is)[A-Z]\\w+") || method.getParameterCount() != 0) {
+                        continue;
+                    }
+                    int mod = method.getModifiers();
+                    if (Modifier.isStatic(mod)) {
+                        continue;
+                    }
+                    if (!contains(methods, method)) {
+                        methods.add(method);
+                    }
+                }
+            }
+        }
+        GETTER_CACHE.put(className + ":" + superClass, methods);
+        return methods;
+    }
+
+    /**
+     * 根据字段获取getter方法名
+     *
+     * @param fieldName
+     * @return
+     */
+    public static final String getGetterMethodName(String fieldName) {
+        return "get" + fieldName.substring(0, 1).toUpperCase() + fieldName.substring(1);
+    }
+
+    /**
+     * 根据字段获取getter方法名
+     *
+     * @param field
+     * @return
+     */
+    public static final String getGetterMethodName(Field field) {
+        String fieldName = field.getName();
+        if (field.getType() == boolean.class || field.getType().isAssignableFrom(Boolean.class)) {
+            return "is" + fieldName.substring(0, 1).toUpperCase() + fieldName.substring(1);
+        }
+        return getGetterMethodName(field.getName());
+    }
+
+    /**
+     * 根据字段获取setter方法名
+     *
+     * @param fieldName
+     * @return
+     */
+    public static final String getSetterMethodName(String fieldName) {
+        return "set" + fieldName.substring(0, 1).toUpperCase() + fieldName.substring(1);
+    }
+
+    /**
+     * 根据字段获取setter方法名
+     *
+     * @param field
+     * @return
+     */
+    public static final String getSetterMethodName(Field field) {
+        return getSetterMethodName(field.getName());
+    }
+
+    /**
+     * 根据setter或getter方法获取字段名
+     *
+     * @param method
+     * @return
+     */
+    public static final String getFieldName(Method method) {
+        String name = method.getName().replaceFirst("^(get|set|is)", "");
+        if (name.isEmpty()) {
+            return null;
+        }
+        return Character.toLowerCase(name.charAt(0)) + name.substring(1);
+    }
+
+    public static final Method getMethod(Class clazz, String method) {
+        try {
+            return clazz.getMethod(method);
+        } catch (NoSuchMethodException e) {
+            e.printStackTrace();
+            return null;
+        }
+    }
+
+    /**
+     * 获取setter方法
+     *
+     * @param clazz
+     * @param fieldName
+     * @return
+     */
+    public static final Method getSetterMethod(Class clazz, String fieldName) {
+        List<Method> methods = getSetterMethods(clazz);
+        if (methods == null || methods.isEmpty()) {
+            return null;
+        }
+        String name = getSetterMethodName(fieldName);
+        for (Method method : methods) {
+            if (method.getName().equals(name)) {
+                return method;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * 获取setter方法
+     *
+     * @param clazz
+     * @param field
+     * @return
+     */
+    public static final Method getSetterMethod(Class clazz, Field field) {
+        return getSetterMethod(clazz, field.getName());
+    }
+
+    /**
+     * 获取getter方法
+     *
+     * @param clazz
+     * @param fieldName
+     * @return
+     */
+    public static final Method getGetterMethod(Class clazz, String fieldName) {
+        List<Method> methods = getGetterMethods(clazz);
+        if (methods == null || methods.isEmpty()) {
+            return null;
+        }
+        String name = getGetterMethodName(fieldName);
+        for (Method method : methods) {
+            if (method.getName().equals(name)) {
+                return method;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * 获取getter方法
+     *
+     * @param clazz
+     * @param field
+     * @return
+     */
+    public static final Method getGetterMethod(Class clazz, Field field) {
+        return getGetterMethod(clazz, field.getName());
+    }
+
+    /**
+     * 通过setter或getter方法获取属性
+     *
+     * @param clazz
+     * @param method
+     * @return
+     */
+    public static final Field getField(Class clazz, Method method) {
+        List<Field> fields = getFields(clazz);
+        if (fields == null || fields.isEmpty()) {
+            return null;
+        }
+        String name = getFieldName(method);
+        if (name == null) {
+            return null;
+        }
+        for (Field field : fields) {
+            if (field.getName().equals(name)) {
+                return field;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * 判断是否包含相同的field
+     *
+     * @param list
+     * @param field
+     * @return
+     */
+    public static final boolean contains(List<Field> list, Field field) {
+        if (list == null || list.isEmpty()) {
+            return false;
+        }
+        for (Field f : list) {
+            if (f.getName().equals(field.getName()) && f.getType() == field.getType()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * 判断是否包含相同的method
+     *
+     * @param list
+     * @param method
+     * @return
+     */
+    protected static final boolean contains(List<Method> list, Method method) {
+        if (list == null || list.isEmpty()) {
+            return false;
+        }
+        for (Method m : list) {
+            if (!m.getName().equals(method.getName())) {
+                continue;
+            }
+            TypeVariable[] var = m.getTypeParameters();
+            TypeVariable[] var2 = method.getTypeParameters();
+            if (var == null && var2 == null) {
+                return true;
+            }
+            //判断参数个数是否相同
+            if (var.length != var2.length) {
+                continue;
+            }
+            //判断参数类型是否相同
+            boolean flag = true;
+            for (int i = 0; i < var.length; i++) {
+                if (!var[i].getTypeName().equals(var2[i].getTypeName())) {
+                    flag = false;
+                    break;
+                }
+            }
+            if (flag) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * 获取泛型类型
+     *
+     * @param field
+     * @return
+     */
+    public static Class getGenericType(Field field) {
+        Type type = field.getGenericType();
+        Class<?> genericClazz = Map.class;
+        if (type instanceof ParameterizedType) { // 如果是泛型参数的类型
+            ParameterizedType paramType = (ParameterizedType) type;
+            type = paramType.getActualTypeArguments()[0];
+            if (type instanceof Class<?>) {
+                genericClazz = (Class<?>) type; // 得到泛型里的class类型对象
+            }
+        }
+        return genericClazz;
+    }
+
+    /**
+     * 直接读取对象属性值, 无视private/protected修饰符, 不经过getter方法.
+     *
+     * @param obj
+     * @param fieldName
+     * @return
+     */
+    public static Object getFieldValue(final Object obj, final String fieldName) {
+        List<Field> fields = getFields(obj.getClass());
+        if (fields != null && !fields.isEmpty()) {
+            for (Field field : fields) {
+                if (field.getName().equals(fieldName)) {
+                    if (!field.isAccessible()) {
+                        field.setAccessible(true);
+                    }
+                    Object value = null;
+                    try {
+                        value = field.get(obj);
+                    } catch (IllegalAccessException e) {
+                        e.printStackTrace();
+                    }
+                    return value;
+                }
+            }
+        }
+        throw new IllegalArgumentException("Could not find field [" + fieldName + "] on target [" + obj + "]");
+    }
+
+    /**
+     * 直接设置对象属性值, 无视private/protected修饰符, 不经过setter方法.
+     *
+     * @param instance
+     * @param fieldName
+     * @return
+     */
+    public static void setFieldValue(Object instance, String fieldName, Object value) {
+        List<Field> fields = getFields(instance.getClass());
+        if (fields != null && !fields.isEmpty()) {
+            for (Field field : fields) {
+                if (field.getName().equals(fieldName)) {
+                    if (!field.isAccessible()) {
+                        field.setAccessible(true);
+                    }
+                    try {
+                        field.set(instance, value);
+                    } catch (IllegalAccessException e) {
+                        e.printStackTrace();
+                    }
+                    return;
+                }
+            }
+        }
+        throw new IllegalArgumentException("Could not find field [" + fieldName + "] on target [" + instance + "]");
+    }
+
+    /**
+     * 直接设置对象属性值, 无视private/protected修饰符, 不经过setter方法.
+     *
+     * @param instance
+     * @param field
+     * @return
+     */
+    public static void setFieldValue(Object instance, Field field, Object value) {
+        if (!field.isAccessible()) {
+            field.setAccessible(true);
+        }
+        try {
+            field.set(instance, value);
+        } catch (IllegalAccessException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public static boolean isVoidReturnType(Method method) {
+        return method.getReturnType() == Void.TYPE;
+    }
+}

+ 48 - 0
src/main/java/io/emqx/exhook/MongoDB/reflect/impl/BeanFieldConverter.java

@@ -0,0 +1,48 @@
+package io.emqx.exhook.MongoDB.reflect.impl;
+
+import io.emqx.exhook.MongoDB.reflect.*;
+import static io.emqx.exhook.MongoDB.reflect.ReflectUtils.*;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+
+
+/**
+ * @Description Field生成带setter、getter方法的BeanField
+ * @Author Sugar
+ * @Date 2020/10/9 11:53
+ */
+public class BeanFieldConverter implements IFieldConverter<BeanField> {
+    @Override
+    public BeanField convert(Class clazz, Field field) {
+        Method setter = null;
+        String setterName = getSetterMethodName(field);
+        try {
+            setter = clazz.getMethod(setterName, field.getType());
+        } catch (NoSuchMethodException e) {
+            //如果没有参数为该属性的数据类型的set方法,则再找名字相同的其他set方法,以兼容重写了set方法的情况
+            setter = getSetterMethod(clazz, field);
+            if (setter == null) {
+                System.out.println("!No setter method: {"+field.getName()+"}");
+            }
+        }
+        Method getter = null;
+        String getterName = getGetterMethodName(field);
+        try {
+            getter = clazz.getMethod(getterName);
+        } catch (NoSuchMethodException e) {
+            //如果boolean类型属性无对应的isXxx()方法,则再找对应的getXxx()方法
+            if (getterName.startsWith("is")) {
+                getterName = "get" + getterName.substring(2);
+                try {
+                    getter = clazz.getMethod(getterName);
+                } catch (NoSuchMethodException ex) {
+                    System.out.println("!No getter method: {"+field.getName()+"}");
+                    ex.printStackTrace();
+                }
+            } else {
+                System.out.println("!No getter method: {"+field.getName()+"}");
+            }
+        }
+        return new BeanField(field, setter, getter);
+    }
+}

+ 22 - 0
src/main/java/io/emqx/exhook/utils/Sm4Util.java

@@ -0,0 +1,22 @@
+package io.emqx.exhook.utils;
+
+
+import cn.hutool.core.util.RandomUtil;
+import cn.hutool.crypto.SmUtil;
+import cn.hutool.crypto.symmetric.SM4;
+
+import java.util.Arrays;
+
+
+public class Sm4Util {
+
+    public static void main(String[] args){
+        String key = RandomUtil.randomString(16);
+        SM4 sm4 = SmUtil.sm4(key.getBytes());
+        String text = sm4.encryptHex("dddddddddddddd".getBytes());
+        System.err.println(text);
+        System.err.println(sm4.decryptStr(text));
+    }
+
+
+}

+ 499 - 0
src/main/proto/exhook.proto

@@ -0,0 +1,499 @@
+//------------------------------------------------------------------------------
+// Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//------------------------------------------------------------------------------
+
+syntax = "proto3";
+
+option csharp_namespace = "Emqx.Exhook.V2";
+option go_package = "emqx.io/grpc/exhook";
+option java_multiple_files = true;
+option java_package = "io.emqx.exhook";
+option java_outer_classname = "EmqxExHookProto";
+
+// The exhook proto version should be fixed as `v2` in EMQX v5.x
+// to make sure the exhook proto version is compatible
+package emqx.exhook.v2;
+
+service HookProvider {
+
+  rpc OnProviderLoaded(ProviderLoadedRequest) returns (LoadedResponse) {};
+
+  rpc OnProviderUnloaded(ProviderUnloadedRequest) returns (EmptySuccess) {};
+
+  rpc OnClientConnect(ClientConnectRequest) returns (EmptySuccess) {};
+
+  rpc OnClientConnack(ClientConnackRequest) returns (EmptySuccess) {};
+
+  rpc OnClientConnected(ClientConnectedRequest) returns (EmptySuccess) {};
+
+  rpc OnClientDisconnected(ClientDisconnectedRequest) returns (EmptySuccess) {};
+
+  rpc OnClientAuthenticate(ClientAuthenticateRequest) returns (ValuedResponse) {};
+
+  rpc OnClientAuthorize(ClientAuthorizeRequest) returns (ValuedResponse) {};
+
+  rpc OnClientSubscribe(ClientSubscribeRequest) returns (EmptySuccess) {};
+
+  rpc OnClientUnsubscribe(ClientUnsubscribeRequest) returns (EmptySuccess) {};
+
+  rpc OnSessionCreated(SessionCreatedRequest) returns (EmptySuccess) {};
+
+  rpc OnSessionSubscribed(SessionSubscribedRequest) returns (EmptySuccess) {};
+
+  rpc OnSessionUnsubscribed(SessionUnsubscribedRequest) returns (EmptySuccess) {};
+
+  rpc OnSessionResumed(SessionResumedRequest) returns (EmptySuccess) {};
+
+  rpc OnSessionDiscarded(SessionDiscardedRequest) returns (EmptySuccess) {};
+
+  rpc OnSessionTakenover(SessionTakenoverRequest) returns (EmptySuccess) {};
+
+  rpc OnSessionTerminated(SessionTerminatedRequest) returns (EmptySuccess) {};
+
+  rpc OnMessagePublish(MessagePublishRequest) returns (ValuedResponse) {};
+
+  rpc OnMessageDelivered(MessageDeliveredRequest) returns (EmptySuccess) {};
+
+  rpc OnMessageDropped(MessageDroppedRequest) returns (EmptySuccess) {};
+
+  rpc OnMessageAcked(MessageAckedRequest) returns (EmptySuccess) {};
+}
+
+//------------------------------------------------------------------------------
+// Request
+//------------------------------------------------------------------------------
+
+message ProviderLoadedRequest {
+
+  BrokerInfo broker = 1;
+
+  RequestMeta meta = 2;
+}
+
+message ProviderUnloadedRequest {
+
+  RequestMeta meta = 1;
+}
+
+message ClientConnectRequest {
+
+  ConnInfo conninfo = 1;
+
+  // MQTT CONNECT packet's properties (MQTT v5.0)
+  //
+  // It should be empty on MQTT v3.1.1/v3.1 or others protocol
+  repeated Property props = 2;
+
+  RequestMeta meta = 3;
+}
+
+message ClientConnackRequest {
+
+  ConnInfo conninfo = 1;
+
+  string result_code = 2;
+
+  repeated Property props = 3;
+
+  RequestMeta meta = 4;
+}
+
+message ClientConnectedRequest {
+
+  ClientInfo clientinfo = 1;
+
+  RequestMeta meta = 2;
+}
+
+message ClientDisconnectedRequest {
+
+  ClientInfo clientinfo = 1;
+
+  string reason = 2;
+
+  RequestMeta meta = 3;
+}
+
+message ClientAuthenticateRequest {
+
+  ClientInfo clientinfo = 1;
+
+  bool result = 2;
+
+  RequestMeta meta = 3;
+}
+
+message ClientAuthorizeRequest {
+
+  ClientInfo clientinfo = 1;
+
+  enum AuthorizeReqType {
+
+    PUBLISH = 0;
+
+    SUBSCRIBE = 1;
+  }
+
+  AuthorizeReqType type = 2;
+
+  string topic = 3;
+
+  bool result = 4;
+
+  RequestMeta meta = 5;
+}
+
+message ClientSubscribeRequest {
+
+  ClientInfo clientinfo = 1;
+
+  repeated Property props = 2;
+
+  repeated TopicFilter topic_filters = 3;
+
+  RequestMeta meta = 4;
+}
+
+message ClientUnsubscribeRequest {
+
+  ClientInfo clientinfo = 1;
+
+  repeated Property props = 2;
+
+  repeated TopicFilter topic_filters = 3;
+
+  RequestMeta meta = 4;
+}
+
+message SessionCreatedRequest {
+
+  ClientInfo clientinfo = 1;
+
+  RequestMeta meta = 2;
+}
+
+message SessionSubscribedRequest {
+
+  ClientInfo clientinfo = 1;
+
+  string topic = 2;
+
+  SubOpts subopts = 3;
+
+  RequestMeta meta = 4;
+}
+
+message SessionUnsubscribedRequest {
+
+  ClientInfo clientinfo = 1;
+
+  string topic = 2;
+
+  RequestMeta meta = 3;
+}
+
+message SessionResumedRequest {
+
+  ClientInfo clientinfo = 1;
+
+  RequestMeta meta = 2;
+}
+
+message SessionDiscardedRequest {
+
+  ClientInfo clientinfo = 1;
+
+  RequestMeta meta = 2;
+}
+
+message SessionTakenoverRequest {
+
+  ClientInfo clientinfo = 1;
+
+  RequestMeta meta = 2;
+}
+
+message SessionTerminatedRequest {
+
+  ClientInfo clientinfo = 1;
+
+  string reason = 2;
+
+  RequestMeta meta = 3;
+}
+
+message MessagePublishRequest {
+
+  Message message = 1;
+
+  RequestMeta meta = 2;
+}
+
+message MessageDeliveredRequest {
+
+  ClientInfo clientinfo = 1;
+
+  Message message = 2;
+
+  RequestMeta meta = 3;
+}
+
+message MessageDroppedRequest {
+
+  Message message = 1;
+
+  string reason = 2;
+
+  RequestMeta meta = 3;
+}
+
+message MessageAckedRequest {
+
+  ClientInfo clientinfo = 1;
+
+  Message message = 2;
+
+  RequestMeta meta = 3;
+}
+
+//------------------------------------------------------------------------------
+// Response
+//------------------------------------------------------------------------------
+
+// Responsed by `ProviderLoadedRequest`
+
+message LoadedResponse {
+
+  repeated HookSpec hooks = 1;
+}
+
+// Responsed by `ClientAuthenticateRequest` `ClientAuthorizeRequest` `MessagePublishRequest`
+
+message ValuedResponse {
+
+  // The responded value type
+  //  - contiune: Use the responded value and execute the next hook
+  //  - ignore: Ignore the responded value
+  //  - stop_and_return: Use the responded value and stop the chain executing
+  enum ResponsedType {
+
+    CONTINUE = 0;
+
+    IGNORE = 1;
+
+    STOP_AND_RETURN = 2;
+  }
+
+  ResponsedType type = 1;
+
+  oneof value {
+
+    // Boolean result, used on the 'client.authenticate', 'client.authorize' hooks
+    bool bool_result = 3;
+
+    // Message result, used on the 'message.*' hooks
+    Message message = 4;
+  }
+}
+
+// no Response by other Requests
+
+message EmptySuccess { }
+
+//------------------------------------------------------------------------------
+// Basic data types
+//------------------------------------------------------------------------------
+
+message BrokerInfo {
+
+  string version = 1;
+
+  string sysdescr = 2;
+
+  int64 uptime = 3;
+
+  string datetime = 4;
+}
+
+
+message HookSpec {
+
+  // The registered hooks name
+  //
+  // Available value:
+  //   "client.connect",      "client.connack"
+  //   "client.connected",    "client.disconnected"
+  //   "client.authenticate", "client.authorize"
+  //   "client.subscribe",    "client.unsubscribe"
+  //
+  //   "session.created",      "session.subscribed"
+  //   "session.unsubscribed", "session.resumed"
+  //   "session.discarded",    "session.takenover"
+  //   "session.terminated"
+  //
+  //   "message.publish", "message.delivered"
+  //   "message.acked",   "message.dropped"
+  string name = 1;
+
+  // The topic filters for message hooks
+  repeated string topics = 2;
+}
+
+message ConnInfo {
+
+  string node = 1;
+
+  string clientid = 2;
+
+  string username = 3;
+
+  string peerhost = 4;
+
+  uint32 sockport = 5;
+
+  string proto_name = 6;
+
+  string proto_ver = 7;
+
+  uint32 keepalive = 8;
+}
+
+message ClientInfo {
+
+  string node = 1;
+
+  string clientid = 2;
+
+  string username = 3;
+
+  string password = 4;
+
+  string peerhost = 5;
+
+  uint32 sockport = 6;
+
+  string protocol = 7;
+
+  string mountpoint = 8;
+
+  bool  is_superuser = 9;
+
+  bool  anonymous = 10;
+
+  // common name of client TLS cert
+  string cn = 11;
+
+  // subject of client TLS cert
+  string dn = 12;
+}
+
+message Message {
+
+  string node = 1;
+
+  string id = 2;
+
+  uint32 qos = 3;
+
+  string from = 4;
+
+  string topic = 5;
+
+  bytes  payload = 6;
+
+  uint64 timestamp = 7;
+
+  // The key of header can be:
+  //  - username:
+  //    * Readonly
+  //    * The username of sender client
+  //    * Value type: utf8 string
+  //  - protocol:
+  //    * Readonly
+  //    * The protocol name of sender client
+  //    * Value type: string enum with "mqtt", "mqtt-sn", ...
+  //  - peerhost:
+  //    * Readonly
+  //    * The peerhost of sender client
+  //    * Value type: ip address string
+  //  - allow_publish:
+  //    * Writable
+  //    * Whether to allow the message to be published by emqx
+  //    * Value type: string enum with "true", "false", default is "true"
+  //
+  // Notes: All header may be missing, which means that the message does not
+  //   carry these headers. We can guarantee that clients coming from MQTT,
+  //   MQTT-SN, CoAP, LwM2M and other natively supported protocol clients will
+  //   carry these headers, but there is no guarantee that messages published
+  //   by other means will do, e.g. messages published by HTTP-API
+  map<string, string> headers = 8;
+}
+
+message Property {
+
+  string name = 1;
+
+  string value = 2;
+}
+
+message TopicFilter {
+
+  string name = 1;
+
+  uint32 qos = 2;
+}
+
+message SubOpts {
+
+  // The QoS level
+  uint32 qos = 1;
+
+  // The group name for shared subscription
+  string share = 2;
+
+  // The Retain Handling option (MQTT v5.0)
+  //
+  //  0 = Send retained messages at the time of the subscribe
+  //  1 = Send retained messages at subscribe only if the subscription does
+  //       not currently exist
+  //  2 = Do not send retained messages at the time of the subscribe
+  uint32 rh = 3;
+
+  // The Retain as Published option (MQTT v5.0)
+  //
+  //  If 1, Application Messages forwarded using this subscription keep the
+  //        RETAIN flag they were published with.
+  //  If 0, Application Messages forwarded using this subscription have the
+  //        RETAIN flag set to 0.
+  // Retained messages sent when the subscription is established have the RETAIN flag set to 1.
+  uint32 rap = 4;
+
+  // The No Local option (MQTT v5.0)
+  //
+  // If the value is 1, Application Messages MUST NOT be forwarded to a
+  // connection with a ClientID equal to the ClientID of the publishing
+  uint32 nl = 5;
+}
+
+message RequestMeta {
+
+  string node = 1;
+
+  string version = 2;
+
+  string sysdescr = 3;
+
+  string cluster_name = 4;
+}