Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4871#discussion_r146302059
  
    --- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
 ---
    @@ -49,61 +71,205 @@ public void 
testCreateWithNonSerializableDeserializerFails() {
                exception.expect(IllegalArgumentException.class);
                exception.expectMessage("The provided serialization schema is 
not serializable");
     
    -           Properties testConfig = new Properties();
    -           testConfig.setProperty(AWSConfigConstants.AWS_REGION, 
"us-east-1");
    -           testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
"accessKeyId");
    -           
testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
    -
    -           new FlinkKinesisProducer<>(new 
NonSerializableSerializationSchema(), testConfig);
    +           new FlinkKinesisProducer<>(new 
NonSerializableSerializationSchema(), getStandardProperties());
        }
     
        @Test
        public void testCreateWithSerializableDeserializer() {
    -           Properties testConfig = new Properties();
    -           testConfig.setProperty(AWSConfigConstants.AWS_REGION, 
"us-east-1");
    -           testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
"accessKeyId");
    -           
testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
    -
    -           new FlinkKinesisProducer<>(new 
SerializableSerializationSchema(), testConfig);
    +           new FlinkKinesisProducer<>(new 
SerializableSerializationSchema(), getStandardProperties());
        }
     
        @Test
        public void testConfigureWithNonSerializableCustomPartitionerFails() {
                exception.expect(IllegalArgumentException.class);
                exception.expectMessage("The provided custom partitioner is not 
serializable");
     
    -           Properties testConfig = new Properties();
    -           testConfig.setProperty(AWSConfigConstants.AWS_REGION, 
"us-east-1");
    -           testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
"accessKeyId");
    -           
testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
    -
    -           new FlinkKinesisProducer<>(new SimpleStringSchema(), testConfig)
    +           new FlinkKinesisProducer<>(new SimpleStringSchema(), 
getStandardProperties())
                        .setCustomPartitioner(new 
NonSerializableCustomPartitioner());
        }
     
        @Test
        public void testConfigureWithSerializableCustomPartitioner() {
    -           Properties testConfig = new Properties();
    -           testConfig.setProperty(AWSConfigConstants.AWS_REGION, 
"us-east-1");
    -           testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
"accessKeyId");
    -           
testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
    -
    -           new FlinkKinesisProducer<>(new SimpleStringSchema(), testConfig)
    +           new FlinkKinesisProducer<>(new SimpleStringSchema(), 
getStandardProperties())
                        .setCustomPartitioner(new 
SerializableCustomPartitioner());
        }
     
        @Test
        public void testConsumerIsSerializable() {
    -           Properties testConfig = new Properties();
    -           testConfig.setProperty(AWSConfigConstants.AWS_REGION, 
"us-east-1");
    -           testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
"accessKeyId");
    -           
testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
    -
    -           FlinkKinesisProducer<String> consumer = new 
FlinkKinesisProducer<>(new SimpleStringSchema(), testConfig);
    +           FlinkKinesisProducer<String> consumer = new 
FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties());
                assertTrue(InstantiationUtil.isSerializable(consumer));
        }
     
        // 
----------------------------------------------------------------------
    +   // Tests to verify at-least-once guarantee
    +   // 
----------------------------------------------------------------------
    +
    +   /**
    +    * Test ensuring that if an invoke call happens right after an async 
exception is caught, it should be rethrown.
    +    */
    +   @SuppressWarnings("ResultOfMethodCallIgnored")
    +   @Test
    +   public void testAsyncErrorRethrownOnInvoke() throws Throwable {
    +           final DummyFlinkKinesisProducer<String> producer = new 
DummyFlinkKinesisProducer<>(new SimpleStringSchema());
    +
    +           OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +                   new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(producer));
    +
    +           testHarness.open();
    +
    +           testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +           producer.getPendingRecordFutures().get(0).setException(new 
Exception("artificial async exception"));
    +
    +           try {
    +                   testHarness.processElement(new StreamRecord<>("msg-2"));
    +           } catch (Exception e) {
    +                   // the next invoke should rethrow the async exception
    +                   
Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "artificial async 
exception").isPresent());
    +
    +                   // test succeeded
    +                   return;
    +           }
    +
    +           Assert.fail();
    +   }
    +
    +   /**
    +    * Test ensuring that if a snapshot call happens right after an async 
exception is caught, it should be rethrown.
    +    */
    +   @SuppressWarnings("ResultOfMethodCallIgnored")
    +   @Test
    +   public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
    +           final DummyFlinkKinesisProducer<String> producer = new 
DummyFlinkKinesisProducer<>(new SimpleStringSchema());
    +
    +           OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +                   new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(producer));
    +
    +           testHarness.open();
    +
    +           testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +           producer.getPendingRecordFutures().get(0).setException(new 
Exception("artificial async exception"));
    +
    +           try {
    +                   testHarness.snapshot(123L, 123L);
    +           } catch (Exception e) {
    +                   // the next invoke should rethrow the async exception
    +                   
Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "artificial async 
exception").isPresent());
    +
    +                   // test succeeded
    +                   return;
    +           }
    +
    +           Assert.fail();
    +   }
    +
    +   /**
    +    * Test ensuring that if an async exception is caught for one of the 
flushed requests on checkpoint,
    +    * it should be rethrown; we set a timeout because the test will not 
finish if the logic is broken.
    +    *
    +    * <p>Note that this test does not test the snapshot method is blocked 
correctly when there are pending recorrds.
    +    * The test for that is covered in testAtLeastOnceProducer.
    +    */
    +   @SuppressWarnings("ResultOfMethodCallIgnored")
    +   @Test
    +   public void testAsyncErrorRethrownAfterFlush() throws Throwable {
    +           final DummyFlinkKinesisProducer<String> producer = new 
DummyFlinkKinesisProducer<>(new SimpleStringSchema());
    +
    +           OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +                   new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(producer));
    +
    +           testHarness.open();
    +
    +           testHarness.processElement(new StreamRecord<>("msg-1"));
    +           testHarness.processElement(new StreamRecord<>("msg-2"));
    +           testHarness.processElement(new StreamRecord<>("msg-3"));
    +
    +           // only let the first record succeed for now
    +           UserRecordResult result = mock(UserRecordResult.class);
    +           when(result.isSuccessful()).thenReturn(true);
    +           producer.getPendingRecordFutures().get(0).set(result);
    +
    +           CheckedThread snapshotThread = new CheckedThread() {
    +                   @Override
    +                   public void go() throws Exception {
    +                           // this should block at first, since there are 
still two pending records that needs to be flushed
    +                           testHarness.snapshot(123L, 123L);
    +                   }
    +           };
    +           snapshotThread.start();
    +
    +           // let the 2nd message fail with an async exception
    +           producer.getPendingRecordFutures().get(1).setException(new 
Exception("artificial async failure for 2nd message"));
    +           
producer.getPendingRecordFutures().get(2).set(mock(UserRecordResult.class));
    +
    +           try {
    +                   snapshotThread.sync();
    +           } catch (Exception e) {
    +                   // the next invoke should rethrow the async exception
    +                   e.printStackTrace();
    --- End diff --
    
    Leftover printing.


---

Reply via email to