rhauch commented on a change in pull request #8720:
URL: https://github.com/apache/kafka/pull/8720#discussion_r431212650



##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.sink;
+
+import java.util.concurrent.Future;
+
+/**
+ * Component that the sink task can use as it {@link 
SinkTask#put(Collection<SinkRecord>)}.
+ * Reporter of problematic records and the corresponding problems.
+ *
+ * @since 2.6
+ */
+public interface ErrantRecordReporter {
+
+  /**
+   * Report a problematic record and the corresponding error to be written to 
the sink
+   * connector's dead letter queue (DLQ).
+   *
+   * <p>This call is asynchronous and returns a {@link 
java.util.concurrent.Future Future}.
+   * Invoking {@link java.util.concurrent.Future#get() get()} on this future 
will block until the
+   * record has been written or throw any exception that occurred while 
sending the record.
+   * If you want to simulate a simple blocking call you can call the 
<code>get()</code> method
+   * immediately.
+   *
+   * Connect guarantees that sink records reported through this reporter will 
be written to the error topic
+   * before the framework calls the {@link SinkTask#preCommit(Map)} method and 
therefore before

Review comment:
       You need to qualify `Map` or import it:
   ```suggestion
      * before the framework calls the {@link SinkTask#preCommit(java.util.Map)} method and therefore before
   ```

##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.sink;
+
+import java.util.concurrent.Future;
+
+/**
+ * Component that the sink task can use as it {@link 
SinkTask#put(Collection<SinkRecord>)}.

Review comment:
       This is not legal JavaDoc:
   ```suggestion
    * Component that the sink task can use as it {@link SinkTask#put(java.util.Collection)}.
   ```

##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
##########
@@ -100,4 +110,29 @@ public String toString() {
                 ", timestampType=" + timestampType +
                 "} " + super.toString();
     }
+
+    public class InternalSinkRecord extends SinkRecord {
+
+        ConsumerRecord<byte[], byte[]> originalRecord;

Review comment:
       When you move `InternalSinkRecord` to the `runtime` module, be sure to 
make this `private final`.
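   For illustration, a minimal sketch of that field once it lives in the `runtime` module (assuming the field keeps its current name):
   ```
   private final ConsumerRecord<byte[], byte[]> originalRecord;
   ```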

##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
##########
@@ -68,6 +70,14 @@ public SinkRecord newRecord(String topic, Integer 
kafkaPartition, Schema keySche
         return new SinkRecord(topic, kafkaPartition, keySchema, key, 
valueSchema, value, kafkaOffset(), timestamp, timestampType, headers);
     }
 
+    public InternalSinkRecord newRecord(String topic, Integer kafkaPartition, 
Schema keySchema, Object key, Schema valueSchema, Object value,
+                                        long kafkaOffset, Long timestamp,
+                                        TimestampType timestampType, 
Iterable<Header> headers,
+                                        ConsumerRecord<byte[], byte[]> 
originalRecord) {
+        return new InternalSinkRecord(topic, kafkaPartition, keySchema, key, 
valueSchema, value,
+            kafkaOffset, timestamp, timestampType, headers, originalRecord);
+    }
+

Review comment:
       You can't change this public API. `InternalSinkRecord` needs to be in 
the `runtime` module.
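   A rough skeleton of how that could look in the `runtime` module (the package and accessor name here are assumptions; the constructor matches the one sketched in a later comment):
   ```
   package org.apache.kafka.connect.runtime;

   import org.apache.kafka.clients.consumer.ConsumerRecord;
   import org.apache.kafka.connect.sink.SinkRecord;

   import java.util.Objects;

   // Runtime-internal wrapper: keeps a handle to the original consumer record while
   // leaving the public SinkRecord API in the api module unchanged.
   public class InternalSinkRecord extends SinkRecord {

       private final ConsumerRecord<byte[], byte[]> originalRecord;

       public InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, SinkRecord record) {
           super(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
                 record.valueSchema(), record.value(), record.kafkaOffset(), record.timestamp(),
                 record.timestampType(), record.headers());
           this.originalRecord = Objects.requireNonNull(originalRecord);
       }

       public ConsumerRecord<byte[], byte[]> originalRecord() {
           return originalRecord;
       }
   }
   ```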

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -497,12 +506,18 @@ private SinkRecord convertAndTransformRecord(final 
ConsumerRecord<byte[], byte[]
                 timestamp,
                 msg.timestampType(),
                 headers);
+
+        InternalSinkRecord internalSinkRecord = 
origRecord.newRecord(origRecord.topic(),
+            origRecord.kafkaPartition(), origRecord.keySchema(), 
origRecord.key(),
+            origRecord.valueSchema(), origRecord.value(), 
origRecord.kafkaOffset(),
+            origRecord.timestamp(), origRecord.timestampType(), 
origRecord.headers(), msg);
+
         log.trace("{} Applying transformations to record in topic '{}' 
partition {} at offset {} and timestamp {} with key {} and value {}",
                 this, msg.topic(), msg.partition(), msg.offset(), timestamp, 
keyAndSchema.value(), valueAndSchema.value());
         if (isTopicTrackingEnabled) {
-            recordActiveTopic(origRecord.topic());
+            recordActiveTopic(internalSinkRecord.topic());
         }
-        return transformationChain.apply(origRecord);
+        return transformationChain.apply(internalSinkRecord);

Review comment:
       This line would change to:
   ```suggestion
           // Apply the transformations
           SinkRecord transformedRecord = transformationChain.apply(sinkRecord);
           if (transformedRecord == null) {
               // The record is being dropped
               return null;
           }
            // Error reporting will need to correlate each sink record with the original consumer record
           return new InternalSinkRecord(msg, transformedRecord);
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
##########
@@ -176,6 +182,43 @@ void populateContextHeaders(ProducerRecord<byte[], byte[]> 
producerRecord, Proce
         }
     }
 
+    public static KafkaProducer<byte[], byte[]> setUpTopicAndProducer(
+        Map<String, Object> adminProps,
+        Map<String, Object> producerProps,
+        SinkConnectorConfig sinkConnectorConfig,
+        int dlqTopicNumPartitions
+    ) {
+        String dlqTopic = sinkConnectorConfig.dlqTopicName();
+
+        if (dlqTopic != null && !dlqTopic.isEmpty()) {
+            try (Admin admin = Admin.create(adminProps)) {
+                if (!admin.listTopics().names().get().contains(dlqTopic)) {
+                    log.error("Topic {} doesn't exist. Will attempt to create 
topic.", dlqTopic);
+                    NewTopic schemaTopicRequest = new NewTopic(
+                        dlqTopic,
+                        dlqTopicNumPartitions,
+                        sinkConnectorConfig.dlqTopicReplicationFactor()
+                    );
+                    
admin.createTopics(singleton(schemaTopicRequest)).all().get();
+                }
+            } catch (InterruptedException e) {
+                throw new ConnectException(
+                    "Could not initialize errant record reporter with topic = 
" + dlqTopic,
+                    e
+                );
+            } catch (ExecutionException e) {
+                if (!(e.getCause() instanceof TopicExistsException)) {
+                    throw new ConnectException(
+                        "Could not initialize errant record reporter with 
topic = " + dlqTopic,
+                        e
+                    );
+                }
+            }
+            return new KafkaProducer<>(producerProps);
+        }
+        return null;
+    }
+

Review comment:
       I don't think we need this change anymore, since it only refactors existing code that we no longer need to touch. (This PR is no longer using this logic in multiple places.)

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java
##########
@@ -139,6 +145,18 @@ public void report() {
         }
     }
 
+    public Future<Void> report(Callback callback) {
+        List<Future<RecordMetadata>> futures = new ArrayList<>();
+        for (ErrorReporter reporter: reporters) {
+            Future<RecordMetadata> future = reporter.report(this, callback);
+            if (!future.isDone()) {
+                futures.add(future);
+            }
+        }
+        return new ErrantRecordFuture(futures);

Review comment:
       Since we often have just one reporter, it is probably worth avoiding the 
unnecessary allocations:
   ```suggestion
           if (reporters.size() == 1) {
            return reporters.get(0).report(this, callback);
           }
           List<Future<RecordMetadata>> futures = new LinkedList<>();
           for (ErrorReporter reporter: reporters) {
               Future<RecordMetadata> future = reporter.report(this, callback);
               if (!future.isDone()) {
                   futures.add(future);
               }
           }
           if (futures.isEmpty()) {
               return CompletableFuture.completedFuture(null);
           }
           return new ErrantRecordFuture(futures);
   ```
   And since we don't know how many futures we'll add to the list (and it will 
likely be just zero if the DLQ is not configured or just one for the DLQ), 
let's use a `LinkedList` instead to avoid excessive allocation when adding the 
first element to the `ArrayList`.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkRecord.InternalSinkRecord;
+
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final Logger log = 
LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private final Callback callback = (metadata, exception) -> {
+        if (exception != null) {
+            throw new ConnectException("Failed to send the errant record to 
Kafka",
+                exception.getCause());
+        }
+    };
+
+    private RetryWithToleranceOperator retryWithToleranceOperator;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private HeaderConverter headerConverter;
+
+    // Visible for testing
+    public LinkedList<Future<Void>> futures;
+
+    public WorkerErrantRecordReporter(
+        RetryWithToleranceOperator retryWithToleranceOperator,
+        Converter keyConverter,
+        Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+        this.headerConverter = headerConverter;
+        this.futures = new LinkedList<>();
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+        Function<SinkRecord, ConsumerRecord<byte[], byte[]>> function;
+
+        if (record instanceof InternalSinkRecord) {
+            function = sinkRecord -> ((InternalSinkRecord) 
sinkRecord).originalRecord();
+        } else {
+            function = sinkRecord -> {
+
+                String topic = record.topic();
+                byte[] key = keyConverter.fromConnectData(topic, 
record.keySchema(), record.key());
+                byte[] value = valueConverter.fromConnectData(topic, 
record.valueSchema(),
+                    record.value());
+
+                RecordHeaders headers = new RecordHeaders();
+                if (record.headers() != null) {
+                    for (Header header : record.headers()) {
+                        String headerKey = header.key();
+                        byte[] rawHeader = 
headerConverter.fromConnectHeader(topic, headerKey,
+                            header.schema(), header.value());
+                        headers.add(headerKey, rawHeader);
+                    }
+                }
+
+                return new ConsumerRecord<>(record.topic(), 
record.kafkaPartition(),
+                    record.kafkaOffset(), record.timestamp(), 
record.timestampType(), -1L, -1,
+                    -1, key, value, headers);
+
+            };
+        }
+
+        Future<Void> future = 
retryWithToleranceOperator.executeFailed(function, Stage.TASK_PUT,
+            SinkTask.class, record, error, callback);
+
+        futures.add(future);
+        return future;
+    }
+
+    /**
+     * Gets all futures returned by the sink records sent to Kafka by the 
errant
+     * record reporter. This function is intended to be used to block on all 
the errant record
+     * futures.
+     */
+    public void getAllFutures() {
+        for (Future<Void> future : futures) {
+            try {
+                future.get();
+            } catch (InterruptedException | ExecutionException e) {
+                log.error("Encountered an error while calling ");
+                throw new ConnectException(e);
+            }
+        }
+        futures.clear();
+    }
+
+    /**
+     * Wrapper class to aggregate producer futures and abstract away the 
record metadata from the
+     * Connect user.
+     */
+    public static class ErrantRecordFuture implements Future<Void> {
+
+        private final List<Future<RecordMetadata>> futures;
+

Review comment:
       Let's avoid unnecessary blank lines.
   ```suggestion
   ```

##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.sink;
+
+import java.util.concurrent.Future;
+
+/**
+ * Component that the sink task can use as it {@link 
SinkTask#put(Collection<SinkRecord>)}.
+ * Reporter of problematic records and the corresponding problems.
+ *
+ * @since 2.6
+ */
+public interface ErrantRecordReporter {
+
+  /**
+   * Report a problematic record and the corresponding error to be written to 
the sink
+   * connector's dead letter queue (DLQ).
+   *
+   * <p>This call is asynchronous and returns a {@link 
java.util.concurrent.Future Future}.
+   * Invoking {@link java.util.concurrent.Future#get() get()} on this future 
will block until the
+   * record has been written or throw any exception that occurred while 
sending the record.
+   * If you want to simulate a simple blocking call you can call the 
<code>get()</code> method
+   * immediately.
+   *
+   * Connect guarantees that sink records reported through this reporter will 
be written to the error topic
+   * before the framework calls the {@link SinkTask#preCommit(Map)} method and 
therefore before
+   * committing the consumer offsets. SinkTask implementations can use the 
Future when stronger guarantees
+   * are required.
+   *
+   * @param record the problematic record; may not be null
+   * @param error  the error capturing the problem with the record; may not be 
null
+   * @return a future that can be used to block until the record and error are 
reported
+   *         to the DLQ
+   * @throws ConnectException if the error reporter and DLQ fails to write a 
reported record

Review comment:
       You need to qualify `ConnectException` or import it. The latter is 
probably better in this case to make the JavaDoc more readable in the code.
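   For example (assuming the import route), adding this to the imports lets the existing `@throws ConnectException` tag resolve without qualification:
   ```
   import org.apache.kafka.connect.errors.ConnectException;
   ```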

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -497,12 +506,18 @@ private SinkRecord convertAndTransformRecord(final 
ConsumerRecord<byte[], byte[]
                 timestamp,
                 msg.timestampType(),
                 headers);
+
+        InternalSinkRecord internalSinkRecord = 
origRecord.newRecord(origRecord.topic(),
+            origRecord.kafkaPartition(), origRecord.keySchema(), 
origRecord.key(),
+            origRecord.valueSchema(), origRecord.value(), 
origRecord.kafkaOffset(),
+            origRecord.timestamp(), origRecord.timestampType(), 
origRecord.headers(), msg);
+

Review comment:
       Let's not create the `InternalSinkRecord` until *after* the 
transformation chain has been applied. That way we're not affected by any SMT 
that creates a new `SinkRecord` via a constructor (where we'd lose our 
`InternalSinkRecord`) rather than `newRecord(...)` (where we'd keep the 
`InternalSinkRecord`).
   ```suggestion
   ```

##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
##########
@@ -100,4 +110,29 @@ public String toString() {
                 ", timestampType=" + timestampType +
                 "} " + super.toString();
     }
+
+    public class InternalSinkRecord extends SinkRecord {
+
+        ConsumerRecord<byte[], byte[]> originalRecord;
+
+        public InternalSinkRecord(String topic, int partition, Schema 
keySchema, Object key,
+                                  Schema valueSchema, Object value, long 
kafkaOffset,
+                                  Long timestamp, TimestampType timestampType,
+                                  Iterable<Header> headers,
+                                  ConsumerRecord<byte[], byte[]> 
originalRecord) {
+            super(topic, partition, keySchema, key, valueSchema, value, 
kafkaOffset, timestamp,
+                timestampType, headers);
+            this.originalRecord = originalRecord;
+

Review comment:
       First of all, let's avoid adding unnecessary blank lines.
   
   Second, when you move this to `runtime`, you won't need to use this 
constructor and instead could use a much more straightforward one:
   ```
            public InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, SinkRecord record) {
               super(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
                       record.valueSchema(), record.value(), record.kafkaOffset(), record.timestamp(),
                       record.timestampType(), record.headers());
               this.originalRecord = Objects.requireNonNull(originalRecord);
           }
   ```

##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
##########
@@ -95,4 +95,30 @@
      */
     void requestCommit();
 
+    /**
+     * Get the reporter to which the sink task can report problematic or 
failed {@link SinkRecord records}
+     * passed to the {@link SinkTask#put(Collection)} method. When reporting a 
failed record,

Review comment:
       This is not valid JavaDoc:
   ```suggestion
     * passed to the {@link SinkTask#put(java.util.Collection)} method. When reporting a failed record,
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkRecord.InternalSinkRecord;
+
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final Logger log = 
LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private final Callback callback = (metadata, exception) -> {
+        if (exception != null) {
+            throw new ConnectException("Failed to send the errant record to 
Kafka",
+                exception.getCause());
+        }
+    };
+
+    private RetryWithToleranceOperator retryWithToleranceOperator;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private HeaderConverter headerConverter;
+
+    // Visible for testing
+    public LinkedList<Future<Void>> futures;
+
+    public WorkerErrantRecordReporter(
+        RetryWithToleranceOperator retryWithToleranceOperator,
+        Converter keyConverter,
+        Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+        this.headerConverter = headerConverter;
+        this.futures = new LinkedList<>();
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+        Function<SinkRecord, ConsumerRecord<byte[], byte[]>> function;

Review comment:
       Why use a function here? We can use a simple variable instead.
   
   (I suggested a function offline to avoid having to pass in the converters, but passing the converters into this class encapsulates this logic nicely.)
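   
   A rough sketch with a plain variable (local names are assumptions; the conversion logic simply mirrors the current `else` branch):
   ```
   ConsumerRecord<byte[], byte[]> consumerRecord;
   if (record instanceof InternalSinkRecord) {
       consumerRecord = ((InternalSinkRecord) record).originalRecord();
   } else {
       String topic = record.topic();
       byte[] key = keyConverter.fromConnectData(topic, record.keySchema(), record.key());
       byte[] value = valueConverter.fromConnectData(topic, record.valueSchema(), record.value());

       RecordHeaders headers = new RecordHeaders();
       if (record.headers() != null) {
           for (Header header : record.headers()) {
               headers.add(header.key(), headerConverter.fromConnectHeader(
                   topic, header.key(), header.schema(), header.value()));
           }
       }

       consumerRecord = new ConsumerRecord<>(topic, record.kafkaPartition(), record.kafkaOffset(),
           record.timestamp(), record.timestampType(), -1L, -1, -1, key, value, headers);
   }
   // executeFailed(...) would then take the ConsumerRecord directly rather than a Function.
   ```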

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
##########
@@ -119,38 +117,46 @@ public static DeadLetterQueueReporter 
createAndSetup(Map<String, Object> adminPr
      * @param context processing context containing the raw record at {@link 
ProcessingContext#consumerRecord()}.
      */
     public void report(ProcessingContext context) {
-        final String dlqTopicName = connConfig.dlqTopicName();
+        Callback callback = (metadata, exception) -> {
+            if (exception != null) {
+                log.error("Could not produce message to dead letter queue. 
topic=" + dlqTopicName, exception);
+                errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
+            }
+        };
+        report(context, callback);
+    }
+
+    /**
+     * Write the raw records into a Kafka topic. This methods allows for a 
custom callback to be
+     * passed to the producer.
+     *
+     * @param context processing context containing the raw record at {@link 
ProcessingContext#consumerRecord()}.
+     * @param callback callback to be invoked by the producer when the record 
is sent to Kafka.
+     * @return

Review comment:
       Missing JavaDoc details:
   ```suggestion
     * @return the future associated with the writing of this record; never null
   ```

##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
##########
@@ -100,4 +110,29 @@ public String toString() {
                 ", timestampType=" + timestampType +
                 "} " + super.toString();
     }
+
+    public class InternalSinkRecord extends SinkRecord {
+
+        ConsumerRecord<byte[], byte[]> originalRecord;
+
+        public InternalSinkRecord(String topic, int partition, Schema 
keySchema, Object key,
+                                  Schema valueSchema, Object value, long 
kafkaOffset,
+                                  Long timestamp, TimestampType timestampType,
+                                  Iterable<Header> headers,
+                                  ConsumerRecord<byte[], byte[]> 
originalRecord) {
+            super(topic, partition, keySchema, key, valueSchema, value, 
kafkaOffset, timestamp,
+                timestampType, headers);
+            this.originalRecord = originalRecord;
+
+        }
+
+        /**
+         *
+         * @return the original consumer record that was converted to this 
sink record.
+         */

Review comment:
       If we're going to add JavaDoc, which I think is helpful, then make it 
complete by adding a description:
   ```suggestion
           /**
         * Return the original consumer record that this sink record represents.
            *
            * @return the original consumer record; never null
            */
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -552,14 +553,18 @@ private WorkerTask buildWorkerTask(ClusterConfigState 
configState,
             TransformationChain<SinkRecord> transformationChain = new 
TransformationChain<>(connConfig.<SinkRecord>transformations(), 
retryWithToleranceOperator);
             log.info("Initializing: {}", transformationChain);
             SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, 
connConfig.originalsStrings());
-            retryWithToleranceOperator.reporters(sinkTaskReporters(id, 
sinkConfig, errorHandlingMetrics, connectorClass));
+            retryWithToleranceOperator.reporters(sinkTaskReporters(id, 
sinkConfig,
+                errorHandlingMetrics, connectorClass));

Review comment:
       I don't think this line was actually changed other than formatting. 
Please revert it to avoid changing lines we don't have to.

##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
##########
@@ -95,4 +95,30 @@
      */
     void requestCommit();
 
+    /**
+     * Get the reporter to which the sink task can report problematic or 
failed {@link SinkRecord records}
+     * passed to the {@link SinkTask#put(Collection)} method. When reporting a 
failed record,
+     * the sink task will receive a {@link Future} that the task can 
optionally use to wait until

Review comment:
       This is invalid in JavaDoc:
   ```suggestion
     * the sink task will receive a {@link java.util.concurrent.Future} that the task can optionally use to wait until
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -497,12 +506,18 @@ private SinkRecord convertAndTransformRecord(final 
ConsumerRecord<byte[], byte[]
                 timestamp,
                 msg.timestampType(),
                 headers);
+
+        InternalSinkRecord internalSinkRecord = 
origRecord.newRecord(origRecord.topic(),
+            origRecord.kafkaPartition(), origRecord.keySchema(), 
origRecord.key(),
+            origRecord.valueSchema(), origRecord.value(), 
origRecord.kafkaOffset(),
+            origRecord.timestamp(), origRecord.timestampType(), 
origRecord.headers(), msg);
+
         log.trace("{} Applying transformations to record in topic '{}' 
partition {} at offset {} and timestamp {} with key {} and value {}",
                 this, msg.topic(), msg.partition(), msg.offset(), timestamp, 
keyAndSchema.value(), valueAndSchema.value());
         if (isTopicTrackingEnabled) {
-            recordActiveTopic(origRecord.topic());
+            recordActiveTopic(internalSinkRecord.topic());

Review comment:
       This line would not need to be affected.
   ```suggestion
               recordActiveTopic(sinkRecord.topic());
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
##########
@@ -67,30 +72,22 @@
     private final SinkConnectorConfig connConfig;
     private final ConnectorTaskId connectorTaskId;
     private final ErrorHandlingMetrics errorHandlingMetrics;
+    private final String dlqTopicName;
 
     private KafkaProducer<byte[], byte[]> kafkaProducer;
 
     public static DeadLetterQueueReporter createAndSetup(Map<String, Object> 
adminProps,
                                                          ConnectorTaskId id,
                                                          SinkConnectorConfig 
sinkConfig, Map<String, Object> producerProps,
                                                          ErrorHandlingMetrics 
errorHandlingMetrics) {
-        String topic = sinkConfig.dlqTopicName();
 
-        try (Admin admin = Admin.create(adminProps)) {
-            if (!admin.listTopics().names().get().contains(topic)) {
-                log.error("Topic {} doesn't exist. Will attempt to create 
topic.", topic);
-                NewTopic schemaTopicRequest = new NewTopic(topic, 
DLQ_NUM_DESIRED_PARTITIONS, sinkConfig.dlqTopicReplicationFactor());
-                admin.createTopics(singleton(schemaTopicRequest)).all().get();
-            }
-        } catch (InterruptedException e) {
-            throw new ConnectException("Could not initialize dead letter queue 
with topic=" + topic, e);
-        } catch (ExecutionException e) {
-            if (!(e.getCause() instanceof TopicExistsException)) {
-                throw new ConnectException("Could not initialize dead letter 
queue with topic=" + topic, e);
-            }
-        }
 
-        KafkaProducer<byte[], byte[]> dlqProducer = new 
KafkaProducer<>(producerProps);
+        KafkaProducer<byte[], byte[]> dlqProducer = setUpTopicAndProducer(
+            adminProps,
+            producerProps,
+            sinkConfig,
+            DLQ_NUM_DESIRED_PARTITIONS
+        );

Review comment:
       We don't need to make this change, do we? Let's try to minimize the 
changes to the existing code.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
##########
@@ -83,6 +87,17 @@ public RetryWithToleranceOperator(long errorRetryTimeout, 
long errorMaxDelayInMi
         this.time = time;
     }
 
+    public Future<Void> executeFailed(Function<SinkRecord, 
ConsumerRecord<byte[], byte[]>> function,
+                                      Stage stage, Class<?> executingClass, 
SinkRecord record,
+                                      Throwable error, Callback callback) {
+

Review comment:
       No blank line is needed here.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
##########
@@ -119,38 +117,46 @@ public static DeadLetterQueueReporter 
createAndSetup(Map<String, Object> adminPr
      * @param context processing context containing the raw record at {@link 
ProcessingContext#consumerRecord()}.
      */
     public void report(ProcessingContext context) {
-        final String dlqTopicName = connConfig.dlqTopicName();
+        Callback callback = (metadata, exception) -> {
+            if (exception != null) {
+                log.error("Could not produce message to dead letter queue. 
topic=" + dlqTopicName, exception);
+                errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
+            }
+        };
+        report(context, callback);
+    }
+
+    /**
+     * Write the raw records into a Kafka topic. This methods allows for a 
custom callback to be
+     * passed to the producer.
+     *
+     * @param context processing context containing the raw record at {@link 
ProcessingContext#consumerRecord()}.
+     * @param callback callback to be invoked by the producer when the record 
is sent to Kafka.
+     * @return
+     */
+    public Future<RecordMetadata> report(ProcessingContext context, Callback 
callback) {
         if (dlqTopicName.isEmpty()) {
-            return;
+            return CompletableFuture.completedFuture(null);
         }
-
         errorHandlingMetrics.recordDeadLetterQueueProduceRequest();
 
         ConsumerRecord<byte[], byte[]> originalMessage = 
context.consumerRecord();
         if (originalMessage == null) {
             errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
-            return;
+            return CompletableFuture.completedFuture(null);
         }
 
-        ProducerRecord<byte[], byte[]> producerRecord;
-        if (originalMessage.timestamp() == RecordBatch.NO_TIMESTAMP) {
-            producerRecord = new ProducerRecord<>(dlqTopicName, null,
-                    originalMessage.key(), originalMessage.value(), 
originalMessage.headers());
-        } else {
-            producerRecord = new ProducerRecord<>(dlqTopicName, null, 
originalMessage.timestamp(),
-                    originalMessage.key(), originalMessage.value(), 
originalMessage.headers());
-        }
+        ProducerRecord<byte[], byte[]> producerRecord =
+            new ProducerRecord<>(dlqTopicName, null,
+                originalMessage.timestamp() != RecordBatch.NO_TIMESTAMP ?
+                    originalMessage.timestamp() : null,
+                originalMessage.key(), originalMessage.value(), 
originalMessage.headers());

Review comment:
       It'd be better to not change these lines, because we don't intend to 
change the logic -- yet doing so adds risk and increases the size of this PR.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
##########
@@ -119,38 +117,46 @@ public static DeadLetterQueueReporter 
createAndSetup(Map<String, Object> adminPr
      * @param context processing context containing the raw record at {@link 
ProcessingContext#consumerRecord()}.
      */
     public void report(ProcessingContext context) {
-        final String dlqTopicName = connConfig.dlqTopicName();
+        Callback callback = (metadata, exception) -> {
+            if (exception != null) {
+                log.error("Could not produce message to dead letter queue. 
topic=" + dlqTopicName, exception);
+                errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
+            }
+        };
+        report(context, callback);
+    }
+
+    /**
+     * Write the raw records into a Kafka topic. This methods allows for a 
custom callback to be
+     * passed to the producer.
+     *
+     * @param context processing context containing the raw record at {@link 
ProcessingContext#consumerRecord()}.
+     * @param callback callback to be invoked by the producer when the record 
is sent to Kafka.
+     * @return

Review comment:
       Why do we need to overload this method to pass in a callback? The only 
place we're using this new method is via the reporter, and 
`WorkerErrantRecordReporter.callback` doesn't seem to provide any value and in 
fact is not able to call 
`errorHandlingMetrics.recordDeadLetterQueueProduceFailed()` like this class.
   
   Wouldn't it be much simpler to just add a return type to the existing method?
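   
   In other words, a sketch of keeping the single method but returning the producer future (this just reuses the existing logic in this class, including its original callback):
   ```
   public Future<RecordMetadata> report(ProcessingContext context) {
       if (dlqTopicName.isEmpty()) {
           return CompletableFuture.completedFuture(null);
       }
       errorHandlingMetrics.recordDeadLetterQueueProduceRequest();

       ConsumerRecord<byte[], byte[]> originalMessage = context.consumerRecord();
       if (originalMessage == null) {
           errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
           return CompletableFuture.completedFuture(null);
       }

       ProducerRecord<byte[], byte[]> producerRecord;
       if (originalMessage.timestamp() == RecordBatch.NO_TIMESTAMP) {
           producerRecord = new ProducerRecord<>(dlqTopicName, null,
                   originalMessage.key(), originalMessage.value(), originalMessage.headers());
       } else {
           producerRecord = new ProducerRecord<>(dlqTopicName, null, originalMessage.timestamp(),
                   originalMessage.key(), originalMessage.value(), originalMessage.headers());
       }
       populateContextHeaders(producerRecord, context);

       // KafkaProducer.send(record, callback) already returns a Future<RecordMetadata>,
       // so callers that need to block on the DLQ write can use the returned future.
       return kafkaProducer.send(producerRecord, (metadata, exception) -> {
           if (exception != null) {
               log.error("Could not produce message to dead letter queue. topic=" + dlqTopicName, exception);
               errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
           }
       });
   }
   ```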

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java
##########
@@ -28,6 +34,18 @@
      */
     void report(ProcessingContext context);
 
+    /**
+     * Report an error with a specified callback.
+     *
+     * @param context the processing context (cannot be null).
+     * @param callback callback to be invoked by a producer when sending a 
record to Kafka.
+     * @return future result from the producer sending a record to Kafka
+     */
+    default Future<RecordMetadata> report(ProcessingContext context, Callback 
callback) {
+        report(context);
+        return CompletableFuture.completedFuture(null);
+    }
+

Review comment:
       I don't think we need to overload this method, and instead we can just 
change the return type. After all, the `ErrorReporter` is not part of the 
public API, and is merely an abstraction we use within the runtime itself.
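   
   A sketch of the changed signature (the rest of the interface stays as-is):
   ```
   /**
    * Report an error.
    *
    * @param context the processing context (cannot be null).
    * @return future result from the producer sending a record to Kafka; never null
    */
   Future<RecordMetadata> report(ProcessingContext context);
   ```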

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkRecord.InternalSinkRecord;
+
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final Logger log = 
LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private final Callback callback = (metadata, exception) -> {
+        if (exception != null) {
+            throw new ConnectException("Failed to send the errant record to 
Kafka",
+                exception.getCause());
+        }
+    };
+
+    private RetryWithToleranceOperator retryWithToleranceOperator;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private HeaderConverter headerConverter;
+
+    // Visible for testing
+    public LinkedList<Future<Void>> futures;
+
+    public WorkerErrantRecordReporter(
+        RetryWithToleranceOperator retryWithToleranceOperator,
+        Converter keyConverter,
+        Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+        this.headerConverter = headerConverter;
+        this.futures = new LinkedList<>();
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+        Function<SinkRecord, ConsumerRecord<byte[], byte[]>> function;
+
+        if (record instanceof InternalSinkRecord) {
+            function = sinkRecord -> ((InternalSinkRecord) 
sinkRecord).originalRecord();
+        } else {
+            function = sinkRecord -> {
+
+                String topic = record.topic();
+                byte[] key = keyConverter.fromConnectData(topic, 
record.keySchema(), record.key());
+                byte[] value = valueConverter.fromConnectData(topic, 
record.valueSchema(),
+                    record.value());
+
+                RecordHeaders headers = new RecordHeaders();
+                if (record.headers() != null) {
+                    for (Header header : record.headers()) {
+                        String headerKey = header.key();
+                        byte[] rawHeader = 
headerConverter.fromConnectHeader(topic, headerKey,
+                            header.schema(), header.value());
+                        headers.add(headerKey, rawHeader);
+                    }
+                }
+
+                return new ConsumerRecord<>(record.topic(), 
record.kafkaPartition(),
+                    record.kafkaOffset(), record.timestamp(), 
record.timestampType(), -1L, -1,
+                    -1, key, value, headers);
+
+            };
+        }
+
+        Future<Void> future = 
retryWithToleranceOperator.executeFailed(function, Stage.TASK_PUT,
+            SinkTask.class, record, error, callback);
+
+        futures.add(future);

Review comment:
       How about adding it only if the future is not done already?
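   
   i.e., something along the lines of:
   ```
        Future<Void> future = retryWithToleranceOperator.executeFailed(function, Stage.TASK_PUT,
            SinkTask.class, record, error, callback);

        // Only track futures that still need to be awaited in getAllFutures()
        if (!future.isDone()) {
            futures.add(future);
        }
        return future;
   ```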

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkRecord.InternalSinkRecord;
+
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+
+public class WorkerErrantRecordReporter implements ErrantRecordReporter {
+
+    private static final Logger log = 
LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
+
+    private final Callback callback = (metadata, exception) -> {
+        if (exception != null) {
+            throw new ConnectException("Failed to send the errant record to 
Kafka",
+                exception.getCause());
+        }
+    };
+
+    private RetryWithToleranceOperator retryWithToleranceOperator;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private HeaderConverter headerConverter;
+
+    // Visible for testing
+    public LinkedList<Future<Void>> futures;
+
+    public WorkerErrantRecordReporter(
+        RetryWithToleranceOperator retryWithToleranceOperator,
+        Converter keyConverter,
+        Converter valueConverter,
+        HeaderConverter headerConverter
+    ) {
+        this.retryWithToleranceOperator = retryWithToleranceOperator;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+        this.headerConverter = headerConverter;
+        this.futures = new LinkedList<>();
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord record, Throwable error) {
+        Function<SinkRecord, ConsumerRecord<byte[], byte[]>> function;
+
+        if (record instanceof InternalSinkRecord) {
+            function = sinkRecord -> ((InternalSinkRecord) 
sinkRecord).originalRecord();
+        } else {
+            function = sinkRecord -> {
+
+                String topic = record.topic();
+                byte[] key = keyConverter.fromConnectData(topic, 
record.keySchema(), record.key());
+                byte[] value = valueConverter.fromConnectData(topic, 
record.valueSchema(),
+                    record.value());
+
+                RecordHeaders headers = new RecordHeaders();
+                if (record.headers() != null) {
+                    for (Header header : record.headers()) {
+                        String headerKey = header.key();
+                        byte[] rawHeader = 
headerConverter.fromConnectHeader(topic, headerKey,
+                            header.schema(), header.value());
+                        headers.add(headerKey, rawHeader);
+                    }
+                }
+
+                return new ConsumerRecord<>(record.topic(), 
record.kafkaPartition(),
+                    record.kafkaOffset(), record.timestamp(), 
record.timestampType(), -1L, -1,
+                    -1, key, value, headers);
+
+            };
+        }
+
+        Future<Void> future = 
retryWithToleranceOperator.executeFailed(function, Stage.TASK_PUT,
+            SinkTask.class, record, error, callback);
+
+        futures.add(future);
+        return future;
+    }
+
+    /**
+     * Gets all futures returned by the sink records sent to Kafka by the 
errant
+     * record reporter. This function is intended to be used to block on all 
the errant record
+     * futures.
+     */
+    public void getAllFutures() {
+        for (Future<Void> future : futures) {
+            try {
+                future.get();
+            } catch (InterruptedException | ExecutionException e) {
+                log.error("Encountered an error while calling ");
+                throw new ConnectException(e);
+            }
+        }
+        futures.clear();
+    }
+
+    /**
+     * Wrapper class to aggregate producer futures and abstract away the 
record metadata from the
+     * Connect user.
+     */
+    public static class ErrantRecordFuture implements Future<Void> {
+
+        private final List<Future<RecordMetadata>> futures;
+
+
+        public ErrantRecordFuture(List<Future<RecordMetadata>> 
producerFutures) {
+            futures = producerFutures;
+        }

Review comment:
       Rather than have a list of futures, why not have a single `Future` delegate that is either a `CompletableFuture.allOf(...)` or a single future? This makes the constructor a little more complex, but it would simplify all of the other methods tremendously, since they merely have to delegate (except for `cancel()` and `isCancelled()`, which can stay the same):
   ```suggestion
           public ErrantRecordFuture(List<Future<RecordMetadata>> producerFutures) {
               if (producerFutures == null || producerFutures.isEmpty()) {
                   future = CompletableFuture.completedFuture(null);
               } else {
                   future = CompletableFuture.allOf(producerFutures); // adapting the producer futures to CompletableFutures as needed
               }
           }
   ```
   This will make `get(long, TimeUnit)` behave more correctly by requiring that 
all futures complete within the stated time.
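   
   With a single delegate (say a `CompletableFuture<Void>` field named `future`, with the producer futures adapted into `CompletableFuture`s however is convenient), the remaining methods then become one-liners, roughly:
   ```
       @Override
       public boolean isDone() {
           return future.isDone();
       }

       @Override
       public Void get() throws InterruptedException, ExecutionException {
           return future.get();
       }

       @Override
       public Void get(long timeout, TimeUnit unit)
               throws InterruptedException, ExecutionException, TimeoutException {
           return future.get(timeout, unit);
       }
   ```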




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

