Re: Trapping Streaming Errors

2017-02-15 Thread Fabian Hueske
Hi Joe, you can also insert a MapFunction between the Kafka source and the keyBy to validate the IDs. The mapper will be chained and should add only minimal overhead. If you want to keep the events that were incorrectly deserialized, you can use split() to move them somewhere. Validation in
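
To make the validation idea concrete, here is a minimal sketch of the per-record check such a mapper could perform. It is plain Scala with no Flink dependency, so it runs on its own. The names Event, Obj, Checked, Valid, Invalid and Validation.validate are all hypothetical; the thread does not show the real event type, so the accessors below only simulate IDs that throw when deserialization went wrong.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical stand-in for the real payload type (not shown in the thread).
// Each accessor throws if its field was left unset by a failed deserialization.
final case class Obj(id1: String, id2: String, id3: String) {
  private def required(v: String, name: String): String =
    Option(v).getOrElse(throw new IllegalStateException(s"$name missing"))
  def ID1(): String = required(id1, "ID1")
  def ID2(): String = required(id2, "ID2")
  def ID3(): String = required(id3, "ID3")
}

// Hypothetical event wrapper, mirroring the x.obj access in the question.
final case class Event(obj: Obj)

// Outcome of validation: the extracted key with its event, or the rejected event.
sealed trait Checked
final case class Valid(key: (String, String, String), event: Event) extends Checked
final case class Invalid(event: Event, reason: String) extends Checked

object Validation {
  // Extract the key inside Try so that a bad record becomes a value
  // instead of an exception that would fail the job during key extraction.
  def validate(e: Event): Checked =
    Try((e.obj.ID1(), e.obj.ID2(), e.obj.ID3())) match {
      case Success(key) => Valid(key, e)
      case Failure(t)   => Invalid(e, t.toString)
    }
}

object Demo extends App {
  val events = List(
    Event(Obj("a", "b", "c")),   // fully populated
    Event(Obj("a", null, "c")),  // one ID missing
    Event(null)                  // payload missing entirely
  )
  val (good, bad) = events.map(Validation.validate).partition(_.isInstanceOf[Valid])
  println(s"valid=${good.size} invalid=${bad.size}") // prints "valid=1 invalid=2"
}
```

In the job itself, a function like validate would presumably be applied in a map() placed between the source and the keyBy, with split() then routing the Valid and Invalid records to separate outputs, so that keyBy only ever sees records whose key has already been extracted successfully. That wiring is an assumption about how the pieces fit together, not something tested against a running job.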

Trapping Streaming Errors

2017-02-15 Thread Joe Olson
If I am processing a stream in the following manner:

    val stream = env.addSource(consumer).name("KafkaStream")
      .keyBy(x => (x.obj.ID1(), x.obj.ID2(), x.obj.ID3()))
      .flatMap(new FlatMapProcessor)

and the IDs bomb out because of deserialization issues, my job crashes with a 'Could not extract key'