Robert Metzger created FLINK-2008: ------------------------------------- Summary: PersistentKafkaSource is sometimes emitting tuples multiple times Key: FLINK-2008 URL: https://issues.apache.org/jira/browse/FLINK-2008 Project: Flink Issue Type: Bug Components: Kafka Connector, Streaming Affects Versions: 0.9 Reporter: Robert Metzger Assignee: Robert Metzger
The PersistentKafkaSource is expected to emit records exactly once. Two test cases of the KafkaITCase are sporadically failing because records are emitted multiple times. Affected tests: {{testPersistentSourceWithOffsetUpdates()}}, after the offsets have been changed manually in ZK: {code} java.lang.RuntimeException: Expected v to be 3, but was 4 on element 0 array=[4, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 2] {code} {{brokerFailureTest()}} also fails: {code} 05/13/2015 08:13:16 Custom source -> Stream Sink(1/1) switched to FAILED java.lang.AssertionError: Received tuple with value 21 twice at org.junit.Assert.fail(Assert.java:88) at org.junit.Assert.assertTrue(Assert.java:41) at org.junit.Assert.assertFalse(Assert.java:64) at org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:877) at org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:859) at org.apache.flink.streaming.api.operators.StreamSink.callUserFunction(StreamSink.java:39) at org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137) at org.apache.flink.streaming.api.operators.ChainableStreamOperator.collect(ChainableStreamOperator.java:54) at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:39) at org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource.run(PersistentKafkaSource.java:173) at org.apache.flink.streaming.api.operators.StreamSource.callUserFunction(StreamSource.java:40) at org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:34) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:139) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) at java.lang.Thread.run(Thread.java:745) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)