[ https://issues.apache.org/jira/browse/FLINK-2008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14545207#comment-14545207 ]
Robert Metzger commented on FLINK-2008:
---------------------------------------

Another kafka test failure: https://travis-ci.org/apache/flink/jobs/62666187

> PersistentKafkaSource is sometimes emitting tuples multiple times
> -----------------------------------------------------------------
>
>                 Key: FLINK-2008
>                 URL: https://issues.apache.org/jira/browse/FLINK-2008
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector, Streaming
>    Affects Versions: 0.9
>            Reporter: Robert Metzger
>            Assignee: Robert Metzger
>
> The PersistentKafkaSource is expected to emit records exactly once.
> Two test cases of the KafkaITCase are sporadically failing because records
> are emitted multiple times.
> Affected tests:
> {{testPersistentSourceWithOffsetUpdates()}}, after the offsets have been
> changed manually in ZK:
> {code}
> java.lang.RuntimeException: Expected v to be 3, but was 4 on element 0
> array=[4, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
> 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
> 2]
> {code}
> {{brokerFailureTest()}} also fails:
> {code}
> 05/13/2015 08:13:16 Custom source -> Stream Sink(1/1) switched to FAILED
> java.lang.AssertionError: Received tuple with value 21 twice
>       at org.junit.Assert.fail(Assert.java:88)
>       at org.junit.Assert.assertTrue(Assert.java:41)
>       at org.junit.Assert.assertFalse(Assert.java:64)
>       at org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:877)
>       at org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:859)
>       at org.apache.flink.streaming.api.operators.StreamSink.callUserFunction(StreamSink.java:39)
>       at org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
>       at org.apache.flink.streaming.api.operators.ChainableStreamOperator.collect(ChainableStreamOperator.java:54)
>       at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:39)
>       at org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource.run(PersistentKafkaSource.java:173)
>       at org.apache.flink.streaming.api.operators.StreamSource.callUserFunction(StreamSource.java:40)
>       at org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
>       at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:34)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:139)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>       at java.lang.Thread.run(Thread.java:745)
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)