Hi Flink people! I'm working with Apache Flink 1.13.2 on AWS, with Kinesis as the source.
I need to drop certain events before they enter the job's pipeline. There are two main reasons:
1) problems deserializing the incoming bytes;
2) problems with the event itself, e.g. a missing timestamp for event-time processing (strictly speaking also a deserialization problem, just a matter of strictness).

The FlinkKinesisConsumer is set up with a BoundedOutOfOrdernessTimestampExtractor to enable watermarking and event-time processing.

I found this ticket: https://issues.apache.org/jira/browse/FLINK-3679 and its promising comment [1]. I hoped the solution would be to return null from my implementation of org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(...). Unfortunately, this is not (yet?) supported by the FlinkKinesisConsumer, or more precisely by its ShardConsumer, although the FlinkKafkaConsumer does support it. The Kinesis consumer does not handle the returned null and fails with an NPE when it tries to extract the timestamp [2]. I also found that the RecordEmitter later on actually filters out null events and drops them [3].

Is this a gap in the implementation, or does it work as designed? Does anyone have an idea how to drop events before they enter the pipeline?

I do have one workaround in mind: do not assign timestamps/watermarks at the source but at a later step in the stream, i.e. after the bad events have been filtered out. That still does not solve the problem of exceptions during deserialization, though. That, in turn, could be solved by not deserializing events in the consumer at all, i.e. consuming raw bytes and doing the deserialization in a flatMap function as the first step of the pipeline. But none of this feels right to me.
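For illustration, here is a minimal sketch of the second workaround (consume raw bytes, deserialize in a flatMap, drop what fails). It assumes a hypothetical `Event` type and a made-up `id;epochMillis` UTF-8 wire format; neither comes from the actual job. To keep it runnable without Flink on the classpath, `java.util.function.Consumer` stands in for Flink's `Collector`. In a real job the body of `deserializeOrDrop` would sit inside a `FlatMapFunction<byte[], Event>` applied right after a `FlinkKinesisConsumer<byte[]>` (for example one built with a pass-through `AbstractDeserializationSchema<byte[]>`).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

public class LenientDeserialization {

    // Hypothetical event type: only the fields needed for event-time processing.
    static final class Event {
        final String id;
        final long timestampMillis;

        Event(String id, long timestampMillis) {
            this.id = id;
            this.timestampMillis = timestampMillis;
        }
    }

    // Hypothetical wire format "id;epochMillis" (UTF-8).
    // Returns empty instead of throwing when the bytes cannot be parsed
    // or the timestamp is missing, so bad records never reach the pipeline.
    static Optional<Event> tryDeserialize(byte[] raw) {
        if (raw == null) {
            return Optional.empty();
        }
        String text = new String(raw, StandardCharsets.UTF_8);
        String[] parts = text.split(";", -1);
        if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
            return Optional.empty(); // malformed record or missing timestamp
        }
        try {
            return Optional.of(new Event(parts[0], Long.parseLong(parts[1].trim())));
        } catch (NumberFormatException e) {
            return Optional.empty(); // timestamp present but not a number
        }
    }

    // flatMap-style step: emits zero or one event per input record.
    // The Consumer plays the role of Flink's Collector in this sketch.
    static void deserializeOrDrop(byte[] raw, Consumer<Event> out) {
        tryDeserialize(raw).ifPresent(out);
    }

    public static void main(String[] args) {
        List<byte[]> input = List.of(
                "a;1000".getBytes(StandardCharsets.UTF_8),
                "b;".getBytes(StandardCharsets.UTF_8),              // missing timestamp
                "garbage".getBytes(StandardCharsets.UTF_8),         // not parseable
                "c;not-a-number".getBytes(StandardCharsets.UTF_8),  // bad timestamp
                "d;2500".getBytes(StandardCharsets.UTF_8));

        List<Event> kept = new ArrayList<>();
        for (byte[] raw : input) {
            deserializeOrDrop(raw, kept::add);
        }
        for (Event e : kept) {
            System.out.println(e.id + " @ " + e.timestampMillis);
        }
    }
}
```

Because every event that survives this step is guaranteed to carry a timestamp, the watermark assignment could then move downstream, e.g. `assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(...)).withTimestampAssigner((e, ts) -> e.timestampMillis))`. One trade-off worth checking against the connector documentation: watermarks assigned after the source are no longer tracked per shard inside the Kinesis consumer.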
Thanks & best,
Peter

[1] https://issues.apache.org/jira/browse/FLINK-3679?focusedCommentId=15456204&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15456204
[2] https://github.com/apache/flink/blob/9efd97e05717181ee9b5489cddc27b6351127e38/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java#L999
[3] https://github.com/apache/flink/blob/9efd97e05717181ee9b5489cddc27b6351127e38/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java#L1027