rhauch commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r737931069
##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########

```diff
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets. Records are tracked in the order in which they are submitted, which should match the order they were
+ * returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via
+ * {@link #committableOffsets()}, where every record up to and including the record for each returned offset has been
+ * either {@link SubmittedRecord#ack() acknowledged} or {@link #removeLastOccurrence(SubmittedRecord) removed}.
+ * Note that this class is not thread-safe, though a {@link SubmittedRecord} can be
+ * {@link SubmittedRecord#ack() acknowledged} from a different thread.
+ */
+class SubmittedRecords {
+
+    private static final Logger log = LoggerFactory.getLogger(SubmittedRecords.class);
+
+    // Visible for testing
+    final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
```

Review comment:
@C0urante, you mention above a case where using multiple deques is clearly superior:

> If a subset of the topic partitions that a task is writing to is unavailable for a period of time, then writes to those topic partitions will start failing (and being infinitely retried, if the [default Connect producer properties](https://github.com/apache/kafka/blob/195ebff25b637bf09ce3283e204cfb1faffe34fc/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L649-L654) are used). Writes to other topic partitions will still succeed, and (assuming the task does not perform some fairly sophisticated filtering of how it polls from the source system based on invocations of `SourceTask::commitRecord`) the task will continue producing data destined for all topic partitions with no knowledge that writes to that affected subset are not yet succeeding.

This is a fair point. I think the essential factor here is the ratio of records destined for the fast vs. slow topic partitions. Even if that ratio is moderately high, a high-throughput connector will likely still generate enough records destined for the slow topic partitions that the producer buffer (even if set high) eventually fills up, throttling the whole connector task.
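To make that single-deque behavior concrete, here is a minimal sketch of how offsets become committable with one global deque. This is a hypothetical simplification (the `SingleDequeTracker` and `Record` names and shapes are mine, not code from this PR or its alternatives):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical simplification: one global deque shared by all source partitions.
class SingleDequeTracker {

    static class Record {
        final Map<String, Object> sourcePartition;
        final Map<String, Object> sourceOffset;
        volatile boolean acked; // may be flipped from the producer callback thread

        Record(Map<String, Object> sourcePartition, Map<String, Object> sourceOffset) {
            this.sourcePartition = sourcePartition;
            this.sourceOffset = sourceOffset;
        }

        void ack() {
            acked = true;
        }
    }

    private final Deque<Record> records = new ArrayDeque<>();

    // Records are appended in the order they were returned from poll().
    void submit(Record record) {
        records.add(record);
    }

    // Offsets are committable only up to the first unacked record. A single
    // unacked record destined for a slow topic partition therefore blocks
    // acked records behind it for every other source partition.
    Map<Map<String, Object>, Map<String, Object>> committableOffsets() {
        Map<Map<String, Object>, Map<String, Object>> result = new HashMap<>();
        while (!records.isEmpty() && records.peek().acked) {
            Record record = records.poll();
            // Later entries overwrite earlier ones, keeping the latest offset.
            result.put(record.sourcePartition, record.sourceOffset);
        }
        return result;
    }
}
```

With this shape, acked records for fast topic partitions only become committable in bursts, whenever the unacked record currently at the head of the deque finally succeeds.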
Yes, there might be a lot of records, but (depending on the ratio) there is a limit. What happens when the ratio is skewed to a very high value, such that nearly all of the records generated by the connector are destined for fast topic partitions? In that case, there might be so few records destined for the slow topic partitions that the producer's buffer never fills up. This condition could last for days. With a single deque, the worker would still slowly make progress committing offsets, just in a bursty fashion, and that could leave hours and hours of offsets uncommitted for records destined for the fast topic partitions.

I think the mistake I made before was assuming the latter case was an edge case, when it seems like it might not be that unusual. For example, consider a database source connector capturing rows from two tables, A and B. The producer happens to write records to topic A (from table A) very quickly, but struggles to write records to topic B (from table B). The ratio mentioned above might simply be the number of rows (or changes in rows) in each table. One could easily imagine table A having many millions of new/updated rows per day, while table B has only a handful of new/updated rows per day.

The single-deque approach would solve the core problem in many, perhaps most, cases, but would fail to record progress via source offsets in some arguably common cases.

In my mind, this changes the balance point. The multiple-deque solution as proposed in this PR works regardless of the ratio, whereas the single-deque approach does not. And IMO, fixing the problem in most if not all cases outweighs the risk to a connector that might be affected by the change in the order in which source offsets (with different source partitions) are committed.
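For contrast, here is the same sketch reworked for the per-partition approach taken in this PR. It reuses the hypothetical `Record` class from the sketch above, and is simplified from the javadoc in the diff rather than copied from the actual implementation:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical simplification: one deque per source partition, mirroring the
// Map<Map<String, Object>, Deque<SubmittedRecord>> field shown in the diff above.
class PerPartitionTracker {

    private final Map<Map<String, Object>, Deque<SingleDequeTracker.Record>> records = new HashMap<>();

    // Records still arrive in poll() order, but are bucketed by source partition.
    void submit(SingleDequeTracker.Record record) {
        records.computeIfAbsent(record.sourcePartition, p -> new ArrayDeque<>()).add(record);
    }

    // Each partition's deque is drained independently, so an unacked record
    // for a slow topic partition never blocks offset commits for records
    // destined for other (fast) partitions.
    Map<Map<String, Object>, Map<String, Object>> committableOffsets() {
        Map<Map<String, Object>, Map<String, Object>> result = new HashMap<>();
        records.forEach((partition, deque) -> {
            SingleDequeTracker.Record latestAcked = null;
            while (!deque.isEmpty() && deque.peek().acked) {
                latestAcked = deque.poll();
            }
            if (latestAcked != null) {
                result.put(partition, latestAcked.sourceOffset);
            }
        });
        return result;
    }
}
```

In the two-table example above, this version keeps committing offsets for table A's source partition on every cycle, while table B's deque simply holds its few unacked records until their writes succeed.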