Sorry, just making a correction: even if we are processing records out of order, we will still have to checkpoint offset ranges. So switching to in-order processing doesn't really change anything on that front.
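Just to make what I mean by checkpointing offset ranges a bit more concrete, here is a rough sketch of how completed ranges could be tracked so that we only ever commit the contiguous prefix. This is purely illustrative and not part of the KIP; the class and method names are made up:

import java.util.Map;
import java.util.TreeMap;

// Illustrative only: tracks offset ranges that finish processing out of
// order and reports the largest contiguous prefix that is safe to commit.
public class OffsetRangeTracker {

    // Completed ranges keyed by start offset; the value is the exclusive end offset.
    private final TreeMap<Long, Long> completed = new TreeMap<>();
    // Everything below this offset has already been checkpointed.
    private long committedUpTo;

    public OffsetRangeTracker(long startOffset) {
        this.committedUpTo = startOffset;
    }

    // A worker thread calls this when it finishes its segment [start, end).
    // Segments may complete in any order.
    public synchronized void markCompleted(long start, long end) {
        completed.put(start, Math.max(end, completed.getOrDefault(start, end)));
    }

    // Returns the first offset that is NOT yet safe to commit, i.e. the end of
    // the contiguous prefix of completed ranges. A gap left by a slower thread
    // holds the commit point back.
    public synchronized long safeCommitOffset() {
        long frontier = committedUpTo;
        for (Map.Entry<Long, Long> range : completed.entrySet()) {
            if (range.getKey() > frontier) {
                break; // a segment before this one has not finished yet
            }
            frontier = Math.max(frontier, range.getValue());
        }
        committedUpTo = frontier;
        completed.headMap(frontier).clear(); // drop ranges now covered by the commit point
        return frontier;
    }
}

The commit path would then only ever commit up to safeCommitOffset(), and anything past that point is reprocessed after a crash. The in-flight ranges themselves would still have to be persisted somewhere (e.g. committed to a topic) if we want to avoid reprocessing whole segments, which is the part I'm worried about.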
Thinking this over, I'm leaning slightly towards maintaining the ordering guarantee. When implementing this change there might be some kinks we haven't thought about that could throw a monkey wrench into the works, but it is definitely worth trying out.

Richard

On Mon, Dec 24, 2018 at 6:51 PM Richard Yu <yohan.richard...@gmail.com> wrote:

> Hi Boyang,
>
> I could see where you are going with this. I suppose I should have added this to the alternatives, but I might as well mention it now.
>
> It had crossed my mind that we could return records in order even when multiple threads are processing the same task. But for that to happen, we must block for the in-between offsets that have not been processed yet. For example, say offsets 1 - 50 are being processed by thread1 while offsets 51 - 100 are being processed by thread2. We will have to wait for thread1 to finish processing its offsets before we return the records processed by thread2. In other words, once thread1 is done, thread2's work up to that point will be returned in one go, but not before that.
>
> I suppose this could work, but the client will have to wait some time before the advantages of multithreaded processing can be seen (i.e. the first thread has to finish processing its segment of the records before any others are returned, to guarantee ordering). Another point I would like to make is that the threads are asynchronous. So for us to know when a thread is done processing a certain segment, we would probably need a policy similar to how getMetadataAsync() works (i.e. have a parent thread be notified when the child threads are done).
>
> [image: image.png]
>
> I'm just pulling this from the KIP; instead of only a callback, we would apply it to metadata segments.
>
> I don't know whether or not the tradeoffs are acceptable to the client. Ordering could be guaranteed, but it would be hard to do. For example, if there was a crash, we might lose track of which offset numbers and ranges each child thread is processing, so we would also need a way to checkpoint those (like committing them to a Kafka topic).
>
> Let me know your thoughts on this approach. It would work, but the implementation details could be a mess.
>
> Cheers,
> Richard
>
> On Mon, Dec 24, 2018 at 4:59 PM Boyang Chen <bche...@outlook.com> wrote:
>
>> Hey Richard,
>>
>> Thanks for the explanation! After some thinking, I understand this KIP better. The motivation is to increase throughput and push heavyweight RPC calls or IO operations to the background. While I feel ordering is hard to guarantee for async tasks, it would be better to make it configurable for end users.
>>
>> An example use case I could think of: for every 500 records processed, we need an RPC to external storage that takes non-trivial time, and until it finishes, the 499 records before it shouldn't be visible to the end user. In that case, we need fine-grained control over visibility to the downstream consumer, so that the async task plants a barrier while still letting the 499 records be processed without blocking and sent downstream. Eventually, when the heavy RPC is done, we commit this record to remove the barrier and make all 500 records available downstream. So we still need to guarantee ordering within the 500 records, while at the same time the consumer semantics do not change.
>>
>> Am I making the point clear here? I just want to have more discussion on the ordering guarantee, since I feel it wouldn't be a good idea to break the consumer ordering guarantee by default.
>>
>> Best,
>> Boyang
>>
>> ________________________________
>> From: Richard Yu <yohan.richard...@gmail.com>
>> Sent: Saturday, December 22, 2018 9:08 AM
>> To: dev@kafka.apache.org
>> Subject: Re: KIP-408: Add Asynchronous Processing to Kafka Streams
>>
>> Hi Boyang,
>>
>> Thanks for pointing out the possibility of skipping bad records (it never crossed my mind). I suppose we could make skipping a bad record an option for the user, but deciding that was never the intention of this KIP. I could log a JIRA for it, though I think it is out of the KIP's scope.
>>
>> As for the ordering guarantees: if you are using the standard Kafka design of one thread per task, then everything will pretty much remain the same. However, if we are talking about using multiple threads per task (which is something this KIP proposes), then we should probably expect behavior somewhat similar to Samza's Async Task, as stated in the JIRA for this KIP (second-to-last comment). Ordering would no longer be possible (so yeah, basically no guarantee at all).
>>
>> And how the user handles out-of-order messages is not something I'm well versed in. I guess they can try to put the messages back in order some time later on, but I honestly don't know what they will do. It would be good if you could give me some insight into this.
>>
>> Cheers,
>> Richard
>>
>> On Fri, Dec 21, 2018 at 4:24 PM Boyang Chen <bche...@outlook.com> wrote:
>>
>> > Thanks Richard for proposing this feature! We have also encountered a similar feature request, where we want to define a generic async processing API<https://issues.apache.org/jira/browse/KAFKA-7566>.
>> >
>> > However, I guess the motivation here is that we should skip big records during normal processing, or let a separate task handle the records that take P99 processing time. My feeling is that if some edge case happens, could we just skip the bad record and continue processing the next record?
>> >
>> > Also, I want to understand what kind of ordering guarantee we are going to provide with this new API, or whether there is no ordering guarantee at all. Could we discuss any potential issues if the consumer needs to process out-of-order messages?
>> >
>> > Best,
>> > Boyang
>> >
>> > ________________________________
>> > From: Richard Yu <yohan.richard...@gmail.com>
>> > Sent: Saturday, December 22, 2018 2:00 AM
>> > To: dev@kafka.apache.org
>> > Subject: KIP-408: Add Asynchronous Processing to Kafka Streams
>> >
>> > Hi all,
>> >
>> > Lately, there has been considerable interest in adding asynchronous processing to Kafka Streams.
>> > Here is the KIP for such an addition:
>> >
>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-408%3A+Add+Asynchronous+Processing+To+Kafka+Streams
>> >
>> > I wish to discuss the best ways to approach this problem.
>> >
>> > Thanks,
>> > Richard Yu