Hi,

A while ago we found that when you construct a Flink Kafka producer, it always uses the FlinkFixedPartitioner to spread the data across the Kafka partitions, unless you give it a custom partitioner.
Because we want all our elements to be partitioned by the key of the records, we created this issue and put up a pull request with a simple FlinkKeyHashPartitioner: https://issues.apache.org/jira/browse/FLINK-9610

A comment by one of the reviewers (Tzu-Li Tai) was essentially: "Kafka does this by default already, why this change?" So I dug a lot deeper to understand how the partitioning decisions and the data flow from the Flink API down into the Kafka producer client code. My conclusions:

1) The Kafka producer code uses the provided partitioner; if it doesn't have one, it uses the hash of the key; if it doesn't have a key either, it does a round-robin distribution.

2) The Flink Kafka producer constructors are available in variants with and without a partitioner. Even if you provide a valid key for each record, it will still use the FlinkFixedPartitioner if no explicit partitioner has been specified.

Looking at the code (I haven't tried it), you can actually get the desired behavior without any code changes by using a constructor that requires a partitioner and passing it a null value. Yuck!

In my opinion, providing a KeyedSerializationSchema is an implicit way of specifying that you want the data partitioned by that key. So to make this a workable situation I see three ways to handle it:

1) We merge something like the partitioner I proposed.

2) We change the constructors that take a KeyedSerializationSchema to use that key for partitioning.

3) We remove all constructors that take a KeyedSerializationSchema, because the key is never used anyway.

I think '3)' is bad, '1)' is 'Ok', and '2)', although it breaks backward compatibility, is the best solution.

So to clarify the change I propose here: we change the behavior of all Flink producer constructors that have a KeyedSerializationSchema parameter and NO partitioner.
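For reference, the decision order described in point 1) can be sketched in plain Java. This is a simplified illustration of what the Kafka client's default partition selection does, not the actual Kafka code: the hash function here is a stand-in for Kafka's murmur2, and the class and method names are made up for the example.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch of the Kafka producer's partition selection order.
// The real client uses murmur2 for the key hash and has extra handling
// for available partitions; the structure of the decision is the same.
public class PartitionChoiceSketch {
    private static final AtomicInteger ROUND_ROBIN = new AtomicInteger(0);

    static int choosePartition(Integer partitionerResult, byte[] keyBytes, int numPartitions) {
        if (partitionerResult != null) {
            // 1) A provided partitioner wins.
            return partitionerResult;
        }
        if (keyBytes != null) {
            // 2) Otherwise, partition by the hash of the key
            //    (Arrays.hashCode as a stand-in for murmur2).
            return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;
        }
        // 3) No key at all: round-robin across the partitions.
        return toPositive(ROUND_ROBIN.getAndIncrement()) % numPartitions;
    }

    // Same masking trick Kafka uses to avoid negative modulo results.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    public static void main(String[] args) {
        System.out.println(choosePartition(2, "key".getBytes(), 4));    // prints 2
        System.out.println(choosePartition(null, "key".getBytes(), 4)); // stable key hash
        System.out.println(choosePartition(null, null, 4));             // round robin
        System.out.println(choosePartition(null, null, 4));             // round robin
    }
}
```

The point of the proposal is that, for records that carry a key, branch 2) of this logic is exactly what we want, and it already exists inside Kafka; Flink only needs to stop forcing branch 1) with the FlinkFixedPartitioner.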
The proposed change is that, because we HAVE a key and do NOT have a partitioner, the partitioning is done by the partitioning code that already exists in the underlying Kafka producer. For the rest of the constructors the behavior remains unchanged:
- With a non-keyed SerializationSchema
- With a provided partitioner

What do you guys think?

--
Best regards / Met vriendelijke groeten,

Niels Basjes