[ 
https://issues.apache.org/jira/browse/FLINK-35564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854929#comment-17854929
 ] 

Yufan Sheng commented on FLINK-35564:
-------------------------------------

Hi, thanks for mention this. I think this bug has been fixed in the latest main 
branch. But we may never backport to the 1.17 branch, I think you can upgrade 
to the latest connector for fixing this issue.

https://github.com/apache/flink-connector-pulsar/blob/b37a8b32f30683664ff25888d403c4de414043e1/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerImpl.java#L173

> The topic cannot be distributed on subtask when calculatePartitionOwner 
> returns -1
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-35564
>                 URL: https://issues.apache.org/jira/browse/FLINK-35564
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Pulsar
>    Affects Versions: 1.17.2
>            Reporter: 中国无锡周良
>            Priority: Major
>
> The topic cannot be distributed on subtask when calculatePartitionOwner 
> returns -1
> {code:java}
> @VisibleForTesting
> static int calculatePartitionOwner(String topic, int partitionId, int 
> parallelism) {
>     int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
>     /*
>      * Here, the assumption is that the id of Pulsar partitions are always 
> ascending starting from
>      * 0. Therefore, can be used directly as the offset clockwise from the 
> start index.
>      */
>     return (startIndex + partitionId) % parallelism;
> } {code}
> Here startIndex is a non-negative number calculated based on topic.hashCode() 
> and in the range [0, parallelism-1].
> For non-partitioned topic. partitionId is NON_PARTITION_ID = -1;
> but
> {code:java}
> @Override
> public Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment(
>         List<Integer> readers) {
>     if (pendingPartitionSplits.isEmpty() || readers.isEmpty()) {
>         return Optional.empty();
>     }
>     Map<Integer, List<PulsarPartitionSplit>> assignMap =
>             new HashMap<>(pendingPartitionSplits.size());
>     for (Integer reader : readers) {
>         Set<PulsarPartitionSplit> splits = 
> pendingPartitionSplits.remove(reader);
>         if (splits != null && !splits.isEmpty()) {
>             assignMap.put(reader, new ArrayList<>(splits));
>         }
>     }
>     if (assignMap.isEmpty()) {
>         return Optional.empty();
>     } else {
>         return Optional.of(new SplitsAssignment<>(assignMap));
>     }
> } {code}
> pendingPartitionSplits can't possibly have a value of -1, right? The 
> calculation method of the topic by the above return 1, 
> pendingPartitionSplits. Remove (reader), forever is null; This topic will not 
> be assigned to a subtask; And I simulated this topic locally and found that 
> messages were indeed not processed;



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to