This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push: new 8f16d838 [ISSUE #726] Add test module to provide integration test for java client (#727) 8f16d838 is described below commit 8f16d838bafcb7d3d4566cc446753e8e9582b8a8 Author: Zhouxiang Zhan <zhouxiang....@alibaba-inc.com> AuthorDate: Tue Apr 16 14:20:16 2024 +0800 [ISSUE #726] Add test module to provide integration test for java client (#727) * Add GrpcServerIntegrationTest * Add AttemptIdIntegrationTest * fix jdk17 SelfSignedCertificate --- java/pom.xml | 15 ++ java/test/pom.xml | 71 +++++++ .../test/client/AttemptIdIntegrationTest.java | 224 +++++++++++++++++++++ .../rocketmq/test/helper/ResponseWriter.java | 71 +++++++ .../test/server/GrpcServerIntegrationTest.java | 61 ++++++ .../apache/rocketmq/test/server/MockServer.java | 35 ++++ 6 files changed, 477 insertions(+) diff --git a/java/pom.xml b/java/pom.xml index 779a74d4..f081c19a 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -31,6 +31,7 @@ <module>client-apis</module> <module>client</module> <module>client-shade</module> + <module>test</module> </modules> <properties> @@ -62,6 +63,7 @@ <assertj-core.version>2.6.0</assertj-core.version> <mockito-core.version>3.10.0</mockito-core.version> <awaitility.version>4.1.0</awaitility.version> + <bcpkix.version>1.77</bcpkix.version> <!-- plugin --> <maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version> @@ -166,6 +168,12 @@ <artifactId>grpc-stub</artifactId> <version>${grpc.version}</version> </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-testing</artifactId> + <scope>test</scope> + <version>${grpc.version}</version> + </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> @@ -247,6 +255,13 @@ <version>${awaitility.version}</version> <scope>test</scope> </dependency> + <!-- https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-jdk18on --> + <dependency> + <groupId>org.bouncycastle</groupId> + <artifactId>bcpkix-jdk18on</artifactId> + <version>${bcpkix.version}</version> + <scope>test</scope> + </dependency> </dependencies> </dependencyManagement> diff --git a/java/test/pom.xml b/java/test/pom.xml new file mode 100644 index 00000000..55f643e3 --- /dev/null +++ b/java/test/pom.xml @@ -0,0 +1,71 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-client-java-parent</artifactId> + <version>5.0.7-SNAPSHOT</version> + </parent> + + <artifactId>test</artifactId> + + <properties> + <maven.compiler.release>8</maven.compiler.release> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + + <dependencies> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>rocketmq-client-java-noshade</artifactId> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-testing</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.bouncycastle</groupId> + <artifactId>bcpkix-jdk18on</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + +</project> \ No newline at end of file diff --git a/java/test/src/test/java/org/apache/rocketmq/test/client/AttemptIdIntegrationTest.java b/java/test/src/test/java/org/apache/rocketmq/test/client/AttemptIdIntegrationTest.java new file mode 100644 index 00000000..b03cf70d --- /dev/null +++ b/java/test/src/test/java/org/apache/rocketmq/test/client/AttemptIdIntegrationTest.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import apache.rocketmq.v2.Address; +import apache.rocketmq.v2.Assignment; +import apache.rocketmq.v2.Broker; +import apache.rocketmq.v2.Code; +import apache.rocketmq.v2.Endpoints; +import apache.rocketmq.v2.ExponentialBackoff; +import apache.rocketmq.v2.HeartbeatRequest; +import apache.rocketmq.v2.HeartbeatResponse; +import apache.rocketmq.v2.MessageQueue; +import apache.rocketmq.v2.MessageType; +import apache.rocketmq.v2.Permission; +import apache.rocketmq.v2.QueryAssignmentRequest; +import apache.rocketmq.v2.QueryAssignmentResponse; +import apache.rocketmq.v2.QueryRouteRequest; +import apache.rocketmq.v2.QueryRouteResponse; +import apache.rocketmq.v2.ReceiveMessageRequest; +import apache.rocketmq.v2.ReceiveMessageResponse; +import apache.rocketmq.v2.Resource; +import apache.rocketmq.v2.Status; +import apache.rocketmq.v2.TelemetryCommand; +import com.google.protobuf.Duration; +import io.grpc.stub.StreamObserver; +import java.time.temporal.ChronoUnit; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.rocketmq.client.apis.ClientConfiguration; +import org.apache.rocketmq.client.apis.ClientServiceProvider; +import org.apache.rocketmq.client.apis.SessionCredentialsProvider; +import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider; +import org.apache.rocketmq.client.apis.consumer.ConsumeResult; +import org.apache.rocketmq.client.apis.consumer.FilterExpression; +import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; +import org.apache.rocketmq.client.apis.consumer.PushConsumer; +import org.apache.rocketmq.test.helper.ResponseWriter; +import org.apache.rocketmq.test.server.GrpcServerIntegrationTest; +import org.apache.rocketmq.test.server.MockServer; +import org.junit.Before; +import org.junit.Test; + +public class AttemptIdIntegrationTest extends GrpcServerIntegrationTest { + private final String topic = "topic"; + private final String broker = "broker"; + private final ResponseWriter responseWriter = ResponseWriter.getInstance(); + private final Status mockStatus = Status.newBuilder() + .setCode(Code.OK) + .setMessage("mock test") + .build(); + + private final List<String> attemptIdList = new CopyOnWriteArrayList<>(); + private final AtomicBoolean serverDeadlineFlag = new AtomicBoolean(true); + + @Before + public void setUp() throws Exception { + MockServer serverImpl = new MockServer() { + @Override + public void queryRoute(QueryRouteRequest request, StreamObserver<QueryRouteResponse> responseObserver) { + responseWriter.write(responseObserver, QueryRouteResponse.newBuilder() + .setStatus(mockStatus) + .addMessageQueues(MessageQueue.newBuilder() + .setTopic(Resource.newBuilder() + .setName(topic).build()) + .setId(0) + .setPermission(Permission.READ_WRITE) + .setBroker(Broker.newBuilder() + .setName(broker) + .setId(0) + .setEndpoints(Endpoints.newBuilder() + .addAddresses(Address.newBuilder() + .setHost("127.0.0.1") + .setPort(port) + .build()) + .build()) + .build()) + .addAcceptMessageTypes(MessageType.NORMAL) + .build()) + .build()); + } + + @Override + public void heartbeat(HeartbeatRequest request, StreamObserver<HeartbeatResponse> responseObserver) { + responseWriter.write(responseObserver, HeartbeatResponse.newBuilder().setStatus(mockStatus) + .build()); + } + + @Override + public void queryAssignment(QueryAssignmentRequest request, + StreamObserver<QueryAssignmentResponse> responseObserver) { + responseWriter.write(responseObserver, QueryAssignmentResponse.newBuilder().setStatus(mockStatus) + .addAssignments(Assignment.newBuilder() + .setMessageQueue(MessageQueue.newBuilder() + .setTopic(Resource.newBuilder() + .setName(topic).build()) + .setId(0) + .setPermission(Permission.READ_WRITE) + .setBroker(Broker.newBuilder() + .setName(broker) + .setId(0) + .setEndpoints(Endpoints.newBuilder() + .addAddresses(Address.newBuilder() + .setHost("127.0.0.1") + .setPort(port) + .build()) + .build()) + .build()) + .addAcceptMessageTypes(MessageType.NORMAL) + .build()) + .build()) + .build()); + } + + @Override + public void receiveMessage(ReceiveMessageRequest request, + StreamObserver<ReceiveMessageResponse> responseObserver) { + // prevent too much request + if (attemptIdList.size() >= 3) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + attemptIdList.add(request.getAttemptId()); + if (serverDeadlineFlag.compareAndSet(true, false)) { + // timeout + } else { + responseObserver.onNext(ReceiveMessageResponse.newBuilder().setStatus(mockStatus).build()); + responseObserver.onCompleted(); + } + } + + @Override + public StreamObserver<TelemetryCommand> telemetry(StreamObserver<TelemetryCommand> responseObserver) { + return new StreamObserver<TelemetryCommand>() { + @Override + public void onNext(TelemetryCommand value) { + responseObserver.onNext(value.toBuilder().setStatus(mockStatus) + .setSettings(value.getSettings().toBuilder() + .setBackoffPolicy(value.getSettings().getBackoffPolicy().toBuilder() + .setMaxAttempts(16) + .setExponentialBackoff(ExponentialBackoff.newBuilder() + .setInitial(Duration.newBuilder() + .setSeconds(1).build()) + .setMax(Duration.newBuilder() + .setSeconds(10).build()) + .setMultiplier(1.5f) + .build()))).build()); + responseObserver.onCompleted(); + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onCompleted() { + responseObserver.onCompleted(); + } + }; + } + }; + + setUpServer(serverImpl, port); + serverImpl.setPort(port); + } + + @Test + public void test() throws Exception { + final ClientServiceProvider provider = ClientServiceProvider.loadService(); + String accessKey = "yourAccessKey"; + String secretKey = "yourSecretKey"; + SessionCredentialsProvider sessionCredentialsProvider = + new StaticSessionCredentialsProvider(accessKey, secretKey); + + String endpoints = "127.0.0.1" + ":" + port; + int timeout = 1000; + ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() + .setEndpoints(endpoints) + .setCredentialProvider(sessionCredentialsProvider) + .setRequestTimeout(java.time.Duration.of(timeout, ChronoUnit.MILLIS)) + .build(); + String tag = "yourMessageTagA"; + FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); + String consumerGroup = "yourConsumerGroup"; + PushConsumer pushConsumer = provider.newPushConsumerBuilder() + .setClientConfiguration(clientConfiguration) + .setConsumerGroup(consumerGroup) + .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) + .setMessageListener(messageView -> ConsumeResult.SUCCESS) + .build(); + try { + await().atMost(java.time.Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(attemptIdList.size()).isGreaterThanOrEqualTo(3); + assertThat(attemptIdList.get(0)).isEqualTo(attemptIdList.get(1)); + assertThat(attemptIdList.get(0)).isNotEqualTo(attemptIdList.get(2)); + }); + } finally { + pushConsumer.close(); + } + } +} diff --git a/java/test/src/test/java/org/apache/rocketmq/test/helper/ResponseWriter.java b/java/test/src/test/java/org/apache/rocketmq/test/helper/ResponseWriter.java new file mode 100644 index 00000000..17d2597b --- /dev/null +++ b/java/test/src/test/java/org/apache/rocketmq/test/helper/ResponseWriter.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.helper; + +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.ServerCallStreamObserver; +import io.grpc.stub.StreamObserver; + +public class ResponseWriter { + protected static final Object INSTANCE_CREATE_LOCK = new Object(); + protected static volatile ResponseWriter instance; + + public static ResponseWriter getInstance() { + if (instance == null) { + synchronized (INSTANCE_CREATE_LOCK) { + if (instance == null) { + instance = new ResponseWriter(); + } + } + } + return instance; + } + + public <T> void write(StreamObserver<T> observer, final T response) { + if (writeResponse(observer, response)) { + observer.onCompleted(); + } + } + + public <T> boolean writeResponse(StreamObserver<T> observer, final T response) { + if (null == response) { + return false; + } + if (isCancelled(observer)) { + return false; + } + try { + observer.onNext(response); + } catch (StatusRuntimeException statusRuntimeException) { + if (Status.CANCELLED.equals(statusRuntimeException.getStatus())) { + return false; + } + throw statusRuntimeException; + } + return true; + } + + public <T> boolean isCancelled(StreamObserver<T> observer) { + if (observer instanceof ServerCallStreamObserver) { + final ServerCallStreamObserver<T> serverCallStreamObserver = (ServerCallStreamObserver<T>) observer; + return serverCallStreamObserver.isCancelled(); + } + return false; + } +} \ No newline at end of file diff --git a/java/test/src/test/java/org/apache/rocketmq/test/server/GrpcServerIntegrationTest.java b/java/test/src/test/java/org/apache/rocketmq/test/server/GrpcServerIntegrationTest.java new file mode 100644 index 00000000..1f08d27b --- /dev/null +++ b/java/test/src/test/java/org/apache/rocketmq/test/server/GrpcServerIntegrationTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.server; + +import apache.rocketmq.v2.MessagingServiceGrpc; +import io.grpc.Server; +import io.grpc.ServerInterceptor; +import io.grpc.ServerServiceDefinition; +import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; +import io.grpc.netty.shaded.io.netty.handler.ssl.util.SelfSignedCertificate; +import io.grpc.testing.GrpcCleanupRule; +import java.io.IOException; +import java.security.cert.CertificateException; +import org.junit.Rule; + +public class GrpcServerIntegrationTest { + /** + * This rule manages automatic graceful shutdown for the registered servers and channels at the end of test. + */ + @Rule + public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + + /** + * Let OS pick up an available port. + */ + protected int port = 0; + + + protected void setUpServer(MessagingServiceGrpc.MessagingServiceImplBase serverImpl, + int port, ServerInterceptor... interceptors) throws IOException, CertificateException { + SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate(); + ServerServiceDefinition serviceDefinition = serverImpl.bindService(); + NettyServerBuilder serverBuilder = NettyServerBuilder.forPort(port) + .directExecutor() + .addService(serviceDefinition) + .useTransportSecurity(selfSignedCertificate.certificate(), selfSignedCertificate.privateKey()); + for (ServerInterceptor interceptor : interceptors) { + serverBuilder = serverBuilder.intercept(interceptor); + } + Server server = serverBuilder.build() + .start(); + this.port = server.getPort(); + // Create a server, add service, start, and register for automatic graceful shutdown. + grpcCleanup.register(server); + } +} diff --git a/java/test/src/test/java/org/apache/rocketmq/test/server/MockServer.java b/java/test/src/test/java/org/apache/rocketmq/test/server/MockServer.java new file mode 100644 index 00000000..9fdd5fad --- /dev/null +++ b/java/test/src/test/java/org/apache/rocketmq/test/server/MockServer.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.server; + +import apache.rocketmq.v2.MessagingServiceGrpc; + +public class MockServer extends MessagingServiceGrpc.MessagingServiceImplBase { + private Integer port; + + public MockServer() { + } + + public Integer getPort() { + return port; + } + + public void setPort(Integer port) { + this.port = port; + } +}