Hi Jonas,

In previous versions, the checkpoint would be aborted as soon
as any task finished, no matter whether the finished tasks are from the same vertex.

As for Kinesis, sorry, I don't have an environment at hand to test with,
but if the task does indeed finish when it has no shards assigned, then
I think it would indeed be a problem in this case.
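
For reference, here is a minimal sketch of pinning the source parallelism to the shard count so that no subtask is left without a shard (this uses the plain Flink DataStream API rather than Beam; the stream name and region are placeholders):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;

public class KinesisParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("aws.region", "eu-west-1"); // placeholder region

        // Cap the source parallelism at the shard count (1 here) so that no
        // subtask ends up without a shard assignment and runs to FINISHED.
        DataStream<String> events = env
                .addSource(new FlinkKinesisConsumer<>(
                        "my-stream", new SimpleStringSchema(), props)) // placeholder stream name
                .setParallelism(1);

        events.print();
        env.execute("kinesis-parallelism-sketch");
    }
}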

Best,
Yun



 ------------------ Original Mail ------------------
Sender: jonas eyob <jonas.e...@gmail.com>
Send Date: Fri Nov 26 22:40:48 2021
Recipients: Yun Gao <yungao...@aliyun.com>
CC: user <user@flink.apache.org>
Subject: Re: Checkpoints aborted - Job is not in state RUNNING but FINISHED

Hi Yun, thanks for the quick reply!

Great to hear that a fix has been put in place as of Flink 1.14. 

Since we are using Beam on top of Flink, we are currently limited to
the Flink 1.13 runner, so I expect the fix is not yet available to us.

But to clarify the underlying problem for me: is this caused by having task
parallelism > 1, but with only one of the tasks RUNNING (the others in the FINISHED state)?
Would there be a problem if, say, we have two tasks consuming events from a
Kinesis source but the stream has only 1 shard?
On Fri, Nov 26, 2021 at 03:14, Yun Gao <yungao...@aliyun.com> wrote:

Hi Jonas,

Previously Flink indeed did not support checkpoints after some tasks finished.
In 1.14 we implemented a first version of this feature (namely
https://issues.apache.org/jira/browse/FLINK-2491),
and it can be enabled by setting
execution.checkpointing.checkpoints-after-tasks-finish.enabled: true
We will also try to enable the flag by default in 1.15.
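
For completeness, a minimal sketch of enabling the same flag programmatically (assuming the Flink 1.14 DataStream API, where I believe the key is exposed as ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointsAfterFinishSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Same switch as the flink-conf.yaml entry
        // execution.checkpointing.checkpoints-after-tasks-finish.enabled: true
        conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.enableCheckpointing(60_000); // checkpoint every 60 s

        env.fromElements(1, 2, 3).print();
        env.execute("checkpoints-after-finish-sketch");
    }
}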

Best,
Yun



------------------------------------------------------------------
Sender: jonas eyob <jonas.e...@gmail.com>
Date: 2021/11/26 01:53:17
Recipient: user <user@flink.apache.org>
Subject: Checkpoints aborted - Job is not in state RUNNING but FINISHED

Hi all,

I have been struggling with this issue for a couple of days now. Checkpointing
fails because the source task (a Kinesis stream in this case) appears to be
in a FINISHED state.

Excerpt from Jobmanager logs:

2021-11-25 12:52:00,479 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
Source Events/Read(KinesisSource) -> Flat Map -> Source 
Events/MapElements/Map/ParMultiDo(Anonymous) -> Parse 
Events/ParMultiDo(ParseEvent) -> Key Events/ParMultiDo(KeyEvent) -> 
Window/Window.Assign.out -> ToBinaryKeyedWorkItem (1/2) 
(eb31cbc4e319588ba79a26d26abcd2f3) switched from DEPLOYING to RUNNING.
2021-11-25 12:52:00,494 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
Source Events/Read(KinesisSource) -> Flat Map -> Source 
Events/MapElements/Map/ParMultiDo(Anonymous) -> Parse 
Events/ParMultiDo(ParseEvent) -> Key Events/ParMultiDo(KeyEvent) -> 
Window/Window.Assign.out -> ToBinaryKeyedWorkItem (2/2) 
(1eae72b5680529fbd3b4becadb803910) switched from DEPLOYING to RUNNING.
2021-11-25 12:52:00,569 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - GroupByKey -> 
ToGBKResult -> Fetch User Profile/ParMultiDo(GetStoredState) -> Reduce 
state/ParMultiDo(ReduceState) -> Store state/ParMultiDo(StoreState) (1/2) 
(1a77c7ed026ac4e4a59ab66876053102) switched from DEPLOYING to RUNNING.
2021-11-25 12:52:00,582 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - GroupByKey -> 
ToGBKResult -> Fetch User Profile/ParMultiDo(GetStoredState) -> Reduce 
state/ParMultiDo(ReduceState) -> Store state/ParMultiDo(StoreState) (2/2) 
(31588d4dad22821d7226ec65687d0edb) switched from DEPLOYING to RUNNING.
2021-11-25 12:52:00,881 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
Source Events/Read(KinesisSource) -> Flat Map -> Source 
Events/MapElements/Map/ParMultiDo(Anonymous) -> Parse 
Events/ParMultiDo(ParseEvent) -> Key Events/ParMultiDo(KeyEvent) -> 
Window/Window.Assign.out -> ToBinaryKeyedWorkItem (2/2) 
(1eae72b5680529fbd3b4becadb803910) switched from RUNNING to FINISHED.
2021-11-25 12:52:06,528 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint 
triggering task Source: Source Events/Read(KinesisSource) -> Flat Map -> Source 
Events/MapElements/Map/ParMultiDo(Anonymous) -> Parse 
Events/ParMultiDo(ParseEvent) -> Key Events/ParMultiDo(KeyEvent) -> 
Window/Window.Assign.out -> ToBinaryKeyedWorkItem (2/2) of job 
00000000000000000000000000000000 is not in state RUNNING but FINISHED instead. 
Aborting checkpoint.

For context, here is an excerpt from the flink-conf.yaml file:

flink-conf.yaml: |+
 # TaskManager configurations
 taskmanager.numberOfTaskSlots: 2
 taskmanager.rpc.port: 6122
 taskmanager.memory.process.size: 1728m

 # JobManager configurations
 jobmanager.rpc.address: {{ $fullName }}-jobmanager
 jobmanager.rpc.port: 6123
 jobmanager.memory.process.size: 1600m
 blob.server.port: 6124
 queryable-state.proxy.ports: 6125
 parallelism.default: 1 # default parallelism when not defined elsewhere
 kubernetes.namespace: {{ $fullName }} # The namespace that will be used for 
running the jobmanager and taskmanager pods.
 scheduler-mode: reactive
 # High-availability configurations
 high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
 high-availability.storageDir: s3://company-flink-{{ .Values.environment 
}}/recovery
 hive.s3.use-instance-credentials: true
 kubernetes.cluster-id: {{ $fullName }}
 # Checkpoint and State backend
 state.backend: rocksdb
 state.checkpoint-storage: filesystem # jobmanager or filesystem
 state.backend.incremental: true # only supported by rocksdb
 state.checkpoints.dir: s3://company-flink-{{ .Values.environment }}/checkpoints
 execution.checkpointing.interval: 20 min
 execution.checkpointing.min-pause: 10 min # minimum time between checkpoints 
to reduce overhead
 state.checkpoints.num-retained: 1 # Maximum number of completed checkpoints to 
retain
 # Fault tolerance
 restart-strategy: fixed-delay
 restart-strategy.fixed-delay.delay: 10 s
 restart-strategy.fixed-delay.attempts: 3 # try n times before job is 
considered failed

From what I can see the job is still running, but checkpointing keeps
failing.
After finding this (https://issues.apache.org/jira/browse/FLINK-2491) I updated
the default parallelism from 2 to 1, since our current Kinesis stream consists of
1 shard. But the problem persists.

Any ideas?

Jonas


-- 
Best regards
Jonas Eyob
