Hi Martijn,

Thanks a lot for that example. It looks very much like the workaround I 
suggested. Good to know that my idea was not too naive.

All the best,
Peter

> On 3. Aug 2022, at 16:16, Martijn Visser <martijnvis...@apache.org> wrote:
> 
> Hi Peter,
> 
> You could consider the pattern that was used to create a Kafka Dead Letter 
> Queue. There's a recipe including source code available for that at 
> https://docs.immerok.cloud/docs/cookbook/creating-dead-letter-queues-from-and-to-apache-kafka-with-apache-flink/
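> 
> The core of that pattern is a ProcessFunction that routes records it cannot 
> parse to a side output (a minimal sketch, not the recipe's exact code; 
> MyEvent, parse() and the sink are placeholders):
> 
>     // Tag for the side output carrying the raw bytes of unparseable records.
>     final OutputTag<byte[]> deadLetters = new OutputTag<byte[]>("dead-letters") {};
> 
>     SingleOutputStreamOperator<MyEvent> parsed = rawBytes
>         .process(new ProcessFunction<byte[], MyEvent>() {
>             @Override
>             public void processElement(byte[] value, Context ctx, Collector<MyEvent> out) {
>                 try {
>                     out.collect(parse(value)); // placeholder deserializer
>                 } catch (Exception e) {
>                     ctx.output(deadLetters, value); // bad record -> dead letter queue
>                 }
>             }
>         });
> 
>     // Sink the side output to the dead letter queue, e.g. a Kafka topic.
>     parsed.getSideOutput(deadLetters).addSink(deadLetterSink);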
> 
> Best regards,
> 
> Martijn
> 
> 
> On Wed, 3 Aug 2022 at 15:56, Peter Schrott <pe...@bluerootlabs.io> wrote:
> Hi Flink Ppl!
> 
> I am working with Apache Flink 1.13.2 on AWS, with Kinesis as the source.
> 
> I have the requirement to drop certain events before they enter the job's 
> pipeline. There are mainly two reasons:
> 1) Problems when it comes to deserializing the incoming bytes
> 2) Problems with the event itself, e.g. a missing timestamp for event-time 
> based processing (actually also a deserialization problem, just a matter 
> of strictness)
> 
> The FlinkKinesisConsumer is set up with a 
> BoundedOutOfOrdernessTimestampExtractor to enable watermarking and 
> event-time processing.
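> 
> (For context, the setup looks roughly like this; stream name, region, 
> MyEvent and its schema are placeholders:)
> 
>     Properties config = new Properties();
>     config.setProperty(ConsumerConfigConstants.AWS_REGION, "eu-central-1");
> 
>     FlinkKinesisConsumer<MyEvent> consumer =
>         new FlinkKinesisConsumer<>("my-stream", new MyEventSchema(), config);
> 
>     // Periodic per-shard watermark assigner, evaluated inside the consumer.
>     consumer.setPeriodicWatermarkAssigner(
>         new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
>             @Override
>             public long extractTimestamp(MyEvent event) {
>                 // this is where the NPE from [2] surfaces when
>                 // deserialize() has returned null
>                 return event.getTimestamp();
>             }
>         });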
>  
> Now I found this ticket: https://issues.apache.org/jira/browse/FLINK-3679 
> and the promising comment [1]: I hoped to find a solution by returning null 
> in the implementation of 
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(.)
> 
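> What I hoped for would look something like this (a minimal, untested 
> sketch; MyEvent and parse() are placeholders):
> 
>     public class LenientEventSchema implements DeserializationSchema<MyEvent> {
>         @Override
>         public MyEvent deserialize(byte[] message) throws IOException {
>             try {
>                 return parse(message); // placeholder for the actual parsing
>             } catch (Exception e) {
>                 return null; // meant to signal "skip this record"
>             }
>         }
> 
>         @Override
>         public boolean isEndOfStream(MyEvent nextElement) {
>             return false;
>         }
> 
>         @Override
>         public TypeInformation<MyEvent> getProducedType() {
>             return TypeInformation.of(MyEvent.class);
>         }
>     }
> 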
> Unfortunately, this is not (yet?) implemented for the FlinkKinesisConsumer 
> (it is in the FlinkKafkaConsumer), or more precisely in its ShardConsumer. 
> The consumer does not handle the returned null and fails with an NPE when 
> trying to extract the timestamp [2].
> 
> I found out that, later on, the RecordEmitter actually does filter out null 
> events and drops them [3].
> 
> Is this a gap in the implementation, or does it work as designed?
> 
> Does someone have an idea how to drop events before they enter the pipeline?
> 
> I do have one workaround in mind: do not assign timestamps/watermarks at 
> the source but at a later step in the stream, i.e. after filtering out bad 
> events. But this still does not solve the problem of exceptions during 
> event deserialization. That, in turn, could be solved by not deserializing 
> events in the consumer, i.e. consuming plain bytes and adding the 
> deserialization as a flat map function as the first step in the pipeline. 
> But none of this sounds good to me.
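> 
> In code, the workaround would look roughly like this (an untested sketch; 
> MyEvent, parse() and config are placeholders):
> 
>     // 1) Consume plain bytes so deserialization cannot fail in the consumer.
>     DataStream<byte[]> raw = env.addSource(
>         new FlinkKinesisConsumer<>("my-stream",
>             new AbstractDeserializationSchema<byte[]>() {
>                 @Override
>                 public byte[] deserialize(byte[] message) {
>                     return message; // pass the bytes through untouched
>                 }
>             }, config));
> 
>     // 2) Deserialize in a flat map and drop bad events.
>     DataStream<MyEvent> events = raw
>         .flatMap((byte[] bytes, Collector<MyEvent> out) -> {
>             try {
>                 MyEvent event = parse(bytes); // placeholder for actual parsing
>                 if (event.getTimestamp() != null) {
>                     out.collect(event);
>                 }
>             } catch (Exception e) {
>                 // deserialization failed -> drop the event
>             }
>         })
>         .returns(MyEvent.class)
>         // 3) Only now assign timestamps and watermarks, on clean events.
>         .assignTimestampsAndWatermarks(
>             WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
>                 .withTimestampAssigner((event, ts) -> event.getTimestamp()));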
> 
> Thanks & Best
> Peter
> 
> [1] 
> https://issues.apache.org/jira/browse/FLINK-3679?focusedCommentId=15456204&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15456204
> [2] 
> https://github.com/apache/flink/blob/9efd97e05717181ee9b5489cddc27b6351127e38/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java#L999
> [3] 
> https://github.com/apache/flink/blob/9efd97e05717181ee9b5489cddc27b6351127e38/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java#L1027
