lucasbru commented on PR #14746:
URL: https://github.com/apache/kafka/pull/14746#issuecomment-1814363083
> @philipnee @kirktrue @lucasbru I am concerned that when one calls
`wakeup()` the application thread might stay blocked in waiting for a non-empty
fetch buffer.
Yes. The application thread remains blocked, so this doesn't really wake the
application thread up.
> Currently, I do not know how to interrupt the application thread with the
current code. I cannot do it in `AsyncKafkaConsumer#wakeup()` because I do not
have a handle on the application thread and I cannot do it in
`WakeupTrigger#maybeTriggerWakeup()` because there it is too late. Any ideas?
I'd say we signal `notEmptyCondition` in `FetchBuffer`. That could possibly
be achieved this way:
- We implement a new subclass of `Wakeupable` that is a
`FetchAction(FetchBuffer)`, similar to my idea above.
- In `AsynConsumer.poll`, `FetchAction` is registered before the poll and
removed after the poll.
- In `WakeupTrigger.wakeup()`, `FetchAction` is replaced by a
`WakeupFuture`, and `FetchBuffer.wakeup` is called.
- `FetchBuffer.wakeup` calls `notEmptyCondition.signalAll()`.
There are other ideas, but I think they are more problematic:
1. If we wanted to stick more to the original architecture, we could attempt
to solve "waiting for a non-empty fetch buffer" by putting a future in the
background queue and let the background thread poll loop complete the future.
But that would be a significant rewrite and may have other problems.
2. I'm implementing a change where only one thread can use the consumer API,
just like in the original consumer. In principle, you could store a reference
to the application thread in the consumer when it is opened and interrupt it
that way.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]