[ 
https://issues.apache.org/jira/browse/FLINK-21317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17298692#comment-17298692
 ] 

Yuan Mei edited comment on FLINK-21317 at 3/10/21, 10:48 AM:
-------------------------------------------------------------

Hey [~kezhuw], thanks very much for reporting the bug. 

That's indeed true. I've read the code to understand why the following prerequisite is
needed for `DataStreamUtils.reinterpretAsKeyedStream`:

    * IMPORTANT: For every partition of the base stream, the keys of events in the base stream
    * must be partitioned exactly in the same way as if it was created through a
    * [[DataStream#keyBy(KeySelector)]].

The reason is that each state backend instance is responsible for a fixed range of
key groups. Suppose the Kafka consumer side assigns partitions to subtasks with

`KafkaTopicPartitionAssigner.assign(partition, numberOfPartitions)`

while the producer side assigns records to partitions with

`KeyGroupRangeAssignment.assignKeyToParallelOperator(keySelector.getKey(in), numberOfPartitions, numberOfPartitions)`

If the two assignments differ, a subtask can receive records whose key group lies
outside its range, and it will then try to fetch state from a state backend that does
not contain the corresponding key group.
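A minimal, self-contained sketch of the mismatch (not Flink code; the class, method, and topic names are hypothetical). `kafkaAssign` repeats the arithmetic of `KafkaTopicPartitionAssigner.assign` quoted in the issue description, and `operatorIndexForKeyGroup` assumes that `KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup` computes `keyGroupId * parallelism / maxParallelism`. It also assumes the number of key groups equals the number of partitions, so partition `p` carries exactly key group `p`, as in the producer call above.

```java
// Hypothetical illustration, not Flink code: compares which subtask consumes a
// partition with which subtask owns that partition's key group.
public class AssignmentMismatch {

    // Same arithmetic as KafkaTopicPartitionAssigner.assign (quoted in the issue),
    // with the topic name and partition id passed as plain values.
    static int kafkaAssign(String topic, int partition, int numParallelSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        return (startIndex + partition) % numParallelSubtasks;
    }

    // Assumed to match KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup.
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    // Assumes partition p holds exactly key group p (maxParallelism == numPartitions)
    // and counts partitions read by a subtask that does not own their key group.
    static int countMismatches(String topic, int numPartitions, int parallelism) {
        int mismatches = 0;
        for (int p = 0; p < numPartitions; p++) {
            int consumer = kafkaAssign(topic, p, parallelism);
            int owner = operatorIndexForKeyGroup(numPartitions, parallelism, p);
            if (consumer != owner) {
                mismatches++;
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        String topic = "shuffle-topic"; // hypothetical topic name
        int n = 4; // partitions == consumer subtasks
        System.out.println("startIndex = " + kafkaAssign(topic, 0, n));
        System.out.println("mismatched partitions = " + countMismatches(topic, n, n));
    }
}
```

When the number of subtasks equals the number of partitions, the outcome is all-or-nothing: either the topic hash happens to give `startIndex == 0` and every partition lands on the subtask owning its key group, or no partition does.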

*Back to your question: does your proposed change make sense?*

Yes, with some suggestions:

1. Make the assignment between partitions and subtasks customizable.
Yes, I think this is valuable regardless of this bug.

2. Provide a 0-based round-robin assignment (i.e. set `startIndex` to 0 in the
existing assignment algorithm).
I would suggest using `KeyGroupRangeAssignment#computeOperatorIndexForKeyGroup`
instead.
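A hypothetical sketch of what a customizable assignment could look like. `PartitionAssigner` is an invented interface, not a Flink API, and `KEY_GROUP_RANGE` assumes `computeOperatorIndexForKeyGroup` computes `keyGroupId * parallelism / maxParallelism`, treating partition `p` as key group `p` with `maxParallelism == numPartitions`.

```java
// Hypothetical sketch; none of these types exist in Flink.
public class PluggableAssigners {

    /** Maps a Kafka partition of a topic to a consumer subtask index. */
    @FunctionalInterface
    interface PartitionAssigner {
        int assign(String topic, int partition, int numPartitions, int numSubtasks);
    }

    // Current behaviour: round-robin starting at a topic-dependent offset.
    static final PartitionAssigner OFFSET_ROUND_ROBIN =
            (topic, partition, numPartitions, numSubtasks) -> {
                int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numSubtasks;
                return (startIndex + partition) % numSubtasks;
            };

    // Proposal 2: the same round-robin with startIndex fixed to 0.
    static final PartitionAssigner ZERO_BASED_ROUND_ROBIN =
            (topic, partition, numPartitions, numSubtasks) -> partition % numSubtasks;

    // Suggested alternative: treat the partition index as a key group id and map it
    // with the assumed computeOperatorIndexForKeyGroup formula.
    static final PartitionAssigner KEY_GROUP_RANGE =
            (topic, partition, numPartitions, numSubtasks) ->
                    partition * numSubtasks / numPartitions;

    public static void main(String[] args) {
        PartitionAssigner assigner = KEY_GROUP_RANGE; // e.g. chosen via configuration
        for (int p = 0; p < 4; p++) {
            // prints subtasks 0, 0, 1, 1 for partitions 0..3
            System.out.println("partition " + p + " -> subtask " + assigner.assign("t", p, 4, 2));
        }
    }
}
```

Unlike the current `assign(partition, numParallelSubtasks)` signature, the key-group-based variant needs the total number of partitions, which is why the sketch passes `numPartitions` explicitly.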

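Round-robin works when the number of consumer subtasks equals the number of
partitions, but in general it does not work when there are fewer consumer subtasks
than partitions, because `computeOperatorIndexForKeyGroup` gives each subtask a
contiguous range of key groups rather than interleaving them.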
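A small check of this point, under the same assumptions as above: partition `p` carries key group `p`, `maxParallelism == numPartitions`, and the owner is computed with the assumed `keyGroupId * parallelism / maxParallelism` formula. Class and method names are hypothetical. With 4 partitions and 2 subtasks, round-robin sends partitions 0 and 2 to subtask 0 and partitions 1 and 3 to subtask 1, while the key-group ranges are {0, 1} and {2, 3}, so partitions 1 and 2 end up on the wrong subtask.

```java
// Hypothetical check, not Flink code: 0-based round-robin vs. key-group ranges.
public class RoundRobinVsKeyGroupRange {

    static int roundRobin(int partition, int parallelism) {
        return partition % parallelism;
    }

    // Assumed formula of KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup,
    // with partition p carrying key group p and maxParallelism == numPartitions.
    static int keyGroupOwner(int partition, int numPartitions, int parallelism) {
        return partition * parallelism / numPartitions;
    }

    // Number of partitions that round-robin sends to a subtask not owning them.
    static int countMismatches(int numPartitions, int parallelism) {
        int mismatches = 0;
        for (int p = 0; p < numPartitions; p++) {
            if (roundRobin(p, parallelism) != keyGroupOwner(p, numPartitions, parallelism)) {
                mismatches++;
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        System.out.println(countMismatches(4, 4)); // prints 0: one partition per subtask
        System.out.println(countMismatches(4, 2)); // prints 2: partitions 1 and 2
    }
}
```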

Hope this makes sense to you.

Besides that, are you still interested in working on this?








> Downstream keyed state not work after FlinkKafkaShuffle
> -------------------------------------------------------
>
>                 Key: FLINK-21317
>                 URL: https://issues.apache.org/jira/browse/FLINK-21317
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.13.0
>            Reporter: Kezhu Wang
>            Priority: Major
>
> {{FlinkKafkaShuffle}} uses {{KeyGroupRangeAssignment.assignKeyToParallelOperator}} to assign records to
> Kafka topic partitions. The assignment works as follows:
>  # {{KeyGroupRangeAssignment.assignToKeyGroup(Object key, int maxParallelism)}} assigns a key to a key group.
>  # {{KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId)}} assigns that key group to an operator/subtask index.
> When Kafka topic partitions are consumed, they are redistributed to subtasks by
> {{KafkaTopicPartitionAssigner.assign(KafkaTopicPartition partition, int numParallelSubtasks)}}. I copied the code of this redistribution here:
> {code:java}
> public class KafkaTopicPartitionAssigner {
>     public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
>         int startIndex =
>                 ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
>         // here, the assumption is that the id of Kafka partitions are always ascending
>         // starting from 0, and therefore can be used directly as the offset clockwise from the
>         // start index
>         return (startIndex + partition.getPartition()) % numParallelSubtasks;
>     }
> }
> {code}
> This partition redistribution breaks the prerequisite of
> {{DataStreamUtils.reinterpretAsKeyedStream}}, that is, key groups get mixed up across subtasks.
> The consequence is unusable keyed state. This is the deepest stack trace captured:
> {noformat}
> Caused by: java.lang.NullPointerException
>       at org.apache.flink.runtime.state.heap.StateTable.transform(StateTable.java:205)
>       at org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:100)
> {noformat}
> cc [~ym]  [~sewen] [~AHeise]  [~pnowojski]
> Below are my proposed changes:
> * Make the assignment between partitions and subtasks customizable.
> * Provide a 0-based round-robin assignment (i.e. make {{startIndex}} 0 in the existing assignment algorithm).
> I saw FLINK-8570; the above changes could be helpful if we eventually decide to deliver FLINK-8570.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
