Zahari, that makes sense, thanks for reframing your question. I suspect that pause/resume was not intended to be called at high frequency like that, but I agree with you that the current behavior is needlessly inefficient. I like your idea of making it configurable.
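For reference, this is roughly the call pattern being discussed - a minimal sketch against the plain KafkaConsumer API. The topic names (TP1/TP2), the consumer properties and the polling loop are made up purely for illustration; the actual alpakka-kafka code is of course more involved:

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class PauseResumeSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "pause-resume-sketch");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp1 = new TopicPartition("TP1", 0);
                TopicPartition tp2 = new TopicPartition("TP2", 0);
                consumer.assign(Arrays.asList(tp1, tp2));

                while (true) {
                    // Only the stream backed by TP1 has demand right now, so TP2 is paused.
                    consumer.pause(Collections.singleton(tp2));

                    // Records already prefetched for TP2 are dropped by the fetcher on this
                    // call ("Not returning fetched records for assigned partition TP2 since
                    // it is no longer fetchable").
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    records.forEach(r -> System.out.println(r.topic() + "-" + r.partition() + ": " + r.value()));

                    // Demand for TP2 shows up again shortly afterwards, so it is resumed and
                    // the discarded data has to be fetched from the broker once more.
                    consumer.resume(Collections.singleton(tp2));
                }
            }
        }
    }

Every pause -> poll -> resume round trip like this discards whatever was prefetched for the paused partition and then re-fetches it, which is exactly the inefficiency described below.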
Ryanne

On Thu, Oct 18, 2018, 6:59 AM Zahari Dichev <zaharidic...@gmail.com> wrote:

> Jan,
>
> Quite insightful indeed. I think your propositions are valid.
>
> Ryanne,
>
> I understand that consumers are using a pull model... And yes, indeed if a consumer is not ready for more records it surely should not call poll. Except that it needs to do so periodically in order to indicate that it is alive. Forget about the "backpressure", I guess I was wrong in phrasing it that way, so let's not get caught up on it.
>
> You say pause/resume can be used to prioritise certain topics/partitions over others. And indeed this is the case. So instead of thinking about it in terms of backpressure, let's put it a different way. The Akka streams connector would like to prioritise certain topics over others, using one consumer instance. On top of that, add the detail that the priorities change quite frequently (which translates to calling pause/resume frequently). So all that being said, what would be a proper way to handle the situation without throwing the pre-fetched records away when calling poll on a consumer that happens to have a topic that was recently paused (and that might be un-paused soon)? Am I the only one who considers that an actual problem with the use of pause/resume? Not sure how to explain the situation in a better way...
>
> Zahari
>
>
> On Thu, Oct 18, 2018 at 9:46 AM Zahari Dichev <zaharidic...@gmail.com> wrote:
>
> > Thanks a lot Jan,
> >
> > I will read it.
> >
> > Zahari
> >
> > On Thu, Oct 18, 2018 at 9:31 AM Jan Filipiak <jan.filip...@trivago.com> wrote:
> >
> >> especially my suggestions ;)
> >>
> >> On 18.10.2018 08:30, Jan Filipiak wrote:
> >> > Hi Zahari,
> >> >
> >> > would you be willing to scan through the KIP-349 discussion a little? I think it has suggestions that could be interesting for you.
> >> >
> >> > Best Jan
> >> >
> >> > On 16.10.2018 09:29, Zahari Dichev wrote:
> >> >> Hi there Kafka developers,
> >> >>
> >> >> I am currently trying to find a solution to an issue that has been manifesting itself in the Akka streams implementation of the Kafka connector. When it comes to consuming messages, the implementation relies heavily on the fact that we can pause and resume partitions. In some situations, when a single consumer instance is shared among several streams, we might end up frequently pausing and unpausing a set of topic partitions, which is the main facility that allows us to implement back pressure. This however has certain disadvantages, especially when there are two consumers that differ in terms of processing speed.
> >> >>
> >> >> To articulate the issue more clearly, imagine that a consumer maintains assignments for two topic partitions *TP1* and *TP2*. This consumer is shared by two streams - S1 and S2. So effectively, when we have demand from only one of the streams - *S1*, we will pause one of the topic partitions, *TP2*, and call *poll()* on the consumer to only retrieve the records for the demanded topic partition - *TP1*. The result of that is that all the records that have been prefetched for *TP2* are now thrown away by the fetcher (*"Not returning fetched records for assigned partition TP2 since it is no longer fetchable"*).
> >> >> If we extrapolate that to multiple streams sharing the same consumer, we might quickly end up in a situation where we throw prefetched data away quite often. This does not seem like the most efficient approach, and in fact produces quite a lot of overlapping fetch requests, as illustrated in the following issue:
> >> >>
> >> >> https://github.com/akka/alpakka-kafka/issues/549
> >> >>
> >> >> I am writing this email to get some initial opinions on a KIP I was thinking about. What if we give the clients of the Consumer API a bit more control over what to do with this prefetched data? Two options I am wondering about:
> >> >>
> >> >> 1. Introduce a configuration setting, such as *"return-prefetched-data-for-paused-topic-partitions = false"* (have to think of a better name), which when set to true will return what is prefetched instead of throwing it away on calling *poll()*. Since this is an amount of data that is bounded by the maximum size of the prefetch, we can control the maximum number of records returned. The client of the consumer API can then be responsible for keeping that data around and using it when appropriate (i.e. when demand is present).
> >> >>
> >> >> 2. Introduce a facility to pass in a buffer into which the prefetched records are drained when poll is called and paused partitions have some prefetched records.
> >> >>
> >> >> Any opinions on the matter are welcome. Thanks a lot!
> >> >>
> >> >> Zahari Dichev
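To make option 2 from the quoted proposal a bit more concrete, here is one hypothetical shape it could take. Neither the interface nor the poll overload sketched below exists in the Consumer API today; the names are invented purely to illustrate the idea of a caller-supplied drain target for prefetched records of paused partitions:

    import java.util.List;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;

    // HYPOTHETICAL: a sink the caller hands to poll(), which the consumer could
    // drain prefetched records of paused partitions into instead of discarding
    // them. This type and the poll overload mentioned below are not real API.
    public interface PausedRecordSink<K, V> {
        void onPrefetched(TopicPartition partition, List<ConsumerRecord<K, V>> records);
    }

    // Possible (equally hypothetical) call site:
    //   consumer.poll(Duration.ofMillis(100),
    //       (tp, prefetched) -> pausedBuffer
    //           .computeIfAbsent(tp, p -> new ArrayList<>())
    //           .addAll(prefetched));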