[ https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863924#comment-17863924 ]
Mason Chen commented on FLINK-33545:
------------------------------------

[~arvid] [~thw] I believe this is solved by https://issues.apache.org/jira/browse/FLINK-35749!

> KafkaSink implementation can cause data loss during broker issue when not using EXACTLY_ONCE if there's any batching
> -------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33545
>                 URL: https://issues.apache.org/jira/browse/FLINK-33545
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.18.0
>            Reporter: Kevin Tseng
>            Assignee: Kevin Tseng
>            Priority: Major
>              Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink, some assumptions were made:
> # KafkaSource relies completely on checkpoints to manage and track its offsets in the *KafkaSourceReader<T>* class.
> # KafkaSink, in the *KafkaWriter<IN>* class, only performs a catch-up flush when *DeliveryGuarantee.EXACTLY_ONCE* is specified.
>
> KafkaSource assumes that the checkpoint is properly fenced and that everything it has read up until the checkpoint was initiated will be processed or recorded by downstream operators, including two-phase committers such as *KafkaSink*.
>
> *KafkaSink* follows the model:
>
> {code:java}
> flush -> prepareCommit -> commit{code}
>
> Consider a scenario where:
> * KafkaSource ingested records #1 to #100
> * KafkaSink only had a chance to send records #1 to #96
> * the producer batching interval is 5 ms
>
> When the checkpoint is initiated, flush will only confirm the sending of records #1 to #96.
> This allows the checkpoint to proceed, as there is no error, and records #97 to #100 will be batched after the first flush.
>
> Now, if the broker goes down or has an issue that prevents the internal KafkaProducer from sending out the batched records, and the producer is stuck in a constant retry cycle (the default value of KafkaProducer retries is Integer.MAX_VALUE), the *WriterCallback* error handling will not be triggered until the next checkpoint flush.
>
> This can be tested by creating a faulty Kafka cluster and running the following code:
> {code:java}
> import java.util.Properties;
> import java.util.UUID;
>
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.StringSerializer;
>
> // Assumes TOPIC and BOOTSTRAP_SERVER constants and a CB Callback implementation
> // are defined in the surrounding class.
> Properties props = new Properties();
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
> // Unbounded retries and delivery timeout: the producer keeps retrying internally
> // instead of reporting an error to the send callback.
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
> props.put(ProducerConfig.ACKS_CONFIG, "all");
> final KafkaProducer<String, String> producer = new KafkaProducer<>(props);
> try {
>     for (int i = 0; i < 10; i++) {
>         System.out.printf("sending record #%d\n", i);
>         String data = UUID.randomUUID().toString();
>         final ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), data);
>         producer.send(record, new CB(Integer.toString(i), data));
>         Thread.sleep(10000); // sleep for 10 seconds
>     }
> } catch (Exception e) {
>     e.printStackTrace();
> } finally {
>     System.out.println("flushing");
>     producer.flush();
>     System.out.println("closing");
>     producer.close();
> }{code}
> Once the callback returns due to a network timeout, it will cause Flink to restart from the previously saved checkpoint (which recorded reading up to record #100), but KafkaWriter never sent records #97 to #100.
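>
> For reference, the *CB* callback passed to producer.send in the snippet above is not shown; a minimal, hypothetical version that only logs the send result could look like this:
> {code:java}
> import org.apache.kafka.clients.producer.Callback;
> import org.apache.kafka.clients.producer.RecordMetadata;
>
> // Hypothetical stand-in for the CB class referenced above: it only logs
> // whether the asynchronous send succeeded or failed.
> class CB implements Callback {
>     private final String key;
>     private final String value;
>
>     CB(String key, String value) {
>         this.key = key;
>         this.value = value;
>     }
>
>     @Override
>     public void onCompletion(RecordMetadata metadata, Exception exception) {
>         if (exception != null) {
>             System.out.printf("send failed for key=%s: %s%n", key, exception);
>         } else {
>             System.out.printf("sent key=%s value=%s to %s-%d@%d%n",
>                     key, value, metadata.topic(), metadata.partition(), metadata.offset());
>         }
>     }
> }{code}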
> This will result in data loss of records #97 to #100.
>
> Because KafkaWriter only catches errors *after* the callback, if the callback is never invoked (due to a broker issue) right after the first flush has taken place, those records are effectively gone unless someone decides to go back and look for them.
>
> This behavior should be acceptable if the user has set *DeliveryGuarantee.NONE*, but it is not expected for *DeliveryGuarantee.AT_LEAST_ONCE*.
>
> The process diverges in the case of *EXACTLY_ONCE*: prepareCommit produces a list of KafkaCommittable objects, each corresponding to a transactional KafkaProducer to be committed, and a catch-up flush takes place during the *commit* step. Whether this was intentional or not, because flush is a blocking call, this second flush at the end of the EXACTLY_ONCE path actually ensures that everything fenced into the current checkpoint is sent to Kafka, or fails the checkpoint if the send is not successful.
>
> Due to the above findings, I'm recommending one of the following fixes:
> # perform a second flush for AT_LEAST_ONCE as well, or
> # move the flush to the end of the KafkaSink process (see the sketch after this list).
>
> I'm leaning towards the 2nd option, as it does not make sense to flush and then checkpoint; we should flush right before the checkpoint completes, given that is what commit is meant to do.
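>
> To make the recommended ordering concrete, below is a minimal sketch of the intended behavior. This is not Flink's actual KafkaWriter; the class and method names are hypothetical. It only illustrates the idea of a blocking flush at the commit step, so that records batched after the first flush are also drained and any callback error fails the checkpoint instead of being lost silently:
> {code:java}
> import java.util.concurrent.atomic.AtomicReference;
>
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> // Hypothetical, simplified AT_LEAST_ONCE writer (not the real KafkaWriter).
> class AtLeastOnceWriterSketch {
>     private final KafkaProducer<String, String> producer;
>     // Remembers the first asynchronous send failure reported by a callback.
>     private final AtomicReference<Exception> asyncError = new AtomicReference<>();
>
>     AtLeastOnceWriterSketch(KafkaProducer<String, String> producer) {
>         this.producer = producer;
>     }
>
>     void write(String topic, String key, String value) {
>         checkAsyncError();
>         producer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> {
>             if (exception != null) {
>                 asyncError.compareAndSet(null, exception);
>             }
>         });
>     }
>
>     // Invoked right before the checkpoint is allowed to complete (the commit step):
>     // drain every record handed to the producer so far, then surface any error.
>     void commit() {
>         producer.flush();   // blocking: waits for all batched records to be acknowledged
>         checkAsyncError();  // a callback failure here should fail the checkpoint
>     }
>
>     private void checkAsyncError() {
>         Exception e = asyncError.get();
>         if (e != null) {
>             throw new IllegalStateException("Kafka send failed; failing the checkpoint", e);
>         }
>     }
> }{code}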