This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new fb4f5ba94d8 Add CDCRequestHandlerTest and RetryStreamingExceptionHandlerTest (#37374)
fb4f5ba94d8 is described below
commit fb4f5ba94d85f63702cf1282c6234981945d49b5
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 13 20:08:15 2025 +0800
Add CDCRequestHandlerTest and RetryStreamingExceptionHandlerTest (#37374)
* Add CDCRequestHandlerTest and RetryStreamingExceptionHandlerTest
* Add CDCRequestHandlerTest and RetryStreamingExceptionHandlerTest
---
.../data/pipeline/cdc/client/CDCClientTest.java | 32 ----
.../cdc/client/handler/CDCRequestHandlerTest.java | 207 +++++++++++++++++++++
.../RetryStreamingExceptionHandlerTest.java | 67 +++++++
3 files changed, 274 insertions(+), 32 deletions(-)
diff --git a/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClientTest.java b/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClientTest.java
deleted file mode 100644
index 796726249c1..00000000000
--- a/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClientTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc.client;
-
-import org.apache.shardingsphere.data.pipeline.cdc.client.config.CDCClientConfiguration;
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-class CDCClientTest {
-
- @Test
- void assertInvalidParameters() {
- CDCClientConfiguration config = new CDCClientConfiguration("localhost", -1, 1000);
- assertThrows(IllegalArgumentException.class, () -> new CDCClient(config));
- }
-}
diff --git a/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandlerTest.java b/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandlerTest.java
new file mode 100644
index 00000000000..e8a316d223f
--- /dev/null
+++ b/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandlerTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.handler;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.util.AttributeMap;
+import io.netty.util.DefaultAttributeMap;
+import org.apache.shardingsphere.data.pipeline.cdc.client.constant.ClientConnectionStatus;
+import org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext;
+import org.apache.shardingsphere.data.pipeline.cdc.client.exception.ServerResultException;
+import org.apache.shardingsphere.data.pipeline.cdc.client.util.ResponseFuture;
+import org.apache.shardingsphere.data.pipeline.cdc.client.util.ServerErrorResult;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Type;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ServerGreetingResult;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.StreamDataResult;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.util.List;
+import java.util.function.Consumer;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+class CDCRequestHandlerTest {
+
+ @Mock
+ private Consumer<List<Record>> consumer;
+
+ @Mock
+ private ExceptionHandler exceptionHandler;
+
+ @Mock
+ private ServerErrorResultHandler errorResultHandler;
+
+ private CDCRequestHandler handler;
+
+ @BeforeEach
+ void setUp() {
+ handler = new CDCRequestHandler(consumer, exceptionHandler, errorResultHandler);
+ }
+
+ @Test
+ void assertChannelRegisteredAndInactive() {
+ Channel channel = mockChannel(null);
+ ChannelHandlerContext ctx = mockChannelHandlerContext(channel);
+ handler.channelRegistered(ctx);
+ assertThat(channel.attr(ClientConnectionContext.CONTEXT_KEY).get().getStatus().get(), is(ClientConnectionStatus.NOT_LOGGED_IN));
+ handler.channelInactive(ctx);
+ verify(ctx).fireChannelInactive();
+ }
+
+ @Test
+ void assertHandleNonSucceedResponseWithFuture() {
+ ClientConnectionContext connectionContext = new ClientConnectionContext();
+ ResponseFuture responseFuture = new ResponseFuture("foo_req", Type.START_STREAMING);
+ connectionContext.getResponseFutureMap().put("foo_req", responseFuture);
+ ChannelHandlerContext ctx = mockChannelHandlerContext(mockChannel(connectionContext));
+ CDCResponse response = CDCResponse.newBuilder().setRequestId("foo_req").setStatus(Status.FAILED).setErrorCode("500").setErrorMessage("mock error").build();
+ handler.channelRead(ctx, response);
+ ArgumentCaptor<ServerErrorResult> resultCaptor = ArgumentCaptor.forClass(ServerErrorResult.class);
+ verify(errorResultHandler).handleServerError(eq(ctx), resultCaptor.capture());
+ ServerErrorResult actualResult = resultCaptor.getValue();
+ assertThat(actualResult.getErrorCode(), is("500"));
+ assertThat(actualResult.getErrorMessage(), is("mock error"));
+ assertThat(actualResult.getRequestType(), is(Type.START_STREAMING));
+ assertThat(responseFuture.getErrorCode(), is("500"));
+ assertThat(responseFuture.getErrorMessage(), is("mock error"));
+ ServerResultException ex = assertThrows(ServerResultException.class, () -> responseFuture.waitResponseResult(100L, connectionContext));
+ assertThat(ex.getMessage(), is("Get START_STREAMING response failed, code:500, reason: mock error"));
+ assertTrue(connectionContext.getResponseFutureMap().isEmpty());
+ }
+
+ @Test
+ void assertHandleNonSucceedResponseWithoutFuture() {
+ ChannelHandlerContext ctx = mockChannelHandlerContext(mockChannel(new ClientConnectionContext()));
+ handler.channelRead(ctx, CDCResponse.newBuilder().setRequestId("foo_req").setStatus(Status.FAILED).setErrorCode("404").setErrorMessage("not found").build());
+ ArgumentCaptor<ServerErrorResult> resultCaptor = ArgumentCaptor.forClass(ServerErrorResult.class);
+ verify(errorResultHandler).handleServerError(eq(ctx), resultCaptor.capture());
+ assertThat(resultCaptor.getValue().getRequestType(), is(Type.UNKNOWN));
+ }
+
+ @Test
+ void assertHandleServerGreeting() {
+ ClientConnectionContext connectionContext = new ClientConnectionContext();
+ ChannelHandlerContext ctx = mockChannelHandlerContext(mockChannel(connectionContext));
+ CDCResponse response = CDCResponse.newBuilder().setRequestId("foo_req").setStatus(Status.SUCCEED)
+ .setServerGreetingResult(ServerGreetingResult.newBuilder().setServerVersion("1.0").setProtocolVersion("1").build()).build();
+ handler.channelRead(ctx, response);
+ assertTrue(connectionContext.getStreamingIds().isEmpty());
+ verifyNoInteractions(errorResultHandler, consumer, exceptionHandler);
+ }
+
+ @Test
+ void assertHandleLoginResponse() {
+ Channel channel = mockChannel(null);
+ ChannelHandlerContext ctx = mockChannelHandlerContext(channel);
+ handler.channelRegistered(ctx);
+ ClientConnectionContext connectionContext = channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
+ ResponseFuture responseFuture = new ResponseFuture("foo_req", Type.LOGIN);
+ connectionContext.getResponseFutureMap().put("foo_req", responseFuture);
+ handler.channelRead(ctx, CDCResponse.newBuilder().setRequestId("foo_req").setStatus(Status.SUCCEED).build());
+ assertThat(connectionContext.getStatus().get(), is(ClientConnectionStatus.LOGGED_IN));
+ assertDoesNotThrow(() -> responseFuture.waitResponseResult(500L, connectionContext));
+ }
+
+ @Test
+ void assertHandleStreamDataResult() {
+ ClientConnectionContext connectionContext = new ClientConnectionContext();
+ connectionContext.getStatus().set(ClientConnectionStatus.NOT_LOGGED_IN);
+ ResponseFuture responseFuture = new ResponseFuture("foo_req", Type.STREAM_DATA);
+ connectionContext.getResponseFutureMap().put("foo_req", responseFuture);
+ CDCResponse response = CDCResponse.newBuilder()
+ .setRequestId("foo_req").setStatus(Status.SUCCEED).setStreamDataResult(StreamDataResult.newBuilder().setStreamingId("stream_1").build()).build();
+ handler.channelRead(mockChannelHandlerContext(mockChannel(connectionContext)), response);
+ assertTrue(connectionContext.getStreamingIds().contains("stream_1"));
+ assertThat(responseFuture.getResult(), is("stream_1"));
+ assertThat(responseFuture.waitResponseResult(500L, connectionContext).toString(), is("stream_1"));
+ }
+
+ @Test
+ void assertHandleDataRecordResult() {
+ Channel channel = mockChannel(new ClientConnectionContext());
+ DataRecordResult recordResult = DataRecordResult.newBuilder().setAckId("ack_1").addRecord(Record.newBuilder().build()).build();
+ List<Record> expectedRecords = recordResult.getRecordList();
+ handler.channelRead(mockChannelHandlerContext(channel), CDCResponse.newBuilder().setRequestId("foo_req").setStatus(Status.SUCCEED).setDataRecordResult(recordResult).build());
+ verify(consumer).accept(expectedRecords);
+ assertThat(expectedRecords.size(), is(1));
+ ArgumentCaptor<CDCRequest> requestCaptor = ArgumentCaptor.forClass(CDCRequest.class);
+ verify(channel).writeAndFlush(requestCaptor.capture());
+ CDCRequest ackRequest = requestCaptor.getValue();
+ assertThat(ackRequest.getType(), is(Type.ACK_STREAMING));
+ assertThat(ackRequest.getAckStreamingRequestBody().getAckId(), is("ack_1"));
+ }
+
+ @Test
+ void assertHandleSucceedWithoutPayload() {
+ ClientConnectionContext connectionContext = new ClientConnectionContext();
+ ResponseFuture responseFuture = new ResponseFuture("foo_req", Type.DROP_STREAMING);
+ connectionContext.getResponseFutureMap().put("foo_req", responseFuture);
+ handler.channelRead(mockChannelHandlerContext(mockChannel(connectionContext)), CDCResponse.newBuilder().setRequestId("foo_req").setStatus(Status.SUCCEED).build());
+ assertDoesNotThrow(() -> responseFuture.waitResponseResult(500L, connectionContext));
+ }
+
+ @Test
+ void assertExceptionCaughtDelegates() {
+ ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ RuntimeException expectedCause = new RuntimeException("mock");
+ handler.exceptionCaught(ctx, expectedCause);
+ verify(exceptionHandler).handleException(ctx, expectedCause);
+ }
+
+ private Channel mockChannel(final ClientConnectionContext context) {
+ Channel result = mock(Channel.class);
+ AttributeMap attributeMap = new DefaultAttributeMap();
+ when(result.attr(ClientConnectionContext.CONTEXT_KEY)).thenAnswer(invocation -> attributeMap.attr(invocation.getArgument(0)));
+ result.attr(ClientConnectionContext.CONTEXT_KEY).set(context);
+ when(result.writeAndFlush(any())).thenReturn(mock(ChannelFuture.class));
+ return result;
+ }
+
+ private ChannelHandlerContext mockChannelHandlerContext(final Channel channel) {
+ ChannelHandlerContext result = mock(ChannelHandlerContext.class);
+ when(result.channel()).thenReturn(channel);
+ when(result.fireChannelInactive()).thenReturn(result);
+ return result;
+ }
+}
diff --git a/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandlerTest.java b/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandlerTest.java
new file mode 100644
index 00000000000..ff03a438e40
--- /dev/null
+++ b/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandlerTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
+import org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class RetryStreamingExceptionHandlerTest {
+
+ @Mock
+ private CDCClient cdcClient;
+
+ @Mock
+ private ChannelHandlerContext channelHandlerContext;
+
+ @BeforeEach
+ void setUp() {
+ ClientConnectionContext connectionContext = new ClientConnectionContext();
+ connectionContext.getStreamingIds().add("foo_stream_id");
+ EmbeddedChannel channel = new EmbeddedChannel();
+ channel.attr(ClientConnectionContext.CONTEXT_KEY).set(connectionContext);
+ when(channelHandlerContext.channel()).thenReturn(channel);
+ }
+
+ @Test
+ void assertRestartStreamingWhenRetryTimesNotExceed() {
+ new RetryStreamingExceptionHandler(cdcClient, 2, 10).handleException(channelHandlerContext, new RuntimeException(""));
+ verify(cdcClient, timeout(3000L)).restartStreaming("foo_stream_id");
+ verify(cdcClient, never()).stopStreaming(any());
+ }
+
+ @Test
+ void assertStopStreamingWhenRetryTimesExceed() {
+ new RetryStreamingExceptionHandler(cdcClient, 0, 10).handleException(channelHandlerContext, new RuntimeException(""));
+ verify(cdcClient, timeout(3000L)).stopStreaming("foo_stream_id");
+ verify(cdcClient, never()).restartStreaming(any());
+ }
+}