Our parser.parse() function has a one-to-one mapping between an input byte[] 
to a List<SuperclassEvent>
Ideally, this should be handled within the KeyedDeserializationSchema passed to 
your Kafka consumer. That would then avoid the need of an extra “parser map 
function” after the source.

Were you suggesting a flatMap instead of map at this stage of 
calling our parser, or did you mean to use a flatMap() after the parser and 
before the split()? 
I meant a flatMap after the parser (whether it’s done as a map function or 
within the Kafka source) and before the split. The flatMap function iterates 
through your per-record lists and collects as it iterates through them.

- Gordon




On 18 July 2017 at 3:02:45 AM, earellano (eric.arell...@ge.com) wrote:

Tzu-Li (Gordon) Tai wrote  
> Basically, when two operators are chained together, the output of the  
> first operator is immediately chained to the processElement of the next  
> operator; it’s therefore just a consecutive invocation of processElements  
> on the chained operators. There will be no thread-to-thread handover or  
> buffering.  

Okay great, chaining tasks does sound like what we want then.  



Tzu-Li (Gordon) Tai wrote  
> In that case, I would suggest using flatMap here, followed by chained  
> splits and then sinks.  

We changed our code to roughly follow this suggestion, but I'm not sure  
we're doing this correctly? Is there a better way you recommend chaining the  
tasks? As written below, are individual Events within the List being sent to  
their respective sinks right away, or does the whole list have to split  
first?  
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n14312/split-stream.png>
  

We also had issues getting flatMap to work, and map seemed more appropriate.  
Our parser.parse() function has a one-to-one mapping between an input byte[]  
to a List<SuperclassEvent>, and that never changes, so a map seems to make  
sense to us. Were you suggesting a flatMap instead of map at this stage of  
calling our parser, or did you mean to use a flatMap() after the parser and  
before the split()?  



--  
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-Producer-Null-Pointer-Exception-when-processing-by-element-tp14288p14312.html
  
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.  

Reply via email to