rhauch commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r737931069



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets. Records are tracked in the order in which they are submitted, which should match the order they were
+ * returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via
+ * {@link #committableOffsets()}, where every record up to and including the record for each returned offset has been
+ * either {@link SubmittedRecord#ack() acknowledged} or {@link #removeLastOccurrence(SubmittedRecord) removed}.
+ * Note that this class is not thread-safe, though a {@link SubmittedRecord} can be
+ * {@link SubmittedRecord#ack() acknowledged} from a different thread.
+ */
+class SubmittedRecords {
+
+    private static final Logger log = LoggerFactory.getLogger(SubmittedRecords.class);
+
+    // Visible for testing
+    final Map<Map<String, Object>, Deque<SubmittedRecord>> records;

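The `records` field in the snippet above maps each source partition to its own deque of in-flight records. A minimal, hypothetical sketch of how committable offsets can be computed from such a structure (the names `PartitionedTracker`, `Record`, `submit`, and `committableOffsets` are illustrative only, not the actual `SubmittedRecords` API):

```java
import java.util.*;

// Hypothetical, simplified sketch of the per-partition deque idea; not the
// actual SubmittedRecords implementation from this PR.
class PartitionedTracker {

    static class Record {
        final Map<String, Object> partition;
        final Map<String, Object> offset;
        volatile boolean acked;

        Record(Map<String, Object> partition, Map<String, Object> offset) {
            this.partition = partition;
            this.offset = offset;
        }

        // May be called from another thread (e.g., a producer callback),
        // hence the volatile flag.
        void ack() {
            this.acked = true;
        }
    }

    // One deque per source partition, holding records in submission order
    private final Map<Map<String, Object>, Deque<Record>> records = new HashMap<>();

    Record submit(Map<String, Object> partition, Map<String, Object> offset) {
        Record record = new Record(partition, offset);
        records.computeIfAbsent(partition, p -> new LinkedList<>()).add(record);
        return record;
    }

    // For each partition, drop acked records from the head of its deque and
    // report the offset of the last contiguously-acked record, if any.
    Map<Map<String, Object>, Map<String, Object>> committableOffsets() {
        Map<Map<String, Object>, Map<String, Object>> committable = new HashMap<>();
        records.forEach((partition, deque) -> {
            while (!deque.isEmpty() && deque.peekFirst().acked) {
                committable.put(partition, deque.pollFirst().offset);
            }
        });
        return committable;
    }
}
```

The key property, discussed below, is that an unacked record at the head of one partition's deque only blocks commits for that partition.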
Review comment:
       @C0urante, you mention above a case where using multiple deques is 
clearly superior:
   
   > If a subset of the topic partitions that a task is writing to is 
unavailable for a period of time, then writes to those topic partitions will 
start failing (and being infinitely retried, if the [default Connect producer 
properties](https://github.com/apache/kafka/blob/195ebff25b637bf09ce3283e204cfb1faffe34fc/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L649-L654)
 are used). Writes to other topic partition will still succeed, and (assuming 
the task does not perform some fairly-sophisticated filtering of how it polls 
from the source system based on invocations of `SourceTask::commitRecord`) the 
task will continue producing data destined for all topic partitions with no 
knowledge that writes to that affected subset are not yet succeeding.
   
   This is a fair point. I think the essential factor here is the ratio of records destined for the fast vs. slow topic partitions. If that ratio is even moderately high, a high-throughput connector will still likely produce enough records destined for the slow topic partitions that the producer buffer (even if set high) will eventually fill up, causing the whole connector task to be throttled. Yes, there might be a lot of records, but (depending upon the ratio) there is a limit.
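   For reference, the "default Connect producer properties" linked in the quote amount to settings along these lines (quoted from memory, so approximate; the linked Worker.java lines are authoritative):

```properties
# Approximate Connect worker defaults for source-task producers:
# retry indefinitely and block rather than drop records
acks=all
retries=2147483647                        # Integer.MAX_VALUE
max.block.ms=9223372036854775807          # Long.MAX_VALUE
request.timeout.ms=2147483647
max.in.flight.requests.per.connection=1
```

   With these settings a write to an unavailable topic partition never fails permanently; its records simply sit in the producer buffer.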
   
   What happens when this ratio is skewed to a very high value, such that nearly all of the records generated by the connector are destined for fast topic partitions? In that case, there might be so few records destined for the slow topic partitions that the producer's buffer might never fill up. This condition could last for days. With a single deque, the worker would still slowly make progress committing offsets, just in a bursty fashion. That could still leave hours and hours of offsets uncommitted for records destined for the fast topic partitions.
   
   I think the mistake I made before was assuming the latter case was more of 
an edge case, when it seems like it might not be that unusual. For example, 
consider a database source connector capturing rows from two tables, A and B. 
The producer happens to write records to topic A (from table A) very quickly, 
but struggles to write records to topic B (from table B). The ratio mentioned 
above might simply be the number of rows (or changes in rows) in each table. 
One could easily imagine table A having many millions of new/updated rows per 
day, while table B only has a handful of new/updated rows per day.
   
   The single deque approach would solve the core problem in many/most cases, 
but would fail to record progress via source offsets in some arguably common 
cases.
   
   In my mind, this changes the balance point. The multiple-deque solution as proposed in this PR would work regardless of the ratio, whereas the single-deque approach would not. And IMO, fixing the problem in most if not all cases outweighs the risk to a connector that might be affected by the change in the order in which source offsets (with different source partitions) are committed.
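   To make the trade-off concrete, here is a small, hypothetical simulation (illustrative names, not Connect code) of the scenario above: several acked records for a fast partition queued behind one unacked record for a slow partition.

```java
import java.util.*;

// Hypothetical comparison of commit progress with one global deque vs. one
// deque per source partition, for the skewed-ratio scenario discussed above.
class DequeComparison {
    record Rec(String partition, int offset, boolean acked) {}

    // Single global deque: commits stop at the first unacked record,
    // regardless of which partition it belongs to.
    static Map<String, Integer> singleDeque(List<Rec> submitted) {
        Map<String, Integer> committable = new HashMap<>();
        for (Rec r : submitted) {
            if (!r.acked()) break;   // the head of the deque blocks everything
            committable.put(r.partition(), r.offset());
        }
        return committable;
    }

    // Per-partition deques: each partition's commits advance independently.
    static Map<String, Integer> perPartitionDeques(List<Rec> submitted) {
        Map<String, Deque<Rec>> deques = new LinkedHashMap<>();
        for (Rec r : submitted) {
            deques.computeIfAbsent(r.partition(), p -> new LinkedList<>()).add(r);
        }
        Map<String, Integer> committable = new HashMap<>();
        deques.forEach((partition, deque) -> {
            while (!deque.isEmpty() && deque.peekFirst().acked()) {
                committable.put(partition, deque.pollFirst().offset());
            }
        });
        return committable;
    }
}
```

   With one unacked record for the slow partition submitted first, the single-deque version can commit nothing, while the per-partition version still commits every acked offset for the fast partition.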



