Ziyan Lee created FLINK-39749:
---------------------------------
Summary: [mysql-cdc] Introduce
scan.incremental.snapshot.string-key.compare-mode to align Java and MySQL
ordering for string chunk keys
Key: FLINK-39749
URL: https://issues.apache.org/jira/browse/FLINK-39749
Project: Flink
Issue Type: Improvement
Components: Flink CDC
Affects Versions: cdc-3.6.0
Environment: MySQL 8.0.x, table with CHAR(36) PK, collation
utf8mb4_general_ci
Flink CDC 3.6.0
Pipeline job: mysql-cdc -> paimon
Reporter: Ziyan Lee
Fix For: cdc-3.7.0
Problem Statement
-----------------
When using a string-typed column (CHAR / VARCHAR) as the chunk split key in
MySQL CDC incremental snapshot, chunk boundaries computed by Flink CDC may
diverge from the actual row order in MySQL if the table uses a case-insensitive
collation (e.g., utf8mb4_general_ci, utf8mb4_unicode_ci).
This happens because:
1. SQL query layer: MySQL evaluates WHERE predicates and ORDER BY using the
column's collation.
- utf8mb4_general_ci: 'A' == 'a' (case-insensitive)
2. Java logic layer: Flink CDC uses String.compareTo() to determine chunk
boundaries and route binlog events to splits.
- Java default: 'A' (65) < 'a' (97) (case-sensitive, lexicographic by UTF-16
code unit, which is Unicode code-point order for BMP characters)
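The Java-side half of the mismatch can be reproduced with two literals. This is a minimal sketch. The class and method names are illustrative and do not come from Flink CDC:

```java
class CompareDemo {
    // Java's default ordering: lexicographic over UTF-16 code units, case-sensitive.
    static int caseSensitive(String a, String b) {
        return a.compareTo(b);
    }

    // Folds case per character before comparing, so 'A' and 'a' compare as equal.
    static int caseInsensitive(String a, String b) {
        return a.compareToIgnoreCase(b);
    }

    public static void main(String[] args) {
        // 'A' is 65 and 'a' is 97, so compareTo returns 65 - 97 = -32.
        System.out.println(caseSensitive("A1", "a1"));   // prints -32
        // utf8mb4_general_ci also treats these two keys as equal.
        System.out.println(caseInsensitive("A1", "a1")); // prints 0
    }
}
```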
When these two ordering rules differ, the following symptoms occur:
- Chunk splitting terminates prematurely. The last chunk becomes unbounded
(splitEnd == null), causing a single huge chunk that reads most or all of the
table.
- During backfill / binlog phase, a record may be assigned to the wrong split
because Java's boundary check does not match MySQL's actual data range.
- In the worst case, this leads to duplicate processing or lost events
downstream.
Concrete example
----------------
Table: orders (id CHAR(36) PRIMARY KEY) with utf8mb4_general_ci
Data: mixed-case UUIDs such as '9f...', 'A1...', 'a1...', 'a2...'
MySQL ORDER BY id (utf8mb4_general_ci) treats 'A1...' and 'a1...' as equal.
Java String.compareTo() orders them as: '9f...' < 'A1...' < 'a1...' < 'a2...'
If chunkSize = 8096 and MySQL returns 'A1...' as a boundary, Java may later
fail to find the next chunk maximum because its internal ordering sees 'A1...'
< 'a1...', while MySQL sees them as the same group. queryNextChunkMax
eventually returns null, and the remaining data falls into an unbounded chunk.
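A simplified Java model shows one plausible way the mismatch can end splitting early. It rests on several assumptions. The keys are hypothetical. MySQL's side is represented by an array already sorted under utf8mb4_general_ci. `isLastChunk` is a hypothetical stand-in for a "chunk end >= MAX(id)" check, not Flink CDC's actual splitter code.

```java
class PrematureEndDemo {
    // Hypothetical stand-in for the splitter's "has the chunk end reached MAX(id)?" check.
    static boolean isLastChunk(String chunkEnd, String tableMax, boolean ignoreCase) {
        int cmp = ignoreCase ? chunkEnd.compareToIgnoreCase(tableMax) : chunkEnd.compareTo(tableMax);
        return cmp >= 0;
    }

    public static void main(String[] args) {
        // Hypothetical keys in the order MySQL returns them under utf8mb4_general_ci.
        String[] mysqlOrder = {"9f", "A1", "a2", "B3"};
        // MySQL evaluates MAX(id) with the same collation, so the maximum is "B3".
        String tableMax = mysqlOrder[mysqlOrder.length - 1];
        // Assume MySQL returned "a2" as the end of the current chunk; "B3" is still unread.
        String chunkEnd = "a2";
        // compareTo sees 'a' (97) > 'B' (66) and wrongly concludes the table is exhausted,
        // so the remaining rows would fall into an unbounded last chunk.
        System.out.println(isLastChunk(chunkEnd, tableMax, false)); // prints true
        // A case-insensitive comparison agrees with MySQL: "a2" < "B3".
        System.out.println(isLastChunk(chunkEnd, tableMax, true));  // prints false
    }
}
```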
Proposed Solution
-----------------
Introduce a new configuration option:
scan.incremental.snapshot.string-key.compare-mode
with three modes:
| Mode             | Java comparison       | SQL behavior                    | Recommended for                                                      |
|------------------|-----------------------|---------------------------------|----------------------------------------------------------------------|
| default          | String.compareTo()    | Standard predicates (no BINARY) | Binary collations (utf8mb4_bin)                                      |
| case-insensitive | compareToIgnoreCase() | Standard predicates (no BINARY) | Case-insensitive collations (utf8mb4_general_ci, utf8mb4_unicode_ci) |
| binary           | String.compareTo()    | BINARY col <= BINARY ?          | Force byte-level exact match                                         |
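The following minimal sketch shows how the three modes in the table could map onto Java comparators and option values. The enum name `ChunkKeyCompareMode` comes from this proposal. Its fields and the methods `fromValue`, `compare` and `useBinaryPredicates` are hypothetical and are not the actual Flink CDC code.

```java
import java.util.Comparator;
import java.util.Locale;

// Hypothetical sketch of the proposed enum; fields and methods are illustrative.
enum ChunkKeyCompareMode {
    // Option value, Java comparator, and whether SQL predicates are wrapped in BINARY.
    DEFAULT("default", String::compareTo, false),
    CASE_INSENSITIVE("case-insensitive", String::compareToIgnoreCase, false),
    BINARY("binary", String::compareTo, true);

    private final String value;
    private final Comparator<String> comparator;
    private final boolean binaryPredicates;

    ChunkKeyCompareMode(String value, Comparator<String> comparator, boolean binaryPredicates) {
        this.value = value;
        this.comparator = comparator;
        this.binaryPredicates = binaryPredicates;
    }

    // Java-side comparison used for chunk boundaries and record routing.
    int compare(String a, String b) {
        return comparator.compare(a, b);
    }

    // True when generated SQL should compare BINARY col against BINARY ?.
    boolean useBinaryPredicates() {
        return binaryPredicates;
    }

    // Parses the option string, e.g. "case-insensitive"; unknown values are rejected.
    static ChunkKeyCompareMode fromValue(String raw) {
        String normalized = raw.trim().toLowerCase(Locale.ROOT);
        for (ChunkKeyCompareMode mode : values()) {
            if (mode.value.equals(normalized)) {
                return mode;
            }
        }
        throw new IllegalArgumentException("Unknown string-key compare mode: " + raw);
    }
}
```

Keeping the comparator and the BINARY flag on the same constant makes it harder for the splitter, the snapshot reader and the binlog reader to drift apart.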
Key implementation points:
- Add ChunkKeyCompareMode enum (DEFAULT, CASE_INSENSITIVE, BINARY).
- Propagate the mode through all three API layers: DataStream API
(MySqlSourceBuilder), Flink SQL (MySqlTableSourceFactory), and Pipeline YAML
(MySqlDataSourceFactory).
- Update StatementUtils to wrap both sides of the range predicates in BINARY
(e.g., BINARY col <= BINARY ?) when mode == BINARY.
- Update ObjectUtils, SplitKeyUtils, and RecordUtils to perform comparisons
using the selected mode.
- Ensure snapshot splitting (MySqlChunkSplitter), snapshot reading
(SnapshotSplitReader / MySqlSnapshotSplitReadTask), and binlog reading
(BinlogSplitReader) all use the SAME mode consistently.
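A hypothetical helper can illustrate the consistency requirement. It builds a chunk's range predicate and then routes a key with the same comparator. The method names, the half-open [start, end) convention and the exact predicate text are assumptions for illustration. They do not reproduce StatementUtils or RecordUtils.

```java
import java.util.Comparator;

class SplitRangeSketch {
    // Hypothetical: range predicate for one chunk, optionally in the "BINARY col op BINARY ?" shape.
    static String rangePredicate(String column, boolean binary) {
        String col = binary ? "BINARY " + column : column;
        String param = binary ? "BINARY ?" : "?";
        return col + " >= " + param + " AND " + col + " < " + param;
    }

    // Hypothetical routing check for a half-open split [start, end); null means unbounded.
    static boolean inSplit(String key, String start, String end, Comparator<String> cmp) {
        boolean aboveStart = start == null || cmp.compare(key, start) >= 0;
        boolean belowEnd = end == null || cmp.compare(key, end) < 0;
        return aboveStart && belowEnd;
    }

    public static void main(String[] args) {
        System.out.println(rangePredicate("id", false)); // id >= ? AND id < ?
        System.out.println(rangePredicate("id", true));  // BINARY id >= BINARY ? AND BINARY id < BINARY ?
        // Split ["A1", "B3") as MySQL sees it under a case-insensitive collation.
        Comparator<String> ci = String::compareToIgnoreCase;
        Comparator<String> cs = String::compareTo;
        System.out.println(inSplit("a2", "A1", "B3", ci)); // true: matches MySQL's range
        System.out.println(inSplit("a2", "A1", "B3", cs)); // false: 'a' (97) > 'B' (66)
    }
}
```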
Scope & Compatibility
---------------------
- Default value is "default", preserving backward compatibility.
- Changing this option requires restarting the job without restoring from an
existing checkpoint or savepoint, because split boundaries are persisted in the
snapshot state.
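- CASE_INSENSITIVE is safe for keys made of ASCII letters, digits and '-'
(e.g., UUIDs). It does not fully emulate MySQL collations. compareToIgnoreCase()
orders letters by their lowercase form, while utf8mb4_general_ci weights them
by their uppercase form, so ASCII punctuation such as '_' can still sort
differently. For non-ASCII characters, Java's compareToIgnoreCase() and MySQL's
collation folding tables are not strictly equivalent either. BINARY mode is
recommended in those cases.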
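The German sharp s (U+00DF) illustrates the folding caveat. This sketch checks only the Java side. The MySQL behavior noted in the comments follows the MySQL manual's comparison of _general_ci and _unicode_ci collations. The '_' line is an expectation based on utf8mb4_general_ci weighting letters by their uppercase form and should be verified against the target server.

```java
class FoldingDemo {
    public static void main(String[] args) {
        String sharpS = "\u00DF"; // German sharp s
        // Per the MySQL manual, utf8mb4_general_ci treats sharp s as equal to "s"
        // and utf8mb4_unicode_ci treats it as equal to "ss". Java folds it to neither.
        System.out.println(sharpS.compareToIgnoreCase("s"));  // positive, not 0
        System.out.println(sharpS.compareToIgnoreCase("ss")); // positive, not 0
        // Within ASCII: Java orders '_' (95) before 'a' (97), whereas utf8mb4_general_ci
        // is expected to weight 'a' as 'A' (65) and therefore sort '_' after it.
        System.out.println("_".compareToIgnoreCase("a"));     // negative
    }
}
```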
Documentation & Tests
---------------------
- English and Chinese docs updated for both mysql-cdc source connector and
pipeline connector pages.
- MySqlTableSourceFactoryTest updated to cover the new parameter.
---
--
This message was sent by Atlassian Jira
(v8.20.10#820010)