[ https://issues.apache.org/jira/browse/FLINK-20753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17254082#comment-17254082 ]
Nazar Volynets commented on FLINK-20753:
----------------------------------------

This is not a valid issue. NOT REPRODUCIBLE (after correction). Sorry.

The root cause is in the consumer command from the issue description below:

{code:bash|title=kafka-target-1 Consumer - 0 partition}
bash -c 'echo Consuming data for topic-1... && \
kafka-console-consumer --bootstrap-server kafka-target-1:9094 --topic topic-1 --partition 0 --from-beginning --property print.key=true --property key.separator=":" --value-deserializer org.apache.kafka.common.serialization.StringDeserializer' --isolation-level read_committed
{code}

The trailing options land outside the quoted command and are therefore ignored:

{code:bash|title=kafka-target-1 Consumer - 0 partition}
...' --isolation-level read_committed
{code}

As a result, the Kafka consumer is NOT +transactional+: it also reads records from aborted transactions, which show up as "duplicates".

+Summary+

The issue is not valid. After the correction, the Kafka consumer

{code:bash|title=kafka-target-1 Consumer - 0 partition}
bash -c 'echo Consuming data for topic-1... && \
kafka-console-consumer --bootstrap-server kafka-target-1:9094 --topic topic-1 --partition 0 --isolation-level read_committed --from-beginning --property print.key=true --property key.separator=":" --value-deserializer org.apache.kafka.common.serialization.StringDeserializer'
{code}

is really +transactional+, and the duplicates can no longer be reproduced.
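For reference, the console flag corresponds to the standard consumer property {{isolation.level=read_committed}}. Below is a minimal sketch with the plain Kafka Java client that only prints records from committed transactions; the broker address and topic are taken from the setup in the issue, while the class name and group id are made up for illustration.

{code:java|title=read_committed consumer (sketch)}
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadCommittedConsumer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-target-1:9094");
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "verify-exactly-once"); // illustrative group id
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // The decisive setting: only records from committed transactions are returned.
    // The default, "read_uncommitted", also returns records of aborted transactions.
    props.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

    try (KafkaConsumer<String, String> consumer =
        new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer())) {
      consumer.subscribe(List.of("topic-1"));
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
          System.out.println(record.key() + ":" + record.value());
        }
      }
    }
  }
}
{code}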
Duplicates With Exactly-once Kafka -> Kafka Producer
----------------------------------------------------

                Key: FLINK-20753
                URL: https://issues.apache.org/jira/browse/FLINK-20753
            Project: Flink
         Issue Type: Bug
         Components: Connectors / Kafka, Runtime / Checkpointing
   Affects Versions: 1.12.0
        Environment: Java 11
                     Flink started within IDE
           Reporter: Nazar Volynets
           Priority: Major

*Introduction*

Based on the following statements from Flink's docs:

1. [https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html]
{quote}Flink provides an [Apache Kafka|https://kafka.apache.org/] connector for reading data from and writing data to Kafka topics with exactly-once guarantees.
{quote}

2. [https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/fault_tolerance.html#exactly-once-end-to-end]
{quote}To achieve exactly once end-to-end, so that every event from the sources affects the sinks exactly once, the following must be true:
# your sources must be replayable, and
# your sinks must be transactional (or idempotent){quote}

3. [https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#caveats]
{quote}{{Semantic.EXACTLY_ONCE}} mode relies on the ability to commit transactions that were started before taking a checkpoint, after recovering from the said checkpoint. If the time between Flink application crash and completed restart is larger than Kafka's transaction timeout there will be data loss (Kafka will automatically abort transactions that exceeded timeout time)
{quote}

4. [https://issues.apache.org/jira/browse/FLINK-7210]
There are references to the two-phase commit mechanism used in the old Flink Kafka connector, so it is expected that the latest version of the connector has the same functionality. This is, indirectly, an expectation of EXACTLY_ONCE Kafka -> Kafka end-to-end delivery guarantees. Moreover, it is emphasised that the Kafka cluster transaction timeout should be tuned (raised from 15 mins to 1 hour) to avoid data loss.

Moving forward, all these statements are met by the `Kafka Source` -> `Kafka Sink` app:
* the first one -> you are reading from and writing to Kafka
* the second one -> `Kafka Source` is replayable and `Kafka Sink` is transactional
* the last one -> `Kafka Sink` is transactional and, consequently, in the case of EXACTLY_ONCE this operator has state, so it is expected that the transaction will be rolled back.

But in fact there is no way to achieve EXACTLY_ONCE for a simple Flink `Kafka Source` -> `Kafka Sink` application. Duplicates still exist, and as a result EXACTLY_ONCE semantics is violated.

*Details*

+STRs:+
# Create a simple Flink `Kafka Source` -> `Kafka Sink` app
## Stream execution env:
### Parallelism -> 1
### Enable checkpointing -> 10000 ms (intentionally this large)
### State backend -> RocksDB
### Checkpointing mode -> EXACTLY_ONCE
### Min pause between checkpoints -> 500 ms
### Max concurrent checkpoints -> 1
## Flink Kafka consumer
### Nothing special
## Flink Kafka producer
### Props:
#### ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"
#### ProducerConfig.ACKS_CONFIG, "all"
#### ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"
### EXACTLY_ONCE Semantic
# Deploy the `Kafka Source` cluster
## Create `topic-1` with 3 partitions
# Deploy the `Kafka Sink` cluster
## Create `topic-1` with 3 partitions
# Spin up a Kafka client to generate data into `Kafka Source`:`topic-1` (e.g. Confluent `kafka-console-producer`)
# Spin up a +transactional+ Kafka consumer to drain data from `Kafka Sink`:`topic-1` (e.g. Confluent `kafka-console-consumer`)
# Use the Flink app described in step #1 to ship data from the `Kafka Source` to the `Kafka Sink` cluster.
# Wait until the Flink app creates its first checkpoint.
# Brutally kill the Flink app (SIGKILL)
# Wait 10 secs
# Start the Flink app again.
# Check for duplicates in the +transactional+ Kafka consumer (described in step #5)

+Actual+

Duplicates are present in the +transactional+ Kafka consumer output.

+Expected+

* The Kafka transaction should be rolled back by the Flink Kafka producer with EXACTLY_ONCE semantic
* Flink should automatically replay the data from `Kafka Source` based on the offsets persisted in the latest checkpoint

*Example*

+App+

{code:java|title=build.gradle (dependencies)}
...
ext {
  ...
  javaVersion = '11'
  flinkVersion = '1.12.0'
  scalaBinaryVersion = '2.11'
  ...
}

dependencies {
  ...
  implementation "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
  implementation "org.apache.flink:flink-clients_${scalaBinaryVersion}:${flinkVersion}"
  implementation "org.apache.flink:flink-statebackend-rocksdb_${scalaBinaryVersion}:${flinkVersion}"
  ...
}
{code}

{code:java|title=App}
public static void main(String[] args) {
  ...
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  env.setParallelism(1); // to make things simple
  env.enableCheckpointing(10000); // intentionally 10 secs, to leave room to stop the app between checkpoints
  env.setStateBackend(new RocksDBStateBackend("file:///xxx/config/checkpoints/rocksdb", true));
  env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
  env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
  env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

  FlinkKafkaConsumer<Record> consumer = createConsumer();
  FlinkKafkaProducer<Record> producer = createProducer();

  env
    .addSource(consumer)
    .uid("kafka-consumer")
    .addSink(producer)
    .uid("kafka-producer");

  env.execute();
}

public static FlinkKafkaConsumer<Record> createConsumer() {
  ...
  Properties props = new Properties();
  props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-source-1:9091");
  ... // nothing special
  props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

  FlinkKafkaConsumer<Record> consumer = new FlinkKafkaConsumer<>("topic-1", new RecordKafkaDerSchema(), props);
  ... // RecordKafkaDerSchema --> custom schema used to copy not only the message body but the message key too
  ... // SimpleStringSchema --> can be used instead to reproduce the issue
  consumer.setStartFromGroupOffsets();
  consumer.setCommitOffsetsOnCheckpoints(true);

  return consumer;
}

public static FlinkKafkaProducer<Record> createProducer() {
  ...
  Properties props = new Properties();
  ...
  props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-target-1:9094");
  props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
  props.setProperty(ProducerConfig.ACKS_CONFIG, "all");
  props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
  props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1000");
  props.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "2000");
  props.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "9000");
  props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
  props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "xxx"); // ignored, expected behaviour - https://issues.apache.org/jira/browse/FLINK-17691
  props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "" + (15 * 60 * 1000)); // decreased from 1 hour to 15 mins; the app will be down for less than 15 mins
  ...

  FlinkKafkaProducer<Record> producer = new FlinkKafkaProducer<>("topic-1", new RecordKafkaSerSchema(true), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
  ... // RecordKafkaSerSchema --> custom schema used to copy not only the message body but the message key too
  ... // SimpleStringSchema --> can be used instead to reproduce the issue

  return producer;
}
{code}

{code:bash|title=kafka-source-1 Producer}
bash -c 'echo Producing data... && \
for ((i=0; ;++i)); do echo "t1-k-$$i:t1-v-$$i"; sleep 2; done | kafka-console-producer --request-required-acks 1 --broker-list kafka-source-1:9091 --topic topic-1 --property parse.key=true --property key.separator=":"'
{code}
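A side note on the {{TRANSACTION_TIMEOUT_CONFIG}} line in {{createProducer()}} above (this sketch is not part of the original report): the producer-side {{transaction.timeout.ms}} must not exceed the broker-side {{transaction.max.timeout.ms}}, which defaults to 15 minutes, otherwise the transactional producer is rejected at initialization. That is why the introduction mentions raising the Kafka cluster transaction timeout from 15 minutes to 1 hour when Flink's longer default timeout is used.

{code:java|title=Transaction timeouts (sketch)}
// Client side (producer properties): keep at or below the broker's transaction.max.timeout.ms.
props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, String.valueOf(15 * 60 * 1000)); // 15 mins, as in createProducer()

// Broker side (server.properties), only needed for longer client timeouts,
// e.g. the 1 hour that FlinkKafkaProducer uses by default in EXACTLY_ONCE mode:
// transaction.max.timeout.ms=3600000
{code}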
{code:bash|title=kafka-target-1 Consumer - 0 partition}
bash -c 'echo Consuming data for topic-1... && \
kafka-console-consumer --bootstrap-server kafka-target-1:9094 --topic topic-1 --partition 0 --from-beginning --property print.key=true --property key.separator=":" --value-deserializer org.apache.kafka.common.serialization.StringDeserializer' --isolation-level read_committed
{code}

{code:bash|title=kafka-target-1 Consumer - 1 partition}
bash -c 'echo Consuming data for topic-1... && \
kafka-console-consumer --bootstrap-server kafka-target-1:9094 --topic topic-1 --partition 1 --from-beginning --property print.key=true --property key.separator=":" --value-deserializer org.apache.kafka.common.serialization.StringDeserializer' --isolation-level read_committed
{code}

{code:bash|title=kafka-target-1 Consumer - 2 partition}
bash -c 'echo Consuming data for topic-1... && \
kafka-console-consumer --bootstrap-server kafka-target-1:9094 --topic topic-1 --partition 2 --from-beginning --property print.key=true --property key.separator=":" --value-deserializer org.apache.kafka.common.serialization.StringDeserializer' --isolation-level read_committed
{code}

+Output+

{code:java|title=kafka-target-1 Consumer - 0 partition}
...
t1-k-40:t1-v-40
t1-k-43:t1-v-43
t1-k-44:t1-v-44
t1-k-47:t1-v-47
t1-k-48:t1-v-48
t1-k-49:t1-v-49
t1-k-48:t1-v-48 // DUPLICATION!!! --> EXACTLY ONCE is violated
t1-k-49:t1-v-49 // DUPLICATION!!! --> EXACTLY ONCE is violated
t1-k-54:t1-v-54
t1-k-61:t1-v-61
t1-k-62:t1-v-62
t1-k-66:t1-v-66
t1-k-71:t1-v-71
t1-k-73:t1-v-73
...
{code}

{code:java|title=kafka-target-1 Consumer - 1 partition}
...
t1-k-35:t1-v-35
t1-k-46:t1-v-46
t1-k-50:t1-v-50
t1-k-51:t1-v-51
t1-k-53:t1-v-53
t1-k-56:t1-v-56
t1-k-57:t1-v-57
t1-k-59:t1-v-59
t1-k-60:t1-v-60
t1-k-63:t1-v-63
t1-k-65:t1-v-65
t1-k-69:t1-v-69
t1-k-74:t1-v-74
...
{code}

{code:java|title=kafka-target-1 Consumer - 2 partition}
...
t1-k-39:t1-v-39
t1-k-41:t1-v-41
t1-k-42:t1-v-42
t1-k-45:t1-v-45
t1-k-52:t1-v-52
t1-k-55:t1-v-55
t1-k-58:t1-v-58
t1-k-64:t1-v-64
t1-k-67:t1-v-67
t1-k-68:t1-v-68
t1-k-70:t1-v-70
t1-k-72:t1-v-72
t1-k-75:t1-v-75
t1-k-77:t1-v-77
...
{code}

+Summary+

As can be seen from `kafka-target-1 Consumer - 0 partition`, EXACTLY ONCE delivery has been violated.

P.S.: If I have missed something, please let me know what is missing and how to achieve EXACTLY ONCE delivery natively (via Flink configuration) for the simple application described above.
P.S.: If it is not possible natively (only via a manual/custom implementation), please let me know as well.
P.S.: Similar issue discussions:
* [https://stackoverflow.com/questions/57308590/exactly-once-semantics-in-flink-kafka-producer]

--
This message was sent by Atlassian Jira
(v8.3.4#803005)