Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4239#discussion_r134399156
  
    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java ---
    @@ -0,0 +1,294 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka.internal;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.clients.producer.Callback;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.clients.producer.RecordMetadata;
    +import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
    +import org.apache.kafka.common.Metric;
    +import org.apache.kafka.common.MetricName;
    +import org.apache.kafka.common.Node;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.kafka.common.errors.ProducerFencedException;
    +import org.apache.kafka.common.requests.FindCoordinatorRequest;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +
    +import java.lang.reflect.Field;
    +import java.lang.reflect.InvocationTargetException;
    +import java.lang.reflect.Method;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Wrapper around KafkaProducer that makes it possible to resume transactions after a node failure, which in turn
    + * allows implementing the two-phase commit algorithm for an exactly-once FlinkKafkaProducer.
    + *
    + * <p>On the happy path, usage is exactly the same as for {@link org.apache.kafka.clients.producer.KafkaProducer}. The user is
    + * expected to call:
    + *
    + * <ul>
    + *     <li>{@link FlinkKafkaProducer#initTransactions()}</li>
    + *     <li>{@link FlinkKafkaProducer#beginTransaction()}</li>
    + *     <li>{@link FlinkKafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord)}</li>
    + *     <li>{@link FlinkKafkaProducer#flush()}</li>
    + *     <li>{@link FlinkKafkaProducer#commitTransaction()}</li>
    + * </ul>
    + *
    + * <p>To actually implement two-phase commit, it must always be possible to commit a transaction after
    + * pre-committing it (here, pre-commit is just a {@link FlinkKafkaProducer#flush()}). In case of a failure between
    + * {@link FlinkKafkaProducer#flush()} and {@link FlinkKafkaProducer#commitTransaction()}, this class allows resuming
    + * the interrupted transaction and committing it after a restart:
    + *
    + * <ul>
    + *     <li>{@link FlinkKafkaProducer#initTransactions()}</li>
    + *     <li>{@link FlinkKafkaProducer#beginTransaction()}</li>
    + *     <li>{@link FlinkKafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord)}</li>
    + *     <li>{@link FlinkKafkaProducer#flush()}</li>
    + *     <li>{@link FlinkKafkaProducer#getProducerId()}</li>
    + *     <li>{@link FlinkKafkaProducer#getEpoch()}</li>
    + *     <li>node failure... restore producerId and epoch from state</li>
    + *     <li>{@link FlinkKafkaProducer#resumeTransaction(long, short)}</li>
    + *     <li>{@link FlinkKafkaProducer#commitTransaction()}</li>
    + * </ul>
    + *
    + * <p>{@link FlinkKafkaProducer#resumeTransaction(long, short)} replaces {@link FlinkKafkaProducer#initTransactions()}
    + * as the way to obtain the producerId and epoch counters. This is necessary because
    + * {@link FlinkKafkaProducer#initTransactions()} would otherwise automatically abort all ongoing transactions.
    + *
    + * <p>The second way this implementation differs from the reference {@link org.apache.kafka.clients.producer.KafkaProducer}
    + * is that it flushes new partitions on {@link FlinkKafkaProducer#flush()} instead of on
    + * {@link FlinkKafkaProducer#commitTransaction()}.
    + *
    + * <p>The last, minor difference is that it exposes the producerId and epoch counters via the
    + * {@link FlinkKafkaProducer#getProducerId()} and {@link FlinkKafkaProducer#getEpoch()} methods (in the wrapped
    + * producer these are unfortunately private fields).
    + *
    + * <p>Those changes are compatible with Kafka's 0.11.0 API, although making them possible was clearly not the
    + * intention of the Kafka API authors.
    + *
    + * <p>Internally this implementation wraps a {@link org.apache.kafka.clients.producer.KafkaProducer} and implements
    + * the required changes via the Java Reflection API. It might not be the prettiest solution, but the alternative
    + * would be to re-implement the whole Kafka 0.11 client API on our own.
    + */
    +public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
    +   private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer.class);
    +
    +   private final KafkaProducer<K, V> kafkaProducer;
    +   @Nullable
    +   private final String transactionalId;
    +
    +   public FlinkKafkaProducer(Properties properties) {
    +           transactionalId = properties.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
    +           kafkaProducer = new KafkaProducer<>(properties);
    +   }
    +
    +   // -------------------------------- Simple proxy method calls --------------------------------
    +
    +   @Override
    +   public void initTransactions() {
    +           kafkaProducer.initTransactions();
    +   }
    +
    +   @Override
    +   public void beginTransaction() throws ProducerFencedException {
    +           kafkaProducer.beginTransaction();
    +   }
    +
    +   @Override
    +   public void commitTransaction() throws ProducerFencedException {
    +           kafkaProducer.commitTransaction();
    +   }
    +
    +   @Override
    +   public void abortTransaction() throws ProducerFencedException {
    +           kafkaProducer.abortTransaction();
    +   }
    +
    +   @Override
    +   public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
    +           kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
    +   }
    +
    +   @Override
    +   public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    +           return kafkaProducer.send(record);
    +   }
    +
    +   @Override
    +   public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    +           return kafkaProducer.send(record, callback);
    +   }
    +
    +   @Override
    +   public List<PartitionInfo> partitionsFor(String topic) {
    +           return kafkaProducer.partitionsFor(topic);
    +   }
    +
    +   @Override
    +   public Map<MetricName, ? extends Metric> metrics() {
    +           return kafkaProducer.metrics();
    +   }
    +
    +   @Override
    +   public void close() {
    +           kafkaProducer.close();
    +   }
    +
    +   @Override
    +   public void close(long timeout, TimeUnit unit) {
    +           kafkaProducer.close(timeout, unit);
    +   }
    +
    +   // -------------------------------- New methods or methods with changed behaviour --------------------------------
    +
    +   @Override
    +   public void flush() {
    +           kafkaProducer.flush();
    +           if (transactionalId != null) {
    +                   flushNewPartitions();
    +           }
    +   }
    +
    +   public void resumeTransaction(long producerId, short epoch) {
    +           if (!(producerId >= 0 && epoch >= 0)) {
    +                   throw new IllegalStateException(String.format("Incorrect values for producerId [%s] and epoch [%s]", producerId, epoch));
    --- End diff --
    
    Can use `Preconditions.checkState(...)` here.
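    
    For reference, a rough sketch of what that could look like, assuming `org.apache.flink.util.Preconditions` is importable in this module (just a suggestion, not part of the PR):
    
    ```java
    // Sketch only: replaces the manual if/throw at the start of resumeTransaction().
    public void resumeTransaction(long producerId, short epoch) {
        // checkState throws IllegalStateException with the formatted message if the condition is false
        Preconditions.checkState(producerId >= 0 && epoch >= 0,
            "Incorrect values for producerId [%s] and epoch [%s]", producerId, epoch);
        // ... rest of the method unchanged from the PR
    }
    ```
    
    That should keep the same `IllegalStateException` and message, just with less boilerplate.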


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
