fanrui created FLINK-19911:
------------------------------
Summary: Read checkpoint stream with buffer to speedup restore
Key: FLINK-19911
URL: https://issues.apache.org/jira/browse/FLINK-19911
Project: Flink
Issue Type: Improvement
Components: Runtime / Checkpointing, Runtime / State Backends
Affects Versions: 1.12.0, 1.11.3, 1.13.0
Environment: Flink version: 1.10
StateBackend : FsStateBackend
code: Flink SQL count(distinct userId)
uv: 10 million
State size: 200M
TM total memory: 16G
Parallelism: 1
Reporter: fanrui
During a snapshot, the heap state backend serializes each Java object and writes
it to the file system. Likewise, the RocksDB state backend's RocksFullSnapshotStrategy
reads key-value pairs from RocksDB and writes them to the file system.
Both cases generate many small I/O operations rather than a few large ones, and
frequent small I/O performs poorly on disk. For this reason, the checkpoint
snapshot write path to the file system already uses a buffer; for details, see the
buffer in {{FsCheckpointStreamFactory.FsCheckpointStateOutputStream}}.
The restore process also issues many small I/O operations, but it reads without a
buffer. I therefore added a read buffer and tested it with a Flink job.
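To illustrate why many small reads benefit from a buffer, here is a minimal JDK-only sketch (plain java.io, not Flink code). It serializes 10,000 small records, then counts how many read calls reach the underlying stream when deserializing them with and without a {{java.io.BufferedInputStream}}. {{SmallReadDemo}}, {{CountingInputStream}} and the helper methods are hypothetical names; a {{ByteArrayInputStream}} stands in for the checkpoint file, so the sketch shows call counts, not disk latency.

{code:java}
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

public class SmallReadDemo {

    /** Hypothetical wrapper that counts how many read calls reach the wrapped stream. */
    static final class CountingInputStream extends FilterInputStream {
        int calls = 0;

        CountingInputStream(InputStream in) {
            super(in);
        }

        @Override
        public int read() throws IOException {
            calls++;
            return super.read();
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            calls++;
            return super.read(b, off, len);
        }
    }

    /** Serializes n small records (an int and a long each, 12 bytes per record). */
    static byte[] writeRecords(int n) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (int i = 0; i < n; i++) {
                out.writeInt(i);
                out.writeLong(i * 31L);
            }
        }
        return bytes.toByteArray();
    }

    /** Reads the records back and returns how many calls hit the underlying stream. */
    static int countUnderlyingCalls(byte[] data, int n, boolean buffered) throws IOException {
        CountingInputStream source = new CountingInputStream(new ByteArrayInputStream(data));
        InputStream in = buffered ? new BufferedInputStream(source, 4096) : source;
        DataInputStream reader = new DataInputStream(in);
        for (int i = 0; i < n; i++) {
            reader.readInt();
            reader.readLong();
        }
        return source.calls;
    }

    public static void main(String[] args) throws IOException {
        int n = 10_000;
        byte[] data = writeRecords(n);
        System.out.println("unbuffered calls: " + countUnderlyingCalls(data, n, false));
        System.out.println("buffered calls:   " + countUnderlyingCalls(data, n, true));
    }
}
{code}

The exact unbuffered count depends on how the JDK's {{DataInputStream}} implements {{readInt}}, but it grows with the number of records, while the buffered count stays near the data size divided by the buffer size (roughly 30 refills for 120,000 bytes with a 4 KB buffer).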
h2. Flink Job environment:
Flink version: 1.10
StateBackend : FsStateBackend
code: Flink SQL count(distinct userId)
uv (unique users): 10 million
State size: 200M
TM total memory: 16G
Parallelism: 1
Restore takes 33.1 s without the read buffer and 12.8 s with it.
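For reference, the two measurements above translate into the following speedup and time saving. The snippet only recomputes them from the reported 33.1 s and 12.8 s; {{RestoreSpeedup}} and its methods are illustrative names.

{code:java}
import java.util.Locale;

public class RestoreSpeedup {

    /** How many times faster the buffered restore is. */
    static double speedup(double unbufferedSeconds, double bufferedSeconds) {
        return unbufferedSeconds / bufferedSeconds;
    }

    /** Fraction of restore time saved by the read buffer. */
    static double reduction(double unbufferedSeconds, double bufferedSeconds) {
        return 1.0 - bufferedSeconds / unbufferedSeconds;
    }

    public static void main(String[] args) {
        double unbuffered = 33.1; // seconds, reported restore time without read buffer
        double buffered = 12.8;   // seconds, reported restore time with read buffer
        // Locale.ROOT keeps '.' as the decimal separator regardless of the JVM locale.
        System.out.println(String.format(Locale.ROOT,
                "speedup: %.2fx, restore time reduced by %.1f%%",
                speedup(unbuffered, buffered), reduction(unbuffered, buffered) * 100));
    }
}
{code}

With these inputs it prints {{speedup: 2.59x, restore time reduced by 61.3%}}.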
h2. How to do it?
In {{HeapRestoreOperation#restore}}, wrap the {{FSDataInputStream}} with an
{{FSDataBufferedInputStream}}:
{code:java}
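// Open the raw checkpoint stream for this key-group state handle.
FSDataInputStream fsDataInputStream = keyGroupsStateHandle.openInputStream();
// Wrap it so that the many small reads during deserialization are served from a buffer.
FSDataInputStream bufferedInputStream = new FSDataBufferedInputStream(fsDataInputStream);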
{code}
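A plain {{java.io.BufferedInputStream}} is not enough here, because the wrapper must itself be an {{FSDataInputStream}}, that is, an {{InputStream}} that also supports {{seek(long)}} and {{getPos()}}. The sketch below shows, under that assumption, what such a wrapper has to get right: {{getPos()}} must subtract the bytes still sitting in the buffer, and {{seek()}} must either move within the buffer or discard it. It is a self-contained illustration, not the actual {{FSDataBufferedInputStream}}; {{SeekableInput}}, {{ByteArraySeekableInput}} and {{BufferedSeekableInput}} are hypothetical names, and {{SeekableInput}} only mimics the shape of Flink's abstract {{FSDataInputStream}}.

{code:java}
import java.io.IOException;
import java.io.InputStream;

public class SeekableBufferDemo {

    /** Hypothetical stand-in with the shape of FSDataInputStream: an InputStream with seek/getPos. */
    abstract static class SeekableInput extends InputStream {
        abstract void seek(long pos) throws IOException;
        abstract long getPos() throws IOException;
    }

    /** In-memory SeekableInput used to exercise the wrapper; counts read calls it receives. */
    static final class ByteArraySeekableInput extends SeekableInput {
        private final byte[] data;
        private int pos = 0;
        int readCalls = 0;

        ByteArraySeekableInput(byte[] data) { this.data = data; }

        @Override public int read() {
            readCalls++;
            return pos < data.length ? (data[pos++] & 0xFF) : -1;
        }

        @Override public int read(byte[] b, int off, int len) {
            readCalls++;
            if (len == 0) return 0;
            if (pos >= data.length) return -1;
            int n = Math.min(len, data.length - pos);
            System.arraycopy(data, pos, b, off, n);
            pos += n;
            return n;
        }

        @Override void seek(long newPos) { pos = (int) newPos; }
        @Override long getPos() { return pos; }
    }

    /** Hypothetical buffered wrapper: serves small reads from memory, keeps seek/getPos correct. */
    static final class BufferedSeekableInput extends SeekableInput {
        private final SeekableInput in;
        private final byte[] buf;
        private int count = 0; // number of valid bytes in buf
        private int next = 0;  // index of the next unread byte in buf

        BufferedSeekableInput(SeekableInput in, int bufferSize) {
            this.in = in;
            this.buf = new byte[bufferSize];
        }

        /** Refills the buffer with one large read; returns false at end of stream. */
        private boolean fill() throws IOException {
            next = 0;
            count = Math.max(in.read(buf, 0, buf.length), 0);
            return count > 0;
        }

        @Override public int read() throws IOException {
            if (next >= count && !fill()) return -1;
            return buf[next++] & 0xFF;
        }

        @Override public int read(byte[] b, int off, int len) throws IOException {
            if (len == 0) return 0;
            if (next >= count && !fill()) return -1;
            int n = Math.min(len, count - next);
            System.arraycopy(buf, next, b, off, n);
            next += n;
            return n;
        }

        @Override long getPos() throws IOException {
            // The underlying stream is ahead by the bytes not yet consumed from the buffer.
            return in.getPos() - (count - next);
        }

        @Override void seek(long pos) throws IOException {
            long bufferStart = in.getPos() - count;
            if (pos >= bufferStart && pos <= bufferStart + count) {
                next = (int) (pos - bufferStart); // target is inside the buffer: move the cursor
            } else {
                in.seek(pos); // otherwise drop the buffer and reposition the underlying stream
                count = 0;
                next = 0;
            }
        }

        @Override public void close() throws IOException { in.close(); }
    }
}
{code}

After a seek to an offset inside the current buffer, the next {{read()}} is served without touching the underlying stream; a seek outside it costs one underlying {{seek}} plus one refill. Reusing the buffer on an in-range seek is a choice made for this sketch; a simpler design could always discard the buffer on {{seek()}}.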