Hello Jason, thank you for reply and explanation. Now it is very clear how it works now.
I have nothing to ask about current state of client, but have few ideas about how it can be implemented/evolved. 1. Push the data received for paused partitions to user level API. Let user decide what to do with this data. This allow us to not discard the data we have already. 2. What do you think about separation of commands and data streams? With such separation we can implement streaming data transfer and control it with command stream. Thanks! On Sun, Feb 7, 2016 at 2:45 AM, Jason Gustafson <ja...@confluent.io> wrote: > > > > - is it cost something? is it any network activity to pause/resume? > > > That is a great question. The pause() method sets an internal flag which > tells the consumer not to send any more fetches. If a fetch was in progress > and returned while the partition was still paused, then the fetched data > will be discarded. This is significant because the consumer implements a > prefetching optimization in order to pipeline fetching and message > processing. Basically before the consumer returns a set of records, it will > send the next fetch to the broker so that the next batch of messages will > be available when the next poll() is called. > > Unfortunately, this prefetching strategy doesn't seem to play nicely with > pause(), at least not as I've proposed to use it above. When poll() returns > with new data for a partition, you would immediately call pause() and then > begin processing the messages. After you finish, you would call > commitAsync() to send the commit. But if the prefetch for the next records > returns before the commit (which seems possible depending on the processing > time and fetch configuration), then we'd have to discard the data. Then > after the commit returns, the partition would be unpaused and we'd resend > the fetch. In the worst case, the consumer would need to send each fetch > twice, which sounds pretty bad. > > I'm actually not sure if the situation is quite this bad. It would be > useful to do some testing to see if this double-fetching is a real problem > in practice (or if I've just overthought it). If it is an issue, then there > are some options to deal with it. For example, maybe it would make make > sense to give users a configuration to turn off prefetching. That or we > could hold onto the fetched records indefinitely under the assumption that > the partition would be unpaused eventually. > > > - there is no API to get information of current status of resume/pause for > > client. Am I wrong? What about having such API? > > > There was a patch to add a paused() API which would return the set of > partitions currently paused. Is that what you had in mind? Let me see if I > can help get that merged. > > > Also, you told about "one option". Is it another? > > > Haha, this is actually the only option that came to mind for what you're > trying to do, but I said "one option" in case someone cleverer than I > thought of another way. > > -Jason > > On Sat, Feb 6, 2016 at 7:34 AM, Alexey Romanchuk < > alexey.romanc...@gmail.com > > wrote: > > > Hi Jason! > > > > Thanks for reply. I try to implement it and looks like it works. Few > > moments about pause/resume: > > - is it cost something? is it any network activity to pause/resume? > > - there is no API to get information of current status of resume/pause > for > > client. Am I wrong? What about having such API? > > > > Also, you told about "one option". Is it another? > > > > Thanks! > > > > On Sat, Feb 6, 2016 at 6:13 AM, Jason Gustafson <ja...@confluent.io> > > wrote: > > > > > Hey Alexey, > > > > > > The API of the new consumer is designed around an event loop in which > all > > > IO is driven by the poll() API. To make this work, you need to call > > poll() > > > in a loop (see the javadocs for examples). So in this example, when you > > > call commitAsync(), the request is basically just queued up to be sent. > > It > > > won't actually be transmitted, however, until the next poll() is > called. > > > And it may not return until an even later call. > > > > > > If I understand correctly, the basic problem you're trying to figure > out > > is > > > how you can do an asynchronous commit without continuing to receive > > records > > > for the associated partitions. One option would be to use the pause() > API > > > to suspend fetching from those partitions. So you could call > > commitAsync() > > > with the commits you need to send, then pause the respective > partitions. > > > After the commit returns, you could resume() the partitions in the > commit > > > callback, which will reenable fetching. > > > > > > Hope that helps! > > > > > > -Jason > > > > > > > > > > > > On Fri, Feb 5, 2016 at 6:35 AM, Alexey Romanchuk < > > > alexey.romanc...@gmail.com > > > > wrote: > > > > > > > Hi all! > > > > > > > > Right now I am working on reactive streams connector to kafka. I am > > using > > > > new client and found strange behavior of commitAsync method which not > > > > calling callbacks at all at some cases. > > > > > > > > I found, that callback calling is a part of handling of incoming > > > messages. > > > > These messages are not fetching in background, but fetching during > > other > > > > activity (like fetching from topic). In the other hand there is no > way > > to > > > > perform "blank" activity to fetch commit confirmation from Consumer. > > > > > > > > Right now if I message processing is depended on commit confirmation > it > > > is > > > > impossible to work in reactive way. > > > > > > > > Here it is very small example of problem - > > > > https://gist.github.com/13h3r/496e802afe65233b184a > > > > > > > > My questions are: > > > > - is it bug or design decision? > > > > - if it is not bug how I can write reactive consumer? > > > > > > > > Thanks! > > > > > > > > > >