Hey Alexey,

The API of the new consumer is designed around an event loop in which all
IO is driven by the poll() API. To make this work, you need to call poll()
in a loop (see the javadocs for examples). So in this example, when you
call commitAsync(), the request is basically just queued up to be sent. It
won't actually be transmitted, however, until the next poll() is called.
And it may not return until an even later call.

If I understand correctly, the basic problem you're trying to figure out is
how you can do an asynchronous commit without continuing to receive records
for the associated partitions. One option would be to use the pause() API
to suspend fetching from those partitions. So you could call commitAsync()
with the commits you need to send, then pause the respective partitions.
After the commit returns, you could resume() the partitions in the commit
callback, which will reenable fetching.

Hope that helps!

-Jason



On Fri, Feb 5, 2016 at 6:35 AM, Alexey Romanchuk <alexey.romanc...@gmail.com
> wrote:

> Hi all!
>
> Right now I am working on reactive streams connector to kafka. I am using
> new client and found strange behavior of commitAsync method which not
> calling callbacks at all at some cases.
>
> I found, that callback calling is a part of handling of incoming messages.
> These messages are not fetching in background, but fetching during other
> activity (like fetching from topic). In the other hand there is no way to
> perform "blank" activity to fetch commit confirmation from Consumer.
>
> Right now if I message processing is depended on commit confirmation it is
> impossible to work in reactive way.
>
> Here it is very small example of problem -
> https://gist.github.com/13h3r/496e802afe65233b184a
>
> My questions are:
> - is it bug or design decision?
> - if it is not bug how I can write reactive consumer?
>
> Thanks!
>

Reply via email to