[ 
https://issues.apache.org/jira/browse/FLINK-35859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17900028#comment-17900028
 ] 

Xin Gong commented on FLINK-35859:
----------------------------------

[~loserwang1024] Users cannot immediately notice problems with the task. Maybe 
we can improve the fix further. I added a flag that triggers a restart when the 
status is NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED, so that the newly added table 
gets synchronized.
{code:java}
/** Assigner for snapshot split. */
public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssigner {
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotSplitAssigner.class);

    // Set when the assigner is seen in NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED;
    // checked on checkpoint completion to trigger a restart.
    private boolean flagExceptionAssignerStatusWhenCheckpoint;

    private void captureNewlyAddedTables() {
        if (sourceConfig.isScanNewlyAddedTableEnabled()
                && AssignerStatus.isAssigningFinished(assignerStatus)) {
            ......
        } else if (AssignerStatus.isNewlyAddedAssigningSnapshotFinished(assignerStatus)) {
            flagExceptionAssignerStatusWhenCheckpoint = true;
            LOG.info("Set flagExceptionAssignerStatusWhenCheckpoint to true");
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        if (AssignerStatus.isNewlyAddedAssigningFinished(assignerStatus)
                && flagExceptionAssignerStatusWhenCheckpoint) {
            throw new FlinkRuntimeException(
                    "Previous assigner status was NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED and "
                            + "the newly added table would make the task keep failing at "
                            + "checkpoints, so we trigger a restart for the newly added table "
                            + "now that the assigner is back to a normal status");
        }
    }

}
 {code}
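
To make the intent of the flag easier to follow, here is a minimal, self-contained sketch of the same pattern. It is a toy model under stated assumptions, not the real connector code: `RestartFlagSketch`, `ToyAssigner`, `finishNewlyAddedAssigning` and the two-value `Status` enum are hypothetical names made up for illustration, and `IllegalStateException` stands in for `FlinkRuntimeException`.

```java
// Toy model only: none of these types are the real Flink CDC classes.
final class RestartFlagSketch {

    /** Toy statuses, named after the two states referenced in the snippet above. */
    enum Status {
        NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED,
        NEWLY_ADDED_ASSIGNING_FINISHED
    }

    /** Hypothetical stand-in for the assigner. */
    static final class ToyAssigner {
        private Status status;
        private boolean restartFlag;

        ToyAssigner(Status restoredStatus) {
            this.status = restoredStatus;
        }

        /** Mirrors the else-branch above: remember that the intermediate state was seen. */
        void captureNewlyAddedTables() {
            if (status == Status.NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED) {
                restartFlag = true;
            }
        }

        /** Hypothetical transition back to a normal status. */
        void finishNewlyAddedAssigning() {
            status = Status.NEWLY_ADDED_ASSIGNING_FINISHED;
        }

        /** Fails on purpose once the status is normal again and the flag is set. */
        void notifyCheckpointComplete(long checkpointId) {
            if (status == Status.NEWLY_ADDED_ASSIGNING_FINISHED && restartFlag) {
                throw new IllegalStateException(
                        "Triggering restart at checkpoint " + checkpointId);
            }
        }
    }

    public static void main(String[] args) {
        ToyAssigner assigner = new ToyAssigner(Status.NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED);
        assigner.captureNewlyAddedTables();
        assigner.notifyCheckpointComplete(1L); // no failure: status is still intermediate
        assigner.finishNewlyAddedAssigning();
        try {
            assigner.notifyCheckpointComplete(2L); // flag set and status normal: fails
        } catch (IllegalStateException e) {
            System.out.println("restart triggered: " + e.getMessage());
        }
    }
}
```

The point of deferring the failure to the checkpoint callback is that the job fails visibly once, at a well-defined moment, instead of staying in a state the user cannot see.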

> [flink-cdc] Fix: The assigner is not ready to offer finished split 
> information, this should not be called
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-35859
>                 URL: https://issues.apache.org/jira/browse/FLINK-35859
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.1.1
>            Reporter: Hongshun Wang
>            Assignee: Hongshun Wang
>            Priority: Minor
>             Fix For: cdc-3.2.0
>
>
> When using CDC with a newly added table, an error occurs:
> {code:java}
> The assigner is not ready to offer finished split information, this should not be called.
> {code}
> This happens because:
> 1. When the job is stopped and then restarted, the assigner status is 
> NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED.
> 2. The Enumerator then sends each reader a BinlogSplitUpdateRequestEvent to 
> update the binlog (see 
> org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator#syncWithReaders).
> 3. The Reader suspends its binlog reader and then sends a 
> BinlogSplitMetaRequestEvent to the Enumerator.
> 4. The Enumerator finds that some tables have not been sent, and an error occurs:
> {code:java}
> private void sendBinlogMeta(int subTask, BinlogSplitMetaRequestEvent requestEvent) {
>     // initialize once
>     if (binlogSplitMeta == null) {
>         final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos =
>                 splitAssigner.getFinishedSplitInfos();
>         if (finishedSnapshotSplitInfos.isEmpty()) {
>             LOG.error(
>                     "The assigner offers empty finished split information, this should not happen");
>             throw new FlinkRuntimeException(
>                     "The assigner offers empty finished split information, this should not happen");
>         }
>         binlogSplitMeta =
>                 Lists.partition(
>                         finishedSnapshotSplitInfos, sourceConfig.getSplitMetaGroupSize());
>     }
> }
> {code}
>  
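
The four steps in the quoted description can be replayed with a small toy model. This is only a sketch under assumptions: `BinlogMetaHandshakeSketch`, `ToyAssigner` and `ToyEnumerator` are hypothetical stand-ins, split information is reduced to plain strings, and the readiness check inside `getFinishedSplitInfos` is assumed from the error message rather than copied from the connector.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: hypothetical stand-ins, not the real Flink CDC classes.
final class BinlogMetaHandshakeSketch {

    enum Status {
        NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED,
        NEWLY_ADDED_ASSIGNING_FINISHED
    }

    static final class ToyAssigner {
        final Status status;
        final List<String> finishedSplits = new ArrayList<>();

        ToyAssigner(Status status) {
            this.status = status;
        }

        /** Assumption: the assigner refuses to answer until assigning has fully finished. */
        List<String> getFinishedSplitInfos() {
            if (status != Status.NEWLY_ADDED_ASSIGNING_FINISHED) {
                throw new IllegalStateException(
                        "The assigner is not ready to offer finished split information, "
                                + "this should not be called");
            }
            return finishedSplits;
        }
    }

    static final class ToyEnumerator {
        final ToyAssigner assigner;

        ToyEnumerator(ToyAssigner assigner) {
            this.assigner = assigner;
        }

        /** Step 2: returns true when readers are asked to update the binlog. */
        boolean syncWithReaders() {
            return assigner.status == Status.NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED;
        }

        /** Step 4: the reader's meta request (step 3) reaches the enumerator. */
        List<String> sendBinlogMeta() {
            return assigner.getFinishedSplitInfos();
        }
    }

    public static void main(String[] args) {
        // Step 1: the job restarts with the intermediate status restored.
        ToyEnumerator enumerator =
                new ToyEnumerator(new ToyAssigner(Status.NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED));
        if (enumerator.syncWithReaders()) {
            try {
                enumerator.sendBinlogMeta();
            } catch (IllegalStateException e) {
                System.out.println("enumerator failed: " + e.getMessage());
            }
        }
    }
}
```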



