This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 778161eed71 Refactor for code format of CDC (#26737)
778161eed71 is described below
commit 778161eed71dab4e16c63d88a02fff9e226335fe
Author: Xinze Guo <[email protected]>
AuthorDate: Mon Jul 3 12:34:55 2023 +0800
Refactor for code format of CDC (#26737)
* Refactor CDCJobItemContext package name
* Refactor CDCResponseGenerator
* Rename prepareIncremental to initIncrementalPosition
* Improve CDC swapper package name
---
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 6 +-
.../cdc/context/{job => }/CDCJobItemContext.java | 3 +-
.../cdc/core/importer/sink/CDCSocketSink.java | 5 +-
.../data/pipeline/cdc/core/job/CDCJob.java | 4 +-
.../pipeline/cdc/core/prepare/CDCJobPreparer.java | 6 +-
.../cdc/generator/CDCResponseGenerator.java | 53 -------------
.../pipeline/cdc/generator/CDCResponseUtils.java | 88 ++++++++++++++++++++++
.../pipeline/cdc/handler/CDCBackendHandler.java | 5 +-
.../{job => config}/YamlCDCJobConfiguration.java | 2 +-
.../YamlCDCJobConfigurationSwapper.java | 5 +-
.../job/YamlCDCJobConfigurationSwapperTest.java | 4 +-
.../frontend/netty/CDCChannelInboundHandler.java | 14 ++--
12 files changed, 117 insertions(+), 78 deletions(-)
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index e979b33b6a3..bb9c398c4fd 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -38,9 +38,9 @@ import
org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJobId;
-import
org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfiguration.YamlSinkConfiguration;
-import
org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfigurationSwapper;
+import
org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration.YamlSinkConfiguration;
+import
org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper.YamlCDCJobConfigurationSwapper;
import
org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
similarity index 97%
rename from
kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
rename to
kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
index eb4f902fd14..4c08d89209b 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.cdc.context.job;
+package org.apache.shardingsphere.data.pipeline.cdc.context;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@@ -26,7 +26,6 @@ import org.apache.commons.lang3.concurrent.LazyInitializer;
import
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
import
org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalJobItemContext;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
index ef48f9be588..0db35459a89 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
@@ -22,7 +22,8 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import
org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
+import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.ResponseCase;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
import
org.apache.shardingsphere.data.pipeline.cdc.util.DataRecordResultConvertUtils;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressUpdatedParameter;
@@ -92,7 +93,7 @@ public final class CDCSocketSink implements PipelineSink {
resultRecords.add(DataRecordResultConvertUtils.convertDataRecordToRecord(database.getName(),
tableNameSchemaMap.get(dataRecord.getTableName()), dataRecord));
}
DataRecordResult dataRecordResult =
DataRecordResult.newBuilder().addAllRecord(resultRecords).setAckId(ackId).build();
-
channel.writeAndFlush(CDCResponseGenerator.succeedBuilder("").setDataRecordResult(dataRecordResult).build());
+ channel.writeAndFlush(CDCResponseUtils.succeed("",
ResponseCase.DATA_RECORD_RESULT, dataRecordResult));
return new PipelineJobProgressUpdatedParameter(resultRecords.size());
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index b67eda9c1da..1d7ee243f9a 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -24,10 +24,10 @@ import
org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
import
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
-import
org.apache.shardingsphere.data.pipeline.cdc.context.job.CDCJobItemContext;
+import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
-import
org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfigurationSwapper;
+import
org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper.YamlCDCJobConfigurationSwapper;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index edd5e8a4d9e..28d9617a341 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -26,7 +26,7 @@ import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPositio
import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
import
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
-import
org.apache.shardingsphere.data.pipeline.cdc.context.job.CDCJobItemContext;
+import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
import
org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCChannelProgressPair;
import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
import
org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCIncrementalTask;
@@ -92,14 +92,14 @@ public final class CDCJobPreparer {
PipelineJobCenter.stop(jobItemContext.getJobId());
return;
}
- prepareIncremental(jobItemContext);
+ initIncrementalPosition(jobItemContext);
if (jobItemContext.getJobConfig().isFull()) {
initInventoryTasks(jobItemContext, inventoryImporterUsed,
inventoryChannelProgressPairs);
}
initIncrementalTask(jobItemContext, incrementalImporterUsed,
incrementalChannelProgressPairs);
}
- private void prepareIncremental(final CDCJobItemContext jobItemContext) {
+ private void initIncrementalPosition(final CDCJobItemContext
jobItemContext) {
CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
JobItemIncrementalTasksProgress initIncremental = null ==
jobItemContext.getInitProgress() ? null :
jobItemContext.getInitProgress().getIncremental();
try {
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseGenerator.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseGenerator.java
deleted file mode 100644
index 9983609f30c..00000000000
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseGenerator.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc.generator;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Builder;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
-
-/**
- * CDC response message generator.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class CDCResponseGenerator {
-
- /**
- * Succeed response builder.
- *
- * @param requestId request id
- * @return succeed response builder
- */
- public static Builder succeedBuilder(final String requestId) {
- return
CDCResponse.newBuilder().setStatus(Status.SUCCEED).setRequestId(requestId);
- }
-
- /**
- * Failed response.
- *
- * @param requestId request id
- * @param errorCode error code
- * @param errorMessage error message
- * @return failed response
- */
- public static CDCResponse failed(final String requestId, final String
errorCode, final String errorMessage) {
- return
CDCResponse.newBuilder().setStatus(Status.FAILED).setRequestId(requestId).setErrorCode(errorCode).setErrorMessage(errorMessage).build();
- }
-}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseUtils.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseUtils.java
new file mode 100644
index 00000000000..1c4f0f73b8c
--- /dev/null
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseUtils.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.generator;
+
+import com.google.protobuf.Message;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Builder;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.ResponseCase;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ServerGreetingResult;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.StreamDataResult;
+import
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
+
+/**
+ * CDC response utils.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class CDCResponseUtils {
+
+ /**
+ * Succeed response.
+ *
+ * @param requestId request id
+ * @return CDC response
+ */
+ public static CDCResponse succeed(final String requestId) {
+ return succeed(requestId, ResponseCase.RESPONSE_NOT_SET, null);
+ }
+
+ /**
+ * Succeed response.
+ *
+ * @param requestId request id
+ * @param responseCase response case
+ * @param response response
+ * @return succeed response
+ * @throws PipelineInvalidParameterException if the response case is not supported
+ */
+ public static CDCResponse succeed(final String requestId, final
CDCResponse.ResponseCase responseCase, final Message response) {
+ Builder result =
CDCResponse.newBuilder().setStatus(Status.SUCCEED).setRequestId(requestId);
+ switch (responseCase) {
+ case SERVER_GREETING_RESULT:
+ result.setServerGreetingResult((ServerGreetingResult)
response);
+ break;
+ case DATA_RECORD_RESULT:
+ result.setDataRecordResult((DataRecordResult) response);
+ break;
+ case STREAM_DATA_RESULT:
+ result.setStreamDataResult((StreamDataResult) response);
+ break;
+ case RESPONSE_NOT_SET:
+ break;
+ default:
+ throw new
PipelineInvalidParameterException(responseCase.name());
+ }
+ return result.build();
+ }
+
+ /**
+ * Failed response.
+ *
+ * @param requestId request id
+ * @param errorCode error code
+ * @param errorMessage error message
+ * @return failed response
+ */
+ public static CDCResponse failed(final String requestId, final String
errorCode, final String errorMessage) {
+ return
CDCResponse.newBuilder().setStatus(Status.FAILED).setRequestId(requestId).setErrorCode(errorCode).setErrorMessage(errorMessage).build();
+ }
+}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index a083e753ee4..e733e0e5488 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -34,11 +34,12 @@ import
org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
import
org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
import
org.apache.shardingsphere.data.pipeline.cdc.exception.CDCServerException;
import
org.apache.shardingsphere.data.pipeline.cdc.exception.NotFindStreamDataSourceTableException;
-import
org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
+import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.ResponseCase;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.StreamDataResult;
import org.apache.shardingsphere.data.pipeline.cdc.util.CDCDataNodeUtils;
import org.apache.shardingsphere.data.pipeline.cdc.util.CDCSchemaTableUtils;
@@ -111,7 +112,7 @@ public final class CDCBackendHandler {
String jobId = jobAPI.createJob(parameter, CDCSinkType.SOCKET, new
Properties());
connectionContext.setJobId(jobId);
startStreaming(jobId, connectionContext, channel);
- return
CDCResponseGenerator.succeedBuilder(requestId).setStreamDataResult(StreamDataResult.newBuilder().setStreamingId(jobId).build()).build();
+ return CDCResponseUtils.succeed(requestId,
ResponseCase.STREAM_DATA_RESULT,
StreamDataResult.newBuilder().setStreamingId(jobId).build());
}
/**
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/config/YamlCDCJobConfiguration.java
similarity index 96%
rename from
kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
rename to
kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/config/YamlCDCJobConfiguration.java
index 246cebb9ab5..9c2580970af 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/config/YamlCDCJobConfiguration.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.cdc.yaml.job;
+package org.apache.shardingsphere.data.pipeline.cdc.yaml.config;
import lombok.Getter;
import lombok.Setter;
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/swapper/YamlCDCJobConfigurationSwapper.java
similarity index 95%
rename from
kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
rename to
kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/swapper/YamlCDCJobConfigurationSwapper.java
index 4348c97d7c5..f4cb413f470 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/swapper/YamlCDCJobConfigurationSwapper.java
@@ -15,14 +15,15 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.cdc.yaml.job;
+package org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper;
import
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration.SinkConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
-import
org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfiguration.YamlSinkConfiguration;
+import
org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration.YamlSinkConfiguration;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
index eabd72bc50b..a4888939630 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
@@ -19,7 +19,9 @@ package org.apache.shardingsphere.data.pipeline.cdc.yaml.job;
import
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
-import
org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfiguration.YamlSinkConfiguration;
+import
org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration.YamlSinkConfiguration;
+import
org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper.YamlCDCJobConfigurationSwapper;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
diff --git
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
index 390bfa5736e..41f925ed363 100644
---
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
+++
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -29,7 +29,7 @@ import org.apache.shardingsphere.authority.rule.AuthorityRule;
import
org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
import
org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCLoginException;
-import
org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
+import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
import org.apache.shardingsphere.data.pipeline.cdc.handler.CDCBackendHandler;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
@@ -90,9 +90,9 @@ public final class CDCChannelInboundHandler extends
ChannelInboundHandlerAdapter
if (cause instanceof CDCExceptionWrapper) {
CDCExceptionWrapper wrapper = (CDCExceptionWrapper) cause;
ShardingSphereSQLException exception = wrapper.getException();
- channelFuture =
ctx.writeAndFlush(CDCResponseGenerator.failed(wrapper.getRequestId(),
exception.toSQLException().getSQLState(), exception.getMessage()));
+ channelFuture =
ctx.writeAndFlush(CDCResponseUtils.failed(wrapper.getRequestId(),
exception.toSQLException().getSQLState(), exception.getMessage()));
} else {
- channelFuture = ctx.writeAndFlush(CDCResponseGenerator.failed("",
XOpenSQLState.GENERAL_ERROR.getValue(), String.valueOf(cause.getMessage())));
+ channelFuture = ctx.writeAndFlush(CDCResponseUtils.failed("",
XOpenSQLState.GENERAL_ERROR.getValue(), String.valueOf(cause.getMessage())));
}
CDCConnectionContext connectionContext =
ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
if (null == connectionContext) {
@@ -139,7 +139,7 @@ public final class CDCChannelInboundHandler extends
ChannelInboundHandlerAdapter
Optional<ShardingSphereUser> user = authorityRule.findUser(new
Grantee(body.getUsername(), getHostAddress(ctx)));
if (user.isPresent() &&
Objects.equals(Hashing.sha256().hashBytes(user.get().getPassword().getBytes()).toString().toUpperCase(),
body.getPassword())) {
ctx.channel().attr(CONNECTION_CONTEXT_KEY).set(new
CDCConnectionContext(user.get()));
-
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
+
ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId()));
} else {
throw new CDCExceptionWrapper(request.getRequestId(), new
CDCLoginException("Illegal username or password"));
}
@@ -197,7 +197,7 @@ public final class CDCChannelInboundHandler extends
ChannelInboundHandlerAdapter
String database =
backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
checkPrivileges(request.getRequestId(),
connectionContext.getCurrentUser().getGrantee(), database);
backendHandler.startStreaming(requestBody.getStreamingId(),
connectionContext, ctx.channel());
-
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
+ ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId()));
}
private void processStopStreamingRequest(final ChannelHandlerContext ctx,
final CDCRequest request, final CDCConnectionContext connectionContext) {
@@ -206,7 +206,7 @@ public final class CDCChannelInboundHandler extends
ChannelInboundHandlerAdapter
checkPrivileges(request.getRequestId(),
connectionContext.getCurrentUser().getGrantee(), database);
backendHandler.stopStreaming(connectionContext.getJobId(),
ctx.channel().id());
connectionContext.setJobId(null);
-
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
+ ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId()));
}
private void processDropStreamingRequest(final ChannelHandlerContext ctx,
final CDCRequest request, final CDCConnectionContext connectionContext) {
@@ -214,6 +214,6 @@ public final class CDCChannelInboundHandler extends
ChannelInboundHandlerAdapter
checkPrivileges(request.getRequestId(),
connectionContext.getCurrentUser().getGrantee(),
backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId()));
backendHandler.dropStreaming(connectionContext.getJobId());
connectionContext.setJobId(null);
-
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
+ ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId()));
}
}