With task chaining as you're saying, could you help clarify how it works 
please?
Operators can be chained to be executed by a single task thread. See [1] for 
more details on that.

Basically, when two operators are chained together, the output of the first 
operator is immediately passed to the processElement of the next operator; 
it’s therefore just a consecutive invocation of processElement on the chained 
operators. There is no thread-to-thread handover or buffering in between.
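
If it helps to see this concretely, here is a small, hypothetical illustration 
(not your code): two chained operators that log the current thread name will 
report the same task thread.

    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    // With chaining enabled (the default), both operators below are executed
    // by the same task thread, so they print the same thread name per element.
    env.fromElements("a", "b", "c")
        .map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                System.out.println("map    runs on " + Thread.currentThread().getName());
                return value.toUpperCase();
            }
        })
        .filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) {
                System.out.println("filter runs on " + Thread.currentThread().getName());
                return !value.isEmpty();
            }
        })
        .print();

    env.execute("chaining illustration");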

For example, as you described, a byte[] record from your parser can produce a 
List of 10 SuccessEvents and 1 ErrorEvent, and you want to publish each Event 
immediately.
In that case, I would suggest using flatMap here, followed by a chained split 
and then the sinks.

Using flatMap, you can collect elements as you iterate through the list 
(i.e. `collector.collect(...)`). If the sinks are properly chained (which 
should be the case if there is no keyBy before the sink and you haven’t 
explicitly configured otherwise [2]), then for each `.collect(...)` the sink 
write will be invoked as part of the chain.
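
As a rough sketch of what that could look like (just an illustration; the 
Event classes, the `MyParser.parse(...)` call, and the sink instances below 
are placeholders for your own parser and sinks):

    import java.util.Collections;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.streaming.api.collector.selector.OutputSelector;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SplitStream;
    import org.apache.flink.util.Collector;

    // Sketch only, assuming kafkaSource is your DataStream<byte[]> from the
    // Kafka consumer and MyParser.parse(byte[]) is your external library call
    // returning a List<Event>.
    DataStream<Event> events = kafkaSource
        .flatMap(new FlatMapFunction<byte[], Event>() {
            @Override
            public void flatMap(byte[] record, Collector<Event> out) {
                for (Event event : MyParser.parse(record)) {
                    // each collect() immediately invokes the chained
                    // downstream operators for this single event
                    out.collect(event);
                }
            }
        });

    // route SuccessEvents and ErrorEvents to different sinks
    SplitStream<Event> split = events.split(new OutputSelector<Event>() {
        @Override
        public Iterable<String> select(Event event) {
            return Collections.singletonList(
                event instanceof ErrorEvent ? "error" : "success");
        }
    });

    split.select("success").addSink(successSink);  // e.g. your FlinkKafkaProducer
    split.select("error").addSink(errorSink);      // e.g. your error-event sink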

Effectively, this would then be writing to Kafka / Cassandra for every element 
as you iterate through that list (happening in the same thread since everything 
is chained), which matches what you have in mind.
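
(Just to clarify what “explicitly configured otherwise” [2] would mean in 
practice: chaining is on by default, so the chain above would only be broken 
by something like the following, shown on a placeholder stream / MyFlatMap 
purely for illustration.)

    // Any of these would break the chaining described above:

    // per job: disable chaining everywhere
    env.disableOperatorChaining();

    // per operator: start a new chain at / disable chaining for that operator
    someStream.flatMap(new MyFlatMap()).startNewChain();
    someStream.flatMap(new MyFlatMap()).disableChaining();

    // or put it into a different slot sharing group
    someStream.flatMap(new MyFlatMap()).slotSharingGroup("other");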

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/concepts/runtime.html#tasks-and-operator-chains
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#task-chaining-and-resource-groups

On 17 July 2017 at 2:06:52 PM, earellano (eric.arell...@ge.com) wrote:

Hi,  

Tzu-Li (Gordon) Tai wrote  
> These seems odd. Are your events intended to be a list? If not, this  
> should be a `DataStream<SuperclassEvent>`.  
>  
> From the code snippet you’ve attached in the first post, it seems like  
> you’ve initialized your source incorrectly.  
>  
> `env.fromElements(List<...>)` will take the whole list as a single event,  
> thus your source is only emitting a single list as a record.  

Ah sorry for the confusion. So the original code snippet isn't our actual  
code - it's a simplified and generified version so that it would be easy to  
reproduce the Null Pointer Exception without having to show our whole code  
base.  

To clarify, our input actually uses a Kafka Consumer that reads a byte[],  
which is then passed to our external library parser which takes a byte[] and  
converts it into a List<Events>. This is why we have to use  
DataStream<List<Events>>, rather than just DataStream<Event>. It's a  
requirement from the parser we have to use, because each byte[] array record  
can create both a SuccessEvent(s) and/or ErrorEvent(s).  

Our motivation for using the above map & for loop with conditional output  
logic was that we have to work with this whole List<Events> and not just  
individual Events, but don't want to wait for the whole list to be processed  
for the event at the beginning of the list to be outputted. For example, a  
byte[] record can return from our parser a List of 10 SuccessEvents and 1  
ErrorEvent; we want to publish each Event immediately. Low latency is  
extremely important to us.  

--  

With task chaining as you're saying, could you help clarify how it works  
please? With each record of type List<Events> and calling the Split Operator  
followed by the sink operators, does that whole record/list have to be split  
before it can then go on to the sink? Or does task chaining mean it  
immediately gets outputted to the sink?  


Thanks so much for all this help by the way!  



