[ https://issues.apache.org/jira/browse/FLINK-3190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361776#comment-15361776 ]

ASF GitHub Bot commented on FLINK-3190:
---------------------------------------

Github user fijolekProjects commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1954#discussion_r69491638
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ---
    @@ -174,71 +140,54 @@ private void validateConstraints(ExecutionGraph eg) {
     
        @Test
        public void testRestartAutomatically() throws Exception {
    -           Instance instance = ExecutionGraphTestUtils.getInstance(
    -                           new SimpleActorGateway(TestingUtils.directExecutionContext()),
    -                           NUM_TASKS);
    +           RestartStrategy restartStrategy = new FixedDelayRestartStrategy(1, 1000);
    +           Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createExecutionGraph(restartStrategy);
    +           ExecutionGraph eg = executionGraphInstanceTuple.f0;
     
    -           Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
    -           scheduler.newInstanceAvailable(instance);
    -
    -           JobVertex sender = new JobVertex("Task");
    -           sender.setInvokableClass(Tasks.NoOpInvokable.class);
    -           sender.setParallelism(NUM_TASKS);
    -
    -           JobGraph jobGraph = new JobGraph("Pointwise job", sender);
    -
    -           ExecutionGraph eg = new ExecutionGraph(
    -                           TestingUtils.defaultExecutionContext(),
    -                           new JobID(),
    -                           "Test job",
    -                           new Configuration(),
    -                           ExecutionConfigTest.getSerializedConfig(),
    -                           AkkaUtils.getDefaultTimeout(),
    -                           new FixedDelayRestartStrategy(1, 1000));
    -           eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
    +           restartAfterFailure(eg, new FiniteDuration(2, TimeUnit.MINUTES), true);
    +   }
     
    -           assertEquals(JobStatus.CREATED, eg.getState());
    +   @Test
    +   public void taskShouldFailWhenFailureRateLimitExceeded() throws Exception {
    +           FailureRateRestartStrategy restartStrategy = new FailureRateRestartStrategy(2, TimeUnit.SECONDS, 0);
    +           FiniteDuration timeout = new FiniteDuration(50, TimeUnit.MILLISECONDS);
    +           Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createExecutionGraph(restartStrategy);
    +           ExecutionGraph eg = executionGraphInstanceTuple.f0;
    +
    +           restartAfterFailure(eg, timeout, false);
    +           restartAfterFailure(eg, timeout, false);
    +           //failure rate limit not exceeded yet, so task is running
    +           assertEquals(JobStatus.RUNNING, eg.getState());
    +           Thread.sleep(1000); //wait for a second to reset the rate limit
     
    -           eg.scheduleForExecution(scheduler);
    +           restartAfterFailure(eg, timeout, false);
    +           restartAfterFailure(eg, timeout, false);
    +           makeAFailureAndWait(eg, timeout);
    --- End diff --
    
    I deleted the first half of the test (it is already covered by the test below) and increased the interval to 2 seconds.


> Retry rate limits for DataStream API
> ------------------------------------
>
>                 Key: FLINK-3190
>                 URL: https://issues.apache.org/jira/browse/FLINK-3190
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Sebastian Klemke
>            Assignee: Michał Fijołek
>            Priority: Minor
>
> For a long running stream processing job, absolute numbers of retries don't 
> make much sense: The job will accumulate transient errors over time and will 
> die eventually when thresholds are exceeded. Rate limits are better suited in 
> this scenario: A job should only die, if it fails too often in a given time 
> frame. To better overcome transient errors, retry delays could be used, as 
> suggested in other issues.
> Absolute numbers of retries can still make sense if failing operators don't 
> make any progress at all. We can measure progress by OperatorState changes 
> and by observing output, as long as the operator in question is not a sink. 
> If the operator's state changes and/or the operator produces output, we can 
> assume it makes progress.
> As an example, say we configure a retry rate limit of 10 retries per hour 
> for a non-sink operator A. If the operator fails once every 10 minutes and 
> produces output between failures, that should not lead to job termination. 
> But if the operator fails 11 times in an hour, or does not produce output 
> between 11 consecutive failures, the job should be terminated.
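The rate-limit semantics described above can be sketched as a sliding-window failure counter: a restart is allowed only while the number of failures inside the trailing time window stays at or below the configured maximum. This is a minimal, hypothetical illustration in plain Java; the class name `FailureRateLimiter` and its methods are invented for the sketch and are not Flink API (the PR itself implements this idea as `FailureRateRestartStrategy`):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: allow a restart only if no more than
// maxFailuresPerInterval failures occurred in the trailing window.
public class FailureRateLimiter {
    private final int maxFailuresPerInterval;
    private final long intervalMillis;
    private final Deque<Long> failureTimestamps = new ArrayDeque<>();

    public FailureRateLimiter(int maxFailuresPerInterval, long intervalMillis) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMillis = intervalMillis;
    }

    /** Record a failure at the given time; return true if a restart is still allowed. */
    public boolean canRestart(long nowMillis) {
        failureTimestamps.addLast(nowMillis);
        // Drop failures that have fallen out of the trailing window.
        while (!failureTimestamps.isEmpty()
                && nowMillis - failureTimestamps.peekFirst() > intervalMillis) {
            failureTimestamps.removeFirst();
        }
        return failureTimestamps.size() <= maxFailuresPerInterval;
    }

    public static void main(String[] args) {
        // At most 2 failures per 1000 ms window.
        FailureRateLimiter limiter = new FailureRateLimiter(2, 1000);
        System.out.println(limiter.canRestart(0));    // 1st failure in window -> true
        System.out.println(limiter.canRestart(100));  // 2nd failure in window -> true
        System.out.println(limiter.canRestart(200));  // 3rd failure in window -> false
        System.out.println(limiter.canRestart(1500)); // older failures expired -> true
    }
}
```

This mirrors the test in the diff: two failures within the window keep the job RUNNING, a pause lets the window drain, and exceeding the rate terminates the job.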



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
