[
https://issues.apache.org/jira/browse/FLINK-38184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yanquan Lv resolved FLINK-38184.
--------------------------------
Assignee: Hongshun Wang
Resolution: Fixed
Fixed in master via 4c479fddd6ec95279e7d2b00ae77a7bd3ecb654d.
> CDC no need to getCopyOfBuffer for each split info.
> ---------------------------------------------------
>
> Key: FLINK-38184
> URL: https://issues.apache.org/jira/browse/FLINK-38184
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Affects Versions: cdc-3.4.0
> Reporter: Hongshun Wang
> Assignee: Hongshun Wang
> Priority: Critical
> Labels: pull-request-available
> Fix For: cdc-3.5.0
>
> Attachments: image-2025-08-04-11-03-18-074.png
>
>
> When I have a big Postgres source table (1 billion rows), the checkpoint
> takes multiple minutes, which blocks the whole read. The main cost is
> org.apache.flink.core.memory.DataOutputSerializer#getCopyOfBuffer.
> !image-2025-08-04-11-03-18-074.png!
> In the CDC base framework, the split serializer invokes
> DataOutputSerializer#getCopyOfBuffer for each finished split info:
> {code:java}
> // org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitSerializer#writeFinishedSplitsInfo
> private void writeFinishedSplitsInfo(
>         List<FinishedSnapshotSplitInfo> finishedSplitsInfo, DataOutputSerializer out)
>         throws IOException {
>     final int size = finishedSplitsInfo.size();
>     out.writeInt(size);
>     for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo) {
>         splitInfo.serialize(out);
>     }
> }
>
> // org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo#serialize(org.apache.flink.core.memory.DataOutputSerializer)
> public byte[] serialize(final DataOutputSerializer out) throws IOException {
>     out.writeUTF(this.getTableId().toString());
>     out.writeUTF(this.getSplitId());
>     out.writeUTF(SerializerUtils.rowToSerializedString(this.getSplitStart()));
>     out.writeUTF(SerializerUtils.rowToSerializedString(this.getSplitEnd()));
>     out.writeUTF(SerializerUtils.rowToSerializedString(this.offsetFactory));
>     writeOffsetPosition(this.getHighWatermark(), out);
>     boolean useCatalogBeforeSchema =
>             SerializerUtils.shouldUseCatalogBeforeSchema(this.getTableId());
>     out.writeBoolean(useCatalogBeforeSchema);
>     return out.getCopyOfBuffer();
> } {code}
> However, MySQL CDC does it differently:
>
> {code:java}
> // org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitSerializer#writeFinishedSplitsInfo
> private static void writeFinishedSplitsInfo(
>         List<FinishedSnapshotSplitInfo> finishedSplitsInfo, DataOutputSerializer out)
>         throws IOException {
>     final int size = finishedSplitsInfo.size();
>     out.writeInt(size);
>     for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo) {
>         out.writeUTF(splitInfo.getTableId().toString());
>         out.writeUTF(splitInfo.getSplitId());
>         out.writeUTF(rowToSerializedString(splitInfo.getSplitStart()));
>         out.writeUTF(rowToSerializedString(splitInfo.getSplitEnd()));
>         writeBinlogPosition(splitInfo.getHighWatermark(), out);
>     }
> } {code}
>
>
> To be honest, this is a redundant operation: getCopyOfBuffer copies the whole
> buffer written so far, so calling it once per finished split info makes the
> cost grow with the amount of state already serialized. For one split, a single
> call to out.getCopyOfBuffer is enough, rather than one per finished split info.
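> A minimal sketch of the idea (not necessarily the merged patch; the
> void-returning writeTo helper below is a hypothetical name) would mirror the
> MySQL CDC serializer and write each finished split info directly into the
> shared DataOutputSerializer:
> {code:java}
> // Hypothetical sketch for SourceSplitSerializer#writeFinishedSplitsInfo:
> // serialize every split info into the shared buffer and copy the buffer
> // only once, in the caller.
> private void writeFinishedSplitsInfo(
>         List<FinishedSnapshotSplitInfo> finishedSplitsInfo, DataOutputSerializer out)
>         throws IOException {
>     out.writeInt(finishedSplitsInfo.size());
>     for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo) {
>         // Assumed helper that writes the split info's fields into 'out' and
>         // returns void, instead of returning out.getCopyOfBuffer() per split.
>         splitInfo.writeTo(out);
>     }
>     // The single out.getCopyOfBuffer() call stays in the top-level split
>     // serializer, so the buffer is copied once per split rather than once
>     // per finished split info.
> }
> {code}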
--
This message was sent by Atlassian Jira
(v8.20.10#820010)