[ https://issues.apache.org/jira/browse/FLINK-21317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17298692#comment-17298692 ]
Yuan Mei edited comment on FLINK-21317 at 3/10/21, 10:48 AM:
-------------------------------------------------------------

Hey [~kezhuw], thanks very much for reporting the bug. That's indeed true.

I've read the code to see why the following prerequisite is needed for DataStreamUtils.reinterpretAsKeyedStream:

    /* IMPORTANT: For every partition of the base stream, the keys of events in the base stream
     * must be partitioned exactly in the same way as if it was created through a
     * [[DataStream#keyBy(KeySelector)]]. */

The reason is that each state backend instance is responsible for a certain range of key groups. If the KafkaConsumer side uses a different assignment,

`KafkaTopicPartitionAssigner.assign(partition, numberOfPartitions)`

than

`KeyGroupRangeAssignment.assignKeyToParallelOperator(keySelector.getKey(in), numberOfPartitions, numberOfPartitions);`

then a subtask will try to fetch state from a state backend that does not contain the corresponding key group.

*Back to your question: does your proposed change make sense?*

Yes, with some suggestions:

1. Make assignment between partition and subtask customizable.

Yes, I think this is valuable whether or not we have this bug.

2. Provide a 0-based round-robin assignment. (This is making startIndex 0 in existing assignment algorithms.)

I would suggest using `KeyGroupRangeAssignment#computeOperatorIndexForKeyGroup` instead. Round-robin works when the number of consumer subtasks == the number of partitions, but it does not work in general when the number of consumer subtasks < the number of partitions.

Hope this makes sense to you.

Besides that, are you still interested in working on this?

> Downstream keyed state not work after FlinkKafkaShuffle
> -------------------------------------------------------
>
>                 Key: FLINK-21317
>                 URL: https://issues.apache.org/jira/browse/FLINK-21317
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.13.0
>            Reporter: Kezhu Wang
>            Priority: Major
>
> {{FlinkKafkaShuffle}} uses {{KeyGroupRangeAssignment.assignKeyToParallelOperator}} to assign
> records to kafka topic partitions.
> The assignment works as follows:
> # {{KeyGroupRangeAssignment.assignToKeyGroup(Object key, int maxParallelism)}} assigns the key to a key group.
> # {{KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId)}} assigns that key group to an operator/subtask index.
> When kafka topic partitions are consumed, they are redistributed by
> {{KafkaTopicPartitionAssigner.assign(KafkaTopicPartition partition, int numParallelSubtasks)}}.
> I copied the code of this redistribution here.
> {code:java}
> public class KafkaTopicPartitionAssigner {
>     public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
>         int startIndex =
>                 ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
>         // here, the assumption is that the id of Kafka partitions are always ascending
>         // starting from 0, and therefore can be used directly as the offset clockwise from the
>         // start index
>         return (startIndex + partition.getPartition()) % numParallelSubtasks;
>     }
> }
> {code}
> This partition redistribution breaks the prerequisites for
> {{DataStreamUtils.reinterpretAsKeyedStream}}; that is, key groups are messed up. The
> consequence is unusable keyed state. I list the deepest stack trace captured here:
> {noformat}
> Caused by: java.lang.NullPointerException
> 	at org.apache.flink.runtime.state.heap.StateTable.transform(StateTable.java:205)
> 	at org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:100)
> {noformat}
> cc [~ym] [~sewen] [~AHeise] [~pnowojski]
> Below are my proposed changes:
> * Make assignment between partition and subtask customizable.
> * Provide a 0-based round-robin assignment. (This is making {{startIndex}} 0 in existing assignment algorithms.)
> I saw FLINK-8570; the above changes could be helpful if we finally decide to deliver FLINK-8570.
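
To make the round-robin versus key-group-range point from this thread concrete, here is a minimal standalone sketch. It does not use Flink classes; the class name `PartitionAssignmentSketch` and its methods are hypothetical. It assumes that `computeOperatorIndexForKeyGroup` is the contiguous range mapping `keyGroupId * parallelism / maxParallelism`, and that the producer writes with maxParallelism == parallelism == numberOfPartitions, so that the Kafka partition id equals the key group id.

```java
/**
 * Sketch only: local re-implementations of the formulas discussed in this thread,
 * so that it runs without Flink on the classpath. Names are hypothetical.
 */
class PartitionAssignmentSketch {

    /** The formula quoted in the issue description for KafkaTopicPartitionAssigner.assign. */
    public static int kafkaAssign(String topic, int partition, int numParallelSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        return (startIndex + partition) % numParallelSubtasks;
    }

    /** The proposed 0-based round-robin: the same formula with startIndex fixed to 0. */
    public static int zeroBasedRoundRobin(int partition, int numParallelSubtasks) {
        return partition % numParallelSubtasks;
    }

    /**
     * Assumption: a contiguous range mapping standing in for
     * KeyGroupRangeAssignment#computeOperatorIndexForKeyGroup.
     */
    public static int keyGroupRangeIndex(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int numberOfPartitions = 4; // assumed equal to the number of key groups
        for (int consumers : new int[] {4, 2}) {
            System.out.println("consumer subtasks = " + consumers);
            for (int p = 0; p < numberOfPartitions; p++) {
                System.out.println("  partition " + p
                        + ": round-robin -> " + zeroBasedRoundRobin(p, consumers)
                        + ", key-group range -> "
                        + keyGroupRangeIndex(numberOfPartitions, consumers, p));
            }
        }
    }
}
```

Under these assumptions, with 4 partitions and 4 consumer subtasks both mappings send partition k to subtask k. With 2 consumer subtasks, round-robin yields subtasks 0, 1, 0, 1 for partitions 0 to 3, while the range mapping yields 0, 0, 1, 1, so partitions 1 and 2 would be read by a subtask whose state backend does not own their key groups.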