C0urante commented on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-915350922


   > I also think that the behavior with the suggested approach and your option 
3 is still a lot better than the current situation.
   
   Agreed 👍 
   
   > IIUC the `offset.flush.timeout.ms` would actually not be used anymore, as 
there actually are no timeouts as the offset commit thread doesn't block 
anymore.
   
   That's mostly correct--we wouldn't be waiting on a blocking operation while 
iterating through the deque(s), although we might still choose to block on 
the actual write to the offsets topic in the [same way that we currently 
do](https://github.com/apache/kafka/blob/fb77da941ac2a34513cf2cd5d11137ba9b275575/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L565-L586), 
just for the sake of metrics and allowing users to monitor the health of 
the connection between the Connect worker and the offsets topic. Not a huge 
deal though, and the point that we wouldn't be blocking on the task's producer 
is still valid.
   
   I think the issue is less that we'd end up timing out and more that we'd end 
up violating the guarantee that's provided right now by the framework that each 
task gets to take up only `offset.flush.timeout.ms` milliseconds per offset 
commit attempt before aborting the attempt and yielding control to the next 
task. A deque-based approach may actually be worse than the current behavior 
in that regard if there's no check in place to ensure that iterating over the 
deque doesn't exceed the offset flush timeout. It's likely worth the tradeoff, 
but I think we can satisfy both objectives with your suggestion:
   
   > another option might be to incur the iteration on the worker source task 
thread.
   
   I think this'd be great, especially with the snapshotting logic you mention, 
which should basically eliminate any blocking between the two threads except to 
prevent race conditions while simple operations like clearing a hash map or 
assigning a new value to an instance variable take place.
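   
   To make the "assign a new value to an instance variable" handoff concrete, 
here's a minimal sketch. The class and method names are hypothetical and not 
taken from the Kafka code base, and partitions and offsets are simplified to 
`String` and `Long`. The task thread accumulates committable offsets, and the 
offset commit thread takes a snapshot by swapping in a fresh map, so the lock 
is only held for a put or a reference swap.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: none of these names come from the Kafka code base.
class OffsetSnapshotHandoff {
    private final Object lock = new Object();
    // Latest committable offset per source partition (simplified types).
    private Map<String, Long> pending = new HashMap<>();

    /** Called by the task thread once a record's offset is safe to commit. */
    void record(String partition, long offset) {
        synchronized (lock) {
            pending.put(partition, offset);
        }
    }

    /**
     * Called by the offset commit thread. The lock covers only a reference
     * swap; the returned map is never touched by the task thread again.
     */
    Map<String, Long> snapshot() {
        synchronized (lock) {
            Map<String, Long> taken = pending;
            pending = new HashMap<>();
            return taken;
        }
    }
}
```

   The commit thread can then serialize and write the returned map at its own 
pace without holding up the task thread.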
   
   One thing that gave me pause initially was the realization that we'd be 
double-iterating over every source record at this point: once to transform, 
convert, and dispatch the record to the producer, and then once to verify that 
it had been acknowledged while iterating over the deque it's in. But I can't 
imagine it'd make a serious difference to CPU utilization given that 
transformation, conversion, and dispatching to a producer are likely to be at 
least an order of magnitude more expensive than just checking a boolean flag 
and possibly inserting the record's offset into a hash map. And memory 
utilization should be very close to the existing approach, which already tracks 
every single unacknowledged record in the `outstandingMessages` and 
`outstandingMessagesBacklog` fields.
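   
   For illustration, the second pass could look roughly like the sketch below. 
All names here are hypothetical, partitions and offsets are simplified to 
`String` and `long`, and stopping at the first unacknowledged record is an 
assumption made to keep commits in dispatch order. Per record, the work is one 
flag check and at most one hash map insert.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: none of these class or method names come from Kafka.
class AckedOffsetScan {

    /** One dispatched source record; the producer callback would call ack(). */
    static final class Submitted {
        final String partition; // simplified stand-in for a source partition
        final long offset;      // simplified stand-in for a source offset
        private volatile boolean acked = false;

        Submitted(String partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }

        void ack() { acked = true; }
        boolean isAcked() { return acked; }
    }

    // Touched only by the task thread, so a plain ArrayDeque is enough.
    private final Deque<Submitted> inFlight = new ArrayDeque<>();

    /** First pass: the task thread registers a record as it dispatches it. */
    Submitted submit(String partition, long offset) {
        Submitted record = new Submitted(partition, offset);
        inFlight.addLast(record);
        return record;
    }

    /**
     * Second pass: pop records from the head while they are acked, keeping
     * the latest offset per partition. Stopping at the first unacked record
     * is an assumption of this sketch.
     */
    Map<String, Long> committableOffsets() {
        Map<String, Long> offsets = new HashMap<>();
        while (!inFlight.isEmpty() && inFlight.peekFirst().isAcked()) {
            Submitted record = inFlight.pollFirst();
            offsets.put(record.partition, record.offset);
        }
        return offsets;
    }
}
```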
   
   I think this buys us enough that my earlier-mentioned option 2 (multiple 
threads for offset commits) isn't called for, since the only blocking operation 
that would be performed during offset commit at this point is a write to the 
offsets topic. If the offsets topic is unavailable, it's likely that the impact 
would be the same across all tasks (unless the task is using a separate offsets 
topic, which will become possible once the changes for KIP-618 are merged), and 
even if not, things wouldn't be made any worse than they already are: the 
offset flush timeout would expire, and the next task in line would get its 
chance to commit offsets.
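   
   As a rough sketch of that single-threaded behavior (hypothetical names, 
with a plain `Future` standing in for each task's offsets topic write): each 
write gets at most the timeout, and a stuck write is abandoned so the next 
task can proceed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: not the actual Connect offset commit code.
class SequentialOffsetCommit {

    /**
     * Waits on each task's offset write in turn, for at most timeoutMs each.
     * Returns the indices of the writes that completed in time.
     */
    static List<Integer> commitAll(List<Future<Void>> writes, long timeoutMs) {
        List<Integer> completed = new ArrayList<>();
        for (int i = 0; i < writes.size(); i++) {
            Future<Void> write = writes.get(i);
            try {
                write.get(timeoutMs, TimeUnit.MILLISECONDS);
                completed.add(i);
            } catch (TimeoutException | ExecutionException e) {
                // Give up on this task's attempt and yield to the next task.
                write.cancel(true);
            } catch (InterruptedException e) {
                // Preserve the interrupt and stop committing.
                Thread.currentThread().interrupt();
                break;
            }
        }
        return completed;
    }
}
```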
   
   @rhauch If this is all agreeable I think we're ready to start implementing. 
Since you've provided a lot of the code yourself I'm happy to let you take on 
that work if you'd like; otherwise, I'll get started and see if I can have a 
new PR with these changes out by early next week.

