[ https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16138243#comment-16138243 ]
ASF GitHub Bot commented on FLINK-6988: --------------------------------------- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4239#discussion_r134725699 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java --- @@ -0,0 +1,1000 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.util.SerializableObject; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer; +import org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.NetUtils; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.Metric; +import 
org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTxnStateException; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicLong; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.11.x. By default producer + * will use {@link Semantic#EXACTLY_ONCE} semantic. + * + * <p>Implementation note: This producer is a hybrid between a regular regular + * {@link org.apache.flink.streaming.api.functions.sink.SinkFunction} (a) and a custom operator (b). + * + * <p>Details about approach (a): + * Because of regular {@link org.apache.flink.streaming.api.functions.sink.SinkFunction} APIs limitations, this + * variant do not allow accessing the timestamp attached to the record. + * + * <p>Details about approach (b): + * Kafka 0.11 supports writing the timestamp attached to a record to Kafka. When using the + * {@link FlinkKafkaProducer011#writeToKafkaWithTimestamps} method, the Kafka producer can access the internal + * record timestamp of the record and write it to Kafka. + * + * <p>All methods and constructors in this class are marked with the approach they are needed for. + */ +public class FlinkKafkaProducer011<IN> + extends TwoPhaseCommitSinkFunction<IN, FlinkKafkaProducer011.KafkaTransactionState> { + + /** + * Semantics that can be chosen. + * <li>{@link #EXACTLY_ONCE}</li> + * <li>{@link #AT_LEAST_ONCE}</li> + * <li>{@link #NONE}</li> + */ + public enum Semantic { + /** + * Semantic.EXACTLY_ONCE the Flink producer will write all messages in a Kafka transaction that will be + * committed to the Kafka on a checkpoint. + * + * <p>In this mode {@link FlinkKafkaProducer011} sets up a pool of {@link FlinkKafkaProducer}. Between each + * checkpoint there is created new Kafka transaction, which is being committed on + * {@link FlinkKafkaProducer011#notifyCheckpointComplete(long)}. If checkpoint complete notifications are + * running late, {@link FlinkKafkaProducer011} can run out of {@link FlinkKafkaProducer}s in the pool. In that + * case any subsequent {@link FlinkKafkaProducer011#snapshotState(FunctionSnapshotContext)} requests will fail + * and {@link FlinkKafkaProducer011} will keep using the {@link FlinkKafkaProducer} from previous checkpoint. + * To decrease chances of failing checkpoints there are three options: + * <li>decrease number of max concurrent checkpoints</li> + * <li>make checkpoints more reliable (so that they complete faster)</li> + * <li>increase delay between checkpoints</li> + * <li>increase size of {@link FlinkKafkaProducer}s pool</li> + */ + EXACTLY_ONCE, + /** + * Semantic.AT_LEAST_ONCE the Flink producer will wait for all outstanding messages in the Kafka buffers + * to be acknowledged by the Kafka producer on a checkpoint. + */ + AT_LEAST_ONCE, + /** + * Semantic.NONE means that nothing will be guaranteed. 
Messages can be lost and/or duplicated in case + * of failure. + */ + NONE + } + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducerBase.class); + + private static final long serialVersionUID = 1L; + + /** + * Default number of KafkaProducers in the pool. See {@link Semantic#EXACTLY_ONCE}. + */ + public static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5; + + /** + * Configuration key for disabling the metrics reporting. + */ + public static final String KEY_DISABLE_METRICS = "flink.disable-metrics"; + + /** + * Descriptor of the transacionalIds list. + */ + private static final ListStateDescriptor<String> TRANSACTIONAL_IDS_DESCRIPTOR = + new ListStateDescriptor<>("transactional-ids", TypeInformation.of(String.class)); + + /** + * Pool of transacional ids backed up in state. + */ + private ListState<String> transactionalIdsState; + + /** + * Already used transactional ids. + */ + private final Set<String> usedTransactionalIds = new HashSet<>(); + + /** + * Available to use transactional ids. + */ + private final BlockingDeque<String> availableTransactionalIds = new LinkedBlockingDeque<>(); + + /** + * User defined properties for the Producer. + */ + private final Properties producerConfig; + + /** + * The name of the default topic this producer is writing data to. + */ + private final String defaultTopicId; + + /** + * (Serializable) SerializationSchema for turning objects used with Flink into. + * byte[] for Kafka. + */ + private final KeyedSerializationSchema<IN> schema; + + /** + * User-provided partitioner for assigning an object to a Kafka partition for each topic. + */ + private final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner; + + /** + * Partitions of each topic. + */ + private final Map<String, int[]> topicPartitionsMap; + + /** + * Max number of producers in the pool. If all producers are in use, snapshoting state will throw an exception. + */ + private final int kafkaProducersPoolSize; + + /** + * Flag controlling whether we are writing the Flink record's timestamp into Kafka. + */ + private boolean writeTimestampToKafka = false; + + /** + * Flag indicating whether to accept failures (and log them), or to fail on failures. + */ + private boolean logFailuresOnly; + + /** + * Semantic chosen for this instance. + */ + private Semantic semantic; + + /** + * Pool of KafkaProducers objects. + */ + private transient ProducersPool producersPool = new ProducersPool(); + + // -------------------------------- Runtime fields ------------------------------------------ + + /** The callback than handles error propagation or logging callbacks. */ + @Nullable + private transient Callback callback; + + /** Errors encountered in the async producer are stored here. */ + @Nullable + private transient volatile Exception asyncException; + + /** Lock for accessing the pending records. */ + private final SerializableObject pendingRecordsLock = new SerializableObject(); + + /** Number of unacknowledged records. */ + private final AtomicLong pendingRecords = new AtomicLong(); + + /** Cache of metrics to replace already registered metrics instead of overwriting existing ones. */ + private final Map<String, KafkaMetricMuttableWrapper> previouslyCreatedMetrics = new HashMap<>(); + + // ---------------------- "Constructors" for timestamp writing ------------------ + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. 
+ * + * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above) + * + * @param inStream The stream to write to Kafka + * @param topicId ID of the Kafka topic. + * @param serializationSchema User defined serialization schema supporting key/value messages + * @param producerConfig Properties with the producer configuration. + */ + public static <IN> FlinkKafkaProducer011Configuration<IN> writeToKafkaWithTimestamps(DataStream<IN> inStream, + String topicId, + KeyedSerializationSchema<IN> serializationSchema, + Properties producerConfig) { + return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<IN>()); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to + * the topic. + * + * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above) + * + * @param inStream The stream to write to Kafka + * @param topicId ID of the Kafka topic. + * @param serializationSchema User defined (keyless) serialization schema. + * @param producerConfig Properties with the producer configuration. + */ + public static <IN> FlinkKafkaProducer011Configuration<IN> writeToKafkaWithTimestamps(DataStream<IN> inStream, + String topicId, + SerializationSchema<IN> serializationSchema, + Properties producerConfig) { + return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<IN>()); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above) + * + * @param inStream The stream to write to Kafka + * @param topicId The name of the target topic + * @param serializationSchema A serializable serialization schema for turning user objects into a + * kafka-consumable byte[] supporting key/value messages + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only + * required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. + */ + public static <IN> FlinkKafkaProducer011Configuration<IN> writeToKafkaWithTimestamps(DataStream<IN> inStream, + String topicId, + KeyedSerializationSchema<IN> serializationSchema, + Properties producerConfig, + FlinkKafkaPartitioner<IN> customPartitioner) { + return writeToKafkaWithTimestamps( + inStream, + topicId, + serializationSchema, + producerConfig, + customPartitioner, + Semantic.EXACTLY_ONCE, + DEFAULT_KAFKA_PRODUCERS_POOL_SIZE); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above) + * + * @param inStream The stream to write to Kafka + * @param topicId The name of the target topic + * @param serializationSchema A serializable serialization schema for turning user objects into a + * kafka-consumable byte[] supporting key/value messages + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only + * required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. + * @param semantic Defines semantic that will be used by this producer (see {@link Semantic}). 
+ * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link Semantic#EXACTLY_ONCE}). + */ + public static <IN> FlinkKafkaProducer011Configuration<IN> writeToKafkaWithTimestamps(DataStream<IN> inStream, + String topicId, + KeyedSerializationSchema<IN> serializationSchema, + Properties producerConfig, + FlinkKafkaPartitioner<IN> customPartitioner, + Semantic semantic, + int kafkaProducersPoolSize) { + + GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class); + FlinkKafkaProducer011<IN> kafkaProducer = + new FlinkKafkaProducer011<>( + topicId, + serializationSchema, + producerConfig, + customPartitioner, + semantic, + kafkaProducersPoolSize); + KafkaStreamSink streamSink = new KafkaStreamSink(kafkaProducer); + SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.11.x", objectTypeInfo, streamSink); + return new FlinkKafkaProducer011Configuration<>(transformation, streamSink); + } + + // ---------------------- Regular constructors w/o timestamp support ------------------ + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + * @param brokerList + * Comma separated addresses of the brokers + * @param topicId + * ID of the Kafka topic. + * @param serializationSchema + * User defined (keyless) serialization schema. + */ + public FlinkKafkaProducer011(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) { + this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>()); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to + * the topic. + * + * @param topicId + * ID of the Kafka topic. + * @param serializationSchema + * User defined (keyless) serialization schema. + * @param producerConfig + * Properties with the producer configuration. + */ + public FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) { + this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<IN>()); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to + * the topic. + * + * @param topicId The topic to write data to + * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[] + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner) + */ + public FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) { + this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner); + } + + // ------------------- Key/Value serialization schema constructors ---------------------- + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + * @param brokerList + * Comma separated addresses of the brokers + * @param topicId + * ID of the Kafka topic. 
+ * @param serializationSchema + * User defined serialization schema supporting key/value messages + */ + public FlinkKafkaProducer011(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) { + this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>()); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + * @param topicId + * ID of the Kafka topic. + * @param serializationSchema + * User defined serialization schema supporting key/value messages + * @param producerConfig + * Properties with the producer configuration. + */ + public FlinkKafkaProducer011(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) { + this(topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<IN>()); + } + + /** + * The main constructor for creating a FlinkKafkaProducer. + * + * <p>This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above) + * + * @param defaultTopicId The default topic to write data to + * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner. + */ + public FlinkKafkaProducer011(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) { + this( + defaultTopicId, + serializationSchema, + producerConfig, + customPartitioner, + Semantic.EXACTLY_ONCE, + DEFAULT_KAFKA_PRODUCERS_POOL_SIZE); + } + + /** + * The main constructor for creating a FlinkKafkaProducer. + * + * <p>This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above) + * + * @param defaultTopicId The default topic to write data to + * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner. + * @param semantic Defines semantic that will be used by this producer (see {@link Semantic}). + * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link Semantic#EXACTLY_ONCE}). 
+ */ + public FlinkKafkaProducer011( + String defaultTopicId, + KeyedSerializationSchema<IN> serializationSchema, + Properties producerConfig, + FlinkKafkaPartitioner<IN> customPartitioner, + Semantic semantic, + int kafkaProducersPoolSize) { + super( + TypeInformation.of(KafkaTransactionState.class), + TypeInformation.of(new TypeHint<List<KafkaTransactionState>>() {})); + + requireNonNull(defaultTopicId, "TopicID not set"); + requireNonNull(serializationSchema, "serializationSchema not set"); + requireNonNull(producerConfig, "producerConfig not set"); + ClosureCleaner.clean(customPartitioner, true); + ClosureCleaner.ensureSerializable(serializationSchema); + + this.defaultTopicId = defaultTopicId; + this.schema = serializationSchema; + this.producerConfig = producerConfig; + this.flinkKafkaPartitioner = customPartitioner; + this.semantic = semantic; + this.kafkaProducersPoolSize = kafkaProducersPoolSize; + + // set the producer configuration properties for kafka record key value serializers. + if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) { + this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); + } else { + LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG); + } + + if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) { + this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); + } else { + LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); + } + + // eagerly ensure that bootstrap servers are set. + if (!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) { + throw new IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be supplied in the producer config properties."); + } + + this.topicPartitionsMap = new HashMap<>(); + } + + // ---------------------------------- Properties -------------------------- + + /** + * Defines whether the producer should fail on errors, or only log them. + * If this is set to true, then exceptions will be only logged, if set to false, + * exceptions will be eventually thrown and cause the streaming program to + * fail (and enter recovery). + * + * <p>Method is only accessible for approach (a) (see above) + * + * @param logFailuresOnly The flag to indicate logging-only on exceptions. + */ + public void setLogFailuresOnly(boolean logFailuresOnly) { + this.logFailuresOnly = logFailuresOnly; + } + + // ----------------------------------- Utilities -------------------------- + + /** + * Initializes the connection to Kafka. + * + * <p>This method is used for approach (a) (see above). + */ + @Override + public void open(Configuration configuration) throws Exception { + if (semantic != Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) { + LOG.warn(String.format("Using [%s] semantic, but checkpointing is not enabled. 
Switching to [%s] semantic.", semantic, Semantic.NONE));
+			semantic = Semantic.NONE;
+		}
+
+		if (logFailuresOnly) {
+			callback = new Callback() {
+				@Override
+				public void onCompletion(RecordMetadata metadata, Exception e) {
+					if (e != null) {
+						LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
+					}
+					acknowledgeMessage();
+				}
+			};
+		}
+		else {
+			callback = new Callback() {
+				@Override
+				public void onCompletion(RecordMetadata metadata, Exception exception) {
+					if (exception != null && asyncException == null) {
+						asyncException = exception;
+					}
+					acknowledgeMessage();
+				}
+			};
+		}
+
+		super.open(configuration);
+	}
+
+	@Override
+	public void invoke(KafkaTransactionState transaction, IN next) throws Exception {
+		invokeInternal(transaction, next, Long.MAX_VALUE);
+	}
+
+	private void invokeInternal(KafkaTransactionState transaction, IN next, long elementTimestamp) throws Exception {
+		checkErroneous();
+
+		byte[] serializedKey = schema.serializeKey(next);
+		byte[] serializedValue = schema.serializeValue(next);
+		String targetTopic = schema.getTargetTopic(next);
+		if (targetTopic == null) {
+			targetTopic = defaultTopicId;
+		}
+
+		Long timestamp = null;
+		if (this.writeTimestampToKafka) {
+			timestamp = elementTimestamp;
+		}
+
+		ProducerRecord<byte[], byte[]> record;
+		int[] partitions = topicPartitionsMap.get(targetTopic);
+		if (null == partitions) {
+			partitions = getPartitionsByTopic(targetTopic, transaction.producer);
+			topicPartitionsMap.put(targetTopic, partitions);
+		}
+		if (flinkKafkaPartitioner == null) {
+			record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
+		} else {
+			record = new ProducerRecord<>(targetTopic, flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions), timestamp, serializedKey, serializedValue);
+		}
+		pendingRecords.incrementAndGet();
+		transaction.producer.send(record, callback);
+	}
+
+	@Override
+	public void close() throws Exception {
+		if (currentTransaction != null) {
+			// to avoid exceptions on aborting transactions with some pending records
+			flush(currentTransaction);
+		}
+		try {
+			super.close();
+		}
+		catch (Exception e) {
+			asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
+		}
+		try {
+			producersPool.close();
+		}
+		catch (Exception e) {
+			asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
+		}
+		// make sure we propagate pending errors
+		checkErroneous();
+	}
+
+	// ------------------- Logic for handling checkpoint flushing -------------------------- //
+
+	@Override
+	protected KafkaTransactionState beginTransaction() throws Exception {
+		switch (semantic) {
+			case EXACTLY_ONCE:
+				FlinkKafkaProducer<byte[], byte[]> producer = producersPool.poll();
+				if (producer == null) {
+					String transactionalId = availableTransactionalIds.poll();
+					if (transactionalId == null) {
+						throw new Exception(
+							"Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.");
+					}
+					usedTransactionalIds.add(transactionalId);
+					producer = initTransactionalProducer(transactionalId, true);
+					producer.initTransactions();
+				}
+				producer.beginTransaction();
+				return new KafkaTransactionState(producer.getTransactionalId(), producer);
+			case AT_LEAST_ONCE:
+			case NONE:
--- End diff --

Because `NONE` and `AT_LEAST_ONCE` do not need to start a new transaction, the `beginTransaction()` implementation for both of them is blank. The two differ only in `preCommit()`, where the latter (`AT_LEAST_ONCE`) flushes the data.
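To make the point above concrete, here is a minimal sketch of how `beginTransaction()` and `preCommit()` can diverge per `Semantic`. It is an illustration under stated assumptions, not the PR's exact code: the `createTransactionalProducer()` / `createProducer()` helpers and the `KafkaTransactionState` constructor usage are simplified placeholders.

```java
// Sketch only -- simplified from the discussion above, not the PR's exact code.
@Override
protected KafkaTransactionState beginTransaction() throws Exception {
	switch (semantic) {
		case EXACTLY_ONCE:
			// only EXACTLY_ONCE opens a real Kafka transaction (pooled producers in the diff above)
			FlinkKafkaProducer<byte[], byte[]> transactionalProducer = createTransactionalProducer(); // assumed helper
			transactionalProducer.beginTransaction();
			return new KafkaTransactionState(transactionalProducer.getTransactionalId(), transactionalProducer);
		case AT_LEAST_ONCE:
		case NONE:
			// no Kafka transaction is started; just hand out a plain, non-transactional producer
			return new KafkaTransactionState(null, createProducer()); // assumed helper
		default:
			throw new UnsupportedOperationException("Not implemented semantic: " + semantic);
	}
}

@Override
protected void preCommit(KafkaTransactionState transaction) throws Exception {
	switch (semantic) {
		case EXACTLY_ONCE:
		case AT_LEAST_ONCE:
			// wait until every record handed to the producer so far has been acknowledged
			flush(transaction);
			break;
		case NONE:
			// no guarantees -- nothing to wait for
			break;
		default:
			throw new UnsupportedOperationException("Not implemented semantic: " + semantic);
	}
	checkErroneous();
}
```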
> Add Apache Kafka 0.11 connector
> -------------------------------
>
>                 Key: FLINK-6988
>                 URL: https://issues.apache.org/jira/browse/FLINK-6988
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>    Affects Versions: 1.3.1
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>
> Kafka 0.11 (which will be released very soon) adds support for transactions. Thanks to that, Flink might be able to implement a Kafka sink supporting "exactly-once" semantics. The API changes and the overall transaction support are described in [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging].
> The goal is to mimic the implementation of the existing BucketingSink. The new FlinkKafkaProducer011 would:
> * upon creation, begin a transaction, store the transaction identifiers in state, and write all incoming data to an output Kafka topic using that transaction
> * on a `snapshotState` call, flush the data and record in state that the current transaction is pending to be committed
> * on `notifyCheckpointComplete`, commit this pending transaction
> * in case of a crash between `snapshotState` and `notifyCheckpointComplete`, either abort this pending transaction (if not every participant successfully saved the snapshot) or restore and commit it
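For readers skimming the ticket, the following is a rough sketch of the protocol those bullet points describe, written against the plain Kafka 0.11 producer API rather than Flink's actual classes. The class and its method names are illustrative placeholders; only the `KafkaProducer` transactional calls (`initTransactions`, `beginTransaction`, `flush`, `commitTransaction`, `abortTransaction`) are real API, and `config` is assumed to already contain `bootstrap.servers`.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

/** Illustrative placeholder, not Flink's implementation: one checkpoint interval = one Kafka transaction. */
public class TransactionalKafkaWriteSketch implements AutoCloseable {

	private final KafkaProducer<byte[], byte[]> producer;
	private final String topic;

	public TransactionalKafkaWriteSketch(Properties config, String transactionalId, String topic) {
		// the transactional id is what has to be stored in Flink state so that a restarted
		// job can find and finish (commit or abort) the pending transaction
		config.put("transactional.id", transactionalId);
		config.put("key.serializer", ByteArraySerializer.class.getName());
		config.put("value.serializer", ByteArraySerializer.class.getName());
		this.producer = new KafkaProducer<>(config);
		this.topic = topic;
		producer.initTransactions(); // also fences off an older producer instance with the same id
	}

	/** "upon creation, begin a transaction ... write all incoming data using that transaction" */
	public void beginTransaction() {
		producer.beginTransaction();
	}

	public void write(byte[] key, byte[] value) {
		producer.send(new ProducerRecord<>(topic, key, value));
	}

	/** "on snapshotState, flush the data" -- the transaction stays open, but all sends are acknowledged. */
	public void preCommit() {
		producer.flush();
	}

	/** "on notifyCheckpointComplete, commit this pending transaction" */
	public void commit() {
		producer.commitTransaction();
	}

	/** "abort this pending transaction" if the checkpoint did not complete everywhere. */
	public void abort() {
		producer.abortTransaction();
	}

	@Override
	public void close() {
		producer.close();
	}
}
```

The genuinely tricky part is the last bullet: finishing a transaction that was pending when the job crashed requires re-creating a producer with the transactional id kept in Flink state, which is why the PR above keeps the pool of transactional ids in `transactionalIdsState`.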