C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r737632973



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets. Records are tracked in the order in which they are submitted, which should match the order they were
+ * returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via
+ * {@link #committableOffsets()}, where every record up to and including the record for each returned offset has been
+ * either {@link SubmittedRecord#ack() acknowledged} or {@link #removeLastOccurrence(SubmittedRecord) removed}.
+ * Note that this class is not thread-safe, though a {@link SubmittedRecord} can be
+ * {@link SubmittedRecord#ack() acknowledged} from a different thread.
+ */
+class SubmittedRecords {
+
+    private static final Logger log = LoggerFactory.getLogger(SubmittedRecords.class);
+
+    // Visible for testing
+    final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
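
(For context on the API under review: a rough usage sketch based only on the Javadoc above. The `submit(SourceRecord)` method, the shape of the `SubmittedRecord` handle, and the return type of `committableOffsets()` are assumptions made for illustration, not signatures taken from this PR.)

```java
import java.util.Map;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.connect.source.SourceRecord;

class SubmittedRecordsUsageSketch {
    private final SubmittedRecords submittedRecords = new SubmittedRecords();

    // Called on the task thread for each record returned from SourceTask::poll,
    // in the order the records were returned.
    void dispatch(KafkaProducer<byte[], byte[]> producer,
                  SourceRecord record,
                  ProducerRecord<byte[], byte[]> converted) {
        SubmittedRecord submitted = submittedRecords.submit(record); // assumed method
        producer.send(converted, (metadata, exception) -> {
            if (exception == null) {
                // Per the Javadoc, acknowledgment is safe from the producer's
                // callback thread even though the class itself is not thread-safe.
                submitted.ack();
            }
        });
    }

    // Called periodically on the task thread; every record up to and including
    // the one supplying each returned offset has been acked or removed.
    Map<Map<String, Object>, Map<String, Object>> offsetsToCommit() {
        return submittedRecords.committableOffsets(); // assumed return type
    }
}
```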

Review comment:
       Thank you both for your thoughts.
   
   I think there are a few things at play here that are worth discussing:
   
   1. Whether this change should be backported.
   2. Whether preserving existing behavior should be prioritized.
   3. What the potential benefits of the current multi-deque approach are.
   4. What the conditions for re-evaluating a multi-deque approach are.
   
   ### Backporting
   Even before the single- vs. multi-deque comments were made, I wasn't sure that this change would be suitable for a backport. It's fairly aggressive, comes with moderate risk, and blurs the line between a bug fix and an improvement. Is there a case where this change would "fix" an otherwise irrevocably broken connector? If a task's producer is overwhelmed by the throughput of messages returned from the task's `poll` method, for example, this change will not fix the underlying issue; it will only reduce the number of duplicates the task produces if/when it is restarted after being reconfigured to better handle the throughput of data it is processing.
   
   ### Existing behavior
   The Connect API and its documentation make no guarantees about blocking offset commits for source partitions. They make no guarantee about the order in which [`SourceTask::commitRecord`](https://kafka.apache.org/30/javadoc/org/apache/kafka/connect/source/SourceTask.html#commitRecord(org.apache.kafka.connect.source.SourceRecord,org.apache.kafka.clients.producer.RecordMetadata)) is invoked (and, in fact, the current behavior for that method more closely mirrors the equivalent source offset commit behavior provided by a multi-deque approach: commits of source records targeting unrelated partitions do not block each other).
   
   Additionally, I cannot think of a development style, external system, or other kind of use case that would rely on the order of source offset commits (across different source partitions) matching the order of the source partition/offset pairs as they were returned by the task from `SourceTask::poll`.
   
   Essentially, this behavior change violates no contract of the Connect API and, at the moment, there is no known or even plausible example of a connector that would be affected by it. If one can be provided (even for the most abstract kind of external system or developer mindset), then I agree it's worth preserving existing behavior; at the moment, though, the risk here seems to be overstated.
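
   To make those ordering semantics concrete, here is a toy model of the single-deque behavior under discussion (with simple `String`/`long` types standing in for the real source partition and offset maps; this is an illustration, not code from this PR). With one global deque preserving `poll` order, an unacked record for one source partition holds back committable offsets for every partition queued behind it:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

class SingleDequeModel {
    static final class Tracked {
        final String sourcePartition;
        final long sourceOffset;
        volatile boolean acked; // may be flipped from a producer callback thread
        Tracked(String partition, long offset) {
            sourcePartition = partition;
            sourceOffset = offset;
        }
    }

    // One global deque for all source partitions, in poll() order.
    final Deque<Tracked> records = new ArrayDeque<>();

    Map<String, Long> committableOffsets() {
        Map<String, Long> committable = new LinkedHashMap<>();
        // Only a fully-acknowledged prefix of the single deque is committable.
        while (!records.isEmpty() && records.peekFirst().acked) {
            Tracked tracked = records.pollFirst();
            committable.put(tracked.sourcePartition, tracked.sourceOffset);
        }
        return committable;
    }

    public static void main(String[] args) {
        SingleDequeModel model = new SingleDequeModel();
        model.records.add(new Tracked("A", 1)); // write to an unavailable topic partition
        Tracked b1 = new Tracked("B", 1);
        model.records.add(b1);
        b1.acked = true; // B's write succeeded; A's is still being retried
        // Prints {}: B's offset cannot be committed behind A's unacked record.
        System.out.println(model.committableOffsets());
    }
}
```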
   
   ### Benefits of multiple deques
   If a subset of the topic partitions that a task is writing to becomes unavailable for a period of time, then writes to those topic partitions will start failing (and being retried indefinitely, if the [default Connect producer properties](https://github.com/apache/kafka/blob/195ebff25b637bf09ce3283e204cfb1faffe34fc/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L649-L654) are used). Writes to the other topic partitions will still succeed, and (assuming the task does not perform some fairly sophisticated filtering of what it polls from the source system based on invocations of `SourceTask::commitRecord`) the task will continue producing data destined for all topic partitions with no knowledge that writes to the affected subset are not yet succeeding.
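
   The multi-deque counterpart of the same toy model, mirroring the `Map<Map<String, Object>, Deque<SubmittedRecord>> records` field in this diff (again with simplified types, not this PR's code), shows why an unacked record only blocks offsets for its own source partition:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

class MultiDequeModel {
    static final class Tracked {
        final long sourceOffset;
        volatile boolean acked; // may be flipped from a producer callback thread
        Tracked(long offset) {
            sourceOffset = offset;
        }
    }

    // One deque per source partition instead of one global deque.
    final Map<String, Deque<Tracked>> records = new HashMap<>();

    Tracked submit(String sourcePartition, long sourceOffset) {
        Tracked tracked = new Tracked(sourceOffset);
        records.computeIfAbsent(sourcePartition, p -> new ArrayDeque<>()).add(tracked);
        return tracked;
    }

    Map<String, Long> committableOffsets() {
        Map<String, Long> committable = new LinkedHashMap<>();
        records.forEach((partition, deque) -> {
            // Pop the fully-acknowledged prefix of this partition's deque only.
            Long latest = null;
            while (!deque.isEmpty() && deque.peekFirst().acked) {
                latest = deque.pollFirst().sourceOffset;
            }
            if (latest != null) {
                committable.put(partition, latest);
            }
        });
        return committable;
    }

    public static void main(String[] args) {
        MultiDequeModel model = new MultiDequeModel();
        model.submit("A", 1); // write to an unavailable topic partition, never acked
        Tracked b1 = model.submit("B", 1);
        b1.acked = true; // B's write succeeded while A's is retried indefinitely
        // Prints {B=1}: B's offset commits even though A's record is stuck.
        System.out.println(model.committableOffsets());
    }
}
```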
   
   If the task is gracefully restarted (i.e., has time to shut down, release resources, and perform an end-of-life offset commit) during this period, the difference between the single-deque and multi-deque approaches can be enormous, depending on how long the topic partitions were unavailable and on whether there is a correlation between the source partitions of the records the task produces and the physical Kafka topic partitions those records are sent to. One known case where there is a direct correlation is MirrorMaker 2.
   
   A slightly weaker, but still valid, case is that the same potential difference arises if the task's producer has issues producing to a subset of topic partitions. In that situation it's of course best to reconfigure the task (or its producer), or perhaps even open a PR to fix a bug or improve the producer logic; but in the meantime, we should aim to make Kafka Connect as resilient as possible in the face of these kinds of degradation and failure scenarios.
   
   ### Conditions for re-evaluation
   There are already known cases where the multi-deque approach would be beneficial. Given that, the only serious reason to refrain from implementing it is to preserve behavior for connectors that, I believe, likely do not exist. It's impossible to prove that particular negative, however, so it's worth asking: what would have to change between now and some point in the future for the value placed on these hypothetical connectors to diminish?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

