Tzu-Li (Gordon) Tai wrote
> It seems like you’ve misunderstood how to use the FlinkKafkaProducer, or
> is there any specific reason why you want to emit elements to Kafka in a
> map function?
> 
> The correct way to use it is to add it as a sink function to your
> pipeline, i.e.
> 
> DataStream<String> someStream = …
> someStream.addSink(new FlinkKafkaProducer010<>("topic", schema, props));
> // or, FlinkKafkaProducer010.writeToKafkaWithTimestamps(someStream, "topic", schema, props);

The reason we want to use processElement() and a map function instead of
someStream.addSink() is that our output logic is conditional on the type of
each record.

Our overall program follows this path:

  Serialized JSON is consumed from Kafka: DataStream<byte[]>
  It is parsed, producing a List of successful events and/or error events:
    DataStream<List<Events>>
  The events are output conditionally, going to Kafka or Cassandra depending
    on which type of event each one is.
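Because the second step yields one List per incoming message, anything that wants to handle events one at a time first has to flatten those lists. A minimal plain-Java sketch of that flattening (the job a flatMap would do on a DataStream<List<...>> in Flink) follows; SuperclassEvent, SuccessEvent and ErrorEvent are hypothetical stand-ins for our renamed types, and no Flink API is used here.

```java
import java.util.ArrayList;
import java.util.List;

public class EventFlattening {
    // Hypothetical stand-ins for our (renamed) event types.
    static abstract class SuperclassEvent {
        final String payload;
        SuperclassEvent(String payload) { this.payload = payload; }
    }
    static final class SuccessEvent extends SuperclassEvent {
        SuccessEvent(String payload) { super(payload); }
    }
    static final class ErrorEvent extends SuperclassEvent {
        ErrorEvent(String payload) { super(payload); }
    }

    // Turns one List per parsed message into a flat sequence of events,
    // preserving order, which mirrors what a flatMap over
    // DataStream<List<SuperclassEvent>> would emit downstream.
    static List<SuperclassEvent> flatten(List<List<SuperclassEvent>> batches) {
        List<SuperclassEvent> out = new ArrayList<>();
        for (List<SuperclassEvent> batch : batches) {
            out.addAll(batch);
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<SuperclassEvent>> batches = List.of(
            List.of(new SuccessEvent("a"), new ErrorEvent("b")),
            List.of(new SuccessEvent("c")));
        System.out.println(flatten(batches).size()); // 3
    }
}
```

After flattening, every downstream operator sees a single event per call, which is the per-record granularity we are after.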


This is our output logic (with the types renamed so as not to expose our
IP):

void output(DataStream<List<SuperclassEvent>> inputStream) {
    inputStream.map(eventList -> {
        for (SuperclassEvent event : eventList) {
            if (event instanceof SuccessEvent)
                emitToCassandra(event);
            else if (event instanceof ErrorEvent)
                emitToKafka(event);
        }
        // we don't actually want to return anything, just don't know how else to use map
        return true;
    });
}

 
That is, we have sinks for both Kafka and Cassandra, and want to iterate
through our List<SuperclassEvent> and send each individual record to the
appropriate sink depending on its type.
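A minimal plain-Java sketch of that routing, assuming the same hypothetical event classes: each event goes to exactly one sink as soon as its runtime type is checked, with no buffering of the list. The two Consumer arguments are hypothetical stand-ins for the Cassandra and Kafka sinks, not Flink classes. In Flink 1.3 and later the same shape can, as far as we understand it, be expressed with side outputs: a ProcessFunction that calls ctx.output(errorTag, event) for ErrorEvents and collects SuccessEvents normally, after which the main stream and getSideOutput(errorTag) each get their own addSink().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class EventRouter {
    // Hypothetical stand-ins for our (renamed) event types.
    static abstract class SuperclassEvent {
        final String id;
        SuperclassEvent(String id) { this.id = id; }
    }
    static final class SuccessEvent extends SuperclassEvent {
        SuccessEvent(String id) { super(id); }
    }
    static final class ErrorEvent extends SuperclassEvent {
        ErrorEvent(String id) { super(id); }
    }

    // Hands each event to exactly one sink, chosen by its runtime type,
    // immediately after the type check. Events of any other type are dropped.
    static void route(List<SuperclassEvent> events,
                      Consumer<SuccessEvent> cassandraSink,
                      Consumer<ErrorEvent> kafkaSink) {
        for (SuperclassEvent event : events) {
            if (event instanceof SuccessEvent) {
                cassandraSink.accept((SuccessEvent) event);
            } else if (event instanceof ErrorEvent) {
                kafkaSink.accept((ErrorEvent) event);
            }
        }
    }

    public static void main(String[] args) {
        List<String> toCassandra = new ArrayList<>();
        List<String> toKafka = new ArrayList<>();
        route(List.of(new SuccessEvent("s1"), new ErrorEvent("e1"), new SuccessEvent("s2")),
              e -> toCassandra.add(e.id),
              e -> toKafka.add(e.id));
        System.out.println(toCassandra + " " + toKafka); // [s1, s2] [e1]
    }
}
```

The point of the sketch is that the decision is made per record; whether the "sinks" are callbacks, split streams, or side outputs only changes where that decision lives.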

I know Flink offers SplitStreams for a similar purpose, but that doesn't seem
ideal to us because it requires a new operator to split the stream first, and
only after the whole List has been processed can the records be sent to their
respective sinks. The code above, by contrast, sends each record to its sink
as soon as it determines the record's type.

--

Is there any way to make processElement() work so that we can operate on
individual records instead of the whole DataStream? Or are we misusing
Flink? What would you recommend as the best way to do this?


-- 

Also, if processElement() and invoke() aren't meant to be used directly,
should they be made package-private? I'm happy to make a pull request if so,
although I fear that might break a few things.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-Producer-Null-Pointer-Exception-when-processing-by-element-tp14288p14294.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.
