[ https://issues.apache.org/jira/browse/FLINK-21317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Thomas Weise updated FLINK-21317:
---------------------------------
    Fix Version/s:     (was: 1.14.3)

Downstream keyed state not work after FlinkKafkaShuffle
-------------------------------------------------------

                Key: FLINK-21317
                URL: https://issues.apache.org/jira/browse/FLINK-21317
            Project: Flink
         Issue Type: Bug
         Components: Connectors / Kafka
   Affects Versions: 1.13.0
           Reporter: Kezhu Wang
           Priority: Minor
             Labels: auto-deprioritized-major
            Fix For: 1.15.0

{{FlinkKafkaShuffle}} uses {{KeyGroupRangeAssignment.assignKeyToParallelOperator}} to assign records to Kafka topic partitions. The assignment works as follows:
# {{KeyGroupRangeAssignment.assignToKeyGroup(Object key, int maxParallelism)}} assigns the key to a key group.
# {{KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId)}} assigns that key group to an operator/subtask index.

When the Kafka topic partitions are consumed, they are redistributed to source subtasks by {{KafkaTopicPartitionAssigner.assign(KafkaTopicPartition partition, int numParallelSubtasks)}}. The redistribution code is copied here:
{code:java}
public class KafkaTopicPartitionAssigner {

    public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
        int startIndex =
                ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

        // here, the assumption is that the id of Kafka partitions are always ascending
        // starting from 0, and therefore can be used directly as the offset clockwise from the
        // start index
        return (startIndex + partition.getPartition()) % numParallelSubtasks;
    }
}
{code}

This redistribution breaks the prerequisite of {{DataStreamUtils.reinterpretAsKeyedStream}}: because {{startIndex}} is derived from the topic name, partition {{i}} is not read by subtask {{i}} unless {{startIndex}} happens to be 0, so the records of a key group arrive at a subtask that does not own that key group. The consequence is unusable keyed state.
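To make the key-group mismatch concrete, here is a minimal, self-contained sketch with no Flink dependency. The class {{KeyGroupMismatchDemo}}, its helpers ({{operatorIndexForKeyGroup}}, {{consumerSubtask}}, {{misplacedKeyGroups}}), the {{zeroBased}} flag and the sample topic name are all hypothetical. It copies two formulas: {{computeOperatorIndexForKeyGroup}} as {{keyGroupId * parallelism / maxParallelism}}, and the {{startIndex}} arithmetic from {{KafkaTopicPartitionAssigner.assign}} quoted above. It assumes one Kafka partition per subtask, so the producer-side parallelism, the partition count and the consumer parallelism are all equal. It iterates over key group ids directly, so the hash inside {{assignToKeyGroup}} is not needed.

```java
// Hypothetical demo class: reproduces the index arithmetic only, it does not call Flink.
public class KeyGroupMismatchDemo {

    // Same formula as KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup.
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    // Same arithmetic as KafkaTopicPartitionAssigner.assign, taking the topic name and
    // partition id directly. zeroBased = true models the proposed 0-based round-robin
    // assignment (startIndex fixed to 0).
    static int consumerSubtask(String topic, int partition, int numParallelSubtasks, boolean zeroBased) {
        int startIndex = zeroBased
                ? 0
                : ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        return (startIndex + partition) % numParallelSubtasks;
    }

    // Counts key groups whose records reach a subtask that does not own them.
    // Assumption: number of partitions == producer parallelism == consumer parallelism.
    static int misplacedKeyGroups(String topic, int maxParallelism, int parallelism, boolean zeroBased) {
        int misplaced = 0;
        for (int keyGroup = 0; keyGroup < maxParallelism; keyGroup++) {
            // Producer side: key group -> Kafka partition.
            int partition = operatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup);
            // Consumer side: Kafka partition -> source subtask.
            int subtask = consumerSubtask(topic, partition, parallelism, zeroBased);
            // reinterpretAsKeyedStream expects the subtask that owns this key group.
            int owner = operatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup);
            if (subtask != owner) {
                misplaced++;
            }
        }
        return misplaced;
    }

    public static void main(String[] args) {
        String topic = "shuffle-topic"; // hypothetical topic name
        int maxParallelism = 128;
        int parallelism = 4;
        System.out.println("existing assigner: "
                + misplacedKeyGroups(topic, maxParallelism, parallelism, false)
                + " of " + maxParallelism + " key groups misplaced");
        System.out.println("0-based assigner:  "
                + misplacedKeyGroups(topic, maxParallelism, parallelism, true)
                + " of " + maxParallelism + " key groups misplaced");
    }
}
```

Under these assumptions, the outcome depends only on the topic name and the parallelism. If {{startIndex}} is 0, every partition already lands on its matching subtask. Otherwise every partition is shifted by the same offset, so no key group reaches its owner. For example, topic {{"a"}} has hash code 97, which gives {{startIndex}} 3 for 4 subtasks. Fixing {{startIndex}} at 0 keeps partition {{i}} on subtask {{i}}.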
The deepest captured stack trace is:
{noformat}
Caused by: java.lang.NullPointerException
	at org.apache.flink.runtime.state.heap.StateTable.transform(StateTable.java:205)
	at org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:100)
{noformat}

cc [~ym] [~sewen] [~AHeise] [~pnowojski]

Below are my proposed changes:
* Make the assignment between partitions and subtasks customizable.
* Provide a 0-based round-robin assignment (that is, the existing assignment algorithm with {{startIndex}} fixed to 0).

I saw FLINK-8570; the above changes could be helpful if we finally decide to deliver FLINK-8570.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)