yuanfenghu created FLINK-38568:
----------------------------------
Summary: Performance bottleneck in BinlogSplitReader with large
number of snapshot splits
Key: FLINK-38568
URL: https://issues.apache.org/jira/browse/FLINK-38568
Project: Flink
Issue Type: Improvement
Components: Flink CDC
Affects Versions: cdc-3.5.0
Reporter: yuanfenghu
Fix For: cdc-3.6.0
h2. Background
When using the MySQL CDC connector with large tables split into thousands of chunks
(e.g., 10,000+), the BinlogSplitReader.shouldEmit() method causes severe
performance degradation.

Performance Impact (observed in production):
* CPU usage: 25.12% spent in splitKeyRangeContains()
* 38,403 comparisons per binlog record
* Algorithm: O(n) linear search through all finished snapshot splits
Root Causes:
* Linear search: For each binlog record, the code iterates through all
finished splits to find which split contains the record
* Unsorted list: The finished splits list is not sorted, preventing
optimization
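The cost of the linear scan can be illustrated with a small sketch. It is not the connector's code: SplitRange is a hypothetical stand-in for FinishedSnapshotSplitInfo with plain long boundaries, and the chunk layout is made up. It only shows that a key falling in the last of n chunks costs n range checks.

```java
import java.util.ArrayList;
import java.util.List;

final class LinearScanSketch {
    // Hypothetical stand-in for FinishedSnapshotSplitInfo: covers [start, end).
    static final class SplitRange {
        final long start;
        final long end;

        SplitRange(long start, long end) {
            this.start = start;
            this.end = end;
        }

        boolean contains(long key) {
            return key >= start && key < end;
        }
    }

    // Builds n contiguous chunks of the given width: [0,w), [w,2w), ...
    static List<SplitRange> contiguousChunks(int n, long width) {
        List<SplitRange> chunks = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            chunks.add(new SplitRange(i * width, (i + 1) * width));
        }
        return chunks;
    }

    // Counts the range checks a front-to-back scan performs before it finds
    // the chunk containing key (or exhausts the list).
    static int checksUntilMatch(List<SplitRange> chunks, long key) {
        int checks = 0;
        for (SplitRange chunk : chunks) {
            checks++;
            if (chunk.contains(key)) {
                break;
            }
        }
        return checks;
    }
}
```

With 38,403 chunks, a key in the final chunk (or a key that matches no chunk) triggers 38,403 checks, which matches the call count seen in production.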
h2. Solution
1. Sort splits by boundary in BinlogSplitReader.configureFilter():
{code:java}
splitsInfoMap.values().forEach(RecordUtils::sortFinishedSplitInfos);
{code}
2. Replace linear search with binary search in BinlogSplitReader.shouldEmit():
{code:java}
// Before: linear scan over every finished split of the table
for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
    if (RecordUtils.splitKeyRangeContains(...)) {
        return true;
    }
}

// After: binary search over the sorted finished splits
FinishedSnapshotSplitInfo matchedSplit =
        RecordUtils.findSplitByKeyBinary(finishedSplitsInfo.get(tableId), chunkKey);
return matchedSplit != null && position.isAfter(matchedSplit.getHighWatermark());
{code}
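The two steps can be sketched in isolation as follows. This is a minimal illustration under stated assumptions, not the actual RecordUtils implementation: SplitRange, sortByStart and findByKey are hypothetical names, chunk keys are simplified to Long, each chunk is assumed to cover [start, end) with null meaning unbounded, and chunks are assumed not to overlap.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class SplitLookupSketch {
    // Hypothetical stand-in for FinishedSnapshotSplitInfo.
    static final class SplitRange {
        final String id;
        final Long start; // inclusive lower bound, null = unbounded
        final Long end;   // exclusive upper bound, null = unbounded

        SplitRange(String id, Long start, Long end) {
            this.id = id;
            this.start = start;
            this.end = end;
        }
    }

    // Step 1: order chunks by lower bound; the unbounded first chunk sorts first.
    static List<SplitRange> sortByStart(List<SplitRange> splits) {
        List<SplitRange> sorted = new ArrayList<>(splits);
        sorted.sort(Comparator.comparing(
                (SplitRange s) -> s.start,
                Comparator.nullsFirst(Comparator.<Long>naturalOrder())));
        return sorted;
    }

    // Step 2: binary search for the chunk containing key; null if none does.
    // Requires the list to be sorted by start and free of overlaps.
    static SplitRange findByKey(List<SplitRange> sorted, long key) {
        int lo = 0;
        int hi = sorted.size() - 1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            SplitRange s = sorted.get(mid);
            if (s.start != null && key < s.start) {
                hi = mid - 1; // key lies left of this chunk
            } else if (s.end != null && key >= s.end) {
                lo = mid + 1; // key lies right of this chunk
            } else {
                return s;     // start <= key < end
            }
        }
        return null;
    }
}
```

Sorting happens once when the filter is configured, so each binlog record only pays for the logarithmic lookup. The sketch returns null when the key falls into a gap between chunks, mirroring the matchedSplit != null check above.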
h2. Performance Improvement
||Metric||Before||After||Improvement||
|Algorithm|O(n)|O(log n)|-|
|Comparisons|38,403|~16|2,400x|
|CPU usage|25.12%|<0.01%|2,500x|
|Time|34ms|<0.015ms|2,200x|
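The comparison figures in the table can be cross-checked with a short calculation. The sketch below assumes that the 38,403 comparisons correspond to the number of finished splits scanned for one record; worstCaseProbes is a hypothetical helper, not part of Flink CDC.

```java
final class ComparisonEstimate {
    // Worst-case probes of a binary search over n sorted elements:
    // floor(log2(n)) + 1, computed from the bit length of n.
    static int worstCaseProbes(int n) {
        return n <= 0 ? 0 : 32 - Integer.numberOfLeadingZeros(n);
    }
}
```

For n = 38,403 this gives 16 probes, and 38,403 / 16 is roughly 2,400, consistent with the Comparisons row.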
{code:java}
`---ts=2025-10-24 18:09:07.334;thread_name=Source Data Fetcher for Source: MySQL Source -> Parse -> Side Output -> Case-insensitive Convert (1/4)#0;id=111;is_daemon=false;priority=5;TCCL=org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader@19a11164
    `---[136.386382ms] org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader:shouldEmit()
        +---[0.00% 0.002533ms ] org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils:isDataChangeRecord() #248
        +---[0.00% 0.002138ms ] org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils:getTableId() #249
        +---[0.01% 0.010575ms ] org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils:getBinlogPosition() #253
        +---[0.04% 0.04841ms ] org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader:hasEnterPureBinlogPhase() #254
        +---[0.00% 6.7E-4ms ] org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext:getDatabaseSchema() #262
        +---[0.00% 0.006651ms ] io.debezium.connector.mysql.MySqlDatabaseSchema:tableFor() #262
        +---[0.00% 7.69E-4ms ] org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext:getSourceConfig() #263
        +---[0.00% 5.72E-4ms ] org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig:getChunkKeyColumns() #263
        +---[0.00% 6.17E-4ms ] org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext:getSourceConfig() #264
        +---[0.00% 4.89E-4ms ] org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig:isTreatTinyInt1AsBoolean() #264
        +---[0.02% 0.02135ms ] org.apache.flink.cdc.connectors.mysql.source.utils.ChunkUtils:getChunkKeyColumnType() #261
        +---[0.00% 0.002676ms ] org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils:getStructContainsChunkKey() #266
        +---[0.00% 6.1E-4ms ] org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext:getSchemaNameAdjuster() #269
        +---[0.00% 0.003123ms ] org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils:getSplitKey() #268
        +---[13.63% min=4.54E-4ms,max=0.007806ms,total=18.587447ms,count=38403] org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo:getSplitStart() #272
        +---[13.80% min=4.55E-4ms,max=0.095948ms,total=18.823901ms,count=38403] org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo:getSplitEnd() #272
        +---[25.12% min=7.74E-4ms,max=0.038913ms,total=34.265124ms,count=38403] org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils:splitKeyRangeContains() #271
        +---[0.00% 9.15E-4ms ] org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo:getHighWatermark() #273
        `---[0.03% 0.042434ms ] org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset:isAfter() #273
{code}