Victor Babenko created FLINK-38600:
--------------------------------------
Summary: Pulsar connector fails with 'Exclusive consumer is
already connected' due to a race condition
Key: FLINK-38600
URL: https://issues.apache.org/jira/browse/FLINK-38600
Project: Flink
Issue Type: Bug
Components: Connectors / Pulsar
Reporter: Victor Babenko
There is a race condition in the Flink Pulsar Connector that occasionally
triggers (about 1-2% probability per partition in our setup). The connector
[creates a dummy consumer|https://github.com/apache/flink-connector-pulsar/blob/08c8e2ea60e8ef01ff9a81a0b2bf8c1a132f5db4/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java#L85]
to seek to the right cursor position, closes it, and immediately afterwards
[creates the real consumer|https://github.com/apache/flink-connector-pulsar/blob/08c8e2ea60e8ef01ff9a81a0b2bf8c1a132f5db4/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java#L239].
If the broker has not fully released the dummy consumer by the time the real
one subscribes, it responds with {{Exclusive consumer is already connected}},
and the job restarts. In our case we were subscribing to thousands of topics, so
the job kept restarting for hours until it reached an attempt in which none of
the topics hit this race condition.
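To make the timing easier to see, here is a minimal, self-contained Java model of the sequence above. It is a simplified simulation, not the Pulsar client or broker API: {{SimulatedExclusiveSubscription}} and {{SeekThenSubscribeDemo}} are hypothetical names, and {{releaseDelayMillis}} is an assumed parameter standing in for whatever makes the broker-side release lag behind the client's close.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified model of an exclusive subscription on the broker side.
// This is NOT the Pulsar client or broker API; it only models the timing in the report.
final class SimulatedExclusiveSubscription {
    private final AtomicBoolean consumerAttached = new AtomicBoolean(false);
    private final long releaseDelayMillis;
    // Daemon thread standing in for the broker's asynchronous cleanup.
    private final ScheduledExecutorService broker =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "simulated-broker");
                t.setDaemon(true);
                return t;
            });

    SimulatedExclusiveSubscription(long releaseDelayMillis) {
        this.releaseDelayMillis = releaseDelayMillis;
    }

    // Attaching a consumer fails while a previous consumer is still registered.
    void attach() {
        if (!consumerAttached.compareAndSet(false, true)) {
            throw new IllegalStateException("Exclusive consumer is already connected");
        }
    }

    // Assumption: the client-side close returns before the broker releases the slot,
    // which happens only after releaseDelayMillis.
    void close() {
        broker.schedule(() -> consumerAttached.set(false),
                releaseDelayMillis, TimeUnit.MILLISECONDS);
    }
}

final class SeekThenSubscribeDemo {
    // Mirrors the connector's sequence: dummy consumer for seeking, close, real consumer.
    // Returns null on success, or the broker's error message if the race is hit.
    static String seekThenSubscribe(SimulatedExclusiveSubscription sub, long pauseMillis) {
        sub.attach();   // dummy consumer used only to seek the cursor
        sub.close();    // closed right after seeking
        try {
            Thread.sleep(pauseMillis);   // 0 reproduces "immediately afterwards"
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
        try {
            sub.attach();   // the real consumer
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }
}
```

With a long release delay (e.g. 2000 ms) and no pause, the second {{attach()}} fails with the same message as in this report; with a short delay (e.g. 10 ms) and a 500 ms pause, it succeeds. In this model the outcome depends only on whether the release lands before the real consumer subscribes, which is consistent with the failure showing up only on a small fraction of partitions.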
I believe this may be a regression from
[#59|https://github.com/apache/flink-connector-pulsar/pull/59]. The reason a
separate consumer is needed for seeking is described in
[PIP-194|https://github.com/apache/pulsar/issues/16757]: there seems to be no way
to create a consumer with the cursor already set, and if we create it and _then_
call {{seek}}, some messages may still leak through in between. StreamNative may
know of another way, but PIP-194 does not appear to have been adopted or
implemented, so we have to seek before creating the real consumer.
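The ordering problem can be illustrated with an equally simplified model. It assumes the broker starts dispatching from the subscription's current cursor as soon as a consumer subscribes, and that a fixed number of messages ({{dispatchedBeforeSeek}}, a hypothetical parameter) are delivered before a later {{seek}} takes effect. {{SeekAfterSubscribeModel}} is likewise a hypothetical name, not part of the connector or the Pulsar client.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: once a consumer is subscribed, the broker dispatches from the
// subscription's current cursor. If the seek only lands after `dispatchedBeforeSeek`
// messages were delivered, those messages come from the wrong position.
final class SeekAfterSubscribeModel {
    static List<Integer> consume(List<Integer> topic, int initialCursor, int seekTarget,
                                 int dispatchedBeforeSeek, boolean seekBeforeSubscribe) {
        List<Integer> received = new ArrayList<>();
        int cursor = initialCursor;
        if (seekBeforeSubscribe) {
            // Connector's approach: position the cursor first (via a dummy consumer),
            // then subscribe with the real consumer.
            cursor = seekTarget;
        } else {
            // Subscribe first: some messages are dispatched before the seek takes effect.
            int end = Math.min(topic.size(), cursor + dispatchedBeforeSeek);
            for (int i = cursor; i < end; i++) {
                received.add(topic.get(i));
            }
            cursor = seekTarget;
        }
        // From here on, delivery continues from the (now correct) cursor.
        for (int i = cursor; i < topic.size(); i++) {
            received.add(topic.get(i));
        }
        return received;
    }
}
```

For a topic holding messages 0 to 9, an initial cursor of 0, a seek target of 5 and two messages dispatched early, subscribing first yields [0, 1, 5, 6, 7, 8, 9], while seeking first yields [5, 6, 7, 8, 9]. The leaked 0 and 1 are the "messages leaking through in between" that the seek-first approach avoids, at the cost of the extra consumer that triggers the race above.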