C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r737632973
##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets. Records are tracked in the order in which they are submitted, which should match the order they were
+ * returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via
+ * {@link #committableOffsets()}, where every record up to and including the record for each returned offset has been
+ * either {@link SubmittedRecord#ack() acknowledged} or {@link #removeLastOccurrence(SubmittedRecord) removed}.
+ * Note that this class is not thread-safe, though a {@link SubmittedRecord} can be
+ * {@link SubmittedRecord#ack() acknowledged} from a different thread.
+ */
+class SubmittedRecords {
+
+    private static final Logger log = LoggerFactory.getLogger(SubmittedRecords.class);
+
+    // Visible for testing
+    final Map<Map<String, Object>, Deque<SubmittedRecord>> records;

Review comment:
Thank you both for your thoughts. I think there are a few things at play here that are worth discussing:

1. Whether this change should be backported.
2. Whether preserving existing behavior should be prioritized.
3. What the potential benefits of the current multi-deque approach are.
4. What the conditions for re-evaluating a multi-deque approach are.

### Backporting

Even before the single- vs. multi-deque comments were made, I wasn't sure that this change would be suitable for a backport. It's fairly aggressive, comes with moderate risk, and blurs the line between a bug fix and an improvement. Is there a case where this change would "fix" an otherwise irrevocably broken connector? If a task's producer is overwhelmed by the throughput of messages provided by the task's `poll` method, for example, this change will not fix the underlying issue; it will only reduce the number of duplicates produced by the task if/when it is restarted after being reconfigured to better handle the throughput of data it is processing.

### Existing behavior

The Connect API and documentation make no guarantee about blocking offset commits for source partitions.
They make no guarantee about the order in which [`SourceTask::commitRecord`](https://kafka.apache.org/30/javadoc/org/apache/kafka/connect/source/SourceTask.html#commitRecord(org.apache.kafka.connect.source.SourceRecord,org.apache.kafka.clients.producer.RecordMetadata)) is invoked (and, in fact, the current behavior for that method more closely mirrors the equivalent source offset commit behavior provided by a multi-deque approach: commits of source records targeting unrelated partitions do not block each other). Additionally, I cannot think of a development style, external system, or other kind of use case that would rely on the order of source offset commits (across different source partitions) matching the order of the source partition/offset pairs as they were provided by the task in `SourceTask::poll`.

Essentially, this behavior change violates no contract of the Connect API and, at the moment, there is no known or even plausible case of a connector that would be affected by it. If one can be provided (even using the most abstract kind of external system or developer mindset), then I do agree that it's worth preserving existing behavior, but at the moment the risk here seems to be overstated.

One other consideration: the existing metrics on offset commits, which currently serve as an indicator of a task's producer's ability to keep up with the throughput of records produced by the task, will no longer be useful on that front. Today, if a batch cannot be flushed in time for an offset commit, the offset commit is marked failed, and alerts on those failed commits can indicate that the task is unhealthy despite not being in the `FAILED` status. This is a much more realistic scenario (one that I have personally witnessed in production); should we respect this use case as well and simply abandon the current effort to optimize offset commits altogether, or at least choose not to backport it for this reason?
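To make the per-partition commit semantics concrete, here is a minimal, hypothetical sketch (not the actual PR code; the class and method names are invented for illustration) of how a multi-deque tracker can compute committable offsets independently per source partition:

```java
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

// Hypothetical simplified tracker: one deque of in-flight records per source partition.
class PerPartitionTracker {
    static class Submitted {
        final Map<String, Object> partition;
        final Map<String, Object> offset;
        volatile boolean acked; // may be flipped from the producer callback thread

        Submitted(Map<String, Object> partition, Map<String, Object> offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    private final Map<Map<String, Object>, Deque<Submitted>> records = new HashMap<>();

    Submitted submit(Map<String, Object> partition, Map<String, Object> offset) {
        Submitted record = new Submitted(partition, offset);
        records.computeIfAbsent(partition, p -> new LinkedList<>()).add(record);
        return record;
    }

    // For each source partition, drain acked records from the head of its deque;
    // the last one drained holds the latest committable offset for that partition.
    // An unacked record in one partition's deque does not block other partitions.
    Map<Map<String, Object>, Map<String, Object>> committableOffsets() {
        Map<Map<String, Object>, Map<String, Object>> result = new HashMap<>();
        records.forEach((partition, deque) -> {
            Submitted latest = null;
            while (!deque.isEmpty() && deque.peek().acked) {
                latest = deque.poll();
            }
            if (latest != null) {
                result.put(partition, latest.offset);
            }
        });
        return result;
    }
}
```

With this shape, if a record for source partition A is still in flight while a record for source partition B has already been acknowledged, `committableOffsets()` returns B's offset immediately rather than waiting on A, mirroring the per-partition independence of `commitRecord` described above.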
### Benefits of multiple deques

If a subset of the topic partitions that a task is writing to is unavailable for a period of time, then writes to those topic partitions will start failing (and being infinitely retried, if the [default Connect producer properties](https://github.com/apache/kafka/blob/195ebff25b637bf09ce3283e204cfb1faffe34fc/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L649-L654) are used). Writes to other topic partitions will still succeed, and (assuming the task does not perform some fairly sophisticated filtering of what it polls from the source system based on invocations of `SourceTask::commitRecord`) the task will continue producing data destined for all topic partitions with no knowledge that writes to the affected subset are not yet succeeding.

If the task is gracefully restarted (i.e., has time to shut down, release resources, and perform an end-of-life offset commit) during this period, the difference between the single-deque and multi-deque approaches can be potentially enormous, depending on how long the topic partitions were unavailable and whether there is a correlation between the source partitions for the records the task produces and the physical Kafka topic partitions that those records are sent to. One known case where there is a direct correlation is MirrorMaker 2.

A slightly weaker, but still valid, case: if the task's producer has trouble producing to a subset of topic partitions, the same potential difference arises. Of course, in that case it's best to reconfigure the task (or its producer), or perhaps even open a PR to fix a bug or make an improvement to the producer logic. But in the meantime, we should aim to make Kafka Connect as resilient as possible in the face of these kinds of degradation or failure scenarios.

### Conditions for re-evaluation

There are already known cases where the multi-deque approach would be beneficial.
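The head-of-line blocking that makes those cases costly can be illustrated with a contrasting, hypothetical single-deque tracker (invented names, not the actual PR code): one unacknowledged record, e.g. one destined for an unavailable topic partition, stalls offset commits for every source partition queued behind it.

```java
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

// Hypothetical single-deque tracker: all in-flight records share one deque,
// so commits for every source partition wait on the oldest unacked record.
class SingleDequeTracker {
    static class Submitted {
        final String sourcePartition;
        final long offset;
        volatile boolean acked;

        Submitted(String sourcePartition, long offset) {
            this.sourcePartition = sourcePartition;
            this.offset = offset;
        }
    }

    private final Deque<Submitted> records = new LinkedList<>();

    Submitted submit(String sourcePartition, long offset) {
        Submitted record = new Submitted(sourcePartition, offset);
        records.add(record);
        return record;
    }

    // Only records ahead of the first unacked record are committable;
    // a single stuck record blocks offset commits for all source partitions.
    Map<String, Long> committableOffsets() {
        Map<String, Long> result = new HashMap<>();
        while (!records.isEmpty() && records.peek().acked) {
            Submitted record = records.poll();
            result.put(record.sourcePartition, record.offset);
        }
        return result;
    }
}
```

If the record at the head of the deque is never acknowledged (say, its write is being infinitely retried against an unavailable topic partition), every later acknowledged record commits nothing; after a graceful restart, all of those records would be re-polled and re-produced as duplicates, which is exactly the gap the multi-deque approach closes.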
Given this, the only serious reason to refrain from implementing it is to preserve behavior for connectors that, I believe, likely do not exist. However, it's impossible to prove that particular negative, so it's worth considering: what would have to change between now and some point in the future for the value that's being placed on these hypothetical connectors to diminish?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org