中国无锡周良 created FLINK-35564:
------------------------------

             Summary: The topic cannot be distributed on subtask when calculatePartitionOwner returns -1
                 Key: FLINK-35564
                 URL: https://issues.apache.org/jira/browse/FLINK-35564
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Pulsar
    Affects Versions: 1.17.2
            Reporter: 中国无锡周良

A non-partitioned topic is never assigned to a subtask when calculatePartitionOwner returns -1.

{code:java}
@VisibleForTesting
static int calculatePartitionOwner(String topic, int partitionId, int parallelism) {
    int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
    /*
     * Here, the assumption is that the id of Pulsar partitions are always ascending starting from
     * 0. Therefore, can be used directly as the offset clockwise from the start index.
     */
    return (startIndex + partitionId) % parallelism;
}
{code}

Here startIndex is a non-negative number computed from topic.hashCode(), in the range [0, parallelism-1]. For a non-partitioned topic, partitionId is NON_PARTITION_ID = -1. So when startIndex is 0 and parallelism is greater than 1, the method returns (0 + -1) % parallelism, which is -1 in Java because the remainder takes the sign of the dividend.

However, createAssignment only looks up pendingPartitionSplits by the ids of the registered readers:

{code:java}
@Override
public Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment(
        List<Integer> readers) {
    if (pendingPartitionSplits.isEmpty() || readers.isEmpty()) {
        return Optional.empty();
    }

    Map<Integer, List<PulsarPartitionSplit>> assignMap =
            new HashMap<>(pendingPartitionSplits.size());

    for (Integer reader : readers) {
        Set<PulsarPartitionSplit> splits = pendingPartitionSplits.remove(reader);
        if (splits != null && !splits.isEmpty()) {
            assignMap.put(reader, new ArrayList<>(splits));
        }
    }

    if (assignMap.isEmpty()) {
        return Optional.empty();
    } else {
        return Optional.of(new SplitsAssignment<>(assignMap));
    }
}
{code}

No reader (subtask index) can have the id -1, right? So when the calculation above returns -1 for a topic, the splits for that topic are never picked up by pendingPartitionSplits.remove(reader), and the topic is never assigned to any subtask.

I reproduced this locally with such a topic and confirmed that its messages were not processed.
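
To make the arithmetic easy to check, here is a small standalone sketch that copies the calculation from calculatePartitionOwner and searches for topic names that map to owner -1. The class name, the helper methods, and the "topic-N" names are made up for illustration and are not part of the connector. The Math.floorMod variant is only one possible way to keep the result in [0, parallelism-1]; it is an assumption, not necessarily the fix adopted upstream.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionOwnerSketch {

    /** Partition id used for non-partitioned topics, as stated in the report. */
    static final int NON_PARTITION_ID = -1;

    /** Same arithmetic as the method quoted in the report. */
    static int calculatePartitionOwner(String topic, int partitionId, int parallelism) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
        // Java's % keeps the sign of the dividend, so 0 + (-1) yields -1 here.
        return (startIndex + partitionId) % parallelism;
    }

    /** Hypothetical variant: Math.floorMod always returns a value in [0, parallelism). */
    static int calculatePartitionOwnerSafe(String topic, int partitionId, int parallelism) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
        return Math.floorMod(startIndex + partitionId, parallelism);
    }

    /** Scans made-up names "topic-0" .. "topic-(candidates-1)" for a negative owner. */
    static List<String> findUnassignableTopics(int candidates, int parallelism) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < candidates; i++) {
            String topic = "topic-" + i;
            if (calculatePartitionOwner(topic, NON_PARTITION_ID, parallelism) < 0) {
                result.add(topic);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        for (String topic : findUnassignableTopics(50, parallelism)) {
            System.out.println(topic
                    + ": original=" + calculatePartitionOwner(topic, NON_PARTITION_ID, parallelism)
                    + ", floorMod=" + calculatePartitionOwnerSafe(topic, NON_PARTITION_ID, parallelism));
        }
    }
}
```

Every topic printed by main is one whose startIndex is 0, which is exactly the case where the original calculation yields -1. For partition ids of 0 or greater, both variants return the same owner, so the floorMod variant would only change the placement of non-partitioned topics.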