Hey Richard, thanks for the explanation. Recently I read an interesting blog post<https://streaml.io/blog/pulsar-streaming-queuing> from Apache Pulsar (written a long time ago), where they define the concept of an individual ack, which means we could skip records and leave certain records on the queue for later processing. This should be similar to KIP-408, which also shares some of the motivations for us to invest in it.

Boyang
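As a rough illustration of the individual-ack idea above (purely a sketch; Kafka's consumer does not expose such an API today, and the class and method names here are invented), per-record acks could be tracked so that a skipped or slow record simply holds back the committable offset while everything after it can still be acknowledged:

    import java.util.concurrent.ConcurrentSkipListSet;

    public class IndividualAckTracker {
        // Offsets acked out of order, waiting for the gap below them to close.
        private final ConcurrentSkipListSet<Long> acked = new ConcurrentSkipListSet<>();
        private long committable; // next offset that is safe to commit

        public IndividualAckTracker(long startOffset) {
            this.committable = startOffset;
        }

        // Ack a single offset, possibly out of order.
        public synchronized void ack(long offset) {
            acked.add(offset);
            // Advance across any contiguous run of acked offsets.
            while (acked.remove(committable)) {
                committable++;
            }
        }

        // Offsets below this are fully processed; a skipped record holds it back.
        public synchronized long committableOffset() {
            return committable;
        }
    }

So if offsets 10 and 12 are acked but 11 is skipped for late processing, committableOffset() stays at 11 until that record is finally handled.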
________________________________
From: Richard Yu <yohan.richard...@gmail.com>
Sent: Friday, January 4, 2019 5:42 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-408: Add Asynchronous Processing to Kafka Streams

Hi all,

Just bumping this KIP. Would be great if we got some discussion.

On Sun, Dec 30, 2018 at 5:13 PM Richard Yu <yohan.richard...@gmail.com> wrote:

> Hi all,
>
> I made some recent changes to the KIP. It should be more relevant to the
> issue now (it involves the Processor API in detail).
> It would be great if you could comment.
>
> Thanks,
> Richard
>
> On Wed, Dec 26, 2018 at 10:01 PM Richard Yu <yohan.richard...@gmail.com>
> wrote:
>
>> Hi all,
>>
>> Just changing the title of the KIP. Discovered it wasn't right.
>> That's about it. :)
>>
>> On Mon, Dec 24, 2018 at 7:57 PM Richard Yu <yohan.richard...@gmail.com>
>> wrote:
>>
>>> Sorry, just making a correction.
>>>
>>> Even if we are processing records out of order, we will still have to
>>> checkpoint offset ranges. So it doesn't really change anything even if
>>> we are doing in-order processing.
>>>
>>> Thinking this over, I'm leaning slightly towards maintaining the
>>> ordering guarantee. Although when implementing this change, there might
>>> be some kinks that we have not thought about which could throw a monkey
>>> wrench into the works.
>>>
>>> But definitely worth trying out,
>>> Richard
>>>
>>> On Mon, Dec 24, 2018 at 6:51 PM Richard Yu <yohan.richard...@gmail.com>
>>> wrote:
>>>
>>>> Hi Boyang,
>>>>
>>>> I could see where you are going with this. Well, I suppose I should
>>>> have added this to the alternatives section, but I might as well
>>>> mention it now.
>>>>
>>>> It had crossed my mind that we consider returning results in order even
>>>> if there are multiple threads processing the same task. But for this to
>>>> happen, we must block for the in-between offsets which have not been
>>>> processed yet. For example, offsets 1-50 are being processed by
>>>> thread1, while offsets 51-100 are being processed by thread2. We will
>>>> have to wait for thread1 to finish processing its offsets before we
>>>> return the records processed by thread2. In other words, once thread1
>>>> is done, thread2's work up to that point will be returned in one go,
>>>> but not before that.
>>>>
>>>> I suppose this could work, but the client will have to wait some time
>>>> before the advantages of multithreaded processing can be seen (i.e. the
>>>> first thread has to finish processing its segment of the records before
>>>> any others are returned, to guarantee ordering). Another point I would
>>>> like to make is that the threads are *asynchronous*. So for us to know
>>>> when a thread is done processing a certain segment, we will probably
>>>> follow a policy similar to how getMetadataAsync() works (i.e. have a
>>>> parent thread be notified when the child threads are done).
>>>> [image: image.png]
>>>> Just pulling this from the KIP. But here, we would apply it to metadata
>>>> segments instead of just a callback.
>>>>
>>>> I don't know whether the tradeoffs are acceptable to the client.
>>>> Ordering could be guaranteed, but it would be hard to do. For example,
>>>> if there was a crash, we might lose track of which offset numbers and
>>>> ranges we are processing for each child thread, so somehow we need to
>>>> find a way to checkpoint those as well (like committing them to a Kafka
>>>> topic).
>>>>
>>>> Let me know your thoughts on this approach. It would work, but the
>>>> implementation details could be a mess.
>>>>
>>>> Cheers,
>>>> Richard
>>>>
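A rough sketch of the in-order release scheme described above (illustrative only; the class and method names are invented and not part of the KIP): each worker thread completes a future for its offset range, but ranges are handed back strictly in the order they were registered, so thread2's results for 51-100 stay buffered until thread1 finishes 1-50.

    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.LinkedBlockingQueue;

    public class OrderedRangeBuffer<R> {
        // Futures are enqueued in offset-range order (1-50 before 51-100, ...).
        private final BlockingQueue<CompletableFuture<List<R>>> pending =
                new LinkedBlockingQueue<>();

        // Register the next offset range; the worker thread completes the
        // returned future with that range's results when it finishes.
        public CompletableFuture<List<R>> register() {
            CompletableFuture<List<R>> slot = new CompletableFuture<>();
            pending.add(slot);
            return slot;
        }

        // Hand back the next range's results in registration (offset) order,
        // blocking until the earliest still-unfinished range completes.
        public List<R> takeNextInOrder() throws InterruptedException, ExecutionException {
            return pending.take().get();
        }
    }

For example, if thread1 registers first for offsets 1-50 and thread2 registers next for 51-100, thread2's future may complete earlier, but takeNextInOrder() will still return the 1-50 results before the 51-100 results.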
>>>> On Mon, Dec 24, 2018 at 4:59 PM Boyang Chen <bche...@outlook.com>
>>>> wrote:
>>>>
>>>>> Hey Richard,
>>>>>
>>>>> Thanks for the explanation! After some thinking, I understand more
>>>>> about this KIP. The motivation is to increase throughput and push
>>>>> heavy-lifting RPC calls or IO operations to the background. While I
>>>>> feel the ordering is hard to guarantee for an async task, it would be
>>>>> better to make it configurable for end users.
>>>>>
>>>>> An example use case I could think of is: for every 500 records
>>>>> processed, we need an RPC to external storage that takes non-trivial
>>>>> time, and before it finishes, the 499 records before it shouldn't be
>>>>> visible to the end user. In such a case, we need fine-grained control
>>>>> over visibility to the downstream consumer, so that our async task
>>>>> plants a barrier while still letting the 499 records be processed and
>>>>> sent downstream without blocking. Eventually, when the heavy RPC is
>>>>> done, we commit this record to remove the barrier and make all 500
>>>>> records available downstream. So here we still need to guarantee
>>>>> ordering within the 500 records, while at the same time the consumer
>>>>> semantics don't change.
>>>>>
>>>>> Am I making the point clear here? I just want to have more discussion
>>>>> on the ordering guarantee, since I feel it wouldn't be a good idea to
>>>>> break the consumer's ordering guarantee by default.
>>>>>
>>>>> Best,
>>>>> Boyang
>>>>>
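An illustrative sketch of the barrier use case above (the names here are invented and are not part of the KIP or any existing Streams API): the cheap per-record work proceeds without blocking, but every 500th record triggers the slow external RPC, and the offset commit that makes the batch visible downstream only happens once that RPC completes.

    import java.util.concurrent.CompletableFuture;

    public class BarrierProcessor<K, V> {
        private long countInBatch = 0;

        // Cheap per-record work (placeholder).
        void process(K key, V value) { /* ... */ }

        // Expensive call to external storage (placeholder).
        CompletableFuture<Void> externalRpc(K key, V value) {
            return CompletableFuture.completedFuture(null);
        }

        // Committing this offset lifts the barrier and makes the batch visible.
        void commitUpTo(long offset) { /* ... */ }

        public void onRecord(K key, V value, long offset) {
            process(key, value);          // records are still processed in order, no blocking
            countInBatch++;
            if (countInBatch == 500) {    // the 500th record plants the barrier
                countInBatch = 0;
                externalRpc(key, value)
                    .thenRun(() -> commitUpTo(offset)); // barrier lifted after the RPC completes
            }
        }
    }

The point of the sketch is that the barrier only gates visibility (the commit), not processing, so the 499 earlier records are never blocked behind the RPC.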
>>>>> ________________________________
>>>>> From: Richard Yu <yohan.richard...@gmail.com>
>>>>> Sent: Saturday, December 22, 2018 9:08 AM
>>>>> To: dev@kafka.apache.org
>>>>> Subject: Re: KIP-408: Add Asynchronous Processing to Kafka Streams
>>>>>
>>>>> Hi Boyang,
>>>>>
>>>>> Thanks for pointing out the possibility of skipping bad records (it
>>>>> never crossed my mind). I suppose we could make it an option for the
>>>>> user to skip a bad record, though whether or not to do that was never
>>>>> the intention of this KIP. I could log a JIRA on such an issue, but I
>>>>> think it is out of the KIP's scope.
>>>>>
>>>>> As for the ordering guarantees: if you are using the standard Kafka
>>>>> design of one thread per task, then everything will pretty much remain
>>>>> the same. However, if we are talking about using multiple threads per
>>>>> task (which is something that this KIP proposes), then we should
>>>>> probably expect the behavior to be somewhat similar to Samza's Async
>>>>> Task, as stated in the JIRA for this KIP (second-to-last comment).
>>>>> Ordering would no longer be possible (so yeah, basically no guarantee
>>>>> at all).
>>>>>
>>>>> And how the user handles out-of-order messages is not something I'm
>>>>> well versed in. I guess they can try to put the messages back in order
>>>>> some time later on, but I honestly don't know what they will do.
>>>>> It would be good if you could give me some insight into this.
>>>>>
>>>>> Cheers,
>>>>> Richard
>>>>>
>>>>>
>>>>> On Fri, Dec 21, 2018 at 4:24 PM Boyang Chen <bche...@outlook.com>
>>>>> wrote:
>>>>>
>>>>> > Thanks Richard for proposing this feature! We have also encountered
>>>>> > a similar feature request, where we want to define a generic async
>>>>> > processing API<https://issues.apache.org/jira/browse/KAFKA-7566>.
>>>>> >
>>>>> > However, I guess the motivation here is that we should skip big
>>>>> > records during normal processing, or let a separate task handle
>>>>> > those records that take P99 processing time. My feeling is that if
>>>>> > some edge case happens, could we just skip the bad record and
>>>>> > continue processing the next record?
>>>>> >
>>>>> > Also, I want to understand what kind of ordering guarantee we are
>>>>> > going to provide with this new API, or whether there is no ordering
>>>>> > guarantee at all. Could we discuss any potential issues if the
>>>>> > consumer needs to process out-of-order messages?
>>>>> >
>>>>> > Best,
>>>>> > Boyang
>>>>> > ________________________________
>>>>> > From: Richard Yu <yohan.richard...@gmail.com>
>>>>> > Sent: Saturday, December 22, 2018 2:00 AM
>>>>> > To: dev@kafka.apache.org
>>>>> > Subject: KIP-408: Add Asynchronous Processing to Kafka Streams
>>>>> >
>>>>> > Hi all,
>>>>> >
>>>>> > Lately, there has been considerable interest in adding asynchronous
>>>>> > processing to Kafka Streams.
>>>>> > Here is the KIP for such an addition:
>>>>> >
>>>>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-408%3A+Add+Asynchronous+Processing+To+Kafka+Streams
>>>>> >
>>>>> > I wish to discuss the best ways to approach this problem.
>>>>> >
>>>>> > Thanks,
>>>>> > Richard Yu