Dear Flink Team,
Over the last few weeks I was faced with a large savepoint (around 40 GiB) that
contained lots of obsolete data points and overwhelmed our infrastructure (i.e.
it failed to load/restart).
We could not afford to lose the state, hence I spent the time to transcode the
savepoint into something smaller (ended up with 2.5 GiB).
During my efforts I encountered a couple of points that make the savepoint API
awkward to use with larger savepoints, and found simple solutions for them.
I would like to contribute my findings and 'fixes'; however, on my corporate
infrastructure I can neither fork/build Flink locally nor open a PR for the
changes later on.
Before creating Jira tickets I wanted to quickly discuss the matter.
Findings:
* (We are currently on Flink 1.13 (RocksDB state backend) but all findings
apply as well to the latest version)
* WritableSavepoint.write(...) falls back to JobManagerCheckpointStorage,
which restricts the savepoint size to 5 MiB
    * See the relevant exception stack here [1]
    * This is because SavepointTaskManagerRuntimeInfo.getConfiguration()
    always returns an empty Configuration, hence
        * Neither "state.checkpoint-storage" nor "state.checkpoints.dir" is
        (or can be) configured
    * 'fix': provide SavepointTaskManagerRuntimeInfo.getConfiguration() with
    a meaningful implementation and set the configuration in
    SavepointEnvironment.getTaskManagerInfo()
* When loading a state, MultiStateKeyIterator loads and buffers the whole
state in memory before it even processes a single data point
    * This is absolutely no problem for small state (hence the unit tests
    work fine)
    * The MultiStateKeyIterator ctor sets up a Java Stream that iterates all
    state descriptors and flattens all data points contained within
    * The java.util.stream.Stream#flatMap function causes the buffering of
    the whole data set when enumerated later on
        * See call stack [2]
        * In our case this is 150e6 data points (> 1 GiB just for the pointers
        to the data, let alone the data itself, ~30 GiB)
    * I'm not aware of any way to instruct Stream to avoid the problem, hence
        * I coded an alternative implementation of MultiStateKeyIterator that
        avoids using Java Streams
        * I can contribute our implementation (MultiStateKeyIteratorNoStreams)
* I found out that, at least when using LocalFileSystem on a Windows
system, read I/O to load a savepoint is unbuffered
    * See example stack [3]
    * i.e. in order to load just a long in a serializer, it needs to go into
    kernel mode 8 times and load the 8 bytes one by one
    * I coded a BufferedFSDataInputStreamWrapper that allows opting in to
    buffered reads on any FileSystem implementation
    * In our setting savepoint load is now 30 times faster
    * I once saw a Jira ticket about improving savepoint load time in
    general (lost the link unfortunately); maybe this approach can help with it
    * Not sure if HDFS has the same problem
    * I can contribute my implementation
Looking forward to your comments
Matthias (Thias) Schwalbe
[1] exception stack:
8215140 [MapPartition (bb312595cb5ccc27fd3b2c729bbb9136) (2/4)#0] ERROR BatchTask - Error in task code: MapPartition (bb312595cb5ccc27fd3b2c729bbb9136) (2/4)
java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=180075318 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:636)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54)
    at org.apache.flink.state.api.output.SnapshotUtils.snapshot(SnapshotUtils.java:67)
    at org.apache.flink.state.api.output.operators.KeyedStateBootstrapOperator.endInput(KeyedStateBootstrapOperator.java:90)
    at org.apache.flink.state.api.output.BoundedStreamTask.processInput(BoundedStreamTask.java:107)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
    at org.apache.flink.state.api.output.BoundedOneInputStreamTaskRunner.mapPartition(BoundedOneInputStreamTaskRunner.java:80)
    at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:107)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:514)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:357)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=180075318 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
    at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.checkSize(MemCheckpointStreamFactory.java:61)
    at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetBytes(MemCheckpointStreamFactory.java:141)
    at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetHandle(MemCheckpointStreamFactory.java:121)
    at org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:75)
    at org.apache.flink.runtime.state.FullSnapshotAsyncWriter.get(FullSnapshotAsyncWriter.java:87)
    at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91)
    at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78)
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
    at java.util.concurrent.FutureTask.run(FutureTask.java)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:633)
    ... 14 more
[2] Streams call stack:
hasNext:77, RocksStateKeysIterator (org.apache.flink.contrib.streaming.state.iterator)
next:82, RocksStateKeysIterator (org.apache.flink.contrib.streaming.state.iterator)
forEachRemaining:116, Iterator (java.util)
forEachRemaining:1801, Spliterators$IteratorSpliterator (java.util)
forEach:580, ReferencePipeline$Head (java.util.stream)
accept:270, ReferencePipeline$7$1 (java.util.stream)
    # <R> Stream<R> flatMap(final Function<? super P_OUT, ? extends Stream<? extends R>> var1)
accept:373, ReferencePipeline$11$1 (java.util.stream)
    # Stream<P_OUT> peek(final Consumer<? super P_OUT> var1)
accept:193, ReferencePipeline$3$1 (java.util.stream)
    # <R> Stream<R> map(final Function<? super P_OUT, ? extends R> var1)
tryAdvance:1359, ArrayList$ArrayListSpliterator (java.util)
lambda$initPartialTraversalState$0:294, StreamSpliterators$WrappingSpliterator (java.util.stream)
getAsBoolean:-1, 1528195520 (java.util.stream.StreamSpliterators$WrappingSpliterator$$Lambda$57)
fillBuffer:206, StreamSpliterators$AbstractWrappingSpliterator (java.util.stream)
doAdvance:161, StreamSpliterators$AbstractWrappingSpliterator (java.util.stream)
tryAdvance:300, StreamSpliterators$WrappingSpliterator (java.util.stream)
hasNext:681, Spliterators$1Adapter (java.util)
hasNext:83, MultiStateKeyIterator (org.apache.flink.state.api.input)
hasNext:162, KeyedStateReaderOperator$NamespaceDecorator (org.apache.flink.state.api.input.operator)
reachedEnd:215, KeyedStateInputFormat (org.apache.flink.state.api.input)
invoke:191, DataSourceTask (org.apache.flink.runtime.operators)
doRun:776, Task (org.apache.flink.runtime.taskmanager)
run:563, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)
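
For illustration, the lazy alternative to the Stream pipeline seen in [2] can be sketched as a plain nested iterator. This is only a minimal sketch and not the MultiStateKeyIteratorNoStreams implementation mentioned above: LazyFlatIterator, CountingIterator and pulledAfterFirstNext are hypothetical names, and plain Java collections stand in for the Flink state descriptors and the RocksDB key iterators.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Function;

/** Hypothetical sketch: flattens "keys per state descriptor" without java.util.stream. */
public class LazyFlatIterator<S, K> implements Iterator<K> {
    private final Iterator<S> outer;                 // e.g. the state descriptors
    private final Function<S, Iterator<K>> keysOf;   // e.g. opens the key iterator of one state
    private Iterator<K> current = Collections.emptyIterator();

    public LazyFlatIterator(Iterable<S> sources, Function<S, Iterator<K>> keysOf) {
        this.outer = sources.iterator();
        this.keysOf = keysOf;
    }

    @Override
    public boolean hasNext() {
        // Move on to the next non-empty inner iterator; nothing is read ahead or buffered.
        while (!current.hasNext() && outer.hasNext()) {
            current = keysOf.apply(outer.next());
        }
        return current.hasNext();
    }

    @Override
    public K next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }

    /** Test helper: counts how many elements were pulled from the wrapped iterator. */
    static final class CountingIterator<T> implements Iterator<T> {
        private final Iterator<T> delegate;
        int pulled = 0;

        CountingIterator(Iterator<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public boolean hasNext() {
            return delegate.hasNext();
        }

        @Override
        public T next() {
            pulled++;
            return delegate.next();
        }
    }

    /** Consumes one key and reports how many keys were pulled from the source to do so. */
    static int pulledAfterFirstNext() {
        CountingIterator<Integer> keys =
                new CountingIterator<>(Arrays.asList(1, 2, 3, 4, 5).iterator());
        Iterator<Integer> it = new LazyFlatIterator<String, Integer>(
                Collections.singletonList("stateA"), descriptor -> keys);
        it.next();
        return keys.pulled;
    }

    public static void main(String[] args) {
        // prints: keys pulled after first next(): 1
        System.out.println("keys pulled after first next(): " + pulledAfterFirstNext());
    }
}
```

Only one key is pulled from the underlying iterator before the first element is handed out, so memory use stays independent of the number of keys.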
[3] unbuffered reads stack:
read:207, FileInputStream (java.io)
read:68, LocalDataInputStream (org.apache.flink.core.fs.local)
read:50, FSDataInputStreamWrapper (org.apache.flink.core.fs)
read:42, ForwardingInputStream (org.apache.flink.runtime.util)
readInt:390, DataInputStream (java.io)
deserialize:80, BytePrimitiveArraySerializer (org.apache.flink.api.common.typeutils.base.array)
next:298, FullSnapshotRestoreOperation$KeyGroupEntriesIterator (org.apache.flink.runtime.state.restore)
next:273, FullSnapshotRestoreOperation$KeyGroupEntriesIterator (org.apache.flink.runtime.state.restore)
restoreKVStateData:147, RocksDBFullRestoreOperation (org.apache.flink.contrib.streaming.state.restore)
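
The effect visible in [3] can be reproduced without Flink. The following is a minimal sketch, assuming plain java.io streams as a stand-in for LocalDataInputStream; BufferedReadSketch, CountingInputStream and countReadCalls are hypothetical names and not the BufferedFSDataInputStreamWrapper mentioned above. It counts how many read calls reach the underlying stream when ints are decoded with and without a BufferedInputStream in between. The exact unbuffered count depends on the JDK version, so no concrete numbers are claimed here.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

/** Hypothetical sketch: compares read calls on the raw stream with and without buffering. */
public class BufferedReadSketch {

    /** Stand-in for a file stream: counts every call that would go to the OS. */
    static final class CountingInputStream extends FilterInputStream {
        long calls = 0;

        CountingInputStream(InputStream in) {
            super(in);
        }

        @Override
        public int read() throws IOException {
            calls++;
            return super.read();
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            calls++;
            return super.read(b, off, len);
        }
    }

    /** Decodes the given number of ints and returns the read calls seen by the raw stream. */
    static long countReadCalls(int ints, boolean buffered) {
        CountingInputStream raw =
                new CountingInputStream(new ByteArrayInputStream(new byte[ints * 4]));
        // Opt-in buffering: the 64 KiB buffer size is an arbitrary choice for this sketch.
        InputStream source = buffered ? new BufferedInputStream(raw, 64 * 1024) : raw;
        try (DataInputStream in = new DataInputStream(source)) {
            for (int i = 0; i < ints; i++) {
                in.readInt();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return raw.calls;
    }

    public static void main(String[] args) {
        System.out.println("unbuffered read calls: " + countReadCalls(1000, false));
        System.out.println("buffered read calls:   " + countReadCalls(1000, true));
    }
}
```

With a real file each counted call would be a transition into the kernel, which is where the buffered variant saves time.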