Hi Boyang,

Interesting article. Although something crossed my mind. When skipping bad
records, we couldn't go back to them to process again to guarantee ordering
i.e (both exactly-once and at-least-once would not be supported, only
at-most-once). Also, in Kafka, when it comes to individually acking every
single record, the resulting latency is horrible (from what I heard). We
actually discussed something like this in
https://issues.apache.org/jira/browse/KAFKA-7432. It might give you some
insight since it is a related issue.

I hope this helps,
Richard




On Thu, Jan 3, 2019 at 7:29 PM Boyang Chen <bche...@outlook.com> wrote:

> Hey Richard,
>
> thanks for the explanation. Recently I read an interesting blog post<
> https://streaml.io/blog/pulsar-streaming-queuing> from Apache Pulsar
> (written long time ago), where they define the concept of individual ack
> which means we could skip records and leave certain records remain on the
> queue for late processing. This should be something similar to KIP-408
> which also shares some motivations for us to invest.
>
> Boyang
>
> ________________________________
> From: Richard Yu <yohan.richard...@gmail.com>
> Sent: Friday, January 4, 2019 5:42 AM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-408: Add Asynchronous Processing to Kafka
> Streams
>
> Hi all,
>
> Just bumping this KIP. Would be great if we got some discussion.
>
>
> On Sun, Dec 30, 2018 at 5:13 PM Richard Yu <yohan.richard...@gmail.com>
> wrote:
>
> > Hi all,
> >
> > I made some recent changes to the KIP. It should be more relevant with
> the
> > issue now (involves Processor API in detail).
> > It would be great if you could comment.
> >
> > Thanks,
> > Richard
> >
> > On Wed, Dec 26, 2018 at 10:01 PM Richard Yu <yohan.richard...@gmail.com>
> > wrote:
> >
> >> Hi all,
> >>
> >> Just changing the title of the KIP. Discovered it wasn't right.
> >> Thats about it. :)
> >>
> >> On Mon, Dec 24, 2018 at 7:57 PM Richard Yu <yohan.richard...@gmail.com>
> >> wrote:
> >>
> >>> Sorry, just making a correction.
> >>>
> >>> Even if we are processing records out of order, we will still have to
> >>> checkpoint offset ranges.
> >>> So it doesn't really change anything even if we are doing in-order
> >>> processing.
> >>>
> >>> Thinking this over, I'm leaning slightly towards maintaining the
> >>> ordering guarantee.
> >>> Although when implementing this change, there might be some kinks that
> >>> we have not thought about which could throw a monkey wrench into the
> works.
> >>>
> >>> But definitely worth trying out,
> >>> Richard
> >>>
> >>> On Mon, Dec 24, 2018 at 6:51 PM Richard Yu <yohan.richard...@gmail.com
> >
> >>> wrote:
> >>>
> >>>> Hi Boyang,
> >>>>
> >>>> I could see where you are going with this. Well, I suppose I should
> >>>> have added this to alternatives, but I might as well mention it now.
> >>>>
> >>>> It had crossed my mind that we consider returning in-order even if
> >>>> there are multiple threads processing on the same thread. But for
> this to
> >>>> happen, we must block for the offsets in-between which have not been
> >>>> processed yet. For example, offsets 1-50 are being processed by
> thread1,
> >>>> while the offsets 51 - 100 are being processed by thread2. We will
> have to
> >>>> wait for thread1 to finish processing its offsets first before we
> return
> >>>> the records processed by thread2. So in other words, once thread1 is
> done,
> >>>> thread2's work up to that point will be returned in one go, but not
> before
> >>>> that.
> >>>>
> >>>> I suppose this could work, but the client will have to wait some time
> >>>> before the advantages of multithreaded processing can be seen (i.e.
> the
> >>>> first thread has to finish processing its segment of the records first
> >>>> before any others are returned to guarantee ordering). Another point I
> >>>> would like to make is that the threads are *asynchronous. *So for us
> >>>> to know when a thread is done processing a certain segment, we will
> >>>> probably have a similar policy to how getMetadataAsync() works (i.e.
> have a
> >>>> parent thread be notified of when the children threads are done).
> >>>> [image: image.png]
> >>>> Just pulling this from the KIP. But instead, we would apply this to
> >>>> metadata segments instead of just a callback.
> >>>> I don't know whether or not the tradeoffs are acceptable to the
> client.
> >>>> Ordering could be guaranteed, but it would be hard to do. For
> example, if
> >>>> there was a crash, we might lose track of which offsets numbers and
> ranges
> >>>> we are processing for each child thread, so somehow we need to find a
> way
> >>>> to checkpoint those as well (like committing them to a Kafka topic).
> >>>>
> >>>> Let me know your thoughts on this approach. It would work, but the
> >>>> implementation details could be a mess.
> >>>>
> >>>> Cheers,
> >>>> Richard
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On Mon, Dec 24, 2018 at 4:59 PM Boyang Chen <bche...@outlook.com>
> >>>> wrote:
> >>>>
> >>>>> Hey Richard,
> >>>>>
> >>>>> thanks for the explanation! After some thinking, I do understand more
> >>>>> about this KIP. The motivation was to increase the throughput and
> put heavy
> >>>>> lifting RPC calls or IO operations to the background. While I feel
> the
> >>>>> ordering is hard to guarantee for async task, it is better to be
> >>>>> configurable for the end users.
> >>>>>
> >>>>> An example use case I could think of is: for every 500 records
> >>>>> processed, we need an RPC to external storage that takes non-trivial
> time,
> >>>>> and before its finishing all 499 records before it shouldn't be
> visible to
> >>>>> the end user. In such case, we need to have fine-grained control on
> the
> >>>>> visibility of downstream consumer so that our async task is planting
> a
> >>>>> barrier while still make 499 records non-blocking process and send to
> >>>>> downstream. So eventually when the heavy RPC is done, we commit this
> record
> >>>>> to remove the barrier and make all 500 records available for
> downstream. So
> >>>>> here we still need to guarantee the ordering within 500 records,
> while in
> >>>>> the same time consumer semantic has nothing to change.
> >>>>>
> >>>>> Am I making the point clear here? Just want have more discussion on
> >>>>> the ordering guarantee since I feel it wouldn't be a good idea to
> break
> >>>>> consumer ordering guarantee by default.
> >>>>>
> >>>>> Best,
> >>>>> Boyang
> >>>>>
> >>>>> ________________________________
> >>>>> From: Richard Yu <yohan.richard...@gmail.com>
> >>>>> Sent: Saturday, December 22, 2018 9:08 AM
> >>>>> To: dev@kafka.apache.org
> >>>>> Subject: Re: KIP-408: Add Asynchronous Processing to Kafka Streams
> >>>>>
> >>>>> Hi Boyang,
> >>>>>
> >>>>> Thanks for pointing out the possibility of skipping bad records
> (never
> >>>>> crossed my mind). I suppose we could make it an option for the user
> if
> >>>>> they
> >>>>> could skip a bad record. It was never the intention of this KIP
> though
> >>>>> on
> >>>>> whether or not to do that. I could log a JIRA on such an issue, but I
> >>>>> think
> >>>>> this is out of the KIP's scope.
> >>>>>
> >>>>> As for the ordering guarantees, if you are using the standard Kafka
> >>>>> design
> >>>>> of one thread per task. Then everything will pretty much remain the
> >>>>> same.
> >>>>> However, if we are talking about using multiple threads per task
> >>>>> (which is
> >>>>> something that this KIP proposes), then we should probably expect the
> >>>>> behavior to be somewhat similar to Samza's Async Task as stated in
> the
> >>>>> JIRA
> >>>>> for this KIP (second-last comment).
> >>>>> Ordering would no longer be possible (so yeah, basically no guarantee
> >>>>> at
> >>>>> all).
> >>>>>
> >>>>> And how the user handles out-of-order messages is not something I'm
> >>>>> well
> >>>>> versed in. I guess they can try to put the messages back in order
> some
> >>>>> time
> >>>>> later on. But I honestly don't know what they will do.
> >>>>> It would be good if you could give me some insight into this.
> >>>>>
> >>>>> Cheers,
> >>>>> Richard
> >>>>>
> >>>>>
> >>>>> On Fri, Dec 21, 2018 at 4:24 PM Boyang Chen <bche...@outlook.com>
> >>>>> wrote:
> >>>>>
> >>>>> > Thanks Richard for proposing this feature! We also have encountered
> >>>>> some
> >>>>> > similar feature request that we want to define a generic async
> >>>>> processing
> >>>>> > API<https://issues.apache.org/jira/browse/KAFKA-7566>.
> >>>>> >
> >>>>> > However I guess the motivation here is that we should skip big
> >>>>> records
> >>>>> > during normal processing, or let a separate task handle those
> >>>>> records who
> >>>>> > takes P99 processing time. Since my feeling is that if some edge
> >>>>> cases
> >>>>> > happen, could we just skip the bad record and continue processing
> >>>>> next
> >>>>> > record?
> >>>>> >
> >>>>> > Also I want to understand what kind of ordering guarantee we are
> >>>>> gonna
> >>>>> > provide with this new API, or there is no ordering guarantee at
> >>>>> all?  Could
> >>>>> > we discuss any potential issues if consumer needs to process
> >>>>> out-of-order
> >>>>> > messages?
> >>>>> >
> >>>>> > Best,
> >>>>> > Boyang
> >>>>> > ________________________________
> >>>>> > From: Richard Yu <yohan.richard...@gmail.com>
> >>>>> > Sent: Saturday, December 22, 2018 2:00 AM
> >>>>> > To: dev@kafka.apache.org
> >>>>> > Subject: KIP-408: Add Asynchronous Processing to Kafka Streams
> >>>>> >
> >>>>> > Hi all,
> >>>>> >
> >>>>> > Lately, there has been considerable interest in adding asynchronous
> >>>>> > processing to Kafka Streams.
> >>>>> > Here is the KIP for such an addition:
> >>>>> >
> >>>>> >
> >>>>> >
> >>>>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-408%3A+Add+Asynchronous+Processing+To+Kafka+Streams
> >>>>> >
> >>>>> > I wish to discuss the best ways to approach this problem.
> >>>>> >
> >>>>> > Thanks,
> >>>>> > Richard Yu
> >>>>> >
> >>>>>
> >>>>
>

Reply via email to