That all sounds very interesting; I'd go ahead with creating tickets.
On 08/03/2022 13:43, Schwalbe Matthias wrote:
Dear Flink Team,
Over the last few weeks I had to deal with a large savepoint (around
40 GiB) that contained a lot of obsolete data points and overwhelmed our
infrastructure (i.e. the job failed to load it and restart).
We could not afford to lose the state, hence I spent the time to
transcode the savepoint into something smaller (ended up with 2.5 GiB).
During my efforts I encountered a couple of points that make the
savepoint API awkward to use with larger savepoints, and I found simple
solutions for them.
I would like to contribute my findings and ‘fixes’. However, on my
corporate infrastructure I can neither fork/build Flink locally nor open
a PR for the changes later on.
Before creating Jira tickets I wanted to quickly discuss the matter.
Findings:
* (We are currently on Flink 1.13 with the RocksDB state backend, but all
findings apply to the latest version as well.)
* WritableSavepoint.write(…) falls back to
JobManagerCheckpointStorage which restricts savepoint size to 5MiB
o See relevant exception stack here [1]
o This is because
SavepointTaskManagerRuntimeInfo.getConfiguration() always
returns an empty Configuration, hence
o neither “state.checkpoint-storage” nor “state.checkpoints.dir”
is or can be configured
o ‘fix’: provide
SavepointTaskManagerRuntimeInfo.getConfiguration() with a
meaningful implementation and set configuration in
SavepointEnvironment.getTaskManagerInfo()
* When loading a state, MultiStateKeyIterator loads and buffers the
whole state in memory before it even processes a single data point
o This is absolutely no problem for small state (hence the unit
tests work fine)
o The MultiStateKeyIterator constructor sets up a Java Stream that
iterates all state descriptors and flattens all data points contained within
o The java.util.stream.Stream#flatMap function causes the
buffering of the whole data set when it is enumerated later on
o See call stack [2]
+ In our case this is 150e6 data points (> 1 GiB just for the
pointers to the data, let alone the data itself, ~30 GiB)
o I am not aware of any way to set up the Stream so that it
avoids the problem, hence
o I coded an alternative implementation of MultiStateKeyIterator
that avoids using Java Stream
o I can contribute our implementation
(MultiStateKeyIteratorNoStreams)
* I found out that, at least when using LocalFileSystem on a Windows
system, read I/O to load a savepoint is unbuffered
o See example stack [3]
o i.e. to load just a single long in a serializer, it has to
enter kernel mode 8 times and load the 8 bytes one by one
o I coded a BufferedFSDataInputStreamWrapper that makes it possible
to opt in to buffered reads on any FileSystem implementation
o In our setting savepoint load is now 30 times faster
o I once saw a Jira ticket about improving savepoint load time
in general (unfortunately I lost the link); maybe this approach
can help with it
o I am not sure whether HDFS has the same problem
o I can contribute my implementation
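The idea of flattening several per-descriptor iterators without going through java.util.stream can be sketched as follows. This is only a minimal illustration under stated assumptions: the class name LazyFlatIterator and its constructor arguments are hypothetical, and it is not the MultiStateKeyIteratorNoStreams implementation mentioned above. It opens one inner iterator at a time and hands out elements one by one, so nothing is buffered.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

/** Hypothetical sketch: flattens per-source iterators lazily, one element at a time. */
class LazyFlatIterator<S, T> implements Iterator<T> {
    private final Iterator<S> sources;
    private final Function<S, Iterator<T>> open;
    private Iterator<T> current;

    LazyFlatIterator(Iterable<S> sources, Function<S, Iterator<T>> open) {
        this.sources = sources.iterator();
        this.open = open;
    }

    @Override
    public boolean hasNext() {
        // Move on to the next source only once the current one is exhausted.
        while (current == null || !current.hasNext()) {
            if (!sources.hasNext()) {
                return false;
            }
            current = open.apply(sources.next());
        }
        return true;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }

    /** Small usage example: three "sources", one of them empty. */
    static List<Integer> demo() {
        List<List<Integer>> sources = Arrays.asList(
                Arrays.asList(1, 2),
                Collections.<Integer>emptyList(),
                Arrays.asList(3));
        Iterator<Integer> it =
                new LazyFlatIterator<List<Integer>, Integer>(sources, List::iterator);
        List<Integer> result = new ArrayList<>();
        while (it.hasNext()) {
            result.add(it.next());
        }
        return result;
    }
}
```

In a real reader the "sources" would be the state descriptors and the function would open the key iterator of the state backend for each of them; both are left abstract here.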
Looking forward to your comments
Matthias (Thias) Schwalbe
[1] exception stack:
8215140 [MapPartition (bb312595cb5ccc27fd3b2c729bbb9136) (2/4)#0] ERROR BatchTask - Error in task code: MapPartition (bb312595cb5ccc27fd3b2c729bbb9136) (2/4)
java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=180075318 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:636)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54)
    at org.apache.flink.state.api.output.SnapshotUtils.snapshot(SnapshotUtils.java:67)
    at org.apache.flink.state.api.output.operators.KeyedStateBootstrapOperator.endInput(KeyedStateBootstrapOperator.java:90)
    at org.apache.flink.state.api.output.BoundedStreamTask.processInput(BoundedStreamTask.java:107)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
    at org.apache.flink.state.api.output.BoundedOneInputStreamTaskRunner.mapPartition(BoundedOneInputStreamTaskRunner.java:80)
    at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:107)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:514)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:357)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=180075318 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
    at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.checkSize(MemCheckpointStreamFactory.java:61)
    at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetBytes(MemCheckpointStreamFactory.java:141)
    at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetHandle(MemCheckpointStreamFactory.java:121)
    at org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:75)
    at org.apache.flink.runtime.state.FullSnapshotAsyncWriter.get(FullSnapshotAsyncWriter.java:87)
    at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91)
    at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78)
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
    at java.util.concurrent.FutureTask.run(FutureTask.java)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:633)
    ... 14 more
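To make the failure mode behind stack [1] concrete, here is a deliberately simplified model of it. It assumes that storage selection boils down to looking up the two keys named in the findings, which is a stand-in and not Flink's actual loader code; the class and method names are hypothetical, a plain Map replaces Flink's Configuration, and the directory value is made up. With an empty configuration the model ends up on the memory-backed storage, whose 5 MiB limit (maxSize=5242880 in the trace) rejects the 180075318-byte state.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Simplified, hypothetical model of the fallback; not Flink's real classes. */
class StorageFallbackSketch {
    // 5 MiB, the maxSize reported in the exception above.
    static final long MAX_MEMORY_STATE_BYTES = 5L * 1024 * 1024;

    /** Picks a storage name from the two configuration keys named in the findings. */
    static String selectStorage(Map<String, String> conf) {
        String explicit = conf.get("state.checkpoint-storage");
        if (explicit != null) {
            return explicit;
        }
        // Without a checkpoint directory there is nothing to write files to,
        // so the model falls back to the memory-backed "jobmanager" storage.
        return conf.containsKey("state.checkpoints.dir") ? "filesystem" : "jobmanager";
    }

    /** Only the memory-backed storage enforces the size limit in this model. */
    static boolean fits(String storage, long stateBytes) {
        return !"jobmanager".equals(storage) || stateBytes <= MAX_MEMORY_STATE_BYTES;
    }

    public static void main(String[] args) {
        long stateBytes = 180_075_318L; // Size from the exception above

        // What an always-empty configuration leads to.
        Map<String, String> empty = Collections.emptyMap();
        System.out.println(selectStorage(empty) + " fits: "
                + fits(selectStorage(empty), stateBytes));

        // What a populated configuration would allow (directory is hypothetical).
        Map<String, String> configured = new HashMap<>();
        configured.put("state.checkpoints.dir", "file:///tmp/savepoint-out");
        System.out.println(selectStorage(configured) + " fits: "
                + fits(selectStorage(configured), stateBytes));
    }
}
```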
[2] Streams call stack:
hasNext:77, RocksStateKeysIterator (org.apache.flink.contrib.streaming.state.iterator)
next:82, RocksStateKeysIterator (org.apache.flink.contrib.streaming.state.iterator)
forEachRemaining:116, Iterator (java.util)
forEachRemaining:1801, Spliterators$IteratorSpliterator (java.util)
forEach:580, ReferencePipeline$Head (java.util.stream)
accept:270, ReferencePipeline$7$1 (java.util.stream) # <R> Stream<R> flatMap(final Function<? super P_OUT, ? extends Stream<? extends R>> var1)
accept:373, ReferencePipeline$11$1 (java.util.stream) # Stream<P_OUT> peek(final Consumer<? super P_OUT> var1)
accept:193, ReferencePipeline$3$1 (java.util.stream) # <R> Stream<R> map(final Function<? super P_OUT, ? extends R> var1)
tryAdvance:1359, ArrayList$ArrayListSpliterator (java.util)
lambda$initPartialTraversalState$0:294, StreamSpliterators$WrappingSpliterator (java.util.stream)
getAsBoolean:-1, 1528195520 (java.util.stream.StreamSpliterators$WrappingSpliterator$$Lambda$57)
fillBuffer:206, StreamSpliterators$AbstractWrappingSpliterator (java.util.stream)
doAdvance:161, StreamSpliterators$AbstractWrappingSpliterator (java.util.stream)
tryAdvance:300, StreamSpliterators$WrappingSpliterator (java.util.stream)
hasNext:681, Spliterators$1Adapter (java.util)
hasNext:83, MultiStateKeyIterator (org.apache.flink.state.api.input)
hasNext:162, KeyedStateReaderOperator$NamespaceDecorator (org.apache.flink.state.api.input.operator)
reachedEnd:215, KeyedStateInputFormat (org.apache.flink.state.api.input)
invoke:191, DataSourceTask (org.apache.flink.runtime.operators)
doRun:776, Task (org.apache.flink.runtime.taskmanager)
run:563, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)
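The behaviour visible in call stack [2] (a single hasNext() on the outer iterator ends up in forEachRemaining on the inner one) can be reproduced outside Flink with a small experiment. The sketch below is an assumption-laden stand-in: the counting iterator plays the role of the RocksDB key iterator, and all names are made up for the illustration. It asks the flattened stream for exactly one element and reports how many elements were pulled from the inner iterator in the process; the count is expected to be far more than one.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class FlatMapBufferingDemo {
    /** Lazily yields 0..n-1 and records in 'pulled' how many elements were requested. */
    static Iterator<Integer> countingIterator(int n, AtomicInteger pulled) {
        return new Iterator<Integer>() {
            private int next = 0;

            @Override
            public boolean hasNext() {
                return next < n;
            }

            @Override
            public Integer next() {
                if (next >= n) {
                    throw new NoSuchElementException();
                }
                pulled.incrementAndGet();
                return next++;
            }
        };
    }

    /** Number of inner elements pulled after asking the flattened stream for ONE element. */
    static int pulledAfterFirstElement(int n) {
        AtomicInteger pulled = new AtomicInteger();
        Iterator<Integer> flattened = Stream.of("descriptor")
                .flatMap(d -> StreamSupport.stream(
                        Spliterators.spliteratorUnknownSize(countingIterator(n, pulled), 0),
                        false))
                .iterator();
        flattened.next(); // consume a single element only
        return pulled.get();
    }

    public static void main(String[] args) {
        System.out.println("inner elements pulled for one next(): "
                + pulledAfterFirstElement(1_000_000));
    }
}
```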
[3] unbuffered reads stack:
read:207, FileInputStream (java.io)
read:68, LocalDataInputStream (org.apache.flink.core.fs.local)
read:50, FSDataInputStreamWrapper (org.apache.flink.core.fs)
read:42, ForwardingInputStream (org.apache.flink.runtime.util)
readInt:390, DataInputStream (java.io)
deserialize:80, BytePrimitiveArraySerializer (org.apache.flink.api.common.typeutils.base.array)
next:298, FullSnapshotRestoreOperation$KeyGroupEntriesIterator (org.apache.flink.runtime.state.restore)
next:273, FullSnapshotRestoreOperation$KeyGroupEntriesIterator (org.apache.flink.runtime.state.restore)
restoreKVStateData:147, RocksDBFullRestoreOperation (org.apache.flink.contrib.streaming.state.restore)
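The effect of the unbuffered path in stack [3] can be illustrated with plain java.io classes. This is a sketch under assumptions, not the BufferedFSDataInputStreamWrapper described above: a counting wrapper (hypothetical name) stands in for the file stream, where every call would be a trip into the kernel, and java.io.BufferedInputStream stands in for the opt-in buffering. The 64 KiB buffer size is an arbitrary choice for the example.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

class BufferedReadSketch {
    /** Counts read calls that reach the wrapped stream (a stand-in for system calls). */
    static final class CountingInputStream extends FilterInputStream {
        int calls = 0;

        CountingInputStream(InputStream in) {
            super(in);
        }

        @Override
        public int read() throws IOException {
            calls++;
            return in.read();
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            calls++;
            return in.read(b, off, len);
        }
    }

    /** Serializes the ints 0..count-1 in the format DataInputStream expects. */
    static byte[] encodeInts(int count) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        for (int i = 0; i < count; i++) {
            out.writeInt(i);
        }
        out.flush();
        return bytes.toByteArray();
    }

    /** Reads 'count' ints back and returns how many read calls hit the underlying stream. */
    static int underlyingReadCalls(int count, boolean buffered) {
        try {
            CountingInputStream raw =
                    new CountingInputStream(new ByteArrayInputStream(encodeInts(count)));
            InputStream source = buffered ? new BufferedInputStream(raw, 64 * 1024) : raw;
            DataInputStream in = new DataInputStream(source);
            for (int i = 0; i < count; i++) {
                if (in.readInt() != i) {
                    throw new IOException("unexpected value at index " + i);
                }
            }
            return raw.calls;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("unbuffered calls: " + underlyingReadCalls(10_000, false));
        System.out.println("buffered calls:   " + underlyingReadCalls(10_000, true));
    }
}
```

Without the buffer, every readInt() reaches the underlying stream at least once; with it, the underlying stream is only touched when the buffer needs refilling.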