This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 657a59243f8 Add CDCBackendHandlerTest (#37399)
657a59243f8 is described below

commit 657a59243f8701ac6c5a7faf076f9854aedbf4f9
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Dec 16 13:10:40 2025 +0800

    Add CDCBackendHandlerTest (#37399)
---
 .../pipeline/cdc/generator/CDCResponseUtils.java   |   6 +-
 .../pipeline/cdc/handler/CDCBackendHandler.java    |   4 +-
 .../cdc/handler/CDCBackendHandlerTest.java         | 293 +++++++++++++++++++++
 3 files changed, 298 insertions(+), 5 deletions(-)

diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseUtils.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseUtils.java
index 1e8072210bc..a390640f179 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseUtils.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseUtils.java
@@ -38,7 +38,7 @@ public final class CDCResponseUtils {
     /**
      * Succeed response.
      *
-     * @param requestId request id
+     * @param requestId request ID
      * @return CDC response
      */
     public static CDCResponse succeed(final String requestId) {
@@ -48,7 +48,7 @@ public final class CDCResponseUtils {
     /**
      * Succeed response.
      *
-     * @param requestId request id
+     * @param requestId request ID
      * @param responseCase response case
      * @param response response
      * @return succeed response builder
@@ -77,7 +77,7 @@ public final class CDCResponseUtils {
     /**
      * Failed response.
      *
-     * @param requestId request id
+     * @param requestId request ID
      * @param errorCode error code
      * @param errorMessage error message
      * @return failed response
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index e129229e4b7..9f7ca7a8788 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -110,8 +110,8 @@ public final class CDCBackendHandler {
             tableNames = 
schemaTableNameMap.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
             schemaTableNameMap.forEach((key, value) -> value.forEach(tableName 
-> schemaTableNames.add(key.isEmpty() ? tableName : String.join(".", key, 
tableName))));
         } else {
-            
schemaTableNames.addAll(CDCSchemaTableUtils.parseTableExpressionWithoutSchema(database,
 requestBody.getSourceSchemaTableList().stream().map(SchemaTable::getTable)
-                    .collect(Collectors.toList())));
+            
schemaTableNames.addAll(CDCSchemaTableUtils.parseTableExpressionWithoutSchema(database,
+                    
requestBody.getSourceSchemaTableList().stream().map(SchemaTable::getTable).collect(Collectors.toList())));
             tableNames = schemaTableNames;
         }
         ShardingSpherePreconditions.checkNotEmpty(tableNames, () -> new 
CDCExceptionWrapper(requestId, new MissingRequiredStreamDataSourceException()));
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandlerTest.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandlerTest.java
new file mode 100644
index 00000000000..f953ccfdf51
--- /dev/null
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandlerTest.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.handler;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelId;
+import io.netty.channel.DefaultChannelId;
+import 
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.CDCJob;
+import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
+import org.apache.shardingsphere.data.pipeline.cdc.api.StreamDataParameter;
+import org.apache.shardingsphere.data.pipeline.cdc.config.CDCJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
+import 
org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
+import 
org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporterManager;
+import 
org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.PipelineCDCSocketSink;
+import 
org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
+import 
org.apache.shardingsphere.data.pipeline.cdc.exception.MissingRequiredStreamDataSourceException;
+import 
org.apache.shardingsphere.data.pipeline.cdc.exception.StreamDatabaseNotFoundException;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
+import org.apache.shardingsphere.data.pipeline.cdc.util.CDCSchemaTableUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
+import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
+import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
+import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineDataNodeUtils;
+import 
org.apache.shardingsphere.database.connector.core.metadata.database.metadata.DialectDatabaseMetaData;
+import 
org.apache.shardingsphere.database.connector.core.spi.DatabaseTypedSPILoader;
+import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.infra.datanode.DataNode;
+import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
+import 
org.apache.shardingsphere.test.infra.framework.extension.mock.AutoMockExtension;
+import 
org.apache.shardingsphere.test.infra.framework.extension.mock.StaticMockSettings;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.internal.configuration.plugins.Plugins;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Properties;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(AutoMockExtension.class)
+@StaticMockSettings({TypedSPILoader.class, DatabaseTypedSPILoader.class, 
PipelineContextManager.class, CDCSchemaTableUtils.class, 
PipelineDataNodeUtils.class, PipelineJobRegistry.class,
+        PipelineJobIdUtils.class, CDCImporterManager.class})
+class CDCBackendHandlerTest {
+    
+    private CDCJobAPI jobAPI;
+    
+    private PipelineJobConfigurationManager jobConfigManager;
+    
+    private CDCBackendHandler backendHandler;
+    
+    @BeforeEach
+    void setUp() throws ReflectiveOperationException {
+        jobAPI = mock(CDCJobAPI.class);
+        jobConfigManager = mock(PipelineJobConfigurationManager.class);
+        when(TypedSPILoader.getService(any(), any())).thenReturn(jobAPI);
+        backendHandler = new CDCBackendHandler();
+        
Plugins.getMemberAccessor().set(CDCBackendHandler.class.getDeclaredField("jobAPI"),
 backendHandler, jobAPI);
+        
Plugins.getMemberAccessor().set(CDCBackendHandler.class.getDeclaredField("jobConfigManager"),
 backendHandler, jobConfigManager);
+    }
+    
+    @Test
+    void assertGetDatabaseNameByJobId() {
+        CDCJobConfiguration jobConfig = mock(CDCJobConfiguration.class);
+        when(jobConfig.getDatabaseName()).thenReturn("foo_db");
+        
when(jobConfigManager.getJobConfiguration("foo_job")).thenReturn(jobConfig);
+        assertThat(backendHandler.getDatabaseNameByJobId("foo_job"), 
is("foo_db"));
+    }
+    
+    @Test
+    void assertStreamDataWhenSchemaAvailable() {
+        ShardingSphereDatabase database = mockDatabase();
+        mockDialectMetaData(true, false);
+        when(CDCSchemaTableUtils.parseTableExpressionWithSchema(eq(database), 
anyCollection()))
+                .thenReturn(Collections.singletonMap("foo_schema", new 
LinkedHashSet<>(Collections.singleton("foo_tbl"))));
+        when(PipelineDataNodeUtils.buildTableAndDataNodesMap(eq(database), 
anyCollection())).thenReturn(Collections.singletonMap("foo_tbl", 
Collections.singletonList(mock(DataNode.class))));
+        when(jobAPI.create(any(StreamDataParameter.class), 
eq(CDCSinkType.SOCKET), any(Properties.class))).thenReturn("foo_job");
+        
when(jobConfigManager.getJobConfiguration("foo_job")).thenReturn(createJobConfiguration());
+        StreamDataRequestBody requestBody = 
StreamDataRequestBody.newBuilder().setDatabase("foo_db")
+                
.addSourceSchemaTable(SchemaTable.newBuilder().setSchema("foo_schema").setTable("foo_tbl")).build();
+        CDCResponse actualResponse = backendHandler.streamData("foo_req", 
requestBody, createConnectionContext(), mock());
+        assertThat(actualResponse.getStatus(), is(CDCResponse.Status.SUCCEED));
+        assertThat(actualResponse.getResponseCase(), 
is(CDCResponse.ResponseCase.STREAM_DATA_RESULT));
+        assertThat(actualResponse.getStreamDataResult().getStreamingId(), 
is("foo_job"));
+    }
+    
+    @Test
+    void assertStreamDataWithoutSchema() {
+        ShardingSphereDatabase database = mockDatabase();
+        mockDialectMetaData(false, true);
+        
when(CDCSchemaTableUtils.parseTableExpressionWithoutSchema(eq(database), 
anyList())).thenReturn(Collections.singleton("foo_tbl"));
+        when(PipelineDataNodeUtils.buildTableAndDataNodesMap(eq(database), 
anyCollection())).thenReturn(Collections.singletonMap("foo_tbl", 
Collections.singletonList(mock(DataNode.class))));
+        when(jobAPI.create(any(StreamDataParameter.class), 
eq(CDCSinkType.SOCKET), any(Properties.class))).thenReturn("foo_job");
+        
when(jobConfigManager.getJobConfiguration("foo_job")).thenReturn(createJobConfiguration());
+        StreamDataRequestBody requestBody = 
StreamDataRequestBody.newBuilder().setDatabase("foo_db").addSourceSchemaTable(SchemaTable.newBuilder().setTable("foo_tbl")).build();
+        CDCResponse actualResponse = backendHandler.streamData("foo_req", 
requestBody, createConnectionContext(), mock());
+        assertThat(actualResponse.getStatus(), is(CDCResponse.Status.SUCCEED));
+        assertThat(actualResponse.getStreamDataResult().getStreamingId(), 
is("foo_job"));
+    }
+    
+    @Test
+    void assertStreamDataWhenTableNamesMissing() {
+        ShardingSphereDatabase database = mockDatabase();
+        mockDialectMetaData(false, false);
+        
when(CDCSchemaTableUtils.parseTableExpressionWithoutSchema(eq(database), 
anyList())).thenReturn(Collections.emptySet());
+        StreamDataRequestBody requestBody = 
StreamDataRequestBody.newBuilder().setDatabase("foo_db").addSourceSchemaTable(SchemaTable.newBuilder().setTable("foo_tbl")).build();
+        CDCExceptionWrapper actualException = 
assertThrows(CDCExceptionWrapper.class, () -> 
backendHandler.streamData("foo_req", requestBody, createConnectionContext(), 
mock()));
+        assertThat(actualException.getCause(), 
is(instanceOf(MissingRequiredStreamDataSourceException.class)));
+    }
+    
+    @Test
+    void assertStreamDataWhenDatabaseMissing() {
+        ContextManager contextManager = mock(ContextManager.class, 
RETURNS_DEEP_STUBS);
+        
when(contextManager.getMetaDataContexts().getMetaData().getDatabase("missing_db")).thenReturn(null);
+        
when(PipelineContextManager.getProxyContext()).thenReturn(contextManager);
+        StreamDataRequestBody requestBody = 
StreamDataRequestBody.newBuilder().setDatabase("missing_db").build();
+        CDCExceptionWrapper actualException = 
assertThrows(CDCExceptionWrapper.class, () -> 
backendHandler.streamData("foo_req", requestBody, createConnectionContext(), 
mock()));
+        assertThat(actualException.getCause(), 
is(instanceOf(StreamDatabaseNotFoundException.class)));
+    }
+    
+    @Test
+    void assertStartStreamingSuccess() {
+        CDCJobConfiguration jobConfig = createJobConfiguration();
+        
when(jobConfigManager.getJobConfiguration("foo_job")).thenReturn(jobConfig);
+        mockProxyContext(mock(ShardingSphereDatabase.class));
+        Channel channel = mock(Channel.class);
+        CDCConnectionContext connectionContext = createConnectionContext();
+        backendHandler.startStreaming("foo_job", connectionContext, channel);
+        ArgumentCaptor<PipelineSink> sinkCaptor = 
ArgumentCaptor.forClass(PipelineSink.class);
+        verify(jobAPI).start(eq("foo_job"), sinkCaptor.capture());
+        assertThat(((PipelineCDCSocketSink) 
sinkCaptor.getValue()).getChannel(), is(channel));
+        assertThat(connectionContext.getJobId(), is("foo_job"));
+    }
+    
+    @Test
+    void assertStartStreamingWhenJobConfigMissing() {
+        assertThrows(PipelineJobNotFoundException.class, () -> 
backendHandler.startStreaming("foo_job", createConnectionContext(), 
mock(Channel.class)));
+    }
+    
+    @Test
+    void assertStopStreamingWhenJobIdEmpty() {
+        backendHandler.stopStreaming("", DefaultChannelId.newInstance());
+        verifyNoInteractions(jobAPI);
+    }
+    
+    @Test
+    void assertStopStreamingWhenJobMissing() {
+        when(PipelineJobRegistry.get("foo_job")).thenReturn(null);
+        backendHandler.stopStreaming("foo_job", 
DefaultChannelId.newInstance());
+        verifyNoInteractions(jobAPI);
+    }
+    
+    @Test
+    void assertStopStreamingWhenChannelIdNotMatch() {
+        ChannelId targetChannelId = DefaultChannelId.newInstance();
+        Channel channel = mockChannel(DefaultChannelId.newInstance());
+        CDCJob job = mock(CDCJob.class);
+        when(job.getSink()).thenReturn(new PipelineCDCSocketSink(channel, 
mock(ShardingSphereDatabase.class), Collections.emptyList()));
+        when(PipelineJobRegistry.get("foo_job")).thenReturn(job);
+        backendHandler.stopStreaming("foo_job", targetChannelId);
+        verifyNoInteractions(jobAPI);
+    }
+    
+    @Test
+    void assertStopStreamingWhenChannelIdMatch() {
+        ChannelId targetChannelId = DefaultChannelId.newInstance();
+        Channel channel = mockChannel(targetChannelId);
+        CDCJob job = mock(CDCJob.class);
+        when(job.getSink()).thenReturn(new PipelineCDCSocketSink(channel, 
mock(ShardingSphereDatabase.class), Collections.emptyList()));
+        when(PipelineJobRegistry.get("foo_job")).thenReturn(job);
+        backendHandler.stopStreaming("foo_job", targetChannelId);
+        verify(jobAPI).disable("foo_job");
+    }
+    
+    @Test
+    void assertDropStreamingWhenJobNotDisabled() {
+        JobConfigurationPOJO jobConfigPOJO = new JobConfigurationPOJO();
+        jobConfigPOJO.setDisabled(false);
+        
when(PipelineJobIdUtils.getElasticJobConfigurationPOJO("foo_job")).thenReturn(jobConfigPOJO);
+        assertThrows(PipelineInternalException.class, () -> 
backendHandler.dropStreaming("foo_job"));
+        verifyNoInteractions(jobAPI);
+    }
+    
+    @Test
+    void assertDropStreamingWhenJobDisabled() {
+        JobConfigurationPOJO jobConfigPOJO = new JobConfigurationPOJO();
+        jobConfigPOJO.setDisabled(true);
+        
when(PipelineJobIdUtils.getElasticJobConfigurationPOJO("foo_job")).thenReturn(jobConfigPOJO);
+        backendHandler.dropStreaming("foo_job");
+        verify(jobAPI).drop("foo_job");
+    }
+    
+    @Test
+    void assertProcessAckWhenImporterMissing() {
+        when(CDCImporterManager.getImporter("importer")).thenReturn(null);
+        
backendHandler.processAck(AckStreamingRequestBody.newBuilder().setAckId("importer_random").build());
+        verifyNoInteractions(jobAPI);
+    }
+    
+    @Test
+    void assertProcessAckWhenImporterExists() {
+        CDCImporter importer = mock(CDCImporter.class);
+        when(CDCImporterManager.getImporter("importer")).thenReturn(importer);
+        
backendHandler.processAck(AckStreamingRequestBody.newBuilder().setAckId("importer_random").build());
+        verify(importer).ack("importer_random");
+    }
+    
+    private ShardingSphereDatabase mockDatabase() {
+        ShardingSphereDatabase result = mock(ShardingSphereDatabase.class);
+        DatabaseType protocolType = mock(DatabaseType.class);
+        when(result.getProtocolType()).thenReturn(protocolType);
+        mockProxyContext(result);
+        return result;
+    }
+    
+    private void mockProxyContext(final ShardingSphereDatabase database) {
+        ContextManager contextManager = mock(ContextManager.class, 
RETURNS_DEEP_STUBS);
+        MetaDataContexts metaDataContexts = mock(MetaDataContexts.class, 
RETURNS_DEEP_STUBS);
+        
when(metaDataContexts.getMetaData().getDatabase("foo_db")).thenReturn(database);
+        
when(contextManager.getMetaDataContexts()).thenReturn(metaDataContexts);
+        
when(PipelineContextManager.getProxyContext()).thenReturn(contextManager);
+    }
+    
+    private void mockDialectMetaData(final boolean schemaAvailable, final 
boolean supportGlobalCSN) {
+        DialectDatabaseMetaData databaseMetaData = 
mock(DialectDatabaseMetaData.class, RETURNS_DEEP_STUBS);
+        
when(databaseMetaData.getSchemaOption().isSchemaAvailable()).thenReturn(schemaAvailable);
+        
lenient().when(databaseMetaData.getTransactionOption().isSupportGlobalCSN()).thenReturn(supportGlobalCSN);
+        when(DatabaseTypedSPILoader.getService(any(), 
any())).thenReturn(databaseMetaData);
+    }
+    
+    private Channel mockChannel(final ChannelId channelId) {
+        Channel result = mock(Channel.class);
+        when(result.id()).thenReturn(channelId);
+        return result;
+    }
+    
+    private CDCConnectionContext createConnectionContext() {
+        return new CDCConnectionContext(new ShardingSphereUser("test_user", 
"", "localhost"));
+    }
+    
+    private CDCJobConfiguration createJobConfiguration() {
+        return new CDCJobConfiguration("foo_job", "foo_db", new 
ArrayList<>(Collections.singletonList("foo_schema.foo_tbl")), false, 
mock(DatabaseType.class),
+                mock(ShardingSpherePipelineDataSourceConfiguration.class), new 
JobDataNodeLine(Collections.emptyList()), Collections.singletonList(new 
JobDataNodeLine(Collections.emptyList())),
+                false, new 
CDCJobConfiguration.SinkConfiguration(CDCSinkType.SOCKET, new Properties()), 1, 
0);
+    }
+}

Reply via email to