Hi Boyang,

Interesting article. One thing crossed my mind, though: when skipping bad
records, we couldn't go back to them and process them again while
guaranteeing ordering (i.e. neither exactly-once nor at-least-once would be
supported, only at-most-once). Also, in Kafka, individually acking every
single record results in terrible latency (from what I've heard). We
actually discussed something like this in
https://issues.apache.org/jira/browse/KAFKA-7432. It might give you some
insight, since it is a related issue.
I hope this helps,
Richard

On Thu, Jan 3, 2019 at 7:29 PM Boyang Chen <bche...@outlook.com> wrote:

> Hey Richard,
>
> Thanks for the explanation. I recently read an interesting blog post<
> https://streaml.io/blog/pulsar-streaming-queuing> from Apache Pulsar
> (written a long time ago), in which they define the concept of an
> individual ack, which means we could skip records and leave certain
> records on the queue for later processing. This is similar to KIP-408,
> and it shares some of the motivations for us to invest in this work.
>
> Boyang
>
> ________________________________
> From: Richard Yu <yohan.richard...@gmail.com>
> Sent: Friday, January 4, 2019 5:42 AM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-408: Add Asynchronous Processing to Kafka
> Streams
>
> Hi all,
>
> Just bumping this KIP. It would be great if we got some discussion.
>
> On Sun, Dec 30, 2018 at 5:13 PM Richard Yu <yohan.richard...@gmail.com>
> wrote:
>
> > Hi all,
> >
> > I made some recent changes to the KIP. It should be more relevant to
> > the issue now (it covers the Processor API in detail).
> > It would be great if you could comment.
> >
> > Thanks,
> > Richard
> >
> > On Wed, Dec 26, 2018 at 10:01 PM Richard Yu <yohan.richard...@gmail.com>
> > wrote:
> >
> >> Hi all,
> >>
> >> Just changing the title of the KIP. Discovered it wasn't right.
> >> That's about it. :)
> >>
> >> On Mon, Dec 24, 2018 at 7:57 PM Richard Yu <yohan.richard...@gmail.com>
> >> wrote:
> >>
> >>> Sorry, just making a correction.
> >>>
> >>> Even if we are processing records out of order, we will still have to
> >>> checkpoint offset ranges.
> >>> So it doesn't really change anything even if we are doing in-order
> >>> processing.
> >>>
> >>> Thinking this over, I'm leaning slightly towards maintaining the
> >>> ordering guarantee.
> >>> Although when implementing this change, there might be some kinks
> >>> that we have not thought about which could throw a monkey wrench
> >>> into the works.
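[Editor's note: the offset-range checkpointing mentioned above could look
something like the following minimal sketch. All names here are
hypothetical illustrations, not Kafka Streams code: the idea is that when
records complete out of order, only the low watermark of contiguous
completed offsets is safe to commit.]

```python
# Hypothetical sketch: track offsets that complete out of order and compute
# the highest offset that is safe to commit (everything below it is done).
# OffsetTracker and its methods are illustrative names, not a Kafka API.

class OffsetTracker:
    def __init__(self, start_offset):
        self.next_to_commit = start_offset  # low watermark: all offsets below are done
        self.completed = set()              # offsets finished out of order

    def complete(self, offset):
        self.completed.add(offset)
        # Advance the watermark over any contiguous run of completed offsets.
        while self.next_to_commit in self.completed:
            self.completed.remove(self.next_to_commit)
            self.next_to_commit += 1

    def safe_commit_offset(self):
        return self.next_to_commit

tracker = OffsetTracker(0)
for off in [2, 0, 3]:                # offsets finish out of order
    tracker.complete(off)
print(tracker.safe_commit_offset())  # → 1: offset 1 is still in flight,
                                     #   so only offset 0 is behind the watermark
```

Committing this watermark (rather than the latest processed offset) is what
keeps at-least-once semantics intact after a crash: offsets completed ahead
of the watermark are simply reprocessed.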
> >>>
> >>> But definitely worth trying out,
> >>> Richard
> >>>
> >>> On Mon, Dec 24, 2018 at 6:51 PM Richard Yu <yohan.richard...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi Boyang,
> >>>>
> >>>> I can see where you are going with this. I suppose I should have
> >>>> added this to the alternatives section, but I might as well mention
> >>>> it now.
> >>>>
> >>>> It had crossed my mind that we could still return records in order
> >>>> even if there are multiple threads processing the same task. But for
> >>>> this to happen, we must block for the in-between offsets which have
> >>>> not been processed yet. For example, suppose offsets 1-50 are being
> >>>> processed by thread1, while offsets 51-100 are being processed by
> >>>> thread2. We will have to wait for thread1 to finish processing its
> >>>> offsets before we return the records processed by thread2. In other
> >>>> words, once thread1 is done, thread2's work up to that point will be
> >>>> returned in one go, but not before that.
> >>>>
> >>>> I suppose this could work, but the client will have to wait some
> >>>> time before the advantages of multithreaded processing can be seen
> >>>> (i.e. the first thread has to finish processing its segment of the
> >>>> records before any others are returned, to guarantee ordering).
> >>>> Another point I would like to make is that the threads are
> >>>> *asynchronous*. So for us to know when a thread is done processing a
> >>>> certain segment, we would probably use a policy similar to how
> >>>> getMetadataAsync() works (i.e. have a parent thread be notified when
> >>>> the child threads are done).
> >>>> [image: image.png]
> >>>> Just pulling this from the KIP. But instead, we would apply this to
> >>>> metadata segments instead of just a callback.
> >>>> I don't know whether or not the tradeoffs are acceptable to the
> >>>> client. Ordering could be guaranteed, but it would be hard to do.
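[Editor's note: the gating idea above (thread1 on offsets 1-50, thread2 on
51-100, with thread2's results held back until thread1 finishes) can be
sketched as draining futures in submission order. This is an illustrative
sketch only; `process_segment` is a stand-in, not Kafka Streams code.]

```python
# Hypothetical sketch: two workers process offset segments in parallel, but
# results are handed back strictly in segment order by waiting on the
# futures in submission order, so ordering is preserved for the caller.
from concurrent.futures import ThreadPoolExecutor

def process_segment(offsets):
    # Stand-in for real record processing; returns the processed results.
    return [f"processed-{o}" for o in offsets]

with ThreadPoolExecutor(max_workers=2) as pool:
    # thread1 gets offsets 1-50, thread2 gets offsets 51-100, as in the
    # example above.
    futures = [pool.submit(process_segment, range(1, 51)),
               pool.submit(process_segment, range(51, 101))]
    ordered_results = []
    for fut in futures:  # blocks on thread1's future first, preserving order
        ordered_results.extend(fut.result())

print(ordered_results[0], ordered_results[99])  # processed-1 processed-100
```

Even if thread2 finishes first, its results only become visible after
thread1's future resolves, which is exactly the latency tradeoff described
above.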
> >>>> For example, if there was a crash, we might lose track of which
> >>>> offset numbers and ranges we are processing for each child thread,
> >>>> so we would somehow need to find a way to checkpoint those as well
> >>>> (e.g. by committing them to a Kafka topic).
> >>>>
> >>>> Let me know your thoughts on this approach. It would work, but the
> >>>> implementation details could be a mess.
> >>>>
> >>>> Cheers,
> >>>> Richard
> >>>>
> >>>> On Mon, Dec 24, 2018 at 4:59 PM Boyang Chen <bche...@outlook.com>
> >>>> wrote:
> >>>>
> >>>>> Hey Richard,
> >>>>>
> >>>>> Thanks for the explanation! After some thinking, I understand this
> >>>>> KIP better. The motivation is to increase throughput and move
> >>>>> heavy-lifting RPC calls or IO operations to the background. While I
> >>>>> feel that ordering is hard to guarantee for an async task, it would
> >>>>> be better to make it configurable for end users.
> >>>>>
> >>>>> An example use case I can think of: for every 500 records
> >>>>> processed, we need an RPC to external storage that takes
> >>>>> non-trivial time, and until it finishes, the 499 records before it
> >>>>> should not be visible to the end user. In that case, we need
> >>>>> fine-grained control over visibility to the downstream consumer, so
> >>>>> that our async task plants a barrier while still processing the 499
> >>>>> records without blocking and sending them downstream. Eventually,
> >>>>> when the heavy RPC is done, we commit this record to remove the
> >>>>> barrier and make all 500 records available downstream. So here we
> >>>>> still need to guarantee ordering within the 500 records, while at
> >>>>> the same time consumer semantics do not change.
> >>>>>
> >>>>> Am I making the point clear here? I just want to have more
> >>>>> discussion on the ordering guarantee, since I feel it wouldn't be a
> >>>>> good idea to break the consumer ordering guarantee by default.
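[Editor's note: the barrier use case above can be sketched as follows.
`BarrierBuffer` and `heavy_rpc` are hypothetical names for illustration:
records are processed without blocking, but nothing becomes visible
downstream until the slow call guarding the batch completes.]

```python
# Hypothetical sketch: a visibility barrier every N records. Processing is
# non-blocking, but processed records stay buffered until the barrier's
# heavy RPC commits, at which point the whole batch is released downstream.

class BarrierBuffer:
    def __init__(self, barrier_interval=500):
        self.barrier_interval = barrier_interval
        self.pending = []  # processed, but not yet visible downstream
        self.visible = []  # released to the downstream consumer

    def heavy_rpc(self):
        pass  # stand-in for the non-trivial external-storage call

    def process(self, record):
        self.pending.append(record)  # non-blocking processing
        if len(self.pending) == self.barrier_interval:
            self.heavy_rpc()                    # barrier guarding the batch
            self.visible.extend(self.pending)   # barrier commits: release batch
            self.pending.clear()

buf = BarrierBuffer(barrier_interval=5)  # small interval to keep the demo short
for r in range(7):
    buf.process(r)
print(buf.visible, buf.pending)  # first batch of 5 released, 2 still held back
```

Ordering within each batch is preserved because the buffer releases records
in processing order; the downstream consumer only ever sees whole,
committed batches.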
> >>>>>
> >>>>> Best,
> >>>>> Boyang
> >>>>>
> >>>>> ________________________________
> >>>>> From: Richard Yu <yohan.richard...@gmail.com>
> >>>>> Sent: Saturday, December 22, 2018 9:08 AM
> >>>>> To: dev@kafka.apache.org
> >>>>> Subject: Re: KIP-408: Add Asynchronous Processing to Kafka Streams
> >>>>>
> >>>>> Hi Boyang,
> >>>>>
> >>>>> Thanks for pointing out the possibility of skipping bad records (it
> >>>>> never crossed my mind). I suppose we could make it an option for
> >>>>> users to skip a bad record. Deciding whether or not to do that was
> >>>>> never the intention of this KIP, though. I could log a JIRA on such
> >>>>> an issue, but I think it is out of this KIP's scope.
> >>>>>
> >>>>> As for the ordering guarantees: if you are using the standard Kafka
> >>>>> design of one thread per task, then everything will pretty much
> >>>>> remain the same. However, if we are talking about using multiple
> >>>>> threads per task (which is something this KIP proposes), then we
> >>>>> should probably expect behavior somewhat similar to Samza's async
> >>>>> task, as stated in the JIRA for this KIP (second-to-last comment).
> >>>>> Ordering would no longer be possible (so yeah, basically no
> >>>>> guarantee at all).
> >>>>>
> >>>>> How the user handles out-of-order messages is not something I'm
> >>>>> well versed in. I guess they could try to put the messages back in
> >>>>> order some time later, but I honestly don't know what they would
> >>>>> do. It would be good if you could give me some insight into this.
> >>>>>
> >>>>> Cheers,
> >>>>> Richard
> >>>>>
> >>>>> On Fri, Dec 21, 2018 at 4:24 PM Boyang Chen <bche...@outlook.com>
> >>>>> wrote:
> >>>>>
> >>>>> > Thanks Richard for proposing this feature!
> >>>>> > We also have encountered some similar feature requests, where we
> >>>>> > want to define a generic async processing
> >>>>> > API<https://issues.apache.org/jira/browse/KAFKA-7566>.
> >>>>> >
> >>>>> > However, I guess the motivation here is that we should skip big
> >>>>> > records during normal processing, or let a separate task handle
> >>>>> > those records which take P99 processing time. My feeling is that
> >>>>> > if some edge case happens, we could just skip the bad record and
> >>>>> > continue processing the next one, no?
> >>>>> >
> >>>>> > Also, I want to understand what kind of ordering guarantee we are
> >>>>> > going to provide with this new API, or whether there is no
> >>>>> > ordering guarantee at all. Could we discuss any potential issues
> >>>>> > if a consumer needs to process out-of-order messages?
> >>>>> >
> >>>>> > Best,
> >>>>> > Boyang
> >>>>> > ________________________________
> >>>>> > From: Richard Yu <yohan.richard...@gmail.com>
> >>>>> > Sent: Saturday, December 22, 2018 2:00 AM
> >>>>> > To: dev@kafka.apache.org
> >>>>> > Subject: KIP-408: Add Asynchronous Processing to Kafka Streams
> >>>>> >
> >>>>> > Hi all,
> >>>>> >
> >>>>> > Lately, there has been considerable interest in adding
> >>>>> > asynchronous processing to Kafka Streams.
> >>>>> > Here is the KIP for such an addition:
> >>>>> >
> >>>>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-408%3A+Add+Asynchronous+Processing+To+Kafka+Streams
> >>>>> >
> >>>>> > I wish to discuss the best ways to approach this problem.
> >>>>> >
> >>>>> > Thanks,
> >>>>> > Richard Yu