Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2108#discussion_r69136271
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java ---
    @@ -35,69 +35,77 @@
     import org.apache.kafka.common.serialization.ByteArraySerializer;
     import org.junit.Assert;
     import org.junit.Test;
    +import scala.concurrent.duration.Deadline;
    +import scala.concurrent.duration.FiniteDuration;
     
    +import java.io.Serializable;
     import java.util.ArrayList;
     import java.util.List;
     import java.util.Map;
     import java.util.Properties;
     import java.util.concurrent.Future;
    -import java.util.concurrent.atomic.AtomicBoolean;
     
     /**
      * Test ensuring that the producer is not dropping buffered records
      */
     @SuppressWarnings("unchecked")
     public class AtLeastOnceProducerTest {
     
    -   @Test
    +   // we set a timeout because the test will not finish if the logic is broken
    +   @Test(timeout=5000)
        public void testAtLeastOnceProducer() throws Exception {
                runTest(true);
        }
     
        // This test ensures that the actual test fails if the flushing is disabled
    -   @Test(expected = AssertionError.class)
    +   @Test(expected = AssertionError.class, timeout=5000)
        public void ensureTestFails() throws Exception {
                runTest(false);
        }
     
        private void runTest(boolean flushOnCheckpoint) throws Exception {
                Properties props = new Properties();
    -           final TestingKafkaProducer<String> producer = new TestingKafkaProducer<>("someTopic", new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props);
    +           final OneShotLatch snapshottingFinished = new OneShotLatch();
    +           final TestingKafkaProducer<String> producer = new TestingKafkaProducer<>("someTopic", new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props,
    +                           snapshottingFinished);
                producer.setFlushOnCheckpoint(flushOnCheckpoint);
                producer.setRuntimeContext(new MockRuntimeContext(0, 1));
     
                producer.open(new Configuration());
     
    -           for(int i = 0; i < 100; i++) {
    +           for (int i = 0; i < 100; i++) {
                        producer.invoke("msg-" + i);
                }
                // start a thread confirming all pending records
                final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    -           final AtomicBoolean markOne = new AtomicBoolean(false);
    +           final Thread threadA = Thread.currentThread();
    +
                Runnable confirmer = new Runnable() {
                        @Override
                        public void run() {
                                try {
                                        MockProducer mp = producer.getProducerInstance();
                                        List<Callback> pending = mp.getPending();
     
    -                                   // we ensure thread A is locked and didn't reach markOne
    -                                   // give thread A some time to really reach the snapshot state
    -                                   Thread.sleep(500);
    -                                   if(markOne.get()) {
    -                                           Assert.fail("Snapshot was confirmed even though messages " +
    -                                                           "were still in the buffer");
    +                                   // we need to find out if the snapshot() method blocks forever
    +                                   // this is not possible. If snapshot() is running, it will
    +                                   // start removing elements from the pending list.
    +                                   synchronized (threadA) {
    +                                           threadA.wait(500L);
                                        }
    +                                   // we now check that no records have been confirmed yet
                                        Assert.assertEquals(100, pending.size());
    +                                   Assert.assertFalse("Snapshot method returned before all records were confirmed",
    +                                                   snapshottingFinished.hasTriggered());
     
                                        // now confirm all checkpoints
    -                                   for(Callback c: pending) {
    +                                   for (Callback c: pending) {
                                                c.onCompletion(null, null);
                                        }
                                        pending.clear();
    -                                   // wait for the snapshotState() method to return
    -                                   Thread.sleep(100);
    -                                   Assert.assertTrue("Snapshot state didn't return", markOne.get());
    +                                   // wait for the snapshotState() method to return. The will
    +                                   // fail if snapshotState never returns.
    +                                   snapshottingFinished.await();
    --- End diff --
    
    I don't think you need this `await()` call here. There are two possibilities:
    
    1. `threadA` returns from `snapshotState` successfully and then waits for `threadB`. Because `snapshotState` completed, `snapshottingFinished` has already been triggered, so `await()` returns immediately without waiting.
    
    2. `threadA` blocks in `snapshotState`. In that case `threadB` does not need to block in order to trigger the test timeout, because `threadA` is already blocked.
    
    Consequently, I think you could replace the `OneShotLatch` with a volatile boolean.
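
    A minimal sketch of what that could look like, using a hypothetical `BlockingSnapshotter` in place of the real `TestingKafkaProducer` (the class names and the `CountDownLatch` that simulates pending records are assumptions for illustration, not code from this PR). The confirmer thread only reads the flag and never waits on it; if `snapshotState` never returned, the calling thread would stay blocked and the `timeout` on `@Test` would fail the test:

    ```java
    import java.util.concurrent.CountDownLatch;

    class VolatileFlagSketch {

        // Hypothetical stand-in for the producer under test (not Flink's class).
        static class BlockingSnapshotter {
            // Simulates "all pending records have been confirmed".
            private final CountDownLatch pendingConfirmed = new CountDownLatch(1);

            // Written by the snapshotting thread, read by the confirmer thread.
            volatile boolean snapshottingFinished = false;

            void snapshotState() throws InterruptedException {
                pendingConfirmed.await();       // blocks until records are confirmed
                snapshottingFinished = true;    // only reached after confirmation
            }

            void confirmAll() {
                pendingConfirmed.countDown();
            }
        }

        // Returns {flag as seen before confirmation, flag after snapshotState returned}.
        static boolean[] run() {
            final BlockingSnapshotter snapshotter = new BlockingSnapshotter();
            final boolean[] seenBeforeConfirm = new boolean[1];

            Thread confirmer = new Thread(() -> {
                // Plain read of the flag; no blocking wait is needed here.
                seenBeforeConfirm[0] = snapshotter.snapshottingFinished;
                snapshotter.confirmAll();
            });
            confirmer.start();

            try {
                snapshotter.snapshotState();    // would hang if flushing were broken
                confirmer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return new boolean[] { seenBeforeConfirm[0], snapshotter.snapshottingFinished };
        }

        public static void main(String[] args) {
            boolean[] result = run();
            System.out.println("finished before confirm: " + result[0]);
            System.out.println("finished after snapshot: " + result[1]);
        }
    }
    ```

    In the real test the first value corresponds to the `assertFalse` check before the callbacks are completed, and the second replaces the `await()`.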

