Hi Martijn,

Thanks a lot for that example. This looks a lot like my suggested workaround.
Good to know that my idea was not too naive. For the archives, sketches of
both variants below.
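Here is roughly what I had in mind, sketched against the Flink 1.13 APIs. It
is untested, and MyEvent with its static parse(byte[]) method is only a
placeholder for our actual event type:

    import java.time.Duration;
    import java.util.Properties;

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
    import org.apache.flink.util.Collector;

    public class DropBadEventsJob {

        // Pass-through schema: the ShardConsumer hands the raw bytes
        // downstream untouched, so nothing can fail inside the source.
        public static class RawBytesSchema extends AbstractDeserializationSchema<byte[]> {
            @Override
            public byte[] deserialize(byte[] message) {
                return message;
            }
        }

        // Placeholder event type, standing in for our real one.
        public static class MyEvent {
            private final Long timestampMillis;
            private MyEvent(Long ts) { this.timestampMillis = ts; }
            public static MyEvent parse(byte[] bytes) { /* real parsing here */ return new MyEvent(0L); }
            public Long getTimestampMillis() { return timestampMillis; }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties consumerConfig = new Properties();
            // region, credentials, initial position, ... as usual

            DataStream<byte[]> raw = env.addSource(
                    new FlinkKinesisConsumer<>("my-stream", new RawBytesSchema(), consumerConfig));

            DataStream<MyEvent> events = raw
                    // First step of the pipeline: deserialize and drop bad records.
                    .flatMap((byte[] bytes, Collector<MyEvent> out) -> {
                        try {
                            MyEvent event = MyEvent.parse(bytes);
                            if (event.getTimestampMillis() != null) { // drop events without a timestamp
                                out.collect(event);
                            }
                        } catch (Exception e) {
                            // deserialization failed: drop (or log/count) the record
                        }
                    })
                    .returns(MyEvent.class)
                    // Only now, on the cleaned stream, assign timestamps and watermarks.
                    .assignTimestampsAndWatermarks(
                            WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                                    .withTimestampAssigner((event, ts) -> event.getTimestampMillis()));

            // ... rest of the job ...
            env.execute("drop-bad-events");
        }
    }

The point is that the source only ever sees the pass-through schema, so
deserialization can no longer fail before my own code runs, and watermarks
are assigned only on the cleaned stream.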
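And if dropping the bad records silently ever becomes a problem, the
side-output pattern from the recipe should plug into the same raw stream.
Again just a sketch continuing from the code above; the tag name and the
print() stand-in for a real DLQ sink are made up:

    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    // Anonymous subclass so Flink can capture the byte[] type information.
    final OutputTag<byte[]> deadLetters = new OutputTag<byte[]>("dead-letters") {};

    SingleOutputStreamOperator<MyEvent> events = raw.process(
            new ProcessFunction<byte[], MyEvent>() {
                @Override
                public void processElement(byte[] bytes, Context ctx, Collector<MyEvent> out) {
                    try {
                        out.collect(MyEvent.parse(bytes)); // placeholder parser
                    } catch (Exception e) {
                        ctx.output(deadLetters, bytes); // route the raw record to the DLQ
                    }
                }
            });

    // The failed records form a regular stream that can get its own sink,
    // e.g. a Kinesis or Kafka producer writing to a dead letter stream.
    events.getSideOutput(deadLetters).print(); // stand-in for the real DLQ sink

That way the broken raw records end up in a dead letter stream for
inspection instead of disappearing.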
All the best,
Peter

> On 3. Aug 2022, at 16:16, Martijn Visser <martijnvis...@apache.org> wrote:
> 
> Hi Peter,
> 
> You could consider the pattern that was used to create a Kafka Dead Letter
> Queue. There's a recipe including source code available for that at
> https://docs.immerok.cloud/docs/cookbook/creating-dead-letter-queues-from-and-to-apache-kafka-with-apache-flink/
> 
> Best regards,
> 
> Martijn
> 
> On Wed, 3 Aug 2022 at 15:56, Peter Schrott <pe...@bluerootlabs.io> wrote:
> Hi Flink Ppl!
> 
> Working with Apache Flink 1.13.2 on AWS with Kinesis as source.
> 
> I have the requirement to drop certain events before they enter the job's
> pipeline. There are mainly two reasons:
> 1) Problems when deserializing the incoming bytes
> 2) Problems with the event itself, e.g. a missing timestamp for event-time
> processing (actually also a deserialization problem, just a matter of
> strictness)
> 
> The FlinkKinesisConsumer is set up with a
> BoundedOutOfOrdernessTimestampExtractor to enable watermarking and
> event-time processing.
> 
> Now I found this ticket: https://issues.apache.org/jira/browse/FLINK-3679
> and the promising comment [1]: I hoped to find a solution by returning null
> from the implementation of
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(.)
> 
> Unfortunately, this is not (yet?) implemented for the FlinkKinesisConsumer
> (it is in the FlinkKafkaConsumer), or more precisely not in its
> ShardConsumer. The consumer does not handle the returned null and fails
> with an NPE when trying to extract the timestamp [2].
> 
> I found out that the RecordEmitter later on actually filters for null
> events and drops them [3].
> 
> Is this a gap in the implementation, or does it work as designed?
> 
> Does someone have an idea how to drop events before they enter the
> pipeline?
> 
> I do have one workaround in mind: do not assign the timestamps/watermarks
> at the source but at a later step in the stream, i.e. after cleaning out
> the bad events. But this still does not overcome the problem of exceptions
> during event deserialization. That, in turn, could be solved by not
> deserializing events in the consumer, i.e. consuming raw bytes and adding
> the deserialization as a flatMap function as the first step of the
> pipeline. But all of this does not sound great to me.
> 
> Thanks & Best
> Peter
> 
> [1] https://issues.apache.org/jira/browse/FLINK-3679?focusedCommentId=15456204&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15456204
> [2] https://github.com/apache/flink/blob/9efd97e05717181ee9b5489cddc27b6351127e38/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java#L999
> [3] https://github.com/apache/flink/blob/9efd97e05717181ee9b5489cddc27b6351127e38/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java#L1027
> [4]