fapaul commented on code in PR #152: URL: https://github.com/apache/flink-connector-kafka/pull/152#discussion_r1952842085
########## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java: ########## @@ -110,28 +110,35 @@ public SimpleVersionedSerializer<KafkaCommittable> getCommittableSerializer() { @Internal @Override public KafkaWriter<IN> createWriter(InitContext context) throws IOException { - return new KafkaWriter<IN>( - deliveryGuarantee, - kafkaProducerConfig, - transactionalIdPrefix, - context, - recordSerializer, - context.asSerializationSchemaInitializationContext(), - Collections.emptyList()); + return restoreWriter(context, Collections.emptyList()); } @Internal @Override public KafkaWriter<IN> restoreWriter( - InitContext context, Collection<KafkaWriterState> recoveredState) throws IOException { - return new KafkaWriter<>( - deliveryGuarantee, - kafkaProducerConfig, - transactionalIdPrefix, - context, - recordSerializer, - context.asSerializationSchemaInitializationContext(), - recoveredState); + InitContext context, Collection<KafkaWriterState> recoveredState) { + KafkaWriter<IN> writer; + if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) { + writer = + new ExactlyOnceKafkaWriter<>( + deliveryGuarantee, + kafkaProducerConfig, + transactionalIdPrefix, + context, + recordSerializer, + context.asSerializationSchemaInitializationContext(), + Collections.emptyList()); + } else { + writer = + new KafkaWriter<>( + deliveryGuarantee, + kafkaProducerConfig, + context, + recordSerializer, + context.asSerializationSchemaInitializationContext()); + } + writer.initialize(); Review Comment: Why did you introduce a dedicated initialize method? I'd like to keep all initialization as part of the ctor if possible. ########## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java: ########## @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.kafka.sink; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer; +import org.apache.flink.connector.kafka.sink.internal.TransactionalIdFactory; +import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; +import org.apache.flink.util.FlinkRuntimeException; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Deque; +import java.util.List; +import java.util.Optional; +import java.util.Properties; + +import static org.apache.flink.util.IOUtils.closeAll; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Exactly-once Kafka writer that writes records to Kafka in transactions. + * + * @param <IN> The type of the input elements. + */ +class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> { + private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceKafkaWriter.class); + private final String transactionalIdPrefix; + + private final KafkaWriterState kafkaWriterState; + // producer pool only used for exactly once + private final Deque<FlinkKafkaInternalProducer<byte[], byte[]>> producerPool = + new ArrayDeque<>(); + private final Collection<KafkaWriterState> recoveredStates; + private long lastCheckpointId; + + private final Deque<Closeable> producerCloseables = new ArrayDeque<>(); + + /** + * Constructor creating a kafka writer. + * + * <p>It will throw a {@link RuntimeException} if {@link + * KafkaRecordSerializationSchema#open(SerializationSchema.InitializationContext, + * KafkaRecordSerializationSchema.KafkaSinkContext)} fails. 
+ * + * @param deliveryGuarantee the Sink's delivery guarantee + * @param kafkaProducerConfig the properties to configure the {@link FlinkKafkaInternalProducer} + * @param transactionalIdPrefix used to create the transactionalIds + * @param sinkInitContext context to provide information about the runtime environment + * @param recordSerializer serialize to transform the incoming records to {@link ProducerRecord} + * @param schemaContext context used to initialize the {@link KafkaRecordSerializationSchema} + * @param recoveredStates state from an previous execution which was covered + */ + ExactlyOnceKafkaWriter( + DeliveryGuarantee deliveryGuarantee, + Properties kafkaProducerConfig, + String transactionalIdPrefix, + Sink.InitContext sinkInitContext, + KafkaRecordSerializationSchema<IN> recordSerializer, + SerializationSchema.InitializationContext schemaContext, + Collection<KafkaWriterState> recoveredStates) { + super( + deliveryGuarantee, + kafkaProducerConfig, + sinkInitContext, + recordSerializer, + schemaContext); + this.transactionalIdPrefix = + checkNotNull(transactionalIdPrefix, "transactionalIdPrefix must not be null"); + + try { + recordSerializer.open(schemaContext, kafkaSinkContext); + } catch (Exception e) { + throw new FlinkRuntimeException("Cannot initialize schema.", e); + } + + this.kafkaWriterState = new KafkaWriterState(transactionalIdPrefix); + this.lastCheckpointId = + sinkInitContext + .getRestoredCheckpointId() + .orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1); + if (deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE) { Review Comment: Why do we need to pass the delivery guarantee explicitly? I'd rather delegate the responsibility to the creator side. ########## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/Backchannel.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.sink.internal; + +import org.apache.flink.annotation.VisibleForTesting; + +import java.io.Closeable; +import java.util.function.Consumer; + +/** + * A backchannel for communication between the commiter -> writer. It's used to signal that certain + * transactions have been committed and respective producers are good to be reused. + * + * <p>The model closely follows the idea of statefun except that there is no need to checkpoint the + * state since the backchannel will fully recover on restart from the committer state. + * + * <p>Establishing a backchannel for Kafka sink works because there is only writer and committer and + * nothing in between these two operators. In most cases, these two are chained in live inside the + * same task thread. 
In rare cases, committer and writer are not chained, so writer and committer + * are in different tasks and threads. However, because of colocations of tasks, we still know that + * both instances will run inside the same JVM and we can establish a backchannel between them. The + * latter case requires some synchronization in the buffer. + * + * <p>Messages can be sent before the backchannel is established. They will be consumed once the + * backchannel is established. + */ +public interface Backchannel<T> extends Closeable { + /** + * Send a message to the other side of the backchannel. Currently, this is the transaction id of + * a committed transaction. + */ + void send(T message); + + /** + * Consume messages from the other side of the backchannel. This is used to signal that certain + * transactions have been committed, so that the writer can recycle the producer. + */ + void consume(Consumer<T> consumer); Review Comment: I find this method a bit confusing: - The doc string states `Consume messages from the other side`, but the method doesn't return anything; it only takes an argument. - Isn't this method also "sending" something, in this case the `Consumer`? Why do we need both send and consume? - Please also clarify that the backchannel does not store the Consumers, e.g. if there are no messages, you need to call consume again for the `Consumer` to be triggered. ########## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java: ########## @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.kafka.sink; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer; +import org.apache.flink.connector.kafka.sink.internal.TransactionalIdFactory; +import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; +import org.apache.flink.util.FlinkRuntimeException; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Deque; +import java.util.List; +import java.util.Optional; +import java.util.Properties; + +import static org.apache.flink.util.IOUtils.closeAll; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Exactly-once Kafka writer that writes records to Kafka in transactions. + * + * @param <IN> The type of the input elements. + */ +class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> { + private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceKafkaWriter.class); + private final String transactionalIdPrefix; + + private final KafkaWriterState kafkaWriterState; + // producer pool only used for exactly once + private final Deque<FlinkKafkaInternalProducer<byte[], byte[]>> producerPool = + new ArrayDeque<>(); + private final Collection<KafkaWriterState> recoveredStates; + private long lastCheckpointId; + + private final Deque<Closeable> producerCloseables = new ArrayDeque<>(); + + /** + * Constructor creating a kafka writer. + * + * <p>It will throw a {@link RuntimeException} if {@link + * KafkaRecordSerializationSchema#open(SerializationSchema.InitializationContext, + * KafkaRecordSerializationSchema.KafkaSinkContext)} fails. 
+ * + * @param deliveryGuarantee the Sink's delivery guarantee + * @param kafkaProducerConfig the properties to configure the {@link FlinkKafkaInternalProducer} + * @param transactionalIdPrefix used to create the transactionalIds + * @param sinkInitContext context to provide information about the runtime environment + * @param recordSerializer serialize to transform the incoming records to {@link ProducerRecord} + * @param schemaContext context used to initialize the {@link KafkaRecordSerializationSchema} + * @param recoveredStates state from an previous execution which was covered + */ + ExactlyOnceKafkaWriter( + DeliveryGuarantee deliveryGuarantee, + Properties kafkaProducerConfig, + String transactionalIdPrefix, + Sink.InitContext sinkInitContext, + KafkaRecordSerializationSchema<IN> recordSerializer, + SerializationSchema.InitializationContext schemaContext, + Collection<KafkaWriterState> recoveredStates) { + super( + deliveryGuarantee, + kafkaProducerConfig, + sinkInitContext, + recordSerializer, + schemaContext); + this.transactionalIdPrefix = + checkNotNull(transactionalIdPrefix, "transactionalIdPrefix must not be null"); + + try { + recordSerializer.open(schemaContext, kafkaSinkContext); + } catch (Exception e) { + throw new FlinkRuntimeException("Cannot initialize schema.", e); + } + + this.kafkaWriterState = new KafkaWriterState(transactionalIdPrefix); + this.lastCheckpointId = + sinkInitContext + .getRestoredCheckpointId() + .orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1); + if (deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE) { + throw new UnsupportedOperationException( + "Unsupported Kafka writer semantic " + deliveryGuarantee); + } + + this.recoveredStates = checkNotNull(recoveredStates, "recoveredStates"); + initFlinkMetrics(); + } + + @Override + public void initialize() { + abortLingeringTransactions(recoveredStates, lastCheckpointId + 1); + this.currentProducer = getTransactionalProducer(lastCheckpointId + 1); + this.currentProducer.beginTransaction(); + } + + @Override + public Collection<KafkaCommittable> prepareCommit() { + // only return a KafkaCommittable if the current transaction has been written some data + if (currentProducer.hasRecordsInTransaction()) { + final List<KafkaCommittable> committables = + Collections.singletonList( + KafkaCommittable.of(currentProducer, producerPool::add)); + LOG.debug("Committing {} committables.", committables); + return committables; + } + + // otherwise, we commit the empty transaction as is (no-op) and just recycle the producer + currentProducer.commitTransaction(); + producerPool.add(currentProducer); + return Collections.emptyList(); + } + + @Override + public List<KafkaWriterState> snapshotState(long checkpointId) throws IOException { + currentProducer = getTransactionalProducer(checkpointId + 1); + currentProducer.beginTransaction(); + return Collections.singletonList(kafkaWriterState); + } + + @Override + public void close() throws Exception { + closeAll( + this::abortCurrentProducer, + () -> closeAll(producerPool), + () -> closeAll(producerCloseables.stream()), + super::close); + } + + private void abortCurrentProducer() { + // only abort if the transaction is known to the broker (needs to have at least one record + // sent) + if (currentProducer.isInTransaction() && currentProducer.hasRecordsInTransaction()) { + try { + currentProducer.abortTransaction(); + } catch (ProducerFencedException e) { + LOG.debug( + "Producer {} fenced while aborting", currentProducer.getTransactionalId()); + } + } + } + + 
Collection<FlinkKafkaInternalProducer<byte[], byte[]>> getProducerPool() { + return producerPool; + } + + void abortLingeringTransactions( Review Comment: Nit: this can be private ########## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/BackchannelImpl.java: ########## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.sink.internal; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.java.tuple.Tuple2; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.function.Consumer; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A backchannel for communication between the Kafka committer -> writer. It's used to recycle + * producer and signal that certain transactions have been committed on recovery. + */ +@Internal +@ThreadSafe +public class BackchannelImpl<T> implements Backchannel<T> { + private static final Logger LOG = LoggerFactory.getLogger(BackchannelImpl.class); + + /** + * The map of backchannels, keyed by the subtask id and transactional id prefix to uniquely + * identify the backchannel while establishing the connection. + */ + @GuardedBy("this") Review Comment: I usually use `GuardedBy` to signal I am using the parameter as object lock. In this case I do not see `this` as a monitor object. Also, using `this` (an object reference?) as lock for a stick member is strange. ########## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/Backchannel.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.kafka.sink.internal; + +import org.apache.flink.annotation.VisibleForTesting; + +import java.io.Closeable; +import java.util.function.Consumer; + +/** + * A backchannel for communication between the commiter -> writer. It's used to signal that certain + * transactions have been committed and respective producers are good to be reused. + * + * <p>The model closely follows the idea of statefun except that there is no need to checkpoint the + * state since the backchannel will fully recover on restart from the committer state. + * + * <p>Establishing a backchannel for Kafka sink works because there is only writer and committer and + * nothing in between these two operators. In most cases, these two are chained in live inside the + * same task thread. In rare cases, committer and writer are not chained, so writer and committer + * are in different tasks and threads. However, because of colocations of tasks, we still know that + * both instances will run inside the same JVM and we can establish a backchannel between them. The + * latter case requires some synchronization in the buffer. + * + * <p>Messages can be sent before the backchannel is established. They will be consumed once the + * backchannel is established. + */ +public interface Backchannel<T> extends Closeable { + /** + * Send a message to the other side of the backchannel. Currently, this is the transaction id of + * a committed transaction. + */ + void send(T message); + + /** + * Consume messages from the other side of the backchannel. This is used to signal that certain + * transactions have been committed, so that the writer can recycle the producer. + */ + void consume(Consumer<T> consumer); + + /** Forcefully establishes the backchannel. This is used for tests. */ + @VisibleForTesting + BackchannelImpl<T> establish(); Review Comment: Why can't we instantiate the `BackchannelImpl` as part of the tests instead of having it in the interface? ########## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/Backchannel.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.sink.internal; + +import org.apache.flink.annotation.VisibleForTesting; + +import java.io.Closeable; +import java.util.function.Consumer; + +/** + * A backchannel for communication between the commiter -> writer. It's used to signal that certain + * transactions have been committed and respective producers are good to be reused. + * + * <p>The model closely follows the idea of statefun except that there is no need to checkpoint the + * state since the backchannel will fully recover on restart from the committer state. 
+ * + * <p>Establishing a backchannel for Kafka sink works because there is only writer and committer and + * nothing in between these two operators. In most cases, these two are chained in live inside the + * same task thread. In rare cases, committer and writer are not chained, so writer and committer + * are in different tasks and threads. However, because of colocations of tasks, we still know that Review Comment: What exactly is meant by task colocation? Is it always ensured that operators connected by a forward pass are part of the same task/taskmanager? Ideally, we link a code piece to public documentation. Otherwise, I am afraid it may break unnoticed. ########## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/BackchannelImpl.java: ########## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.sink.internal; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.java.tuple.Tuple2; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.function.Consumer; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A backchannel for communication between the Kafka committer -> writer. It's used to recycle + * producer and signal that certain transactions have been committed on recovery. + */ +@Internal +@ThreadSafe +public class BackchannelImpl<T> implements Backchannel<T> { + private static final Logger LOG = LoggerFactory.getLogger(BackchannelImpl.class); + + /** + * The map of backchannels, keyed by the subtask id and transactional id prefix to uniquely + * identify the backchannel while establishing the connection. + */ + @GuardedBy("this") + private static final Map<Tuple2<Integer, String>, BackchannelImpl<?>> BACKCHANNELS = + new HashMap<>(); + /** The key to identify the backchannel (subtask id, transactional id prefix). */ + private final Tuple2<Integer, String> key; + /** + * The writer thread that sends messages to the committer. Used to ensure that we switch to a + * concurrent queue when the writer and committer are not chained. + */ + private Thread writerThread; + /** + * The committer thread that consumes messages from the writer. Used to ensure that we switch to + * a concurrent queue when the writer and committer are not chained. 
+ */ + private Thread committerThread; + /** + * The messages to be sent from the writer to the committer. By default, it's a thread-safe + * implementation but it can be replaced with an unsynchronized queue when the writer and + * committer are chained (common case). + */ + private Deque<T> messages = new ConcurrentLinkedDeque<>(); + /** Flag to indicate that the backchannel has been established. */ + private boolean established; + + BackchannelImpl(Tuple2<Integer, String> key) { + this.key = key; + } + + @Override + public void send(T message) { + messages.add(message); + } + + @Override + public void consume(Consumer<T> consumer) { + ensureEstablished(); + T message; + while ((message = pollUnsafe()) != null) { + consumer.accept(message); + } + } + + @Nullable + private T pollUnsafe() { + return messages.poll(); + } + + @VisibleForTesting + @Nullable + public T poll() { + ensureEstablished(); + return pollUnsafe(); + } + + @VisibleForTesting + @Override + public BackchannelImpl<T> establish() { + return create(key.f0, key.f1, writerThread == null); + } + + /** + * Create a backchannel for the given subtask id and transactional id prefix. The backchannel is + * created if it does not exist, otherwise, the existing backchannel is returned. + * + * <p>If this method is called twice with the same subtask id and transactional id prefix but + * different forWriter values, the backchannel will be established and can be used for + * communication. + * + * <p>If this method is called twice with the same subtask id and transactional id prefix and + * the same forWriter value, an error will be logged instead. There is a high chance that the + * backchannel is misused by using non-uniques transactionalIds. + */ + @SuppressWarnings("unchecked") + public static <T> BackchannelImpl<T> create( + int subtaskId, String transactionalIdPrefix, boolean forWriter) { + Tuple2<Integer, String> key = new Tuple2<>(subtaskId, transactionalIdPrefix); + BackchannelImpl<T> backchannel; + boolean duplicate; + synchronized (BACKCHANNELS) { + backchannel = + (BackchannelImpl<T>) + BACKCHANNELS.computeIfAbsent(key, k -> new BackchannelImpl<>(key)); + duplicate = backchannel.isDuplicate(forWriter); + backchannel.register(forWriter); + } + if (duplicate) { + LOG.error( Review Comment: Is it intentionally that the logging happens outside the critical section? It makes reading the code a lot hard. Can we do ```java if (backchannel.isDuplicate(forWriter)) { LOG.error(...) } ``` ########## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/BackchannelImpl.java: ########## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.kafka.sink.internal; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.java.tuple.Tuple2; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.function.Consumer; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A backchannel for communication between the Kafka committer -> writer. It's used to recycle + * producer and signal that certain transactions have been committed on recovery. + */ +@Internal +@ThreadSafe +public class BackchannelImpl<T> implements Backchannel<T> { + private static final Logger LOG = LoggerFactory.getLogger(BackchannelImpl.class); + + /** + * The map of backchannels, keyed by the subtask id and transactional id prefix to uniquely + * identify the backchannel while establishing the connection. + */ + @GuardedBy("this") + private static final Map<Tuple2<Integer, String>, BackchannelImpl<?>> BACKCHANNELS = + new HashMap<>(); + /** The key to identify the backchannel (subtask id, transactional id prefix). */ + private final Tuple2<Integer, String> key; + /** + * The writer thread that sends messages to the committer. Used to ensure that we switch to a + * concurrent queue when the writer and committer are not chained. + */ + private Thread writerThread; + /** + * The committer thread that consumes messages from the writer. Used to ensure that we switch to + * a concurrent queue when the writer and committer are not chained. + */ + private Thread committerThread; + /** + * The messages to be sent from the writer to the committer. By default, it's a thread-safe + * implementation but it can be replaced with an unsynchronized queue when the writer and + * committer are chained (common case). + */ + private Deque<T> messages = new ConcurrentLinkedDeque<>(); + /** Flag to indicate that the backchannel has been established. */ + private boolean established; + + BackchannelImpl(Tuple2<Integer, String> key) { + this.key = key; + } + + @Override + public void send(T message) { + messages.add(message); + } + + @Override + public void consume(Consumer<T> consumer) { + ensureEstablished(); + T message; + while ((message = pollUnsafe()) != null) { + consumer.accept(message); + } + } + + @Nullable + private T pollUnsafe() { + return messages.poll(); + } + + @VisibleForTesting + @Nullable + public T poll() { + ensureEstablished(); + return pollUnsafe(); + } + + @VisibleForTesting + @Override + public BackchannelImpl<T> establish() { + return create(key.f0, key.f1, writerThread == null); + } + + /** + * Create a backchannel for the given subtask id and transactional id prefix. The backchannel is + * created if it does not exist, otherwise, the existing backchannel is returned. + * + * <p>If this method is called twice with the same subtask id and transactional id prefix but + * different forWriter values, the backchannel will be established and can be used for + * communication. + * + * <p>If this method is called twice with the same subtask id and transactional id prefix and + * the same forWriter value, an error will be logged instead. 
There is a high chance that the + * backchannel is misused by using non-uniques transactionalIds. + */ + @SuppressWarnings("unchecked") + public static <T> BackchannelImpl<T> create( + int subtaskId, String transactionalIdPrefix, boolean forWriter) { + Tuple2<Integer, String> key = new Tuple2<>(subtaskId, transactionalIdPrefix); + BackchannelImpl<T> backchannel; + boolean duplicate; + synchronized (BACKCHANNELS) { + backchannel = + (BackchannelImpl<T>) + BACKCHANNELS.computeIfAbsent(key, k -> new BackchannelImpl<>(key)); + duplicate = backchannel.isDuplicate(forWriter); + backchannel.register(forWriter); + } + if (duplicate) { + LOG.error( + "Found duplicate transactionalIdPrefix for multiple Kafka sinks: {}. Transactional id prefixes need to be unique. You may experience memory leaks without fixing this.", + transactionalIdPrefix); + } + return backchannel; + } + + private boolean isDuplicate(boolean forWriter) { + return forWriter ? writerThread != null : committerThread != null; + } + + private void ensureEstablished() { + if (established) { + return; + } + + // Most of the fields are only properly initialized by the second side of register (writer + // or committer). + // This method briefly enters the critical section to ensure that all fields are visible to + // the first side. + boolean established; + synchronized (BACKCHANNELS) { + established = this.established; + } + + checkState(established, "Backchannel not established for %s", key); + } + + /** + * Register the current thread as either writer or committer. If both threads are registered, + * the backchannel is established. + * + * <p>This method must be called in a critical section. + */ + private void register(boolean forWriter) { + boolean complete; + if (forWriter) { + complete = committerThread != null; + writerThread = Thread.currentThread(); + } else { + complete = writerThread != null; + committerThread = Thread.currentThread(); + } + if (complete) { + // in the case where writer and committer are chained (common case), we can switch to a + // non-thread-safe queue. + if (writerThread == committerThread) { + messages = new ArrayDeque<>(messages); + } + established = true; + LOG.debug("Backchannel established for {}", key); + } + } + + @Override + public void close() { + BACKCHANNELS.remove(key); Review Comment: Missing the synchronized block? ########## flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java: ########## @@ -276,8 +277,9 @@ private void triggerProducerException(KafkaWriter<Integer> writer, Properties pr new FlinkKafkaInternalProducer<>(properties, transactionalId)) { producer.initTransactions(); producer.beginTransaction(); - producer.send(new ProducerRecord<byte[], byte[]>(topic, "1".getBytes())); + producer.send(new ProducerRecord<>(topic, "1".getBytes())); producer.commitTransaction(); + producer.flush(); Review Comment: This change looks unrelated to the commit and the refactoring of splitting the writer. ########## flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/BackchannelImplTest.java: ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.sink.internal; + +import org.apache.flink.util.function.RunnableWithException; + +import org.junit.jupiter.api.Test; + +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; + +class BackchannelImplTest { Review Comment: Let's also add a test with multiple consumes where the first `Consumer` is "discarded". ########## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/BackchannelImpl.java: ########## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.sink.internal; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.java.tuple.Tuple2; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.function.Consumer; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A backchannel for communication between the Kafka committer -> writer. It's used to recycle + * producer and signal that certain transactions have been committed on recovery. + */ +@Internal +@ThreadSafe +public class BackchannelImpl<T> implements Backchannel<T> { + private static final Logger LOG = LoggerFactory.getLogger(BackchannelImpl.class); + + /** + * The map of backchannels, keyed by the subtask id and transactional id prefix to uniquely + * identify the backchannel while establishing the connection. + */ + @GuardedBy("this") + private static final Map<Tuple2<Integer, String>, BackchannelImpl<?>> BACKCHANNELS = + new HashMap<>(); + /** The key to identify the backchannel (subtask id, transactional id prefix). */ + private final Tuple2<Integer, String> key; + /** + * The writer thread that sends messages to the committer. 
Used to ensure that we switch to a + * concurrent queue when the writer and committer are not chained. + */ + private Thread writerThread; + /** + * The committer thread that consumes messages from the writer. Used to ensure that we switch to + * a concurrent queue when the writer and committer are not chained. + */ + private Thread committerThread; + /** + * The messages to be sent from the writer to the committer. By default, it's a thread-safe + * implementation but it can be replaced with an unsynchronized queue when the writer and + * committer are chained (common case). + */ + private Deque<T> messages = new ConcurrentLinkedDeque<>(); + /** Flag to indicate that the backchannel has been established. */ + private boolean established; + + BackchannelImpl(Tuple2<Integer, String> key) { + this.key = key; + } + + @Override + public void send(T message) { + messages.add(message); + } + + @Override + public void consume(Consumer<T> consumer) { + ensureEstablished(); + T message; + while ((message = pollUnsafe()) != null) { + consumer.accept(message); + } + } + + @Nullable + private T pollUnsafe() { + return messages.poll(); + } + + @VisibleForTesting + @Nullable + public T poll() { + ensureEstablished(); + return pollUnsafe(); + } + + @VisibleForTesting + @Override + public BackchannelImpl<T> establish() { + return create(key.f0, key.f1, writerThread == null); + } + + /** + * Create a backchannel for the given subtask id and transactional id prefix. The backchannel is + * created if it does not exist, otherwise, the existing backchannel is returned. + * + * <p>If this method is called twice with the same subtask id and transactional id prefix but + * different forWriter values, the backchannel will be established and can be used for + * communication. + * + * <p>If this method is called twice with the same subtask id and transactional id prefix and + * the same forWriter value, an error will be logged instead. There is a high chance that the + * backchannel is misused by using non-uniques transactionalIds. + */ + @SuppressWarnings("unchecked") + public static <T> BackchannelImpl<T> create( + int subtaskId, String transactionalIdPrefix, boolean forWriter) { + Tuple2<Integer, String> key = new Tuple2<>(subtaskId, transactionalIdPrefix); + BackchannelImpl<T> backchannel; + boolean duplicate; + synchronized (BACKCHANNELS) { + backchannel = + (BackchannelImpl<T>) + BACKCHANNELS.computeIfAbsent(key, k -> new BackchannelImpl<>(key)); + duplicate = backchannel.isDuplicate(forWriter); + backchannel.register(forWriter); + } + if (duplicate) { + LOG.error( + "Found duplicate transactionalIdPrefix for multiple Kafka sinks: {}. Transactional id prefixes need to be unique. You may experience memory leaks without fixing this.", + transactionalIdPrefix); + } + return backchannel; + } + + private boolean isDuplicate(boolean forWriter) { + return forWriter ? writerThread != null : committerThread != null; + } + + private void ensureEstablished() { + if (established) { + return; + } + + // Most of the fields are only properly initialized by the second side of register (writer + // or committer). + // This method briefly enters the critical section to ensure that all fields are visible to + // the first side. + boolean established; + synchronized (BACKCHANNELS) { + established = this.established; + } Review Comment: Using an AtomicBoolean seems cleaner here since it should be a one-time switch.
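For illustration only — this snippet is not part of the PR — the one-time switch suggested above could look roughly like the following sketch, reusing the field and method names from the quoted `BackchannelImpl`:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: replaces the `boolean established` field and the synchronized read in
// ensureEstablished(). set(true) is a volatile write, so a thread that observes
// get() == true also sees all fields written before the flag was flipped.
private final AtomicBoolean established = new AtomicBoolean();

private void ensureEstablished() {
    checkState(established.get(), "Backchannel not established for %s", key);
}

// at the end of register(boolean forWriter), still inside the critical section:
//     established.set(true);
```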
-- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
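As a follow-up to the BackchannelImplTest comment above (multiple consume() calls where the first `Consumer` is discarded), a rough sketch of such a test, written only against the create/send/consume/close API quoted in this review; the test name and values are invented for the example:

```java
// Assumes the imports already present in BackchannelImplTest plus
// java.util.ArrayList and java.util.List.
@Test
void testEarlierConsumerIsNotRetained() {
    // Creating both sides from the same thread establishes the backchannel.
    BackchannelImpl<String> writerSide = BackchannelImpl.create(42, "test-prefix", true);
    BackchannelImpl<String> committerSide = BackchannelImpl.create(42, "test-prefix", false);

    List<String> firstBatch = new ArrayList<>();
    // No messages yet: this Consumer is invoked zero times and is not stored anywhere.
    committerSide.consume(firstBatch::add);

    writerSide.send("transactional-id-1");
    // The Consumer from the first consume() call is not triggered retroactively.
    assertThat(firstBatch).isEmpty();

    List<String> secondBatch = new ArrayList<>();
    // Only a fresh consume() call drains the pending message.
    committerSide.consume(secondBatch::add);
    assertThat(secondBatch).containsExactly("transactional-id-1");

    writerSide.close();
    committerSide.close();
}
```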