This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d8ad81d012 Remove PipelineSink.identifierMatched() (#29518)
6d8ad81d012 is described below

commit 6d8ad81d0129e5ed77aa060e434c5a11dead2cdc
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 23 20:10:01 2023 +0800

    Remove PipelineSink.identifierMatched() (#29518)
    
    * Refactor DataRecord.getKey()
    
    * Move DataRecordGroupEngine
    
    * Move DataRecordGroupEngine
    
    * Remove PipelineSink.identifierMatched()
---
 .../data/pipeline/core/importer/sink/PipelineDataSourceSink.java  | 5 -----
 .../data/pipeline/core/importer/sink/PipelineSink.java            | 8 --------
 .../data/pipeline/cdc/core/importer/sink/CDCSocketSink.java       | 5 -----
 .../data/pipeline/cdc/handler/CDCBackendHandler.java              | 2 +-
 4 files changed, 1 insertion(+), 19 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
index 842a28a8dfd..3fdc0c82e73 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
@@ -80,11 +80,6 @@ public final class PipelineDataSourceSink implements 
PipelineSink {
         groupEngine = new DataRecordGroupEngine();
     }
     
-    @Override
-    public boolean identifierMatched(final Object identifier) {
-        throw new UnsupportedOperationException();
-    }
-    
     @Override
     public PipelineJobProgressUpdatedParameter write(final String ackId, final 
Collection<Record> records) {
         return 
flush(dataSourceManager.getDataSource(importerConfig.getDataSourceConfig()), 
records);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineSink.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineSink.java
index b38c44b8e03..9f356cf35b5 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineSink.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineSink.java
@@ -28,14 +28,6 @@ import java.util.Collection;
  */
 public interface PipelineSink extends Closeable {
     
-    /**
-     * Identifier matched or not.
-     *
-     * @param identifier sink identifier
-     * @return true if matched, otherwise false
-     */
-    boolean identifierMatched(Object identifier);
-    
     /**
      * Write data.
      *
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
index 8f5c3eca0a5..a5b2c85f039 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
@@ -69,11 +69,6 @@ public final class CDCSocketSink implements PipelineSink {
         });
     }
     
-    @Override
-    public boolean identifierMatched(final Object identifier) {
-        return channel.id().equals(identifier);
-    }
-    
     @Override
     public PipelineJobProgressUpdatedParameter write(final String ackId, final 
Collection<Record> records) {
         if (records.isEmpty()) {
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index a397ee6272a..68e17dda1df 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -155,7 +155,7 @@ public final class CDCBackendHandler {
         if (null == job) {
             return;
         }
-        if (job.getSink().identifierMatched(channelId)) {
+        if (((CDCSocketSink) 
job.getSink()).getChannel().id().equals(channelId)) {
             log.info("close CDC job, channel id: {}", channelId);
             PipelineJobRegistry.stop(jobId);
             jobAPI.disable(jobId);

Reply via email to