Hi Flink Ppl!

I am working with Apache Flink 1.13.2 on AWS, with Kinesis as the source.

I need to drop certain events before they enter the job's pipeline, mainly 
for two reasons:
1) The incoming bytes cannot be deserialized.
2) The event itself is invalid, e.g. it is missing the timestamp needed for 
event-time processing (strictly speaking also a deserialization problem, just 
a matter of how strict the deserializer is).

The FlinkKinesisConsumer is set up with a 
BoundedOutOfOrdernessTimestampExtractor to enable watermarking and event-time 
processing.
 
I found this ticket: https://issues.apache.org/jira/browse/FLINK-3679 
and a promising comment on it [1]. Based on that, I hoped to solve the problem 
by returning null from my implementation of 
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(.) 

Unfortunately, unlike the FlinkKafkaConsumer, the FlinkKinesisConsumer (or 
rather its ShardConsumer) does not (yet?) support this. The consumer does not 
handle the returned null and fails with an NPE when trying to extract the 
timestamp [2].

I also noticed that, further down the line, the RecordEmitter does filter for 
null events and drops them [3].

Is this a gap in the implementation, or does it work as designed?

Does anyone have an idea how to drop events before they enter the pipeline?

I do have one workaround in mind: do not assign the timestamps/watermarks at 
the source but at a later step in the stream, i.e. after cleaning out bad 
events. But this still does not solve the problem of exceptions during event 
deserialization. That, in turn, could be solved by not deserializing events in 
the consumer at all, i.e. consuming raw bytes and doing the deserialization in 
a flat map function as the first step in the pipeline. But none of this sounds 
good to me. 
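
To make that workaround concrete, here is a minimal sketch of the 
parse-or-drop logic such a flat map step could wrap. It is plain Java with no 
Flink dependency. The Event class, the SafeEventParser name and the 
"<epochMillis>,<payload>" wire format are all assumptions made up for 
illustration; my real events look different.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

/** Hypothetical event type, for illustration only. */
final class Event {
    final long timestampMillis;
    final String payload;

    Event(long timestampMillis, String payload) {
        this.timestampMillis = timestampMillis;
        this.payload = payload;
    }
}

/** Parses raw record bytes; Optional.empty() means "drop this record". */
final class SafeEventParser {
    private SafeEventParser() {}

    // Assumed wire format (illustration only): "<epochMillis>,<payload>" in UTF-8.
    static Optional<Event> parse(byte[] raw) {
        if (raw == null || raw.length == 0) {
            return Optional.empty();
        }
        String text = new String(raw, StandardCharsets.UTF_8);
        int comma = text.indexOf(',');
        if (comma <= 0) {
            // No separator, or an empty timestamp field: reason 2 (missing timestamp).
            return Optional.empty();
        }
        try {
            long ts = Long.parseLong(text.substring(0, comma).trim());
            return Optional.of(new Event(ts, text.substring(comma + 1)));
        } catch (NumberFormatException e) {
            // Reason 1: the bytes do not deserialize into a valid event.
            return Optional.empty();
        }
    }
}
```

In the job, the source would then emit byte[] records, a 
FlatMapFunction<byte[], Event> would call SafeEventParser.parse(...) and only 
collect the event when one is present, and assignTimestampsAndWatermarks(...) 
would be applied on the resulting stream instead of on the consumer.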

Thanks & Best
Peter

[1] https://issues.apache.org/jira/browse/FLINK-3679?focusedCommentId=15456204&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15456204
[2] https://github.com/apache/flink/blob/9efd97e05717181ee9b5489cddc27b6351127e38/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java#L999
[3] https://github.com/apache/flink/blob/9efd97e05717181ee9b5489cddc27b6351127e38/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java#L1027