rhauch commented on a change in pull request #8720: URL: https://github.com/apache/kafka/pull/8720#discussion_r431212650
########## File path: connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.sink; + +import java.util.concurrent.Future; + +/** + * Component that the sink task can use as it {@link SinkTask#put(Collection<SinkRecord>)}. + * Reporter of problematic records and the corresponding problems. + * + * @since 2.6 + */ +public interface ErrantRecordReporter { + + /** + * Report a problematic record and the corresponding error to be written to the sink + * connector's dead letter queue (DLQ). + * + * <p>This call is asynchronous and returns a {@link java.util.concurrent.Future Future}. + * Invoking {@link java.util.concurrent.Future#get() get()} on this future will block until the + * record has been written or throw any exception that occurred while sending the record. + * If you want to simulate a simple blocking call you can call the <code>get()</code> method + * immediately. + * + * Connect guarantees that sink records reported through this reporter will be written to the error topic + * before the framework calls the {@link SinkTask#preCommit(Map)} method and therefore before Review comment: You need to qualify `Map` or import it: ```suggestion * before the framework calls the {@link SinkTask#preCommit(java.util.Map)} method and therefore before ``` ########## File path: connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.sink; + +import java.util.concurrent.Future; + +/** + * Component that the sink task can use as it {@link SinkTask#put(Collection<SinkRecord>)}. Review comment: This is not legal JavaDoc: ```suggestion * Component that the sink task can use as it {@link SinkTask#put(java.util.Collection)}. 
``` ########## File path: connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java ########## @@ -100,4 +110,29 @@ public String toString() { ", timestampType=" + timestampType + "} " + super.toString(); } + + public class InternalSinkRecord extends SinkRecord { + + ConsumerRecord<byte[], byte[]> originalRecord; Review comment: When you move `InternalSinkRecord` to the `runtime` module, be sure to make this `private final`. ########## File path: connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java ########## @@ -68,6 +70,14 @@ public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySche return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset(), timestamp, timestampType, headers); } + public InternalSinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value, + long kafkaOffset, Long timestamp, + TimestampType timestampType, Iterable<Header> headers, + ConsumerRecord<byte[], byte[]> originalRecord) { + return new InternalSinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, + kafkaOffset, timestamp, timestampType, headers, originalRecord); + } + Review comment: You can't change this public API. `InternalSinkRecord` needs to be in the `runtime` module. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java ########## @@ -497,12 +506,18 @@ private SinkRecord convertAndTransformRecord(final ConsumerRecord<byte[], byte[] timestamp, msg.timestampType(), headers); + + InternalSinkRecord internalSinkRecord = origRecord.newRecord(origRecord.topic(), + origRecord.kafkaPartition(), origRecord.keySchema(), origRecord.key(), + origRecord.valueSchema(), origRecord.value(), origRecord.kafkaOffset(), + origRecord.timestamp(), origRecord.timestampType(), origRecord.headers(), msg); + log.trace("{} Applying transformations to record in topic '{}' partition {} at offset {} and timestamp {} with key {} and value {}", this, msg.topic(), msg.partition(), msg.offset(), timestamp, keyAndSchema.value(), valueAndSchema.value()); if (isTopicTrackingEnabled) { - recordActiveTopic(origRecord.topic()); + recordActiveTopic(internalSinkRecord.topic()); } - return transformationChain.apply(origRecord); + return transformationChain.apply(internalSinkRecord); Review comment: This line would change to: ```suggestion // Apply the transformations SinkRecord transformedRecord = transformationChain.apply(sinkRecord); if (transformedRecord == null) { // The record is being dropped return null; } // Error reporting will need to correlate each sink record with the original consumer record return new InternalSinkRecord(msg, transformedRecord); ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java ########## @@ -176,6 +182,43 @@ void populateContextHeaders(ProducerRecord<byte[], byte[]> producerRecord, Proce } } + public static KafkaProducer<byte[], byte[]> setUpTopicAndProducer( + Map<String, Object> adminProps, + Map<String, Object> producerProps, + SinkConnectorConfig sinkConnectorConfig, + int dlqTopicNumPartitions + ) { + String dlqTopic = sinkConnectorConfig.dlqTopicName(); + + if (dlqTopic != null && !dlqTopic.isEmpty()) { + try (Admin admin = Admin.create(adminProps)) { + if (!admin.listTopics().names().get().contains(dlqTopic)) { + log.error("Topic {} doesn't exist. 
Will attempt to create topic.", dlqTopic); + NewTopic schemaTopicRequest = new NewTopic( + dlqTopic, + dlqTopicNumPartitions, + sinkConnectorConfig.dlqTopicReplicationFactor() + ); + admin.createTopics(singleton(schemaTopicRequest)).all().get(); + } + } catch (InterruptedException e) { + throw new ConnectException( + "Could not initialize errant record reporter with topic = " + dlqTopic, + e + ); + } catch (ExecutionException e) { + if (!(e.getCause() instanceof TopicExistsException)) { + throw new ConnectException( + "Could not initialize errant record reporter with topic = " + dlqTopic, + e + ); + } + } + return new KafkaProducer<>(producerProps); + } + return null; + } + Review comment: I think we don't really need to make this change anymore, since it's only refactoring the existing code that we don't need to actually change anymore. (This PR is no longer using this logic in multiple places.) ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java ########## @@ -139,6 +145,18 @@ public void report() { } } + public Future<Void> report(Callback callback) { + List<Future<RecordMetadata>> futures = new ArrayList<>(); + for (ErrorReporter reporter: reporters) { + Future<RecordMetadata> future = reporter.report(this, callback); + if (!future.isDone()) { + futures.add(future); + } + } + return new ErrantRecordFuture(futures); Review comment: Since we often have just one reporter, it is probably worth avoiding the unnecessary allocations: ```suggestion if (reporters.size() == 1) { return reporters.get(0).report(this); } List<Future<RecordMetadata>> futures = new LinkedList<>(); for (ErrorReporter reporter: reporters) { Future<RecordMetadata> future = reporter.report(this, callback); if (!future.isDone()) { futures.add(future); } } if (futures.isEmpty()) { return CompletableFuture.completedFuture(null); } return new ErrantRecordFuture(futures); ``` And since we don't know how many futures we'll add to the list (and it will likely be just zero if the DLQ is not configured or just one for the DLQ), let's use a `LinkedList` instead to avoid excessive allocation when adding the first element to the `ArrayList`. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.runtime.errors; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.header.Header; +import org.apache.kafka.connect.sink.ErrantRecordReporter; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkRecord.InternalSinkRecord; + +import org.apache.kafka.connect.sink.SinkTask; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.HeaderConverter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; + +public class WorkerErrantRecordReporter implements ErrantRecordReporter { + + private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class); + + private final Callback callback = (metadata, exception) -> { + if (exception != null) { + throw new ConnectException("Failed to send the errant record to Kafka", + exception.getCause()); + } + }; + + private RetryWithToleranceOperator retryWithToleranceOperator; + private Converter keyConverter; + private Converter valueConverter; + private HeaderConverter headerConverter; + + // Visible for testing + public LinkedList<Future<Void>> futures; + + public WorkerErrantRecordReporter( + RetryWithToleranceOperator retryWithToleranceOperator, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter + ) { + this.retryWithToleranceOperator = retryWithToleranceOperator; + this.keyConverter = keyConverter; + this.valueConverter = valueConverter; + this.headerConverter = headerConverter; + this.futures = new LinkedList<>(); + } + + @Override + public Future<Void> report(SinkRecord record, Throwable error) { + Function<SinkRecord, ConsumerRecord<byte[], byte[]>> function; + + if (record instanceof InternalSinkRecord) { + function = sinkRecord -> ((InternalSinkRecord) sinkRecord).originalRecord(); + } else { + function = sinkRecord -> { + + String topic = record.topic(); + byte[] key = keyConverter.fromConnectData(topic, record.keySchema(), record.key()); + byte[] value = valueConverter.fromConnectData(topic, record.valueSchema(), + record.value()); + + RecordHeaders headers = new RecordHeaders(); + if (record.headers() != null) { + for (Header header : record.headers()) { + String headerKey = header.key(); + byte[] rawHeader = headerConverter.fromConnectHeader(topic, headerKey, + header.schema(), header.value()); + headers.add(headerKey, rawHeader); + } + } + + return new ConsumerRecord<>(record.topic(), record.kafkaPartition(), + record.kafkaOffset(), record.timestamp(), record.timestampType(), -1L, -1, + -1, key, value, headers); + + }; + } + + Future<Void> future = retryWithToleranceOperator.executeFailed(function, Stage.TASK_PUT, + SinkTask.class, record, error, callback); + + futures.add(future); + return future; + } + + /** + * Gets all futures returned by the sink records sent to Kafka by the errant + * record reporter. This function is intended to be used to block on all the errant record + * futures. 
+ */ + public void getAllFutures() { + for (Future<Void> future : futures) { + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + log.error("Encountered an error while calling "); + throw new ConnectException(e); + } + } + futures.clear(); + } + + /** + * Wrapper class to aggregate producer futures and abstract away the record metadata from the + * Connect user. + */ + public static class ErrantRecordFuture implements Future<Void> { + + private final List<Future<RecordMetadata>> futures; + Review comment: Let's avoid unnecessary blank lines. ```suggestion ``` ########## File path: connect/api/src/main/java/org/apache/kafka/connect/sink/ErrantRecordReporter.java ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.sink; + +import java.util.concurrent.Future; + +/** + * Component that the sink task can use as it {@link SinkTask#put(Collection<SinkRecord>)}. + * Reporter of problematic records and the corresponding problems. + * + * @since 2.6 + */ +public interface ErrantRecordReporter { + + /** + * Report a problematic record and the corresponding error to be written to the sink + * connector's dead letter queue (DLQ). + * + * <p>This call is asynchronous and returns a {@link java.util.concurrent.Future Future}. + * Invoking {@link java.util.concurrent.Future#get() get()} on this future will block until the + * record has been written or throw any exception that occurred while sending the record. + * If you want to simulate a simple blocking call you can call the <code>get()</code> method + * immediately. + * + * Connect guarantees that sink records reported through this reporter will be written to the error topic + * before the framework calls the {@link SinkTask#preCommit(Map)} method and therefore before + * committing the consumer offsets. SinkTask implementations can use the Future when stronger guarantees + * are required. + * + * @param record the problematic record; may not be null + * @param error the error capturing the problem with the record; may not be null + * @return a future that can be used to block until the record and error are reported + * to the DLQ + * @throws ConnectException if the error reporter and DLQ fails to write a reported record Review comment: You need to qualify `ConnectException` or import it. The latter is probably better in this case to make the JavaDoc more readable in the code. 
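For illustration, the two options might look like the following sketch of just the relevant lines (not a GitHub suggestion; the existing wording of the `@throws` description is kept as-is):
```
// Option 1: fully qualify the exception in the JavaDoc tag
 * @throws org.apache.kafka.connect.errors.ConnectException if the error reporter and DLQ fails to write a reported record

// Option 2 (preferred): import the class so the JavaDoc stays readable
import org.apache.kafka.connect.errors.ConnectException;
 * @throws ConnectException if the error reporter and DLQ fails to write a reported record
```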
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java ########## @@ -497,12 +506,18 @@ private SinkRecord convertAndTransformRecord(final ConsumerRecord<byte[], byte[] timestamp, msg.timestampType(), headers); + + InternalSinkRecord internalSinkRecord = origRecord.newRecord(origRecord.topic(), + origRecord.kafkaPartition(), origRecord.keySchema(), origRecord.key(), + origRecord.valueSchema(), origRecord.value(), origRecord.kafkaOffset(), + origRecord.timestamp(), origRecord.timestampType(), origRecord.headers(), msg); + Review comment: Let's not create the `InternalSinkRecord` until *after* the transformation chain has been applied. That way we're not affected by any SMT that creates a new `SinkRecord` via a constructor (where we'd lose our `InternalSinkRecord`) rather than `newRecord(...)` (where we'd keep the `InternalSinkRecord`). ```suggestion ``` ########## File path: connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java ########## @@ -100,4 +110,29 @@ public String toString() { ", timestampType=" + timestampType + "} " + super.toString(); } + + public class InternalSinkRecord extends SinkRecord { + + ConsumerRecord<byte[], byte[]> originalRecord; + + public InternalSinkRecord(String topic, int partition, Schema keySchema, Object key, + Schema valueSchema, Object value, long kafkaOffset, + Long timestamp, TimestampType timestampType, + Iterable<Header> headers, + ConsumerRecord<byte[], byte[]> originalRecord) { + super(topic, partition, keySchema, key, valueSchema, value, kafkaOffset, timestamp, + timestampType, headers); + this.originalRecord = originalRecord; + Review comment: First of all, let's avoid adding unnecessary blank lines. Second, when you move this to `runtime`, you won't need to use this constructor and instead could use a much more straightforward one: ``` public InternalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, SinkRecord record) { super(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.kafkaOffset(), record.timestamp(), record.timestampType(), record.headers()); this.originalRecord = Objects.requireNonNull(originalRecord); } ``` ########## File path: connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java ########## @@ -95,4 +95,30 @@ */ void requestCommit(); + /** + * Get the reporter to which the sink task can report problematic or failed {@link SinkRecord records} + * passed to the {@link SinkTask#put(Collection)} method. When reporting a failed record, Review comment: This is invalid: ```suggestion * passed to the {@link SinkTask#put(java.util.Collection)} method. When reporting a failed record, ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.errors; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.header.Header; +import org.apache.kafka.connect.sink.ErrantRecordReporter; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkRecord.InternalSinkRecord; + +import org.apache.kafka.connect.sink.SinkTask; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.HeaderConverter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; + +public class WorkerErrantRecordReporter implements ErrantRecordReporter { + + private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class); + + private final Callback callback = (metadata, exception) -> { + if (exception != null) { + throw new ConnectException("Failed to send the errant record to Kafka", + exception.getCause()); + } + }; + + private RetryWithToleranceOperator retryWithToleranceOperator; + private Converter keyConverter; + private Converter valueConverter; + private HeaderConverter headerConverter; + + // Visible for testing + public LinkedList<Future<Void>> futures; + + public WorkerErrantRecordReporter( + RetryWithToleranceOperator retryWithToleranceOperator, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter + ) { + this.retryWithToleranceOperator = retryWithToleranceOperator; + this.keyConverter = keyConverter; + this.valueConverter = valueConverter; + this.headerConverter = headerConverter; + this.futures = new LinkedList<>(); + } + + @Override + public Future<Void> report(SinkRecord record, Throwable error) { + Function<SinkRecord, ConsumerRecord<byte[], byte[]>> function; Review comment: Why use a function here? We can use a simple variable here. (I suggested a function offline to avoid having to pass in the converters. But passing in the converters into this class encapsulates this logic nicely.) ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java ########## @@ -119,38 +117,46 @@ public static DeadLetterQueueReporter createAndSetup(Map<String, Object> adminPr * @param context processing context containing the raw record at {@link ProcessingContext#consumerRecord()}. */ public void report(ProcessingContext context) { - final String dlqTopicName = connConfig.dlqTopicName(); + Callback callback = (metadata, exception) -> { + if (exception != null) { + log.error("Could not produce message to dead letter queue. 
topic=" + dlqTopicName, exception); + errorHandlingMetrics.recordDeadLetterQueueProduceFailed(); + } + }; + report(context, callback); + } + + /** + * Write the raw records into a Kafka topic. This methods allows for a custom callback to be + * passed to the producer. + * + * @param context processing context containing the raw record at {@link ProcessingContext#consumerRecord()}. + * @param callback callback to be invoked by the producer when the record is sent to Kafka. + * @return Review comment: Missing JavaDoc details: ```suggestion * @return the future associated with the writing of this record; never null ``` ########## File path: connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java ########## @@ -100,4 +110,29 @@ public String toString() { ", timestampType=" + timestampType + "} " + super.toString(); } + + public class InternalSinkRecord extends SinkRecord { + + ConsumerRecord<byte[], byte[]> originalRecord; + + public InternalSinkRecord(String topic, int partition, Schema keySchema, Object key, + Schema valueSchema, Object value, long kafkaOffset, + Long timestamp, TimestampType timestampType, + Iterable<Header> headers, + ConsumerRecord<byte[], byte[]> originalRecord) { + super(topic, partition, keySchema, key, valueSchema, value, kafkaOffset, timestamp, + timestampType, headers); + this.originalRecord = originalRecord; + + } + + /** + * + * @return the original consumer record that was converted to this sink record. + */ Review comment: If we're going to add JavaDoc, which I think is helpful, then make it complete by adding a description: ```suggestion /** * Return the original consumer record that this sink record represents. * * @return the original consumer record; never null */ ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ########## @@ -552,14 +553,18 @@ private WorkerTask buildWorkerTask(ClusterConfigState configState, TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations(), retryWithToleranceOperator); log.info("Initializing: {}", transformationChain); SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings()); - retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass)); + retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, + errorHandlingMetrics, connectorClass)); Review comment: I don't think this line was actually changed other than formatting. Please remove it to avoid changing lines we don't have to. ########## File path: connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java ########## @@ -95,4 +95,30 @@ */ void requestCommit(); + /** + * Get the reporter to which the sink task can report problematic or failed {@link SinkRecord records} + * passed to the {@link SinkTask#put(Collection)} method. 
When reporting a failed record, + * the sink task will receive a {@link Future} that the task can optionally use to wait until Review comment: This is invalid in JavaDoc: ```suggestion * the sink task will receive a {@link java.util.concurrent.Future} that the task can optionally use to wait until ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java ########## @@ -497,12 +506,18 @@ private SinkRecord convertAndTransformRecord(final ConsumerRecord<byte[], byte[] timestamp, msg.timestampType(), headers); + + InternalSinkRecord internalSinkRecord = origRecord.newRecord(origRecord.topic(), + origRecord.kafkaPartition(), origRecord.keySchema(), origRecord.key(), + origRecord.valueSchema(), origRecord.value(), origRecord.kafkaOffset(), + origRecord.timestamp(), origRecord.timestampType(), origRecord.headers(), msg); + log.trace("{} Applying transformations to record in topic '{}' partition {} at offset {} and timestamp {} with key {} and value {}", this, msg.topic(), msg.partition(), msg.offset(), timestamp, keyAndSchema.value(), valueAndSchema.value()); if (isTopicTrackingEnabled) { - recordActiveTopic(origRecord.topic()); + recordActiveTopic(internalSinkRecord.topic()); Review comment: This line would not need to be affected. ```suggestion recordActiveTopic(sinkRecord.topic()); ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java ########## @@ -67,30 +72,22 @@ private final SinkConnectorConfig connConfig; private final ConnectorTaskId connectorTaskId; private final ErrorHandlingMetrics errorHandlingMetrics; + private final String dlqTopicName; private KafkaProducer<byte[], byte[]> kafkaProducer; public static DeadLetterQueueReporter createAndSetup(Map<String, Object> adminProps, ConnectorTaskId id, SinkConnectorConfig sinkConfig, Map<String, Object> producerProps, ErrorHandlingMetrics errorHandlingMetrics) { - String topic = sinkConfig.dlqTopicName(); - try (Admin admin = Admin.create(adminProps)) { - if (!admin.listTopics().names().get().contains(topic)) { - log.error("Topic {} doesn't exist. Will attempt to create topic.", topic); - NewTopic schemaTopicRequest = new NewTopic(topic, DLQ_NUM_DESIRED_PARTITIONS, sinkConfig.dlqTopicReplicationFactor()); - admin.createTopics(singleton(schemaTopicRequest)).all().get(); - } - } catch (InterruptedException e) { - throw new ConnectException("Could not initialize dead letter queue with topic=" + topic, e); - } catch (ExecutionException e) { - if (!(e.getCause() instanceof TopicExistsException)) { - throw new ConnectException("Could not initialize dead letter queue with topic=" + topic, e); - } - } - KafkaProducer<byte[], byte[]> dlqProducer = new KafkaProducer<>(producerProps); + KafkaProducer<byte[], byte[]> dlqProducer = setUpTopicAndProducer( + adminProps, + producerProps, + sinkConfig, + DLQ_NUM_DESIRED_PARTITIONS + ); Review comment: We don't need to make this change, do we? Let's try to minimize the changes to the existing code. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java ########## @@ -83,6 +87,17 @@ public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMi this.time = time; } + public Future<Void> executeFailed(Function<SinkRecord, ConsumerRecord<byte[], byte[]>> function, + Stage stage, Class<?> executingClass, SinkRecord record, + Throwable error, Callback callback) { + Review comment: No new line is needed here. 
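Stepping back to the `SinkTaskContext` JavaDoc discussed above, here is a hedged sketch of how a sink task might obtain the reporter and use the returned `Future`; the `errantRecordReporter()` accessor name and the `process(...)` helper are assumptions made only for illustration:
```
import java.util.Collection;
import java.util.concurrent.Future;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public abstract class ExampleSinkTask extends SinkTask {

    @Override
    public void put(Collection<SinkRecord> records) {
        // Accessor name is an assumption; the reporter may also be null on older runtimes
        ErrantRecordReporter reporter = context.errantRecordReporter();
        for (SinkRecord record : records) {
            try {
                process(record); // connector-specific write, assumed for this sketch
            } catch (Exception e) {
                if (reporter == null) {
                    throw new ConnectException("Failed to process record", e);
                }
                // Asynchronous by default; call get() on the future to block until
                // the errant record has been written to the DLQ
                Future<Void> reported = reporter.report(record, e);
            }
        }
    }

    protected abstract void process(SinkRecord record);
}
```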
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java ########## @@ -119,38 +117,46 @@ public static DeadLetterQueueReporter createAndSetup(Map<String, Object> adminPr * @param context processing context containing the raw record at {@link ProcessingContext#consumerRecord()}. */ public void report(ProcessingContext context) { - final String dlqTopicName = connConfig.dlqTopicName(); + Callback callback = (metadata, exception) -> { + if (exception != null) { + log.error("Could not produce message to dead letter queue. topic=" + dlqTopicName, exception); + errorHandlingMetrics.recordDeadLetterQueueProduceFailed(); + } + }; + report(context, callback); + } + + /** + * Write the raw records into a Kafka topic. This methods allows for a custom callback to be + * passed to the producer. + * + * @param context processing context containing the raw record at {@link ProcessingContext#consumerRecord()}. + * @param callback callback to be invoked by the producer when the record is sent to Kafka. + * @return + */ + public Future<RecordMetadata> report(ProcessingContext context, Callback callback) { if (dlqTopicName.isEmpty()) { - return; + return CompletableFuture.completedFuture(null); } - errorHandlingMetrics.recordDeadLetterQueueProduceRequest(); ConsumerRecord<byte[], byte[]> originalMessage = context.consumerRecord(); if (originalMessage == null) { errorHandlingMetrics.recordDeadLetterQueueProduceFailed(); - return; + return CompletableFuture.completedFuture(null); } - ProducerRecord<byte[], byte[]> producerRecord; - if (originalMessage.timestamp() == RecordBatch.NO_TIMESTAMP) { - producerRecord = new ProducerRecord<>(dlqTopicName, null, - originalMessage.key(), originalMessage.value(), originalMessage.headers()); - } else { - producerRecord = new ProducerRecord<>(dlqTopicName, null, originalMessage.timestamp(), - originalMessage.key(), originalMessage.value(), originalMessage.headers()); - } + ProducerRecord<byte[], byte[]> producerRecord = + new ProducerRecord<>(dlqTopicName, null, + originalMessage.timestamp() != RecordBatch.NO_TIMESTAMP ? + originalMessage.timestamp() : null, + originalMessage.key(), originalMessage.value(), originalMessage.headers()); Review comment: It'd be better to not change these lines, because we don't intend to change the logic -- yet doing so adds risk and increases the size of this PR. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java ########## @@ -119,38 +117,46 @@ public static DeadLetterQueueReporter createAndSetup(Map<String, Object> adminPr * @param context processing context containing the raw record at {@link ProcessingContext#consumerRecord()}. */ public void report(ProcessingContext context) { - final String dlqTopicName = connConfig.dlqTopicName(); + Callback callback = (metadata, exception) -> { + if (exception != null) { + log.error("Could not produce message to dead letter queue. topic=" + dlqTopicName, exception); + errorHandlingMetrics.recordDeadLetterQueueProduceFailed(); + } + }; + report(context, callback); + } + + /** + * Write the raw records into a Kafka topic. This methods allows for a custom callback to be + * passed to the producer. + * + * @param context processing context containing the raw record at {@link ProcessingContext#consumerRecord()}. + * @param callback callback to be invoked by the producer when the record is sent to Kafka. 
+ * @return Review comment: Why do we need to overload this method to pass in a callback? The only place we're using this new method is via the reporter, and `WorkerErrantRecordReporter.callback` doesn't seem to provide any value and in fact is not able to call `errorHandlingMetrics.recordDeadLetterQueueProduceFailed()` like this class. Wouldn't it be much simpler to just add a return type to the existing method? ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorReporter.java ########## @@ -28,6 +34,18 @@ */ void report(ProcessingContext context); + /** + * Report an error with a specified callback. + * + * @param context the processing context (cannot be null). + * @param callback callback to be invoked by a producer when sending a record to Kafka. + * @return future result from the producer sending a record to Kafka + */ + default Future<RecordMetadata> report(ProcessingContext context, Callback callback) { + report(context); + return CompletableFuture.completedFuture(null); + } + Review comment: I don't think we need to overload this method, and instead we can just change the return type. After all, the `ErrorReporter` is not part of the public API, and is merely an abstraction we use within the runtime itself. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.runtime.errors; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.header.Header; +import org.apache.kafka.connect.sink.ErrantRecordReporter; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkRecord.InternalSinkRecord; + +import org.apache.kafka.connect.sink.SinkTask; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.HeaderConverter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; + +public class WorkerErrantRecordReporter implements ErrantRecordReporter { + + private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class); + + private final Callback callback = (metadata, exception) -> { + if (exception != null) { + throw new ConnectException("Failed to send the errant record to Kafka", + exception.getCause()); + } + }; + + private RetryWithToleranceOperator retryWithToleranceOperator; + private Converter keyConverter; + private Converter valueConverter; + private HeaderConverter headerConverter; + + // Visible for testing + public LinkedList<Future<Void>> futures; + + public WorkerErrantRecordReporter( + RetryWithToleranceOperator retryWithToleranceOperator, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter + ) { + this.retryWithToleranceOperator = retryWithToleranceOperator; + this.keyConverter = keyConverter; + this.valueConverter = valueConverter; + this.headerConverter = headerConverter; + this.futures = new LinkedList<>(); + } + + @Override + public Future<Void> report(SinkRecord record, Throwable error) { + Function<SinkRecord, ConsumerRecord<byte[], byte[]>> function; + + if (record instanceof InternalSinkRecord) { + function = sinkRecord -> ((InternalSinkRecord) sinkRecord).originalRecord(); + } else { + function = sinkRecord -> { + + String topic = record.topic(); + byte[] key = keyConverter.fromConnectData(topic, record.keySchema(), record.key()); + byte[] value = valueConverter.fromConnectData(topic, record.valueSchema(), + record.value()); + + RecordHeaders headers = new RecordHeaders(); + if (record.headers() != null) { + for (Header header : record.headers()) { + String headerKey = header.key(); + byte[] rawHeader = headerConverter.fromConnectHeader(topic, headerKey, + header.schema(), header.value()); + headers.add(headerKey, rawHeader); + } + } + + return new ConsumerRecord<>(record.topic(), record.kafkaPartition(), + record.kafkaOffset(), record.timestamp(), record.timestampType(), -1L, -1, + -1, key, value, headers); + + }; + } + + Future<Void> future = retryWithToleranceOperator.executeFailed(function, Stage.TASK_PUT, + SinkTask.class, record, error, callback); + + futures.add(future); Review comment: How about adding it only if the future is not done already? 
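For example, the guard might look like this (a small sketch of the two affected lines):
```
// Only track futures that may still need to be awaited in getAllFutures()
if (!future.isDone()) {
    futures.add(future);
}
return future;
```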
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.errors; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.header.Header; +import org.apache.kafka.connect.sink.ErrantRecordReporter; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkRecord.InternalSinkRecord; + +import org.apache.kafka.connect.sink.SinkTask; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.HeaderConverter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; + +public class WorkerErrantRecordReporter implements ErrantRecordReporter { + + private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class); + + private final Callback callback = (metadata, exception) -> { + if (exception != null) { + throw new ConnectException("Failed to send the errant record to Kafka", + exception.getCause()); + } + }; + + private RetryWithToleranceOperator retryWithToleranceOperator; + private Converter keyConverter; + private Converter valueConverter; + private HeaderConverter headerConverter; + + // Visible for testing + public LinkedList<Future<Void>> futures; + + public WorkerErrantRecordReporter( + RetryWithToleranceOperator retryWithToleranceOperator, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter + ) { + this.retryWithToleranceOperator = retryWithToleranceOperator; + this.keyConverter = keyConverter; + this.valueConverter = valueConverter; + this.headerConverter = headerConverter; + this.futures = new LinkedList<>(); + } + + @Override + public Future<Void> report(SinkRecord record, Throwable error) { + Function<SinkRecord, ConsumerRecord<byte[], byte[]>> function; + + if (record instanceof InternalSinkRecord) { + function = sinkRecord -> ((InternalSinkRecord) sinkRecord).originalRecord(); + } else { + function = sinkRecord -> { + + String topic = record.topic(); + byte[] key = keyConverter.fromConnectData(topic, record.keySchema(), record.key()); + byte[] value = valueConverter.fromConnectData(topic, 
record.valueSchema(), + record.value()); + + RecordHeaders headers = new RecordHeaders(); + if (record.headers() != null) { + for (Header header : record.headers()) { + String headerKey = header.key(); + byte[] rawHeader = headerConverter.fromConnectHeader(topic, headerKey, + header.schema(), header.value()); + headers.add(headerKey, rawHeader); + } + } + + return new ConsumerRecord<>(record.topic(), record.kafkaPartition(), + record.kafkaOffset(), record.timestamp(), record.timestampType(), -1L, -1, + -1, key, value, headers); + + }; + } + + Future<Void> future = retryWithToleranceOperator.executeFailed(function, Stage.TASK_PUT, + SinkTask.class, record, error, callback); + + futures.add(future); + return future; + } + + /** + * Gets all futures returned by the sink records sent to Kafka by the errant + * record reporter. This function is intended to be used to block on all the errant record + * futures. + */ + public void getAllFutures() { + for (Future<Void> future : futures) { + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + log.error("Encountered an error while calling "); + throw new ConnectException(e); + } + } + futures.clear(); + } + + /** + * Wrapper class to aggregate producer futures and abstract away the record metadata from the + * Connect user. + */ + public static class ErrantRecordFuture implements Future<Void> { + + private final List<Future<RecordMetadata>> futures; + + + public ErrantRecordFuture(List<Future<RecordMetadata>> producerFutures) { + futures = producerFutures; + } Review comment: Rather than have a list of futures, why not have a single `Future` delegate that is either a `CompletableFuture.allOf(...)` or a single future? This makes the constructor a little more complex, but it would simplify all of the other methods tremendously since they merely have to delegate (except for `cancel()` and `isCancelled()`, which can stay the same): ```suggestion public ErrantRecordFuture(List<Future<RecordMetadata>> producerFutures) { if (producerFutures == null || producerFutures.isEmpty()) { future = CompletableFuture.completedFuture(null); } else { future = CompletableFuture.allOf(producerFutures); } } ``` This will make `get(long, TimeUnit)` behave more correctly by requiring that all futures complete within the stated time. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org