I was a bit confused about when you said that the "source is done" which
is when I realized you must be using the batch API for which
checkpointing is not available / needed. Let me quote from [1] which
imho has not changed:

DataSet:

Fault tolerance for the DataSet API works by restarting the job and
redoing all of the work. [...] The periodic in-flight checkpoints are
not used here.

DataStream:

This one would start immediately inserting data (as it is a streaming
job), and draw periodic checkpoints that make sure replay-on-failure
only has to redo only a bit, not everything.


Nico

[1]
https://lists.apache.org/thread.html/3121ad01f5adf4246aa035dfb886af534b063963dee0f86d63b675a1@1447086324@%3Cuser.flink.apache.org%3E

On 26/02/18 22:55, Ken Krugler wrote:
> Hi Nico,
> 
>> On Feb 26, 2018, at 9:41 AM, Nico Kruber <n...@data-artisans.com
>> <mailto:n...@data-artisans.com>> wrote:
>>
>> Hi Ken,
>> LocalFlinkMiniCluster should run checkpoints just fine. It looks like it
>> was attempting to even create one but could not finish. Maybe your
>> program was not fully running yet?
> 
> In the logs I see:
> 
> 18/02/23 12:40:50 INFO taskmanager.Task:957 - Source: Seed urls source
> (1/2) (56fdede2f4783455b4ab8f290e700baa) switched from DEPLOYING to RUNNING.
> 18/02/23 12:40:50 DEBUG tasks.StreamTask:214 - Initializing Source: Seed
> urls source (1/2).
> 18/02/23 12:40:50 INFO checkpoint.CheckpointCoordinator:525 - Checkpoint
> triggering task Source: Seed urls source (1/2) is not being executed at
> the moment. Aborting checkpoint.
> 
> Maybe the checkpoint here is happening too soon after the “Initializing
> Source” message.
> 
> After that the source is done (it only triggers the iteration with a
> single starting tuple), so I wouldn’t expect checkpointing to actually
> do anything. I was just using these messages as indications that I had
> configured my workflow properly to actually do checkpointing.
> 
>> Can you tell us a little bit more about your set up and how you
>> configured the LocalFlinkMiniCluster?
> 
> Potential issue #1 - I’ve got a workflow with multiple iterations.
> 
> For that reason I had to force checkpointing via:
> 
>         env.setStateBackend(new MemoryStateBackend());
> env.enableCheckpointing(100L, CheckpointingMode.AT_LEAST_ONCE, true);
> 
> 
> Potential issue #2 - because of the fun with tracking iteration
> progress, I subclass LocalStreamEnvironment to add this async execution
> method:
> 
> public JobSubmissionResult executeAsync(String jobName) throws Exception {
> // transform the streaming program into a JobGraph
> StreamGraph streamGraph = getStreamGraph();
> streamGraph.setJobName(jobName);
> 
> JobGraph jobGraph = streamGraph.getJobGraph();
> 
> Configuration configuration = new Configuration();
> configuration.addAll(jobGraph.getJobConfiguration());
> 
> configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
> configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
> jobGraph.getMaximumParallelism());
> 
> // add (and override) the settings with what the user defined
> configuration.addAll(_conf);
> 
> _exec = new LocalFlinkMiniCluster(configuration, true);
> _exec.start(true);
> 
> 
> // The above code is all basically the same as Flink's
> LocalStreamEnvironment.
> // The change is that here we call submitJobDetached vs. submitJobAndWait.
> // We assume that eventually someone calls stop(job id), which then
> terminates
> // the LocalFlinkMinimCluster.
> return _exec.submitJobDetached(jobGraph);
> }
> 
> However I don’t think that would impact checkpointing.
> 
> Anything else I should do to debug whether checkpointing is operating as
> expected? In the logs, at DEBUG level, I don’t see any errors or
> warnings related to this.
> 
> Thanks,
> 
> — Ken
> 
>>
>>
>> Nico
>>
>> On 23/02/18 21:42, Ken Krugler wrote:
>>> Hi all,
>>>
>>> For testing checkpointing, is it possible to use LocalFlinkMiniCluster?
>>>
>>> Asking because I’m not seeing checkpoint calls being made to my
>>> custom function (implements ListCheckpointed) when I’m running with
>>> LocalFlinkMiniCluster.
>>>
>>> Though I do see entries like this logged:
>>>
>>> 18/02/23 12:40:50 INFO jobmanager.JobManager:246 - Using
>>> application-defined state backend for checkpoint/savepoint metadata:
>>> MemoryStateBackend (data in heap memory / checkpoints to JobManager).
>>> 18/02/23 12:40:50 INFO checkpoint.CheckpointCoordinator:525 -
>>> Checkpoint triggering task Source: Seed urls source (1/2) is not
>>> being executed at the moment. Aborting checkpoint.
>>>
>>> But when I browse the Flink source, tests for checkpointing seem to
>>> be using TestCluster, e.g. in ResumeCheckpointManuallyITCase
>>>
>>> Thanks,
>>>
>>> — Ken
>>>
>>> --------------------------------------------
>>> http://about.me/kkrugler
>>> +1 530-210-6378
>>>
>>
> 
> --------------------------------------------
> http://about.me/kkrugler
> +1 530-210-6378
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to