[ https://issues.apache.org/jira/browse/FLINK-35859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17900028#comment-17900028 ]
Xin Gong commented on FLINK-35859:
----------------------------------

[~loserwang1024] Users cannot notice this kind of task problem right away, so maybe the fix can be improved further. I added a flag to trigger a restart when the status is NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED, so that the newly added tables will be synchronized.
{code:java}
/** Assigner for snapshot split. */
public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssigner {
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotSplitAssigner.class);

    // Set when the assigner starts in NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED.
    private boolean flagExceptionAssignerStatusWhenCheckpoint;

    private void captureNewlyAddedTables() {
        if (sourceConfig.isScanNewlyAddedTableEnabled()
                && AssignerStatus.isAssigningFinished(assignerStatus)) {
            ......
        } else if (AssignerStatus.isNewlyAddedAssigningSnapshotFinished(assignerStatus)) {
            // Remember the abnormal status so a restart can be triggered later.
            flagExceptionAssignerStatusWhenCheckpoint = true;
            LOG.info("Set flagExceptionAssignerStatusWhenCheckpoint to true");
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Once the assigner is back to a normal status, fail the job on purpose
        // so that the restart picks up the newly added tables.
        if (AssignerStatus.isNewlyAddedAssigningFinished(assignerStatus)
                && flagExceptionAssignerStatusWhenCheckpoint) {
            throw new FlinkRuntimeException(
                    "The previous assigner status was NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED, and "
                            + "newly added tables would cause the task to keep failing from the "
                            + "checkpoint, so we trigger a restart for the newly added tables after "
                            + "the assigner returns to a normal status");
        }
    }
}
{code}

> [flink-cdc] Fix: The assigner is not ready to offer finished split information, this should not be called
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-35859
>                 URL: https://issues.apache.org/jira/browse/FLINK-35859
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.1.1
>            Reporter: Hongshun Wang
>            Assignee: Hongshun Wang
>            Priority: Minor
>             Fix For: cdc-3.2.0
>
>
> When using CDC with newly added tables, an error occurs:
> {code:java}
> The assigner is not ready to offer finished split information, this should not be called. {code}
> This happens because:
> 1. When the job is stopped and then restarted, the status is NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED.
> 2. The Enumerator then sends each reader a BinlogSplitUpdateRequestEvent to update the binlog (see org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator#syncWithReaders).
> 3. The Reader suspends its binlog reader, then sends a BinlogSplitMetaRequestEvent to the Enumerator.
> 4. The Enumerator finds that some tables have not been sent, and an error occurs:
> {code:java}
> private void sendBinlogMeta(int subTask, BinlogSplitMetaRequestEvent requestEvent) {
>     // initialize once
>     if (binlogSplitMeta == null) {
>         final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos =
>                 splitAssigner.getFinishedSplitInfos();
>         if (finishedSnapshotSplitInfos.isEmpty()) {
>             LOG.error(
>                     "The assigner offers empty finished split information, this should not happen");
>             throw new FlinkRuntimeException(
>                     "The assigner offers empty finished split information, this should not happen");
>         }
>         binlogSplitMeta =
>                 Lists.partition(
>                         finishedSnapshotSplitInfos, sourceConfig.getSplitMetaGroupSize());
>     }
> }{code}
>
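
The restart flag proposed in the comment above can be tried out in isolation. The following is only a minimal sketch under stated assumptions, not the Flink CDC implementation: `RestartFlagSketch`, its `Status` enum, and `onNewlyAddedAssigningFinished` are hypothetical stand-ins for the assigner, `AssignerStatus`, and whatever transition moves the assigner back to a normal status. `IllegalStateException` is used in place of `FlinkRuntimeException` so the block compiles without Flink on the classpath.

```java
/** Hypothetical, simplified model of the proposed restart flag (not Flink CDC code). */
class RestartFlagSketch {

    /** Only the two statuses needed for the sketch; the real enum has more. */
    enum Status {
        NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED,
        NEWLY_ADDED_ASSIGNING_FINISHED
    }

    private Status status;
    // Mirrors flagExceptionAssignerStatusWhenCheckpoint from the comment.
    private boolean restartPending;

    RestartFlagSketch(Status restoredStatus) {
        this.status = restoredStatus;
    }

    /** Mirrors the else-if branch: remember that the job started in the abnormal status. */
    void captureNewlyAddedTables() {
        if (status == Status.NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED) {
            restartPending = true;
        }
    }

    /** Hypothetical transition back to a normal status. */
    void onNewlyAddedAssigningFinished() {
        status = Status.NEWLY_ADDED_ASSIGNING_FINISHED;
    }

    /** Throws only when the flag was set and the assigner has reached the normal status. */
    void notifyCheckpointComplete(long checkpointId) {
        if (status == Status.NEWLY_ADDED_ASSIGNING_FINISHED && restartPending) {
            throw new IllegalStateException(
                    "Restart requested after checkpoint " + checkpointId
                            + " so that newly added tables are captured");
        }
    }

    boolean isRestartPending() {
        return restartPending;
    }
}
```

In this model, a checkpoint that completes while the status is still NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED does nothing. The deliberate failure happens on the first completed checkpoint after the status has moved on, which matches the condition in the proposed `notifyCheckpointComplete`.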