Hi Tobi,

State in batch mode works in a single-key-at-a-time fashion: the input is grouped by key and processed one key after another, so only the state of the current key has to be kept. This keeps state fast and small. However, although the streaming and batch execution modes of the DataStream API are in general equivalent, batch mode covers only a subset of the algorithms that are possible in streaming mode. Elaborate timers and watermarking strategies, for example, need special attention or a manual implementation in batch mode. If I remember correctly, there is an upcoming FLIP that will take care of this.
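To make the single-key-at-a-time idea concrete, here is a minimal plain-Scala model with no Flink dependency. It is not Flink code. The `Event` record and the running sum that stands in for keyed state are hypothetical. The model only shows why bounded input, once sorted by key, lets the runtime hold the state of one key at a time and drop it when the key changes.

```scala
// Plain-Scala model (no Flink dependency) of single-key-at-a-time keyed processing.
// Hypothetical: the Event type and the running sum that stands in for keyed state.
object SingleKeyStateSketch {
  final case class Event(key: String, value: Int)

  /** Sorts the bounded input by key, then processes one key at a time.
    * Only the current key's state is held; it is emitted and discarded when the key changes.
    */
  def process(events: Seq[Event]): List[(String, Int)] = {
    // Sorting is possible only because the input is bounded (batch).
    val sorted = events.sortBy(_.key)
    val out = List.newBuilder[(String, Int)]
    var currentKey: Option[String] = None
    var state = 0 // state of the current key only

    for (e <- sorted) {
      if (!currentKey.contains(e.key)) {
        // Key boundary: emit the previous key's result and reset the state.
        currentKey.foreach(k => out += ((k, state)))
        currentKey = Some(e.key)
        state = 0
      }
      state += e.value
    }
    // End of input: flush the last key.
    currentKey.foreach(k => out += ((k, state)))
    out.result()
  }
}
```

For example, `process(Seq(Event("b", 1), Event("a", 2), Event("b", 3), Event("a", 4)))` returns `List(("a", 6), ("b", 4))`, and at no point does the model hold state for more than one key. A streaming job cannot sort an unbounded input, so it has to keep the state of all keys in a state backend instead.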
In your situation (you were not very specific), you could use the State Processor API to read the snapshot data as a number of batch sources and integrate them into a new batch job.

Thias

From: tobias.schu...@xing.com <tobias.schu...@xing.com>
Sent: Friday, 6 August 2021 15:02
To: user@flink.apache.org; Chesnay Schepler <ches...@apache.org>
Subject: Re: [EXT] Re: Reusing savepoints from a streaming job in a batch job

Sorry to hear that. Did I get that right that there is no way to save and recover state in batch mode?

Tobi

On 6 Aug 2021, 13:20 +0200, Chesnay Schepler <ches...@apache.org>, wrote:

None of the checkpointing-related features work in batch mode.
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/#important-considerations

On 04/08/2021 21:23, tobias.schu...@xing.com wrote:

Hey folks,

This is a crosspost of a Stack Overflow question (https://stackoverflow.com/questions/68631624/flink-job-cant-use-savepoint-in-a-batch-job) which hasn't received any replies yet, so please bear with me.

Let me start in a generic fashion to see whether I have missed some concepts: I have a streaming Flink job from which I created a savepoint, and I am trying to reuse that savepoint in the same job running in batch mode. A simplified version of this job looks like this (pseudo-code):

    val flink = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = if (batchMode) {
      flink.readFile(path)
    } else {
      flink.addKafkaSource(topicName)
    }
    // keyBy returns a new KeyedStream; the keyed-state processor must run on that stream
    val processed = stream
      .keyBy(key)
      .process(new ProcessorWithKeyedState())
    CassandraSink.addSink(processed)

This works fine as long as I run the job without a savepoint.
If I start the job from a savepoint, I get an exception like this:

    Caused by: java.lang.UnsupportedOperationException: Checkpoints are not supported in a single key state backend
        at org.apache.flink.streaming.api.operators.sorted.state.NonCheckpointingStorageAccess.resolveCheckpoint(NonCheckpointingStorageAccess.java:43)
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1623)
        at org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:362)
        at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:292)
        at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:249)

I could work around this by setting the option

    execution.batch-state-backend.enabled: false

but this eventually results in another error:

    Caused by: java.lang.IllegalArgumentException: The fraction of memory to allocate should not be 0. Please make sure that all types of managed memory consumers contained in the job are configured with a non-negative weight via `taskmanager.memory.managed.consumer-weights`.
        at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
        at org.apache.flink.runtime.memory.MemoryManager.validateFraction(MemoryManager.java:673)
        at org.apache.flink.runtime.memory.MemoryManager.computeMemorySize(MemoryManager.java:653)
        at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:526)

Of course I tried setting the config key taskmanager.memory.managed.consumer-weights (to DATAPROC:70,PYTHON:30), but this doesn't seem to have any effect.

So I wonder whether I have a conceptual error, i.e. savepoints from a streaming job simply can't be reused in a batch job, or whether I just have a problem in my configuration. Any hints?

Thanks in advance,
Tobi
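The second error says that every managed-memory consumer type in the job needs a weight. Below is a minimal plain-Scala model of how splitting memory by weight can produce a zero fraction. It is not Flink's MemoryManager code and not Flink's actual algorithm. In the model, each consumer type that the job uses gets weight / (sum of the weights of the used types). A type with no configured weight therefore ends up with 0. The consumer names come from the email. The string parsing and the `usedConsumers` parameter are assumptions of this sketch.

```scala
// Simplified model of a weight-based split of managed memory between consumer types.
// Assumption: illustrates the arithmetic behind the error message; not Flink's implementation.
object ManagedMemoryFractionSketch {
  /** Parses a weights string such as "DATAPROC:70,PYTHON:30" into a map. */
  def parseWeights(spec: String): Map[String, Int] =
    spec.split(",").iterator.map(_.trim).filter(_.nonEmpty).map { entry =>
      val parts = entry.split(":")
      require(parts.length == 2, s"Malformed weight entry: $entry")
      parts(0).trim -> parts(1).trim.toInt
    }.toMap

  /** Fraction of managed memory for `consumer`, relative to the consumer types the job uses.
    * A consumer type without a configured weight counts as weight 0, giving a zero fraction,
    * which is rejected with an IllegalArgumentException (via require).
    */
  def fraction(weights: Map[String, Int], usedConsumers: Set[String], consumer: String): Double = {
    val total = usedConsumers.toSeq.map(c => weights.getOrElse(c, 0)).sum
    val f = if (total == 0) 0.0 else weights.getOrElse(consumer, 0).toDouble / total
    require(f > 0, s"The fraction of memory to allocate should not be 0 (consumer $consumer)")
    f
  }
}
```

In this model, DATAPROC:70,PYTHON:30 gives DATAPROC a fraction of 1.0 when it is the only consumer used, or 0.7 alongside PYTHON. Setting these weights can therefore only help if the failing consumer type is one of the two. As far as I know, DATAPROC:70,PYTHON:30 is also the default value of this option in Flink 1.13. That would be consistent with setting it explicitly having no visible effect.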