I was a bit confused when you said that the "source is done"; that is when I realized you must be using the batch API, for which checkpointing is neither available nor needed. Let me quote from [1], which imho has not changed:

DataSet: Fault tolerance for the DataSet API works by restarting the job
and redoing all of the work. [...] The periodic in-flight checkpoints are
not used here.

DataStream: This one would start immediately inserting data (as it is a
streaming job), and draw periodic checkpoints that make sure
replay-on-failure only has to redo only a bit, not everything.


Nico

[1] https://lists.apache.org/thread.html/3121ad01f5adf4246aa035dfb886af534b063963dee0f86d63b675a1@1447086324@%3Cuser.flink.apache.org%3E

On 26/02/18 22:55, Ken Krugler wrote:
> Hi Nico,
>
>> On Feb 26, 2018, at 9:41 AM, Nico Kruber <n...@data-artisans.com
>> <mailto:n...@data-artisans.com>> wrote:
>>
>> Hi Ken,
>> LocalFlinkMiniCluster should run checkpoints just fine. It looks like it
>> was attempting to even create one but could not finish. Maybe your
>> program was not fully running yet?
>
> In the logs I see:
>
> 18/02/23 12:40:50 INFO taskmanager.Task:957 - Source: Seed urls source
> (1/2) (56fdede2f4783455b4ab8f290e700baa) switched from DEPLOYING to RUNNING.
> 18/02/23 12:40:50 DEBUG tasks.StreamTask:214 - Initializing Source: Seed
> urls source (1/2).
> 18/02/23 12:40:50 INFO checkpoint.CheckpointCoordinator:525 - Checkpoint
> triggering task Source: Seed urls source (1/2) is not being executed at
> the moment. Aborting checkpoint.
>
> Maybe the checkpoint here is happening too soon after the “Initializing
> Source” message.
>
> After that the source is done (it only triggers the iteration with a
> single starting tuple), so I wouldn’t expect checkpointing to actually
> do anything. I was just using these messages as indications that I had
> configured my workflow properly to actually do checkpointing.
>
>> Can you tell us a little bit more about your set up and how you
>> configured the LocalFlinkMiniCluster?
>
> Potential issue #1 - I’ve got a workflow with multiple iterations.
>
> For that reason I had to force checkpointing via:
>
>     env.setStateBackend(new MemoryStateBackend());
>     env.enableCheckpointing(100L, CheckpointingMode.AT_LEAST_ONCE, true);
>
>
> Potential issue #2 - because of the fun with tracking iteration
> progress, I subclass LocalStreamEnvironment to add this async execution
> method:
>
>     public JobSubmissionResult executeAsync(String jobName) throws Exception {
>         // transform the streaming program into a JobGraph
>         StreamGraph streamGraph = getStreamGraph();
>         streamGraph.setJobName(jobName);
>
>         JobGraph jobGraph = streamGraph.getJobGraph();
>
>         Configuration configuration = new Configuration();
>         configuration.addAll(jobGraph.getJobConfiguration());
>
>         configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
>         configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
>             jobGraph.getMaximumParallelism());
>
>         // add (and override) the settings with what the user defined
>         configuration.addAll(_conf);
>
>         _exec = new LocalFlinkMiniCluster(configuration, true);
>         _exec.start(true);
>
>
>         // The above code is all basically the same as Flink's LocalStreamEnvironment.
>         // The change is that here we call submitJobDetached vs. submitJobAndWait.
>         // We assume that eventually someone calls stop(job id), which then terminates
>         // the LocalFlinkMiniCluster.
>         return _exec.submitJobDetached(jobGraph);
>     }
>
> However I don’t think that would impact checkpointing.
>
> Anything else I should do to debug whether checkpointing is operating as
> expected? In the logs, at DEBUG level, I don’t see any errors or
> warnings related to this.
>
> Thanks,
>
> -- Ken
>
>>
>>
>> Nico
>>
>> On 23/02/18 21:42, Ken Krugler wrote:
>>> Hi all,
>>>
>>> For testing checkpointing, is it possible to use LocalFlinkMiniCluster?
>>>
>>> Asking because I’m not seeing checkpoint calls being made to my
>>> custom function (implements ListCheckpointed) when I’m running with
>>> LocalFlinkMiniCluster.
>>>
>>> Though I do see entries like this logged:
>>>
>>> 18/02/23 12:40:50 INFO jobmanager.JobManager:246 - Using
>>> application-defined state backend for checkpoint/savepoint metadata:
>>> MemoryStateBackend (data in heap memory / checkpoints to JobManager).
>>> 18/02/23 12:40:50 INFO checkpoint.CheckpointCoordinator:525 -
>>> Checkpoint triggering task Source: Seed urls source (1/2) is not
>>> being executed at the moment. Aborting checkpoint.
>>>
>>> But when I browse the Flink source, tests for checkpointing seem to
>>> be using TestCluster, e.g. in ResumeCheckpointManuallyITCase
>>>
>>> Thanks,
>>>
>>> -- Ken
>>>
>>> --------------------------------------------
>>> http://about.me/kkrugler
>>> +1 530-210-6378
>>>
>>
>
> --------------------------------------------
> http://about.me/kkrugler
> +1 530-210-6378
>
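
P.S. To make the DataSet vs. DataStream distinction quoted from [1] a bit more concrete, here is a rough plain-Java sketch. It does not use any Flink API: the class ReplayCost, its two methods and the numbers are made up purely for illustration, and it assumes a simplified model in which a checkpoint completes after every `interval` records and the job fails exactly once.

```java
// Toy model only (hypothetical names, no Flink classes involved).
final class ReplayCost {

    /** Batch-style recovery: restart the job and redo all of the work. */
    static long restartFromScratch(long total, long failAt) {
        // The records processed before the failure are thrown away,
        // then the whole input is processed again.
        return failAt + total;
    }

    /** Streaming-style recovery: resume from the last completed checkpoint. */
    static long resumeFromCheckpoint(long total, long failAt, long interval) {
        // Assumption: a checkpoint completes after every `interval` records.
        long lastCheckpoint = (failAt / interval) * interval;
        // Only the records after the last checkpoint have to be replayed.
        return failAt + (total - lastCheckpoint);
    }

    public static void main(String[] args) {
        long total = 1_000, failAt = 950, interval = 100;
        System.out.println("restart from scratch: "
                + restartFromScratch(total, failAt) + " records processed");
        System.out.println("resume from checkpoint: "
                + resumeFromCheckpoint(total, failAt, interval) + " records processed");
    }
}
```

With these example numbers the restart variant processes 1950 records in total, while the checkpointed variant processes 1050, which is the "only a bit, not everything" part of the quote.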