artemlivshits commented on code in PR #12049:
URL: https://github.com/apache/kafka/pull/12049#discussion_r866386035
##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1403,25 +1452,54 @@ public boolean isDone() {
}
/**
- * A callback called when producer request is complete. It in turn calls
user-supplied callback (if given) and
- * notifies producer interceptors about the request completion.
+ * Callbacks that are called by the RecordAccumulator append functions:
+ * - user callback
+ * - interceptor callbacks
+ * - partition callback
*/
- private static class InterceptorCallback<K, V> implements Callback {
+ private class AppendCallbacks<K, V> implements
RecordAccumulator.AppendCallbacks {
private final Callback userCallback;
private final ProducerInterceptors<K, V> interceptors;
- private final TopicPartition tp;
+ private final ProducerRecord<K, V> record;
+ protected int partition = RecordMetadata.UNKNOWN_PARTITION;
- private InterceptorCallback(Callback userCallback,
ProducerInterceptors<K, V> interceptors, TopicPartition tp) {
+ private AppendCallbacks(Callback userCallback, ProducerInterceptors<K,
V> interceptors, ProducerRecord<K, V> record) {
this.userCallback = userCallback;
this.interceptors = interceptors;
- this.tp = tp;
+ this.record = record;
}
+ @Override
public void onCompletion(RecordMetadata metadata, Exception exception)
{
- metadata = metadata != null ? metadata : new RecordMetadata(tp,
-1, -1, RecordBatch.NO_TIMESTAMP, -1, -1);
+ if (metadata == null) {
+ metadata = new RecordMetadata(topicPartition(), -1, -1,
RecordBatch.NO_TIMESTAMP, -1, -1);
+ }
this.interceptors.onAcknowledgement(metadata, exception);
if (this.userCallback != null)
this.userCallback.onCompletion(metadata, exception);
}
+
+ @Override
+ public void setPartition(int partition) {
+ assert partition != RecordMetadata.UNKNOWN_PARTITION;
+ this.partition = partition;
+
+ if (log.isTraceEnabled()) {
+ // Log the message here, because we don't know the partition
before that.
+ log.trace("Attempting to append record {} with callback {} to
topic {} partition {}", record, userCallback, record.topic(), partition);
+ }
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+
+ public TopicPartition topicPartition() {
+ if (record == null)
Review Comment:
I've added it because there is a test that passes a null and expects some
error handling. From that perspective, adding an assert would contradict the
expectation that there is defined and tested error handling (in my mind, an
assert means the behavior is undefined and all bets are off).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]