Hi all,

Just changing the title of the KIP. Discovered it wasn't right. That's about it. :)
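For readers skimming the thread below: Richard's "wait for thread1 before releasing thread2's results" scheme and Boyang's per-500-record barrier both boil down to the same mechanic — results may complete out of order, but nothing is released downstream until a contiguous prefix of offsets is done. A minimal sketch of that bookkeeping, assuming workers process disjoint, contiguous offset ranges (this is illustrative only; `InOrderReleaser` and its methods are hypothetical names, not part of any Kafka API):

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: worker threads complete offset ranges out of order;
// records become visible downstream only once every earlier offset is done.
public class InOrderReleaser {
    private long nextToRelease;                               // first offset not yet released
    private final TreeMap<Long, Long> done = new TreeMap<>(); // start -> endExclusive of completed ranges

    public InOrderReleaser(long firstOffset) {
        this.nextToRelease = firstOffset;
    }

    /**
     * Called by a worker when it finishes [start, endExclusive).
     * Returns the new watermark: every offset below it may now go downstream.
     */
    public synchronized long complete(long start, long endExclusive) {
        done.put(start, endExclusive);
        // Advance the watermark across any contiguous completed prefix.
        Map.Entry<Long, Long> first;
        while ((first = done.firstEntry()) != null && first.getKey() == nextToRelease) {
            nextToRelease = first.getValue();
            done.pollFirstEntry();
        }
        return nextToRelease;
    }

    public synchronized long watermark() { return nextToRelease; }

    public static void main(String[] args) {
        InOrderReleaser r = new InOrderReleaser(1);
        // thread2 finishes offsets 51-100 first: nothing can be released yet.
        System.out.println(r.complete(51, 101)); // watermark still 1
        // thread1 finishes offsets 1-50: both segments release "in one go".
        System.out.println(r.complete(1, 51));   // watermark jumps to 101
    }
}
```

This matches the thread's observation that thread2's work is held back and then returned in one go once thread1 catches up; a barrier is just a range whose completion is deferred until its RPC returns.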
On Mon, Dec 24, 2018 at 7:57 PM Richard Yu <yohan.richard...@gmail.com> wrote:

> Sorry, just making a correction.
>
> Even if we are processing records out of order, we will still have to
> checkpoint offset ranges. So it doesn't really change anything even if we
> are doing in-order processing.
>
> Thinking this over, I'm leaning slightly towards maintaining the ordering
> guarantee. Although when implementing this change, there might be some
> kinks that we have not thought about which could throw a monkey wrench
> into the works.
>
> But definitely worth trying out,
> Richard
>
> On Mon, Dec 24, 2018 at 6:51 PM Richard Yu <yohan.richard...@gmail.com> wrote:
>
>> Hi Boyang,
>>
>> I could see where you are going with this. I suppose I should have added
>> this to the alternatives section, but I might as well mention it now.
>>
>> It had crossed my mind that we consider returning records in order even
>> if there are multiple threads processing the same task. But for this to
>> happen, we must block for the offsets in between which have not been
>> processed yet. For example, say offsets 1 - 50 are being processed by
>> thread1, while offsets 51 - 100 are being processed by thread2. We will
>> have to wait for thread1 to finish processing its offsets before we
>> return the records processed by thread2. In other words, once thread1 is
>> done, thread2's work up to that point will be returned in one go, but
>> not before that.
>>
>> I suppose this could work, but the client will have to wait some time
>> before the advantages of multithreaded processing can be seen (i.e. the
>> first thread has to finish processing its segment of the records before
>> any others are returned, to guarantee ordering). Another point I would
>> like to make is that the threads are *asynchronous*. So for us to know
>> when a thread is done processing a certain segment, we will probably
>> need a policy similar to how getMetadataAsync() works (i.e. have a
>> parent thread be notified when the child threads are done).
>>
>> [image: image.png]
>>
>> Just pulling this from the KIP. But instead, we would apply this to
>> metadata segments instead of just a callback.
>>
>> I don't know whether or not the tradeoffs are acceptable to the client.
>> Ordering could be guaranteed, but it would be hard to do. For example,
>> if there was a crash, we might lose track of which offset numbers and
>> ranges we are processing for each child thread, so we would somehow need
>> a way to checkpoint those as well (like committing them to a Kafka
>> topic).
>>
>> Let me know your thoughts on this approach. It would work, but the
>> implementation details could be a mess.
>>
>> Cheers,
>> Richard
>>
>> On Mon, Dec 24, 2018 at 4:59 PM Boyang Chen <bche...@outlook.com> wrote:
>>
>>> Hey Richard,
>>>
>>> Thanks for the explanation! After some thinking, I understand this KIP
>>> better. The motivation is to increase throughput and move heavy RPC
>>> calls or IO operations to the background. While I feel ordering is hard
>>> to guarantee for an async task, it would be better to make it
>>> configurable for end users.
>>>
>>> An example use case I could think of: for every 500 records processed,
>>> we need an RPC to external storage that takes non-trivial time, and
>>> until it finishes, the 499 records before it shouldn't be visible to
>>> the end user. In such a case, we need fine-grained control over
>>> downstream visibility, so that our async task plants a barrier while
>>> still letting the 499 earlier records be processed and sent downstream
>>> without blocking. When the heavy RPC is done, we commit this record to
>>> remove the barrier and make all 500 records available downstream. So we
>>> still need to guarantee ordering within the 500 records, while at the
>>> same time the consumer semantics don't change.
>>>
>>> Am I making the point clear here? I just want to have more discussion
>>> on the ordering guarantee, since I feel it wouldn't be a good idea to
>>> break the consumer ordering guarantee by default.
>>>
>>> Best,
>>> Boyang
>>>
>>> ________________________________
>>> From: Richard Yu <yohan.richard...@gmail.com>
>>> Sent: Saturday, December 22, 2018 9:08 AM
>>> To: dev@kafka.apache.org
>>> Subject: Re: KIP-408: Add Asynchronous Processing to Kafka Streams
>>>
>>> Hi Boyang,
>>>
>>> Thanks for pointing out the possibility of skipping bad records (it
>>> never crossed my mind). I suppose we could make it an option for the
>>> user to skip a bad record. That was never the intention of this KIP,
>>> though. I could log a JIRA on such an issue, but I think it is out of
>>> the KIP's scope.
>>>
>>> As for the ordering guarantees: if you are using the standard Kafka
>>> design of one thread per task, then everything will pretty much remain
>>> the same. However, if we are talking about using multiple threads per
>>> task (which is something this KIP proposes), then we should probably
>>> expect behavior somewhat similar to Samza's async task, as stated in
>>> the JIRA for this KIP (second-to-last comment). Ordering would no
>>> longer be possible (so yeah, basically no guarantee at all).
>>>
>>> How the user handles out-of-order messages is not something I'm well
>>> versed in. I guess they could try to put the messages back in order
>>> some time later on, but I honestly don't know what they will do. It
>>> would be good if you could give me some insight into this.
>>>
>>> Cheers,
>>> Richard
>>>
>>> On Fri, Dec 21, 2018 at 4:24 PM Boyang Chen <bche...@outlook.com> wrote:
>>>
>>> > Thanks Richard for proposing this feature! We have also encountered
>>> > some similar feature requests, where we want to define a generic
>>> > async processing
>>> > API<https://issues.apache.org/jira/browse/KAFKA-7566>.
>>> >
>>> > However, I guess the motivation here is that we should skip big
>>> > records during normal processing, or let a separate task handle those
>>> > records that take P99 processing time. My feeling is that if some
>>> > edge case happens, could we just skip the bad record and continue
>>> > processing the next record?
>>> >
>>> > Also, I want to understand what kind of ordering guarantee we are
>>> > going to provide with this new API, or whether there is no ordering
>>> > guarantee at all. Could we discuss any potential issues if the
>>> > consumer needs to process out-of-order messages?
>>> >
>>> > Best,
>>> > Boyang
>>> > ________________________________
>>> > From: Richard Yu <yohan.richard...@gmail.com>
>>> > Sent: Saturday, December 22, 2018 2:00 AM
>>> > To: dev@kafka.apache.org
>>> > Subject: KIP-408: Add Asynchronous Processing to Kafka Streams
>>> >
>>> > Hi all,
>>> >
>>> > Lately, there has been considerable interest in adding asynchronous
>>> > processing to Kafka Streams. Here is the KIP for such an addition:
>>> >
>>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-408%3A+Add+Asynchronous+Processing+To+Kafka+Streams
>>> >
>>> > I wish to discuss the best ways to approach this problem.
>>> >
>>> > Thanks,
>>> > Richard Yu
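On Richard's point above about a crash losing track of which offset ranges each child thread was processing: one way to picture the fix is to serialize the in-flight ranges into a checkpoint value that could be committed somewhere durable (e.g. a Kafka topic, as he suggests) and restored on restart. A minimal sketch, purely illustrative — `RangeCheckpoint` and its string encoding are hypothetical, not an actual Kafka Streams mechanism:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical sketch: encode/decode per-thread in-flight offset ranges so
// they can be committed durably and restored after a crash.
public class RangeCheckpoint {
    /** Encode ranges as "start-end,start-end,..." — a stand-in for a real serialized checkpoint. */
    public static String encode(List<long[]> ranges) {
        StringJoiner sj = new StringJoiner(",");
        for (long[] r : ranges) sj.add(r[0] + "-" + r[1]);
        return sj.toString();
    }

    /** Decode a checkpoint back into [start, end] pairs; these tell us which offsets to reprocess. */
    public static List<long[]> decode(String s) {
        List<long[]> out = new ArrayList<>();
        if (s.isEmpty()) return out;
        for (String part : s.split(",")) {
            String[] p = part.split("-");
            out.add(new long[] { Long.parseLong(p[0]), Long.parseLong(p[1]) });
        }
        return out;
    }

    public static void main(String[] args) {
        List<long[]> inFlight = new ArrayList<>();
        inFlight.add(new long[] { 1, 50 });   // being processed by thread1
        inFlight.add(new long[] { 51, 100 }); // being processed by thread2
        String checkpoint = encode(inFlight);
        System.out.println(checkpoint);              // 1-50,51-100
        System.out.println(decode(checkpoint).size()); // 2 ranges to reprocess on restart
    }
}
```

The real design question the thread raises remains open: how often to commit this checkpoint, and whether the extra commits cost more than the ordering guarantee is worth.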