Hi!

You can exploit that, yes. When you read data from Kafka in Flink, a Kafka partition is "sticky" to a Flink source subtask. If you have (kafka-source => mapFunction), for example, you can be sure that all values with the same key go through the same parallel mapFunction subtask. If you maintain a HashMap in there, you basically have state by key, based on the Kafka partitions.
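For illustration, a minimal sketch of that pattern (not from the original mail): a RichMapFunction that keeps per-key counts in a plain HashMap. The Tuple2<String, Long> element type and the class name are assumptions. This only works while the map function is chained directly to the Kafka source (same parallelism, no rebalance or keyBy in between), and the HashMap is not Flink-managed state, so it is not checkpointed.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;

    // Sketch: per-key state that relies on Kafka's partitioning, not on Flink's keyBy().
    // Each source subtask reads a fixed set of Kafka partitions, so a chained map
    // subtask only ever sees the keys of "its" partitions.
    public class PerPartitionCounter
            extends RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

        // Plain Java state: not checkpointed by Flink, lost on failure/restart.
        private transient Map<String, Long> counts;

        @Override
        public void open(Configuration parameters) {
            counts = new HashMap<>();
        }

        @Override
        public Tuple2<String, Long> map(Tuple2<String, Long> record) {
            Long current = counts.get(record.f0);
            long updated = (current == null ? 0L : current) + record.f1;
            counts.put(record.f0, updated);
            return Tuple2.of(record.f0, updated);
        }
    }

You would use it as stream.map(new PerPartitionCounter()) directly behind the FlinkKafkaConsumer source, without any re-partitioning in between.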
If you want to use Flink's internal key/value state, however, you need to let Flink re-partition the data by using "keyBy()". That is because Flink's internal sharding of state (including the re-sharding to adjust parallelism that we are currently working on) follows a dedicated hashing scheme, which is in all likelihood different from the partition function that writes the key/value pairs to the Kafka topics.

Hope that helps...

Greetings,
Stephan

On Wed, Apr 13, 2016 at 9:20 AM, Andrew Coates <big.andy.coa...@gmail.com> wrote:

> Hi Stephan,
>
> If we were to do that, would Flink leverage the fact that Kafka has
> already partitioned the data by the key, or would Flink attempt to shuffle
> the data again into its own partitions, potentially shuffling data between
> machines for no gain?
>
> Thanks,
>
> Andy
>
> On Sun, 10 Apr 2016, 13:22 Stephan Ewen, <se...@apache.org> wrote:
>
>> Hi!
>>
>> You are right with your observations. Right now, you would have to create
>> a "Tuple2<Key, Value>" in the KeyedDeserializationSchema. That is also what
>> a KeyedStream holds internally.
>>
>> A KeyedStream in Flink is more than just a stream that has a key and a
>> value - it is also partitioned by the key, and Flink keeps track of keyed
>> state in those streams. That's why it has to be explicitly created.
>>
>> For convenience, one could make an addition so that the FlinkKafkaConsumer
>> can accept two DeserializationSchemas (one for the key, one for the value)
>> and return a Tuple2<Key, Value> automatically.
>>
>> Greetings,
>> Stephan
>>
>>
>> On Sun, Apr 10, 2016 at 5:49 AM, Elias Levy <fearsome.lucid...@gmail.com>
>> wrote:
>>
>>> I am wondering if the Kafka connectors leverage Kafka message keys at
>>> all?
>>>
>>> Looking through the docs, my impression is that they do not. E.g. if I
>>> use the connector to consume from a partitioned Kafka topic, what I will
>>> get back is a DataStream, rather than a KeyedStream. And if I want access
>>> to a message's key, the key must be within the message so I can extract
>>> it, or I have to make use of a KeyedDeserializationSchema with the
>>> connector to access the Kafka message key and insert it into the type
>>> returned by the connector.
>>>
>>> Similarly, it would seem that you have to give the Kafka producer sink a
>>> KeyedSerializationSchema, which will obtain a Kafka key and a Kafka
>>> message from the events of a DataStream, but you can't produce from a
>>> KeyedStream where the key is obtained from the stream itself.
>>>
>>> Is this correct?
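To make the ideas in this thread concrete, here is a minimal sketch (my own, not from the original mails) of a KeyedDeserializationSchema that surfaces the Kafka message key as the first field of a Tuple2<Key, Value>, followed by the keyBy() step that lets Flink re-partition the stream and use its own keyed state. String keys and values in UTF-8, the class name, and the topic/consumer setup in the usage comment are assumptions for illustration.

    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

    // Sketch: deserializes both the Kafka message key and the message value into a Tuple2.
    public class KeyValueSchema implements KeyedDeserializationSchema<Tuple2<String, String>> {

        @Override
        public Tuple2<String, String> deserialize(byte[] messageKey, byte[] message,
                                                  String topic, int partition, long offset) {
            String key = messageKey == null ? null : new String(messageKey, StandardCharsets.UTF_8);
            String value = message == null ? null : new String(message, StandardCharsets.UTF_8);
            return Tuple2.of(key, value);
        }

        @Override
        public boolean isEndOfStream(Tuple2<String, String> nextElement) {
            return false; // the Kafka stream is unbounded
        }

        @Override
        public TypeInformation<Tuple2<String, String>> getProducedType() {
            return new TupleTypeInfo<Tuple2<String, String>>(
                    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        }
    }

    // Usage (illustrative topic and property names; the consumer class depends on your Kafka version):
    //
    //   DataStream<Tuple2<String, String>> stream = env.addSource(
    //       new FlinkKafkaConsumer09<>("my-topic", new KeyValueSchema(), properties));
    //
    //   stream.keyBy(0)   // re-partitions by Flink's own hashing, enabling Flink-managed keyed state
    //         .map(...);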