Hi!

You can exploit that, yes. If you read data from Kafka in Flink, a Kafka
partition is "sticky" to a Flink source subtask. If you have (kafka-source
=> mapFunction), for example, you can be sure that all values with the same
key go through the same parallel mapFunction subtask. If you maintain a
HashMap in there, you effectively have per-key state, based on the Kafka
partitioning.
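
For illustration, a rough (untested) sketch of that pattern, assuming the
source emits Tuple2<String, Long> records and the topic is partitioned by
the key; class and field names here are made up:

  import org.apache.flink.api.common.functions.RichMapFunction;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.configuration.Configuration;

  import java.util.HashMap;
  import java.util.Map;

  // Per-key counts kept in a plain HashMap inside each parallel subtask.
  // Works because records from one Kafka partition always hit the same
  // subtask. Note: this map is NOT checkpointed by Flink, so it is lost
  // on failure/restart.
  public class PartitionLocalCount
          extends RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

      private transient Map<String, Long> counts;

      @Override
      public void open(Configuration parameters) {
          counts = new HashMap<>();
      }

      @Override
      public Tuple2<String, Long> map(Tuple2<String, Long> value) {
          Long current = counts.get(value.f0);
          long newCount = (current == null ? 0L : current) + value.f1;
          counts.put(value.f0, newCount);
          return Tuple2.of(value.f0, newCount);
      }
  }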

If you want to use Flink's internal key/value state, however, you need to
let Flink re-partition the data by using "keyBy()". That is because Flink's
internal sharding of state (including the re-sharding to adjust parallelism
that we are currently working on) follows a dedicated hashing scheme, which
is in all likelihood different from the partitioning function that writes
the key/value pairs to the Kafka topics.
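
For comparison, a rough (untested) sketch using keyBy() and Flink's keyed
ValueState, again assuming Tuple2<String, Long> records; the names are just
for illustration:

  import org.apache.flink.api.common.functions.RichFlatMapFunction;
  import org.apache.flink.api.common.state.ValueState;
  import org.apache.flink.api.common.state.ValueStateDescriptor;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.configuration.Configuration;
  import org.apache.flink.util.Collector;

  // Per-key counts held in Flink's internal key/value state. Because the
  // stream is keyBy()'ed first, Flink owns the sharding, checkpointing and
  // (eventually) re-sharding of this state.
  public class KeyedCount
          extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

      private transient ValueState<Long> count;

      @Override
      public void open(Configuration parameters) {
          count = getRuntimeContext().getState(
                  new ValueStateDescriptor<>("count", Long.class, 0L));
      }

      @Override
      public void flatMap(Tuple2<String, Long> value,
                          Collector<Tuple2<String, Long>> out) throws Exception {
          long newCount = count.value() + value.f1;
          count.update(newCount);
          out.collect(Tuple2.of(value.f0, newCount));
      }
  }

  // Usage, assuming "kafkaStream" is the DataStream from the Kafka source:
  //   kafkaStream.keyBy(0).flatMap(new KeyedCount());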

Hope that helps...

Greetings,
Stephan





On Wed, Apr 13, 2016 at 9:20 AM, Andrew Coates <big.andy.coa...@gmail.com>
wrote:

> Hi Stephan,
>
> If we were to do that, would Flink leverage the fact that Kafka has
> already partitioned the data by the key, or would Flink attempt to shuffle
> the data again into its own partitions, potentially shuffling data between
> machines for no gain?
>
> Thanks,
>
> Andy
>
> On Sun, 10 Apr 2016, 13:22 Stephan Ewen, <se...@apache.org> wrote:
>
>> Hi!
>>
>> You are right with your observations. Right now, you would have to create
>> a "Tuple2<Key, Value>" in the KeyedDeserializationSchema. That is also what
>> a KeyedStream holds internally.
>>
>> A KeyedStream in Flink is more than just a stream that has a Key and a
>> Value - it is also partitioned by the key, and Flink keeps track of keyed
>> state in those streams. That's why it has to be created explicitly.
>>
>> For convenience, one could add support for the FlinkKafkaConsumer to
>> accept two DeserializationSchemas (one for the key, one for the value) and
>> return a Tuple2<Key, Value> automatically.
>>
>> Greetings,
>> Stephan
>>
>>
>> On Sun, Apr 10, 2016 at 5:49 AM, Elias Levy <fearsome.lucid...@gmail.com>
>> wrote:
>>
>>> I am wondering if the Kafka connectors leverage Kafka message keys at
>>> all?
>>>
>>> Looking through the docs, my impression is that it does not.  E.g. if I
>>> use the connector to consume from a partitioned Kafka topic, what I will
>>> get back is a DataStream, rather than a KeyedStream.  And if I want access
>>> to a message's key, the key must be within the message so I can extract
>>> it, or I have to make use of a KeyedDeserializationSchema with the
>>> connector to access the Kafka message key and insert it into the type
>>> returned by the connector.
>>>
>>> Similarly, it would seem that you have to give the Kafka producer sink a
>>> KeyedSerializationSchema, which will obtain a Kafka key and a Kafka
>>> message from the events in a DataStream, rather than producing from a
>>> KeyedStream where the key is obtained from the stream itself.
>>>
>>> Is this correct?
>>>
>>>
>>
