GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/5269
[FLINK-6004] [kinesis] Allow FlinkKinesisConsumer to skip non-deserializable records

## What is the purpose of the change

This PR is based on #5268, which includes fixes to harden Kinesis unit tests. Only the last commit is relevant.

In the past, we allowed the Flink Kafka Consumer to skip corrupted Kafka records that cannot be deserialized. In real-world pipelines, it is entirely normal for this to happen. This PR adds the same functionality to the Flink Kinesis Consumer.

## Brief change log

- Clarify in the Javadoc of `KinesisDeserializationSchema` that `null` can be returned if a message cannot be deserialized.
- If `record` is `null` in `KinesisDataFetcher::emitRecordAndUpdateState(...)`, do not collect any output for the record.
- Add `KinesisDataFetcherTest::testSkipCorruptedRecord()` to verify the feature.

## Verifying this change

The additional `KinesisDataFetcherTest::testSkipCorruptedRecord()` test verifies this change.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (**yes** / no / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (**yes** / no)
- If yes, how is the feature documented?
(not applicable / docs / **JavaDocs** / not documented)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-6004

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5269.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5269

----
commit 94b45919afa5a3ec3ce68c45e57f7989397f9640
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-01-10T02:11:31Z

    [FLINK-8398] [kinesis, tests] Cleanup confusing implementations in KinesisDataFetcherTest and related classes

    The previous implementation of the TestableKinesisDataFetcher was
    confusing in various ways, making it hard to reuse for other tests.
    This commit contains the following cleanups:

    - Remove confusing mocks of source context and checkpoint lock. We now
      allow users of the TestableKinesisDataFetcher to provide a source
      context, which should provide the checkpoint lock.
    - Remove override of emitRecordAndUpdateState(). Strictly speaking, that
      method should be final. It was previously overridden to allow
      verifying how many records were output by the fetcher. That
      verification would be better implemented within a mock source context.
    - Properly parameterize the output type for the
      TestableKinesisDataFetcher.
    - Remove use of PowerMockito in KinesisDataFetcherTest.
    - Use CheckedThreads to properly capture any exceptions in fetcher /
      consumer threads in unit tests.
    - Use assertEquals / assertNull instead of assertTrue wherever
      appropriate.

commit 547d19f9196512231661f427f3792f2e1f831339
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-01-10T05:41:49Z

    [FLINK-8398] [kinesis, tests] Stabilize flaky KinesisDataFetcherTests

    Prior to this commit, several unit tests in KinesisDataFetcherTest
    relied on sleeps to wait until a certain operation happens, in order
    for the test to pass.
    This commit removes those sleeps and replaces the test behaviours with
    OneShotLatches.

commit 8d2b086ae7133999ec03620e3434cd659fd8d9d3
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-01-10T06:04:10Z

    [FLINK-6004] [kinesis] Allow FlinkKinesisConsumer to skip records

    This commit acknowledges that null can be returned from the
    deserialization schema, if the message cannot be deserialized. If null
    is returned for a Kinesis record, no output is produced for that record,
    while the sequence number in the shard state is still advanced so that
    the record is effectively accounted as processed.

----

---
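
For readers following the change, the skip behaviour described in the last commit can be sketched roughly as below. This is a simplified, self-contained model and not the actual Flink code: the class `SkipCorruptedRecordSketch`, the `ShardState` holder, the `deserialize` helper, and the plain `List` used in place of a source context are all hypothetical stand-ins. It only illustrates the two properties the commit message states: a `null` record produces no output, and the sequence number still advances.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SkipCorruptedRecordSketch {

    /** Hypothetical stand-in for the per-shard state kept by the fetcher. */
    public static final class ShardState {
        public String lastProcessedSequenceNumber; // null until a record is handled
    }

    /**
     * Simplified model of emitRecordAndUpdateState(...): a null record is not
     * collected, but the shard's sequence number is advanced either way, so the
     * skipped record is not read again after a restore.
     */
    public static <T> void emitRecordAndUpdateState(
            T record,
            String sequenceNumber,
            ShardState state,
            List<T> output,
            Object checkpointLock) {
        synchronized (checkpointLock) {
            if (record != null) {
                output.add(record);
            }
            state.lastProcessedSequenceNumber = sequenceNumber;
        }
    }

    /** Hypothetical schema: returns null when the bytes are not a decimal integer. */
    public static Integer deserialize(byte[] bytes) {
        try {
            return Integer.valueOf(new String(bytes, StandardCharsets.UTF_8).trim());
        } catch (NumberFormatException e) {
            return null; // signal "skip this record" instead of failing the job
        }
    }

    public static void main(String[] args) {
        ShardState state = new ShardState();
        List<Integer> output = new ArrayList<>();
        Object checkpointLock = new Object();

        String[] payloads = {"1", "oops", "3"};
        for (int i = 0; i < payloads.length; i++) {
            Integer record = deserialize(payloads[i].getBytes(StandardCharsets.UTF_8));
            emitRecordAndUpdateState(record, "seq-" + (i + 1), state, output, checkpointLock);
        }

        System.out.println(output);                            // [1, 3]
        System.out.println(state.lastProcessedSequenceNumber); // seq-3
    }
}
```

Updating the state inside the same lock as the emit mirrors the intent that output and shard position change together; in this sketch the lock is just a plain object supplied by the caller.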