Hey folks,

This is a crosspost of a Stack Overflow question
(https://stackoverflow.com/questions/68631624/flink-job-cant-use-savepoint-in-a-batch-job)
which hasn't received any replies yet, so please bear with me.

Let me start in a generic fashion, in case I missed some concepts: I have a
streaming Flink job from which I created a savepoint, and I am trying to reuse
that savepoint in the same job running in batch mode. A simplified version of
the job looks like this.
Pseudo-code:
val flink = StreamExecutionEnvironment.getExecutionEnvironment

// In batch mode the input comes from a file instead of Kafka
val stream = if (batchMode) {
  flink.readFile(path)
} else {
  flink.addKafkaSource(topicName)
}

// keyBy returns a new stream, so the process call is chained onto it
val processed = stream
  .keyBy(key)
  .process(new ProcessorWithKeyedState())

CassandraSink.addSink(processed)
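For context, batchMode above corresponds to running the same DataStream program in BATCH execution mode, which I enable via the standard runtime-mode switch (shown here in its flink-conf.yaml form; nothing else about the setup changes between the two runs):

```yaml
# Run the DataStream job in batch execution mode
# (equivalently: flink run -Dexecution.runtime-mode=BATCH ...)
execution.runtime-mode: BATCH
```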
This works fine as long as I run the job without a savepoint. If I start the
job from a savepoint, I get an exception that looks like this:
Caused by: java.lang.UnsupportedOperationException: Checkpoints are not supported in a single key state backend
   at org.apache.flink.streaming.api.operators.sorted.state.NonCheckpointingStorageAccess.resolveCheckpoint(NonCheckpointingStorageAccess.java:43)
   at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1623)
   at org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:362)
   at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:292)
   at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:249)
I can work around this by setting the option:
execution.batch-state-backend.enabled: false
but this eventually results in another error:
Caused by: java.lang.IllegalArgumentException: The fraction of memory to allocate should not be 0. Please make sure that all types of managed memory consumers contained in the job are configured with a non-negative weight via `taskmanager.memory.managed.consumer-weights`.
   at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
   at org.apache.flink.runtime.memory.MemoryManager.validateFraction(MemoryManager.java:673)
   at org.apache.flink.runtime.memory.MemoryManager.computeMemorySize(MemoryManager.java:653)
   at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:526)
Of course I tried setting the config key
taskmanager.memory.managed.consumer-weights (using DATAPROC:70,PYTHON:30), but
this doesn't seem to have any effect.
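For reference, this is the flink-conf.yaml fragment I end up with (the weights are the illustrative values mentioned above):

```yaml
# Weights for the job's managed-memory consumers, as mentioned above
taskmanager.memory.managed.consumer-weights: DATAPROC:70,PYTHON:30
# Workaround for the single-key state backend error when restoring the savepoint
execution.batch-state-backend.enabled: false
```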
So I wonder whether I have a conceptual error and savepoints from a streaming
job simply can't be reused in a batch job, or whether I just have a problem in
my configuration. Any hints?

Thanks in advance,
Tobi
