C0urante commented on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-914348989


   @rhauch Overall that looks good to me. It's an elegant solution to the 
tricky problem you noted about the opacity of task-provided source offsets 
w/r/t ordering.
   
   I'm a little worried about offset commits taking longer and longer with the 
more sophisticated approach you proposed (where we would unconditionally 
iterate over every record in the batch, instead of only until the first 
unacknowledged record). It's true that there would be natural back pressure 
from the producer as its `buffer.memory` gets eaten up, but with the default of 
32MB, it still seems possible for a large number of unacknowledged records to 
build up. If this does happen, then offset commits may end up exceeding the 
`offset.flush.timeout.ms` for the worker, which may cause issues with the 
current model where a single shared, worker-global thread is used for offset 
commits of all tasks.
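   To make the concern concrete, here's a minimal sketch of the difference between the two scanning strategies (a simplified, illustrative `SubmittedRecord` shape; names here are hypothetical, not the actual classes in the PR, and removal of committed records from the deque is partly elided):
   ```java
   import java.util.Deque;
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;

   // Illustrative, simplified shape; not the actual class added by the PR.
   class SubmittedRecord {
       final Map<String, ?> partition;
       final Map<String, ?> offset;
       volatile boolean acked; // flipped from the producer callback once the record is ack'd

       SubmittedRecord(Map<String, ?> partition, Map<String, ?> offset) {
           this.partition = partition;
           this.offset = offset;
       }
   }

   class CommitScans {
       // Simpler approach: stop at the first unacknowledged record. Cheap per commit,
       // but one stuck record blocks offset progress for every partition behind it.
       static Map<Map<String, ?>, Map<String, ?>> upToFirstUnacked(Deque<SubmittedRecord> submitted) {
           Map<Map<String, ?>, Map<String, ?>> committable = new HashMap<>();
           while (!submitted.isEmpty() && submitted.peekFirst().acked) {
               SubmittedRecord r = submitted.pollFirst();
               committable.put(r.partition, r.offset);
           }
           return committable;
       }

       // More sophisticated approach: walk the whole deque and pick up the latest offset
       // for every partition with no earlier unacknowledged record. With up to
       // buffer.memory (32MB by default) of unacked records queued up, each commit
       // pass has to revisit all of them.
       static Map<Map<String, ?>, Map<String, ?>> scanEntireDeque(Deque<SubmittedRecord> submitted) {
           Map<Map<String, ?>, Map<String, ?>> committable = new HashMap<>();
           Set<Map<String, ?>> blocked = new HashSet<>();
           for (SubmittedRecord r : submitted) {
               if (!r.acked) {
                   blocked.add(r.partition);
               } else if (!blocked.contains(r.partition)) {
                   committable.put(r.partition, r.offset);
               }
           }
           return committable;
       }
   }
   ```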
   
   If this is a valid concern and we'd like to take it into account for now, I 
can think of a couple of ways to handle it off the top of my head:
   1. Use the simpler approach that blocks offset commits across the board if a 
single record remains unacknowledged for a long period of time (which may 
realistically be a problem if a single partition out of many is unavailable for 
some reason).
   2. Enable concurrent offset commits by multiple tasks.
   3. Instead of a single deque per task, use a 
`ConcurrentMap<Map<String, ?>, Queue<SubmittedRecord>>` that stores a single 
deque per unique source partition (see the sketch after this list). This would 
allow us to iterate over the bare minimum number of records for every single 
offset commit and not spend time, for example, on accumulated records for 
unavailable Kafka partitions. We'd still have to iterate over those records 
eventually if the Kafka partition came back online, but that iteration would 
only ever occur once, instead of once for every offset commit.
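   A rough sketch of that per-partition layout, reusing the simplified `SubmittedRecord` shape from the sketch above (class and method names are hypothetical, and synchronization between the task thread and the offset commit thread is elided):
   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   import java.util.HashMap;
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;

   class SubmittedRecords {
       // One deque per unique source partition, keyed by the task-provided partition map.
       private final ConcurrentMap<Map<String, ?>, Deque<SubmittedRecord>> records = new ConcurrentHashMap<>();

       // Called as the task returns records from poll().
       SubmittedRecord submit(Map<String, ?> partition, Map<String, ?> offset) {
           SubmittedRecord record = new SubmittedRecord(partition, offset);
           records.computeIfAbsent(partition, p -> new ArrayDeque<>()).addLast(record);
           return record;
       }

       // Called on offset commit: for each partition, drain only the leading run of
       // acknowledged records. A partition whose head record is still unacked is
       // skipped immediately, so records piled up behind an unavailable Kafka
       // partition aren't rescanned on every commit.
       Map<Map<String, ?>, Map<String, ?>> committableOffsets() {
           Map<Map<String, ?>, Map<String, ?>> committable = new HashMap<>();
           for (Map.Entry<Map<String, ?>, Deque<SubmittedRecord>> entry : records.entrySet()) {
               Deque<SubmittedRecord> deque = entry.getValue();
               while (!deque.isEmpty() && deque.peekFirst().acked) {
                   committable.put(entry.getKey(), deque.pollFirst().offset);
               }
           }
           return committable;
       }
   }
   ```
   The trade-off is that each commit still visits every partition key, but a stuck partition only costs a peek at its head record rather than a rescan of everything queued behind it.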
   
   I think option 3 may be warranted, although it's still possible that offset 
commits take a long time if 32MB worth of records end up getting queued. Option 
2 may be worth implementing or at least considering as a follow-up item to 
handle this case.
   
   Thoughts?

