[ 
https://issues.apache.org/jira/browse/FLINK-20753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17254082#comment-17254082
 ] 

Nazar Volynets commented on FLINK-20753:
----------------------------------------

This is not a valid issue. NOT REPRODUCIBLE (after correction). Sorry.

Basically, the root cause is here (from the issue description):
{code:bash|title=kafka-target-1 Consumer - 0 partition}
bash -c 'echo Consuming data for topic-1... && \
      kafka-console-consumer --bootstrap-server kafka-target-1:9094 \
      --topic topic-1 --partition 0 --from-beginning \
      --property print.key=true --property key.separator=":" \
      --value-deserializer org.apache.kafka.common.serialization.StringDeserializer' \
      --isolation-level read_committed
{code}
--> the arguments placed after the closing single quote are ignored:
{code:bash|title=kafka-target-1 Consumer - 0 partition}
...' --isolation-level read_committed
{code}
As a result, the Kafka consumer is NOT +transactional+: it runs with the default isolation level (read_uncommitted).
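
To illustrate why the flags are ignored, here is a minimal sketch in which `echo` stands in for `kafka-console-consumer` (an assumption, so that the snippet runs without a Kafka installation). Words placed after the closing quote of `bash -c '...'` become the positional parameters `$0` and `$1` of the quoted script, so the command inside the quotes never receives them.

```shell
#!/usr/bin/env bash
# `echo` is a stand-in for kafka-console-consumer (assumption for illustration):
# it prints exactly the arguments it is given.

# Flag AFTER the closing quote: it is not part of the quoted command line.
outside=$(bash -c 'echo consumer-args: --topic topic-1' --isolation-level read_committed)

# The stray words become positional parameters ($0, $1) of the -c script instead.
stray=$(bash -c 'echo "$0 $1"' --isolation-level read_committed)

# Flag INSIDE the quotes: it is part of the quoted command line.
inside=$(bash -c 'echo consumer-args: --topic topic-1 --isolation-level read_committed')

printf '%s\n' "$outside" "$stray" "$inside"
```

The first printed line lacks the isolation-level flag, the second shows where the flag ended up, and the third shows the corrected form.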

+Summary+

The issue is not valid. After correcting the Kafka consumer command
{code:bash|title=kafka-target-1 Consumer - 0 partition}
bash -c 'echo Consuming data for topic-1... && \
      kafka-console-consumer --bootstrap-server kafka-target-1:9094 \
      --topic topic-1 --partition 0 --isolation-level read_committed \
      --from-beginning --property print.key=true --property key.separator=":" \
      --value-deserializer org.apache.kafka.common.serialization.StringDeserializer'
{code}
so that it is really +transactional+, I am not able to reproduce the issue.
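
A quick way to check consumer output for duplicates is to look for keys that occur more than once. The sketch below is only an illustration: the sample lines are copied from the partition 0 output in the issue description, and in practice the saved consumer output would be piped in instead.

```shell
#!/usr/bin/env bash
# Sample lines copied from the partition 0 output in the issue description.
consumed='t1-k-47:t1-v-47
t1-k-48:t1-v-48
t1-k-49:t1-v-49
t1-k-48:t1-v-48
t1-k-49:t1-v-49
t1-k-54:t1-v-54'

# Keep the key (the text before the first ":"), then list keys seen more than once.
duplicates=$(printf '%s\n' "$consumed" | cut -d: -f1 | sort | uniq -d)

if [ -n "$duplicates" ]; then
  printf 'duplicate keys:\n%s\n' "$duplicates"
else
  echo 'no duplicates'
fi
```

An empty result for every partition would be consistent with exactly-once delivery for the records consumed so far.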

> Duplicates With Exactly-once Kafka -> Kakfa Producer
> ----------------------------------------------------
>
>                 Key: FLINK-20753
>                 URL: https://issues.apache.org/jira/browse/FLINK-20753
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Runtime / Checkpointing
>    Affects Versions: 1.12.0
>         Environment: Java 11
> Flink started within IDE
>            Reporter: Nazar Volynets
>            Priority: Major
>
> *Introduction*
> Based on the following statements from Flink's docs:
> 1. 
> [https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html]
> {quote}Flink provides an [Apache Kafka|https://kafka.apache.org/] connector 
> for reading data from and writing data to Kafka topics with exactly-once 
> guarantees.
> {quote}
> 2. 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/fault_tolerance.html#exactly-once-end-to-end]
> {quote}To achieve exactly once end-to-end, so that every event from the 
> sources affects the sinks exactly once, the following must be true:
>  # your sources must be replayable, and
>  # your sinks must be transactional (or idempotent){quote}
> 3. 
> [https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#caveats]
> {quote}{{Semantic.EXACTLY_ONCE}} mode relies on the ability to commit 
> transactions that were started before taking a checkpoint, after recovering 
> from the said checkpoint. If the time between Flink application crash and 
> completed restart is larger than Kafka's transaction timeout there will be 
> data loss (Kafka will automatically abort transactions that exceeded timeout 
> time)
> {quote}
> 4. [https://issues.apache.org/jira/browse/FLINK-7210]
> There are references to the two-phase commit mechanism used in the old 
> Flink Kafka connector, so it is expected that the latest version of the 
> connector has the same functionality.
> This indirectly sets an expectation of EXACTLY_ONCE Kafka->Kafka end-to-end 
> delivery guarantees.
> Moreover, the docs emphasise tuning the Kafka cluster transaction timeout 
> (raising it from 15 mins to 1 hour) to avoid data loss.
> Moving forward, all three of these statements are met by a `Kafka Source` -> 
> `Kafka Sink` app:
>  * regarding the first one -> you are reading from & writing to Kafka
>  * about the second one -> `Kafka Source` is replayable & `Kafka Sink` is 
> transactional
>  * last one -> `Kafka Sink` is transactional & consequently, in case of 
> EXACTLY_ONCE, this operator has state; so it is expected that the transaction 
> will be rolled back.
> But in fact it is not possible to achieve EXACTLY_ONCE for a simple Flink 
> `Kafka Source` -> `Kafka Sink` application. Duplicates still exist; as a 
> result, EXACTLY_ONCE semantics are violated.
> *Details*
> +STRs:+
>  # Create a simple Flink `Kafka Source` -> `Kafka Sink` app
>  ## Stream execution env:
>  ### Parallelism -> 1
>  ### Enable checkpointing -> 10000 ms (intentionally this large)
>  ### State backend -> RocksDB
>  ### Checkpointing mode -> EXACTLY_ONCE
>  ### Min pause between checkpoints -> 500 ms
>  ### Max concurrent checkpoints -> 1
>  ## Flink Kafka consumer
>  ### Nothing valuable
>  ## Flink Kafka producer
>  ### Props:
>  #### ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"
>  #### ProducerConfig.ACKS_CONFIG, "all"
>  #### ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"
>  ### EXACTLY_ONCE Semantic
>  # Deploy `Kafka Source` Cluster
>  ## Create `topic-1` with 3 partitions
>  # Deploy `Kafka Sink` Cluster
>  ## Create `topic-1` with 3 partitions
>  # Spin up some Kafka client to generate data into `Kafka Source`:`topic-1` 
> (e.g. Confluent `kafka-console-producer`)
>  # Spin up a +transactional+ Kafka consumer to drain data from `Kafka 
> Sink`:`topic-1` (e.g. Confluent `kafka-console-consumer`)
>  # Use the Flink app described in step #1 to ship data from the `Kafka Source` -> 
> `Kafka Sink` Kafka cluster.
>  # Wait until the Flink app creates its first checkpoint.
>  # Brutally kill Flink's app (SIGKILL)
>  # Wait 10 secs
>  # Start Flink app again.
>  # Check for duplicates in the +transactional+ Kafka consumer (described in 
> step #5)
> +Actual+
> Duplicates exist in the +transactional+ Kafka consumer output.
> +Expected+
>  * Kafka transaction should be rolled back by Flink Kafka producer with 
> EXACTLY_ONCE Semantic
>  * Flink should automatically replay the data from `Kafka Source` based on 
> offsets persisted in latest checkpoint
> *Example*
> +App+
> {code:java|title=build.gradle (dependencies)}
> ...
> ext {
>   ...
>   javaVersion = '11'
>   flinkVersion = '1.12.0'
>   scalaBinaryVersion = '2.11'
>   ...
> }
> dependencies {
>   ...
>   implementation "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>   implementation "org.apache.flink:flink-clients_${scalaBinaryVersion}:${flinkVersion}"
>   implementation "org.apache.flink:flink-statebackend-rocksdb_${scalaBinaryVersion}:${flinkVersion}"
>   ...
> }
> {code}
> {code:java|title=App}
> public static void main(String[] args) {
>   ...
>   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>   env.setParallelism(1); // to make things simple
>   // intentionally set to 10 secs to leave room to stop the app between checkpoints
>   env.enableCheckpointing(10000);
>   env.setStateBackend(new RocksDBStateBackend("file:///xxx/config/checkpoints/rocksdb", true));
>   env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>   env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
>   env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>   env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
>   FlinkKafkaConsumer<Record> consumer = createConsumer();
>   FlinkKafkaProducer<Record> producer = createProducer();
>   env
>     .addSource(consumer)
>     .uid("kafka-consumer")
>     .addSink(producer)
>     .uid("kafka-producer")
>   ;
>   env.execute();
> }
> public static FlinkKafkaConsumer<Record> createConsumer() {
>   ...
>   Properties props = new Properties();
>   props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "kafka-source-1:9091");
>   ... // nothing special
>   props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
>   FlinkKafkaConsumer<Record> consumer =
>       new FlinkKafkaConsumer<>("topic-1", new RecordKafkaDerSchema(), props);
>   ... // RecordKafkaDerSchema --> custom schema used to copy the message key as well as the message body
>   ... // SimpleStringSchema --> can be used instead to reproduce issue
>   consumer.setStartFromGroupOffsets();
>   consumer.setCommitOffsetsOnCheckpoints(true);
>   return consumer;
> }
> public static FlinkKafkaProducer<Record> createProducer() {
>   ...
>   Properties props = new Properties();
>   ...
>   props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "kafka-target-1:9094");
>   props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 
> "1");
>   props.setProperty(ProducerConfig.ACKS_CONFIG, "all");
>   props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
>   props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1000");
>   props.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "2000");
>   props.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "9000");
>   props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
>   // ignored due to expected behaviour - https://issues.apache.org/jira/browse/FLINK-17691
>   props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "xxx");
>   // decreased from 1 hour to 15 mins; the app is going to be down for less than 15 mins
>   props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "" + (15 * 60 * 1000));
>   ...
>   FlinkKafkaProducer<Record> producer = new FlinkKafkaProducer<>(
>       "topic-1", new RecordKafkaSerSchema(true), props,
>       FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>   ... // RecordKafkaSerSchema --> custom schema used to copy the message key as well as the message body
>   ... // SimpleStringSchema --> can be used instead to reproduce issue
>   return producer;
> }
> {code}
> {code:bash|title=kafka-source-1 Producer}
> bash -c 'echo Producing data... && \
>  for ((i=0; ;++i)); do echo "t1-k-$$i:t1-v-$$i"; sleep 2; done | \
>  kafka-console-producer --request-required-acks 1 --broker-list kafka-source-1:9091 \
>  --topic topic-1 --property parse.key=true --property key.separator=":"'
> {code}
> {code:bash|title=kafka-target-1 Consumer - 0 partition}
> bash -c 'echo Consuming data for topic-1... && \
>       kafka-console-consumer --bootstrap-server kafka-target-1:9094 \
>       --topic topic-1 --partition 0 --from-beginning \
>       --property print.key=true --property key.separator=":" \
>       --value-deserializer org.apache.kafka.common.serialization.StringDeserializer' \
>       --isolation-level read_committed
> {code}
> {code:bash|title=kafka-target-1 Consumer - 1 partition}
> bash -c 'echo Consuming data for topic-1... && \
>       kafka-console-consumer --bootstrap-server kafka-target-1:9094 \
>       --topic topic-1 --partition 1 --from-beginning \
>       --property print.key=true --property key.separator=":" \
>       --value-deserializer org.apache.kafka.common.serialization.StringDeserializer' \
>       --isolation-level read_committed
> {code}
> {code:bash|title=kafka-target-1 Consumer - 2 partition}
> bash -c 'echo Consuming data for topic-1... && \
>       kafka-console-consumer --bootstrap-server kafka-target-1:9094 \
>       --topic topic-1 --partition 2 --from-beginning \
>       --property print.key=true --property key.separator=":" \
>       --value-deserializer org.apache.kafka.common.serialization.StringDeserializer' \
>       --isolation-level read_committed
> {code}
> +Output+
> {code:java|title=kafka-target-1 Consumer - 0 partition}
> ...
> t1-k-40:t1-v-40
> t1-k-43:t1-v-43
> t1-k-44:t1-v-44
> t1-k-47:t1-v-47
> t1-k-48:t1-v-48
> t1-k-49:t1-v-49
> t1-k-48:t1-v-48 // DUPLICATION!!! --> EXACTLY ONCE is violated
> t1-k-49:t1-v-49 // DUPLICATION!!! --> EXACTLY ONCE is violated
> t1-k-54:t1-v-54
> t1-k-61:t1-v-61
> t1-k-62:t1-v-62
> t1-k-66:t1-v-66
> t1-k-71:t1-v-71
> t1-k-73:t1-v-73
> ...
> {code}
> {code:java|title=kafka-target-1 Consumer - 1 partition}
> ...
> t1-k-35:t1-v-35
> t1-k-46:t1-v-46
> t1-k-50:t1-v-50
> t1-k-51:t1-v-51
> t1-k-53:t1-v-53
> t1-k-56:t1-v-56
> t1-k-57:t1-v-57
> t1-k-59:t1-v-59
> t1-k-60:t1-v-60
> t1-k-63:t1-v-63
> t1-k-65:t1-v-65
> t1-k-69:t1-v-69
> t1-k-74:t1-v-74
> ...
> {code}
> {code:java|title=kafka-target-1 Consumer - 2 partition}
> ...
> t1-k-39:t1-v-39
> t1-k-41:t1-v-41
> t1-k-42:t1-v-42
> t1-k-45:t1-v-45
> t1-k-52:t1-v-52
> t1-k-55:t1-v-55
> t1-k-58:t1-v-58
> t1-k-64:t1-v-64
> t1-k-67:t1-v-67
> t1-k-68:t1-v-68
> t1-k-70:t1-v-70
> t1-k-72:t1-v-72
> t1-k-75:t1-v-75
> t1-k-77:t1-v-77
> ...
> {code}
> +Summary+
> As we can see from `kafka-target-1 Consumer - 0 partition`, EXACTLY ONCE 
> delivery has been violated.
> P.S.: If I have missed something, please let me know what it is and how to 
> achieve EXACTLY ONCE delivery natively (via Flink configuration) for the 
> simple application described above.
> P.S.: If it is not possible to do this natively (only with a manual/custom 
> implementation), then please let me know.
> P.S.: Similar issue discussions:
> * 
> [https://stackoverflow.com/questions/57308590/exactly-once-semantics-in-flink-kafka-producer]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
