A simpler and more efficient approach would simply be the following:

val stream = env.addSource(new FlinkKafkaConsumer(...))

stream
  .addSink(new FlinkKafkaProducer(new MyKeyedSerializationSchema(...)))

env.execute()

In MyKeyedSerializationSchema just override the getTargetTopic() method.

That should do it :)

-Jamie

On Thu, Jan 12, 2017 at 12:53 PM, Paul Joireman <paul.joire...@physiq.com>
wrote:

Hi all,
>
>
> Is there a simple way to read the key from a KeyedStream.   Very simply
> I'm trying to read a message from Kafka, separate the incoming messages by
> a field in the message and write the original message back to Kafka using
> that field as a new topic.  I chose to partition the incoming stream by
> creating a KeyedStream and using the field from the message as the key.
>  The only thing left is to write the message to Kafka with a producer but i
> need to know the topic to write to and for that I need to be able to read
> the key.   Is there a way to do this?
>
>
> Is there a better way to do this, rather than using a KeyedStream.
>
>
> Paul
>
​
-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com

Reply via email to