[ https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357110#comment-15357110 ]
ASF GitHub Bot commented on FLINK-4027:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2108#discussion_r69136271

--- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java ---
@@ -35,69 +35,77 @@
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.junit.Assert;
 import org.junit.Test;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
 /**
  * Test ensuring that the producer is not dropping buffered records
  */
 @SuppressWarnings("unchecked")
 public class AtLeastOnceProducerTest {
-	@Test
+	// we set a timeout because the test will not finish if the logic is broken
+	@Test(timeout=5000)
 	public void testAtLeastOnceProducer() throws Exception {
 		runTest(true);
 	}
 	// This test ensures that the actual test fails if the flushing is disabled
-	@Test(expected = AssertionError.class)
+	@Test(expected = AssertionError.class, timeout=5000)
 	public void ensureTestFails() throws Exception {
 		runTest(false);
 	}
 	private void runTest(boolean flushOnCheckpoint) throws Exception {
 		Properties props = new Properties();
-		final TestingKafkaProducer<String> producer = new TestingKafkaProducer<>("someTopic", new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props);
+		final OneShotLatch snapshottingFinished = new OneShotLatch();
+		final TestingKafkaProducer<String> producer = new TestingKafkaProducer<>("someTopic", new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props,
+			snapshottingFinished);
 		producer.setFlushOnCheckpoint(flushOnCheckpoint);
 		producer.setRuntimeContext(new MockRuntimeContext(0, 1));
 		producer.open(new Configuration());
-		for(int i = 0; i < 100; i++) {
+		for (int i = 0; i < 100; i++) {
 			producer.invoke("msg-" + i);
 		}
 		// start a thread confirming all pending records
 		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
-		final AtomicBoolean markOne = new AtomicBoolean(false);
+		final Thread threadA = Thread.currentThread();
+
 		Runnable confirmer = new Runnable() {
 			@Override
 			public void run() {
 				try {
 					MockProducer mp = producer.getProducerInstance();
 					List<Callback> pending = mp.getPending();
-					// we ensure thread A is locked and didn't reach markOne
-					// give thread A some time to really reach the snapshot state
-					Thread.sleep(500);
-					if(markOne.get()) {
-						Assert.fail("Snapshot was confirmed even though messages " +
-								"were still in the buffer");
+					// we need to find out if the snapshot() method blocks forever
+					// this is not possible. If snapshot() is running, it will
+					// start removing elements from the pending list.
+					synchronized (threadA) {
+						threadA.wait(500L);
 					}
+					// we now check that no records have been confirmed yet
 					Assert.assertEquals(100, pending.size());
+					Assert.assertFalse("Snapshot method returned before all records were confirmed",
+						snapshottingFinished.hasTriggered());
 					// now confirm all checkpoints
-					for(Callback c: pending) {
+					for (Callback c: pending) {
 						c.onCompletion(null, null);
 					}
 					pending.clear();
-					// wait for the snapshotState() method to return
-					Thread.sleep(100);
-					Assert.assertTrue("Snapshot state didn't return", markOne.get());
+					// wait for the snapshotState() method to return. The will
+					// fail if snapshotState never returns.
+					snapshottingFinished.await();
--- End diff --

I think you don't need this condition here. There are two possibilities:

1. `threadA` leaves `snapshotState` successfully and waits for `threadB`. Since it completed `snapshotState`, `snapshottingFinished` has already been triggered, so `await()` returns immediately without waiting.
2. `threadA` blocks in `snapshotState`.
   Then `threadB` does not have to block to trigger the timeout, because `threadA` is already blocked and the test's `timeout=5000` will fail it anyway.

Consequently, I think you could replace the `OneShotLatch` with a volatile boolean.

> FlinkKafkaProducer09 sink can lose messages
> -------------------------------------------
>
>                 Key: FLINK-4027
>                 URL: https://issues.apache.org/jira/browse/FLINK-4027
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.0.3
>            Reporter: Elias Levy
>            Assignee: Robert Metzger
>            Priority: Critical
>
> The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees.
> The producer is publishing messages asynchronously. A callback can record
> publishing errors, which will be raised when detected. But as far as I can
> tell, there is no barrier to wait for async errors from the sink when
> checkpointing or to track the event time of acked messages to inform the
> checkpointing process.
> If a checkpoint occurs while there are pending publish requests, and the
> requests return a failure after the checkpoint occurred, those messages will
> be lost as the checkpoint will consider them processed by the sink.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
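To make both the bug and the reviewer's suggestion concrete, here is a minimal, self-contained Java sketch. It deliberately uses neither Flink's nor Kafka's real classes: `PendingTrackingSink`, `SendCallback`, `drainUnacked()` and `snapshotWaitsForAcks()` are hypothetical names. The sink counts in-flight asynchronous sends and, when flushing is enabled, blocks `snapshotState()` until every callback has fired, then rethrows the first recorded async error. This mirrors the idea behind `setFlushOnCheckpoint` in the diff, not its actual implementation. The check follows the review comment: thread A sets a `volatile boolean` after `snapshotState()` returns, and thread B reads it once before acknowledging the records, with no latch to wait on afterwards.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (not Flink's or Kafka's API): a sink that tracks in-flight sends
 * and, with flushOnCheckpoint enabled, blocks snapshotState() until all are acknowledged.
 */
public class FlushOnCheckpointSketch {

    /** Hypothetical stand-in for the producer client's completion callback. */
    interface SendCallback {
        void onCompletion(Exception error);
    }

    static class PendingTrackingSink {
        private final Object lock = new Object();
        private final boolean flushOnCheckpoint;
        private final List<SendCallback> unacked = new ArrayList<>(); // simulated client buffer
        private long pendingRecords = 0;  // guarded by lock
        private Exception asyncError;     // guarded by lock

        PendingTrackingSink(boolean flushOnCheckpoint) {
            this.flushOnCheckpoint = flushOnCheckpoint;
        }

        /** "Sends" a record asynchronously; a real sink would hand the record to its client here. */
        void invoke(String record) {
            synchronized (lock) {
                pendingRecords++;
                unacked.add(error -> {
                    synchronized (lock) {
                        if (error != null && asyncError == null) {
                            asyncError = error; // remember the first failure
                        }
                        pendingRecords--;
                        lock.notifyAll(); // wake a snapshotState() waiting for zero pending
                    }
                });
            }
        }

        /** Hands out the buffered callbacks so another thread can "acknowledge" them. */
        List<SendCallback> drainUnacked() {
            synchronized (lock) {
                List<SendCallback> copy = new ArrayList<>(unacked);
                unacked.clear();
                return copy;
            }
        }

        /** With flushing enabled, returns only after every in-flight record was acknowledged. */
        void snapshotState() throws Exception {
            synchronized (lock) {
                while (flushOnCheckpoint && pendingRecords > 0) {
                    lock.wait(); // releases the lock while waiting for callbacks
                }
                if (asyncError != null) {
                    throw new Exception("Failed to send data", asyncError);
                }
            }
        }
    }

    // Set by thread A once snapshotState() has returned: a plain volatile flag, no latch.
    private static volatile boolean snapshotReturned;

    /** Returns true if snapshotState() stayed blocked until all 100 records were acknowledged. */
    static boolean snapshotWaitsForAcks(boolean flushOnCheckpoint) throws Exception {
        PendingTrackingSink sink = new PendingTrackingSink(flushOnCheckpoint);
        for (int i = 0; i < 100; i++) {
            sink.invoke("msg-" + i);
        }
        snapshotReturned = false;
        final boolean[] returnedEarly = {false};
        Thread confirmer = new Thread(() -> { // thread B
            try {
                Thread.sleep(200); // give thread A time to enter snapshotState()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            returnedEarly[0] = snapshotReturned; // must still be false if flushing works
            for (SendCallback c : sink.drainUnacked()) {
                c.onCompletion(null); // acknowledge every buffered record
            }
        });
        confirmer.start();
        sink.snapshotState(); // thread A: blocks here while records are pending
        snapshotReturned = true;
        confirmer.join(); // join() makes returnedEarly[0] visible to this thread
        return !returnedEarly[0];
    }

    public static void main(String[] args) throws Exception {
        System.out.println(snapshotWaitsForAcks(true));
        System.out.println(snapshotWaitsForAcks(false));
    }
}
```

Running `main` should print `true` and then `false`: the second call mirrors `ensureTestFails()`, because with flushing disabled `snapshotState()` returns before thread B acknowledges anything. The flushing-enabled case is deterministic, since thread A cannot set the flag until every callback has run; the disabled case relies on the 200 ms sleep, much like the original test relies on `threadA.wait(500L)`. As in the review argument, if `snapshotState()` never returned, the call would hang, and a surrounding test timeout is what turns that into a failure.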