Kezhu Wang created FLINK-21317:
----------------------------------

             Summary: Downstream keyed state not work after FlinkKafkaShuffle
                 Key: FLINK-21317
                 URL: https://issues.apache.org/jira/browse/FLINK-21317
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: 1.13.0
            Reporter: Kezhu Wang

{{FlinkKafkaShuffle}} uses {{KeyGroupRangeAssignment.assignKeyToParallelOperator}} to assign records to Kafka topic partitions. The assignment works as follows:

# {{KeyGroupRangeAssignment.assignToKeyGroup(Object key, int maxParallelism)}} assigns the key to a key group.
# {{KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId)}} assigns that key group to an operator/subtask index.

When the Kafka topic partitions are consumed, they are redistributed among subtasks by {{KafkaTopicPartitionAssigner.assign(KafkaTopicPartition partition, int numParallelSubtasks)}}. I have copied the code of this redistribution here.

{code:java}
public class KafkaTopicPartitionAssigner {
    public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
        int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

        // here, the assumption is that the id of Kafka partitions are always ascending
        // starting from 0, and therefore can be used directly as the offset clockwise from the
        // start index
        return (startIndex + partition.getPartition()) % numParallelSubtasks;
    }
}
{code}

This partition redistribution breaks the prerequisite of {{DataStreamUtils.reinterpretAsKeyedStream}}: because {{startIndex}} depends on the topic name, a partition can be consumed by a subtask other than the one whose key groups it contains, so the key groups get mixed up. The consequence is unusable keyed state. The deepest stack trace I captured is listed here:

{noformat}
Caused by: java.lang.NullPointerException
	at org.apache.flink.runtime.state.heap.StateTable.transform(StateTable.java:205)
	at org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:100)
{noformat}

cc [~ym] [~sewen] [~AHeise] [~pnowojski]

Below are my proposed changes:

* Make the assignment between partitions and subtasks customizable.
* Provide a 0-based round-robin assignment. (This amounts to making {{startIndex}} 0 in the existing assignment algorithm.)

I saw FLINK-8570; the above changes could also be helpful if we finally decide to deliver FLINK-8570.
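
To make the mismatch concrete, here is a minimal standalone sketch. It is not Flink code. It reproduces only the arithmetic of the {{assign}} method quoted above on plain values, and compares it with the proposed 0-based variant. The class name {{PartitionAssignmentSketch}}, both method names, and the topic name "a" are hypothetical, and the sketch assumes the number of topic partitions equals the consumer parallelism.

```java
public class PartitionAssignmentSketch {

    // Same arithmetic as the KafkaTopicPartitionAssigner.assign code quoted above,
    // but taking the topic name and partition id directly (an assumption made so
    // that the sketch needs no Flink classes).
    static int existingAssign(String topic, int partition, int numParallelSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        return (startIndex + partition) % numParallelSubtasks;
    }

    // Hypothetical 0-based round-robin: the same formula with startIndex fixed to 0.
    static int zeroBasedAssign(int partition, int numParallelSubtasks) {
        return partition % numParallelSubtasks;
    }

    public static void main(String[] args) {
        String topic = "a";  // hypothetical topic name; its start index is 3 for 4 subtasks
        int parallelism = 4; // assumed equal to the number of topic partitions

        for (int partition = 0; partition < parallelism; partition++) {
            System.out.printf(
                    "partition %d -> existing: subtask %d, 0-based: subtask %d%n",
                    partition,
                    existingAssign(topic, partition, parallelism),
                    zeroBasedAssign(partition, parallelism));
        }
    }
}
```

With these inputs the existing formula sends partition 0 to subtask 3, while the records in partition 0 were written for operator index 0. The 0-based variant keeps partition i on subtask i, which is what {{reinterpretAsKeyedStream}} needs as long as the partition count matches the parallelism.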