fapaul commented on code in PR #152:
URL: https://github.com/apache/flink-connector-kafka/pull/152#discussion_r1952842085


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java:
##########
@@ -110,28 +110,35 @@ public SimpleVersionedSerializer<KafkaCommittable> getCommittableSerializer() {
     @Internal
     @Override
     public KafkaWriter<IN> createWriter(InitContext context) throws IOException {
-        return new KafkaWriter<IN>(
-                deliveryGuarantee,
-                kafkaProducerConfig,
-                transactionalIdPrefix,
-                context,
-                recordSerializer,
-                context.asSerializationSchemaInitializationContext(),
-                Collections.emptyList());
+        return restoreWriter(context, Collections.emptyList());
     }
 
     @Internal
     @Override
     public KafkaWriter<IN> restoreWriter(
-            InitContext context, Collection<KafkaWriterState> recoveredState) throws IOException {
-        return new KafkaWriter<>(
-                deliveryGuarantee,
-                kafkaProducerConfig,
-                transactionalIdPrefix,
-                context,
-                recordSerializer,
-                context.asSerializationSchemaInitializationContext(),
-                recoveredState);
+            InitContext context, Collection<KafkaWriterState> recoveredState) {
+        KafkaWriter<IN> writer;
+        if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
+            writer =
+                    new ExactlyOnceKafkaWriter<>(
+                            deliveryGuarantee,
+                            kafkaProducerConfig,
+                            transactionalIdPrefix,
+                            context,
+                            recordSerializer,
+                            context.asSerializationSchemaInitializationContext(),
+                            Collections.emptyList());
+        } else {
+            writer =
+                    new KafkaWriter<>(
+                            deliveryGuarantee,
+                            kafkaProducerConfig,
+                            context,
+                            recordSerializer,
+                            context.asSerializationSchemaInitializationContext());
+        }
+        writer.initialize();

Review Comment:
   Why did you introduce a dedicated initialize method? I'd like to keep all 
initialization as part of the ctor if possible.
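
   For illustration, the kind of shape I had in mind (just a sketch; it assumes the abort/begin bootstrap is safe to run during construction):
   ```java
   // Hypothetical sketch: fold initialize() into the ctor so that the creator
   // (KafkaSink#restoreWriter) only constructs the writer and never calls a second
   // bootstrap step.
   ExactlyOnceKafkaWriter(
           DeliveryGuarantee deliveryGuarantee,
           Properties kafkaProducerConfig,
           String transactionalIdPrefix,
           Sink.InitContext sinkInitContext,
           KafkaRecordSerializationSchema<IN> recordSerializer,
           SerializationSchema.InitializationContext schemaContext,
           Collection<KafkaWriterState> recoveredStates) {
       super(deliveryGuarantee, kafkaProducerConfig, sinkInitContext, recordSerializer, schemaContext);
       // ... field assignments exactly as in this PR ...
       abortLingeringTransactions(recoveredStates, lastCheckpointId + 1);
       this.currentProducer = getTransactionalProducer(lastCheckpointId + 1);
       this.currentProducer.beginTransaction();
   }
   ```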



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
+import org.apache.flink.connector.kafka.sink.internal.TransactionalIdFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.util.IOUtils.closeAll;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Exactly-once Kafka writer that writes records to Kafka in transactions.
+ *
+ * @param <IN> The type of the input elements.
+ */
+class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> {
+    private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceKafkaWriter.class);
+    private final String transactionalIdPrefix;
+
+    private final KafkaWriterState kafkaWriterState;
+    // producer pool only used for exactly once
+    private final Deque<FlinkKafkaInternalProducer<byte[], byte[]>> producerPool =
+            new ArrayDeque<>();
+    private final Collection<KafkaWriterState> recoveredStates;
+    private long lastCheckpointId;
+
+    private final Deque<Closeable> producerCloseables = new ArrayDeque<>();
+
+    /**
+     * Constructor creating a kafka writer.
+     *
+     * <p>It will throw a {@link RuntimeException} if {@link
+     * KafkaRecordSerializationSchema#open(SerializationSchema.InitializationContext,
+     * KafkaRecordSerializationSchema.KafkaSinkContext)} fails.
+     *
+     * @param deliveryGuarantee the Sink's delivery guarantee
+     * @param kafkaProducerConfig the properties to configure the {@link FlinkKafkaInternalProducer}
+     * @param transactionalIdPrefix used to create the transactionalIds
+     * @param sinkInitContext context to provide information about the runtime environment
+     * @param recordSerializer serializer to transform the incoming records to {@link ProducerRecord}
+     * @param schemaContext context used to initialize the {@link KafkaRecordSerializationSchema}
+     * @param recoveredStates state from a previous execution which was recovered
+     */
+    ExactlyOnceKafkaWriter(
+            DeliveryGuarantee deliveryGuarantee,
+            Properties kafkaProducerConfig,
+            String transactionalIdPrefix,
+            Sink.InitContext sinkInitContext,
+            KafkaRecordSerializationSchema<IN> recordSerializer,
+            SerializationSchema.InitializationContext schemaContext,
+            Collection<KafkaWriterState> recoveredStates) {
+        super(
+                deliveryGuarantee,
+                kafkaProducerConfig,
+                sinkInitContext,
+                recordSerializer,
+                schemaContext);
+        this.transactionalIdPrefix =
+                checkNotNull(transactionalIdPrefix, "transactionalIdPrefix must not be null");
+
+        try {
+            recordSerializer.open(schemaContext, kafkaSinkContext);
+        } catch (Exception e) {
+            throw new FlinkRuntimeException("Cannot initialize schema.", e);
+        }
+
+        this.kafkaWriterState = new KafkaWriterState(transactionalIdPrefix);
+        this.lastCheckpointId =
+                sinkInitContext
+                        .getRestoredCheckpointId()
+                        .orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1);
+        if (deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE) {

Review Comment:
   Why do we need to pass the delivery guarantee explicitly? I'd rather 
delegate the responsibility to the creator side.
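
   Just as a sketch of what I mean (hypothetical): the creator already branches on the guarantee, so the subclass could hardcode it instead of receiving and re-validating it:
   ```java
   // Hypothetical sketch: KafkaSink#restoreWriter only builds this writer for
   // EXACTLY_ONCE, so the ctor neither takes the guarantee nor throws
   // UnsupportedOperationException for other semantics.
   ExactlyOnceKafkaWriter(
           Properties kafkaProducerConfig,
           String transactionalIdPrefix,
           Sink.InitContext sinkInitContext,
           KafkaRecordSerializationSchema<IN> recordSerializer,
           SerializationSchema.InitializationContext schemaContext,
           Collection<KafkaWriterState> recoveredStates) {
       super(
               DeliveryGuarantee.EXACTLY_ONCE,
               kafkaProducerConfig,
               sinkInitContext,
               recordSerializer,
               schemaContext);
       // ... rest of the ctor as in this PR, minus the guarantee check ...
   }
   ```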



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/Backchannel.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.internal;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import java.io.Closeable;
+import java.util.function.Consumer;
+
+/**
+ * A backchannel for communication from the committer to the writer. It's used to signal that
+ * certain transactions have been committed and the respective producers are good to be reused.
+ *
+ * <p>The model closely follows the idea of statefun except that there is no need to checkpoint
+ * the state since the backchannel will fully recover on restart from the committer state.
+ *
+ * <p>Establishing a backchannel for the Kafka sink works because there are only the writer and
+ * the committer and nothing in between these two operators. In most cases, these two are chained
+ * and live inside the same task thread. In rare cases, committer and writer are not chained, so
+ * they run in different tasks and threads. However, because of the colocation of tasks, we still
+ * know that both instances run inside the same JVM and we can establish a backchannel between
+ * them. The latter case requires some synchronization in the buffer.
+ *
+ * <p>Messages can be sent before the backchannel is established. They will be consumed once the
+ * backchannel is established.
+ */
+public interface Backchannel<T> extends Closeable {
+    /**
+     * Send a message to the other side of the backchannel. Currently, this is the transaction id
+     * of a committed transaction.
+     */
+    void send(T message);
+
+    /**
+     * Consume messages from the other side of the backchannel. This is used to signal that
+     * certain transactions have been committed, so that the writer can recycle the producer.
+     */
+    void consume(Consumer<T> consumer);

Review Comment:
   I find this method a bit confusing.
   
   - The doc string states `Consume messages from the other side`, but the method doesn't return anything; it only takes an argument.
   - Isn't this method also "sending" something, in this case the `Consumer`? Why do we need both send and consume?
   - Please also clarify that the backchannel does not store the `Consumer`s, e.g. if there are no messages you need to call consume again for the `Consumer` to be triggered.
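
   For comparison, a pull-style shape might read more naturally (just a sketch, not a requirement):
   ```java
   import java.io.Closeable;

   import javax.annotation.Nullable;

   // Hypothetical alternative: a pull-based API that makes the direction explicit and
   // makes it obvious that neither messages nor consumers are retained by the call.
   public interface Backchannel<T> extends Closeable {
       /** Sends a message, e.g. the transactional id of a committed transaction. */
       void send(T message);

       /** Returns and removes the next buffered message, or null if none is available right now. */
       @Nullable
       T poll();
   }
   ```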



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
+import org.apache.flink.connector.kafka.sink.internal.TransactionalIdFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.util.IOUtils.closeAll;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Exactly-once Kafka writer that writes records to Kafka in transactions.
+ *
+ * @param <IN> The type of the input elements.
+ */
+class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> {
+    private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceKafkaWriter.class);
+    private final String transactionalIdPrefix;
+
+    private final KafkaWriterState kafkaWriterState;
+    // producer pool only used for exactly once
+    private final Deque<FlinkKafkaInternalProducer<byte[], byte[]>> producerPool =
+            new ArrayDeque<>();
+    private final Collection<KafkaWriterState> recoveredStates;
+    private long lastCheckpointId;
+
+    private final Deque<Closeable> producerCloseables = new ArrayDeque<>();
+
+    /**
+     * Constructor creating a kafka writer.
+     *
+     * <p>It will throw a {@link RuntimeException} if {@link
+     * KafkaRecordSerializationSchema#open(SerializationSchema.InitializationContext,
+     * KafkaRecordSerializationSchema.KafkaSinkContext)} fails.
+     *
+     * @param deliveryGuarantee the Sink's delivery guarantee
+     * @param kafkaProducerConfig the properties to configure the {@link FlinkKafkaInternalProducer}
+     * @param transactionalIdPrefix used to create the transactionalIds
+     * @param sinkInitContext context to provide information about the runtime environment
+     * @param recordSerializer serializer to transform the incoming records to {@link ProducerRecord}
+     * @param schemaContext context used to initialize the {@link KafkaRecordSerializationSchema}
+     * @param recoveredStates state from a previous execution which was recovered
+     */
+    ExactlyOnceKafkaWriter(
+            DeliveryGuarantee deliveryGuarantee,
+            Properties kafkaProducerConfig,
+            String transactionalIdPrefix,
+            Sink.InitContext sinkInitContext,
+            KafkaRecordSerializationSchema<IN> recordSerializer,
+            SerializationSchema.InitializationContext schemaContext,
+            Collection<KafkaWriterState> recoveredStates) {
+        super(
+                deliveryGuarantee,
+                kafkaProducerConfig,
+                sinkInitContext,
+                recordSerializer,
+                schemaContext);
+        this.transactionalIdPrefix =
+                checkNotNull(transactionalIdPrefix, "transactionalIdPrefix must not be null");
+
+        try {
+            recordSerializer.open(schemaContext, kafkaSinkContext);
+        } catch (Exception e) {
+            throw new FlinkRuntimeException("Cannot initialize schema.", e);
+        }
+
+        this.kafkaWriterState = new KafkaWriterState(transactionalIdPrefix);
+        this.lastCheckpointId =
+                sinkInitContext
+                        .getRestoredCheckpointId()
+                        .orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1);
+        if (deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE) {
+            throw new UnsupportedOperationException(
+                    "Unsupported Kafka writer semantic " + deliveryGuarantee);
+        }
+
+        this.recoveredStates = checkNotNull(recoveredStates, "recoveredStates");
+        initFlinkMetrics();
+    }
+
+    @Override
+    public void initialize() {
+        abortLingeringTransactions(recoveredStates, lastCheckpointId + 1);
+        this.currentProducer = getTransactionalProducer(lastCheckpointId + 1);
+        this.currentProducer.beginTransaction();
+    }
+
+    @Override
+    public Collection<KafkaCommittable> prepareCommit() {
+        // only return a KafkaCommittable if the current transaction has been written some data
+        if (currentProducer.hasRecordsInTransaction()) {
+            final List<KafkaCommittable> committables =
+                    Collections.singletonList(
+                            KafkaCommittable.of(currentProducer, producerPool::add));
+            LOG.debug("Committing {} committables.", committables);
+            return committables;
+        }
+
+        // otherwise, we commit the empty transaction as is (no-op) and just recycle the producer
+        currentProducer.commitTransaction();
+        producerPool.add(currentProducer);
+        return Collections.emptyList();
+    }
+
+    @Override
+    public List<KafkaWriterState> snapshotState(long checkpointId) throws IOException {
+        currentProducer = getTransactionalProducer(checkpointId + 1);
+        currentProducer.beginTransaction();
+        return Collections.singletonList(kafkaWriterState);
+    }
+
+    @Override
+    public void close() throws Exception {
+        closeAll(
+                this::abortCurrentProducer,
+                () -> closeAll(producerPool),
+                () -> closeAll(producerCloseables.stream()),
+                super::close);
+    }
+
+    private void abortCurrentProducer() {
+        // only abort if the transaction is known to the broker (needs to have at least one record
+        // sent)
+        if (currentProducer.isInTransaction() && currentProducer.hasRecordsInTransaction()) {
+            try {
+                currentProducer.abortTransaction();
+            } catch (ProducerFencedException e) {
+                LOG.debug(
+                        "Producer {} fenced while aborting", currentProducer.getTransactionalId());
+            }
+        }
+    }
+
+    Collection<FlinkKafkaInternalProducer<byte[], byte[]>> getProducerPool() {
+        return producerPool;
+    }
+
+    void abortLingeringTransactions(

Review Comment:
   Nit: this can be private



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/BackchannelImpl.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.function.Consumer;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A backchannel for communication from the Kafka committer to the writer. It's used to recycle
+ * producers and signal that certain transactions have been committed on recovery.
+ */
+@Internal
+@ThreadSafe
+public class BackchannelImpl<T> implements Backchannel<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(BackchannelImpl.class);
+
+    /**
+     * The map of backchannels, keyed by the subtask id and transactional id prefix to uniquely
+     * identify the backchannel while establishing the connection.
+     */
+    @GuardedBy("this")

Review Comment:
   I usually use `GuardedBy` to signal that the named parameter is used as the object lock. In this case I do not see `this` being used as a monitor object.
   
   Also, using `this` (an object reference?) as the lock for a static member is strange.
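
   If I read the JCIP conventions correctly, something like this would name the actual monitor (sketch):
   ```java
   // Hypothetical: the lock actually used is the static map itself; "itself" is the
   // JCIP GuardedBy value for a reference field guarded by the object it points to.
   @GuardedBy("itself")
   private static final Map<Tuple2<Integer, String>, BackchannelImpl<?>> BACKCHANNELS =
           new HashMap<>();
   ```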



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/Backchannel.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.internal;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import java.io.Closeable;
+import java.util.function.Consumer;
+
+/**
+ * A backchannel for communication from the committer to the writer. It's used to signal that
+ * certain transactions have been committed and the respective producers are good to be reused.
+ *
+ * <p>The model closely follows the idea of statefun except that there is no need to checkpoint
+ * the state since the backchannel will fully recover on restart from the committer state.
+ *
+ * <p>Establishing a backchannel for the Kafka sink works because there are only the writer and
+ * the committer and nothing in between these two operators. In most cases, these two are chained
+ * and live inside the same task thread. In rare cases, committer and writer are not chained, so
+ * they run in different tasks and threads. However, because of the colocation of tasks, we still
+ * know that both instances run inside the same JVM and we can establish a backchannel between
+ * them. The latter case requires some synchronization in the buffer.
+ *
+ * <p>Messages can be sent before the backchannel is established. They will be consumed once the
+ * backchannel is established.
+ */
+public interface Backchannel<T> extends Closeable {
+    /**
+     * Send a message to the other side of the backchannel. Currently, this is the transaction id
+     * of a committed transaction.
+     */
+    void send(T message);
+
+    /**
+     * Consume messages from the other side of the backchannel. This is used to signal that
+     * certain transactions have been committed, so that the writer can recycle the producer.
+     */
+    void consume(Consumer<T> consumer);
+
+    /** Forcefully establishes the backchannel. This is used for tests. */
+    @VisibleForTesting
+    BackchannelImpl<T> establish();

Review Comment:
   Why can't we instantiate the `BackchannelImpl` as part of the tests instead 
of having it in the interface?
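
   E.g. the test could wire up both ends itself instead of going through `establish()` (sketch; the id prefix is made up):
   ```java
   // Hypothetical test-side setup: register the writer side and the committer side
   // explicitly; since both calls happen on the same thread, the channel is established.
   BackchannelImpl<String> writerSide = BackchannelImpl.create(0, "test-prefix", true);
   BackchannelImpl<String> committerSide = BackchannelImpl.create(0, "test-prefix", false);

   committerSide.send("test-prefix-0-1"); // committer announces a committed transaction id
   writerSide.consume(transactionalId -> {
       // the writer side would recycle the matching producer here
   });
   ```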



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/Backchannel.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.internal;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import java.io.Closeable;
+import java.util.function.Consumer;
+
+/**
+ * A backchannel for communication from the committer to the writer. It's used to signal that
+ * certain transactions have been committed and the respective producers are good to be reused.
+ *
+ * <p>The model closely follows the idea of statefun except that there is no need to checkpoint
+ * the state since the backchannel will fully recover on restart from the committer state.
+ *
+ * <p>Establishing a backchannel for the Kafka sink works because there are only the writer and
+ * the committer and nothing in between these two operators. In most cases, these two are chained
+ * and live inside the same task thread. In rare cases, committer and writer are not chained, so
+ * they run in different tasks and threads. However, because of the colocation of tasks, we still know that

Review Comment:
   What exactly is meant by task colocation? Is it always ensured that operators connected by a forward connection are part of the same task/taskmanager?
   
   Ideally, we should link such a code piece to public documentation. Otherwise, I am afraid it may break unnoticed.



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/BackchannelImpl.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.function.Consumer;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A backchannel for communication from the Kafka committer to the writer. It's used to recycle
+ * producers and signal that certain transactions have been committed on recovery.
+ */
+@Internal
+@ThreadSafe
+public class BackchannelImpl<T> implements Backchannel<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(BackchannelImpl.class);
+
+    /**
+     * The map of backchannels, keyed by the subtask id and transactional id prefix to uniquely
+     * identify the backchannel while establishing the connection.
+     */
+    @GuardedBy("this")
+    private static final Map<Tuple2<Integer, String>, BackchannelImpl<?>> BACKCHANNELS =
+            new HashMap<>();
+    /** The key to identify the backchannel (subtask id, transactional id prefix). */
+    private final Tuple2<Integer, String> key;
+    /**
+     * The writer thread that sends messages to the committer. Used to ensure that we switch to a
+     * concurrent queue when the writer and committer are not chained.
+     */
+    private Thread writerThread;
+    /**
+     * The committer thread that consumes messages from the writer. Used to ensure that we switch to
+     * a concurrent queue when the writer and committer are not chained.
+     */
+    private Thread committerThread;
+    /**
+     * The messages to be sent from the writer to the committer. By default, it's a thread-safe
+     * implementation but it can be replaced with an unsynchronized queue when the writer and
+     * committer are chained (common case).
+     */
+    private Deque<T> messages = new ConcurrentLinkedDeque<>();
+    /** Flag to indicate that the backchannel has been established. */
+    private boolean established;
+
+    BackchannelImpl(Tuple2<Integer, String> key) {
+        this.key = key;
+    }
+
+    @Override
+    public void send(T message) {
+        messages.add(message);
+    }
+
+    @Override
+    public void consume(Consumer<T> consumer) {
+        ensureEstablished();
+        T message;
+        while ((message = pollUnsafe()) != null) {
+            consumer.accept(message);
+        }
+    }
+
+    @Nullable
+    private T pollUnsafe() {
+        return messages.poll();
+    }
+
+    @VisibleForTesting
+    @Nullable
+    public T poll() {
+        ensureEstablished();
+        return pollUnsafe();
+    }
+
+    @VisibleForTesting
+    @Override
+    public BackchannelImpl<T> establish() {
+        return create(key.f0, key.f1, writerThread == null);
+    }
+
+    /**
+     * Create a backchannel for the given subtask id and transactional id prefix. The backchannel
+     * is created if it does not exist; otherwise, the existing backchannel is returned.
+     *
+     * <p>If this method is called twice with the same subtask id and transactional id prefix but
+     * different forWriter values, the backchannel will be established and can be used for
+     * communication.
+     *
+     * <p>If this method is called twice with the same subtask id and transactional id prefix and
+     * the same forWriter value, an error will be logged instead. There is a high chance that the
+     * backchannel is misused by using non-unique transactionalIds.
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> BackchannelImpl<T> create(
+            int subtaskId, String transactionalIdPrefix, boolean forWriter) {
+        Tuple2<Integer, String> key = new Tuple2<>(subtaskId, transactionalIdPrefix);
+        BackchannelImpl<T> backchannel;
+        boolean duplicate;
+        synchronized (BACKCHANNELS) {
+            backchannel =
+                    (BackchannelImpl<T>)
+                            BACKCHANNELS.computeIfAbsent(key, k -> new BackchannelImpl<>(key));
+            duplicate = backchannel.isDuplicate(forWriter);
+            backchannel.register(forWriter);
+        }
+        if (duplicate) {
+            LOG.error(

Review Comment:
   Is it intentional that the logging happens outside the critical section? It makes reading the code a lot harder.
   
   Can we do
   ```java
   if (backchannel.isDuplicate(forWriter)) {
     LOG.error(...)
   }
   ```



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/BackchannelImpl.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.function.Consumer;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A backchannel for communication from the Kafka committer to the writer. It's used to recycle
+ * producers and signal that certain transactions have been committed on recovery.
+ */
+@Internal
+@ThreadSafe
+public class BackchannelImpl<T> implements Backchannel<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(BackchannelImpl.class);
+
+    /**
+     * The map of backchannels, keyed by the subtask id and transactional id prefix to uniquely
+     * identify the backchannel while establishing the connection.
+     */
+    @GuardedBy("this")
+    private static final Map<Tuple2<Integer, String>, BackchannelImpl<?>> BACKCHANNELS =
+            new HashMap<>();
+    /** The key to identify the backchannel (subtask id, transactional id prefix). */
+    private final Tuple2<Integer, String> key;
+    /**
+     * The writer thread that sends messages to the committer. Used to ensure that we switch to a
+     * concurrent queue when the writer and committer are not chained.
+     */
+    private Thread writerThread;
+    /**
+     * The committer thread that consumes messages from the writer. Used to ensure that we switch to
+     * a concurrent queue when the writer and committer are not chained.
+     */
+    private Thread committerThread;
+    /**
+     * The messages to be sent from the writer to the committer. By default, it's a thread-safe
+     * implementation but it can be replaced with an unsynchronized queue when the writer and
+     * committer are chained (common case).
+     */
+    private Deque<T> messages = new ConcurrentLinkedDeque<>();
+    /** Flag to indicate that the backchannel has been established. */
+    private boolean established;
+
+    BackchannelImpl(Tuple2<Integer, String> key) {
+        this.key = key;
+    }
+
+    @Override
+    public void send(T message) {
+        messages.add(message);
+    }
+
+    @Override
+    public void consume(Consumer<T> consumer) {
+        ensureEstablished();
+        T message;
+        while ((message = pollUnsafe()) != null) {
+            consumer.accept(message);
+        }
+    }
+
+    @Nullable
+    private T pollUnsafe() {
+        return messages.poll();
+    }
+
+    @VisibleForTesting
+    @Nullable
+    public T poll() {
+        ensureEstablished();
+        return pollUnsafe();
+    }
+
+    @VisibleForTesting
+    @Override
+    public BackchannelImpl<T> establish() {
+        return create(key.f0, key.f1, writerThread == null);
+    }
+
+    /**
+     * Create a backchannel for the given subtask id and transactional id prefix. The backchannel
+     * is created if it does not exist; otherwise, the existing backchannel is returned.
+     *
+     * <p>If this method is called twice with the same subtask id and transactional id prefix but
+     * different forWriter values, the backchannel will be established and can be used for
+     * communication.
+     *
+     * <p>If this method is called twice with the same subtask id and transactional id prefix and
+     * the same forWriter value, an error will be logged instead. There is a high chance that the
+     * backchannel is misused by using non-unique transactionalIds.
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> BackchannelImpl<T> create(
+            int subtaskId, String transactionalIdPrefix, boolean forWriter) {
+        Tuple2<Integer, String> key = new Tuple2<>(subtaskId, transactionalIdPrefix);
+        BackchannelImpl<T> backchannel;
+        boolean duplicate;
+        synchronized (BACKCHANNELS) {
+            backchannel =
+                    (BackchannelImpl<T>)
+                            BACKCHANNELS.computeIfAbsent(key, k -> new BackchannelImpl<>(key));
+            duplicate = backchannel.isDuplicate(forWriter);
+            backchannel.register(forWriter);
+        }
+        if (duplicate) {
+            LOG.error(
+                    "Found duplicate transactionalIdPrefix for multiple Kafka sinks: {}. Transactional id prefixes need to be unique. You may experience memory leaks without fixing this.",
+                    transactionalIdPrefix);
+        }
+        return backchannel;
+    }
+
+    private boolean isDuplicate(boolean forWriter) {
+        return forWriter ? writerThread != null : committerThread != null;
+    }
+
+    private void ensureEstablished() {
+        if (established) {
+            return;
+        }
+
+        // Most of the fields are only properly initialized by the second side of register (writer
+        // or committer).
+        // This method briefly enters the critical section to ensure that all fields are visible to
+        // the first side.
+        boolean established;
+        synchronized (BACKCHANNELS) {
+            established = this.established;
+        }
+
+        checkState(established, "Backchannel not established for %s", key);
+    }
+
+    /**
+     * Register the current thread as either writer or committer. If both threads are registered,
+     * the backchannel is established.
+     *
+     * <p>This method must be called in a critical section.
+     */
+    private void register(boolean forWriter) {
+        boolean complete;
+        if (forWriter) {
+            complete = committerThread != null;
+            writerThread = Thread.currentThread();
+        } else {
+            complete = writerThread != null;
+            committerThread = Thread.currentThread();
+        }
+        if (complete) {
+            // in the case where writer and committer are chained (common case), we can switch to a
+            // non-thread-safe queue.
+            if (writerThread == committerThread) {
+                messages = new ArrayDeque<>(messages);
+            }
+            established = true;
+            LOG.debug("Backchannel established for {}", key);
+        }
+    }
+
+    @Override
+    public void close() {
+        BACKCHANNELS.remove(key);

Review Comment:
   Missing the synchronized block?
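
   I.e. something along these lines (sketch):
   ```java
   // Hypothetical: guard the shared static map on removal as well.
   @Override
   public void close() {
       synchronized (BACKCHANNELS) {
           BACKCHANNELS.remove(key);
       }
   }
   ```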



##########
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java:
##########
@@ -276,8 +277,9 @@ private void triggerProducerException(KafkaWriter<Integer> writer, Properties pr
                 new FlinkKafkaInternalProducer<>(properties, transactionalId)) {
             producer.initTransactions();
             producer.beginTransaction();
-            producer.send(new ProducerRecord<byte[], byte[]>(topic, "1".getBytes()));
+            producer.send(new ProducerRecord<>(topic, "1".getBytes()));
             producer.commitTransaction();
+            producer.flush();

Review Comment:
   This change looks unrelated to the commit and the refactoring of splitting 
the writer.



##########
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/BackchannelImplTest.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.internal;
+
+import org.apache.flink.util.function.RunnableWithException;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+class BackchannelImplTest {

Review Comment:
   Let's also add a test with multiple consumes where the first `Consumer` is 
"discarded".

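   Rough shape of what I mean (sketch; names, the id prefix, and the extra `java.util.List`/`ArrayList` imports are made up):
   ```java
   // Hypothetical test: a Consumer handed to consume() is not retained, so a message
   // sent afterwards is only seen by the next consume() call.
   @Test
   void testConsumerIsNotRetainedAcrossConsumeCalls() {
       BackchannelImpl<String> writerSide = BackchannelImpl.create(0, "test-prefix", true);
       BackchannelImpl<String> committerSide = BackchannelImpl.create(0, "test-prefix", false);

       List<String> firstBatch = new ArrayList<>();
       writerSide.consume(firstBatch::add); // no messages yet, this Consumer is "discarded"

       committerSide.send("test-prefix-0-1");

       List<String> secondBatch = new ArrayList<>();
       writerSide.consume(secondBatch::add);

       assertThat(firstBatch).isEmpty();
       assertThat(secondBatch).containsExactly("test-prefix-0-1");
   }
   ```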


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/BackchannelImpl.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.function.Consumer;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A backchannel for communication from the Kafka committer to the writer. It's used to recycle
+ * producers and signal that certain transactions have been committed on recovery.
+ */
+@Internal
+@ThreadSafe
+public class BackchannelImpl<T> implements Backchannel<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(BackchannelImpl.class);
+
+    /**
+     * The map of backchannels, keyed by the subtask id and transactional id prefix to uniquely
+     * identify the backchannel while establishing the connection.
+     */
+    @GuardedBy("this")
+    private static final Map<Tuple2<Integer, String>, BackchannelImpl<?>> BACKCHANNELS =
+            new HashMap<>();
+    /** The key to identify the backchannel (subtask id, transactional id prefix). */
+    private final Tuple2<Integer, String> key;
+    /**
+     * The writer thread that sends messages to the committer. Used to ensure that we switch to a
+     * concurrent queue when the writer and committer are not chained.
+     */
+    private Thread writerThread;
+    /**
+     * The committer thread that consumes messages from the writer. Used to ensure that we switch to
+     * a concurrent queue when the writer and committer are not chained.
+     */
+    private Thread committerThread;
+    /**
+     * The messages to be sent from the writer to the committer. By default, it's a thread-safe
+     * implementation but it can be replaced with an unsynchronized queue when the writer and
+     * committer are chained (common case).
+     */
+    private Deque<T> messages = new ConcurrentLinkedDeque<>();
+    /** Flag to indicate that the backchannel has been established. */
+    private boolean established;
+
+    BackchannelImpl(Tuple2<Integer, String> key) {
+        this.key = key;
+    }
+
+    @Override
+    public void send(T message) {
+        messages.add(message);
+    }
+
+    @Override
+    public void consume(Consumer<T> consumer) {
+        ensureEstablished();
+        T message;
+        while ((message = pollUnsafe()) != null) {
+            consumer.accept(message);
+        }
+    }
+
+    @Nullable
+    private T pollUnsafe() {
+        return messages.poll();
+    }
+
+    @VisibleForTesting
+    @Nullable
+    public T poll() {
+        ensureEstablished();
+        return pollUnsafe();
+    }
+
+    @VisibleForTesting
+    @Override
+    public BackchannelImpl<T> establish() {
+        return create(key.f0, key.f1, writerThread == null);
+    }
+
+    /**
+     * Create a backchannel for the given subtask id and transactional id prefix. The backchannel
+     * is created if it does not exist; otherwise, the existing backchannel is returned.
+     *
+     * <p>If this method is called twice with the same subtask id and transactional id prefix but
+     * different forWriter values, the backchannel will be established and can be used for
+     * communication.
+     *
+     * <p>If this method is called twice with the same subtask id and transactional id prefix and
+     * the same forWriter value, an error will be logged instead. There is a high chance that the
+     * backchannel is misused by using non-unique transactionalIds.
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> BackchannelImpl<T> create(
+            int subtaskId, String transactionalIdPrefix, boolean forWriter) {
+        Tuple2<Integer, String> key = new Tuple2<>(subtaskId, transactionalIdPrefix);
+        BackchannelImpl<T> backchannel;
+        boolean duplicate;
+        synchronized (BACKCHANNELS) {
+            backchannel =
+                    (BackchannelImpl<T>)
+                            BACKCHANNELS.computeIfAbsent(key, k -> new BackchannelImpl<>(key));
+            duplicate = backchannel.isDuplicate(forWriter);
+            backchannel.register(forWriter);
+        }
+        if (duplicate) {
+            LOG.error(
+                    "Found duplicate transactionalIdPrefix for multiple Kafka sinks: {}. Transactional id prefixes need to be unique. You may experience memory leaks without fixing this.",
+                    transactionalIdPrefix);
+        }
+        return backchannel;
+    }
+
+    private boolean isDuplicate(boolean forWriter) {
+        return forWriter ? writerThread != null : committerThread != null;
+    }
+
+    private void ensureEstablished() {
+        if (established) {
+            return;
+        }
+
+        // Most of the fields are only properly initialized by the second side of register (writer
+        // or committer).
+        // This method briefly enters the critical section to ensure that all fields are visible to
+        // the first side.
+        boolean established;
+        synchronized (BACKCHANNELS) {
+            established = this.established;
+        }

Review Comment:
   Using an AtomicBoolean seems cleaner here since it should be a one-time 
switch.
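
   Roughly (sketch; needs a `java.util.concurrent.atomic.AtomicBoolean` import):
   ```java
   // Hypothetical: a one-time switch that avoids entering the critical section on the read path.
   private final AtomicBoolean established = new AtomicBoolean();

   private void ensureEstablished() {
       checkState(established.get(), "Backchannel not established for %s", key);
   }

   // ...and in register(), once both sides are present:
   established.set(true);
   ```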



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

