[ 
https://issues.apache.org/jira/browse/FLINK-2008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620608#comment-14620608
 ] 

ASF GitHub Bot commented on FLINK-2008:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/895#discussion_r34261966
  
    --- Diff: docs/apis/streaming_guide.md ---
    @@ -1300,6 +1300,9 @@ Another way of exposing user defined operator state 
for the Flink runtime for ch
     
     When the user defined function implements the `Checkpointed` interface, 
the `snapshotState(…)` and `restoreState(…)` methods will be executed to draw 
and restore function state.
     
    +In addition to that, user functions can also implement the 
`CheckpointNotifier` interface to receive notifications on completed 
checkpoints via the `notifyCheckpointComplete(long checkpointId)` method.
    +Note that there is no guarantee for the user function to receive a 
notification once the checkpoint is complete.
    --- End diff --
    
    Let's write it like "Note that there is no guarantee for the user function 
to receive a notification if a failure happens between checkpoint completion 
and notification. The notifications should hence be treated in a way that 
notifications from later checkpoints can subsume missing notifications."


> PersistentKafkaSource is sometimes emitting tuples multiple times
> -----------------------------------------------------------------
>
>                 Key: FLINK-2008
>                 URL: https://issues.apache.org/jira/browse/FLINK-2008
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector, Streaming
>    Affects Versions: 0.9
>            Reporter: Robert Metzger
>            Assignee: Robert Metzger
>
> The PersistentKafkaSource is expected to emit records exactly once.
> Two test cases of the KafkaITCase are sporadically failing because records 
> are emitted multiple times.
> Affected tests:
> {{testPersistentSourceWithOffsetUpdates()}}, after the offsets have been 
> changed manually in ZK:
> {code}
> java.lang.RuntimeException: Expected v to be 3, but was 4 on element 0 
> array=[4, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 
> 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 
> 2]
> {code}
> {{brokerFailureTest()}} also fails:
> {code}
> 05/13/2015 08:13:16   Custom source -> Stream Sink(1/1) switched to FAILED 
> java.lang.AssertionError: Received tuple with value 21 twice
>       at org.junit.Assert.fail(Assert.java:88)
>       at org.junit.Assert.assertTrue(Assert.java:41)
>       at org.junit.Assert.assertFalse(Assert.java:64)
>       at 
> org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:877)
>       at 
> org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:859)
>       at 
> org.apache.flink.streaming.api.operators.StreamSink.callUserFunction(StreamSink.java:39)
>       at 
> org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
>       at 
> org.apache.flink.streaming.api.operators.ChainableStreamOperator.collect(ChainableStreamOperator.java:54)
>       at 
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:39)
>       at 
> org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource.run(PersistentKafkaSource.java:173)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.callUserFunction(StreamSource.java:40)
>       at 
> org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:34)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:139)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>       at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to