[ 
https://issues.apache.org/jira/browse/FLINK-35477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35477:
-----------------------------------
    Labels: pull-request-available  (was: )

> Pulsar connector resets existing subscription
> ---------------------------------------------
>
>                 Key: FLINK-35477
>                 URL: https://issues.apache.org/jira/browse/FLINK-35477
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Pulsar
>    Affects Versions: 1.19.0, 1.18.1
>         Environment: Flink version: 1.18.1
> Pulsar connector version: 4.1.0-1.18
> Pulsar version: 3.1.0
>            Reporter: Igor Basov
>            Priority: Major
>              Labels: pull-request-available
>
>  
> The issue occurs in the following circumstances:
>  * There is an existing subscription in a Pulsar topic, and it has some 
> accumulated backlog;
>  * The Flink job is deployed from a clean state (no checkpoints);
>  * The Flink job uses the same subscription name as the existing one; the 
> start cursor is the default one (earliest);
>  * {{pulsar.source.resetSubscriptionCursor}} is not set (default: false)
>  
> *Expected behaviour*
> Based on the docs 
> [here|https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/pulsar/#starting-position],
>  the priority for setting up the cursor position should be: {{checkpoint > 
> existed subscription position > StartCursor}}. So, since there are no 
> checkpoints, the job should get the existing position from Pulsar and start 
> reading from there.
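The documented priority can be illustrated with a minimal Java sketch. This is not the connector's code: the class `StartPositionPriority`, the method `resolve`, and modelling positions as plain strings are all hypothetical, chosen only to show the expected order of precedence.

```java
import java.util.Optional;

/**
 * Hypothetical sketch of the documented start-position priority:
 * checkpoint > existing subscription position > StartCursor.
 * Positions are modelled as plain strings; none of these names are connector APIs.
 */
public final class StartPositionPriority {

    private StartPositionPriority() {}

    /** Returns the position the source should start reading from. */
    public static String resolve(
            Optional<String> checkpointPosition,
            Optional<String> existingSubscriptionPosition,
            String startCursor) {
        // 1. A restored checkpoint always wins.
        if (checkpointPosition.isPresent()) {
            return checkpointPosition.get();
        }
        // 2. Without a checkpoint, reuse the position of an existing subscription.
        if (existingSubscriptionPosition.isPresent()) {
            return existingSubscriptionPosition.get();
        }
        // 3. Only a brand-new subscription falls back to the configured StartCursor.
        return startCursor;
    }

    public static void main(String[] args) {
        // The scenario in this report: no checkpoint, existing subscription with a backlog.
        String position =
                resolve(Optional.empty(), Optional.of("subscription-position"), "earliest");
        System.out.println(position); // prints "subscription-position"
    }
}
```

Under this reading, the StartCursor (earliest by default) should only matter when neither a checkpoint nor an existing subscription position is available.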
>  
> *Observed behaviour*
> As soon as the job connects to the topic, the number of messages in the 
> subscription backlog jumps to a new high, and the JM logs show:
> {code:java}
> Seeking subscription to the message -1:-1:-1
> Successfully reset subscription to the message -1:-1:-1 {code}
> Apparently, Flink ignored the existing subscription position and reset the 
> cursor to the earliest message in the topic ({{-1:-1:-1}} is Pulsar's 
> {{MessageId.earliest}}).
>  
> *Further observations*
> The related code seems to be 
> [here|https://github.com/apache/flink-connector-pulsar/blob/b37a8b32f30683664ff25888d403c4de414043e1/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java#L223].
> I believe the breaking change was introduced in this 
> [commit|https://github.com/apache/flink-connector-pulsar/commit/78d00ea9e3e278d4ce2fbb0c8a8d380abef7b858#]:
>  * The check if the subscription already exists is removed 
> [here|https://github.com/apache/flink-connector-pulsar/commit/78d00ea9e3e278d4ce2fbb0c8a8d380abef7b858#diff-ce7a6c1d29387077c2b19992312c0120bd16580ba5cf9bf222c718dd18a0db2aL86]
>  * The check for {{isResetSubscriptionCursor()}} is removed 
> [here|https://github.com/apache/flink-connector-pulsar/commit/78d00ea9e3e278d4ce2fbb0c8a8d380abef7b858#diff-4db00b10562cef1def73b06f0e2765a650c51954b4cf13487984204495d8a776L231].
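Taken together, the two removed checks amount to a guard on the seek. The following Java sketch is a hypothetical reconstruction based only on the description above, not the connector's actual code; the class `SubscriptionSeekGuard` and the method `shouldSeekToStartCursor` are illustrative names.

```java
/**
 * Hypothetical sketch of the guard described as removed in commit 78d00ea:
 * seek the subscription to the StartCursor only when the subscription is new,
 * or when pulsar.source.resetSubscriptionCursor is explicitly enabled.
 * Names are illustrative, not the connector's code.
 */
public final class SubscriptionSeekGuard {

    private SubscriptionSeekGuard() {}

    public static boolean shouldSeekToStartCursor(
            boolean subscriptionExists, boolean resetSubscriptionCursor) {
        // A new subscription has no position yet, so the StartCursor must be applied.
        if (!subscriptionExists) {
            return true;
        }
        // An existing subscription keeps its cursor unless a reset was requested.
        return resetSubscriptionCursor;
    }

    public static void main(String[] args) {
        // This report: subscription exists, resetSubscriptionCursor left at its default (false).
        System.out.println(shouldSeekToStartCursor(true, false)); // prints "false"
    }
}
```

With both checks in place, the scenario in this report would skip the seek to {{-1:-1:-1}} and keep the subscription's existing cursor.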
> I was able to confirm that it works as expected if I downgrade the 
> connector to {{4.0.0-1.17}}.
> This issue blocks us from upgrading to Flink 1.18 and later versions.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
