Hi Tobi,
State in batch mode works one key at a time, which keeps the state fast and 
small.
However, although streaming mode and batch mode of the streaming API are in 
general equivalent, batch mode covers only a subset of the algorithms possible 
in streaming mode.
Elaborate timers and watermarking strategies, for example, need special 
attention or a manual implementation in batch mode.
If I remember correctly, there is a future FLIP that will take care of this.
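
To illustrate the one-key-at-a-time idea, here is a minimal sketch in plain 
Scala. It is not Flink's actual implementation and uses no Flink classes; the 
object name, the function name and the per-key count are assumptions made up 
for illustration. It only shows why bounded, key-sorted input lets the runtime 
hold state for a single key at any moment.

```scala
// Plain-Scala sketch, no Flink dependency; all names are hypothetical.
object SingleKeyStateSketch {

  /** Counts events per key while holding state for only one key at a time. */
  def countPerKey(events: Seq[(String, Int)]): Seq[(String, Int)] = {
    // Batch input is bounded, so it can be sorted (grouped) by key up front.
    val sorted = events.sortBy(_._1)

    val results = Seq.newBuilder[(String, Int)]
    var currentKey: Option[String] = None
    var currentCount = 0 // the only state held in memory at any time

    for ((key, _) <- sorted) {
      if (!currentKey.contains(key)) {
        // Key changed: emit the finished key and discard its state.
        currentKey.foreach(k => results += ((k, currentCount)))
        currentKey = Some(key)
        currentCount = 0
      }
      currentCount += 1
    }
    // Emit the last key once the input is exhausted.
    currentKey.foreach(k => results += ((k, currentCount)))
    results.result()
  }
}
```

Because the state of a finished key is dropped as soon as the next key starts, 
there is nothing comparable to a full keyed state snapshot to checkpoint or 
restore, which fits the exception quoted further down in this thread.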

In your situation (you did not give many details), you could read the snapshot 
data through the state API, treat it as a set of batch sources, and integrate 
those into a new batch job.

Thias


From: tobias.schu...@xing.com <tobias.schu...@xing.com>
Sent: Friday, 6 August 2021 15:02
To: user@flink.apache.org; Chesnay Schepler <ches...@apache.org>
Subject: Re: [EXT] Re: Reusing savepoints from a streaming job in a batch job

Sorry to hear that. Did I get that right that there is no way to save and 
recover state in batch mode?

Tobi
On 6. Aug 2021, 13:20 +0200, Chesnay Schepler <ches...@apache.org> wrote:

None of the checkpointing-related features work in batch mode.

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/#important-considerations

On 04/08/2021 21:23, tobias.schu...@xing.com wrote:
Hi folks,

This is a crosspost of a Stack Overflow question 
(https://stackoverflow.com/questions/68631624/flink-job-cant-use-savepoint-in-a-batch-job)
that hasn't received any replies yet, so please bear with me.

Let me start in a generic fashion to see if I somehow missed some concepts: I 
have a streaming Flink job from which I created a savepoint, and I am trying to 
reuse that savepoint in the same job running in batch mode. A simplified 
version of this job looks like this:
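Pseudo-code:

val flink = StreamExecutionEnvironment.getExecutionEnvironment

// Bounded file source in batch mode, Kafka source otherwise
val stream = if (batchMode) {
  flink.readFile(path)
} else {
  flink.addKafkaSource(topicName)
}

// keyBy returns a new keyed stream, so process is chained onto it
val processed = stream
  .keyBy(key)
  .process(new ProcessorWithKeyedState())

CassandraSink.addSink(processed)
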
This works fine as long as I run the job without a savepoint. If I start the 
job from a savepoint, I get an exception that looks like this:

Caused by: java.lang.UnsupportedOperationException: Checkpoints are not supported in a single key state backend
    at org.apache.flink.streaming.api.operators.sorted.state.NonCheckpointingStorageAccess.resolveCheckpoint(NonCheckpointingStorageAccess.java:43)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1623)
    at org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:362)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:292)
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:249)

I could work around this if I set the option:

execution.batch-state-backend.enabled: false
but this eventually results in another error:

Caused by: java.lang.IllegalArgumentException: The fraction of memory to allocate should not be 0. Please make sure that all types of managed memory consumers contained in the job are configured with a non-negative weight via `taskmanager.memory.managed.consumer-weights`.
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
    at org.apache.flink.runtime.memory.MemoryManager.validateFraction(MemoryManager.java:673)
    at org.apache.flink.runtime.memory.MemoryManager.computeMemorySize(MemoryManager.java:653)
    at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:526)

Of course I tried to set the config key 
taskmanager.memory.managed.consumer-weights (used DATAPROC:70,PYTHON:30), but 
this doesn't seem to have any effect.
So I wonder whether I have a conceptual error and can't reuse savepoints from a 
streaming job in a batch job, or whether I simply have a problem in my 
configuration. Any hints?

Thanks in advance,
Tobi


This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.
