Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2108#discussion_r69136271 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java --- @@ -35,69 +35,77 @@ import org.apache.kafka.common.serialization.ByteArraySerializer; import org.junit.Assert; import org.junit.Test; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; +import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; /** * Test ensuring that the producer is not dropping buffered records */ @SuppressWarnings("unchecked") public class AtLeastOnceProducerTest { - @Test + // we set a timeout because the test will not finish if the logic is broken + @Test(timeout=5000) public void testAtLeastOnceProducer() throws Exception { runTest(true); } // This test ensures that the actual test fails if the flushing is disabled - @Test(expected = AssertionError.class) + @Test(expected = AssertionError.class, timeout=5000) public void ensureTestFails() throws Exception { runTest(false); } private void runTest(boolean flushOnCheckpoint) throws Exception { Properties props = new Properties(); - final TestingKafkaProducer<String> producer = new TestingKafkaProducer<>("someTopic", new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props); + final OneShotLatch snapshottingFinished = new OneShotLatch(); + final TestingKafkaProducer<String> producer = new TestingKafkaProducer<>("someTopic", new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props, + snapshottingFinished); producer.setFlushOnCheckpoint(flushOnCheckpoint); producer.setRuntimeContext(new MockRuntimeContext(0, 1)); producer.open(new Configuration()); - for(int i = 0; i < 100; i++) { + for (int i 
= 0; i < 100; i++) { producer.invoke("msg-" + i); } // start a thread confirming all pending records final Tuple1<Throwable> runnableError = new Tuple1<>(null); - final AtomicBoolean markOne = new AtomicBoolean(false); + final Thread threadA = Thread.currentThread(); + Runnable confirmer = new Runnable() { @Override public void run() { try { MockProducer mp = producer.getProducerInstance(); List<Callback> pending = mp.getPending(); - // we ensure thread A is locked and didn't reach markOne - // give thread A some time to really reach the snapshot state - Thread.sleep(500); - if(markOne.get()) { - Assert.fail("Snapshot was confirmed even though messages " + - "were still in the buffer"); + // we need to find out if the snapshot() method blocks forever + // this is not possible. If snapshot() is running, it will + // start removing elements from the pending list. + synchronized (threadA) { + threadA.wait(500L); } + // we now check that no records have been confirmed yet Assert.assertEquals(100, pending.size()); + Assert.assertFalse("Snapshot method returned before all records were confirmed", + snapshottingFinished.hasTriggered()); // now confirm all checkpoints - for(Callback c: pending) { + for (Callback c: pending) { c.onCompletion(null, null); } pending.clear(); - // wait for the snapshotState() method to return - Thread.sleep(100); - Assert.assertTrue("Snapshot state didn't return", markOne.get()); + // wait for the snapshotState() method to return. The will + // fail if snapshotState never returns. + snapshottingFinished.await(); --- End diff -- I think you don't need this condition here. There are two possibilities: 1. ThreadA leaves `snapshotState` successfully and waits for `threadB`. Since it completed `snapshotState`, the `snapshottingFinished` will be triggered. Thus, there is no waiting. 2. ThreadA blocks in `snapshotState`. Then `threadB` does not have to block to trigger the timeout, because `threadA` is already blocked. 
Consequently, I think you could replace the `OneShotLatch` with a volatile boolean.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it to be, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---