中国无锡周良 created FLINK-35564:
------------------------------
Summary: The topic cannot be distributed on subtask when
calculatePartitionOwner returns -1
Key: FLINK-35564
URL: https://issues.apache.org/jira/browse/FLINK-35564
Project: Flink
Issue Type: Bug
Components: Connectors / Pulsar
Affects Versions: 1.17.2
Reporter: 中国无锡周良
The topic cannot be distributed on subtask when calculatePartitionOwner returns
-1
{code:java}
@VisibleForTesting
static int calculatePartitionOwner(String topic, int partitionId, int
parallelism) {
int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
/*
* Here, the assumption is that the id of Pulsar partitions are always
ascending starting from
* 0. Therefore, can be used directly as the offset clockwise from the
start index.
*/
return (startIndex + partitionId) % parallelism;
} {code}
Here startIndex is a non-negative number calculated based on topic.hashCode()
and in the range [0, parallelism-1].
For non-partitioned topic. partitionId is NON_PARTITION_ID = -1;
but
{code:java}
@Override
public Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment(
List<Integer> readers) {
if (pendingPartitionSplits.isEmpty() || readers.isEmpty()) {
return Optional.empty();
}
Map<Integer, List<PulsarPartitionSplit>> assignMap =
new HashMap<>(pendingPartitionSplits.size());
for (Integer reader : readers) {
Set<PulsarPartitionSplit> splits =
pendingPartitionSplits.remove(reader);
if (splits != null && !splits.isEmpty()) {
assignMap.put(reader, new ArrayList<>(splits));
}
}
if (assignMap.isEmpty()) {
return Optional.empty();
} else {
return Optional.of(new SplitsAssignment<>(assignMap));
}
} {code}
pendingPartitionSplits can't possibly have a value of -1, right? The
calculation method of the topic by the above return 1, pendingPartitionSplits.
Remove (reader), forever is null; This topic will not be assigned to a subtask;
And I simulated this topic locally and found that messages were indeed not
processed;
--
This message was sent by Atlassian Jira
(v8.20.10#820010)