> With task chaining as you're saying, could you help clarify how it works please?

Operators can be chained to be executed by a single task thread. See [1] for more details on that.

Basically, when two operators are chained together, the output of the first operator is immediately passed to the processElement of the next operator; it is therefore just a consecutive invocation of processElement on the chained operators. There will be no thread-to-thread handover or buffering.

> For example, a byte[] record can return from our parser a List of 10 SuccessEvents and 1 ErrorEvent; we want to publish each Event immediately.

In that case, I would suggest using flatMap here, followed by chained splits and then sinks. Using flatMap, you can collect elements as you iterate through the list (i.e. `collector.collect(...)`). If the sinks are properly chained (which should be the case if there is no keyBy before the sink and you haven't explicitly configured otherwise [2]), then for each .collect(...) the sink write will be invoked as part of the chain. Effectively, this would then be writing to Kafka / Cassandra for every element as you iterate through that list (happening in the same thread, since everything is chained), which matches what you have in mind.
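To make the shape of that concrete, here is a minimal, self-contained sketch of such a flatMap -> split -> sinks chain (Java DataStream API as in Flink 1.3; the `Event` POJO, the inline parsing logic, and the println sinks below are placeholder stand-ins for your actual event classes, external parser, Kafka consumer, and Kafka / Cassandra sinks, so treat it as an illustration rather than your exact job):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.Collector;

public class ChainedEventPipeline {

    // Simplified stand-in for your SuccessEvent / ErrorEvent hierarchy.
    public static class Event {
        public String payload;
        public boolean success;

        public Event() {}

        public Event(String payload, boolean success) {
            this.payload = payload;
            this.success = success;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in source; in your job this would be the Kafka consumer emitting byte[] records.
        DataStream<byte[]> rawRecords = env.fromElements("record-1".getBytes(), "record-2".getBytes());

        // flatMap parses each byte[] into its events and emits them one at a time.
        // With the downstream operators chained, every out.collect(...) call runs the
        // split selection and the sink write immediately, in the same thread, without
        // waiting for the rest of the list to be iterated.
        DataStream<Event> events = rawRecords.flatMap(new FlatMapFunction<byte[], Event>() {
            @Override
            public void flatMap(byte[] record, Collector<Event> out) {
                // Stand-in for the external library parser that returns a List of events.
                List<Event> parsed = Arrays.asList(
                        new Event(new String(record) + "-ok", true),
                        new Event(new String(record) + "-bad", false));
                for (Event event : parsed) {
                    out.collect(event);
                }
            }
        });

        // Split into success / error selections.
        SplitStream<Event> split = events.split(new OutputSelector<Event>() {
            @Override
            public Iterable<String> select(Event event) {
                return Collections.singletonList(event.success ? "success" : "error");
            }
        });

        // Placeholder sinks; in your job these would be the Kafka producer and the Cassandra sink.
        split.select("success").addSink(new SinkFunction<Event>() {
            @Override
            public void invoke(Event value) {
                System.out.println("success sink: " + value.payload);
            }
        });
        split.select("error").addSink(new SinkFunction<Event>() {
            @Override
            public void invoke(Event value) {
                System.out.println("error sink: " + value.payload);
            }
        });

        env.execute("chained flatMap example");
    }
}
```

Since there is no keyBy or repartitioning between the flatMap, the split selections, and the sinks, Flink chains them into one task by default, so each collected event reaches the sink writes before the loop moves on to the next element of the list.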
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/concepts/runtime.html#tasks-and-operator-chains
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#task-chaining-and-resource-groups
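As a side note on [2]: "explicitly configured otherwise" refers to breaking chains by hand. A couple of those knobs look like this (sketch only; `ParseFlatMap` is a hypothetical name for the FlatMapFunction from the sketch above, and `rawRecords` is the byte[] stream from the same sketch):

```java
// Turn off chaining for the whole job; every operator then runs in its own task thread:
env.disableOperatorChaining();

// Or control it per operator: the stream returned by map/flatMap/etc. exposes
// startNewChain() and disableChaining().
rawRecords.flatMap(new ParseFlatMap())
        .startNewChain();      // begin a new chain at this operator
rawRecords.flatMap(new ParseFlatMap())
        .disableChaining();    // never chain this operator to its neighbours
```

Left at the defaults (chaining enabled, no keyBy before the sinks), the sink writes stay in the same thread as the flatMap, which is the behaviour you are after.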
On 17 July 2017 at 2:06:52 PM, earellano (eric.arell...@ge.com) wrote:

Hi,

Tzu-Li (Gordon) Tai wrote
> These seem odd. Are your events intended to be a list? If not, this
> should be a `DataStream<SuperclassEvent>`.
>
> From the code snippet you've attached in the first post, it seems like
> you've initialized your source incorrectly.
>
> `env.fromElements(List<...>)` will take the whole list as a single event,
> thus your source is only emitting a single list as a record.

Ah, sorry for the confusion. The original code snippet isn't our actual code - it's a simplified and generified version so that it would be easy to reproduce the NullPointerException without having to show our whole code base.

To clarify, our input actually uses a Kafka consumer that reads a byte[], which is then passed to our external library parser that takes a byte[] and converts it into a List<Events>. This is why we have to use DataStream<List<Events>> rather than just DataStream<Event>. It's a requirement of the parser we have to use, because each byte[] record can create SuccessEvent(s) and/or ErrorEvent(s).

Our motivation for using the above map & for loop with conditional output logic was that we have to work with this whole List<Events> and not just individual Events, but we don't want to wait for the whole list to be processed before the event at the beginning of the list is outputted. For example, a byte[] record can return from our parser a List of 10 SuccessEvents and 1 ErrorEvent; we want to publish each Event immediately. Low latency is extremely important to us.

With task chaining as you're saying, could you help clarify how it works please? With each record of type List<Events>, and calling the Split operator followed by the sink operators, does that whole record/list have to be split before it can then go on to the sink? Or does task chaining mean it immediately gets outputted to the sink?

Thanks so much for all this help by the way!

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-Producer-Null-Pointer-Exception-when-processing-by-element-tp14288p14300.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.