Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4871#discussion_r146302059 --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java --- @@ -49,61 +71,205 @@ public void testCreateWithNonSerializableDeserializerFails() { exception.expect(IllegalArgumentException.class); exception.expectMessage("The provided serialization schema is not serializable"); - Properties testConfig = new Properties(); - testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); - testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); - testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); - - new FlinkKinesisProducer<>(new NonSerializableSerializationSchema(), testConfig); + new FlinkKinesisProducer<>(new NonSerializableSerializationSchema(), getStandardProperties()); } @Test public void testCreateWithSerializableDeserializer() { - Properties testConfig = new Properties(); - testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); - testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); - testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); - - new FlinkKinesisProducer<>(new SerializableSerializationSchema(), testConfig); + new FlinkKinesisProducer<>(new SerializableSerializationSchema(), getStandardProperties()); } @Test public void testConfigureWithNonSerializableCustomPartitionerFails() { exception.expect(IllegalArgumentException.class); exception.expectMessage("The provided custom partitioner is not serializable"); - Properties testConfig = new Properties(); - testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); - testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); - testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); - - new FlinkKinesisProducer<>(new SimpleStringSchema(), 
testConfig) + new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties()) .setCustomPartitioner(new NonSerializableCustomPartitioner()); } @Test public void testConfigureWithSerializableCustomPartitioner() { - Properties testConfig = new Properties(); - testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); - testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); - testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); - - new FlinkKinesisProducer<>(new SimpleStringSchema(), testConfig) + new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties()) .setCustomPartitioner(new SerializableCustomPartitioner()); } @Test public void testConsumerIsSerializable() { - Properties testConfig = new Properties(); - testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); - testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); - testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); - - FlinkKinesisProducer<String> consumer = new FlinkKinesisProducer<>(new SimpleStringSchema(), testConfig); + FlinkKinesisProducer<String> consumer = new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties()); assertTrue(InstantiationUtil.isSerializable(consumer)); } // ---------------------------------------------------------------------- + // Tests to verify at-least-once guarantee + // ---------------------------------------------------------------------- + + /** + * Test ensuring that if an invoke call happens right after an async exception is caught, it should be rethrown. 
+ */ + @SuppressWarnings("ResultOfMethodCallIgnored") + @Test + public void testAsyncErrorRethrownOnInvoke() throws Throwable { + final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema()); + + OneInputStreamOperatorTestHarness<String, Object> testHarness = + new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer)); + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>("msg-1")); + + producer.getPendingRecordFutures().get(0).setException(new Exception("artificial async exception")); + + try { + testHarness.processElement(new StreamRecord<>("msg-2")); + } catch (Exception e) { + // the next invoke should rethrow the async exception + Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "artificial async exception").isPresent()); + + // test succeeded + return; + } + + Assert.fail(); + } + + /** + * Test ensuring that if a snapshot call happens right after an async exception is caught, it should be rethrown. 
+ */ + @SuppressWarnings("ResultOfMethodCallIgnored") + @Test + public void testAsyncErrorRethrownOnCheckpoint() throws Throwable { + final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema()); + + OneInputStreamOperatorTestHarness<String, Object> testHarness = + new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer)); + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>("msg-1")); + + producer.getPendingRecordFutures().get(0).setException(new Exception("artificial async exception")); + + try { + testHarness.snapshot(123L, 123L); + } catch (Exception e) { + // the next invoke should rethrow the async exception + Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "artificial async exception").isPresent()); + + // test succeeded + return; + } + + Assert.fail(); + } + + /** + * Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint, + * it should be rethrown; we set a timeout because the test will not finish if the logic is broken. + * + * <p>Note that this test does not test that the snapshot method is blocked correctly when there are pending records. + * The test for that is covered in testAtLeastOnceProducer. 
+ */ + @SuppressWarnings("ResultOfMethodCallIgnored") + @Test + public void testAsyncErrorRethrownAfterFlush() throws Throwable { + final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema()); + + OneInputStreamOperatorTestHarness<String, Object> testHarness = + new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer)); + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>("msg-1")); + testHarness.processElement(new StreamRecord<>("msg-2")); + testHarness.processElement(new StreamRecord<>("msg-3")); + + // only let the first record succeed for now + UserRecordResult result = mock(UserRecordResult.class); + when(result.isSuccessful()).thenReturn(true); + producer.getPendingRecordFutures().get(0).set(result); + + CheckedThread snapshotThread = new CheckedThread() { + @Override + public void go() throws Exception { + // this should block at first, since there are still two pending records that need to be flushed + testHarness.snapshot(123L, 123L); + } + }; + snapshotThread.start(); + + // let the 2nd message fail with an async exception + producer.getPendingRecordFutures().get(1).setException(new Exception("artificial async failure for 2nd message")); + producer.getPendingRecordFutures().get(2).set(mock(UserRecordResult.class)); + + try { + snapshotThread.sync(); + } catch (Exception e) { + // the next invoke should rethrow the async exception + e.printStackTrace(); --- End diff -- Leftover printing: the `e.printStackTrace()` call above looks like debugging output and should be removed.
---