This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f16d838 [ISSUE #726] Add test module to provide integration test for 
java client (#727)
8f16d838 is described below

commit 8f16d838bafcb7d3d4566cc446753e8e9582b8a8
Author: Zhouxiang Zhan <zhouxiang....@alibaba-inc.com>
AuthorDate: Tue Apr 16 14:20:16 2024 +0800

    [ISSUE #726] Add test module to provide integration test for java client 
(#727)
    
    * Add GrpcServerIntegrationTest
    
    * Add AttemptIdIntegrationTest
    
    * fix jdk17 SelfSignedCertificate
---
 java/pom.xml                                       |  15 ++
 java/test/pom.xml                                  |  71 +++++++
 .../test/client/AttemptIdIntegrationTest.java      | 224 +++++++++++++++++++++
 .../rocketmq/test/helper/ResponseWriter.java       |  71 +++++++
 .../test/server/GrpcServerIntegrationTest.java     |  61 ++++++
 .../apache/rocketmq/test/server/MockServer.java    |  35 ++++
 6 files changed, 477 insertions(+)

diff --git a/java/pom.xml b/java/pom.xml
index 779a74d4..f081c19a 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -31,6 +31,7 @@
         <module>client-apis</module>
         <module>client</module>
         <module>client-shade</module>
+        <module>test</module>
     </modules>
 
     <properties>
@@ -62,6 +63,7 @@
         <assertj-core.version>2.6.0</assertj-core.version>
         <mockito-core.version>3.10.0</mockito-core.version>
         <awaitility.version>4.1.0</awaitility.version>
+        <bcpkix.version>1.77</bcpkix.version>
 
         <!-- plugin -->
         <maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>
@@ -166,6 +168,12 @@
                 <artifactId>grpc-stub</artifactId>
                 <version>${grpc.version}</version>
             </dependency>
+            <dependency>
+                <groupId>io.grpc</groupId>
+                <artifactId>grpc-testing</artifactId>
+                <scope>test</scope>
+                <version>${grpc.version}</version>
+            </dependency>
             <dependency>
                 <groupId>com.google.guava</groupId>
                 <artifactId>guava</artifactId>
@@ -247,6 +255,13 @@
                 <version>${awaitility.version}</version>
                 <scope>test</scope>
             </dependency>
+            <!-- 
https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-jdk18on -->
+            <dependency>
+                <groupId>org.bouncycastle</groupId>
+                <artifactId>bcpkix-jdk18on</artifactId>
+                <version>${bcpkix.version}</version>
+                <scope>test</scope>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
diff --git a/java/test/pom.xml b/java/test/pom.xml
new file mode 100644
index 00000000..55f643e3
--- /dev/null
+++ b/java/test/pom.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.rocketmq</groupId>
+        <artifactId>rocketmq-client-java-parent</artifactId>
+        <version>5.0.7-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>test</artifactId>
+
+    <properties>
+        <maven.compiler.release>8</maven.compiler.release>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-client-java-noshade</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-testing</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcpkix-jdk18on</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git 
a/java/test/src/test/java/org/apache/rocketmq/test/client/AttemptIdIntegrationTest.java
 
b/java/test/src/test/java/org/apache/rocketmq/test/client/AttemptIdIntegrationTest.java
new file mode 100644
index 00000000..b03cf70d
--- /dev/null
+++ 
b/java/test/src/test/java/org/apache/rocketmq/test/client/AttemptIdIntegrationTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+import apache.rocketmq.v2.Address;
+import apache.rocketmq.v2.Assignment;
+import apache.rocketmq.v2.Broker;
+import apache.rocketmq.v2.Code;
+import apache.rocketmq.v2.Endpoints;
+import apache.rocketmq.v2.ExponentialBackoff;
+import apache.rocketmq.v2.HeartbeatRequest;
+import apache.rocketmq.v2.HeartbeatResponse;
+import apache.rocketmq.v2.MessageQueue;
+import apache.rocketmq.v2.MessageType;
+import apache.rocketmq.v2.Permission;
+import apache.rocketmq.v2.QueryAssignmentRequest;
+import apache.rocketmq.v2.QueryAssignmentResponse;
+import apache.rocketmq.v2.QueryRouteRequest;
+import apache.rocketmq.v2.QueryRouteResponse;
+import apache.rocketmq.v2.ReceiveMessageRequest;
+import apache.rocketmq.v2.ReceiveMessageResponse;
+import apache.rocketmq.v2.Resource;
+import apache.rocketmq.v2.Status;
+import apache.rocketmq.v2.TelemetryCommand;
+import com.google.protobuf.Duration;
+import io.grpc.stub.StreamObserver;
+import java.time.temporal.ChronoUnit;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
+import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.apis.consumer.FilterExpression;
+import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
+import org.apache.rocketmq.client.apis.consumer.PushConsumer;
+import org.apache.rocketmq.test.helper.ResponseWriter;
+import org.apache.rocketmq.test.server.GrpcServerIntegrationTest;
+import org.apache.rocketmq.test.server.MockServer;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AttemptIdIntegrationTest extends GrpcServerIntegrationTest {
+    private final String topic = "topic";
+    private final String broker = "broker";
+    private final ResponseWriter responseWriter = ResponseWriter.getInstance();
+    private final Status mockStatus = Status.newBuilder()
+        .setCode(Code.OK)
+        .setMessage("mock test")
+        .build();
+
+    private final List<String> attemptIdList = new CopyOnWriteArrayList<>();
+    private final AtomicBoolean serverDeadlineFlag = new AtomicBoolean(true);
+
+    @Before
+    public void setUp() throws Exception {
+        MockServer serverImpl = new MockServer() {
+            @Override
+            public void queryRoute(QueryRouteRequest request, 
StreamObserver<QueryRouteResponse> responseObserver) {
+                responseWriter.write(responseObserver, 
QueryRouteResponse.newBuilder()
+                    .setStatus(mockStatus)
+                    .addMessageQueues(MessageQueue.newBuilder()
+                        .setTopic(Resource.newBuilder()
+                            .setName(topic).build())
+                        .setId(0)
+                        .setPermission(Permission.READ_WRITE)
+                        .setBroker(Broker.newBuilder()
+                            .setName(broker)
+                            .setId(0)
+                            .setEndpoints(Endpoints.newBuilder()
+                                .addAddresses(Address.newBuilder()
+                                    .setHost("127.0.0.1")
+                                    .setPort(port)
+                                    .build())
+                                .build())
+                            .build())
+                        .addAcceptMessageTypes(MessageType.NORMAL)
+                        .build())
+                    .build());
+            }
+
+            @Override
+            public void heartbeat(HeartbeatRequest request, 
StreamObserver<HeartbeatResponse> responseObserver) {
+                responseWriter.write(responseObserver, 
HeartbeatResponse.newBuilder().setStatus(mockStatus)
+                    .build());
+            }
+
+            @Override
+            public void queryAssignment(QueryAssignmentRequest request,
+                StreamObserver<QueryAssignmentResponse> responseObserver) {
+                responseWriter.write(responseObserver, 
QueryAssignmentResponse.newBuilder().setStatus(mockStatus)
+                        .addAssignments(Assignment.newBuilder()
+                            .setMessageQueue(MessageQueue.newBuilder()
+                                .setTopic(Resource.newBuilder()
+                                    .setName(topic).build())
+                                .setId(0)
+                                .setPermission(Permission.READ_WRITE)
+                                .setBroker(Broker.newBuilder()
+                                    .setName(broker)
+                                    .setId(0)
+                                    .setEndpoints(Endpoints.newBuilder()
+                                        .addAddresses(Address.newBuilder()
+                                            .setHost("127.0.0.1")
+                                            .setPort(port)
+                                            .build())
+                                        .build())
+                                    .build())
+                                .addAcceptMessageTypes(MessageType.NORMAL)
+                                .build())
+                            .build())
+                    .build());
+            }
+
+            @Override
+            public void receiveMessage(ReceiveMessageRequest request,
+                StreamObserver<ReceiveMessageResponse> responseObserver) {
+                // prevent too much request
+                if (attemptIdList.size() >= 3) {
+                    try {
+                        Thread.sleep(100);
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+                attemptIdList.add(request.getAttemptId());
+                if (serverDeadlineFlag.compareAndSet(true, false)) {
+                    // timeout
+                } else {
+                    
responseObserver.onNext(ReceiveMessageResponse.newBuilder().setStatus(mockStatus).build());
+                    responseObserver.onCompleted();
+                }
+            }
+
+            @Override
+            public StreamObserver<TelemetryCommand> 
telemetry(StreamObserver<TelemetryCommand> responseObserver) {
+                return new StreamObserver<TelemetryCommand>() {
+                    @Override
+                    public void onNext(TelemetryCommand value) {
+                        
responseObserver.onNext(value.toBuilder().setStatus(mockStatus)
+                            .setSettings(value.getSettings().toBuilder()
+                                
.setBackoffPolicy(value.getSettings().getBackoffPolicy().toBuilder()
+                                    .setMaxAttempts(16)
+                                    
.setExponentialBackoff(ExponentialBackoff.newBuilder()
+                                        .setInitial(Duration.newBuilder()
+                                            .setSeconds(1).build())
+                                        .setMax(Duration.newBuilder()
+                                            .setSeconds(10).build())
+                                        .setMultiplier(1.5f)
+                                        .build()))).build());
+                        responseObserver.onCompleted();
+                    }
+
+                    @Override
+                    public void onError(Throwable t) {
+                    }
+
+                    @Override
+                    public void onCompleted() {
+                        responseObserver.onCompleted();
+                    }
+                };
+            }
+        };
+
+        setUpServer(serverImpl, port);
+        serverImpl.setPort(port);
+    }
+
+    @Test
+    public void test() throws Exception {
+        final ClientServiceProvider provider = 
ClientServiceProvider.loadService();
+        String accessKey = "yourAccessKey";
+        String secretKey = "yourSecretKey";
+        SessionCredentialsProvider sessionCredentialsProvider =
+            new StaticSessionCredentialsProvider(accessKey, secretKey);
+
+        String endpoints = "127.0.0.1" + ":" + port;
+        int timeout = 1000;
+        ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder()
+            .setEndpoints(endpoints)
+            .setCredentialProvider(sessionCredentialsProvider)
+            .setRequestTimeout(java.time.Duration.of(timeout, 
ChronoUnit.MILLIS))
+            .build();
+        String tag = "yourMessageTagA";
+        FilterExpression filterExpression = new FilterExpression(tag, 
FilterExpressionType.TAG);
+        String consumerGroup = "yourConsumerGroup";
+        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
+            .setClientConfiguration(clientConfiguration)
+            .setConsumerGroup(consumerGroup)
+            .setSubscriptionExpressions(Collections.singletonMap(topic, 
filterExpression))
+            .setMessageListener(messageView -> ConsumeResult.SUCCESS)
+            .build();
+        try {
+            await().atMost(java.time.Duration.ofSeconds(5)).untilAsserted(() 
-> {
+                assertThat(attemptIdList.size()).isGreaterThanOrEqualTo(3);
+                
assertThat(attemptIdList.get(0)).isEqualTo(attemptIdList.get(1));
+                
assertThat(attemptIdList.get(0)).isNotEqualTo(attemptIdList.get(2));
+            });
+        } finally {
+            pushConsumer.close();
+        }
+    }
+}
diff --git 
a/java/test/src/test/java/org/apache/rocketmq/test/helper/ResponseWriter.java 
b/java/test/src/test/java/org/apache/rocketmq/test/helper/ResponseWriter.java
new file mode 100644
index 00000000..17d2597b
--- /dev/null
+++ 
b/java/test/src/test/java/org/apache/rocketmq/test/helper/ResponseWriter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.helper;
+
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.ServerCallStreamObserver;
+import io.grpc.stub.StreamObserver;
+
+public class ResponseWriter {
+    protected static final Object INSTANCE_CREATE_LOCK = new Object();
+    protected static volatile ResponseWriter instance;
+
+    public static ResponseWriter getInstance() {
+        if (instance == null) {
+            synchronized (INSTANCE_CREATE_LOCK) {
+                if (instance == null) {
+                    instance = new ResponseWriter();
+                }
+            }
+        }
+        return instance;
+    }
+
+    public <T> void write(StreamObserver<T> observer, final T response) {
+        if (writeResponse(observer, response)) {
+            observer.onCompleted();
+        }
+    }
+
+    public <T> boolean writeResponse(StreamObserver<T> observer, final T 
response) {
+        if (null == response) {
+            return false;
+        }
+        if (isCancelled(observer)) {
+            return false;
+        }
+        try {
+            observer.onNext(response);
+        } catch (StatusRuntimeException statusRuntimeException) {
+            if (Status.CANCELLED.equals(statusRuntimeException.getStatus())) {
+                return false;
+            }
+            throw statusRuntimeException;
+        }
+        return true;
+    }
+
+    public <T> boolean isCancelled(StreamObserver<T> observer) {
+        if (observer instanceof ServerCallStreamObserver) {
+            final ServerCallStreamObserver<T> serverCallStreamObserver = 
(ServerCallStreamObserver<T>) observer;
+            return serverCallStreamObserver.isCancelled();
+        }
+        return false;
+    }
+}
\ No newline at end of file
diff --git 
a/java/test/src/test/java/org/apache/rocketmq/test/server/GrpcServerIntegrationTest.java
 
b/java/test/src/test/java/org/apache/rocketmq/test/server/GrpcServerIntegrationTest.java
new file mode 100644
index 00000000..1f08d27b
--- /dev/null
+++ 
b/java/test/src/test/java/org/apache/rocketmq/test/server/GrpcServerIntegrationTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.server;
+
+import apache.rocketmq.v2.MessagingServiceGrpc;
+import io.grpc.Server;
+import io.grpc.ServerInterceptor;
+import io.grpc.ServerServiceDefinition;
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
+import io.grpc.netty.shaded.io.netty.handler.ssl.util.SelfSignedCertificate;
+import io.grpc.testing.GrpcCleanupRule;
+import java.io.IOException;
+import java.security.cert.CertificateException;
+import org.junit.Rule;
+
+public class GrpcServerIntegrationTest {
+    /**
+     * This rule manages automatic graceful shutdown for the registered 
servers and channels at the end of test.
+     */
+    @Rule
+    public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+
+    /**
+     * Let OS pick up an available port.
+     */
+    protected int port = 0;
+
+
+    protected void setUpServer(MessagingServiceGrpc.MessagingServiceImplBase 
serverImpl,
+        int port, ServerInterceptor... interceptors) throws IOException, 
CertificateException {
+        SelfSignedCertificate selfSignedCertificate = new 
SelfSignedCertificate();
+        ServerServiceDefinition serviceDefinition = serverImpl.bindService();
+        NettyServerBuilder serverBuilder = NettyServerBuilder.forPort(port)
+            .directExecutor()
+            .addService(serviceDefinition)
+            .useTransportSecurity(selfSignedCertificate.certificate(), 
selfSignedCertificate.privateKey());
+        for (ServerInterceptor interceptor : interceptors) {
+            serverBuilder = serverBuilder.intercept(interceptor);
+        }
+        Server server = serverBuilder.build()
+            .start();
+        this.port = server.getPort();
+        // Create a server, add service, start, and register for automatic 
graceful shutdown.
+        grpcCleanup.register(server);
+    }
+}
diff --git 
a/java/test/src/test/java/org/apache/rocketmq/test/server/MockServer.java 
b/java/test/src/test/java/org/apache/rocketmq/test/server/MockServer.java
new file mode 100644
index 00000000..9fdd5fad
--- /dev/null
+++ b/java/test/src/test/java/org/apache/rocketmq/test/server/MockServer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.server;
+
+import apache.rocketmq.v2.MessagingServiceGrpc;
+
+public class MockServer extends MessagingServiceGrpc.MessagingServiceImplBase {
+    private Integer port;
+
+    public MockServer() {
+    }
+
+    public Integer getPort() {
+        return port;
+    }
+
+    public void setPort(Integer port) {
+        this.port = port;
+    }
+}

Reply via email to