Tzu-Li (Gordon) Tai wrote:
> It seems like you’ve misunderstood how to use the FlinkKafkaProducer, or
> is there any specific reason why you want to emit elements to Kafka in a
> map function?
>
> The correct way to use it is to add it as a sink function to your
> pipeline, i.e.
>
> DataStream<String> someStream = …
> someStream.addSink(new FlinkKafkaProducer010<>(“topic”, schema, props));
> // or, FlinkKafkaProducer010.writeToKafkaWithTimestamps(someStream,
> “topic”, schema, props);
The reason we want to use processElement() and a map function, instead of someStream.addSink(), is that our output logic is conditional on the type of record we have. Our overall program follows this path:

1. Serialized JSON is consumed from Kafka: DataStream<byte[]>
2. It is parsed, producing a list of successful events and/or error events: DataStream<List<Events>>
3. Each event is output conditionally, going to Kafka or Cassandra depending on which type of event it is.

This is our code for the output logic (with the types modified so as not to expose our IP):

    void output(DataStream<List<SuperclassEvent>> inputStream) {
        inputStream.map(eventList -> {
            for (SuperclassEvent event : eventList) {
                if (event instanceof SuccessEvent) {
                    emitToCassandra(event);
                } else if (event instanceof ErrorEvent) {
                    emitToKafka(event);
                }
            }
            return true;  // we don't actually want to return anything, just don't know how else to use map
        });
    }

That is, we have sinks for both Kafka and Cassandra, and we want to iterate through our List<SuperclassEvent> and conditionally send each individual record to its appropriate sink depending on its type.

I know Flink offers SplitStreams for a similar purpose, but that doesn't seem ideal to us because it requires an extra operator to first split the stream, and only after the whole List is processed can the records be sent to their respective sinks, whereas the code above sends each record to its sink as soon as its type is known.

--

Is there any way to make processElement() work so that we can operate on individual records instead of the whole DataStream? Or are we misusing Flink? What is the best way you would recommend to do this?

--

Also, if processElement() and invoke() aren't meant to be called directly, should they be made package-private? I'm happy to make a pull request if so, although I fear that might break a few things.
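For reference, here is a rough, untested sketch of how the same per-record routing might be expressed with a flatten-and-route ProcessFunction plus side outputs (Flink 1.3+), so that each event reaches its sink as soon as its type is known, without embedding sinks inside a map. SuccessEvent, ErrorEvent and SuperclassEvent are our classes from above; the EventRouting class name, the "error-events" topic, ErrorEventSchema, and kafkaProps are placeholders, and the Cassandra side is only indicated in a comment:

    import java.util.List;
    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    public class EventRouting {

        // Side-output tag for error events; the anonymous subclass preserves the generic type.
        private static final OutputTag<ErrorEvent> ERROR_TAG =
            new OutputTag<ErrorEvent>("error-events") {};

        static void output(DataStream<List<SuperclassEvent>> inputStream, Properties kafkaProps) {

            // Flatten each List<SuperclassEvent> and route every event as soon as its
            // type is known: successes go to the main output, errors to the side output.
            SingleOutputStreamOperator<SuccessEvent> successes = inputStream
                .process(new ProcessFunction<List<SuperclassEvent>, SuccessEvent>() {
                    @Override
                    public void processElement(List<SuperclassEvent> events,
                                               Context ctx,
                                               Collector<SuccessEvent> out) {
                        for (SuperclassEvent event : events) {
                            if (event instanceof SuccessEvent) {
                                out.collect((SuccessEvent) event);
                            } else if (event instanceof ErrorEvent) {
                                ctx.output(ERROR_TAG, (ErrorEvent) event);
                            }
                        }
                    }
                });

            // Error events go to Kafka through the producer used as a proper sink.
            successes.getSideOutput(ERROR_TAG)
                .addSink(new FlinkKafkaProducer010<>("error-events", new ErrorEventSchema(), kafkaProps));

            // Success events would be attached to the Cassandra sink here, e.g.
            // CassandraSink.addSink(successes)... (omitted; depends on the table schema).
        }
    }

If side outputs aren't available to us, split()/select() would give the same two streams, at the cost of the extra splitting operator mentioned above.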