Hi Edward,
Are you setting the FsStateBackend via code or via flink-conf.yaml? If via code,
the constructor requires a path parameter, and that path serves as the checkpoint
directory (state.checkpoints.dir); a short sketch follows after the stack trace
below. If via flink-conf.yaml, I tried on 1.12 by setting state.backend: filesystem
in the config file and enabling checkpointing, and it indeed threw this exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Cannot create the file system state backend: The configuration does not specify the checkpoint directory 'state.checkpoints.dir'
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Cannot create the file system state backend: The configuration does not specify the checkpoint directory 'state.checkpoints.dir'
    at org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:41)
    at org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:122)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.loadStateBackend(StreamExecutionEnvironment.java:863)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.configure(StreamExecutionEnvironment.java:819)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.<init>(StreamExecutionEnvironment.java:237)
    at org.apache.flink.client.program.StreamContextEnvironment.<init>(StreamContextEnvironment.java:67)
    at org.apache.flink.client.program.StreamContextEnvironment.lambda$setAsContext$4(StreamContextEnvironment.java:156)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.lambda$getExecutionEnvironment$12(StreamExecutionEnvironment.java:2089)
    at java.util.Optional.map(Optional.java:215)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2089)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2070)
    at CheckpointTest.main(CheckpointTest.java:26)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
    ... 11 more
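For the code path, a minimal sketch would look roughly like the following (the EFS
path is only a placeholder; adjust it to your mount point). The constructor's path
argument plays the role of state.checkpoints.dir, so the config key is not needed
separately in this case:

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class FsBackendExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // The checkpoint URI passed here is where checkpoint data is written.
            env.setStateBackend(new FsStateBackend("file:///mnt/efs/flink/checkpoints"));
            env.enableCheckpointing(60_000); // trigger a checkpoint every 60 s
            // ... build the job here ...
            env.execute("fs-backend-example");
        }
    }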
For the timeout, if there is no backpressure, I think it might be helpful to look
at the time decomposition for the checkpoints on the checkpoint history page of the
Web UI to see which phase takes too long.
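If it is easier to pull the numbers programmatically instead of using the Web UI, a
rough sketch against the monitoring REST API could look like this (host, port and
job id are placeholders; the JSON response contains the checkpoint history with the
per-checkpoint durations):

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class CheckpointStatsFetcher {
        public static void main(String[] args) throws Exception {
            String jobId = "<job-id>"; // placeholder
            URI uri = URI.create("http://localhost:8081/jobs/" + jobId + "/checkpoints");

            HttpClient client = HttpClient.newHttpClient();
            HttpRequest request = HttpRequest.newBuilder(uri).GET().build();

            // Print the raw JSON; it includes summary statistics and a history of
            // recent checkpoints with their durations.
            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.body());
        }
    }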
Best,
Yun
------------------Original Mail ------------------
Sender:Colletta, Edward <[email protected]>
Send Date:Tue Dec 22 00:04:03 2020
Recipients:Yun Gao <[email protected]>, [email protected]
<[email protected]>
Subject:RE: checkpointing seems to be throttled.
Thanks for the quick response.
We are using FsStateBackend, and I did see checkpoint files and directories in
the EFS mounted directory.
We do monitor backpressure through the REST API periodically and we do not see any.
From: Yun Gao <[email protected]>
Sent: Monday, December 21, 2020 10:40 AM
To: Colletta, Edward <[email protected]>; [email protected]
Subject: Re: checkpointing seems to be throttled.
Hi Edward,
For the second issue, have you also set the state backend type? I'm asking
because, except for the default heap state backend, the other state backends should
throw an exception if state.checkpoints.dir is not set. Since the heap state backend
stores all the snapshots in the JM's memory, the state cannot be recovered after a
JM failover, which makes it unsuitable for production usage. Therefore, if used in a
production environment, it might be better to switch to a state backend like RocksDB.
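As a rough sketch, switching to RocksDB could look like the following (it needs the
flink-statebackend-rocksdb dependency on the classpath, and the checkpoint URI is a
placeholder):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RocksDbBackendExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // The second argument enables incremental checkpoints, which usually
            // keeps per-checkpoint sizes small when the state is large.
            env.setStateBackend(new RocksDBStateBackend("file:///mnt/efs/flink/checkpoints", true));
            env.enableCheckpointing(60_000);
            // ... build the job here ...
            env.execute("rocksdb-backend-example");
        }
    }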
For the checkpoint timeout, AFAIK there should be no large changes after 1.9.2.
There may be different causes for checkpoint timeouts; one possible cause is
back-pressure from an operator that cannot process its records in time, which
would block the checkpoints. I think you might check the back-pressure [1] first,
and if there is indeed back-pressure, you might try unaligned checkpoints or
resolve the back-pressure by increasing the parallelism of the slow operators.
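For example, a minimal sketch of enabling unaligned checkpoints in the job code
(equivalent to setting execution.checkpointing.unaligned: true in the configuration):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class UnalignedCheckpointExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000);
            // Unaligned checkpoints let barriers overtake in-flight records, so a
            // back-pressured channel does not delay the barrier alignment.
            env.getCheckpointConfig().enableUnalignedCheckpoints();
            // ... build the job here ...
            env.execute("unaligned-checkpoint-example");
        }
    }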
Best,
Yun
[1]https://ci.apache.org/projects/flink/flink-docs-master/ops/monitoring/back_pressure.html
------------------Original Mail ------------------
Sender:Colletta, Edward <[email protected]>
Send Date:Mon Dec 21 17:50:15 2020
Recipients:[email protected] <[email protected]>
Subject:checkpointing seems to be throttled.
Using session cluster with three taskmanagers, cluster.evenly-spread-out-slots
is set to true. 13 jobs running. Average parallelism of each job is 4.
Flink version 1.11.2, Java 11.
Running on AWS EC2 instances with EFS for high-availability.storageDir.
We are seeing very high checkpoint times and experiencing timeouts. The
checkpoint timeout is the default 10 minutes. This does not seem to be
related to EFS limits/throttling. We started experiencing these timeouts
after upgrading from Flink 1.9.2/Java 8. Are there any known issues which
cause very high checkpoint times?
Also, I noticed we did not set state.checkpoints.dir; I assume it is using
high-availability.storageDir. Is that correct?
For now we plan on setting
    execution.checkpointing.timeout: 60 min
    execution.checkpointing.tolerable-failed-checkpoints: 12
    execution.checkpointing.unaligned: true
and also explicitly setting state.checkpoints.dir.
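For reference, the rough per-job equivalent of those settings in code (method names
taken from the CheckpointConfig API as I understand it) would be:

    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class PlannedCheckpointSettings {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000);

            CheckpointConfig cfg = env.getCheckpointConfig();
            cfg.setCheckpointTimeout(60 * 60 * 1000L);   // execution.checkpointing.timeout: 60 min
            cfg.setTolerableCheckpointFailureNumber(12); // execution.checkpointing.tolerable-failed-checkpoints: 12
            cfg.enableUnalignedCheckpoints();            // execution.checkpointing.unaligned: true

            // ... build the job here ...
            env.execute("planned-checkpoint-settings");
        }
    }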