Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4871#discussion_r146301761
  
    --- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
 ---
    @@ -49,61 +71,205 @@ public void 
testCreateWithNonSerializableDeserializerFails() {
                exception.expect(IllegalArgumentException.class);
                exception.expectMessage("The provided serialization schema is 
not serializable");
     
    -           Properties testConfig = new Properties();
    -           testConfig.setProperty(AWSConfigConstants.AWS_REGION, 
"us-east-1");
    -           testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
"accessKeyId");
    -           
testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
    -
    -           new FlinkKinesisProducer<>(new 
NonSerializableSerializationSchema(), testConfig);
    +           new FlinkKinesisProducer<>(new 
NonSerializableSerializationSchema(), getStandardProperties());
        }
     
        @Test
        public void testCreateWithSerializableDeserializer() {
    -           Properties testConfig = new Properties();
    -           testConfig.setProperty(AWSConfigConstants.AWS_REGION, 
"us-east-1");
    -           testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
"accessKeyId");
    -           
testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
    -
    -           new FlinkKinesisProducer<>(new 
SerializableSerializationSchema(), testConfig);
    +           new FlinkKinesisProducer<>(new 
SerializableSerializationSchema(), getStandardProperties());
        }
     
        @Test
        public void testConfigureWithNonSerializableCustomPartitionerFails() {
                exception.expect(IllegalArgumentException.class);
                exception.expectMessage("The provided custom partitioner is not 
serializable");
     
    -           Properties testConfig = new Properties();
    -           testConfig.setProperty(AWSConfigConstants.AWS_REGION, 
"us-east-1");
    -           testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
"accessKeyId");
    -           
testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
    -
    -           new FlinkKinesisProducer<>(new SimpleStringSchema(), testConfig)
    +           new FlinkKinesisProducer<>(new SimpleStringSchema(), 
getStandardProperties())
                        .setCustomPartitioner(new 
NonSerializableCustomPartitioner());
        }
     
        @Test
        public void testConfigureWithSerializableCustomPartitioner() {
    -           Properties testConfig = new Properties();
    -           testConfig.setProperty(AWSConfigConstants.AWS_REGION, 
"us-east-1");
    -           testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
"accessKeyId");
    -           
testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
    -
    -           new FlinkKinesisProducer<>(new SimpleStringSchema(), testConfig)
    +           new FlinkKinesisProducer<>(new SimpleStringSchema(), 
getStandardProperties())
                        .setCustomPartitioner(new 
SerializableCustomPartitioner());
        }
     
        @Test
        public void testConsumerIsSerializable() {
    -           Properties testConfig = new Properties();
    -           testConfig.setProperty(AWSConfigConstants.AWS_REGION, 
"us-east-1");
    -           testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
"accessKeyId");
    -           
testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
    -
    -           FlinkKinesisProducer<String> consumer = new 
FlinkKinesisProducer<>(new SimpleStringSchema(), testConfig);
    +           FlinkKinesisProducer<String> consumer = new 
FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties());
                assertTrue(InstantiationUtil.isSerializable(consumer));
        }
     
        // 
----------------------------------------------------------------------
    +   // Tests to verify at-least-once guarantee
    +   // 
----------------------------------------------------------------------
    +
    +   /**
    +    * Test ensuring that if an invoke call happens right after an async 
exception is caught, it should be rethrown.
    +    */
    +   @SuppressWarnings("ResultOfMethodCallIgnored")
    +   @Test
    +   public void testAsyncErrorRethrownOnInvoke() throws Throwable {
    +           final DummyFlinkKinesisProducer<String> producer = new 
DummyFlinkKinesisProducer<>(new SimpleStringSchema());
    +
    +           OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +                   new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(producer));
    +
    +           testHarness.open();
    +
    +           testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +           producer.getPendingRecordFutures().get(0).setException(new 
Exception("artificial async exception"));
    +
    +           try {
    +                   testHarness.processElement(new StreamRecord<>("msg-2"));
    +           } catch (Exception e) {
    +                   // the next invoke should rethrow the async exception
    +                   
Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "artificial async 
exception").isPresent());
    +
    +                   // test succeeded
    +                   return;
    +           }
    +
    +           Assert.fail();
    +   }
    +
    +   /**
    +    * Test ensuring that if a snapshot call happens right after an async 
exception is caught, it should be rethrown.
    +    */
    +   @SuppressWarnings("ResultOfMethodCallIgnored")
    +   @Test
    +   public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
    +           final DummyFlinkKinesisProducer<String> producer = new 
DummyFlinkKinesisProducer<>(new SimpleStringSchema());
    +
    +           OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +                   new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(producer));
    +
    +           testHarness.open();
    +
    +           testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +           producer.getPendingRecordFutures().get(0).setException(new 
Exception("artificial async exception"));
    +
    +           try {
    +                   testHarness.snapshot(123L, 123L);
    +           } catch (Exception e) {
    +                   // the next invoke should rethrow the async exception
    --- End diff --
    
    nit: the comment refers to `invoke`, which was probably copy-pasted from 
above; in this test it should refer to the snapshot call instead


---

Reply via email to