C0urante commented on pull request #10112: URL: https://github.com/apache/kafka/pull/10112#issuecomment-915350922
> I also think that the behavior with the suggested approach and your option 3 is still a lot better than the current situation.

Agreed 👍

> IIUC the `offset.flush.timeout.ms` would actually not be used anymore, as there actually are no timeouts as the offset commit thread doesn't block anymore.

That's mostly correct--we wouldn't be waiting on a blocking operation while iterating through the deque(s), although we might still choose to block on the actual write to the offsets topic in the [same way that we currently do](https://github.com/apache/kafka/blob/fb77da941ac2a34513cf2cd5d11137ba9b275575/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L565-L586), just for the sake of metrics and to allow users to monitor the health of the connection between the Connect worker and the offsets topic. Not a huge deal though, and the point that we wouldn't be blocking on the task's producer is still valid.

I think the issue is less that we'd end up timing out and more that we'd end up violating the guarantee the framework currently provides: each task gets at most `offset.flush.timeout.ms` milliseconds per offset commit attempt before the attempt is aborted and control is yielded to the next task. A deque-based approach could actually be worse than the current behavior in that regard if there's no check in place to ensure that iterating over the deque doesn't exceed the offset flush timeout. Probably worth the tradeoff, but we can likely satisfy both objectives with your suggestion:

> another option might be to incur the iteration on the worker source task thread.

I think this'd be great, especially with the snapshotting logic you mention, which should basically eliminate any blocking between the two threads except to prevent race conditions while simple operations like clearing a hash map or assigning a new value to an instance variable take place (rough sketch below).

One thing that gave me pause initially was the realization that we'd be double-iterating over every source record at this point: once to transform, convert, and dispatch the record to the producer, and then once to verify that it has been acknowledged while iterating over the deque it's in. But I can't imagine it'd make a serious difference in CPU utilization, given that transformation, conversion, and dispatching to a producer are likely to be at least an order of magnitude more expensive than checking a boolean flag and possibly inserting the record's offset into a hash map. And memory utilization should be very close to the existing approach, which already tracks every single unacknowledged record in the `outstandingMessages` and `outstandingMessagesBacklog` fields.

I think this buys us enough that my earlier-mentioned option 2 (multiple threads for offset commits) isn't called for, since the only blocking operation performed during offset commit at this point would be the write to the offsets topic. If the offsets topic is unavailable, the impact would likely be the same across all tasks (unless the task is using a separate offsets topic, which will become possible once the changes for KIP-618 are merged), and even if not, things wouldn't be made any worse than they already are: the offset flush timeout would expire, and the next task in line would get its chance to commit offsets.
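To make sure we're picturing the same thing, here's a very rough sketch of the per-task bookkeeping I have in mind. All names here are hypothetical and illustrative, not existing Kafka Connect classes: the task thread appends an entry to a deque for every record it dispatches to the producer, the producer callback flips an ack flag on that entry, and during offset commit the task thread pops acked entries off the head of the deque to snapshot the offsets that are safe to write to the offsets topic.

```java
// Illustrative sketch only -- class and method names are hypothetical,
// not part of the existing WorkerSourceTask code.
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

class SubmittedRecords {

    static class SubmittedRecord {
        final Map<String, Object> partition;
        final Map<String, Object> offset;
        volatile boolean acked = false;

        SubmittedRecord(Map<String, Object> partition, Map<String, Object> offset) {
            this.partition = partition;
            this.offset = offset;
        }

        // Invoked from the producer callback once the record has been acknowledged
        void ack() {
            this.acked = true;
        }
    }

    private final Deque<SubmittedRecord> records = new ArrayDeque<>();

    // Called on the task thread just before the record is dispatched to the producer
    synchronized SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset) {
        SubmittedRecord record = new SubmittedRecord(partition, offset);
        records.addLast(record);
        return record;
    }

    // Called on the task thread during offset commit; only acked records at the
    // head of the deque are removed, so out-of-order acks never cause us to
    // commit an offset ahead of an unacknowledged record. The returned map is
    // the snapshot that the offset commit logic would write to the offsets topic.
    synchronized Map<Map<String, Object>, Map<String, Object>> committableOffsets() {
        Map<Map<String, Object>, Map<String, Object>> offsets = new HashMap<>();
        while (!records.isEmpty() && records.peekFirst().acked) {
            SubmittedRecord record = records.pollFirst();
            offsets.put(record.partition, record.offset);
        }
        return offsets;
    }
}
```

With something like this, the only cross-thread coordination left would be the brief synchronization on the deque itself, which matches the "simple operations" point above; the offset commit logic would just receive the snapshot and perform the (possibly blocking) write to the offsets topic.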
@rhauch If this is all agreeable, I think we're ready to start implementing. Since you've provided a lot of the code yourself, I'm happy to let you take on that work if you'd like; otherwise, I'll get started and see if I can have a new PR with these changes out by early next week.