Our parser.parse() function has a one-to-one mapping between an input byte[] to a List<SuperclassEvent> Ideally, this should be handled within the KeyedDeserializationSchema passed to your Kafka consumer. That would then avoid the need of an extra “parser map function” after the source.
Were you suggesting a flatMap instead of map at this stage of calling our parser, or did you mean to use a flatMap() after the parser and before the split()? I meant a flatMap after the parser (whether it’s done as a map function or within the Kafka source) and before the split. The flatMap function iterates through your per-record lists and collects as it iterates through them. - Gordon On 18 July 2017 at 3:02:45 AM, earellano (eric.arell...@ge.com) wrote: Tzu-Li (Gordon) Tai wrote > Basically, when two operators are chained together, the output of the > first operator is immediately chained to the processElement of the next > operator; it’s therefore just a consecutive invocation of processElements > on the chained operators. There will be no thread-to-thread handover or > buffering. Okay great, chaining tasks does sound like what we want then. Tzu-Li (Gordon) Tai wrote > In that case, I would suggest using flatMap here, followed by chained > splits and then sinks. We changed our code to roughly follow this suggestion, but I'm not sure we're doing this correctly? Is there a better way you recommend chaining the tasks? As written below, are individual Events within the List being sent to their respective sinks right away, or does the whole list have to split first? <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n14312/split-stream.png> We also had issues getting flatMap to work, and map seemed more appropriate. Our parser.parse() function has a one-to-one mapping between an input byte[] to a List<SuperclassEvent>, and that never changes, so a map seems to make sense to us. Were you suggesting a flatMap instead of map at this stage of calling our parser, or did you mean to use a flatMap() after the parser and before the split()? -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-Producer-Null-Pointer-Exception-when-processing-by-element-tp14288p14312.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.