This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 6545e4f9e [INLONG-7140][Sort] Sort mysql cdc connector mysql snapshot split exit catch exception (#7141) 6545e4f9e is described below commit 6545e4f9e474a776b8178604c4048aa661793fff Author: Schnapps <zpen...@connect.ust.hk> AuthorDate: Wed Jan 4 17:24:03 2023 +0800 [INLONG-7140][Sort] Sort mysql cdc connector mysql snapshot split exit catch exception (#7141) Co-authored-by: stingpeng <stingp...@tencent.com> --- .../assigners/MySqlSnapshotSplitAssigner.java | 24 +++++++++++++--------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java index 498edec76..d270fa792 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java @@ -205,17 +205,21 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner { executor.submit( () -> { - Iterator<TableId> iterator = remainingTables.iterator(); - while (iterator.hasNext()) { - TableId nextTable = iterator.next(); - // split the given table into chunks (snapshot splits) - Collection<MySqlSnapshotSplit> splits = - chunkSplitter.generateSplits(nextTable); - synchronized (lock) { - remainingSplits.addAll(splits); - remainingTables.remove(nextTable); - lock.notify(); + try { + Iterator<TableId> iterator = remainingTables.iterator(); + while (iterator.hasNext()) { + TableId nextTable = iterator.next(); + // split the given table into chunks (snapshot splits) + Collection<MySqlSnapshotSplit> splits = + chunkSplitter.generateSplits(nextTable); + synchronized (lock) { + remainingSplits.addAll(splits); + remainingTables.remove(nextTable); + lock.notify(); + } } + } catch (Exception e) { + LOG.error("asynchronously split exit with exception", e); } }); }