Rafał Trójczak created FLINK-34414:
--------------------------------------

Summary: EXACTLY_ONCE guarantee doesn't work properly for Flink/Pulsar connector
Key: FLINK-34414
URL: https://issues.apache.org/jira/browse/FLINK-34414
Project: Flink
Issue Type: Bug
Components: Connectors / Pulsar
Affects Versions: 1.17.2
Reporter: Rafał Trójczak

I am using the Pulsar connector for Flink (version 4.1.0-1.17) with a Flink job (version 1.17.2). When an exception is thrown within the job, the job is restarted and resumes from the last checkpoint, but the sink writes more events to the output than it should, even though the EXACTLY_ONCE guarantee is set everywhere.

To be more specific, here is my job's flow:
* a Pulsar source that reads from the input topic,
* a simple processing function,
* and a sink that writes to the output topic.

Here is a fragment of the source creation:
{code:java}
.setDeserializationSchema(Schema.AVRO(inClass), inClass)
.setSubscriptionName(subscription)
.enableSchemaEvolution()
.setConfig(PulsarOptions.PULSAR_ENABLE_TRANSACTION, true)
.setConfig(PulsarSourceOptions.PULSAR_ACK_RECEIPT_ENABLED, true)
.setConfig(PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS, 1)
.setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, false);
{code}

Here is a fragment of the sink creation:
{code:java}
.setSerializationSchema(Schema.AVRO(outClass), outClass)
.setConfig(PulsarOptions.PULSAR_ENABLE_TRANSACTION, true)
.setConfig(PulsarSinkOptions.PULSAR_WRITE_DELIVERY_GUARANTEE, DeliveryGuarantee.EXACTLY_ONCE)
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE);
{code}

And here is the Flink environment preparation:
{code:java}
environment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
environment.enableCheckpointing(CHECKPOINTING_INTERVAL, CheckpointingMode.EXACTLY_ONCE);
{code}

After sending 1000 events to the input topic, I received 1048 events on the output topic, i.e. 48 more than expected.

I ran the job on my local Kubernetes cluster, using the Flink Kubernetes Operator.
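
For reference, the expected behaviour can be illustrated with a deliberately simplified model. This is hypothetical code, not the Pulsar connector's implementation. With an exactly-once sink, records are written inside a transaction that is committed only when the checkpoint covering them completes; on failure the open transaction is aborted and the source rewinds to the last checkpoint, so replayed records are not emitted twice. If records become visible as soon as they are written, the ones written between the last checkpoint and the failure are replayed and duplicated. The class name, checkpoint interval and failure point are arbitrary assumptions, and the model commits synchronously at each checkpoint, ignoring Flink's separate pre-commit and commit phases.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of a checkpointed source -> sink pipeline.
// Not the Pulsar connector's code: commits happen synchronously at each checkpoint.
class ExactlyOnceModel {

    enum Guarantee { AT_LEAST_ONCE, EXACTLY_ONCE }

    // Processes events 0..totalEvents-1, fails once right after `failAfter` events,
    // restarts from the last completed checkpoint and returns the "output topic".
    static List<Integer> run(int totalEvents, int checkpointEvery, int failAfter, Guarantee guarantee) {
        List<Integer> output = new ArrayList<>();  // records visible to consumers
        List<Integer> pending = new ArrayList<>(); // records in the open transaction
        int checkpointedOffset = 0;                // source position of the last checkpoint
        boolean failed = false;
        int offset = 0;
        while (offset < totalEvents) {
            int event = offset; // the event's ID equals its input position
            if (guarantee == Guarantee.EXACTLY_ONCE) {
                pending.add(event); // buffered until the next checkpoint commits it
            } else {
                output.add(event); // visible immediately, so a replay duplicates it
            }
            offset++;
            if (!failed && offset == failAfter) {
                failed = true;
                pending.clear();             // abort the open transaction
                offset = checkpointedOffset; // rewind the source to the last checkpoint
                continue;
            }
            if (offset % checkpointEvery == 0 || offset == totalEvents) {
                checkpointedOffset = offset; // the checkpoint stores the source position
                output.addAll(pending);      // and commits the open transaction
                pending.clear();
            }
        }
        return output;
    }

    // Counts records that appear more than once in the output.
    static int duplicates(List<Integer> output) {
        Set<Integer> seen = new HashSet<>();
        int dups = 0;
        for (int event : output) {
            if (!seen.add(event)) {
                dups++;
            }
        }
        return dups;
    }

    public static void main(String[] args) {
        for (Guarantee g : Guarantee.values()) {
            List<Integer> out = run(1000, 100, 530, g);
            System.out.println(g + ": " + out.size() + " records, " + duplicates(out) + " duplicates");
        }
    }
}
```

With these parameters, main reports 1030 records (30 duplicates) for AT_LEAST_ONCE and 1000 records (0 duplicates) for EXACTLY_ONCE. In this model the surplus equals the number of records written between the last completed checkpoint and the failure.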
Here is a minimal reproducible example (MRE) for this problem. Note that it has an internal dependency, but that dependency can be commented out together with the code that relies on it: [https://github.com/trojczak/flink-pulsar-connector-problem]

--
This message was sent by Atlassian Jira (v8.20.10#820010)