Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4239#discussion_r134399156 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java --- @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.internal; + +import org.apache.flink.annotation.VisibleForTesting; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.clients.producer.internals.TransactionalRequestResult; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * Wrapper around KafkaProducer that allows to resume transactions in case of node failure, which allows to implement + * two phase commit algorithm for exactly-once semantic FlinkKafkaProducer. + * + * <p>For happy path usage is exactly the same as {@link org.apache.kafka.clients.producer.KafkaProducer}. 
User is + * expected to call: + * + * <ul> + * <li>{@link FlinkKafkaProducer#initTransactions()}</li> + * <li>{@link FlinkKafkaProducer#beginTransaction()}</li> + * <li>{@link FlinkKafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord)}</li> + * <li>{@link FlinkKafkaProducer#flush()}</li> + * <li>{@link FlinkKafkaProducer#commitTransaction()}</li> + * </ul> + * + * <p>To actually implement two phase commit, it must be possible to always commit a transaction after pre-committing + * it (here, pre-commit is just a {@link FlinkKafkaProducer#flush()}). In case of some failure between + * {@link FlinkKafkaProducer#flush()} and {@link FlinkKafkaProducer#commitTransaction()} this class allows to resume + * interrupted transaction and commit it after a restart: + * + * <ul> + * <li>{@link FlinkKafkaProducer#initTransactions()}</li> + * <li>{@link FlinkKafkaProducer#beginTransaction()}</li> + * <li>{@link FlinkKafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord)}</li> + * <li>{@link FlinkKafkaProducer#flush()}</li> + * <li>{@link FlinkKafkaProducer#getProducerId()}</li> + * <li>{@link FlinkKafkaProducer#getEpoch()}</li> + * <li>node failure... restore producerId and epoch from state</li> + * <li>{@link FlinkKafkaProducer#resumeTransaction(long, short)}</li> + * <li>{@link FlinkKafkaProducer#commitTransaction()}</li> + * </ul> + * + * <p>{@link FlinkKafkaProducer#resumeTransaction(long, short)} replaces {@link FlinkKafkaProducer#initTransactions()} + * as a way to obtain the producerId and epoch counters. It has to be done, because otherwise + * {@link FlinkKafkaProducer#initTransactions()} would automatically abort all ongoing transactions. + * + * <p>Second way this implementation differs from the reference {@link org.apache.kafka.clients.producer.KafkaProducer} + * is that this one actually flushes new partitions on {@link FlinkKafkaProducer#flush()} instead of on + * {@link FlinkKafkaProducer#commitTransaction()}. 
+ * + * <p>The last one minor difference is that it allows to obtain the producerId and epoch counters via + * {@link FlinkKafkaProducer#getProducerId()} and {@link FlinkKafkaProducer#getEpoch()} methods (which are unfortunately + * private fields). + * + * <p>Those changes are compatible with Kafka's 0.11.0 REST API although it clearly was not the intention of the Kafka's + * API authors to make them possible. + * + * <p>Internally this implementation uses {@link org.apache.kafka.clients.producer.KafkaProducer} and implements + * required changes via Java Reflection API. It might not be the prettiest solution. An alternative would be to + * re-implement whole Kafka's 0.11 REST API client on our own. + */ +public class FlinkKafkaProducer<K, V> implements Producer<K, V> { + private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer.class); + + private final KafkaProducer<K, V> kafkaProducer; + @Nullable + private final String transactionalId; + + public FlinkKafkaProducer(Properties properties) { + transactionalId = properties.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG); + kafkaProducer = new KafkaProducer<>(properties); + } + + // -------------------------------- Simple proxy method calls -------------------------------- + + @Override + public void initTransactions() { + kafkaProducer.initTransactions(); + } + + @Override + public void beginTransaction() throws ProducerFencedException { + kafkaProducer.beginTransaction(); + } + + @Override + public void commitTransaction() throws ProducerFencedException { + kafkaProducer.commitTransaction(); + } + + @Override + public void abortTransaction() throws ProducerFencedException { + kafkaProducer.abortTransaction(); + } + + @Override + public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException { + kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId); + } + + @Override + public Future<RecordMetadata> 
send(ProducerRecord<K, V> record) { + return kafkaProducer.send(record); + } + + @Override + public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { + return kafkaProducer.send(record, callback); + } + + @Override + public List<PartitionInfo> partitionsFor(String topic) { + return kafkaProducer.partitionsFor(topic); + } + + @Override + public Map<MetricName, ? extends Metric> metrics() { + return kafkaProducer.metrics(); + } + + @Override + public void close() { + kafkaProducer.close(); + } + + @Override + public void close(long timeout, TimeUnit unit) { + kafkaProducer.close(timeout, unit); + } + + // -------------------------------- New methods or methods with changed behaviour -------------------------------- + + @Override + public void flush() { + kafkaProducer.flush(); + if (transactionalId != null) { + flushNewPartitions(); + } + } + + public void resumeTransaction(long producerId, short epoch) { + if (!(producerId >= 0 && epoch >= 0)) { + throw new IllegalStateException(String.format("Incorrect values for producerId [%s] and epoch [%s]", producerId, epoch)); --- End diff -- Can use `Preconditions.checkState(...)` here.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---