sherhomhuang created FLINK-38531:
------------------------------------

             Summary: MySQL CDC BinlogOffset compareTo method cannot work 
correctly when gtidSet is same
                 Key: FLINK-38531
                 URL: https://issues.apache.org/jira/browse/FLINK-38531
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
    Affects Versions: cdc-3.4.0
            Reporter: sherhomhuang


The code in method 
*org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset#compareTo*:
{code:java}
if (StringUtils.isNotEmpty(gtidSetStr)) {
    // Both have GTIDs, so base the comparison entirely on the GTID sets.
    GtidSet gtidSet = new GtidSet(gtidSetStr);
    GtidSet targetGtidSet = new GtidSet(targetGtidSetStr);
    if (gtidSet.equals(targetGtidSet)) {
        long restartSkipEvents = this.getRestartSkipEvents();
        long targetRestartSkipEvents = that.getRestartSkipEvents();
        return Long.compare(restartSkipEvents, targetRestartSkipEvents);
    }
    // The GTIDs are not an exact match, so figure out if this is a subset of the target
    // offset
    // ...
    return gtidSet.isContainedWithin(targetGtidSet) ? -1 : 1;
}
{code}
 
When the two gtidSets are equal, the method returns the result of comparing 
restartSkipEvents and nothing else. When restartSkipEvents is also equal, it 
should go on to compare restartSkipRows.
For comparison, the part of 
*org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset#compareTo* 
that runs when gtidSet is missing does this correctly and compares restartSkipRows:

{code:java}
// The positions are the same, so compare the completed events in the transaction ...
if (this.getRestartSkipEvents() != that.getRestartSkipEvents()) {
    return Long.compare(this.getRestartSkipEvents(), that.getRestartSkipEvents());
}

// The completed events are the same, so compare the row number ...
return Long.compare(this.getRestartSkipRows(), that.getRestartSkipRows());
{code}
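
One possible shape for the fix is to apply the same two-step tie-break inside the equal-gtidSet branch. The sketch below is only an illustration under stated assumptions: SkipCounters and compareSkipCounters are hypothetical stand-ins, not Flink CDC classes or methods, and the two fields mirror what getRestartSkipEvents() and getRestartSkipRows() return.

```java
// Hypothetical stand-in for the two counters carried by a BinlogOffset.
// This is NOT the Flink CDC class; it only models the tie-break order.
final class SkipCounters {
    final long restartSkipEvents;
    final long restartSkipRows;

    SkipCounters(long restartSkipEvents, long restartSkipRows) {
        this.restartSkipEvents = restartSkipEvents;
        this.restartSkipRows = restartSkipRows;
    }

    // Intended to run once the GTID sets are known to be equal:
    // compare completed events first, then fall through to rows.
    static int compareSkipCounters(SkipCounters a, SkipCounters b) {
        if (a.restartSkipEvents != b.restartSkipEvents) {
            return Long.compare(a.restartSkipEvents, b.restartSkipEvents);
        }
        // The completed events are the same, so compare the row number.
        return Long.compare(a.restartSkipRows, b.restartSkipRows);
    }
}
```

With this ordering, two offsets that share a gtidSet and restartSkipEvents but differ in restartSkipRows no longer compare as equal.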


 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
