[ https://issues.apache.org/jira/browse/FLINK-35564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854929#comment-17854929 ]
Yufan Sheng commented on FLINK-35564:
-------------------------------------

Hi, thanks for mentioning this. I think this bug has already been fixed on the latest main branch. However, we may never backport the fix to the 1.17 branch, so I think you can upgrade to the latest connector to fix this issue.

https://github.com/apache/flink-connector-pulsar/blob/b37a8b32f30683664ff25888d403c4de414043e1/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerImpl.java#L173

> The topic cannot be distributed on subtask when calculatePartitionOwner returns -1
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-35564
>                 URL: https://issues.apache.org/jira/browse/FLINK-35564
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Pulsar
>   Affects Versions: 1.17.2
>            Reporter: 中国无锡周良
>            Priority: Major
>
> The topic cannot be distributed on subtask when calculatePartitionOwner returns -1
> {code:java}
> @VisibleForTesting
> static int calculatePartitionOwner(String topic, int partitionId, int parallelism) {
>     int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
>     /*
>      * Here, the assumption is that the id of Pulsar partitions are always ascending starting from
>      * 0. Therefore, can be used directly as the offset clockwise from the start index.
>      */
>     return (startIndex + partitionId) % parallelism;
> } {code}
> Here startIndex is a non-negative number calculated based on topic.hashCode() and in the range [0, parallelism-1].
> For a non-partitioned topic, partitionId is NON_PARTITION_ID = -1, so the result can be -1 (for example, when startIndex is 0 and parallelism is greater than 1). But:
> {code:java}
> @Override
> public Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment(
>         List<Integer> readers) {
>     if (pendingPartitionSplits.isEmpty() || readers.isEmpty()) {
>         return Optional.empty();
>     }
>     Map<Integer, List<PulsarPartitionSplit>> assignMap =
>             new HashMap<>(pendingPartitionSplits.size());
>     for (Integer reader : readers) {
>         Set<PulsarPartitionSplit> splits = pendingPartitionSplits.remove(reader);
>         if (splits != null && !splits.isEmpty()) {
>             assignMap.put(reader, new ArrayList<>(splits));
>         }
>     }
>     if (assignMap.isEmpty()) {
>         return Optional.empty();
>     } else {
>         return Optional.of(new SplitsAssignment<>(assignMap));
>     }
> } {code}
> pendingPartitionSplits can never be queried with a reader id of -1, right? When the calculation above returns -1 for the topic, pendingPartitionSplits.remove(reader) never returns its split, so this topic will never be assigned to a subtask. I also simulated this topic locally and found that its messages were indeed not processed.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
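For readers who want to reproduce the arithmetic from the report, here is a minimal standalone sketch. The first method is copied from the issue text. The class name `PartitionOwnerDemo`, the method `calculatePartitionOwnerGuarded`, and the topic names are hypothetical and exist only for illustration. Using `Math.floorMod` is just one possible guard; it is an assumption here and not necessarily what the linked main-branch code does.

```java
public class PartitionOwnerDemo {
    // Value quoted in the issue for non-partitioned topics.
    static final int NON_PARTITION_ID = -1;

    // Copied from the issue: Java's % keeps the sign of the dividend,
    // so (0 + -1) % parallelism evaluates to -1 when parallelism > 1.
    static int calculatePartitionOwner(String topic, int partitionId, int parallelism) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
        return (startIndex + partitionId) % parallelism;
    }

    // Hypothetical guard (an assumption, not the confirmed upstream fix):
    // Math.floorMod returns a value in [0, parallelism) for a positive divisor.
    static int calculatePartitionOwnerGuarded(String topic, int partitionId, int parallelism) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
        return Math.floorMod(startIndex + partitionId, parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        // Scan a few made-up topic names until one hashes to startIndex == 0,
        // which is the case where the original method yields a negative owner.
        for (int i = 0; i < 100; i++) {
            String topic = "persistent://public/default/demo-" + i;
            int owner = calculatePartitionOwner(topic, NON_PARTITION_ID, parallelism);
            if (owner < 0) {
                int guarded = calculatePartitionOwnerGuarded(topic, NON_PARTITION_ID, parallelism);
                System.out.println(topic + " -> owner " + owner + ", guarded owner " + guarded);
                break;
            }
        }
    }
}
```

A split keyed by -1 is never matched by any reader id in `createAssignment`, which is consistent with the behaviour described in the report. With the guard, the same topic maps to the last subtask index instead.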