This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 6d8ad81d012 Remove PipelineSink.identifierMatched() (#29518)
6d8ad81d012 is described below
commit 6d8ad81d0129e5ed77aa060e434c5a11dead2cdc
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 23 20:10:01 2023 +0800
Remove PipelineSink.identifierMatched() (#29518)
* Refactor DataRecord.getKey()
* Move DataRecordGroupEngine
* Move DataRecordGroupEngine
* Remove PipelineSink.identifierMatched()
---
.../data/pipeline/core/importer/sink/PipelineDataSourceSink.java | 5 -----
.../data/pipeline/core/importer/sink/PipelineSink.java | 8 --------
.../data/pipeline/cdc/core/importer/sink/CDCSocketSink.java | 5 -----
.../data/pipeline/cdc/handler/CDCBackendHandler.java | 2 +-
4 files changed, 1 insertion(+), 19 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
index 842a28a8dfd..3fdc0c82e73 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
@@ -80,11 +80,6 @@ public final class PipelineDataSourceSink implements PipelineSink {
groupEngine = new DataRecordGroupEngine();
}
- @Override
- public boolean identifierMatched(final Object identifier) {
- throw new UnsupportedOperationException();
- }
-
@Override
public PipelineJobProgressUpdatedParameter write(final String ackId, final Collection<Record> records) {
return flush(dataSourceManager.getDataSource(importerConfig.getDataSourceConfig()), records);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineSink.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineSink.java
index b38c44b8e03..9f356cf35b5 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineSink.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineSink.java
@@ -28,14 +28,6 @@ import java.util.Collection;
*/
public interface PipelineSink extends Closeable {
- /**
- * Identifier matched or not.
- *
- * @param identifier sink identifier
- * @return true if matched, otherwise false
- */
- boolean identifierMatched(Object identifier);
-
/**
* Write data.
*
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
index 8f5c3eca0a5..a5b2c85f039 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
@@ -69,11 +69,6 @@ public final class CDCSocketSink implements PipelineSink {
});
}
- @Override
- public boolean identifierMatched(final Object identifier) {
- return channel.id().equals(identifier);
- }
-
@Override
public PipelineJobProgressUpdatedParameter write(final String ackId, final Collection<Record> records) {
if (records.isEmpty()) {
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index a397ee6272a..68e17dda1df 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -155,7 +155,7 @@ public final class CDCBackendHandler {
if (null == job) {
return;
}
- if (job.getSink().identifierMatched(channelId)) {
+ if (((CDCSocketSink) job.getSink()).getChannel().id().equals(channelId)) {
log.info("close CDC job, channel id: {}", channelId);
PipelineJobRegistry.stop(jobId);
jobAPI.disable(jobId);