[ 
https://issues.apache.org/jira/browse/FLINK-26586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17769942#comment-17769942
 ] 

Hangxiang Yu commented on FLINK-26586:
--------------------------------------

{quote} * it can not be configured (enabling/buffer size){quote}
IMO, enabling this could be default for whom uses it, at least I haven't seen 
much cost currently.

But it's better if we could support to configure the buffer size.
{quote} * I replace the 'file:' plugin with my wrapped one, which for a general 
solution is not very elegant{quote}
I think it could be a general solution.
We could implement a buffered one, and then replace some necessary places with 
the new one.

I'm not clear about your 'wrapped one' means, so maybe just link your pr here 
if you think it could work now :)
{quote} * I simply allocate the buffer from heap instead of integrating Flink 
buffer management{quote}
It's okay for me at least just for this case (restore procedure), I think we 
could just use heap firstly.
{quote}my implementation is a façade to potentially all filesystem 
implementations, I think only the local filesystem implementation needs it, so 
we could also map to the Java buffered local I/O implementation instead of 
using java.io.FileInputStream
{quote}
Some filesystems such as hdfs, s3 should also has its inner buffer 
implementation IIUC.
I am not sure whether just mapping to Java buffered stream could meet the 
requirement. [~fanrui] Could you share something about this ? I saw you have 
implemented a new buffered one before.

 

> FileSystem uses unbuffered read I/O
> -----------------------------------
>
>                 Key: FLINK-26586
>                 URL: https://issues.apache.org/jira/browse/FLINK-26586
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / State Processor, Connectors / FileSystem, Runtime 
> / Checkpointing
>    Affects Versions: 1.13.0, 1.14.0
>            Reporter: Matthias Schwalbe
>            Priority: Major
>         Attachments: BufferedFSDataInputStreamWrapper.java, 
> BufferedLocalFileSystem.java
>
>
> - I found out that, at least when using LocalFileSystem on a windows system, 
> read I/O to load a savepoint is unbuffered,
>  - See example stack [1]
>  - i.e. in order to load only a long in a serializer, it needs to go into 
> kernel mode 8 times and load the 8 bytes one by one
>  - I coded a BufferedFSDataInputStreamWrapper that allows to opt-in buffered 
> reads on any FileSystem implementation
>  - In our setting savepoint load is now 30 times faster
>  - I’ve once seen a Jira ticket as to improve savepoint load time in general 
> (lost the link unfortunately), maybe this approach can help with it
>  - not sure if HDFS has got the same problem
>  - I can contribute my implementation of a BufferedFSDataInputStreamWrapper 
> which can be integrated in any 
> [1] unbuffered reads stack:
> read:207, FileInputStream (java.io)
> read:68, LocalDataInputStream (org.apache.flink.core.fs.local)
> read:50, FSDataInputStreamWrapper (org.apache.flink.core.fs)
> read:42, ForwardingInputStream (org.apache.flink.runtime.util)
> readInt:390, DataInputStream (java.io)
> deserialize:80, BytePrimitiveArraySerializer 
> (org.apache.flink.api.common.typeutils.base.array)
> next:298, FullSnapshotRestoreOperation$KeyGroupEntriesIterator 
> (org.apache.flink.runtime.state.restore)
> next:273, FullSnapshotRestoreOperation$KeyGroupEntriesIterator 
> (org.apache.flink.runtime.state.restore)
> restoreKVStateData:147, RocksDBFullRestoreOperation 
> (org.apache.flink.contrib.streaming.state.restore)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to