This is an automated email from the ASF dual-hosted git repository.
chengzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 5b81b7c20f0 Rename PipelineChannel (#29527)
5b81b7c20f0 is described below
commit 5b81b7c20f0975e1ce6cc94e07e2b8aa45cfda98
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 24 16:32:53 2023 +0800
Rename PipelineChannel (#29527)
* Rename DataRecordGroupEngine
* Rename PipelineChannel
---
.../importer/SingleChannelConsumerImporter.java | 2 +-
.../core/ingest/channel/PipelineChannel.java | 14 ++++++--------
.../memory/MultiplexMemoryPipelineChannel.java | 22 +++++++++++-----------
.../memory/SimpleMemoryPipelineChannel.java | 8 ++++----
.../core/ingest/dumper/InventoryDumper.java | 4 ++--
.../ingest/record/group/DataRecordGroupEngine.java | 4 ++--
.../memory/MultiplexMemoryPipelineChannelTest.java | 4 ++--
.../memory/SimpleMemoryPipelineChannelTest.java | 8 ++++----
.../mysql/ingest/MySQLIncrementalDumper.java | 2 +-
.../opengauss/ingest/OpenGaussWALDumper.java | 4 ++--
.../postgresql/ingest/PostgreSQLWALDumper.java | 4 ++--
.../postgresql/ingest/PostgreSQLWALDumperTest.java | 2 +-
.../pipeline/cdc/core/importer/CDCImporter.java | 10 +++++-----
.../core/importer/PipelineDataSourceSinkTest.java | 8 ++++----
14 files changed, 47 insertions(+), 49 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
index a2b80318596..c093c289f16 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
@@ -53,7 +53,7 @@ public final class SingleChannelConsumerImporter extends
AbstractPipelineLifecyc
@Override
protected void runBlocking() {
while (isRunning()) {
- List<Record> records = channel.fetchRecords(batchSize, timeout,
timeUnit).stream().filter(each -> !(each instanceof
PlaceholderRecord)).collect(Collectors.toList());
+ List<Record> records = channel.fetch(batchSize, timeout,
timeUnit).stream().filter(each -> !(each instanceof
PlaceholderRecord)).collect(Collectors.toList());
if (records.isEmpty()) {
continue;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/PipelineChannel.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/PipelineChannel.java
index 0f5548ac15c..79756c976db 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/PipelineChannel.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/PipelineChannel.java
@@ -32,9 +32,9 @@ public interface PipelineChannel extends Closeable {
/**
* Push {@code DataRecord} into channel.
*
- * @param dataRecords data records
+ * @param records data records
*/
- void pushRecords(List<Record> dataRecords);
+ void push(List<Record> records);
/**
* Fetch {@code Record} list from channel.
@@ -45,21 +45,21 @@ public interface PipelineChannel extends Closeable {
* @param timeUnit time unit
* @return records of transactions
*/
- List<Record> fetchRecords(int batchSize, long timeout, TimeUnit timeUnit);
+ List<Record> fetch(int batchSize, long timeout, TimeUnit timeUnit);
/**
* Peek {@code Record} list from channel.
*
* @return records of a transaction
*/
- List<Record> peekRecords();
+ List<Record> peek();
/**
* Poll {@code Record} list from channel.
*
* @return records of a transaction
*/
- List<Record> pollRecords();
+ List<Record> poll();
/**
* Ack the last batch.
@@ -69,8 +69,6 @@ public interface PipelineChannel extends Closeable {
// TODO Refactor ack param
void ack(List<Record> records);
- /**
- * Close channel.
- */
+ @Override
void close();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
index 3b25d52b9f1..723f380896b 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
@@ -51,7 +51,7 @@ public final class MultiplexMemoryPipelineChannel implements
PipelineChannel {
}
@Override
- public void pushRecords(final List<Record> records) {
+ public void push(final List<Record> records) {
Record firstRecord = records.get(0);
if (1 == records.size()) {
pushRecord(firstRecord);
@@ -59,7 +59,7 @@ public final class MultiplexMemoryPipelineChannel implements
PipelineChannel {
}
long insertDataRecordsCount =
records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).filter(each
-> PipelineSQLOperationType.INSERT == each.getType()).count();
if (insertDataRecordsCount == records.size()) {
- channels.get(Math.abs(firstRecord.hashCode() %
channelNumber)).pushRecords(records);
+ channels.get(Math.abs(firstRecord.hashCode() %
channelNumber)).push(records);
return;
}
for (Record each : records) {
@@ -71,30 +71,30 @@ public final class MultiplexMemoryPipelineChannel
implements PipelineChannel {
List<Record> records = Collections.singletonList(ingestedRecord);
if (ingestedRecord instanceof FinishedRecord) {
for (int i = 0; i < channelNumber; i++) {
- channels.get(i).pushRecords(records);
+ channels.get(i).push(records);
}
} else if (DataRecord.class.equals(ingestedRecord.getClass())) {
- channels.get(Math.abs(ingestedRecord.hashCode() %
channelNumber)).pushRecords(records);
+ channels.get(Math.abs(ingestedRecord.hashCode() %
channelNumber)).push(records);
} else if (PlaceholderRecord.class.equals(ingestedRecord.getClass())) {
- channels.get(0).pushRecords(records);
+ channels.get(0).push(records);
} else {
throw new UnsupportedOperationException("Unsupported record type:
" + ingestedRecord.getClass().getName());
}
}
@Override
- public List<Record> fetchRecords(final int batchSize, final long timeout,
final TimeUnit timeUnit) {
- return findChannel().fetchRecords(batchSize, timeout, timeUnit);
+ public List<Record> fetch(final int batchSize, final long timeout, final
TimeUnit timeUnit) {
+ return findChannel().fetch(batchSize, timeout, timeUnit);
}
@Override
- public List<Record> peekRecords() {
- return findChannel().peekRecords();
+ public List<Record> peek() {
+ return findChannel().peek();
}
@Override
- public List<Record> pollRecords() {
- return findChannel().pollRecords();
+ public List<Record> poll() {
+ return findChannel().poll();
}
@Override
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java
index 4c56ef2b68c..7f8187cb5ff 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java
@@ -46,13 +46,13 @@ public final class SimpleMemoryPipelineChannel implements
PipelineChannel {
@SneakyThrows(InterruptedException.class)
@Override
- public void pushRecords(final List<Record> records) {
+ public void push(final List<Record> records) {
queue.put(records);
}
@SneakyThrows(InterruptedException.class)
@Override
- public List<Record> fetchRecords(final int batchSize, final long timeout,
final TimeUnit timeUnit) {
+ public List<Record> fetch(final int batchSize, final long timeout, final
TimeUnit timeUnit) {
List<Record> result = new LinkedList<>();
long startMillis = System.currentTimeMillis();
long timeoutMillis = timeUnit.toMillis(timeout);
@@ -73,13 +73,13 @@ public final class SimpleMemoryPipelineChannel implements
PipelineChannel {
}
@Override
- public List<Record> peekRecords() {
+ public List<Record> peek() {
List<Record> result = queue.peek();
return null == result ? Collections.emptyList() : result;
}
@Override
- public List<Record> pollRecords() {
+ public List<Record> poll() {
List<Record> result = queue.poll();
return null == result ? Collections.emptyList() : result;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index 6366244928e..edad97db01c 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -129,7 +129,7 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
List<Record> dataRecords = new LinkedList<>();
while (resultSet.next()) {
if (dataRecords.size() >= batchSize) {
- channel.pushRecords(dataRecords);
+ channel.push(dataRecords);
dataRecords = new LinkedList<>();
}
dataRecords.add(loadDataRecord(resultSet,
resultSetMetaData, tableMetaData));
@@ -143,7 +143,7 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
}
}
dataRecords.add(new FinishedRecord(new
IngestFinishedPosition()));
- channel.pushRecords(dataRecords);
+ channel.push(dataRecords);
log.info("Inventory dump done, rowCount={}, dataSource={},
actualTable={}", rowCount,
dumperContext.getCommonContext().getDataSourceName(),
dumperContext.getActualTableName());
} finally {
runningStatement.set(null);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
index 152b0f680b3..c412bda39ba 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
@@ -46,7 +46,7 @@ public final class DataRecordGroupEngine {
Map<Key, Boolean> duplicateKeys = getDuplicateKeys(records);
Collection<String> tableNames = new LinkedHashSet<>();
Map<String, List<DataRecord>> nonBatchRecords = new LinkedHashMap<>();
- Map<String, Map<PipelineSQLOperationType, List<DataRecord>>>
batchDataRecords = new LinkedHashMap<>();
+ Map<String, Map<PipelineSQLOperationType, Collection<DataRecord>>>
batchDataRecords = new LinkedHashMap<>();
for (DataRecord each : records) {
tableNames.add(each.getTableName());
if (duplicateKeys.getOrDefault(each.getKey(), false)) {
@@ -68,7 +68,7 @@ public final class DataRecordGroupEngine {
return result;
}
- private GroupedDataRecord getGroupedDataRecord(final String tableName,
final Map<PipelineSQLOperationType, List<DataRecord>> batchRecords, final
List<DataRecord> nonBatchRecords) {
+ private GroupedDataRecord getGroupedDataRecord(final String tableName,
final Map<PipelineSQLOperationType, Collection<DataRecord>> batchRecords, final
Collection<DataRecord> nonBatchRecords) {
return new GroupedDataRecord(tableName,
batchRecords.getOrDefault(PipelineSQLOperationType.INSERT,
Collections.emptyList()),
batchRecords.getOrDefault(PipelineSQLOperationType.UPDATE,
Collections.emptyList()),
batchRecords.getOrDefault(PipelineSQLOperationType.DELETE,
Collections.emptyList()),
nonBatchRecords);
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannelTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannelTest.java
index 8fb6b63b3ea..fdec9682362 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannelTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannelTest.java
@@ -71,7 +71,7 @@ class MultiplexMemoryPipelineChannelTest {
CountDownLatch countDownLatch = new CountDownLatch(recordCount);
MultiplexMemoryPipelineChannel memoryChannel = new
MultiplexMemoryPipelineChannel(CHANNEL_NUMBER, 10000, ackCallback);
fetchWithMultiThreads(memoryChannel, countDownLatch);
- memoryChannel.pushRecords(Arrays.asList(records));
+ memoryChannel.push(Arrays.asList(records));
boolean awaitResult = countDownLatch.await(10, TimeUnit.SECONDS);
assertTrue(awaitResult, "await failed");
memoryChannel.close();
@@ -86,7 +86,7 @@ class MultiplexMemoryPipelineChannelTest {
private void fetch(final MultiplexMemoryPipelineChannel memoryChannel,
final CountDownLatch countDownLatch) {
int maxLoopCount = 10;
for (int j = 1; j <= maxLoopCount; j++) {
- List<Record> records = memoryChannel.fetchRecords(100, 1,
TimeUnit.SECONDS);
+ List<Record> records = memoryChannel.fetch(100, 1,
TimeUnit.SECONDS);
memoryChannel.ack(records);
records.forEach(each -> countDownLatch.countDown());
if (!records.isEmpty() && records.get(records.size() - 1)
instanceof FinishedRecord) {
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannelTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannelTest.java
index b316cd13eb2..cd5d988ce32 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannelTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannelTest.java
@@ -39,9 +39,9 @@ class SimpleMemoryPipelineChannelTest {
void assertZeroQueueSizeWorks() {
SimpleMemoryPipelineChannel channel = new
SimpleMemoryPipelineChannel(0, new EmptyAckCallback());
List<Record> records = Collections.singletonList(new
PlaceholderRecord(new IngestFinishedPosition()));
- Thread thread = new Thread(() -> channel.pushRecords(records));
+ Thread thread = new Thread(() -> channel.push(records));
thread.start();
- assertThat(channel.fetchRecords(1, 500, TimeUnit.MILLISECONDS),
is(records));
+ assertThat(channel.fetch(1, 500, TimeUnit.MILLISECONDS), is(records));
thread.join();
}
@@ -49,11 +49,11 @@ class SimpleMemoryPipelineChannelTest {
void assertFetchRecordsTimeoutCorrectly() {
SimpleMemoryPipelineChannel channel = new
SimpleMemoryPipelineChannel(10, new EmptyAckCallback());
long startMillis = System.currentTimeMillis();
- channel.fetchRecords(1, 1, TimeUnit.MILLISECONDS);
+ channel.fetch(1, 1, TimeUnit.MILLISECONDS);
long delta = System.currentTimeMillis() - startMillis;
assertTrue(delta >= 1 && delta < 50, "Delta is not in [1,50) : " +
delta);
startMillis = System.currentTimeMillis();
- channel.fetchRecords(1, 500, TimeUnit.MILLISECONDS);
+ channel.fetch(1, 500, TimeUnit.MILLISECONDS);
delta = System.currentTimeMillis() - startMillis;
assertTrue(delta >= 500 && delta < 750, "Delta is not in [500,750) : "
+ delta);
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 135aa115437..610bff78c44 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -122,7 +122,7 @@ public final class MySQLIncrementalDumper extends
AbstractPipelineLifecycleRunna
if (dataRecords.isEmpty()) {
return;
}
- channel.pushRecords(dataRecords);
+ channel.push(dataRecords);
}
private List<? extends Record> handleEvent(final AbstractBinlogEvent
event) {
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
index 0d8624dfb03..b556d51adcf 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
@@ -156,7 +156,7 @@ public final class OpenGaussWALDumper extends
AbstractPipelineLifecycleRunnable
records.add(walEventConverter.convert(each));
}
records.add(walEventConverter.convert(event));
- channel.pushRecords(records);
+ channel.push(records);
rowEvents = new LinkedList<>();
}
}
@@ -165,7 +165,7 @@ public final class OpenGaussWALDumper extends
AbstractPipelineLifecycleRunnable
if (event instanceof BeginTXEvent) {
return;
}
-
channel.pushRecords(Collections.singletonList(walEventConverter.convert(event)));
+
channel.push(Collections.singletonList(walEventConverter.convert(event)));
}
@Override
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
index 148b15063e8..6ceede7a6fc 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
@@ -148,7 +148,7 @@ public final class PostgreSQLWALDumper extends
AbstractPipelineLifecycleRunnable
records.add(walEventConverter.convert(each));
}
records.add(walEventConverter.convert(event));
- channel.pushRecords(records);
+ channel.push(records);
}
}
@@ -156,7 +156,7 @@ public final class PostgreSQLWALDumper extends
AbstractPipelineLifecycleRunnable
if (event instanceof BeginTXEvent) {
return;
}
-
channel.pushRecords(Collections.singletonList(walEventConverter.convert(event)));
+
channel.push(Collections.singletonList(walEventConverter.convert(event)));
}
@Override
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
index 4347206d22f..7bc8c1cda9d 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
@@ -133,6 +133,6 @@ class PostgreSQLWALDumperTest {
walDumper.start();
} catch (final IngestException ignored) {
}
- assertThat(channel.fetchRecords(100, 0, TimeUnit.SECONDS).size(),
is(1));
+ assertThat(channel.fetch(100, 0, TimeUnit.SECONDS).size(), is(1));
}
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
index 70fb06e6d5c..790a8b8343c 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
@@ -96,7 +96,7 @@ public final class CDCImporter extends
AbstractPipelineLifecycleRunnable impleme
private void doWithoutSorting() {
for (final CDCChannelProgressPair channelProgressPair :
originalChannelProgressPairs) {
PipelineChannel channel = channelProgressPair.getChannel();
- List<Record> records = channel.fetchRecords(batchSize, timeout,
timeUnit).stream().filter(each -> !(each instanceof
PlaceholderRecord)).collect(Collectors.toList());
+ List<Record> records = channel.fetch(batchSize, timeout,
timeUnit).stream().filter(each -> !(each instanceof
PlaceholderRecord)).collect(Collectors.toList());
if (records.isEmpty()) {
continue;
}
@@ -180,7 +180,7 @@ public final class CDCImporter extends
AbstractPipelineLifecycleRunnable impleme
private void prepareWhenQueueIsEmpty(final List<CDCChannelProgressPair>
channelProgressPairs) {
for (CDCChannelProgressPair each : channelProgressPairs) {
PipelineChannel channel = each.getChannel();
- List<Record> records = channel.pollRecords();
+ List<Record> records = channel.poll();
if (records.isEmpty()) {
continue;
}
@@ -204,18 +204,18 @@ public final class CDCImporter extends
AbstractPipelineLifecycleRunnable impleme
private void prepareWhenQueueIsNotEmpty(final List<CDCChannelProgressPair>
channelProgressPairs, final long oldestCSN) {
for (CDCChannelProgressPair each : channelProgressPairs) {
PipelineChannel channel = each.getChannel();
- List<Record> records = channel.peekRecords();
+ List<Record> records = channel.peek();
if (records.isEmpty()) {
continue;
}
if (0 == getDataRecordsCount(records)) {
- records = channel.pollRecords();
+ records = channel.poll();
channel.ack(records);
continue;
}
long csn = findFirstDataRecord(records).getCsn();
if (csn <= oldestCSN) {
- records = channel.pollRecords();
+ records = channel.poll();
csnRecordsQueue.add(new CSNRecords(csn, each, records));
}
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
index 75aebe0fe1f..55ef59c476a 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
@@ -101,7 +101,7 @@ class PipelineDataSourceSinkTest {
void assertWriteInsertDataRecord() throws SQLException {
DataRecord insertRecord =
getDataRecord(PipelineSQLOperationType.INSERT);
when(connection.prepareStatement(any())).thenReturn(preparedStatement);
- when(channel.fetchRecords(anyInt(), anyLong(),
any())).thenReturn(mockRecords(insertRecord));
+ when(channel.fetch(anyInt(), anyLong(),
any())).thenReturn(mockRecords(insertRecord));
importer.run();
verify(preparedStatement).setObject(1, 1);
verify(preparedStatement).setObject(2, 10);
@@ -113,7 +113,7 @@ class PipelineDataSourceSinkTest {
void assertDeleteDataRecord() throws SQLException {
DataRecord deleteRecord =
getDataRecord(PipelineSQLOperationType.DELETE);
when(connection.prepareStatement(any())).thenReturn(preparedStatement);
- when(channel.fetchRecords(anyInt(), anyLong(),
any())).thenReturn(mockRecords(deleteRecord));
+ when(channel.fetch(anyInt(), anyLong(),
any())).thenReturn(mockRecords(deleteRecord));
when(preparedStatement.executeBatch()).thenReturn(new int[]{1});
importer.run();
verify(preparedStatement).setObject(1, 1);
@@ -125,7 +125,7 @@ class PipelineDataSourceSinkTest {
void assertUpdateDataRecord() throws SQLException {
DataRecord updateRecord =
getDataRecord(PipelineSQLOperationType.UPDATE);
when(connection.prepareStatement(any())).thenReturn(preparedStatement);
- when(channel.fetchRecords(anyInt(), anyLong(),
any())).thenReturn(mockRecords(updateRecord));
+ when(channel.fetch(anyInt(), anyLong(),
any())).thenReturn(mockRecords(updateRecord));
importer.run();
verify(preparedStatement).setObject(1, 20);
verify(preparedStatement).setObject(2,
PipelineSQLOperationType.UPDATE);
@@ -138,7 +138,7 @@ class PipelineDataSourceSinkTest {
void assertUpdatePrimaryKeyDataRecord() throws SQLException {
DataRecord updateRecord = getUpdatePrimaryKeyDataRecord();
when(connection.prepareStatement(any())).thenReturn(preparedStatement);
- when(channel.fetchRecords(anyInt(), anyLong(),
any())).thenReturn(mockRecords(updateRecord));
+ when(channel.fetch(anyInt(), anyLong(),
any())).thenReturn(mockRecords(updateRecord));
importer.run();
InOrder inOrder = inOrder(preparedStatement);
inOrder.verify(preparedStatement).setObject(1, 2);