Victor Babenko created FLINK-38600:
--------------------------------------

             Summary: Pulsar connector fails with 'Exclusive consumer is already connected' due to a race condition
                 Key: FLINK-38600
                 URL: https://issues.apache.org/jira/browse/FLINK-38600
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Pulsar
            Reporter: Victor Babenko


There is a race condition in the Flink Pulsar Connector that occasionally 
triggers (with about 1-2% probability per partition in our setup). The connector 
[creates a dummy consumer|https://github.com/apache/flink-connector-pulsar/blob/08c8e2ea60e8ef01ff9a81a0b2bf8c1a132f5db4/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java#L85]
to seek to the right cursor position, closes it, and immediately afterwards 
[creates the real consumer|https://github.com/apache/flink-connector-pulsar/blob/08c8e2ea60e8ef01ff9a81a0b2bf8c1a132f5db4/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java#L239].
If the previous consumer has not yet been fully released on the broker side, 
the broker rejects the real consumer with 
{{Exclusive consumer is already connected}}, which causes the job to restart. 
In our case we were subscribing to thousands of topics, so the job restarted 
continuously for hours until it reached an attempt in which none of the topics 
hit this race condition.
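To make the ordering concrete, here is a toy, single-threaded model of the sequence described above. It is not Pulsar or connector code: {{ToyBroker}}, its simulated millisecond clock, and the fixed {{releaseDelayMs}} are all hypothetical, assuming only that the broker frees an exclusive subscription some time after the client closes the consumer that held it. Under that assumption, the real consumer is rejected whenever it subscribes inside that window.

```java
import java.util.Objects;

/**
 * Toy model of the race in FLINK-38600 (hypothetical, not Pulsar code).
 * Assumption: the broker frees an exclusive subscription slot only
 * releaseDelayMs after the client closes the consumer that held it.
 */
final class ExclusiveRaceDemo {

    static final String EXCLUSIVE_ERROR = "Exclusive consumer is already connected";

    /** Hypothetical broker: one exclusive owner per subscription, simulated clock. */
    static final class ToyBroker {
        private final long releaseDelayMs;
        private String owner;           // consumer currently holding the exclusive slot
        private long releaseAtMs = -1;  // when a pending close takes effect (-1 = none)

        ToyBroker(long releaseDelayMs) {
            this.releaseDelayMs = releaseDelayMs;
        }

        // Frees the slot if a pending close has become effective by nowMs.
        private void applyPendingRelease(long nowMs) {
            if (releaseAtMs >= 0 && nowMs >= releaseAtMs) {
                owner = null;
                releaseAtMs = -1;
            }
        }

        /** Returns null on success, or the error the broker would send back. */
        String subscribe(String consumer, long nowMs) {
            applyPendingRelease(nowMs);
            if (owner != null) {
                return EXCLUSIVE_ERROR;
            }
            owner = consumer;
            return null;
        }

        /** Client-side close returns at once; the slot is freed only later. */
        void close(String consumer, long nowMs) {
            if (Objects.equals(consumer, owner)) {
                releaseAtMs = nowMs + releaseDelayMs;
            }
        }
    }

    /**
     * Replays the connector's order of operations: subscribe a seek-only
     * consumer, close it, then subscribe the real consumer gapMs later.
     * The seek itself is not modeled.
     */
    static String seekThenSubscribe(long releaseDelayMs, long gapMs) {
        ToyBroker broker = new ToyBroker(releaseDelayMs);
        long now = 0;
        String first = broker.subscribe("seek-consumer", now);
        if (first != null) {
            throw new IllegalStateException("seek consumer failed: " + first);
        }
        broker.close("seek-consumer", now);
        now += gapMs;
        return broker.subscribe("real-consumer", now);
    }

    public static void main(String[] args) {
        for (long gap : new long[] {0, 10}) {
            String result = seekThenSubscribe(5, gap);
            System.out.println("gap " + gap + " ms -> " + (result == null ? "subscribed" : result));
        }
    }
}
```

With a 5 ms release delay, a zero gap reproduces the rejection while a 10 ms gap subscribes cleanly. In the real system the window is not a fixed constant, which would fit the failure showing up on only a small fraction of partitions.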
I believe this may be a regression from 
[#59|https://github.com/apache/flink-connector-pulsar/pull/59]. The reason we 
have to create a separate consumer to seek is described in 
[PIP-194|https://github.com/apache/pulsar/issues/16757]: there does not seem 
to be a way to create a consumer with the cursor already set. If we create the 
consumer and _then_ call {{seek}}, some messages may still leak through in 
between. StreamNative may know of another way, but PIP-194 does not appear to 
have been adopted or implemented, so we have to seek before creating the real 
consumer.
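The ordering constraint behind PIP-194 can be sketched with another toy model. It assumes, hypothetically, that a freshly created consumer starts delivering from the subscription's stored cursor right away, and that a later {{seek}} takes effect only after {{leakedBeforeSeek}} messages have already reached the application. Message ids are plain integers, and none of the names below are Pulsar API.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (hypothetical, not Pulsar code) of why the connector seeks with a
 * separate consumer first. Assumption: a consumer delivers from the stored
 * cursor as soon as it is created, and a later seek lands only after
 * leakedBeforeSeek messages have been handed out.
 */
final class SeekOrderDemo {

    /** Ids the application sees if it creates the consumer first and seeks afterwards. */
    static List<Integer> createThenSeek(int storedCursor, int target, int leakedBeforeSeek, int total) {
        List<Integer> delivered = new ArrayList<>();
        // Messages delivered from the old cursor before the seek lands ("leak through").
        int leakEnd = Math.min(storedCursor + leakedBeforeSeek, total);
        for (int id = storedCursor; id < leakEnd; id++) {
            delivered.add(id);
        }
        // After the seek, delivery continues from the requested position.
        for (int id = target; id < total; id++) {
            delivered.add(id);
        }
        return delivered;
    }

    /** Ids seen when a throwaway consumer has already moved the cursor to target. */
    static List<Integer> seekThenCreate(int target, int total) {
        List<Integer> delivered = new ArrayList<>();
        for (int id = target; id < total; id++) {
            delivered.add(id);
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println("create then seek: " + createThenSeek(0, 6, 2, 10));
        System.out.println("seek then create: " + seekThenCreate(6, 10));
    }
}
```

In the first ordering, messages 0 and 1 are the ones that leak through. Seeking with a throwaway consumer avoids them, but that extra consumer is exactly what opens the window for the exclusive-subscription race.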



--
This message was sent by Atlassian Jira
(v8.20.10#820010)