KIP-406: GlobalStreamThread should honor custom reset policy
Hi everybody, There is a new KIP regarding the resilience of GlobalStreamThread, which can be seen below: https://cwiki.apache.org/confluence/display/KAFKA/KIP-406%3A+GlobalStreamThread+should+honor+custom+reset+policy We are considering the addition of a new reset policy. It would be great if you could pitch in! Thanks, Richard Yu
Re: KIP-406: GlobalStreamThread should honor custom reset policy
Hi Matthias, It would be great if we got your input on this. On Sun, Dec 16, 2018 at 3:06 PM Richard Yu wrote: > Hi everybody, > > There is a new KIP regarding the resilience of GlobalStreamThread which > could be seen below: > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-406%3A+GlobalStreamThread+should+honor+custom+reset+policy > > We are considering the new addition of some new reset policy. It would be > great if you could pitch in! > > Thanks, > Richard Yu >
KIP-408: Add Asynchronous Processing to Kafka Streams
Hi all, Lately, there has been considerable interest in adding asynchronous processing to Kafka Streams. Here is the KIP for such an addition: https://cwiki.apache.org/confluence/display/KAFKA/KIP-408%3A+Add+Asynchronous+Processing+To+Kafka+Streams I wish to discuss the best ways to approach this problem. Thanks, Richard Yu
Re: KIP-408: Add Asynchronous Processing to Kafka Streams
Hi Boyang, Thanks for pointing out the possibility of skipping bad records (never crossed my mind). I suppose we could make it an option for the user to skip a bad record, but deciding whether or not to do that was never the intention of this KIP. I could log a JIRA on such an issue, but I think this is out of the KIP's scope. As for the ordering guarantees: if you are using the standard Kafka design of one thread per task, then everything will pretty much remain the same. However, if we are talking about using multiple threads per task (which is something that this KIP proposes), then we should probably expect the behavior to be somewhat similar to Samza's Async Task, as stated in the JIRA for this KIP (second-last comment). Ordering would no longer be possible (so yeah, basically no guarantee at all). How the user handles out-of-order messages is not something I'm well versed in. I guess they can try to put the messages back in order some time later on, but I honestly don't know what they will do. It would be good if you could give me some insight into this. Cheers, Richard On Fri, Dec 21, 2018 at 4:24 PM Boyang Chen wrote: > Thanks Richard for proposing this feature! We also have encountered some > similar feature request that we want to define a generic async processing > API<https://issues.apache.org/jira/browse/KAFKA-7566>. > > However I guess the motivation here is that we should skip big records > during normal processing, or let a separate task handle those records who > takes P99 processing time. Since my feeling is that if some edge cases > happen, could we just skip the bad record and continue processing next > record? > > Also I want to understand what kind of ordering guarantee we are gonna > provide with this new API, or there is no ordering guarantee at all? Could > we discuss any potential issues if consumer needs to process out-of-order > messages? > > Best, > Boyang > > From: Richard Yu > Sent: Saturday, December 22, 2018 2:00 AM > To: dev@kafka.apache.org > Subject: KIP-408: Add Asynchronous Processing to Kafka Streams > > Hi all, > > Lately, there has been considerable interest in adding asynchronous > processing to Kafka Streams. > Here is the KIP for such an addition: > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-408%3A+Add+Asynchronous+Processing+To+Kafka+Streams > > I wish to discuss the best ways to approach this problem. > > Thanks, > Richard Yu >
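To make the one-thread-per-task vs. multiple-threads-per-task distinction above concrete, here is a minimal plain-JDK sketch (purely illustrative, not the KIP's API) of handing records to a thread pool; the completion callback fires whenever each record happens to finish, which is why offset ordering is lost:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.TimeUnit;

    public class AsyncTaskSketch {

        public static void main(String[] args) throws InterruptedException {
            // A pool standing in for the "multiple threads per task" idea.
            ExecutorService pool = Executors.newFixedThreadPool(4);

            for (long offset = 0; offset < 10; offset++) {
                final long current = offset;
                // Each record is handed off; the callback fires whenever that
                // particular record finishes, so completion order (and any
                // downstream ordering guarantee) is lost.
                CompletableFuture
                        .supplyAsync(() -> process(current), pool)
                        .thenAccept(done -> System.out.println("completed offset " + done));
            }

            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }

        private static long process(long offset) {
            try {
                // Simulate work of varying length, e.g. an external RPC.
                Thread.sleep(ThreadLocalRandom.current().nextInt(100));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return offset;
        }
    }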
Re: KIP-408: Add Asynchronous Processing to Kafka Streams
Hi Boyang, I could see where you are going with this. Well, I suppose I should have added this to alternatives, but I might as well mention it now. It had crossed my mind that we consider returning in-order even if there are multiple threads processing on the same thread. But for this to happen, we must block for the offsets in-between which have not been processed yet. For example, offsets 1-50 are being processed by thread1, while the offsets 51 - 100 are being processed by thread2. We will have to wait for thread1 to finish processing its offsets first before we return the records processed by thread2. So in other words, once thread1 is done, thread2's work up to that point will be returned in one go, but not before that. I suppose this could work, but the client will have to wait some time before the advantages of multithreaded processing can be seen (i.e. the first thread has to finish processing its segment of the records first before any others are returned to guarantee ordering). Another point I would like to make is that the threads are *asynchronous. *So for us to know when a thread is done processing a certain segment, we will probably have a similar policy to how getMetadataAsync() works (i.e. have a parent thread be notified of when the children threads are done). [image: image.png] Just pulling this from the KIP. But instead, we would apply this to metadata segments instead of just a callback. I don't know whether or not the tradeoffs are acceptable to the client. Ordering could be guaranteed, but it would be hard to do. For example, if there was a crash, we might lose track of which offsets numbers and ranges we are processing for each child thread, so somehow we need to find a way to checkpoint those as well (like committing them to a Kafka topic). Let me know your thoughts on this approach. It would work, but the implementation details could be a mess. Cheers, Richard On Mon, Dec 24, 2018 at 4:59 PM Boyang Chen wrote: > Hey Richard, > > thanks for the explanation! After some thinking, I do understand more > about this KIP. The motivation was to increase the throughput and put heavy > lifting RPC calls or IO operations to the background. While I feel the > ordering is hard to guarantee for async task, it is better to be > configurable for the end users. > > An example use case I could think of is: for every 500 records processed, > we need an RPC to external storage that takes non-trivial time, and before > its finishing all 499 records before it shouldn't be visible to the end > user. In such case, we need to have fine-grained control on the visibility > of downstream consumer so that our async task is planting a barrier while > still make 499 records non-blocking process and send to downstream. So > eventually when the heavy RPC is done, we commit this record to remove the > barrier and make all 500 records available for downstream. So here we still > need to guarantee the ordering within 500 records, while in the same time > consumer semantic has nothing to change. > > Am I making the point clear here? Just want have more discussion on the > ordering guarantee since I feel it wouldn't be a good idea to break > consumer ordering guarantee by default. > > Best, > Boyang > > > From: Richard Yu > Sent: Saturday, December 22, 2018 9:08 AM > To: dev@kafka.apache.org > Subject: Re: KIP-408: Add Asynchronous Processing to Kafka Streams > > Hi Boyang, > > Thanks for pointing out the possibility of skipping bad records (never > crossed my mind). 
I suppose we could make it an option for the user if they > could skip a bad record. It was never the intention of this KIP though on > whether or not to do that. I could log a JIRA on such an issue, but I think > this is out of the KIP's scope. > > As for the ordering guarantees, if you are using the standard Kafka design > of one thread per task. Then everything will pretty much remain the same. > However, if we are talking about using multiple threads per task (which is > something that this KIP proposes), then we should probably expect the > behavior to be somewhat similar to Samza's Async Task as stated in the JIRA > for this KIP (second-last comment). > Ordering would no longer be possible (so yeah, basically no guarantee at > all). > > And how the user handles out-of-order messages is not something I'm well > versed in. I guess they can try to put the messages back in order some time > later on. But I honestly don't know what they will do. > It would be good if you could give me some insight into this. > > Cheers, > Richard > > > On Fri, Dec 21, 2018 at 4:24 PM Boyang Chen wrote: > > > Thanks Richard for proposing this feature! We also have encountered some > > similar feat
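A rough sketch of the "block for the offsets in-between" idea described above (plain Java; the class name and structure are invented for illustration): completed results are buffered per offset and only released once every earlier offset has also completed.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    /**
     * Buffers results that were produced out of order and releases them only
     * once every earlier offset has completed, i.e. in strict offset order.
     */
    public class OrderedReleaseGate<T> {

        private final TreeMap<Long, T> completed = new TreeMap<>();
        private long nextToRelease;

        public OrderedReleaseGate(long firstOffset) {
            this.nextToRelease = firstOffset;
        }

        /** Called by any worker thread when it finishes one offset. */
        public synchronized List<T> complete(long offset, T result) {
            completed.put(offset, result);
            List<T> releasable = new ArrayList<>();
            // Release the contiguous prefix only; a gap (an offset still being
            // processed by a slower thread) blocks everything behind it.
            while (!completed.isEmpty() && completed.firstKey() == nextToRelease) {
                Map.Entry<Long, T> head = completed.pollFirstEntry();
                releasable.add(head.getValue());
                nextToRelease++;
            }
            return releasable;
        }
    }

In the thread1/thread2 example, nothing from thread2 is released until thread1 finishes offset 50; at that point thread2's buffered results come out in one go, which is exactly the latency trade-off described above.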
Re: KIP-408: Add Asynchronous Processing to Kafka Streams
Sorry, just making a correction. Even if we are processing records out of order, we will still have to checkpoint offset ranges. So it doesn't really change anything even if we are doing in-order processing. Thinking this over, I'm leaning slightly towards maintaining the ordering guarantee. Although when implementing this change, there might be some kinks that we have not thought about which could throw a monkey wrench into the works. But definitely worth trying out, Richard On Mon, Dec 24, 2018 at 6:51 PM Richard Yu wrote: > Hi Boyang, > > I could see where you are going with this. Well, I suppose I should have > added this to alternatives, but I might as well mention it now. > > It had crossed my mind that we consider returning in-order even if there > are multiple threads processing on the same thread. But for this to happen, > we must block for the offsets in-between which have not been processed yet. > For example, offsets 1-50 are being processed by thread1, while the offsets > 51 - 100 are being processed by thread2. We will have to wait for thread1 > to finish processing its offsets first before we return the records > processed by thread2. So in other words, once thread1 is done, thread2's > work up to that point will be returned in one go, but not before that. > > I suppose this could work, but the client will have to wait some time > before the advantages of multithreaded processing can be seen (i.e. the > first thread has to finish processing its segment of the records first > before any others are returned to guarantee ordering). Another point I > would like to make is that the threads are *asynchronous. *So for us to > know when a thread is done processing a certain segment, we will probably > have a similar policy to how getMetadataAsync() works (i.e. have a parent > thread be notified of when the children threads are done). > [image: image.png] > Just pulling this from the KIP. But instead, we would apply this to > metadata segments instead of just a callback. > I don't know whether or not the tradeoffs are acceptable to the client. > Ordering could be guaranteed, but it would be hard to do. For example, if > there was a crash, we might lose track of which offsets numbers and ranges > we are processing for each child thread, so somehow we need to find a way > to checkpoint those as well (like committing them to a Kafka topic). > > Let me know your thoughts on this approach. It would work, but the > implementation details could be a mess. > > Cheers, > Richard > > > > > > On Mon, Dec 24, 2018 at 4:59 PM Boyang Chen wrote: > >> Hey Richard, >> >> thanks for the explanation! After some thinking, I do understand more >> about this KIP. The motivation was to increase the throughput and put heavy >> lifting RPC calls or IO operations to the background. While I feel the >> ordering is hard to guarantee for async task, it is better to be >> configurable for the end users. >> >> An example use case I could think of is: for every 500 records processed, >> we need an RPC to external storage that takes non-trivial time, and before >> its finishing all 499 records before it shouldn't be visible to the end >> user. In such case, we need to have fine-grained control on the visibility >> of downstream consumer so that our async task is planting a barrier while >> still make 499 records non-blocking process and send to downstream. So >> eventually when the heavy RPC is done, we commit this record to remove the >> barrier and make all 500 records available for downstream. 
So here we still >> need to guarantee the ordering within 500 records, while in the same time >> consumer semantic has nothing to change. >> >> Am I making the point clear here? Just want have more discussion on the >> ordering guarantee since I feel it wouldn't be a good idea to break >> consumer ordering guarantee by default. >> >> Best, >> Boyang >> >> >> From: Richard Yu >> Sent: Saturday, December 22, 2018 9:08 AM >> To: dev@kafka.apache.org >> Subject: Re: KIP-408: Add Asynchronous Processing to Kafka Streams >> >> Hi Boyang, >> >> Thanks for pointing out the possibility of skipping bad records (never >> crossed my mind). I suppose we could make it an option for the user if >> they >> could skip a bad record. It was never the intention of this KIP though on >> whether or not to do that. I could log a JIRA on such an issue, but I >> think >> this is out of the KIP's scope. >> >> As for the ordering guarantees, if you are using the standard Kafka design >> of one thread pe
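The messages above mention committing the in-flight offset ranges somewhere (for example to a Kafka topic) so they survive a crash. As one alternative, hedged sketch, the ranges could also ride along in the existing commit-metadata field of the consumer's offset commit; the helper name and the range encoding below are made up for illustration:

    import java.util.Collections;
    import java.util.List;
    import java.util.stream.Collectors;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class RangeCheckpointSketch {

        /**
         * Commits the last contiguous offset as usual, and stashes any
         * out-of-order ranges that are already done in the commit metadata,
         * so a restarted instance could skip them instead of reprocessing.
         */
        static void checkpoint(KafkaConsumer<byte[], byte[]> consumer,
                               TopicPartition partition,
                               long lastContiguousOffset,
                               List<long[]> completedRangesAhead) {
            String ranges = completedRangesAhead.stream()
                    .map(r -> r[0] + "-" + r[1])
                    .collect(Collectors.joining(","));

            OffsetAndMetadata commit =
                    new OffsetAndMetadata(lastContiguousOffset + 1, ranges);
            consumer.commitSync(Collections.singletonMap(partition, commit));
        }
    }

Note that the metadata string is bounded by the broker's offset.metadata.max.bytes setting, so a dedicated topic (as suggested in the thread) would scale better when many ranges are in flight.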
Re: [DISCUSS] KIP-408: Add Asynchronous Processing to Kafka Streams
Hi all, Just changing the title of the KIP. Discovered it wasn't right. Thats about it. :) On Mon, Dec 24, 2018 at 7:57 PM Richard Yu wrote: > Sorry, just making a correction. > > Even if we are processing records out of order, we will still have to > checkpoint offset ranges. > So it doesn't really change anything even if we are doing in-order > processing. > > Thinking this over, I'm leaning slightly towards maintaining the ordering > guarantee. > Although when implementing this change, there might be some kinks that we > have not thought about which could throw a monkey wrench into the works. > > But definitely worth trying out, > Richard > > On Mon, Dec 24, 2018 at 6:51 PM Richard Yu > wrote: > >> Hi Boyang, >> >> I could see where you are going with this. Well, I suppose I should have >> added this to alternatives, but I might as well mention it now. >> >> It had crossed my mind that we consider returning in-order even if there >> are multiple threads processing on the same thread. But for this to happen, >> we must block for the offsets in-between which have not been processed yet. >> For example, offsets 1-50 are being processed by thread1, while the offsets >> 51 - 100 are being processed by thread2. We will have to wait for thread1 >> to finish processing its offsets first before we return the records >> processed by thread2. So in other words, once thread1 is done, thread2's >> work up to that point will be returned in one go, but not before that. >> >> I suppose this could work, but the client will have to wait some time >> before the advantages of multithreaded processing can be seen (i.e. the >> first thread has to finish processing its segment of the records first >> before any others are returned to guarantee ordering). Another point I >> would like to make is that the threads are *asynchronous. *So for us to >> know when a thread is done processing a certain segment, we will probably >> have a similar policy to how getMetadataAsync() works (i.e. have a parent >> thread be notified of when the children threads are done). >> [image: image.png] >> Just pulling this from the KIP. But instead, we would apply this to >> metadata segments instead of just a callback. >> I don't know whether or not the tradeoffs are acceptable to the client. >> Ordering could be guaranteed, but it would be hard to do. For example, if >> there was a crash, we might lose track of which offsets numbers and ranges >> we are processing for each child thread, so somehow we need to find a way >> to checkpoint those as well (like committing them to a Kafka topic). >> >> Let me know your thoughts on this approach. It would work, but the >> implementation details could be a mess. >> >> Cheers, >> Richard >> >> >> >> >> >> On Mon, Dec 24, 2018 at 4:59 PM Boyang Chen wrote: >> >>> Hey Richard, >>> >>> thanks for the explanation! After some thinking, I do understand more >>> about this KIP. The motivation was to increase the throughput and put heavy >>> lifting RPC calls or IO operations to the background. While I feel the >>> ordering is hard to guarantee for async task, it is better to be >>> configurable for the end users. >>> >>> An example use case I could think of is: for every 500 records >>> processed, we need an RPC to external storage that takes non-trivial time, >>> and before its finishing all 499 records before it shouldn't be visible to >>> the end user. 
In such case, we need to have fine-grained control on the >>> visibility of downstream consumer so that our async task is planting a >>> barrier while still make 499 records non-blocking process and send to >>> downstream. So eventually when the heavy RPC is done, we commit this record >>> to remove the barrier and make all 500 records available for downstream. So >>> here we still need to guarantee the ordering within 500 records, while in >>> the same time consumer semantic has nothing to change. >>> >>> Am I making the point clear here? Just want have more discussion on the >>> ordering guarantee since I feel it wouldn't be a good idea to break >>> consumer ordering guarantee by default. >>> >>> Best, >>> Boyang >>> >>> >>> From: Richard Yu >>> Sent: Saturday, December 22, 2018 9:08 AM >>> To: dev@kafka.apache.org >>> Subject: Re: KIP-408: Add Asynchronous Processing to Kafka Streams >>> >>
Re: KIP-408: Add Asynchronous Processing to Kafka Streams
Hi all, just saying. We are migrating to a different discussion thread. (Forgot that the discussion thread's name was incorrect.) Sorry for the confusion. On Mon, Dec 24, 2018 at 7:57 PM Richard Yu wrote: > Sorry, just making a correction. > > Even if we are processing records out of order, we will still have to > checkpoint offset ranges. > So it doesn't really change anything even if we are doing in-order > processing. > > Thinking this over, I'm leaning slightly towards maintaining the ordering > guarantee. > Although when implementing this change, there might be some kinks that we > have not thought about which could throw a monkey wrench into the works. > > But definitely worth trying out, > Richard > > On Mon, Dec 24, 2018 at 6:51 PM Richard Yu > wrote: > >> Hi Boyang, >> >> I could see where you are going with this. Well, I suppose I should have >> added this to alternatives, but I might as well mention it now. >> >> It had crossed my mind that we consider returning in-order even if there >> are multiple threads processing on the same thread. But for this to happen, >> we must block for the offsets in-between which have not been processed yet. >> For example, offsets 1-50 are being processed by thread1, while the offsets >> 51 - 100 are being processed by thread2. We will have to wait for thread1 >> to finish processing its offsets first before we return the records >> processed by thread2. So in other words, once thread1 is done, thread2's >> work up to that point will be returned in one go, but not before that. >> >> I suppose this could work, but the client will have to wait some time >> before the advantages of multithreaded processing can be seen (i.e. the >> first thread has to finish processing its segment of the records first >> before any others are returned to guarantee ordering). Another point I >> would like to make is that the threads are *asynchronous. *So for us to >> know when a thread is done processing a certain segment, we will probably >> have a similar policy to how getMetadataAsync() works (i.e. have a parent >> thread be notified of when the children threads are done). >> [image: image.png] >> Just pulling this from the KIP. But instead, we would apply this to >> metadata segments instead of just a callback. >> I don't know whether or not the tradeoffs are acceptable to the client. >> Ordering could be guaranteed, but it would be hard to do. For example, if >> there was a crash, we might lose track of which offsets numbers and ranges >> we are processing for each child thread, so somehow we need to find a way >> to checkpoint those as well (like committing them to a Kafka topic). >> >> Let me know your thoughts on this approach. It would work, but the >> implementation details could be a mess. >> >> Cheers, >> Richard >> >> >> >> >> >> On Mon, Dec 24, 2018 at 4:59 PM Boyang Chen wrote: >> >>> Hey Richard, >>> >>> thanks for the explanation! After some thinking, I do understand more >>> about this KIP. The motivation was to increase the throughput and put heavy >>> lifting RPC calls or IO operations to the background. While I feel the >>> ordering is hard to guarantee for async task, it is better to be >>> configurable for the end users. >>> >>> An example use case I could think of is: for every 500 records >>> processed, we need an RPC to external storage that takes non-trivial time, >>> and before its finishing all 499 records before it shouldn't be visible to >>> the end user. 
In such case, we need to have fine-grained control on the >>> visibility of downstream consumer so that our async task is planting a >>> barrier while still make 499 records non-blocking process and send to >>> downstream. So eventually when the heavy RPC is done, we commit this record >>> to remove the barrier and make all 500 records available for downstream. So >>> here we still need to guarantee the ordering within 500 records, while in >>> the same time consumer semantic has nothing to change. >>> >>> Am I making the point clear here? Just want have more discussion on the >>> ordering guarantee since I feel it wouldn't be a good idea to break >>> consumer ordering guarantee by default. >>> >>> Best, >>> Boyang >>> >>> >>> From: Richard Yu >>> Sent: Saturday, December 22, 2018 9:08 AM >>> To: dev@kafka.apache.org >>> Subject: Re: KIP-408: Add As
Re: [DISCUSS] KIP-408: Add Asynchronous Processing to Kafka Streams
Hi all, I made some recent changes to the KIP. It should be more relevant with the issue now (involves Processor API in detail). It would be great if you could comment. Thanks, Richard On Wed, Dec 26, 2018 at 10:01 PM Richard Yu wrote: > Hi all, > > Just changing the title of the KIP. Discovered it wasn't right. > Thats about it. :) > > On Mon, Dec 24, 2018 at 7:57 PM Richard Yu > wrote: > >> Sorry, just making a correction. >> >> Even if we are processing records out of order, we will still have to >> checkpoint offset ranges. >> So it doesn't really change anything even if we are doing in-order >> processing. >> >> Thinking this over, I'm leaning slightly towards maintaining the ordering >> guarantee. >> Although when implementing this change, there might be some kinks that we >> have not thought about which could throw a monkey wrench into the works. >> >> But definitely worth trying out, >> Richard >> >> On Mon, Dec 24, 2018 at 6:51 PM Richard Yu >> wrote: >> >>> Hi Boyang, >>> >>> I could see where you are going with this. Well, I suppose I should have >>> added this to alternatives, but I might as well mention it now. >>> >>> It had crossed my mind that we consider returning in-order even if there >>> are multiple threads processing on the same thread. But for this to happen, >>> we must block for the offsets in-between which have not been processed yet. >>> For example, offsets 1-50 are being processed by thread1, while the offsets >>> 51 - 100 are being processed by thread2. We will have to wait for thread1 >>> to finish processing its offsets first before we return the records >>> processed by thread2. So in other words, once thread1 is done, thread2's >>> work up to that point will be returned in one go, but not before that. >>> >>> I suppose this could work, but the client will have to wait some time >>> before the advantages of multithreaded processing can be seen (i.e. the >>> first thread has to finish processing its segment of the records first >>> before any others are returned to guarantee ordering). Another point I >>> would like to make is that the threads are *asynchronous. *So for us to >>> know when a thread is done processing a certain segment, we will probably >>> have a similar policy to how getMetadataAsync() works (i.e. have a parent >>> thread be notified of when the children threads are done). >>> [image: image.png] >>> Just pulling this from the KIP. But instead, we would apply this to >>> metadata segments instead of just a callback. >>> I don't know whether or not the tradeoffs are acceptable to the client. >>> Ordering could be guaranteed, but it would be hard to do. For example, if >>> there was a crash, we might lose track of which offsets numbers and ranges >>> we are processing for each child thread, so somehow we need to find a way >>> to checkpoint those as well (like committing them to a Kafka topic). >>> >>> Let me know your thoughts on this approach. It would work, but the >>> implementation details could be a mess. >>> >>> Cheers, >>> Richard >>> >>> >>> >>> >>> >>> On Mon, Dec 24, 2018 at 4:59 PM Boyang Chen wrote: >>> >>>> Hey Richard, >>>> >>>> thanks for the explanation! After some thinking, I do understand more >>>> about this KIP. The motivation was to increase the throughput and put heavy >>>> lifting RPC calls or IO operations to the background. While I feel the >>>> ordering is hard to guarantee for async task, it is better to be >>>> configurable for the end users. 
>>>> >>>> An example use case I could think of is: for every 500 records >>>> processed, we need an RPC to external storage that takes non-trivial time, >>>> and before its finishing all 499 records before it shouldn't be visible to >>>> the end user. In such case, we need to have fine-grained control on the >>>> visibility of downstream consumer so that our async task is planting a >>>> barrier while still make 499 records non-blocking process and send to >>>> downstream. So eventually when the heavy RPC is done, we commit this record >>>> to remove the barrier and make all 500 records available for downstream. So >>>> here we still need to guarantee the ordering within 500 records, while in >>>> the same time consumer semantic has
Re: [DISCUSS] KIP-262 Metadata should include the number of state stores for task
Hi Matthias, I guess this is no longer necessary. I am open to anything honestly. I suppose we should close it (if its not already). On Fri, Oct 19, 2018 at 11:06 AM Matthias J. Sax wrote: > Any thought on my last email about discarding this KIP? > > > -Matthias > > On 9/14/18 11:44 AM, Matthias J. Sax wrote: > > Hi, > > > > we recently had a discussion on a different ticket to reduce the size of > > the metadata we need to send: > > https://issues.apache.org/jira/browse/KAFKA-7149 > > > > It seems, that we actually don't need to include the number of stores in > > the metadata, but that we can compute the number of stores locally on > > each instance. > > > > With this insight, we should still try to exploit this knowledge during > > task assignment, however, this would be an internal change that does not > > require a KIP. Thus, I think that we can discard this KIP. > > > > Thoughts? > > > > > > -Matthias > > > > On 6/10/18 5:20 PM, Matthias J. Sax wrote: > >> Richard, > >> > >> KIP-268 got merged and thus this KIP is unblocked. > >> > >> I just re-read it and think it needs some updates with regard to the > >> upgrade path (ie, you should mention why upgrading is covered). > >> > >> It would also be useful to discuss how the store information is used > >> during assignment. Atm, the KIP only discussed that the information > >> should be added, but this is only half of the story from my point of > view. > >> > >> > >> -Matthias > >> > >> On 3/22/18 9:15 PM, Matthias J. Sax wrote: > >>> Hi Richard, > >>> > >>> with KIP-268 in place (should be accepted soon) the upgrade path is > >>> covered. Thus, you can update your KIP accordingly, referring to > KIP-268. > >>> > >>> Can you also update your KIP similar to KIP-268 to cover the old and > new > >>> metadata format? > >>> > >>> Thanks! > >>> > >>> -Matthias > >>> > >>> > >>> On 2/24/18 4:07 PM, Richard Yu wrote: > >>>> I didn't really get what "upgrade strategy" was at the time that > Guozhang > >>>> mentioned it, so I wrote the above dialogue from my first > understanding. I > >>>> changed it to "upgrade strategy must be provided". Currently, > however, I do > >>>> not have anything in mind to facilitate upgrading older Kafka > brokers. If > >>>> you have anything in mind, please let me know. > >>>> > >>>> > >>>> > >>>> > >>>> > >>>> > >>>> On Sat, Feb 24, 2018 at 3:58 PM, Matthias J. Sax < > matth...@confluent.io> > >>>> wrote: > >>>> > >>>>> Thanks a lot for this KIP. > >>>>> > >>>>> I am not sure what you mean by > >>>>> > >>>>>> which could potentially break older versions of Kafka brokers > >>>>> > >>>>> The metadata that is exchange, is not interpreted by the brokers. The > >>>>> problem with upgrading the metadata format affect only Kafka Streams > >>>>> instances. > >>>>> > >>>>> If we don't provide an upgrade strategy, changing the metadata format > >>>>> required to stop all running application instances, before the > instances > >>>>> can be restarted with the new code. However, this implies downtime > for > >>>>> an application and is thus not acceptable. > >>>>> > >>>>> > >>>>> -Matthias > >>>>> > >>>>> > >>>>> On 2/24/18 11:11 AM, Richard Yu wrote: > >>>>>> Hi all, > >>>>>> > >>>>>> I would like to discuss a KIP I've submitted : > >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP- > >>>>> 262%3A+Metadata+should+include+number+of+state+stores+for+task > >>>>>> > >>>>>> > >>>>>> Regards, > >>>>>> Richard Yu > >>>>>> > >>>>> > >>>>> > >>>> > >>> > >> > > > >
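For reference, the insight that the store count can be computed locally is already expressible against the public Topology#describe() API; a small sketch (illustrative only, not the assignor's internal mechanism):

    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.TopologyDescription;

    public class StoreCountSketch {

        /** Counts the distinct state stores connected to processors in each subtopology. */
        static void printStoreCounts(Topology topology) {
            TopologyDescription description = topology.describe();
            for (TopologyDescription.Subtopology subtopology : description.subtopologies()) {
                long stores = subtopology.nodes().stream()
                        .filter(node -> node instanceof TopologyDescription.Processor)
                        .map(node -> (TopologyDescription.Processor) node)
                        .flatMap(processor -> processor.stores().stream())
                        .distinct()
                        .count();
                System.out.println("Subtopology " + subtopology.id() + " has " + stores + " store(s)");
            }
        }
    }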
Re: [DISCUSS] KIP-408: Add Asynchronous Processing to Kafka Streams
Hi all, Just bumping this KIP. Would be great if we got some discussion. On Sun, Dec 30, 2018 at 5:13 PM Richard Yu wrote: > Hi all, > > I made some recent changes to the KIP. It should be more relevant with the > issue now (involves Processor API in detail). > It would be great if you could comment. > > Thanks, > Richard > > On Wed, Dec 26, 2018 at 10:01 PM Richard Yu > wrote: > >> Hi all, >> >> Just changing the title of the KIP. Discovered it wasn't right. >> Thats about it. :) >> >> On Mon, Dec 24, 2018 at 7:57 PM Richard Yu >> wrote: >> >>> Sorry, just making a correction. >>> >>> Even if we are processing records out of order, we will still have to >>> checkpoint offset ranges. >>> So it doesn't really change anything even if we are doing in-order >>> processing. >>> >>> Thinking this over, I'm leaning slightly towards maintaining the >>> ordering guarantee. >>> Although when implementing this change, there might be some kinks that >>> we have not thought about which could throw a monkey wrench into the works. >>> >>> But definitely worth trying out, >>> Richard >>> >>> On Mon, Dec 24, 2018 at 6:51 PM Richard Yu >>> wrote: >>> >>>> Hi Boyang, >>>> >>>> I could see where you are going with this. Well, I suppose I should >>>> have added this to alternatives, but I might as well mention it now. >>>> >>>> It had crossed my mind that we consider returning in-order even if >>>> there are multiple threads processing on the same thread. But for this to >>>> happen, we must block for the offsets in-between which have not been >>>> processed yet. For example, offsets 1-50 are being processed by thread1, >>>> while the offsets 51 - 100 are being processed by thread2. We will have to >>>> wait for thread1 to finish processing its offsets first before we return >>>> the records processed by thread2. So in other words, once thread1 is done, >>>> thread2's work up to that point will be returned in one go, but not before >>>> that. >>>> >>>> I suppose this could work, but the client will have to wait some time >>>> before the advantages of multithreaded processing can be seen (i.e. the >>>> first thread has to finish processing its segment of the records first >>>> before any others are returned to guarantee ordering). Another point I >>>> would like to make is that the threads are *asynchronous. *So for us >>>> to know when a thread is done processing a certain segment, we will >>>> probably have a similar policy to how getMetadataAsync() works (i.e. have a >>>> parent thread be notified of when the children threads are done). >>>> [image: image.png] >>>> Just pulling this from the KIP. But instead, we would apply this to >>>> metadata segments instead of just a callback. >>>> I don't know whether or not the tradeoffs are acceptable to the client. >>>> Ordering could be guaranteed, but it would be hard to do. For example, if >>>> there was a crash, we might lose track of which offsets numbers and ranges >>>> we are processing for each child thread, so somehow we need to find a way >>>> to checkpoint those as well (like committing them to a Kafka topic). >>>> >>>> Let me know your thoughts on this approach. It would work, but the >>>> implementation details could be a mess. >>>> >>>> Cheers, >>>> Richard >>>> >>>> >>>> >>>> >>>> >>>> On Mon, Dec 24, 2018 at 4:59 PM Boyang Chen >>>> wrote: >>>> >>>>> Hey Richard, >>>>> >>>>> thanks for the explanation! After some thinking, I do understand more >>>>> about this KIP. 
The motivation was to increase the throughput and put >>>>> heavy >>>>> lifting RPC calls or IO operations to the background. While I feel the >>>>> ordering is hard to guarantee for async task, it is better to be >>>>> configurable for the end users. >>>>> >>>>> An example use case I could think of is: for every 500 records >>>>> processed, we need an RPC to external storage that takes non-trivial time, >>>>> and before its finishing all 499 records before it shouldn't be visible to >>>>> the end us
Re: [DISCUSS] KIP-408: Add Asynchronous Processing to Kafka Streams
Hi Boyang, Interesting article. Although something crossed my mind. When skipping bad records, we couldn't go back to them to process again to guarantee ordering i.e (both exactly-once and at-least-once would not be supported, only at-most-once). Also, in Kafka, when it comes to individually acking every single record, the resulting latency is horrible (from what I heard). We actually discussed something like this in https://issues.apache.org/jira/browse/KAFKA-7432. It might give you some insight since it is a related issue. I hope this helps, Richard On Thu, Jan 3, 2019 at 7:29 PM Boyang Chen wrote: > Hey Richard, > > thanks for the explanation. Recently I read an interesting blog post< > https://streaml.io/blog/pulsar-streaming-queuing> from Apache Pulsar > (written long time ago), where they define the concept of individual ack > which means we could skip records and leave certain records remain on the > queue for late processing. This should be something similar to KIP-408 > which also shares some motivations for us to invest. > > Boyang > > ____ > From: Richard Yu > Sent: Friday, January 4, 2019 5:42 AM > To: dev@kafka.apache.org > Subject: Re: [DISCUSS] KIP-408: Add Asynchronous Processing to Kafka > Streams > > Hi all, > > Just bumping this KIP. Would be great if we got some discussion. > > > On Sun, Dec 30, 2018 at 5:13 PM Richard Yu > wrote: > > > Hi all, > > > > I made some recent changes to the KIP. It should be more relevant with > the > > issue now (involves Processor API in detail). > > It would be great if you could comment. > > > > Thanks, > > Richard > > > > On Wed, Dec 26, 2018 at 10:01 PM Richard Yu > > wrote: > > > >> Hi all, > >> > >> Just changing the title of the KIP. Discovered it wasn't right. > >> Thats about it. :) > >> > >> On Mon, Dec 24, 2018 at 7:57 PM Richard Yu > >> wrote: > >> > >>> Sorry, just making a correction. > >>> > >>> Even if we are processing records out of order, we will still have to > >>> checkpoint offset ranges. > >>> So it doesn't really change anything even if we are doing in-order > >>> processing. > >>> > >>> Thinking this over, I'm leaning slightly towards maintaining the > >>> ordering guarantee. > >>> Although when implementing this change, there might be some kinks that > >>> we have not thought about which could throw a monkey wrench into the > works. > >>> > >>> But definitely worth trying out, > >>> Richard > >>> > >>> On Mon, Dec 24, 2018 at 6:51 PM Richard Yu > > >>> wrote: > >>> > >>>> Hi Boyang, > >>>> > >>>> I could see where you are going with this. Well, I suppose I should > >>>> have added this to alternatives, but I might as well mention it now. > >>>> > >>>> It had crossed my mind that we consider returning in-order even if > >>>> there are multiple threads processing on the same thread. But for > this to > >>>> happen, we must block for the offsets in-between which have not been > >>>> processed yet. For example, offsets 1-50 are being processed by > thread1, > >>>> while the offsets 51 - 100 are being processed by thread2. We will > have to > >>>> wait for thread1 to finish processing its offsets first before we > return > >>>> the records processed by thread2. So in other words, once thread1 is > done, > >>>> thread2's work up to that point will be returned in one go, but not > before > >>>> that. > >>>> > >>>> I suppose this could work, but the client will have to wait some time > >>>> before the advantages of multithreaded processing can be seen (i.e. 
> the > >>>> first thread has to finish processing its segment of the records first > >>>> before any others are returned to guarantee ordering). Another point I > >>>> would like to make is that the threads are *asynchronous. *So for us > >>>> to know when a thread is done processing a certain segment, we will > >>>> probably have a similar policy to how getMetadataAsync() works (i.e. > have a > >>>> parent thread be notified of when the children threads are done). > >>>> [image: image.png] > >>>> Just pulling this from the KIP. But instead, we would apply this to > >>>
Re: [DISCUSS] KIP-408: Add Asynchronous Processing to Kafka Streams
Hi all, Just want to hear some opinions on this KIP from the PMCs. It would be nice if we got input from them. Don't want to drag this KIP for too long! :) Hope we get some input :) Thanks, Richard On Thu, Jan 3, 2019 at 8:26 PM Richard Yu wrote: > Hi Boyang, > > Interesting article. Although something crossed my mind. When skipping bad > records, we couldn't go back to them to process again to guarantee ordering > i.e (both exactly-once and at-least-once would not be supported, only > at-most-once). Also, in Kafka, when it comes to individually acking every > single record, the resulting latency is horrible (from what I heard). We > actually discussed something like this in > https://issues.apache.org/jira/browse/KAFKA-7432. It might give you some > insight since it is a related issue. > > I hope this helps, > Richard > > > > > On Thu, Jan 3, 2019 at 7:29 PM Boyang Chen wrote: > >> Hey Richard, >> >> thanks for the explanation. Recently I read an interesting blog post< >> https://streaml.io/blog/pulsar-streaming-queuing> from Apache Pulsar >> (written long time ago), where they define the concept of individual ack >> which means we could skip records and leave certain records remain on the >> queue for late processing. This should be something similar to KIP-408 >> which also shares some motivations for us to invest. >> >> Boyang >> >> >> From: Richard Yu >> Sent: Friday, January 4, 2019 5:42 AM >> To: dev@kafka.apache.org >> Subject: Re: [DISCUSS] KIP-408: Add Asynchronous Processing to Kafka >> Streams >> >> Hi all, >> >> Just bumping this KIP. Would be great if we got some discussion. >> >> >> On Sun, Dec 30, 2018 at 5:13 PM Richard Yu >> wrote: >> >> > Hi all, >> > >> > I made some recent changes to the KIP. It should be more relevant with >> the >> > issue now (involves Processor API in detail). >> > It would be great if you could comment. >> > >> > Thanks, >> > Richard >> > >> > On Wed, Dec 26, 2018 at 10:01 PM Richard Yu > > >> > wrote: >> > >> >> Hi all, >> >> >> >> Just changing the title of the KIP. Discovered it wasn't right. >> >> Thats about it. :) >> >> >> >> On Mon, Dec 24, 2018 at 7:57 PM Richard Yu > > >> >> wrote: >> >> >> >>> Sorry, just making a correction. >> >>> >> >>> Even if we are processing records out of order, we will still have to >> >>> checkpoint offset ranges. >> >>> So it doesn't really change anything even if we are doing in-order >> >>> processing. >> >>> >> >>> Thinking this over, I'm leaning slightly towards maintaining the >> >>> ordering guarantee. >> >>> Although when implementing this change, there might be some kinks that >> >>> we have not thought about which could throw a monkey wrench into the >> works. >> >>> >> >>> But definitely worth trying out, >> >>> Richard >> >>> >> >>> On Mon, Dec 24, 2018 at 6:51 PM Richard Yu < >> yohan.richard...@gmail.com> >> >>> wrote: >> >>> >> >>>> Hi Boyang, >> >>>> >> >>>> I could see where you are going with this. Well, I suppose I should >> >>>> have added this to alternatives, but I might as well mention it now. >> >>>> >> >>>> It had crossed my mind that we consider returning in-order even if >> >>>> there are multiple threads processing on the same thread. But for >> this to >> >>>> happen, we must block for the offsets in-between which have not been >> >>>> processed yet. For example, offsets 1-50 are being processed by >> thread1, >> >>>> while the offsets 51 - 100 are being processed by thread2. 
We will >> have to >> >>>> wait for thread1 to finish processing its offsets first before we >> return >> >>>> the records processed by thread2. So in other words, once thread1 is >> done, >> >>>> thread2's work up to that point will be returned in one go, but not >> before >> >>>> that. >> >>>> >> >>>> I suppose this could work, but the client will have to wait some time >> >>>> before the advantages
[VOTE] KIP-266: Add TimeoutException to KafkaConsumer#position()
Hi all, It appears that discussion is coming to a close for KIP-266. I would like to start a voting thread for this KIP. Here is the link for reference. https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75974886 Thanks, Richard
Re: [VOTE] KIP-266: Add TimeoutException for KafkaConsumer#position
Hi all, I would like to bump this thread since discussion in the KIP appears to be reaching its conclusion. On Thu, Mar 15, 2018 at 3:30 PM, Richard Yu wrote: > Hi all, > > Since there does not seem to be too much discussion in KIP-266, I will be > starting a voting thread. > Here is the link to KIP-266 for reference: > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75974886 > > Recently, I have made some updates to the KIP. To reiterate, I have > included KafkaConsumer's commitSync, > poll, and committed in the KIP. (we will be adding to a TimeoutException > to them as well, in a similar manner > to what we will be doing for position()) > > Thanks, > Richard Yu > >
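For readers following the vote, this is roughly how the time-bounded calls read from the caller's side under the overload approach discussed in the KIP thread (a sketch; Duration-based signatures and the timeout values are assumptions here, the KIP defines the exact surface):

    import java.time.Duration;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.TimeoutException;

    public class BoundedConsumerCalls {

        static void lookupPosition(KafkaConsumer<String, String> consumer, TopicPartition tp) {
            try {
                // Blocks at most 5 seconds instead of potentially forever.
                long position = consumer.position(tp, Duration.ofSeconds(5));
                System.out.println("position = " + position);
            } catch (TimeoutException e) {
                // The coordinator/broker did not answer in time; the caller decides
                // whether to retry, skip, or fail.
                System.err.println("position() timed out: " + e.getMessage());
            }
        }

        static void commitBounded(KafkaConsumer<String, String> consumer) {
            try {
                consumer.commitSync(Duration.ofSeconds(10));
            } catch (TimeoutException e) {
                System.err.println("commitSync() timed out: " + e.getMessage());
            }
        }
    }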
Re: [VOTE] KIP-266: Add TimeoutException for KafkaConsumer#position
Hi, With 3 binding votes and 6 non-binding, this KIP would be accepted. Thanks for participating. On Thu, May 10, 2018 at 2:35 AM, Edoardo Comar wrote: > +1 (non-binding) > > On 10 May 2018 at 10:29, zhenya Sun wrote: > > > +1 non-binding > > > > > 在 2018年5月10日,下午5:19,Manikumar 写道: > > > > > > +1 (non-binding). > > > Thanks. > > > > > > On Thu, May 10, 2018 at 2:33 PM, Mickael Maison < > > mickael.mai...@gmail.com> > > > wrote: > > > > > >> +1 (non binding) > > >> Thanks > > >> > > >> On Thu, May 10, 2018 at 9:39 AM, Rajini Sivaram < > > rajinisiva...@gmail.com> > > >> wrote: > > >>> Hi Richard, Thanks for the KIP. > > >>> > > >>> +1 (binding) > > >>> > > >>> Regards, > > >>> > > >>> Rajini > > >>> > > >>> On Wed, May 9, 2018 at 10:54 PM, Guozhang Wang > > >> wrote: > > >>> > > >>>> +1 from me, thanks! > > >>>> > > >>>> > > >>>> Guozhang > > >>>> > > >>>> On Wed, May 9, 2018 at 10:46 AM, Jason Gustafson < > ja...@confluent.io> > > >>>> wrote: > > >>>> > > >>>>> Thanks for the KIP, +1 (binding). > > >>>>> > > >>>>> One small correction: the KIP mentions that close() will be > > >> deprecated, > > >>>> but > > >>>>> we do not want to do this because it is needed by the Closeable > > >>>> interface. > > >>>>> We only want to deprecate close(long, TimeUnit) in favor of > > >>>>> close(Duration). > > >>>>> > > >>>>> -Jason > > >>>>> > > >>>>> On Tue, May 8, 2018 at 12:43 AM, khaireddine Rezgui < > > >>>>> khaireddine...@gmail.com> wrote: > > >>>>> > > >>>>>> +1 > > >>>>>> > > >>>>>> 2018-05-07 20:35 GMT+01:00 Bill Bejeck : > > >>>>>> > > >>>>>>> +1 > > >>>>>>> > > >>>>>>> Thanks, > > >>>>>>> Bill > > >>>>>>> > > >>>>>>> On Fri, May 4, 2018 at 7:21 PM, Richard Yu < > > >>>> yohan.richard...@gmail.com > > >>>>>> > > >>>>>>> wrote: > > >>>>>>> > > >>>>>>>> Hi all, I would like to bump this thread since discussion in the > > >>>> KIP > > >>>>>>>> appears to be reaching its conclusion. > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> On Thu, Mar 15, 2018 at 3:30 PM, Richard Yu < > > >>>>>> yohan.richard...@gmail.com> > > >>>>>>>> wrote: > > >>>>>>>> > > >>>>>>>>> Hi all, > > >>>>>>>>> > > >>>>>>>>> Since there does not seem to be too much discussion in > > >> KIP-266, I > > >>>>>> will > > >>>>>>> be > > >>>>>>>>> starting a voting thread. > > >>>>>>>>> Here is the link to KIP-266 for reference: > > >>>>>>>>> > > >>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage. > > >>>>>>>> action?pageId=75974886 > > >>>>>>>>> > > >>>>>>>>> Recently, I have made some updates to the KIP. To reiterate, I > > >>>> have > > >>>>>>>>> included KafkaConsumer's commitSync, > > >>>>>>>>> poll, and committed in the KIP. (we will be adding to a > > >>>>>>> TimeoutException > > >>>>>>>>> to them as well, in a similar manner > > >>>>>>>>> to what we will be doing for position()) > > >>>>>>>>> > > >>>>>>>>> Thanks, > > >>>>>>>>> Richard Yu > > >>>>>>>>> > > >>>>>>>>> > > >>>>>>>> > > >>>>>>> > > >>>>>> > > >>>>>> > > >>>>>> > > >>>>>> -- > > >>>>>> Ingénieur en informatique > > >>>>>> > > >>>>> > > >>>> > > >>>> > > >>>> > > >>>> -- > > >>>> -- Guozhang > > >>>> > > >> > > > > > > > -- > "When the people fear their government, there is tyranny; when the > government fears the people, there is liberty." [Thomas Jefferson] >
[DISCUSS] KIP-333 Consider adding faster form of rebalancing
Hi all, I would like to discuss the KIP that I had written recently. Here is the link: https://cwiki.apache.org/confluence/display/KAFKA/KIP-333%3A+Add+faster+mode+of+rebalancing Thanks, Richard Yu
[DISCUSS] KIP-333 Consider a faster form of rebalancing
Hi all, I would like to discuss KIP-333 (which proposes a faster mode of rebalancing). Here is the link for the KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-333%3A+Add+faster+mode+of+rebalancing Thanks, Richard Yu
[DISCUSS] KIP-333 Add faster mode of rebalancing.
Hi all, I would like to discuss the KIP that I had written recently. Here is the link: https://cwiki.apache.org/confluence/display/KAFKA/KIP-333%3A+Add+faster+mode+of+rebalancing Thanks, Richard Yu
Re: [VOTE] KIP-331 Add default implementation to close() and configure() for Serializer, Deserializer and Serde
Nice KIP! +1 (non-binding) -Richard On Friday, July 6, 2018, 9:10:43 AM GMT+8, Matthias J. Sax wrote: Thanks for the KIP! +1 (binding) -Matthias On 7/5/18 7:45 AM, Chia-Ping Tsai wrote: > hi all, > > I would like to start voting on "KIP-331 Add default implementation to > close() and configure() for Serializer, Deserializer and Serde" > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-331+Add+default+implementation+to+close%28%29+and+configure%28%29+for+Serializer%2C+Deserializer+and+Serde > > Cheers, > Chia-Ping >
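For context on what is being voted on: with default no-op implementations of configure() and close() on the interfaces, a stateless Serializer only needs to implement serialize(). A sketch (the serializer itself is a made-up example):

    import java.nio.charset.StandardCharsets;

    import org.apache.kafka.common.serialization.Serializer;

    /**
     * With default (no-op) implementations of configure() and close() on the
     * Serializer interface, a stateless serializer only needs serialize().
     */
    public class UpperCaseSerializer implements Serializer<String> {

        @Override
        public byte[] serialize(String topic, String data) {
            return data == null ? null : data.toUpperCase().getBytes(StandardCharsets.UTF_8);
        }

        // No configure(Map, boolean) and no close() overrides needed any more:
        // the interface supplies empty default implementations.
    }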
[DISCUSS] KIP-335 Consider configurations for Kafka Streams
Hi all, Ever since KIP-266 was concluded, there has been a pressing need to migrate Kafka Streams as well. For the link, please click here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-335%3A+Consider+configurations+for+KafkaStreams Thanks, Richard Yu
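As a rough illustration of what applying KIP-266-style timeouts to Streams means for a caller, bounding the blocking close() call might look like this (a sketch with hypothetical topic names; the exact surface is what the KIP is meant to define):

    import java.time.Duration;
    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    public class BoundedStreamsShutdown {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "bounded-shutdown-demo");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input-topic").to("output-topic");

            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();

            // Give shutdown a bounded amount of time instead of blocking indefinitely;
            // close(Duration) reports whether all threads stopped within the timeout.
            boolean stopped = streams.close(Duration.ofSeconds(30));
            System.out.println("clean shutdown: " + stopped);
        }
    }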
Re: [DISCUSS] KIP-335 Consider configurations for Kafka Streams
Hi Matthias, It would be nice to get your opinions on this. On Monday, July 9, 2018, 12:17:33 PM GMT+8, Richard Yu wrote: Hi all, Eversince KIP-266 was concluded, there has been a pressing need to migrate Kafka Streams as well. For the link, please click here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-335%3A+Consider+configurations+for+KafkaStreams Thanks, Richard Yu
Re: [DISCUSS] KIP-333 Consider a faster form of rebalancing
Hi Becket, Thanks for reviewing this KIP. :) I probably did not explicitly state what we were trying to avoid by introducing this mode. As mentioned in the KIP, there is an offset lag which could result after a crash. Our main goal is to avoid this lag (i.e. the latency in terms of time that results from the crash, not to reduce the number of records reprocessed). I could provide a couple of diagrams with what I am envisioning because some points in my KIP might otherwise be hard to grasp (I will also include some diagrams to give you a better idea of a use case). As for your questions, I could provide a couple of answers: 1. Yes, the two consumers will in fact be processing in parallel. We do this because we want to accelerate the processing speed of the records to make up for the latency caused by the crash. 2. After the recovery point, records will not be processed twice. Let me describe the scenario I was envisioning: we would let the consumer that crashed seek to the end of the log using KafkaConsumer#seekToEnd. Meanwhile, a secondary consumer will start processing from the latest checkpointed offset and continue until it has hit the place where the first consumer that crashed began processing after seekToEnd was first called. Since the consumer that crashed skipped from the recovery point to the end of the log, the intermediate offsets will be processed only by the secondary consumer. So it is important to note that the offset ranges which the two threads process will not overlap. (This is important as it prevents offsets from being processed more than once) 3. As for the committed offsets, the possibility of rewinding is not likely. If my understanding is correct, you are probably worried that after the crash, offsets that have already been committed will be committed again. The current design prevents that from happening, as the policy of where to start processing after a crash is universal across all Consumer instances -- we will begin processing from the latest offset committed. I hope that you at least got some of your questions answered. I will update the KIP soon, so please stay tuned. Thanks, Richard Yu On Tuesday, July 17, 2018, 2:14:07 PM GMT+8, Becket Qin wrote: Hi Richard, Thanks for the KIP. I am a little confused on what is proposed. The KIP suggests that after recovery from a consumer crash, there will be two consumers consuming from the same partition. One consumes starting from the log end offset at the point of recovery, and another consumes starting from the last committed offset and keeping consuming with the first consumer in parallel? Does that mean the messages after the recovery point will be consumed twice? If those two consumer commits offsets, does that mean the committed offsets may rewind? The proposal sounds a little hacky and introduce some non-deterministic behavior. It would be useful to have a concrete use case example to explain what is actually needed. If the goal is to reduce the number of records that are reprocessed when consume crashes, maybe we can have an auto commit interval based on number of messages. If the application just wants to read from the end of the log after recovery from crash, would calling seekToEnd explicitly work? Thanks, Jiangjie (Becket) Qin On Thu, Jul 5, 2018 at 6:46 PM, Richard Yu wrote: > Hi all, > > I would like to discuss KIP-333 (which proposes a faster mode of > rebalancing). 
> Here is the link for the KIP: > > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > 333%3A+Add+faster+mode+of+rebalancing > > Thanks, > Richard Yu >
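To make the two-consumer recovery scenario concrete, here is a hedged sketch built only on existing consumer APIs (seekToEnd, position, committed, seek); it illustrates the idea in the mail above, not the KIP's actual implementation, and the helper method is invented for the example:

    import java.time.Duration;
    import java.util.Collections;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class CatchUpSketch {

        /**
         * After a crash, the recovering consumer jumps to the log end and keeps
         * up with new data, while a secondary consumer replays the gap between
         * the last committed offset and the point where the first one resumed.
         */
        static void recover(KafkaConsumer<byte[], byte[]> recovering,
                            KafkaConsumer<byte[], byte[]> catchUp,
                            TopicPartition partition) {
            // 1. The crashed consumer skips ahead to the end of the log.
            recovering.assign(Collections.singleton(partition));
            recovering.seekToEnd(Collections.singleton(partition));
            long resumeOffset = recovering.position(partition);  // where "live" processing restarts

            // 2. A secondary consumer replays from the last committed offset up to,
            //    but not including, resumeOffset, so no offset is processed twice.
            OffsetAndMetadata committed = catchUp.committed(partition);
            long replayFrom = committed == null ? 0L : committed.offset();
            catchUp.assign(Collections.singleton(partition));
            catchUp.seek(partition, replayFrom);

            long next = replayFrom;
            while (next < resumeOffset) {
                ConsumerRecords<byte[], byte[]> records = catchUp.poll(Duration.ofMillis(200));
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    if (record.offset() >= resumeOffset) {
                        return;                      // gap fully replayed
                    }
                    // process(record) would go here
                    next = record.offset() + 1;
                }
            }
        }
    }

The boundary captured by position() right after seekToEnd() is what keeps the two offset ranges from overlapping, which is the property point 2 above relies on.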
Re: [DISCUSS] KIP-333 Consider a faster form of rebalancing
Hi Becket, I made some changes and clarified the motivation for this KIP. :)It should be easier to understand now since I included a diagram. Thanks,Richard Yu On Tuesday, July 17, 2018, 4:38:11 PM GMT+8, Richard Yu wrote: Hi Becket, Thanks for reviewing this KIP. :) I probably did not explicitly state what we were trying to avoid by introducing this mode. As mentioned in the KIP, there is a offset lag which could result after a crash. Our main goal is to avoid this lag (i.e. the latency in terms of time that results from the crash, not to reduce the number of records reprocessed). I could provide a couple of diagrams with what I am envisioning because some points in my KIP might otherwise be hard to grasp (I will also include some diagrams to give you a better idea of an use case). As for your questions, I could provide a couple of answers: 1. Yes, the two consumers will in fact be processing in parallel. We do this because we want to accelerate the processing speed of the records to make up for the latency caused by the crash. 2. After the recovery point, records will not be processed twice. Let me describe the scenario I was envisioning: we would let the consumer that crashed seek to the end of the log using KafkaConsumer#seekToEnd. Meanwhile, a secondary consumer will start processing from the latest checkpointed offset and continue until it has hit the place where the first consumer that crashed began processing after seekToEnd was first called. Since the consumer that crashed skipped from the recovery point to the end of the log, the intermediate offsets will be processed only by the secondary consumer. So it is important to note that the offset ranges which the two threads process will not overlap. (This is important as it prevents offsets from being processed more than once) 3. As for the committed offsets, the possibility of rewinding is not likely. If my understanding is correct, you are probably worried that after the crash, offsets that has already been previously committed will be committed again. The current design prevents that from happening, as the policy of where to start processing after a crash is universal across all Consumer instances -- we will begin processing from the latest offset committed. I hope that you at least got some of your questions answered. I will update the KIP soon, so please stay tuned. Thanks,Richard Yu On Tuesday, July 17, 2018, 2:14:07 PM GMT+8, Becket Qin wrote: Hi Richard, Thanks for the KIP. I am a little confused on what is proposed. The KIP suggests that after recovery from a consumer crash, there will be two consumers consuming from the same partition. One consumes starting from the log end offset at the point of recovery, and another consumes starting from the last committed offset and keeping consuming with the first consumer in parallel? Does that mean the messages after the recovery point will be consumed twice? If those two consumer commits offsets, does that mean the committed offsets may rewind? The proposal sounds a little hacky and introduce some non-deterministic behavior. It would be useful to have a concrete use case example to explain what is actually needed. If the goal is to reduce the number of records that are reprocessed when consume crashes, maybe we can have an auto commit interval based on number of messages. If the application just wants to read from the end of the log after recovery from crash, would calling seekToEnd explicitly work? 
Thanks, Jiangjie (Becket) Qin On Thu, Jul 5, 2018 at 6:46 PM, Richard Yu wrote: > Hi all, > > I would like to discuss KIP-333 (which proposes a faster mode of > rebalancing). > Here is the link for the KIP: > > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > 333%3A+Add+faster+mode+of+rebalancing > > Thanks, > Richard Yu >
[DISCUSS] KIP-262 Metadata should include the number of state stores for task
Hi all, I would like to discuss a KIP I've submitted : https://cwiki.apache.org/confluence/display/KAFKA/KIP-262%3A+Metadata+should+include+number+of+state+stores+for+task Regards, Richard Yu
Re: [DISCUSS] KIP-262 Metadata should include the number of state stores for task
I didn't really get what "upgrade strategy" was at the time that Guozhang mentioned it, so I wrote the above dialogue from my first understanding. I changed it to "upgrade strategy must be provided". Currently, however, I do not have anything in mind to facilitate upgrading older Kafka brokers. If you have anything in mind, please let me know. On Sat, Feb 24, 2018 at 3:58 PM, Matthias J. Sax wrote: > Thanks a lot for this KIP. > > I am not sure what you mean by > > > which could potentially break older versions of Kafka brokers > > The metadata that is exchange, is not interpreted by the brokers. The > problem with upgrading the metadata format affect only Kafka Streams > instances. > > If we don't provide an upgrade strategy, changing the metadata format > required to stop all running application instances, before the instances > can be restarted with the new code. However, this implies downtime for > an application and is thus not acceptable. > > > -Matthias > > > On 2/24/18 11:11 AM, Richard Yu wrote: > > Hi all, > > > > I would like to discuss a KIP I've submitted : > > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > 262%3A+Metadata+should+include+number+of+state+stores+for+task > > > > > > Regards, > > Richard Yu > > > >
[DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()
Hi all, I would like to discuss a potential change which would be made to KafkaConsumer: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75974886 Thanks, Richard Yu
Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()
Hi all, I updated the KIP where overloading position() is now the favored approach. Bounding position() using requestTimeoutMs has been listed as rejected. Any thoughts? On Tue, Mar 6, 2018 at 6:00 PM, Guozhang Wang wrote: > I agree that adding the overloads is most flexible. But going for that > direction we'd do that for all the blocking call that I've listed above, > with this timeout value covering the end-to-end waiting time. > > > Guozhang > > On Tue, Mar 6, 2018 at 10:02 AM, Ted Yu wrote: > > > bq. The most flexible option is to add overloads to the consumer > > > > This option is flexible. > > > > Looking at the tail of SPARK-18057, Spark dev voiced the same choice. > > > > +1 for adding overload with timeout parameter. > > > > Cheers > > > > On Mon, Mar 5, 2018 at 2:42 PM, Jason Gustafson > > wrote: > > > > > @Guozhang I probably have suggested all options at some point or > another, > > > including most recently, the current KIP! I was thinking that > practically > > > speaking, the request timeout defines how long the user is willing to > > wait > > > for a response. The consumer doesn't really have a complex send process > > > like the producer for any of these APIs, so I wasn't sure how much > > benefit > > > there would be from having more granular control over timeouts (in the > > end, > > > KIP-91 just adds a single timeout to control the whole send). That > said, > > it > > > might indeed be better to avoid overloading the config as you suggest > > since > > > at least it avoids inconsistency with the producer's usage. > > > > > > The most flexible option is to add overloads to the consumer so that > > users > > > can pass the timeout directly. I'm not sure if that is more or less > > > annoying than a new config, but I've found config timeouts a little > > > constraining in practice. For example, I could imagine users wanting to > > > wait longer for an offset commit operation than a position lookup; if > the > > > latter isn't timely, users can just pause the partition and continue > > > fetching on others. If you cannot commit offsets, however, it might be > > > safer for an application to wait availability of the coordinator than > > > continuing. > > > > > > -Jason > > > > > > On Sun, Mar 4, 2018 at 10:14 PM, Guozhang Wang > > wrote: > > > > > > > Hello Richard, > > > > > > > > Thanks for the proposed KIP. I have a couple of general comments: > > > > > > > > 1. I'm not sure if piggy-backing the timeout exception on the > > > > existing requestTimeoutMs configured in "request.timeout.ms" is a > good > > > > idea > > > > since a) it is a general config that applies for all types of > requests, > > > and > > > > 2) using it to cover all the phases of an API call, including network > > > round > > > > trip and potential metadata refresh is shown to not be a good idea, > as > > > > illustrated in KIP-91: > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > > > > 91+Provide+Intuitive+User+Timeouts+in+The+Producer > > > > > > > > In fact, I think in KAFKA-4879 which is aimed for the same issue as > > > > KAFKA-6608, > > > > Jason has suggested we use a new config for the API. Maybe this would > > be > > > a > > > > more intuitive manner than reusing the request.timeout.ms config. > > > > > > > > > > > > 2. Besides the Consumer.position() call, there are a couple of more > > > > blocking calls today that could result in infinite blocking: > > > > Consumer.commitSync() and Consumer.committed(), should they be > > considered > > > > in this KIP as well? 
> > > > > > > > 3. There are a few other APIs that are today relying on > > > request.timeout.ms > > > > already for breaking the infinite blocking, namely > > > Consumer.partitionFor(), > > > > Consumer.OffsetAndTimestamp() and Consumer.listTopics(), if we are > > making > > > > the other blocking calls to be relying a new config as suggested in > 1) > > > > above, should we also change the semantics of these API functions for > > > > consistency? > > > > > > > > > > > > Guozhang > > > > > > > > > > > > > > > > > > > > On Sun, Mar 4, 2018 at 11:13 AM, Richard Yu < > > yohan.richard...@gmail.com> > > > > wrote: > > > > > > > > > Hi all, > > > > > > > > > > I would like to discuss a potential change which would be made to > > > > > KafkaConsumer: > > > > > https://cwiki.apache.org/confluence/pages/viewpage. > > > > action?pageId=75974886 > > > > > > > > > > Thanks, > > > > > Richard Yu > > > > > > > > > > > > > > > > > > > > > -- > > > > -- Guozhang > > > > > > > > > > > > > -- > -- Guozhang >
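At this stage the proposal amounts to an overload roughly like the following; the parameter name is an assumption and the KIP page remains the authoritative definition:

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.TimeoutException;

    // Illustrative sketch only: the existing call keeps its current (potentially
    // indefinite) blocking behavior, while the overload gives up once the bound elapses.
    public interface BoundedPositionSketch {

        long position(TopicPartition partition);

        // Throws the unchecked org.apache.kafka.common.errors.TimeoutException
        // if the position cannot be determined within timeoutMs.
        long position(TopicPartition partition, long timeoutMs) throws TimeoutException;
    }

Bounding the same call through request.timeout.ms, as discussed above, is what the KIP now lists under rejected alternatives.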
Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()
Note to all: I have included bounding commitSync() and committed() in this KIP. On Sun, Mar 11, 2018 at 5:05 PM, Richard Yu wrote: > Hi all, > > I updated the KIP where overloading position() is now the favored approach. > Bounding position() using requestTimeoutMs has been listed as rejected. > > Any thoughts? > > On Tue, Mar 6, 2018 at 6:00 PM, Guozhang Wang wrote: > >> I agree that adding the overloads is most flexible. But going for that >> direction we'd do that for all the blocking call that I've listed above, >> with this timeout value covering the end-to-end waiting time. >> >> >> Guozhang >> >> On Tue, Mar 6, 2018 at 10:02 AM, Ted Yu wrote: >> >> > bq. The most flexible option is to add overloads to the consumer >> > >> > This option is flexible. >> > >> > Looking at the tail of SPARK-18057, Spark dev voiced the same choice. >> > >> > +1 for adding overload with timeout parameter. >> > >> > Cheers >> > >> > On Mon, Mar 5, 2018 at 2:42 PM, Jason Gustafson >> > wrote: >> > >> > > @Guozhang I probably have suggested all options at some point or >> another, >> > > including most recently, the current KIP! I was thinking that >> practically >> > > speaking, the request timeout defines how long the user is willing to >> > wait >> > > for a response. The consumer doesn't really have a complex send >> process >> > > like the producer for any of these APIs, so I wasn't sure how much >> > benefit >> > > there would be from having more granular control over timeouts (in the >> > end, >> > > KIP-91 just adds a single timeout to control the whole send). That >> said, >> > it >> > > might indeed be better to avoid overloading the config as you suggest >> > since >> > > at least it avoids inconsistency with the producer's usage. >> > > >> > > The most flexible option is to add overloads to the consumer so that >> > users >> > > can pass the timeout directly. I'm not sure if that is more or less >> > > annoying than a new config, but I've found config timeouts a little >> > > constraining in practice. For example, I could imagine users wanting >> to >> > > wait longer for an offset commit operation than a position lookup; if >> the >> > > latter isn't timely, users can just pause the partition and continue >> > > fetching on others. If you cannot commit offsets, however, it might be >> > > safer for an application to wait availability of the coordinator than >> > > continuing. >> > > >> > > -Jason >> > > >> > > On Sun, Mar 4, 2018 at 10:14 PM, Guozhang Wang >> > wrote: >> > > >> > > > Hello Richard, >> > > > >> > > > Thanks for the proposed KIP. I have a couple of general comments: >> > > > >> > > > 1. I'm not sure if piggy-backing the timeout exception on the >> > > > existing requestTimeoutMs configured in "request.timeout.ms" is a >> good >> > > > idea >> > > > since a) it is a general config that applies for all types of >> requests, >> > > and >> > > > 2) using it to cover all the phases of an API call, including >> network >> > > round >> > > > trip and potential metadata refresh is shown to not be a good idea, >> as >> > > > illustrated in KIP-91: >> > > > >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP- >> > > > 91+Provide+Intuitive+User+Timeouts+in+The+Producer >> > > > >> > > > In fact, I think in KAFKA-4879 which is aimed for the same issue as >> > > > KAFKA-6608, >> > > > Jason has suggested we use a new config for the API. Maybe this >> would >> > be >> > > a >> > > > more intuitive manner than reusing the request.timeout.ms config. >> > > > >> > > > >> > > > 2. 
Besides the Consumer.position() call, there are a couple of more >> > > > blocking calls today that could result in infinite blocking: >> > > > Consumer.commitSync() and Consumer.committed(), should they be >> > considered >> > > > in this KIP as well? >> > > > >> > > > 3. There are a few other APIs that are today relying on >> > > request.timeout.ms >> > > > already for breaking the infinite blocking, namely >> > > Consumer.partitionFor(), >> > > > Consumer.OffsetAndTimestamp() and Consumer.listTopics(), if we are >> > making >> > > > the other blocking calls to be relying a new config as suggested in >> 1) >> > > > above, should we also change the semantics of these API functions >> for >> > > > consistency? >> > > > >> > > > >> > > > Guozhang >> > > > >> > > > >> > > > >> > > > >> > > > On Sun, Mar 4, 2018 at 11:13 AM, Richard Yu < >> > yohan.richard...@gmail.com> >> > > > wrote: >> > > > >> > > > > Hi all, >> > > > > >> > > > > I would like to discuss a potential change which would be made to >> > > > > KafkaConsumer: >> > > > > https://cwiki.apache.org/confluence/pages/viewpage. >> > > > action?pageId=75974886 >> > > > > >> > > > > Thanks, >> > > > > Richard Yu >> > > > > >> > > > >> > > > >> > > > >> > > > -- >> > > > -- Guozhang >> > > > >> > > >> > >> >> >> >> -- >> -- Guozhang >> > >
[VOTE] KIP-266: Add TimeoutException for KafkaConsumer#position
Hi all, Since there does not seem to be too much discussion on KIP-266, I will be starting a voting thread. Here is the link to KIP-266 for reference: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75974886 Recently, I have made some updates to the KIP. To reiterate, I have included KafkaConsumer's commitSync, poll, and committed in the KIP. (We will be adding a TimeoutException to them as well, in a similar manner to what we will be doing for position().) Thanks, Richard Yu
Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()
Thanks for the advice, Jason I have modified KIP-266 to include the java doc for committed() and other blocking methods, and I also mentioned poll() which will also be bounded. Let me know if there is anything else. :) Sincerely, Richard On Sat, Mar 17, 2018 at 12:00 PM, Jason Gustafson wrote: > Hi Richard, > > Thanks for the updates. I'm really glad you picked this up. A couple minor > comments: > > 1. Can you list the full set of new APIs explicitly in the KIP? Currently I > only see the javadoc for `position()`. > > 2. We should consider adding `TimeUnit` to the new methods to avoid unit > confusion. I know it's inconsistent with the poll() API, but I think it was > probably a mistake not to include it there, so better not to double down on > that mistake. And note that we do already have `close(long, TimeUnit)`. > > Other than that, I think the current KIP seems reasonable. > > Thanks, > Jason > > On Wed, Mar 14, 2018 at 5:00 PM, Richard Yu > wrote: > > > Note to all: I have included bounding commitSync() and committed() in > this > > KIP. > > > > On Sun, Mar 11, 2018 at 5:05 PM, Richard Yu > > wrote: > > > > > Hi all, > > > > > > I updated the KIP where overloading position() is now the favored > > approach. > > > Bounding position() using requestTimeoutMs has been listed as rejected. > > > > > > Any thoughts? > > > > > > On Tue, Mar 6, 2018 at 6:00 PM, Guozhang Wang > > wrote: > > > > > >> I agree that adding the overloads is most flexible. But going for that > > >> direction we'd do that for all the blocking call that I've listed > above, > > >> with this timeout value covering the end-to-end waiting time. > > >> > > >> > > >> Guozhang > > >> > > >> On Tue, Mar 6, 2018 at 10:02 AM, Ted Yu wrote: > > >> > > >> > bq. The most flexible option is to add overloads to the consumer > > >> > > > >> > This option is flexible. > > >> > > > >> > Looking at the tail of SPARK-18057, Spark dev voiced the same > choice. > > >> > > > >> > +1 for adding overload with timeout parameter. > > >> > > > >> > Cheers > > >> > > > >> > On Mon, Mar 5, 2018 at 2:42 PM, Jason Gustafson > > > >> > wrote: > > >> > > > >> > > @Guozhang I probably have suggested all options at some point or > > >> another, > > >> > > including most recently, the current KIP! I was thinking that > > >> practically > > >> > > speaking, the request timeout defines how long the user is willing > > to > > >> > wait > > >> > > for a response. The consumer doesn't really have a complex send > > >> process > > >> > > like the producer for any of these APIs, so I wasn't sure how much > > >> > benefit > > >> > > there would be from having more granular control over timeouts (in > > the > > >> > end, > > >> > > KIP-91 just adds a single timeout to control the whole send). That > > >> said, > > >> > it > > >> > > might indeed be better to avoid overloading the config as you > > suggest > > >> > since > > >> > > at least it avoids inconsistency with the producer's usage. > > >> > > > > >> > > The most flexible option is to add overloads to the consumer so > that > > >> > users > > >> > > can pass the timeout directly. I'm not sure if that is more or > less > > >> > > annoying than a new config, but I've found config timeouts a > little > > >> > > constraining in practice. For example, I could imagine users > wanting > > >> to > > >> > > wait longer for an offset commit operation than a position lookup; > > if > > >> the > > >> > > latter isn't timely, users can just pause the partition and > continue > > >> > > fetching on others. 
If you cannot commit offsets, however, it > might > > be > > >> > > safer for an application to wait availability of the coordinator > > than > > >> > > continuing. > > >> > > > > >> > > -Jason > > >> > > > > >> > > On Sun, Mar 4, 2018 at 10:14 PM, Guozha
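Jason's second point, carried through the rest of the blocking calls, would look roughly like the sketch below, mirroring the existing close(long, TimeUnit); the exact method set is an assumption at this point in the discussion:

    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    // Illustrative signatures only; each call gives up with the unchecked
    // TimeoutException once the supplied bound elapses.
    public interface TimeUnitOverloadsSketch {

        long position(TopicPartition partition, long timeout, TimeUnit unit);

        OffsetAndMetadata committed(TopicPartition partition, long timeout, TimeUnit unit);

        void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, long timeout, TimeUnit unit);
    }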
Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()
One more thing: when looking through tests, I have realized that seek() methods can potentially block indefinitely. As you well know, seek() is called when pollOnce() or position() is active. Thus, if position() blocks indefinitely, then so would seek(). Should bounding seek() also be included in this KIP? Thanks, Richard On Sat, Mar 17, 2018 at 1:16 PM, Richard Yu wrote: > Thanks for the advice, Jason > > I have modified KIP-266 to include the java doc for committed() and other > blocking methods, and I also > mentioned poll() which will also be bounded. Let me know if there is > anything else. :) > > Sincerely, Richard > > > > > > On Sat, Mar 17, 2018 at 12:00 PM, Jason Gustafson > wrote: > >> Hi Richard, >> >> Thanks for the updates. I'm really glad you picked this up. A couple minor >> comments: >> >> 1. Can you list the full set of new APIs explicitly in the KIP? Currently >> I >> only see the javadoc for `position()`. >> >> 2. We should consider adding `TimeUnit` to the new methods to avoid unit >> confusion. I know it's inconsistent with the poll() API, but I think it >> was >> probably a mistake not to include it there, so better not to double down >> on >> that mistake. And note that we do already have `close(long, TimeUnit)`. >> >> Other than that, I think the current KIP seems reasonable. >> >> Thanks, >> Jason >> >> On Wed, Mar 14, 2018 at 5:00 PM, Richard Yu >> wrote: >> >> > Note to all: I have included bounding commitSync() and committed() in >> this >> > KIP. >> > >> > On Sun, Mar 11, 2018 at 5:05 PM, Richard Yu > > >> > wrote: >> > >> > > Hi all, >> > > >> > > I updated the KIP where overloading position() is now the favored >> > approach. >> > > Bounding position() using requestTimeoutMs has been listed as >> rejected. >> > > >> > > Any thoughts? >> > > >> > > On Tue, Mar 6, 2018 at 6:00 PM, Guozhang Wang >> > wrote: >> > > >> > >> I agree that adding the overloads is most flexible. But going for >> that >> > >> direction we'd do that for all the blocking call that I've listed >> above, >> > >> with this timeout value covering the end-to-end waiting time. >> > >> >> > >> >> > >> Guozhang >> > >> >> > >> On Tue, Mar 6, 2018 at 10:02 AM, Ted Yu wrote: >> > >> >> > >> > bq. The most flexible option is to add overloads to the consumer >> > >> > >> > >> > This option is flexible. >> > >> > >> > >> > Looking at the tail of SPARK-18057, Spark dev voiced the same >> choice. >> > >> > >> > >> > +1 for adding overload with timeout parameter. >> > >> > >> > >> > Cheers >> > >> > >> > >> > On Mon, Mar 5, 2018 at 2:42 PM, Jason Gustafson < >> ja...@confluent.io> >> > >> > wrote: >> > >> > >> > >> > > @Guozhang I probably have suggested all options at some point or >> > >> another, >> > >> > > including most recently, the current KIP! I was thinking that >> > >> practically >> > >> > > speaking, the request timeout defines how long the user is >> willing >> > to >> > >> > wait >> > >> > > for a response. The consumer doesn't really have a complex send >> > >> process >> > >> > > like the producer for any of these APIs, so I wasn't sure how >> much >> > >> > benefit >> > >> > > there would be from having more granular control over timeouts >> (in >> > the >> > >> > end, >> > >> > > KIP-91 just adds a single timeout to control the whole send). >> That >> > >> said, >> > >> > it >> > >> > > might indeed be better to avoid overloading the config as you >> > suggest >> > >> > since >> > >> > > at least it avoids inconsistency with the producer's usage. 
>> > >> > > >> > >> > > The most flexible option is to add overloads to the consumer so >> that >> > >> > users >> > >> > > can pass the timeout directly. I'm no
Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()
Actually, what I said above is inaccurate. In testSeekAndCommitWithBrokerFailures, TestUtils.waitUntilTrue blocks, not seek. My assumption is that seek did not update correctly. I will be digging further into this. On Sat, Mar 17, 2018 at 4:16 PM, Richard Yu wrote: > One more thing: when looking through tests, I have realized that seek() > methods can potentially block indefinitely. As you well know, seek() is > called when pollOnce() or position() is active. Thus, if position() blocks > indefinitely, then so would seek(). Should bounding seek() also be included > in this KIP? > > Thanks, Richard > > On Sat, Mar 17, 2018 at 1:16 PM, Richard Yu > wrote: > >> Thanks for the advice, Jason >> >> I have modified KIP-266 to include the java doc for committed() and other >> blocking methods, and I also >> mentioned poll() which will also be bounded. Let me know if there is >> anything else. :) >> >> Sincerely, Richard >> >> >> >> >> >> On Sat, Mar 17, 2018 at 12:00 PM, Jason Gustafson >> wrote: >> >>> Hi Richard, >>> >>> Thanks for the updates. I'm really glad you picked this up. A couple >>> minor >>> comments: >>> >>> 1. Can you list the full set of new APIs explicitly in the KIP? >>> Currently I >>> only see the javadoc for `position()`. >>> >>> 2. We should consider adding `TimeUnit` to the new methods to avoid unit >>> confusion. I know it's inconsistent with the poll() API, but I think it >>> was >>> probably a mistake not to include it there, so better not to double down >>> on >>> that mistake. And note that we do already have `close(long, TimeUnit)`. >>> >>> Other than that, I think the current KIP seems reasonable. >>> >>> Thanks, >>> Jason >>> >>> On Wed, Mar 14, 2018 at 5:00 PM, Richard Yu >>> wrote: >>> >>> > Note to all: I have included bounding commitSync() and committed() in >>> this >>> > KIP. >>> > >>> > On Sun, Mar 11, 2018 at 5:05 PM, Richard Yu < >>> yohan.richard...@gmail.com> >>> > wrote: >>> > >>> > > Hi all, >>> > > >>> > > I updated the KIP where overloading position() is now the favored >>> > approach. >>> > > Bounding position() using requestTimeoutMs has been listed as >>> rejected. >>> > > >>> > > Any thoughts? >>> > > >>> > > On Tue, Mar 6, 2018 at 6:00 PM, Guozhang Wang >>> > wrote: >>> > > >>> > >> I agree that adding the overloads is most flexible. But going for >>> that >>> > >> direction we'd do that for all the blocking call that I've listed >>> above, >>> > >> with this timeout value covering the end-to-end waiting time. >>> > >> >>> > >> >>> > >> Guozhang >>> > >> >>> > >> On Tue, Mar 6, 2018 at 10:02 AM, Ted Yu >>> wrote: >>> > >> >>> > >> > bq. The most flexible option is to add overloads to the consumer >>> > >> > >>> > >> > This option is flexible. >>> > >> > >>> > >> > Looking at the tail of SPARK-18057, Spark dev voiced the same >>> choice. >>> > >> > >>> > >> > +1 for adding overload with timeout parameter. >>> > >> > >>> > >> > Cheers >>> > >> > >>> > >> > On Mon, Mar 5, 2018 at 2:42 PM, Jason Gustafson < >>> ja...@confluent.io> >>> > >> > wrote: >>> > >> > >>> > >> > > @Guozhang I probably have suggested all options at some point or >>> > >> another, >>> > >> > > including most recently, the current KIP! I was thinking that >>> > >> practically >>> > >> > > speaking, the request timeout defines how long the user is >>> willing >>> > to >>> > >> > wait >>> > >> > > for a response. 
The consumer doesn't really have a complex send >>> > >> process >>> > >> > > like the producer for any of these APIs, so I wasn't sure how >>> much >>> > >> > benefit >>> > >> > > there would be from
Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()
Hi Guozhang, I made some clarifications to KIP-266, namely: 1. Stated more specifically that commitSync will accept user input. 2. fetchCommittedOffsets(): Made its role in blocking more clear to the reader. 3. Sketched what would happen when time limit is exceeded. These changes should make the KIP easier to understand. Cheers, Richard On Mon, Mar 19, 2018 at 9:33 AM, Guozhang Wang wrote: > Hi Richard, > > I made a pass over the KIP again, some more clarifications / comments: > > 1. seek() call itself is not blocking, only the following poll() call may > be blocking as the actually metadata rq will happen. > > 2. I saw you did not include Consumer.partitionFor(), > Consumer.OffsetAndTimestamp() and Consumer.listTopics() in your KIP. After > a second thought, I think this may be a better idea to not tackle them in > the same KIP, and probably we should consider whether we would change the > behavior or not in another discussion. So I agree to not include them. > > 3. In your wiki you mentioned "Another change shall be made to > KafkaConsumer#poll(), due to its call to updateFetchPositions() which > blocks indefinitely." This part may a bit obscure to most readers who's not > familiar with the KafkaConsumer internals, could you please add more > elaborations. More specifically, I think the root causes of the public APIs > mentioned are a bit different while the KIP's explanation sounds like they > are due to the same reason: > > 3.1 fetchCommittedOffsets(): this internal call will block forever if the > committed offsets cannot be fetched successfully and affect position() and > committed(). We need to break out of its internal while loop. > 3.2 position() itself will while loop when offsets cannot be retrieved in > the underlying async call. We need to break out this while loop. > 3.3 commitSync() passed Long.MAX_VALUE as the timeout value, we should take > the user specified timeouts when applicable. > > > > Guozhang > > On Sat, Mar 17, 2018 at 4:44 PM, Richard Yu > wrote: > > > Actually, what I said above is inaccurate. In > > testSeekAndCommitWithBrokerFailures, TestUtils.waitUntilTrue blocks, not > > seek. > > My assumption is that seek did not update correctly. I will be digging > > further into this. > > > > > > > > On Sat, Mar 17, 2018 at 4:16 PM, Richard Yu > > wrote: > > > > > One more thing: when looking through tests, I have realized that seek() > > > methods can potentially block indefinitely. As you well know, seek() is > > > called when pollOnce() or position() is active. Thus, if position() > > blocks > > > indefinitely, then so would seek(). Should bounding seek() also be > > included > > > in this KIP? > > > > > > Thanks, Richard > > > > > > On Sat, Mar 17, 2018 at 1:16 PM, Richard Yu < > yohan.richard...@gmail.com> > > > wrote: > > > > > >> Thanks for the advice, Jason > > >> > > >> I have modified KIP-266 to include the java doc for committed() and > > other > > >> blocking methods, and I also > > >> mentioned poll() which will also be bounded. Let me know if there is > > >> anything else. :) > > >> > > >> Sincerely, Richard > > >> > > >> > > >> > > >> > > >> > > >> On Sat, Mar 17, 2018 at 12:00 PM, Jason Gustafson > > > >> wrote: > > >> > > >>> Hi Richard, > > >>> > > >>> Thanks for the updates. I'm really glad you picked this up. A couple > > >>> minor > > >>> comments: > > >>> > > >>> 1. Can you list the full set of new APIs explicitly in the KIP? > > >>> Currently I > > >>> only see the javadoc for `position()`. > > >>> > > >>> 2. 
We should consider adding `TimeUnit` to the new methods to avoid > > unit > > >>> confusion. I know it's inconsistent with the poll() API, but I think > it > > >>> was > > >>> probably a mistake not to include it there, so better not to double > > down > > >>> on > > >>> that mistake. And note that we do already have `close(long, > TimeUnit)`. > > >>> > > >>> Other than that, I think the current KIP seems reasonable. > > >>> > > >>> Thanks, > > >>> Jason > > >>> > > >>> On Wed, Mar 14, 2018 at 5:00 PM, Richard Yu < > > yohan.richard...@gmail.com> > >
Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()
Hi Ismael, You have a great point. Since most of the methods in this KIP have similar callbacks (position() and committed() both use fetchCommittedOffsets(), and commitSync() is similar to position(), except just updating offsets), the amount of time they block should be also about equal. However, I think that we need to take into account a couple of things. For starters, if the new methods were all reliant on one config, there is likelihood that the shortcomings for this approach would be similar to what we faced if we let request.timeout.ms control all method timeouts. In comparison, adding overloads does not have this problem. If you have further thoughts, please let me know. Richard On Mon, Mar 19, 2018 at 5:12 PM, Ismael Juma wrote: > Hi, > > An option that is not currently covered in the KIP is to have a separate > config max.block.ms, which is similar to the producer config with the same > name. This came up during the KAFKA-2391 discussion. I think it's clear > that we can't rely on request.timeout.ms, so the decision is between > adding > overloads or adding a new config. People seemed to be leaning towards the > latter in KAFKA-2391, but Jason makes a good point that the overloads are > more flexible. A couple of questions from me: > > 1. Do we need the additional flexibility? > 2. If we do, do we need it for every blocking method? > > Ismael > > On Mon, Mar 19, 2018 at 5:03 PM, Richard Yu > wrote: > > > Hi Guozhang, > > > > I made some clarifications to KIP-266, namely: > > 1. Stated more specifically that commitSync will accept user input. > > 2. fetchCommittedOffsets(): Made its role in blocking more clear to the > > reader. > > 3. Sketched what would happen when time limit is exceeded. > > > > These changes should make the KIP easier to understand. > > > > Cheers, > > Richard > > > > On Mon, Mar 19, 2018 at 9:33 AM, Guozhang Wang > wrote: > > > > > Hi Richard, > > > > > > I made a pass over the KIP again, some more clarifications / comments: > > > > > > 1. seek() call itself is not blocking, only the following poll() call > may > > > be blocking as the actually metadata rq will happen. > > > > > > 2. I saw you did not include Consumer.partitionFor(), > > > Consumer.OffsetAndTimestamp() and Consumer.listTopics() in your KIP. > > After > > > a second thought, I think this may be a better idea to not tackle them > in > > > the same KIP, and probably we should consider whether we would change > the > > > behavior or not in another discussion. So I agree to not include them. > > > > > > 3. In your wiki you mentioned "Another change shall be made to > > > KafkaConsumer#poll(), due to its call to updateFetchPositions() which > > > blocks indefinitely." This part may a bit obscure to most readers who's > > not > > > familiar with the KafkaConsumer internals, could you please add more > > > elaborations. More specifically, I think the root causes of the public > > APIs > > > mentioned are a bit different while the KIP's explanation sounds like > > they > > > are due to the same reason: > > > > > > 3.1 fetchCommittedOffsets(): this internal call will block forever if > the > > > committed offsets cannot be fetched successfully and affect position() > > and > > > committed(). We need to break out of its internal while loop. > > > 3.2 position() itself will while loop when offsets cannot be retrieved > in > > > the underlying async call. We need to break out this while loop. 
> > > 3.3 commitSync() passed Long.MAX_VALUE as the timeout value, we should > > take > > > the user specified timeouts when applicable. > > > > > > > > > > > > Guozhang > > > > > > On Sat, Mar 17, 2018 at 4:44 PM, Richard Yu < > yohan.richard...@gmail.com> > > > wrote: > > > > > > > Actually, what I said above is inaccurate. In > > > > testSeekAndCommitWithBrokerFailures, TestUtils.waitUntilTrue blocks, > > not > > > > seek. > > > > My assumption is that seek did not update correctly. I will be > digging > > > > further into this. > > > > > > > > > > > > > > > > On Sat, Mar 17, 2018 at 4:16 PM, Richard Yu < > > yohan.richard...@gmail.com> > > > > wrote: > > > > > > > > > One more thing: when looking through tests, I have realized that > > seek() > > > > > methods can potentially bl
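The config-based alternative Ismael mentions would be used roughly as below; the consumer-side max.block.ms key is purely hypothetical here (only the producer has such a config at this point):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class MaxBlockConfigSketch {
        public static KafkaConsumer<String, String> build() {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringDeserializer");
            // Hypothetical consumer analogue of the producer's max.block.ms: one default
            // cap for every blocking call (position, committed, commitSync, ...).
            props.put("max.block.ms", "30000");
            return new KafkaConsumer<>(props);
        }
    }

The trade-off raised in the thread is visible here: one config is easy to set once, but it cannot express per-call preferences such as waiting longer for a commit than for a position lookup, which is what the overloads allow.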
Re: [VOTE] KIP-268: Simplify Kafka Streams Rebalance Metadata Upgrade
Hi Matthias, Thanks for setting up the upgrade path. +1 (non-binding) On Tue, Mar 20, 2018 at 3:42 PM, Matthias J. Sax wrote: > Hi, > > I would like to start the vote for KIP-268: > > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > 268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade > > PR https://github.com/apache/kafka/pull/4636 contains the fixes to > upgrade from metadata version 1 to 2. Some tests are still missing but > I'll add them asap. > > For "version probing" including new metadata version 3 I plan to do a > follow-up PR after PR-4636 is merged. > > > -Matthias > >
Re: [DISCUSS] KIP-268: Simplify Kafka Streams Rebalance Metadata Upgrade
Hi Matthias, Just wondering, once this KIP goes through. Could I restart my older KIP to update SubscriptionInfo? Thanks Richard On Wed, Mar 21, 2018 at 11:18 AM, Matthias J. Sax wrote: > Thanks for following up James. > > > Is this the procedure that happens during every rebalance? The reason I > ask is that this step: > As long as the leader (before or after upgrade) receives at least > one old version X Subscription it always sends version Assignment X back > (the encoded supported version is X before the leader is upgrade and Y > after the leader is upgraded). > > Yes, that would be the consequence. > > > This implies that the leader receives all Subscriptions before sending > back any responses. Is that what actually happens? Is it possible that it > would receive say 4 out of 5 Subscriptions of Y, send back a response Y, > and then later receive a Subscription X? What happens in that case? Would > that Subscription X then trigger another rebalance, and the whole thing > starts again? > > That sounds correct. A 'delayed' Subscription could always happen -- > even before KIP-268 -- and would trigger a new rebalance. With this > regard, the behavior does not change. The difference is, that we would > automatically downgrade the Assignment from Y to X again -- but the > application would not fail (as it would before the KIP). > > Do you see an issue with this behavior. The idea of the design is to > make Kafka Streams robust against those scenarios. Thus, if 4 apps are > upgraded but no.5 is not yet and no.5 is late, Kafka Streams would first > upgrade from X to Y and downgrade from Y to X in the second rebalance > when no.5 joins the group. If no.5 gets upgraded, a third rebalance > would upgrade to Y again. > > Thus, as long as not all instances are on the newest version, > upgrades/donwgrades of the exchanged rebalance metadata could happen > multiple times. However, this should not be an issue from my understanding. > > > Let us know what you think about it. > > > -Matthias > > > On 3/20/18 11:10 PM, James Cheng wrote: > > Sorry, I see that the VOTE started already, but I have a late question > on this KIP. > > > > In the "version probing" protocol: > >> Detailed upgrade protocol from metadata version X to Y (with X >= 1.2): > >> On startup/rolling-bounce, an instance does not know what version the > leader understands and (optimistically) sends an Subscription with the > latest version Y > >> (Old, ie, not yet upgraded) Leader sends empty Assignment back to the > corresponding instance that sent the newer Subscription it does not > understand. The Assignment metadata only encodes both version numbers > (used-version == supported-version) as leader's supported-version X. > >> For all other instances the leader sends a regular Assignment in > version X back. > >> If an upgrade follower sends new version number Y Subscription and > receives version X Assignment with "supported-version = X", it can > downgrade to X (in-memory flag) and resends a new Subscription with old > version X to retry joining the group. To force an immediate second > rebalance, the follower does an "unsubscribe()/subscribe()/poll()" > sequence. > >> As long as the leader (before or after upgrade) receives at least one > old version X Subscription it always sends version Assignment X back (the > encoded supported version is X before the leader is upgrade and Y after the > leader is upgraded). 
> >> If an upgraded instance receives an Assigment it always checks the > leaders supported-version and update its downgraded "used-version" if > possible > > > > Is this the procedure that happens during every rebalance? The reason I > ask is that this step: > >>> As long as the leader (before or after upgrade) receives at least one > old version X Subscription it always sends version Assignment X back (the > encoded supported version is X before the leader is upgrade and Y after the > leader is upgraded). > > > > This implies that the leader receives all Subscriptions before sending > back any responses. Is that what actually happens? Is it possible that it > would receive say 4 out of 5 Subscriptions of Y, send back a response Y, > and then later receive a Subscription X? What happens in that case? Would > that Subscription X then trigger another rebalance, and the whole thing > starts again? > > > > Thanks, > > -James > > > >> On Mar 19, 2018, at 5:04 PM, Matthias J. Sax > wrote: > >> > >> Guozhang, > >> > >> thanks for your comments. > >> > >> 2: I think my main concern is, that 1.2 would be "special" release that > >> everybody need to use to upgrade. As an alternative, we could say that > >> we add the config in 1.2 and keep it for 2 additional releases (1.3 and > >> 1.4) but remove it in 1.5. This gives users more flexibility and does > >> force not force user to upgrade to a specific version but also allows us > >> to not carry the tech debt forever. WDYT about this? If users upgrade on > >> an regular basis, this approac
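A rough model of the version handshake Matthias describes, written as standalone Java for illustration; the class and field names are assumptions and do not reflect the actual Streams internals:

    import java.util.List;

    public class VersionProbingSketch {

        static class Subscription {
            final int usedVersion;
            Subscription(int usedVersion) { this.usedVersion = usedVersion; }
        }

        static class Assignment {
            final int usedVersion;
            final int supportedVersion;
            Assignment(int usedVersion, int supportedVersion) {
                this.usedVersion = usedVersion;
                this.supportedVersion = supportedVersion;
            }
        }

        // Leader side: as long as any member still sends an older version X, everyone gets an
        // Assignment encoded in X; supportedVersion advertises what the leader itself can handle.
        static Assignment chooseAssignmentVersion(List<Subscription> subscriptions, int leaderSupportedVersion) {
            int minUsed = leaderSupportedVersion;
            for (Subscription s : subscriptions) {
                minUsed = Math.min(minUsed, s.usedVersion);
            }
            return new Assignment(minUsed, leaderSupportedVersion);
        }

        // Follower side: a member that optimistically sent the newer version but received an older
        // Assignment downgrades in memory and rejoins, which triggers the follow-up rebalance.
        static int nextSubscriptionVersion(int sentVersion, Assignment received) {
            return Math.min(sentVersion, received.supportedVersion);
        }
    }

This also shows why the delayed-Subscription case James raises only causes another downgrade-and-upgrade cycle rather than a failure: the chosen version simply follows the minimum seen in each rebalance.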
Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()
I do have one question though: in the current KIP, throwing TimeoutException to mark that time limit is exceeded is applied to all new methods introduced in this proposal. However, how would users respond when a TimeoutException (since it is considered a RuntimeException)? Thanks, Richard On Mon, Mar 19, 2018 at 6:10 PM, Richard Yu wrote: > Hi Ismael, > > You have a great point. Since most of the methods in this KIP have similar > callbacks (position() and committed() both use fetchCommittedOffsets(), > and > commitSync() is similar to position(), except just updating offsets), the > amount of time > they block should be also about equal. > > However, I think that we need to take into account a couple of things. For > starters, > if the new methods were all reliant on one config, there is likelihood > that the > shortcomings for this approach would be similar to what we faced if we let > request.timeout.ms control all method timeouts. In comparison, adding > overloads > does not have this problem. > > If you have further thoughts, please let me know. > > Richard > > > On Mon, Mar 19, 2018 at 5:12 PM, Ismael Juma wrote: > >> Hi, >> >> An option that is not currently covered in the KIP is to have a separate >> config max.block.ms, which is similar to the producer config with the >> same >> name. This came up during the KAFKA-2391 discussion. I think it's clear >> that we can't rely on request.timeout.ms, so the decision is between >> adding >> overloads or adding a new config. People seemed to be leaning towards the >> latter in KAFKA-2391, but Jason makes a good point that the overloads are >> more flexible. A couple of questions from me: >> >> 1. Do we need the additional flexibility? >> 2. If we do, do we need it for every blocking method? >> >> Ismael >> >> On Mon, Mar 19, 2018 at 5:03 PM, Richard Yu >> wrote: >> >> > Hi Guozhang, >> > >> > I made some clarifications to KIP-266, namely: >> > 1. Stated more specifically that commitSync will accept user input. >> > 2. fetchCommittedOffsets(): Made its role in blocking more clear to the >> > reader. >> > 3. Sketched what would happen when time limit is exceeded. >> > >> > These changes should make the KIP easier to understand. >> > >> > Cheers, >> > Richard >> > >> > On Mon, Mar 19, 2018 at 9:33 AM, Guozhang Wang >> wrote: >> > >> > > Hi Richard, >> > > >> > > I made a pass over the KIP again, some more clarifications / comments: >> > > >> > > 1. seek() call itself is not blocking, only the following poll() call >> may >> > > be blocking as the actually metadata rq will happen. >> > > >> > > 2. I saw you did not include Consumer.partitionFor(), >> > > Consumer.OffsetAndTimestamp() and Consumer.listTopics() in your KIP. >> > After >> > > a second thought, I think this may be a better idea to not tackle >> them in >> > > the same KIP, and probably we should consider whether we would change >> the >> > > behavior or not in another discussion. So I agree to not include them. >> > > >> > > 3. In your wiki you mentioned "Another change shall be made to >> > > KafkaConsumer#poll(), due to its call to updateFetchPositions() which >> > > blocks indefinitely." This part may a bit obscure to most readers >> who's >> > not >> > > familiar with the KafkaConsumer internals, could you please add more >> > > elaborations. 
More specifically, I think the root causes of the public >> > APIs >> > > mentioned are a bit different while the KIP's explanation sounds like >> > they >> > > are due to the same reason: >> > > >> > > 3.1 fetchCommittedOffsets(): this internal call will block forever if >> the >> > > committed offsets cannot be fetched successfully and affect position() >> > and >> > > committed(). We need to break out of its internal while loop. >> > > 3.2 position() itself will while loop when offsets cannot be >> retrieved in >> > > the underlying async call. We need to break out this while loop. >> > > 3.3 commitSync() passed Long.MAX_VALUE as the timeout value, we should >> > take >> > > the user specified timeouts when applicable. >> > > >> > > >> > > >> > > Guozhang >> > > >> > > On Sat, Mar 17, 2
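On the RuntimeException question above: since org.apache.kafka.common.errors.TimeoutException is unchecked, the compiler will not force callers to handle it, so reacting to it is an explicit choice. A minimal sketch, assuming a bounded position() overload along the lines discussed in this thread (the commented-out timeout argument is not part of the released API):

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.TimeoutException;

    public class TimeoutHandlingSketch {

        // Returns -1 if the position could not be fetched within the bound.
        static long positionOrSentinel(KafkaConsumer<?, ?> consumer, TopicPartition tp) {
            try {
                // Hypothetical bounded overload from the KIP discussion:
                return consumer.position(tp /*, 5000L */);
            } catch (TimeoutException e) {
                // Unchecked, so this catch is the caller's choice: fall back, retry, or rethrow.
                return -1L;
            }
        }
    }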
Re: [DISCUSS] KIP-262 Metadata should include the number of state stores for task
Hi Matthias, Sorry for taking so long to get back to you. I will change my KIP to also include the old format. Thanks, Richard On Thu, Mar 22, 2018 at 9:15 PM, Matthias J. Sax wrote: > Hi Richard, > > with KIP-268 in place (should be accepted soon) the upgrade path is > covered. Thus, you can update your KIP accordingly, referring to KIP-268. > > Can you also update your KIP similar to KIP-268 to cover the old and new > metadata format? > > Thanks! > > -Matthias > > > On 2/24/18 4:07 PM, Richard Yu wrote: > > I didn't really get what "upgrade strategy" was at the time that Guozhang > > mentioned it, so I wrote the above dialogue from my first understanding. > I > > changed it to "upgrade strategy must be provided". Currently, however, I > do > > not have anything in mind to facilitate upgrading older Kafka brokers. If > > you have anything in mind, please let me know. > > > > > > > > > > > > > > On Sat, Feb 24, 2018 at 3:58 PM, Matthias J. Sax > > wrote: > > > >> Thanks a lot for this KIP. > >> > >> I am not sure what you mean by > >> > >>> which could potentially break older versions of Kafka brokers > >> > >> The metadata that is exchange, is not interpreted by the brokers. The > >> problem with upgrading the metadata format affect only Kafka Streams > >> instances. > >> > >> If we don't provide an upgrade strategy, changing the metadata format > >> required to stop all running application instances, before the instances > >> can be restarted with the new code. However, this implies downtime for > >> an application and is thus not acceptable. > >> > >> > >> -Matthias > >> > >> > >> On 2/24/18 11:11 AM, Richard Yu wrote: > >>> Hi all, > >>> > >>> I would like to discuss a KIP I've submitted : > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP- > >> 262%3A+Metadata+should+include+number+of+state+stores+for+task > >>> > >>> > >>> Regards, > >>> Richard Yu > >>> > >> > >> > > > >
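For readers skimming the thread, "old and new format" refers to the Streams subscription metadata; a very rough sketch of its shape before and after adding a per-task state-store count is below. Every field name and the flat layout are assumptions for illustration only, not the real encoding:

    import java.util.Map;
    import java.util.Set;
    import java.util.UUID;

    public class SubscriptionMetadataSketch {

        // Old format, conceptually: version, client id, previously owned tasks, standby tasks, endpoint.
        static class OldFormat {
            int version;
            UUID processId;
            Set<String> prevTasks;
            Set<String> standbyTasks;
            String userEndPoint;
        }

        // New format as KIP-262 sketches it: the same fields plus the number of state stores
        // per task, so the assignor can account for how heavy a stateful task is.
        static class NewFormat extends OldFormat {
            Map<String, Integer> numStateStoresPerTask;
        }
    }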
Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()
ainst having timeouts in the methods at the time. However, as Jason > > said > > > offline, we did end up with a timeout parameter in `poll`. > > > > > > Ismael > > > > > > On Fri, Mar 23, 2018 at 4:26 PM, Ewen Cheslack-Postava < > > e...@confluent.io> > > > wrote: > > > > > > > Regarding the flexibility question, has someone tried to dig up the > > > > discussion of the new consumer APIs when they were being written? I > > > vaguely > > > > recall these exact questions about using APIs vs configs and > > flexibility > > > vs > > > > bloating the API surface area having already been discussed. (Not > that > > we > > > > shouldn't revisit, just that it might also be a faster way to get to > a > > > full > > > > understanding of the options, concerns, and tradeoffs). > > > > > > > > -Ewen > > > > > > > > On Thu, Mar 22, 2018 at 7:19 AM, Richard Yu < > > yohan.richard...@gmail.com> > > > > wrote: > > > > > > > > > I do have one question though: in the current KIP, throwing > > > > > TimeoutException to mark > > > > > that time limit is exceeded is applied to all new methods > introduced > > in > > > > > this proposal. > > > > > However, how would users respond when a TimeoutException (since it > is > > > > > considered > > > > > a RuntimeException)? > > > > > > > > > > Thanks, > > > > > Richard > > > > > > > > > > > > > > > > > > > > On Mon, Mar 19, 2018 at 6:10 PM, Richard Yu < > > > yohan.richard...@gmail.com> > > > > > wrote: > > > > > > > > > > > Hi Ismael, > > > > > > > > > > > > You have a great point. Since most of the methods in this KIP > have > > > > > similar > > > > > > callbacks (position() and committed() both use > > > fetchCommittedOffsets(), > > > > > > and > > > > > > commitSync() is similar to position(), except just updating > > offsets), > > > > the > > > > > > amount of time > > > > > > they block should be also about equal. > > > > > > > > > > > > However, I think that we need to take into account a couple of > > > things. > > > > > For > > > > > > starters, > > > > > > if the new methods were all reliant on one config, there is > > > likelihood > > > > > > that the > > > > > > shortcomings for this approach would be similar to what we faced > if > > > we > > > > > let > > > > > > request.timeout.ms control all method timeouts. In comparison, > > > adding > > > > > > overloads > > > > > > does not have this problem. > > > > > > > > > > > > If you have further thoughts, please let me know. > > > > > > > > > > > > Richard > > > > > > > > > > > > > > > > > > On Mon, Mar 19, 2018 at 5:12 PM, Ismael Juma > > > > wrote: > > > > > > > > > > > >> Hi, > > > > > >> > > > > > >> An option that is not currently covered in the KIP is to have a > > > > separate > > > > > >> config max.block.ms, which is similar to the producer config > with > > > the > > > > > >> same > > > > > >> name. This came up during the KAFKA-2391 discussion. I think > it's > > > > clear > > > > > >> that we can't rely on request.timeout.ms, so the decision is > > > between > > > > > >> adding > > > > > >> overloads or adding a new config. People seemed to be leaning > > > towards > > > > > the > > > > > >> latter in KAFKA-2391, but Jason makes a good point that the > > > overloads > > > > > are > > > > > >> more flexible. A couple of questions from me: > > > > > >> > > > > > >> 1. Do we need the additional flexibility? > > > > > >> 2. If we do, do we need it for every blocking method? > > > > > >> > > >
Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()
On a side note, I have noticed that the several other methods in classes such as StoreChangeLogReader in Streams calls position() which causes tests to hang. It might be out of the scope of the KIP, but should I also change the methods which use position() as a callback to at the very least prevent the tests from hanging? This issue might be out of the KIP, but I prefer it if we could at least make my PR pass the Jenkins Q&A. Thanks On Fri, Mar 30, 2018 at 8:24 PM, Richard Yu wrote: > Thanks for the review Becket. > > About the methods beginningOffsets(), endOffsets(), ...: > I took a look through the code of KafkaConsumer, but after looking through > the offsetsByTimes() method > and its callbacks in Fetcher, I think these methods already block for a > set period of time. I know that there > is a chance that the offsets methods in KafkaConsumer might be like poll > (that is one section of the method > honors the timeout while another -- updateFetchPositions -- does not). > However, I don't think that this is the > case with offsetsByTimes since the callbacks that I checked does not seem > to hang. > > The clarity of the exception message is a problem. I thought your > suggestion there was reasonable. I included > it in the KIP. > > And on another note, I have noticed that several people has voiced the > opinion that adding a config might > be advisable in relation to adding an extra parameter. I think that we can > have a compromise of sorts: some > methods in KafkaConsumer are relatively similar -- for example, position() > and committed() both call > updateFetchPositions(). I think that we could use the same config for > these method as a default timeout if > the user does not provide one. On the other hand, if they wish to specify > a longer or shorter blocking time, > they have the option of changing the timeout. (I included the config as an > alternative in the KIP) WDYT? > > Thanks, > Richard > > > On Fri, Mar 30, 2018 at 1:26 AM, Becket Qin wrote: > >> Glad to see the KIP, Richard. This has been a really long pending issue. >> >> The original arguments from Jay for using config, such as max.block.ms, >> instead of using timeout parameters was that people will always hard code >> the timeout, and the hard coded timeout is rarely correct because it has >> to >> consider different scenarios. For example, users may receive timeout >> exception when the group coordinator moves. Having a configuration with >> some reasonable default value will make users' life easier. >> >> That said, in practice, it seems more useful to have timeout parameters. >> We >> have seen some library, using the consumers internally, needs to provide >> an >> external flexible timeout interface. Also, user can easily hard code a >> value to get the same as a config based solution. >> >> The KIP looks good overall. A few comments: >> >> 1. There are a few other blocking methods that are not included, e.g. >> offsetsForTimes(), beginningOffsets(), endOffsets(). Is there any reason? >> >> 2. I am wondering can we take the KIP as a chance to clean up our timeout >> exception(s)? More specifically, instead of reusing TimeoutException, can >> we introduce a new ClientTimeoutException with different causes, e.g. >> UnknownTopicOrPartition, RequestTimeout, LeaderNotAvailable, etc. >> As of now, the TimeoutException is used in the following three cases: >> >>1. TimeoutException is a subclass of ApiException which indicates the >>exception was returned by the broker. 
The TimeoutException was >> initially >>returned by the leaders when replication was not done within the >> specified >>timeout in the ProduceRequest. It has an error code of 7, which is >> returned >>by the broker. >>2. When we migrate to Java clients, in Errors definition, we extended >> it >>to indicate request timeout, i.e. a request was sent but the response >> was >>not received before timeout. In this case, the clients did not have a >>return code from the broker. >>3. Later at some point, we started to use the TimeoutException for >>clients method call timeout. It is neither related to any broker >> returned >>error code, nor to request timeout on the wire. >> >> Due to the various interpretations, users can easily be confused. As an >> example, when a timeout is thrown with "Failed to refresh metadata in X >> ms", it is hard to tell what exactly happened. Since we are changing the >> API here, it would be good to avoid introducing more ambiguity and see >> whether this c
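A sketch of the exception split Becket suggests; the class below does not exist anywhere in the codebase at this point, and the reason values are lifted straight from his examples:

    import org.apache.kafka.common.KafkaException;

    // Illustrative only: a single client-side timeout type whose reason says what actually timed
    // out, instead of reusing the broker-side TimeoutException (error code 7) for every case.
    public class ClientTimeoutExceptionSketch extends KafkaException {

        public enum Reason { UNKNOWN_TOPIC_OR_PARTITION, REQUEST_TIMEOUT, LEADER_NOT_AVAILABLE, METADATA_REFRESH }

        private final Reason reason;

        public ClientTimeoutExceptionSketch(String message, Reason reason) {
            super(message);
            this.reason = reason;
        }

        public Reason reason() {
            return reason;
        }
    }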
Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()
Hi Guozhang, I have clarified the KIP a bit to account for Becket's suggestion on ClientTimeoutException. About adding an extra config, you were right about my intentions. I am just wondering if the config should be included, since Ismael seems to favor an extra configuration, Thanks, Richard On Sun, Apr 1, 2018 at 5:35 PM, Guozhang Wang wrote: > Hi Richard, > > Regarding the streams side changes, we plan to incorporate with the new > APIs once the KIP is done, which is only internal code changes and hence do > not need to include in the KIP. > > Could you update the KIP because it has been quite obsoleted from the > discussed topics, and I'm a bit loosing track on what is your final > proposal right now. For example, I'm not completely following your > "compromise > of sorts": are you suggesting that we still add overloading functions and > add a config that will be applied to all overload functions without the > timeout, while for other overloaded functions with the timeout value the > config will be ignored? > > > Guozhang > > On Fri, Mar 30, 2018 at 8:36 PM, Richard Yu > wrote: > > > On a side note, I have noticed that the several other methods in classes > > such as StoreChangeLogReader in Streams calls position() which causes > tests > > to hang. It might be out of the scope of the KIP, but should I also > change > > the methods which use position() as a callback to at the very least > prevent > > the tests from hanging? This issue might be out of the KIP, but I prefer > it > > if we could at least make my PR pass the Jenkins Q&A. > > > > Thanks > > > > On Fri, Mar 30, 2018 at 8:24 PM, Richard Yu > > wrote: > > > > > Thanks for the review Becket. > > > > > > About the methods beginningOffsets(), endOffsets(), ...: > > > I took a look through the code of KafkaConsumer, but after looking > > through > > > the offsetsByTimes() method > > > and its callbacks in Fetcher, I think these methods already block for a > > > set period of time. I know that there > > > is a chance that the offsets methods in KafkaConsumer might be like > poll > > > (that is one section of the method > > > honors the timeout while another -- updateFetchPositions -- does not). > > > However, I don't think that this is the > > > case with offsetsByTimes since the callbacks that I checked does not > seem > > > to hang. > > > > > > The clarity of the exception message is a problem. I thought your > > > suggestion there was reasonable. I included > > > it in the KIP. > > > > > > And on another note, I have noticed that several people has voiced the > > > opinion that adding a config might > > > be advisable in relation to adding an extra parameter. I think that we > > can > > > have a compromise of sorts: some > > > methods in KafkaConsumer are relatively similar -- for example, > > position() > > > and committed() both call > > > updateFetchPositions(). I think that we could use the same config for > > > these method as a default timeout if > > > the user does not provide one. On the other hand, if they wish to > specify > > > a longer or shorter blocking time, > > > they have the option of changing the timeout. (I included the config as > > an > > > alternative in the KIP) WDYT? > > > > > > Thanks, > > > Richard > > > > > > > > > On Fri, Mar 30, 2018 at 1:26 AM, Becket Qin > > wrote: > > > > > >> Glad to see the KIP, Richard. This has been a really long pending > issue. 
> > >> > > >> The original arguments from Jay for using config, such as > max.block.ms, > > >> instead of using timeout parameters was that people will always hard > > code > > >> the timeout, and the hard coded timeout is rarely correct because it > has > > >> to > > >> consider different scenarios. For example, users may receive timeout > > >> exception when the group coordinator moves. Having a configuration > with > > >> some reasonable default value will make users' life easier. > > >> > > >> That said, in practice, it seems more useful to have timeout > parameters. > > >> We > > >> have seen some library, using the consumers internally, needs to > provide > > >> an > > >> external flexible timeout interface. Also, user can easily hard code a > > >> value to get the same as a config based so
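The "compromise" being discussed, spelled out as a sketch: a configured default bounds the plain calls, and an explicit timeout on the overload wins when supplied. The config name and the plumbing are assumptions, not anything agreed in the thread:

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.common.TopicPartition;

    public class DefaultTimeoutSketch {

        // Hypothetical default, e.g. read from a config such as "max.block.ms".
        private final long defaultBlockMs;

        public DefaultTimeoutSketch(long defaultBlockMs) {
            this.defaultBlockMs = defaultBlockMs;
        }

        // Plain call: falls back to the configured default bound.
        public long position(TopicPartition partition) {
            return boundedPosition(partition, defaultBlockMs);
        }

        // Overload: the caller-supplied bound overrides the config.
        public long position(TopicPartition partition, long timeout, TimeUnit unit) {
            return boundedPosition(partition, unit.toMillis(timeout));
        }

        private long boundedPosition(TopicPartition partition, long timeoutMs) {
            // ... bounded offset lookup elided ...
            return 0L;
        }
    }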
Re: [VOTE] KIP-274: Kafka Streams Skipped Records Metrics
+1 On Mon, Apr 2, 2018 at 8:42 AM, Guozhang Wang wrote: > +1 (binding). > > On Mon, Apr 2, 2018 at 7:22 AM, Ted Yu wrote: > > > +1 > > > > On Mon, Apr 2, 2018 at 7:11 AM, Bill Bejeck wrote: > > > > > Thanks for the KIP. > > > > > > +1 > > > > > > -Bill > > > > > > On Mon, Apr 2, 2018 at 10:09 AM, John Roesler > wrote: > > > > > > > Dear Kafka community, > > > > > > > > I am proposing KIP-274 to improve visibility when Streams skips > invalid > > > > records. > > > > > > > > The proposal is to simplify the metrics and add warning logs. > > > > > > > > Please find details in the wiki: > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > > > > 274%3A+Kafka+Streams+Skipped+Records+Metrics > > > > > > > > Thanks, > > > > > > > > -John > > > > > > > > > > > > > -- > -- Guozhang >
Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()
Hi all, If possible, would a committer please review? Thanks On Sun, Apr 1, 2018 at 7:24 PM, Richard Yu wrote: > Hi Guozhang, > > I have clarified the KIP a bit to account for Becket's suggestion on > ClientTimeoutException. > About adding an extra config, you were right about my intentions. I am > just wondering if the config > should be included, since Ismael seems to favor an extra configuration, > > Thanks, > Richard > > On Sun, Apr 1, 2018 at 5:35 PM, Guozhang Wang wrote: > >> Hi Richard, >> >> Regarding the streams side changes, we plan to incorporate with the new >> APIs once the KIP is done, which is only internal code changes and hence >> do >> not need to include in the KIP. >> >> Could you update the KIP because it has been quite obsoleted from the >> discussed topics, and I'm a bit loosing track on what is your final >> proposal right now. For example, I'm not completely following your >> "compromise >> of sorts": are you suggesting that we still add overloading functions and >> add a config that will be applied to all overload functions without the >> timeout, while for other overloaded functions with the timeout value the >> config will be ignored? >> >> >> Guozhang >> >> On Fri, Mar 30, 2018 at 8:36 PM, Richard Yu >> wrote: >> >> > On a side note, I have noticed that the several other methods in classes >> > such as StoreChangeLogReader in Streams calls position() which causes >> tests >> > to hang. It might be out of the scope of the KIP, but should I also >> change >> > the methods which use position() as a callback to at the very least >> prevent >> > the tests from hanging? This issue might be out of the KIP, but I >> prefer it >> > if we could at least make my PR pass the Jenkins Q&A. >> > >> > Thanks >> > >> > On Fri, Mar 30, 2018 at 8:24 PM, Richard Yu > > >> > wrote: >> > >> > > Thanks for the review Becket. >> > > >> > > About the methods beginningOffsets(), endOffsets(), ...: >> > > I took a look through the code of KafkaConsumer, but after looking >> > through >> > > the offsetsByTimes() method >> > > and its callbacks in Fetcher, I think these methods already block for >> a >> > > set period of time. I know that there >> > > is a chance that the offsets methods in KafkaConsumer might be like >> poll >> > > (that is one section of the method >> > > honors the timeout while another -- updateFetchPositions -- does not). >> > > However, I don't think that this is the >> > > case with offsetsByTimes since the callbacks that I checked does not >> seem >> > > to hang. >> > > >> > > The clarity of the exception message is a problem. I thought your >> > > suggestion there was reasonable. I included >> > > it in the KIP. >> > > >> > > And on another note, I have noticed that several people has voiced the >> > > opinion that adding a config might >> > > be advisable in relation to adding an extra parameter. I think that we >> > can >> > > have a compromise of sorts: some >> > > methods in KafkaConsumer are relatively similar -- for example, >> > position() >> > > and committed() both call >> > > updateFetchPositions(). I think that we could use the same config for >> > > these method as a default timeout if >> > > the user does not provide one. On the other hand, if they wish to >> specify >> > > a longer or shorter blocking time, >> > > they have the option of changing the timeout. (I included the config >> as >> > an >> > > alternative in the KIP) WDYT? 
>> > > >> > > Thanks, >> > > Richard >> > > >> > > >> > > On Fri, Mar 30, 2018 at 1:26 AM, Becket Qin >> > wrote: >> > > >> > >> Glad to see the KIP, Richard. This has been a really long pending >> issue. >> > >> >> > >> The original arguments from Jay for using config, such as >> max.block.ms, >> > >> instead of using timeout parameters was that people will always hard >> > code >> > >> the timeout, and the hard coded timeout is rarely correct because it >> has >> > >> to >> > >> consider different scenarios. For example, users may re
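To make the "compromise of sorts" described above concrete, here is a minimal sketch of the two call styles being weighed for position(): a config-driven default bound, and an explicit-timeout overload. The config name and the overload signature are illustrative placeholders only; this thread had not settled on final names.

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class PositionTimeoutSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "kip-266-sketch");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            // Hypothetical config discussed above: a default bound shared by blocking
            // calls such as position() and committed() when no explicit timeout is given.
            props.put("default.block.ms", "30000");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition("my-topic", 0);
                consumer.assign(Collections.singletonList(tp));

                // Existing call: would block up to the (hypothetical) config value above.
                long offset = consumer.position(tp);
                System.out.println("position = " + offset);

                // Hypothetical explicit-timeout overload, which would ignore the config
                // and throw a TimeoutException once the bound is exceeded:
                // long bounded = consumer.position(tp, 5, TimeUnit.SECONDS);
            }
        }
    }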
Re: [DISCUSS] KIP-288: Consumer poll timeout change and new waitForAssignment method
Hi John, bq. #1 (wait for metadata) is infinite. Some of what you stated in this KIP has already been discussed in an older KIP (KIP-266), just for your reference. Thanks, Richard On Tue, Apr 17, 2018 at 11:08 AM, John Roesler wrote: > Hello all, > > I am proposing KIP-288 to improve the semantics of Consumer.poll() w.r.t. > the timeout parameter and to add a new method for blocking for assignment > only. > > Please find the details here: > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > 288%3A+Consumer.poll%28%29+timeout+semantic+change+and+ > new+waitForAssignment+method > > Please let me know what you think! > > Thanks, > -John >
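For a sense of what "blocking for assignment only" would replace, here is a small sketch. The loop is the idiom available at the time; the waitForAssignment() call in the comment is the hypothetical method this KIP proposes, shown with an illustrative signature.

    import java.util.Collections;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class WaitForAssignmentSketch {

        // Current idiom: drive poll(0) until the group rebalance completes and this
        // consumer has partitions assigned; any records returned along the way are dropped.
        static Set<TopicPartition> waitForAssignmentToday(KafkaConsumer<?, ?> consumer) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            while (consumer.assignment().isEmpty()) {
                consumer.poll(0); // also performs the join-group and metadata work
            }
            return consumer.assignment();
        }

        // Hypothetical KIP-288 alternative (illustrative signature only):
        // Set<TopicPartition> assignment = consumer.waitForAssignment(10, TimeUnit.SECONDS);
    }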
Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()
Hi John, I think that you could finish your PR that corresponds with KIP-288 and merge it. I can finish my side of the work afterwards. On another note, adding an asynchronous version of poll() would make sense, particularly since the current version of Kafka does not support it. Thanks Richard On Tue, Apr 17, 2018 at 12:30 PM, John Roesler wrote: > Cross-pollinating from some discussion we've had on KIP-288, > > I think there's a good reason that poll() takes a timeout when none of the > other methods do, and it's relevant to this discussion. The timeout in > poll() is effectively implementing a long-poll API (on the client side, so > it's not really long-poll, but the programmer-facing behavior is the same). > The timeout isn't really bounding the execution time of the method, but > instead giving a max time that callers are willing to wait around and see > if any results show up. > > If I understand the code sufficiently, it would be perfectly reasonable for > a caller to use a timeout of 0 to implement async poll, it would just mean > that KafkaConsumer would just check on each call if there's a response > ready and if not, fire off a new request without waiting for a response. > > As such, it seems inappropriate to throw a ClientTimeoutException from > poll(), except possibly if the initial phase of ensuring an assignment > times out. We wouldn't want the method contract to be "returns a non-empty > collection or throws a ClientTimeoutException" > > Now, I'm wondering if we should actually consider one of my rejected > alternatives, to treat the "operation timeout" as a separate parameter from > the "long-poll time". Or maybe adding an "asyncPoll(timeout, time unit)" > that only uses the timeout to bound metadata updates and otherwise behaves > like the current "poll(0)". > > Thanks, > -John > > On Tue, Apr 17, 2018 at 2:05 PM, John Roesler wrote: > > > Hey Richard, > > > > As you noticed, the newly introduced KIP-288 overlaps with this one. > Sorry > > for stepping on your toes... How would you like to proceed? I'm happy to > > "close" KIP-288 in deference to this KIP. > > > > With respect to poll(), reading this discussion gave me a new idea for > > providing a non-breaking update path... What if we introduce a new > variant > > 'poll(long timeout, TimeUnit unit)' that displays the new, desired > > behavior, and just leave the old method alone? > > > > Thanks, > > -John > > > > On Tue, Apr 17, 2018 at 12:09 PM, Richard Yu > > > wrote: > > > >> Hi all, > >> > >> If possible, would a committer please review? > >> > >> Thanks > >> > >> On Sun, Apr 1, 2018 at 7:24 PM, Richard Yu > >> wrote: > >> > >> > Hi Guozhang, > >> > > >> > I have clarified the KIP a bit to account for Becket's suggestion on > >> > ClientTimeoutException. > >> > About adding an extra config, you were right about my intentions. I am > >> > just wondering if the config > >> > should be included, since Ismael seems to favor an extra > configuration, > >> > > >> > Thanks, > >> > Richard > >> > > >> > On Sun, Apr 1, 2018 at 5:35 PM, Guozhang Wang > >> wrote: > >> > > >> >> Hi Richard, > >> >> > >> >> Regarding the streams side changes, we plan to incorporate with the > new > >> >> APIs once the KIP is done, which is only internal code changes and > >> hence > >> >> do > >> >> not need to include in the KIP. > >> >> > >> >> Could you update the KIP because it has been quite obsoleted from the > >> >> discussed topics, and I'm a bit loosing track on what is your final > >> >> proposal right now. 
For example, I'm not completely following your > >> >> "compromise > >> >> of sorts": are you suggesting that we still add overloading functions > >> and > >> >> add a config that will be applied to all overload functions without > the > >> >> timeout, while for other overloaded functions with the timeout value > >> the > >> >> config will be ignored? > >> >> > >> >> > >> >> Guozhang > >> >> > >> >> On Fri, Mar 30, 2018 at 8:36 PM, Richard Yu < > >> yohan.richard...@gmail.com> > >> >> wrote: >
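A small sketch of the distinction John draws above, using the consumer API as it existed at the time. Only poll(long) is real here; the bounded variant in the comment is the 'poll(long timeout, TimeUnit unit)' idea quoted above, which did not exist yet.

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class PollVariantsSketch {

        static void pollStyles(KafkaConsumer<String, String> consumer) {
            consumer.subscribe(Collections.singletonList("my-topic"));

            // Long-poll style: wait up to 1s for records. The 1000ms only bounds the
            // "wait for data" part; metadata updates and rebalancing inside poll()
            // can still block longer, which is the behavior under discussion.
            ConsumerRecords<String, String> records = consumer.poll(1000);

            // "Async" style: check whether any responses are ready, fire off new
            // fetch requests, and return (possibly empty) immediately.
            ConsumerRecords<String, String> maybeEmpty = consumer.poll(0);

            // Proposed variant from the mail above, which would bound the whole call
            // (metadata updates included) and leave the old method untouched:
            // ConsumerRecords<String, String> bounded = consumer.poll(1, TimeUnit.SECONDS);

            System.out.println(records.count() + " + " + maybeEmpty.count() + " records");
        }
    }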
Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()
Hi John, Do you have a preference for fixing the poll() method (e.g. using asyncPoll or just sticking with the current method but with an extra timeout parameter) ? I think your current proposition for KIP-288 is better than what I have on my side. If you think there is something that you want to add, you could go ahead and change KIP-266 to your liking. Just to note that it would be preferable that if one of us modifies this KIP, it would be best to mention your change on this thread to let each other know (makes it easier to coordinate progress). Thanks, Richard On Tue, Apr 17, 2018 at 2:07 PM, John Roesler wrote: > Ok, I'll close the discussion on KIP-288 and mark it discarded. > > We can solidify the design for poll in KIP-266, and once it's approved, > I'll coordinate with Qiang Zhao on the PR for the poll part of the work. > Once that is merged, you'll have a clean slate for the rest of the work. > > On Tue, Apr 17, 2018 at 3:39 PM, Richard Yu > wrote: > > > Hi John, > > > > I think that you could finish your PR that corresponds with KIP-288 and > > merge it. I can finish my side of the work afterwards. > > > > On another note, adding an asynchronized version of poll() would make > > sense, particularily since the current version of Kafka does not support > > it. > > > > Thanks > > Richar > > > > On Tue, Apr 17, 2018 at 12:30 PM, John Roesler > wrote: > > > > > Cross-pollinating from some discussion we've had on KIP-288, > > > > > > I think there's a good reason that poll() takes a timeout when none of > > the > > > other methods do, and it's relevant to this discussion. The timeout in > > > poll() is effectively implementing a long-poll API (on the client side, > > so > > > it's not really long-poll, but the programmer-facing behavior is the > > same). > > > The timeout isn't really bounding the execution time of the method, but > > > instead giving a max time that callers are willing to wait around and > see > > > if any results show up. > > > > > > If I understand the code sufficiently, it would be perfectly reasonable > > for > > > a caller to use a timeout of 0 to implement async poll, it would just > > mean > > > that KafkaConsumer would just check on each call if there's a response > > > ready and if not, fire off a new request without waiting for a > response. > > > > > > As such, it seems inappropriate to throw a ClientTimeoutException from > > > poll(), except possibly if the initial phase of ensuring an assignment > > > times out. We wouldn't want the method contract to be "returns a > > non-empty > > > collection or throws a ClientTimeoutException" > > > > > > Now, I'm wondering if we should actually consider one of my rejected > > > alternatives, to treat the "operation timeout" as a separate parameter > > from > > > the "long-poll time". Or maybe adding an "asyncPoll(timeout, time > unit)" > > > that only uses the timeout to bound metadata updates and otherwise > > behaves > > > like the current "poll(0)". > > > > > > Thanks, > > > -John > > > > > > On Tue, Apr 17, 2018 at 2:05 PM, John Roesler > wrote: > > > > > > > Hey Richard, > > > > > > > > As you noticed, the newly introduced KIP-288 overlaps with this one. > > > Sorry > > > > for stepping on your toes... How would you like to proceed? I'm happy > > to > > > > "close" KIP-288 in deference to this KIP. > > > > > > > > With respect to poll(), reading this discussion gave me a new idea > for > > > > providing a non-breaking update path... 
What if we introduce a new > > > variant > > > > 'poll(long timeout, TimeUnit unit)' that displays the new, desired > > > > behavior, and just leave the old method alone? > > > > > > > > Thanks, > > > > -John > > > > > > > > On Tue, Apr 17, 2018 at 12:09 PM, Richard Yu < > > yohan.richard...@gmail.com > > > > > > > > wrote: > > > > > > > >> Hi all, > > > >> > > > >> If possible, would a committer please review? > > > >> > > > >> Thanks > > > >> > > > >> On Sun, Apr 1, 2018 at 7:24 PM, Richard Yu < > > yohan.richard...@gmail.com> > > > >>
Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()
is discussion we need to first figure that part out. I'm in favor of > > the > > > current approach of overloaded functions over the config since if we > are > > > going to have multiple configs other than a single one to control > timeout > > > semantics it may be even confusing: take our producer side configs for > an > > > example, right now we have "request.timeout.ms" and "max.block.ms" and > > we > > > are proposing to add another one in KIP-91. But I'd also like to hear > > from > > > people who's in favor of the configs. > > > > > > > > > Guozhang > > > > > > > > > On Wed, Apr 25, 2018 at 1:39 PM, John Roesler > wrote: > > > > > > > Re Ted's last comment, that style of async API requires some thread > to > > > > actually drive the request/response cycle and invoke the callback > when > > > it's > > > > complete. Right now, this happens in the caller's thread as a > > side-effect > > > > of calling poll(). But that clearly won't work for poll() itself! > > > > > > > > In the future, I think we'd like to add a background thread to drive > > the > > > > request/response loops, and then make all these methods return > > > > Future. > > > > > > > > But we don't need to bite that off right now. > > > > > > > > The "async" model I'm proposing is really just a generalization of > the > > > one > > > > that poll already partially implements: when you call poll, it fires > > off > > > > any requests it needs to make and checks if any responses are ready. > If > > > so, > > > > it returns them. If not, it returns empty. When you call poll() > again, > > it > > > > again checks on the responses from last time, and so forth. > > > > > > > > But that model currently only applies to the "fetch" part of poll. > I'm > > > > proposing that we extend it to the "metadata update" part of poll as > > > well. > > > > > > > > However, as previously discussed, doing this in place would break the > > > > semantics of poll that folks currently rely on, so I propose to add > new > > > > methods and deprecate the existing poll method. Here's what I'm > > thinking: > > > > https://github.com/apache/kafka/pull/4855 . In the discussion on > that > > > PR, > > > > I've described in greater detail how the async+blocking semantics > work. > > > > > > > > I'll update KIP-266 with this interface for poll(). > > > > > > > > It would be great to get this discussion moving again so we can get > > these > > > > changes into 2.0. What does everyone think about this? > > > > > > > > Thanks, > > > > -John > > > > > > > > On Thu, Apr 19, 2018 at 5:12 PM, John Roesler > > wrote: > > > > > > > > > Thanks for the tip, Ted! > > > > > > > > > > On Thu, Apr 19, 2018 at 12:12 PM, Ted Yu > > wrote: > > > > > > > > > >> John: > > > > >> In case you want to pursue async poll, it seems (by looking at > > current > > > > >> API) > > > > >> that introducing PollCallback follows existing pattern(s). > > > > >> > > > > >> e.g. KafkaConsumer#commitAsync(OffsetCommitCallback) > > > > >> > > > > >> FYI > > > > >> > > > > >> On Thu, Apr 19, 2018 at 10:08 AM, John Roesler > > > > > wrote: > > > > >> > > > > >> > Hi Richard, > > > > >> > > > > > >> > Thanks for the invitation! I do think it would be safer to > > > introduce a > > > > >> new > > > > >> > poll > > > > >> > method than to change the semantics of the old one. I've been > > > mulling > > > > >> about > > > > >> > whether the new one could still have (slightly different) async > > > > >> semantics > > > > >> > with > > > > >> > a timeout of 0. 
If possible, I'd like to avoid introducing > another > > > new > > > > >> > "asyncPoll". >
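Ted's pointer above is to the existing commitAsync(OffsetCommitCallback) pattern. Below is a minimal sketch of what a callback-style poll could look like if it followed that precedent; OffsetCommitCallback is the real interface, while PollCallback and asyncPoll() are hypothetical names from this discussion, not an existing API.

    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.consumer.OffsetCommitCallback;
    import org.apache.kafka.common.TopicPartition;

    public class AsyncPollSketch {

        // Real precedent: commitAsync(...) takes a callback that fires once the
        // commit response comes back (driven by the caller's thread on later polls).
        static final OffsetCommitCallback COMMIT_CALLBACK = new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                if (exception != null) {
                    exception.printStackTrace();
                }
            }
        };

        // Hypothetical analogue for poll(), following the same shape. Whether the
        // callback is driven by the caller's thread or by a future background thread
        // (which could instead return Future<ConsumerRecords<K, V>>) is exactly the
        // open question John raises above.
        interface PollCallback<K, V> {
            void onRecords(ConsumerRecords<K, V> records, Exception exception);
        }

        // Illustrative signature only: void asyncPoll(PollCallback<K, V> callback);
    }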
[DISCUSS] KIP-205: Add getAllKeys() API to ReadOnlyWindowStore
Hello, I would like to solicit review and comment on this issue (link below): https://cwiki.apache.org/confluence/display/KAFKA/KIP-205%3A+Add+getAllKeys%28%29+API+to+ReadOnlyWindowStore
Re: [VOTE] KIP-202
The vote has passed with 5++. We are now closing the vote. On Mon, Sep 25, 2017 at 1:18 AM, Guozhang Wang wrote: > If no on else has opinions or votes on this thread, Richard could you close > the voting phase then? > > On Sat, Sep 23, 2017 at 4:11 PM, Ismael Juma wrote: > > > Thanks for the KIP, +1 (binding). > > > > On 19 Sep 2017 12:27 am, "Richard Yu" > wrote: > > > > > Hello, I would like to start a VOTE thread on KIP-202. > > > > > > Thanks. > > > > > > > > > -- > -- Guozhang >
Re: [DISCUSS] KIP-205: Add getAllKeys() API to ReadOnlyWindowStore
We should split KAFKA-4499 into several sub-issues, with 4499 as the parent issue. Adding the implementation to CachingWindowStore, RocksDBWindowStore, etc. will require, for each store, adding tests and implementing the new methods, which is not trivial. This way, it should be easier to manage the progress of the KIP. On Thu, Oct 5, 2017 at 2:58 PM, Matthias J. Sax wrote: > Thanks for driving this and sorry for late response. With release > deadline it was pretty busy lately. > > Can you please add a description for the suggested method, what they are > going to return? It's a little unclear to me atm. > > It would also be helpful to discuss, for which use case each method is > useful. This might also help to identify potential gaps for which > another API might be more helpful. > > Also, we should talk about provided guarantees when using those APIs > with regard to consistency -- not saying that we need to provide strong > guarantees, but he KIP should describe what user can expect. > > > -Matthias > > On 9/24/17 8:11 PM, Richard Yu wrote: > > Hello, I would like to solicit review and comment on this issue (link > > below): > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > 205%3A+Add+getAllKeys%28%29+API+to+ReadOnlyWindowStore > > > >
Re: [DISCUSS] KIP-205: Add getAllKeys() API to ReadOnlyWindowStore
Thanks for the clarifications, Xavier. I have removed most of the methods except for keys() and all() which has been renamed to Guozhang Wang's suggestions. Hope this helps. On Fri, Oct 13, 2017 at 3:28 PM, Xavier Léauté wrote: > Thanks for the KIP Richard, this is a very useful addition! > > As far as the API changes, I just have a few comments on the methods that > don't seem directly related to the KIP title, and naming of course :). > On the implementation, see my notes further down that will hopefully > clarify a few things. > > Regarding the "bonus" methods: > I agree with Guozhang that the KIP lacks proper motivation for adding the > min, max, and allLatest methods. > It is also not clear to me what min and max would really mean, what > ordering do we refer to here? Are we first ordering by time, then key, or > first by key, then time? > The allLatest method might be useful, but I don't really see how it would > be used in practice if we have to scan the entire range of keys for all the > state stores, every single time. > > Maybe we could flesh the motivation behind those extra methods, but in the > interest of time, and moving the KIP forward it might make sense to file a > follow-up once we have more concrete use-cases. > > On naming: > I also agree with Guozhang that "keys()" should be renamed. It feels a bit > of a misnomer, since it not only returns keys, but also the values. > > As far as what to rename it to, I would argue we already have some > discrepancy between key-value stores using range() vs. window stores using > fetch(). > I assume we called the window method "fetch" instead of "get" because you > might get back more than one window for the requested key. > > If we wanted to make things consistent with both existing key-value store > naming and window store naming, we could do the following: > Decide that "all" always refers to the entire range of keys, independent of > the window and similarly "range" always refers to a particular range of > keys, irrespective of the window. > We can then prefix methods with "fetch" to indicate that more than one > window may be returned for each key in the range. > > This would give us: > - a new fetchAll() method for all the keys, which makes it clear that you > might get back the same key in different windows > - a new fetchAll(timeFrom, timeTo) method to get all the keys in a given > time range, again with possibly more than one window per key > - and we'd have to rename fetch(K,K,long, long) to fetchRange(K, K, long, > long) and deprecate the old one to indicate a range of keys > > One inconsistency I noted: the "Proposed Changes" section in your KIP talks > about a "range(timeFrom, timeTo)" method, I think you meant to refer to the > all(from, to) method, but I'm sure you'll fix that once we decide on > naming. > > On the implementation side: > You mentioned that caching and rocksdb store have very different key/value > structures, and while it appears to be that way on the surface, the > structure between the two is actually very similar. Keys in the cache are > prefixed with a segment ID to ensure the ordering in the cache stays > consistent with the rocksdb implementation, which maintains multiple > rocksdb instances, one for each segment. So we just "artificially" mirror > the segment structure in the cache. 
> > The reason for keeping the ordering consistent is pretty simple: keep in > mind that when we query a cached window store we are effectively querying > both the cache and the persistent rocksdb store at the same time, merging > results from both. To make that merge as painless as possible, we ensure > the ordering is consistent when querying a range of keys in both stores. > > Also keep in mind CompositeReadonlyWindowStore, which wraps multiple window > stores within a topology. > > Hope this clarifies some of the less trivial parts of caching window store. > > Cheers, > Xavier > > On Sun, Oct 8, 2017 at 9:21 PM Guozhang Wang wrote: > > > Richard, Matthias: > > > > 0. Could you describe a bit what are the possible use cases of > `allLatest`, > > `minKey` and `maxKey`? I'd prefer keeping the APIs to add at a minimum > > necessary amount, to avoid a swamp of new APIs that no one would really > use > > but just complicated the internal code base. > > > > 1. One minor comment on the other two new APIs: could we rename `keys` to > > `all` and `all` to `range` to be consistent with the other store's APIs? > > > > 2. One meta comment on the implementation details: since both `keys` and > &
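Collecting Xavier's naming suggestion into one place, a sketch of the window-store read methods it implies is below. The single-key fetch() shown first already exists; the rest are the proposed additions and rename, written here with illustrative signatures rather than the final KIP-205 API.

    import org.apache.kafka.streams.kstream.Windowed;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.WindowStoreIterator;

    // Sketch of ReadOnlyWindowStore reads under Xavier's proposed naming.
    interface WindowStoreReadsSketch<K, V> {

        // Existing: all windows for a single key within a time range.
        WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);

        // Proposed: every key, every window.
        KeyValueIterator<Windowed<K>, V> fetchAll();

        // Proposed: every key, windows restricted to a time range.
        KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);

        // Proposed rename of the existing fetch(K, K, long, long), making it explicit
        // that a *range* of keys is queried; the old name would be deprecated.
        KeyValueIterator<Windowed<K>, V> fetchRange(K from, K to, long timeFrom, long timeTo);
    }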
Re: [DISCUSS] KIP-205: Add getAllKeys() API to ReadOnlyWindowStore
As Guozhang Wang mentioned earlier, we want to mirror the structure of similar Store class (namely KTable). The WindowedStore class might be unique in itself as it uses fetch() methods, but in my opinion, uniformity should be better suited for simplicity. On Mon, Oct 16, 2017 at 11:54 AM, Xavier Léauté wrote: > Thank you Richard! Do you or Guozhang have any thoughts on my suggestions > to use fetchAll() and fetchAll(timeFrom, timeTo) and reserve the "range" > keyword for when we query a specific range of keys? > > Xavier > > On Sat, Oct 14, 2017 at 2:32 PM Richard Yu > wrote: > > > Thanks for the clarifications, Xavier. > > I have removed most of the methods except for keys() and all() which has > > been renamed to Guozhang Wang's suggestions. > > > > Hope this helps. > > > > On Fri, Oct 13, 2017 at 3:28 PM, Xavier Léauté > > wrote: > > > > > Thanks for the KIP Richard, this is a very useful addition! > > > > > > As far as the API changes, I just have a few comments on the methods > that > > > don't seem directly related to the KIP title, and naming of course :). > > > On the implementation, see my notes further down that will hopefully > > > clarify a few things. > > > > > > Regarding the "bonus" methods: > > > I agree with Guozhang that the KIP lacks proper motivation for adding > the > > > min, max, and allLatest methods. > > > It is also not clear to me what min and max would really mean, what > > > ordering do we refer to here? Are we first ordering by time, then key, > or > > > first by key, then time? > > > The allLatest method might be useful, but I don't really see how it > would > > > be used in practice if we have to scan the entire range of keys for all > > the > > > state stores, every single time. > > > > > > Maybe we could flesh the motivation behind those extra methods, but in > > the > > > interest of time, and moving the KIP forward it might make sense to > file > > a > > > follow-up once we have more concrete use-cases. > > > > > > On naming: > > > I also agree with Guozhang that "keys()" should be renamed. It feels a > > bit > > > of a misnomer, since it not only returns keys, but also the values. > > > > > > As far as what to rename it to, I would argue we already have some > > > discrepancy between key-value stores using range() vs. window stores > > using > > > fetch(). > > > I assume we called the window method "fetch" instead of "get" because > you > > > might get back more than one window for the requested key. > > > > > > If we wanted to make things consistent with both existing key-value > store > > > naming and window store naming, we could do the following: > > > Decide that "all" always refers to the entire range of keys, > independent > > of > > > the window and similarly "range" always refers to a particular range of > > > keys, irrespective of the window. > > > We can then prefix methods with "fetch" to indicate that more than one > > > window may be returned for each key in the range. 
> > > > > > This would give us: > > > - a new fetchAll() method for all the keys, which makes it clear that > you > > > might get back the same key in different windows > > > - a new fetchAll(timeFrom, timeTo) method to get all the keys in a > given > > > time range, again with possibly more than one window per key > > > - and we'd have to rename fetch(K,K,long, long) to fetchRange(K, K, > long, > > > long) and deprecate the old one to indicate a range of keys > > > > > > One inconsistency I noted: the "Proposed Changes" section in your KIP > > talks > > > about a "range(timeFrom, timeTo)" method, I think you meant to refer to > > the > > > all(from, to) method, but I'm sure you'll fix that once we decide on > > > naming. > > > > > > On the implementation side: > > > You mentioned that caching and rocksdb store have very different > > key/value > > > structures, and while it appears to be that way on the surface, the > > > structure between the two is actually very similar. Keys in the cache > are > > > prefixed with a segment ID to ensure the ordering in the cache stays > > > consistent with the rocksdb implementation, which maintains multiple > > > rocksdb insta
Re: [DISCUSS] KIP-205: Add getAllKeys() API to ReadOnlyWindowStore
Is this KIP close to completion? Because we could start working on the code itself now. (Its at about this stage). On Mon, Oct 16, 2017 at 7:37 PM, Richard Yu wrote: > As Guozhang Wang mentioned earlier, we want to mirror the structure of > similar Store class (namely KTable). The WindowedStore class might be > unique in itself as it uses fetch() methods, but in my opinion, uniformity > should be better suited for simplicity. > > On Mon, Oct 16, 2017 at 11:54 AM, Xavier Léauté > wrote: > >> Thank you Richard! Do you or Guozhang have any thoughts on my suggestions >> to use fetchAll() and fetchAll(timeFrom, timeTo) and reserve the "range" >> keyword for when we query a specific range of keys? >> >> Xavier >> >> On Sat, Oct 14, 2017 at 2:32 PM Richard Yu >> wrote: >> >> > Thanks for the clarifications, Xavier. >> > I have removed most of the methods except for keys() and all() which has >> > been renamed to Guozhang Wang's suggestions. >> > >> > Hope this helps. >> > >> > On Fri, Oct 13, 2017 at 3:28 PM, Xavier Léauté >> > wrote: >> > >> > > Thanks for the KIP Richard, this is a very useful addition! >> > > >> > > As far as the API changes, I just have a few comments on the methods >> that >> > > don't seem directly related to the KIP title, and naming of course :). >> > > On the implementation, see my notes further down that will hopefully >> > > clarify a few things. >> > > >> > > Regarding the "bonus" methods: >> > > I agree with Guozhang that the KIP lacks proper motivation for adding >> the >> > > min, max, and allLatest methods. >> > > It is also not clear to me what min and max would really mean, what >> > > ordering do we refer to here? Are we first ordering by time, then >> key, or >> > > first by key, then time? >> > > The allLatest method might be useful, but I don't really see how it >> would >> > > be used in practice if we have to scan the entire range of keys for >> all >> > the >> > > state stores, every single time. >> > > >> > > Maybe we could flesh the motivation behind those extra methods, but in >> > the >> > > interest of time, and moving the KIP forward it might make sense to >> file >> > a >> > > follow-up once we have more concrete use-cases. >> > > >> > > On naming: >> > > I also agree with Guozhang that "keys()" should be renamed. It feels a >> > bit >> > > of a misnomer, since it not only returns keys, but also the values. >> > > >> > > As far as what to rename it to, I would argue we already have some >> > > discrepancy between key-value stores using range() vs. window stores >> > using >> > > fetch(). >> > > I assume we called the window method "fetch" instead of "get" because >> you >> > > might get back more than one window for the requested key. >> > > >> > > If we wanted to make things consistent with both existing key-value >> store >> > > naming and window store naming, we could do the following: >> > > Decide that "all" always refers to the entire range of keys, >> independent >> > of >> > > the window and similarly "range" always refers to a particular range >> of >> > > keys, irrespective of the window. >> > > We can then prefix methods with "fetch" to indicate that more than one >> > > window may be returned for each key in the range. 
>> > > >> > > This would give us: >> > > - a new fetchAll() method for all the keys, which makes it clear that >> you >> > > might get back the same key in different windows >> > > - a new fetchAll(timeFrom, timeTo) method to get all the keys in a >> given >> > > time range, again with possibly more than one window per key >> > > - and we'd have to rename fetch(K,K,long, long) to fetchRange(K, K, >> long, >> > > long) and deprecate the old one to indicate a range of keys >> > > >> > > One inconsistency I noted: the "Proposed Changes" section in your KIP >> > talks >> > > about a "range(timeFrom, timeTo)" method, I think you meant to refer >> to >> > the >> > > all(from, to) method, but I'm sur
Re: [DISCUSS] KIP-205: Add getAllKeys() API to ReadOnlyWindowStore
Soliciting more feedback before vote. On Wed, Oct 18, 2017 at 8:26 PM, Richard Yu wrote: > Is this KIP close to completion? Because we could start working on the > code itself now. (Its at about this stage). > > On Mon, Oct 16, 2017 at 7:37 PM, Richard Yu > wrote: > >> As Guozhang Wang mentioned earlier, we want to mirror the structure of >> similar Store class (namely KTable). The WindowedStore class might be >> unique in itself as it uses fetch() methods, but in my opinion, uniformity >> should be better suited for simplicity. >> >> On Mon, Oct 16, 2017 at 11:54 AM, Xavier Léauté >> wrote: >> >>> Thank you Richard! Do you or Guozhang have any thoughts on my suggestions >>> to use fetchAll() and fetchAll(timeFrom, timeTo) and reserve the "range" >>> keyword for when we query a specific range of keys? >>> >>> Xavier >>> >>> On Sat, Oct 14, 2017 at 2:32 PM Richard Yu >>> wrote: >>> >>> > Thanks for the clarifications, Xavier. >>> > I have removed most of the methods except for keys() and all() which >>> has >>> > been renamed to Guozhang Wang's suggestions. >>> > >>> > Hope this helps. >>> > >>> > On Fri, Oct 13, 2017 at 3:28 PM, Xavier Léauté >>> > wrote: >>> > >>> > > Thanks for the KIP Richard, this is a very useful addition! >>> > > >>> > > As far as the API changes, I just have a few comments on the methods >>> that >>> > > don't seem directly related to the KIP title, and naming of course >>> :). >>> > > On the implementation, see my notes further down that will hopefully >>> > > clarify a few things. >>> > > >>> > > Regarding the "bonus" methods: >>> > > I agree with Guozhang that the KIP lacks proper motivation for >>> adding the >>> > > min, max, and allLatest methods. >>> > > It is also not clear to me what min and max would really mean, what >>> > > ordering do we refer to here? Are we first ordering by time, then >>> key, or >>> > > first by key, then time? >>> > > The allLatest method might be useful, but I don't really see how it >>> would >>> > > be used in practice if we have to scan the entire range of keys for >>> all >>> > the >>> > > state stores, every single time. >>> > > >>> > > Maybe we could flesh the motivation behind those extra methods, but >>> in >>> > the >>> > > interest of time, and moving the KIP forward it might make sense to >>> file >>> > a >>> > > follow-up once we have more concrete use-cases. >>> > > >>> > > On naming: >>> > > I also agree with Guozhang that "keys()" should be renamed. It feels >>> a >>> > bit >>> > > of a misnomer, since it not only returns keys, but also the values. >>> > > >>> > > As far as what to rename it to, I would argue we already have some >>> > > discrepancy between key-value stores using range() vs. window stores >>> > using >>> > > fetch(). >>> > > I assume we called the window method "fetch" instead of "get" >>> because you >>> > > might get back more than one window for the requested key. >>> > > >>> > > If we wanted to make things consistent with both existing key-value >>> store >>> > > naming and window store naming, we could do the following: >>> > > Decide that "all" always refers to the entire range of keys, >>> independent >>> > of >>> > > the window and similarly "range" always refers to a particular range >>> of >>> > > keys, irrespective of the window. >>> > > We can then prefix methods with "fetch" to indicate that more than >>> one >>> > > window may be returned for each key in the range. 
>>> > > >>> > > This would give us: >>> > > - a new fetchAll() method for all the keys, which makes it clear >>> that you >>> > > might get back the same key in different windows >>> > > - a new fetchAll(timeFrom, timeTo) method to get all the keys in a >>> given >>> > > time range, again with possibly more than one wind
[VOTE] KIP-205: Add all() and range() API to ReadOnlyWindowStore
Hi all, I want to propose KIP-205 for the addition of new API. It is about adding methods similar to those found in ReadOnlyKeyValueStore to the ReadOnlyWindowStore class. As it appears the discussion has reached a conclusion, I would like to start the voting process. https://cwiki.apache.org/confluence/display/KAFKA/KIP-205%3A+Add+all%28%29+and+range%28%29+API+to+ReadOnlyWindowStore Thanks for your patience!
Re: [DISCUSS] KIP-205: Add getAllKeys() API to ReadOnlyWindowStore
I think we can come up with this compromise: range(long timeFrom, long timeTo) will be changed to getKeys(long timeFrom, long timeTo). Sounds fair? On Tue, Oct 24, 2017 at 10:44 AM, Xavier Léauté wrote: > > > > Generally I think having `all / range` is better in terms of consistency > > with key-value windows. I.e. queries with key are named as `get / fetch` > > for kv / window stores, and queries without key are named as `range / > all`. > > > > For kv stores, range takes a range of keys, and with this proposal range on > window stores would take a range of time, that does not sound consistent to > me at all. > > We also already have fetch which take both a range of time and keys. >
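Whatever names win out, callers would consume the result the same way: as a closeable iterator of windowed key-value pairs. Here is a small usage sketch against the two methods as named in this compromise; the interface and store types are illustrative stand-ins, since ReadOnlyWindowStore does not yet have these methods at this point.

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.Windowed;
    import org.apache.kafka.streams.state.KeyValueIterator;

    public class GetKeysUsageSketch {

        // Stand-in for the proposed additions, named as in the compromise above.
        interface WindowedScans<K, V> {
            KeyValueIterator<Windowed<K>, V> all();
            KeyValueIterator<Windowed<K>, V> getKeys(long timeFrom, long timeTo);
        }

        // Print every (key, window, value) seen in the last hour.
        static void printLastHour(WindowedScans<String, Long> store, long now) {
            try (KeyValueIterator<Windowed<String>, Long> iter = store.getKeys(now - 3_600_000L, now)) {
                while (iter.hasNext()) {
                    KeyValue<Windowed<String>, Long> entry = iter.next();
                    System.out.println(entry.key.key() + " @ " + entry.key.window().start()
                            + " -> " + entry.value);
                }
            }
        }
    }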
Re: [DISCUSS] KIP-205: Add getAllKeys() API to ReadOnlyWindowStore
Xavier: There have been two +1s on the voting thread. Are you fine with the current formulation? On Tue, Oct 24, 2017 at 4:26 PM, Richard Yu wrote: > I think we can come up with this compromise: range(long timeFrom, long > timeTo) will be changed to getKeys(long timeFrom, long timeTo). Sounds fair? > > > On Tue, Oct 24, 2017 at 10:44 AM, Xavier Léauté > wrote: > >> > >> > Generally I think having `all / range` is better in terms of consistency >> > with key-value windows. I.e. queries with key are named as `get / fetch` >> > for kv / window stores, and queries without key are named as `range / >> all`. >> > >> >> For kv stores, range takes a range of keys, and with this proposal range >> on >> window stores would take a range of time, that does not sound consistent >> to >> me at all. >> >> We also already have fetch which take both a range of time and keys. >> > >
Re: [VOTE] KIP-239 Add queryableStoreName() to GlobalKTable
After investigation, I have found that the InternalStreamsBuilder#globalTable method is the only instance where the constructor for GlobalKTableImpl is called. The KTableValueGetterSupplier parameter used in this particular constructor is an instance of KTableSourceValueGetterSupplier. Hence, your requirement is satisfied. Since this is the vote thread, if you have further comments, please comment on the pull request. On Tue, Jan 2, 2018 at 6:38 PM, Ewen Cheslack-Postava wrote: > +1 binding > > The idea seems reasonable. Looking at it implementation-wise, seems there > is a bit of awkwardness because GlobalKTableImpl uses a > KTableValueGetterSupplier which seems to possibly have multiple stores, but > maybe using the more specific KTableSourceValueGetterSupplier > implementation instead can resolve that. > > -Ewen > > On Mon, Jan 1, 2018 at 6:22 PM, Ted Yu wrote: > > > Gentle reminder: one more binding vote is needed for the KIP to pass. > > > > Cheers > > > > On Thu, Dec 21, 2017 at 4:13 AM, Damian Guy > wrote: > > > > > +1 > > > > > > On Wed, 20 Dec 2017 at 21:09 Ted Yu wrote: > > > > > > > Ping for more (binding) votes. > > > > > > > > The pull request is ready. > > > > > > > > On Fri, Dec 15, 2017 at 12:57 PM, Guozhang Wang > > > > wrote: > > > > > > > > > +1 (binding), thanks! > > > > > > > > > > On Fri, Dec 15, 2017 at 11:56 AM, Ted Yu > > wrote: > > > > > > > > > > > Hi, > > > > > > Here is the discussion thread: > > > > > > > > > > > > http://search-hadoop.com/m/Kafka/uyzND12QnH514pPO9?subj= > > > > > > Re+DISCUSS+KIP+239+Add+queryableStoreName+to+GlobalKTable > > > > > > > > > > > > Please vote on this KIP. > > > > > > > > > > > > Thanks > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > -- Guozhang > > > > > > > > > > > > > > >
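For readers following the vote, a short usage sketch of what the new method enables is below: resolving the store that backs a GlobalKTable and querying it interactively. queryableStoreName() is the method added by this KIP; the key, the types, and the null check are illustrative.

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.kstream.GlobalKTable;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    public class GlobalKTableQuerySketch {

        // Look up a single key from the state store backing a GlobalKTable,
        // assuming the KafkaStreams instance is already running.
        static String lookup(KafkaStreams streams, GlobalKTable<String, String> table, String key) {
            // The method added by KIP-239; it may return null if the table is not queryable.
            String storeName = table.queryableStoreName();
            if (storeName == null) {
                return null;
            }
            ReadOnlyKeyValueStore<String, String> store =
                    streams.store(storeName, QueryableStoreTypes.keyValueStore());
            return store.get(key);
        }
    }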
Re: [VOTE] KIP-239 Add queryableStoreName() to GlobalKTable
A subsequent PR has already been created: https://github.com/apache/kafka/pull/4340/ It should be seen on the JIRA. On Tue, Jan 2, 2018 at 9:51 PM, Ewen Cheslack-Postava wrote: > Oh, the KIP passes w/ the required votes. My comment was just on > implementation details. I will leave comments about that up to the > subsequent PR and to the Kafka Streams folks that are much better suited > than me to comment on them :) > > -Ewen > > On Tue, Jan 2, 2018 at 9:28 PM, Richard Yu > wrote: > > > After investigation, I have found that the > > InternalStreamsBuilder#globalTable method is the only instance where the > > constructor for GlobalKTableImpl is called. > > The KTableValueGetterSupplier parameter used in this particular > constructor > > is an instance of KTableSourceValueGetterSupplier. Hence, your > requirement > > is satisfied. > > > > Since this is the vote thread, if you have further comments, please > comment > > on the pull request. > > > > On Tue, Jan 2, 2018 at 6:38 PM, Ewen Cheslack-Postava > > > wrote: > > > > > +1 binding > > > > > > The idea seems reasonable. Looking at it implementation-wise, seems > there > > > is a bit of awkwardness because GlobalKTableImpl uses a > > > KTableValueGetterSupplier which seems to possibly have multiple stores, > > but > > > maybe using the more specific KTableSourceValueGetterSupplier > > > implementation instead can resolve that. > > > > > > -Ewen > > > > > > On Mon, Jan 1, 2018 at 6:22 PM, Ted Yu wrote: > > > > > > > Gentle reminder: one more binding vote is needed for the KIP to pass. > > > > > > > > Cheers > > > > > > > > On Thu, Dec 21, 2017 at 4:13 AM, Damian Guy > > > wrote: > > > > > > > > > +1 > > > > > > > > > > On Wed, 20 Dec 2017 at 21:09 Ted Yu wrote: > > > > > > > > > > > Ping for more (binding) votes. > > > > > > > > > > > > The pull request is ready. > > > > > > > > > > > > On Fri, Dec 15, 2017 at 12:57 PM, Guozhang Wang < > > wangg...@gmail.com> > > > > > > wrote: > > > > > > > > > > > > > +1 (binding), thanks! > > > > > > > > > > > > > > On Fri, Dec 15, 2017 at 11:56 AM, Ted Yu > > > > wrote: > > > > > > > > > > > > > > > Hi, > > > > > > > > Here is the discussion thread: > > > > > > > > > > > > > > > > http://search-hadoop.com/m/Kafka/uyzND12QnH514pPO9?subj= > > > > > > > > Re+DISCUSS+KIP+239+Add+queryableStoreName+to+GlobalKTable > > > > > > > > > > > > > > > > Please vote on this KIP. > > > > > > > > > > > > > > > > Thanks > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > -- Guozhang > > > > > > > > > > > > > > > > > > > > > > > > > > > >
Re: [VOTE] KIP-239 Add queryableStoreName() to GlobalKTable
There are 3 binding votes and 2 nonbinding votes in favor of KIP-239. Binding: Guozhang Wang Damian Guy Ewen Cheslack-Postava Nonbinding: Matthias J. Sax Bill Bejeck This would close this voting thread. On Tue, Jan 2, 2018 at 10:24 PM, Matthias J. Sax wrote: > @Richard: you can close this vote thread with a summary as usual and > update the KIP wiki page accordingly. > > > -Matthias > > On 1/2/18 9:57 PM, Richard Yu wrote: > > A subsequent PR has already been created: > > https://github.com/apache/kafka/pull/4340/ > > It should be seen on the JIRA. > > > > > > > > On Tue, Jan 2, 2018 at 9:51 PM, Ewen Cheslack-Postava > > > wrote: > > > >> Oh, the KIP passes w/ the required votes. My comment was just on > >> implementation details. I will leave comments about that up to the > >> subsequent PR and to the Kafka Streams folks that are much better suited > >> than me to comment on them :) > >> > >> -Ewen > >> > >> On Tue, Jan 2, 2018 at 9:28 PM, Richard Yu > >> wrote: > >> > >>> After investigation, I have found that the > >>> InternalStreamsBuilder#globalTable method is the only instance where > the > >>> constructor for GlobalKTableImpl is called. > >>> The KTableValueGetterSupplier parameter used in this particular > >> constructor > >>> is an instance of KTableSourceValueGetterSupplier. Hence, your > >> requirement > >>> is satisfied. > >>> > >>> Since this is the vote thread, if you have further comments, please > >> comment > >>> on the pull request. > >>> > >>> On Tue, Jan 2, 2018 at 6:38 PM, Ewen Cheslack-Postava < > e...@confluent.io > >>> > >>> wrote: > >>> > >>>> +1 binding > >>>> > >>>> The idea seems reasonable. Looking at it implementation-wise, seems > >> there > >>>> is a bit of awkwardness because GlobalKTableImpl uses a > >>>> KTableValueGetterSupplier which seems to possibly have multiple > stores, > >>> but > >>>> maybe using the more specific KTableSourceValueGetterSupplier > >>>> implementation instead can resolve that. > >>>> > >>>> -Ewen > >>>> > >>>> On Mon, Jan 1, 2018 at 6:22 PM, Ted Yu wrote: > >>>> > >>>>> Gentle reminder: one more binding vote is needed for the KIP to pass. > >>>>> > >>>>> Cheers > >>>>> > >>>>> On Thu, Dec 21, 2017 at 4:13 AM, Damian Guy > >>>> wrote: > >>>>> > >>>>>> +1 > >>>>>> > >>>>>> On Wed, 20 Dec 2017 at 21:09 Ted Yu wrote: > >>>>>> > >>>>>>> Ping for more (binding) votes. > >>>>>>> > >>>>>>> The pull request is ready. > >>>>>>> > >>>>>>> On Fri, Dec 15, 2017 at 12:57 PM, Guozhang Wang < > >>> wangg...@gmail.com> > >>>>>>> wrote: > >>>>>>> > >>>>>>>> +1 (binding), thanks! > >>>>>>>> > >>>>>>>> On Fri, Dec 15, 2017 at 11:56 AM, Ted Yu > >>>>> wrote: > >>>>>>>> > >>>>>>>>> Hi, > >>>>>>>>> Here is the discussion thread: > >>>>>>>>> > >>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND12QnH514pPO9?subj= > >>>>>>>>> Re+DISCUSS+KIP+239+Add+queryableStoreName+to+GlobalKTable > >>>>>>>>> > >>>>>>>>> Please vote on this KIP. > >>>>>>>>> > >>>>>>>>> Thanks > >>>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> -- > >>>>>>>> -- Guozhang > >>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > > > >
KIP-457: Add DISCONNECTED state to Kafka Streams
Hi all, I would like to propose a small KIP on adding a new state to KafkaStreams#state(). It is very simple, so this should pass relatively quickly! Here is the discussion link: https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams Cheers, Richard
Re: KIP-457: Add DISCONNECTED state to Kafka Streams
Oh, I probably misunderstood the difference between DISCONNECTED and DEAD. I will update the KIP accordingly. Thanks for pointing that out! On Tue, Apr 16, 2019 at 3:13 PM Matthias J. Sax wrote: > Thanks for the initiative. > > In the motivation you mention that you want to use DISCONNECT to > indicate that the application was killed. > > What is the difference to existing state DEAD? > > Also, the backing JIRA seems to have a different motivation to add a > DISCONNECT state. There, the Kafka Streams application itself is > healthy, but it cannot connect to the brokers. It seems reasonable to > add a DISCONNECT for this case though. > > > > -Matthias > > > > On 4/16/19 9:30 AM, Richard Yu wrote: > > Hi all, > > > > I like to propose a small KIP on adding a new state to > KafkaStreams#state(). > > It is very simple, so this should pass relatively quickly! > > Here is the discussion link: > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams > > > > Cheers, > > Richard > > > >
Re: KIP-457: Add DISCONNECTED state to Kafka Streams
Hi all, Considering that this is a simple KIP, I would probably start the voting tomorrow. I think it would be good if we could get this in fast. On Tue, Apr 16, 2019 at 3:31 PM Richard Yu wrote: > Oh, I probably misunderstood the difference between DISCONNECTED and DEAD. > I will update the KIP accordingly. > Thanks for pointing that out! > > > On Tue, Apr 16, 2019 at 3:13 PM Matthias J. Sax > wrote: > >> Thanks for the initiative. >> >> In the motivation you mention that you want to use DISCONNECT to >> indicate that the application was killed. >> >> What is the difference to existing state DEAD? >> >> Also, the backing JIRA seems to have a different motivation to add a >> DISCONNECT state. There, the Kafka Streams application itself is >> healthy, but it cannot connect to the brokers. It seems reasonable to >> add a DISCONNECT for this case though. >> >> >> >> -Matthias >> >> >> >> On 4/16/19 9:30 AM, Richard Yu wrote: >> > Hi all, >> > >> > I like to propose a small KIP on adding a new state to >> KafkaStreams#state(). >> > It is very simple, so this should pass relatively quickly! >> > Here is the discussion link: >> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams >> > >> > Cheers, >> > Richard >> > >> >>
Re: KIP-457: Add DISCONNECTED state to Kafka Streams
Hi Micheal, Yeah, those are some points I should've clarified. No problem. Have got it done. On Wed, Apr 17, 2019 at 6:42 AM Michael Noll wrote: > Richard, > > thanks for looking into this! > > However, I have some concerns. The KIP you created ( > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams > ) > doesn't yet address open questions such as the ones mentioned by Matthias: > > 1) What is the difference between DEAD and the proposed DISCONNECTED? This > should be defined in the KIP. > > 2) Difference between your KIP and the JIRA ( > https://issues.apache.org/jira/browse/KAFKA-6520): In the JIRA ticket, the > DISCONNECTED state was proposed for the scenario that the KStreams > application is healthy but the Kafka broker is down. This is different to > what you wrote in the KIP: "When something happens in Kafka Streams, such > as an unexpected crash or error, KafkaStreams#state() will return > State.DISCONNECTED.", which seems to mean that DISCONNECTED should be the > state when the KStreams app is down. > > I wouldn't expect a KIP vote to pass if these basic questions aren't > properly sorted out in the KIP. > > Best, > Michael > > > > On Wed, Apr 17, 2019 at 3:35 AM Richard Yu > wrote: > > > Hi all, > > > > Considering that this is a simple KIP, I would probably start the voting > > tomorrow. > > I think it would be good if we could get this in fast. > > > > On Tue, Apr 16, 2019 at 3:31 PM Richard Yu > > wrote: > > > > > Oh, I probably misunderstood the difference between DISCONNECTED and > > DEAD. > > > I will update the KIP accordingly. > > > Thanks for pointing that out! > > > > > > > > > On Tue, Apr 16, 2019 at 3:13 PM Matthias J. Sax > > > > wrote: > > > > > >> Thanks for the initiative. > > >> > > >> In the motivation you mention that you want to use DISCONNECT to > > >> indicate that the application was killed. > > >> > > >> What is the difference to existing state DEAD? > > >> > > >> Also, the backing JIRA seems to have a different motivation to add a > > >> DISCONNECT state. There, the Kafka Streams application itself is > > >> healthy, but it cannot connect to the brokers. It seems reasonable to > > >> add a DISCONNECT for this case though. > > >> > > >> > > >> > > >> -Matthias > > >> > > >> > > >> > > >> On 4/16/19 9:30 AM, Richard Yu wrote: > > >> > Hi all, > > >> > > > >> > I like to propose a small KIP on adding a new state to > > >> KafkaStreams#state(). > > >> > It is very simple, so this should pass relatively quickly! > > >> > Here is the discussion link: > > >> > > > >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams > > >> > > > >> > Cheers, > > >> > Richard > > >> > > > >> > > >> > > >
[VOTE] KIP-457: Add DISCONNECTED status to Kafka Streams
Hi all, I would like to propose a minor change to the current KafkaStreams#state() method. Considering the small size of this proposal, I thought it would be good if we could pass it quickly. (It does not have large-scale ramifications.) Here is the KIP link: https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams Hope we can get this passed! Cheers, Richard
Re: [VOTE] KIP-457: Add DISCONNECTED status to Kafka Streams
Sorry everybody, please hold off on voting for a moment. Something came up; take a look at the discussion thread. - Richard On Wed, Apr 17, 2019 at 8:46 AM Richard Yu wrote: > Hi all, > > I would like to propose a minor change to the current KafkaStreams#state() > method. > Considering the small size of this proposal, I thought it would be good if > we could pass it quickly. (It does not have > large scale ramifications) > > Here is the KIP link: > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams > > Hope we could get this past! > > Cheers, > Richard >
Re: KIP-457: Add DISCONNECTED state to Kafka Streams
I just realized something. Hi Matthias, might need your input here. I realized that when implementing this change, as noted in the JIRA, we would need to "check the behaviour of the consumer" since its consumer's connection with broker that we are dealing with. So doesn't that mean we would also be dealing with consumer API changes as well? I don't think consumer has any methods which would give us the state of a connection either. - Richard On Wed, Apr 17, 2019 at 8:43 AM Richard Yu wrote: > Hi Micheal, > > Yeah, those are some points I should've clarified. > No problem. Have got it done. > > > > On Wed, Apr 17, 2019 at 6:42 AM Michael Noll wrote: > >> Richard, >> >> thanks for looking into this! >> >> However, I have some concerns. The KIP you created ( >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams >> ) >> doesn't yet address open questions such as the ones mentioned by Matthias: >> >> 1) What is the difference between DEAD and the proposed DISCONNECTED? >> This >> should be defined in the KIP. >> >> 2) Difference between your KIP and the JIRA ( >> https://issues.apache.org/jira/browse/KAFKA-6520): In the JIRA ticket, >> the >> DISCONNECTED state was proposed for the scenario that the KStreams >> application is healthy but the Kafka broker is down. This is different to >> what you wrote in the KIP: "When something happens in Kafka Streams, such >> as an unexpected crash or error, KafkaStreams#state() will return >> State.DISCONNECTED.", which seems to mean that DISCONNECTED should be the >> state when the KStreams app is down. >> >> I wouldn't expect a KIP vote to pass if these basic questions aren't >> properly sorted out in the KIP. >> >> Best, >> Michael >> >> >> >> On Wed, Apr 17, 2019 at 3:35 AM Richard Yu >> wrote: >> >> > Hi all, >> > >> > Considering that this is a simple KIP, I would probably start the voting >> > tomorrow. >> > I think it would be good if we could get this in fast. >> > >> > On Tue, Apr 16, 2019 at 3:31 PM Richard Yu >> > wrote: >> > >> > > Oh, I probably misunderstood the difference between DISCONNECTED and >> > DEAD. >> > > I will update the KIP accordingly. >> > > Thanks for pointing that out! >> > > >> > > >> > > On Tue, Apr 16, 2019 at 3:13 PM Matthias J. Sax < >> matth...@confluent.io> >> > > wrote: >> > > >> > >> Thanks for the initiative. >> > >> >> > >> In the motivation you mention that you want to use DISCONNECT to >> > >> indicate that the application was killed. >> > >> >> > >> What is the difference to existing state DEAD? >> > >> >> > >> Also, the backing JIRA seems to have a different motivation to add a >> > >> DISCONNECT state. There, the Kafka Streams application itself is >> > >> healthy, but it cannot connect to the brokers. It seems reasonable to >> > >> add a DISCONNECT for this case though. >> > >> >> > >> >> > >> >> > >> -Matthias >> > >> >> > >> >> > >> >> > >> On 4/16/19 9:30 AM, Richard Yu wrote: >> > >> > Hi all, >> > >> > >> > >> > I like to propose a small KIP on adding a new state to >> > >> KafkaStreams#state(). >> > >> > It is very simple, so this should pass relatively quickly! >> > >> > Here is the discussion link: >> > >> > >> > >> >> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams >> > >> > >> > >> > Cheers, >> > >> > Richard >> > >> > >> > >> >> > >> >> > >> >
Re: KIP-457: Add DISCONNECTED state to Kafka Streams
Alright, so I made a few changes to the KIP. I realized that there might be an easier way to give the user information on the connection state of Kafka Streams. In implementation, if one wishes to have DISCONNECTED as a state, then one would have to factor in proper state transitions. The other approach that is now outlined in the KIP. Instead, we could just add a method which I think achieves the same effect. If any of you thinks there is wrong with this approach, please let me know. :) Cheers, Richard On Wed, Apr 17, 2019 at 11:49 AM Richard Yu wrote: > I just realized something. > > Hi Matthias, might need your input here. > I realized that when implementing this change, as noted in the JIRA, we > would need to "check the behaviour of the consumer" since its consumer's > connection with broker that we are dealing with. > > So doesn't that mean we would also be dealing with consumer API changes as > well? > I don't think consumer has any methods which would give us the state of a > connection either. > > - Richard > > On Wed, Apr 17, 2019 at 8:43 AM Richard Yu > wrote: > >> Hi Micheal, >> >> Yeah, those are some points I should've clarified. >> No problem. Have got it done. >> >> >> >> On Wed, Apr 17, 2019 at 6:42 AM Michael Noll >> wrote: >> >>> Richard, >>> >>> thanks for looking into this! >>> >>> However, I have some concerns. The KIP you created ( >>> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams >>> ) >>> doesn't yet address open questions such as the ones mentioned by >>> Matthias: >>> >>> 1) What is the difference between DEAD and the proposed DISCONNECTED? >>> This >>> should be defined in the KIP. >>> >>> 2) Difference between your KIP and the JIRA ( >>> https://issues.apache.org/jira/browse/KAFKA-6520): In the JIRA ticket, >>> the >>> DISCONNECTED state was proposed for the scenario that the KStreams >>> application is healthy but the Kafka broker is down. This is different to >>> what you wrote in the KIP: "When something happens in Kafka Streams, such >>> as an unexpected crash or error, KafkaStreams#state() will return >>> State.DISCONNECTED.", which seems to mean that DISCONNECTED should be the >>> state when the KStreams app is down. >>> >>> I wouldn't expect a KIP vote to pass if these basic questions aren't >>> properly sorted out in the KIP. >>> >>> Best, >>> Michael >>> >>> >>> >>> On Wed, Apr 17, 2019 at 3:35 AM Richard Yu >>> wrote: >>> >>> > Hi all, >>> > >>> > Considering that this is a simple KIP, I would probably start the >>> voting >>> > tomorrow. >>> > I think it would be good if we could get this in fast. >>> > >>> > On Tue, Apr 16, 2019 at 3:31 PM Richard Yu >> > >>> > wrote: >>> > >>> > > Oh, I probably misunderstood the difference between DISCONNECTED and >>> > DEAD. >>> > > I will update the KIP accordingly. >>> > > Thanks for pointing that out! >>> > > >>> > > >>> > > On Tue, Apr 16, 2019 at 3:13 PM Matthias J. Sax < >>> matth...@confluent.io> >>> > > wrote: >>> > > >>> > >> Thanks for the initiative. >>> > >> >>> > >> In the motivation you mention that you want to use DISCONNECT to >>> > >> indicate that the application was killed. >>> > >> >>> > >> What is the difference to existing state DEAD? >>> > >> >>> > >> Also, the backing JIRA seems to have a different motivation to add a >>> > >> DISCONNECT state. There, the Kafka Streams application itself is >>> > >> healthy, but it cannot connect to the brokers. It seems reasonable >>> to >>> > >> add a DISCONNECT for this case though. 
>>> > >> >>> > >> >>> > >> >>> > >> -Matthias >>> > >> >>> > >> >>> > >> >>> > >> On 4/16/19 9:30 AM, Richard Yu wrote: >>> > >> > Hi all, >>> > >> > >>> > >> > I like to propose a small KIP on adding a new state to >>> > >> KafkaStreams#state(). >>> > >> > It is very simple, so this should pass relatively quickly! >>> > >> > Here is the discussion link: >>> > >> > >>> > >> >>> > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams >>> > >> > >>> > >> > Cheers, >>> > >> > Richard >>> > >> > >>> > >> >>> > >> >>> > >>> >>
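To make the trade-off in the mail above concrete: the state-based design has to define legal transitions into and out of a new state, while the method-based alternative only exposes connection status on demand. The sketch below is a placeholder for the latter; the method name is hypothetical and the KIP page has the authoritative wording, while the state() check shown uses the existing, real API.

    import org.apache.kafka.streams.KafkaStreams;

    public class ConnectionStatusSketch {

        // Hypothetical shape of the "just add a method" alternative: ask the Streams
        // instance whether its embedded clients currently have a broker connection,
        // instead of introducing a new DISCONNECTED state.
        // boolean connected = streams.isConnectedToBroker();   // placeholder name

        // With the existing API, the closest a caller can get is inspecting the
        // current state, which does not distinguish "running but broker unreachable".
        static boolean looksHealthy(KafkaStreams streams) {
            KafkaStreams.State state = streams.state();
            return state == KafkaStreams.State.RUNNING || state == KafkaStreams.State.REBALANCING;
        }
    }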
Re: KIP-457: Add DISCONNECTED state to Kafka Streams
Oh, so if possible. I thought it would be good if we could finish this KIP up. Matthias, or Michael, if you have any further comments, please let me know. :) Otherwise, I might restart the voting thread in a few days. Cheers, Richard On Wed, Apr 17, 2019 at 2:30 PM Richard Yu wrote: > Alright, so I made a few changes to the KIP. > I realized that there might be an easier way to give the user information > on the connection state of Kafka Streams. > In implementation, if one wishes to have DISCONNECTED as a state, then one > would have to factor in proper state transitions. > The other approach that is now outlined in the KIP. Instead, we could just > add a method which I think achieves the same effect. > If any of you thinks there is wrong with this approach, please let me > know. :) > > Cheers, > Richard > > On Wed, Apr 17, 2019 at 11:49 AM Richard Yu > wrote: > >> I just realized something. >> >> Hi Matthias, might need your input here. >> I realized that when implementing this change, as noted in the JIRA, we >> would need to "check the behaviour of the consumer" since its consumer's >> connection with broker that we are dealing with. >> >> So doesn't that mean we would also be dealing with consumer API changes >> as well? >> I don't think consumer has any methods which would give us the state of a >> connection either. >> >> - Richard >> >> On Wed, Apr 17, 2019 at 8:43 AM Richard Yu >> wrote: >> >>> Hi Micheal, >>> >>> Yeah, those are some points I should've clarified. >>> No problem. Have got it done. >>> >>> >>> >>> On Wed, Apr 17, 2019 at 6:42 AM Michael Noll >>> wrote: >>> >>>> Richard, >>>> >>>> thanks for looking into this! >>>> >>>> However, I have some concerns. The KIP you created ( >>>> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams >>>> ) >>>> doesn't yet address open questions such as the ones mentioned by >>>> Matthias: >>>> >>>> 1) What is the difference between DEAD and the proposed DISCONNECTED? >>>> This >>>> should be defined in the KIP. >>>> >>>> 2) Difference between your KIP and the JIRA ( >>>> https://issues.apache.org/jira/browse/KAFKA-6520): In the JIRA ticket, >>>> the >>>> DISCONNECTED state was proposed for the scenario that the KStreams >>>> application is healthy but the Kafka broker is down. This is different >>>> to >>>> what you wrote in the KIP: "When something happens in Kafka Streams, >>>> such >>>> as an unexpected crash or error, KafkaStreams#state() will return >>>> State.DISCONNECTED.", which seems to mean that DISCONNECTED should be >>>> the >>>> state when the KStreams app is down. >>>> >>>> I wouldn't expect a KIP vote to pass if these basic questions aren't >>>> properly sorted out in the KIP. >>>> >>>> Best, >>>> Michael >>>> >>>> >>>> >>>> On Wed, Apr 17, 2019 at 3:35 AM Richard Yu >>>> wrote: >>>> >>>> > Hi all, >>>> > >>>> > Considering that this is a simple KIP, I would probably start the >>>> voting >>>> > tomorrow. >>>> > I think it would be good if we could get this in fast. >>>> > >>>> > On Tue, Apr 16, 2019 at 3:31 PM Richard Yu < >>>> yohan.richard...@gmail.com> >>>> > wrote: >>>> > >>>> > > Oh, I probably misunderstood the difference between DISCONNECTED and >>>> > DEAD. >>>> > > I will update the KIP accordingly. >>>> > > Thanks for pointing that out! >>>> > > >>>> > > >>>> > > On Tue, Apr 16, 2019 at 3:13 PM Matthias J. Sax < >>>> matth...@confluent.io> >>>> > > wrote: >>>> > > >>>> > >> Thanks for the initiative. 
>>>> > >> >>>> > >> In the motivation you mention that you want to use DISCONNECT to >>>> > >> indicate that the application was killed. >>>> > >> >>>> > >> What is the difference to existing state DEAD? >>>> > >> >>>> > >> Also, the backing JIRA seems to have a different motivation to add >>>> a >>>> > >> DISCONNECT state. There, the Kafka Streams application itself is >>>> > >> healthy, but it cannot connect to the brokers. It seems reasonable >>>> to >>>> > >> add a DISCONNECT for this case though. >>>> > >> >>>> > >> >>>> > >> >>>> > >> -Matthias >>>> > >> >>>> > >> >>>> > >> >>>> > >> On 4/16/19 9:30 AM, Richard Yu wrote: >>>> > >> > Hi all, >>>> > >> > >>>> > >> > I like to propose a small KIP on adding a new state to >>>> > >> KafkaStreams#state(). >>>> > >> > It is very simple, so this should pass relatively quickly! >>>> > >> > Here is the discussion link: >>>> > >> > >>>> > >> >>>> > >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams >>>> > >> > >>>> > >> > Cheers, >>>> > >> > Richard >>>> > >> > >>>> > >> >>>> > >> >>>> > >>>> >>>
[DISCUSS] KIP-463: Auto-configure serdes passed alongside TopologyBuilder
Hi all, Due to issues that were discovered during the first attempt to implement a solution for KAFKA-3729 ( https://issues.apache.org/jira/browse/KAFKA-3729), a KIP was deemed necessary. There are a couple of alternatives by which we can proceed, so it would be good if we could weigh the pros and cons of each approach. https://cwiki.apache.org/confluence/display/KAFKA/KIP-463%3A+Auto-configure+non-default+Serdes+passed+alongside+the+TopologyBuilder Hope this helps, Richard Yu
Re: KIP-457: Add DISCONNECTED state to Kafka Streams
Hi Matthias, Sure, I could do the DISCONNECTED state. On Sat, Apr 27, 2019 at 3:16 PM Matthias J. Sax wrote: > Thanks for updating the KIP. I also had a quick look into your PR. > > I actually think that the original idea to add a new state DISCONNECTED > would provide a better user experience. > > Your current proposal does not add a new state, even if it mentions this > in the beginning. Compare: > > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L74-L153 > > > -Matthias > > On 4/23/19 7:56 PM, Richard Yu wrote: > > Oh, so if possible. I thought it would be good if we could finish this > KIP > > up. > > Matthias, or Michael, if you have any further comments, please let me > know. > > :) > > > > Otherwise, I might restart the voting thread in a few days. > > > > Cheers, > > Richard > > > > On Wed, Apr 17, 2019 at 2:30 PM Richard Yu > > wrote: > > > >> Alright, so I made a few changes to the KIP. > >> I realized that there might be an easier way to give the user > information > >> on the connection state of Kafka Streams. > >> In implementation, if one wishes to have DISCONNECTED as a state, then > one > >> would have to factor in proper state transitions. > >> The other approach that is now outlined in the KIP. Instead, we could > just > >> add a method which I think achieves the same effect. > >> If any of you thinks there is wrong with this approach, please let me > >> know. :) > >> > >> Cheers, > >> Richard > >> > >> On Wed, Apr 17, 2019 at 11:49 AM Richard Yu > > >> wrote: > >> > >>> I just realized something. > >>> > >>> Hi Matthias, might need your input here. > >>> I realized that when implementing this change, as noted in the JIRA, we > >>> would need to "check the behaviour of the consumer" since its > consumer's > >>> connection with broker that we are dealing with. > >>> > >>> So doesn't that mean we would also be dealing with consumer API changes > >>> as well? > >>> I don't think consumer has any methods which would give us the state > of a > >>> connection either. > >>> > >>> - Richard > >>> > >>> On Wed, Apr 17, 2019 at 8:43 AM Richard Yu > > >>> wrote: > >>> > >>>> Hi Micheal, > >>>> > >>>> Yeah, those are some points I should've clarified. > >>>> No problem. Have got it done. > >>>> > >>>> > >>>> > >>>> On Wed, Apr 17, 2019 at 6:42 AM Michael Noll > >>>> wrote: > >>>> > >>>>> Richard, > >>>>> > >>>>> thanks for looking into this! > >>>>> > >>>>> However, I have some concerns. The KIP you created ( > >>>>> > >>>>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams > >>>>> ) > >>>>> doesn't yet address open questions such as the ones mentioned by > >>>>> Matthias: > >>>>> > >>>>> 1) What is the difference between DEAD and the proposed DISCONNECTED? > >>>>> This > >>>>> should be defined in the KIP. > >>>>> > >>>>> 2) Difference between your KIP and the JIRA ( > >>>>> https://issues.apache.org/jira/browse/KAFKA-6520): In the JIRA > ticket, > >>>>> the > >>>>> DISCONNECTED state was proposed for the scenario that the KStreams > >>>>> application is healthy but the Kafka broker is down. This is > different > >>>>> to > >>>>> what you wrote in the KIP: "When something happens in Kafka Streams, > >>>>> such > >>>>> as an unexpected crash or error, KafkaStreams#state() will return > >>>>> State.DISCONNECTED.", which seems to mean that DISCONNECTED should be > >>>>> the > >>>>> state when the KStreams app is down. 
> >>>>> > >>>>> I wouldn't expect a KIP vote to pass if these basic questions aren't > >>>>> properly sorted out in the KIP. > >>>>> > >>>>> Best, > >>>>> Mich
Re: KIP-457: Add DISCONNECTED state to Kafka Streams
Alright, I made some changes. Matthias, if you had time, it would be good if you made another pass. This should be close to completion. Cheers, Richard On Sat, Apr 27, 2019 at 3:46 PM Richard Yu wrote: > Hi Matthias, > > Sure, I could do the DISCONNECTED state. > > > On Sat, Apr 27, 2019 at 3:16 PM Matthias J. Sax > wrote: > >> Thanks for updating the KIP. I also had a quick look into your PR. >> >> I actually think that the original idea to add a new state DISCONNECTED >> would provide a better user experience. >> >> Your current proposal does not add a new state, even if it mentions this >> in the beginning. Compare: >> >> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L74-L153 >> >> >> -Matthias >> >> On 4/23/19 7:56 PM, Richard Yu wrote: >> > Oh, so if possible. I thought it would be good if we could finish this >> KIP >> > up. >> > Matthias, or Michael, if you have any further comments, please let me >> know. >> > :) >> > >> > Otherwise, I might restart the voting thread in a few days. >> > >> > Cheers, >> > Richard >> > >> > On Wed, Apr 17, 2019 at 2:30 PM Richard Yu >> > wrote: >> > >> >> Alright, so I made a few changes to the KIP. >> >> I realized that there might be an easier way to give the user >> information >> >> on the connection state of Kafka Streams. >> >> In implementation, if one wishes to have DISCONNECTED as a state, then >> one >> >> would have to factor in proper state transitions. >> >> The other approach that is now outlined in the KIP. Instead, we could >> just >> >> add a method which I think achieves the same effect. >> >> If any of you thinks there is wrong with this approach, please let me >> >> know. :) >> >> >> >> Cheers, >> >> Richard >> >> >> >> On Wed, Apr 17, 2019 at 11:49 AM Richard Yu < >> yohan.richard...@gmail.com> >> >> wrote: >> >> >> >>> I just realized something. >> >>> >> >>> Hi Matthias, might need your input here. >> >>> I realized that when implementing this change, as noted in the JIRA, >> we >> >>> would need to "check the behaviour of the consumer" since its >> consumer's >> >>> connection with broker that we are dealing with. >> >>> >> >>> So doesn't that mean we would also be dealing with consumer API >> changes >> >>> as well? >> >>> I don't think consumer has any methods which would give us the state >> of a >> >>> connection either. >> >>> >> >>> - Richard >> >>> >> >>> On Wed, Apr 17, 2019 at 8:43 AM Richard Yu < >> yohan.richard...@gmail.com> >> >>> wrote: >> >>> >> >>>> Hi Micheal, >> >>>> >> >>>> Yeah, those are some points I should've clarified. >> >>>> No problem. Have got it done. >> >>>> >> >>>> >> >>>> >> >>>> On Wed, Apr 17, 2019 at 6:42 AM Michael Noll >> >>>> wrote: >> >>>> >> >>>>> Richard, >> >>>>> >> >>>>> thanks for looking into this! >> >>>>> >> >>>>> However, I have some concerns. The KIP you created ( >> >>>>> >> >>>>> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams >> >>>>> ) >> >>>>> doesn't yet address open questions such as the ones mentioned by >> >>>>> Matthias: >> >>>>> >> >>>>> 1) What is the difference between DEAD and the proposed >> DISCONNECTED? >> >>>>> This >> >>>>> should be defined in the KIP. 
>> >>>>> >> >>>>> 2) Difference between your KIP and the JIRA ( >> >>>>> https://issues.apache.org/jira/browse/KAFKA-6520): In the JIRA >> ticket, >> >>>>> the >> >>>>> DISCONNECTED state was proposed for the scenario that the KStreams >> >>>>> application is healthy but the Kafka broker is down. This is &
[DISCUSS] KIP-472: Add header to RecordContext
Hello, I wish to introduce a minor addition to RecordContext (a public-facing API). This addition both provides the user with more information regarding the processing state of the partition and helps resolve a bug which Kafka is currently experiencing. Here is the KIP link: https://cwiki.apache.org/confluence/display/KAFKA/KIP-472%3A+%5BSTREAMS%5D+Add+partition+time+field+to+RecordContext Cheers, Richard Yu
Re: [DISCUSS] KIP-472: Add header to RecordContext
Hi Matthias, Thanks for responding. :) I suppose the scope of the change needed to fix the timestamp propagation bug is too large (in other words, it is probably not worth it). So should this KIP be closed, since it might be a little excessive? There is probably no need for a change as big as this one. On Fri, May 31, 2019 at 3:17 PM Matthias J. Sax wrote: > Thanks for the KIP. > > However, I have some doubts that this would work. > > In the example, you mention 4 records > > > r1(2), r2(3), r3(7), and r4(9) > > and that if r3 and r4 would be filtered, the downstream task would not > advance partition time from 3 to 9. > > However, if r3 and r4 are filtered, no record will be sent downstream at > all -- hence, there is no record to which partition-time could be > piggy-backed onto its header. > > When we send r2, we don't know anything about r3 and r4 yet. And if > there is an r5 that is not filtered, r5 would advance partition time > anyway based on its own timestamp, hence no header is required. > > Also, I am not a big fan of adding headers in general, as they "leak" > internal implementation details. > > > Overall, my personal opinion is that we should change Kafka's message > format and allow for "heartbeat" messages that don't carry any data, > but only a timestamp. By default, those messages would not be exposed to > an application but would be considered "internal" similar to transaction > markers. However, changing the message format is a major change and > hence, I am not sure if it is worth doing at all atm. > > > -Matthias > > > > On 5/20/19 7:20 PM, Richard Yu wrote: > > Hello, > > > > I wish to introduce a minor addition present in RecordContext (a public > > facing API). This addition works to both provide the user with > > more information regarding the processing state of the partition, but > also > > help resolve a bug which Kafka is currently experiencing. > > Here is the KIP Link: > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-472%3A+%5BSTREAMS%5D+Add+partition+time+field+to+RecordContext > > > > Cheers, > > Richard Yu > > > >
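To make the filtering scenario concrete, here is a small, self-contained Java toy (not Kafka code; record names, timestamps, and the filter are invented for illustration) showing why a partition time piggy-backed on forwarded records can never advance downstream time past 3 when r3 and r4 are dropped:

    import java.util.Arrays;
    import java.util.List;

    public class PartitionTimeToy {

        static final class Rec {
            final String key;
            final long ts;
            final boolean forwarded; // false = dropped by the filter
            Rec(String key, long ts, boolean forwarded) {
                this.key = key; this.ts = ts; this.forwarded = forwarded;
            }
        }

        public static void main(String[] args) {
            List<Rec> input = Arrays.asList(
                    new Rec("r1", 2, true),
                    new Rec("r2", 3, true),
                    new Rec("r3", 7, false),   // filtered out
                    new Rec("r4", 9, false));  // filtered out

            long upstreamTime = Long.MIN_VALUE;   // what the upstream task observes
            long downstreamTime = Long.MIN_VALUE; // the most a header could ever deliver

            for (Rec r : input) {
                upstreamTime = Math.max(upstreamTime, r.ts);
                if (r.forwarded) {
                    // only forwarded records exist downstream to carry a header
                    downstreamTime = Math.max(downstreamTime, r.ts);
                }
            }
            // prints: upstream=9 downstream=3
            System.out.println("upstream=" + upstreamTime + " downstream=" + downstreamTime);
        }
    }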
Re: KIP-457: Add DISCONNECTED state to Kafka Streams
Hi Guozhang, Thanks for the input! Then I guess from the approach you have listed above, no API changes will be needed in Kafka consumer then. That will greatly simplify things, although when implementing these approaches, there might be some unexpected issues which might show up. Cheers,Richard On Thursday, June 13, 2019, 4:29:29 AM GMT+8, Guozhang Wang wrote: Hi Richard, Sorry for getting late on this, I've finally get some time to take a look at https://github.com/apache/kafka/pull/6594 as well as the KIP itself. Here are some thoughts: 1. The main motivation of this KIP is to be able to distinguish the case where a. "Streams client is in an unhealthy situation and hence cannot proceed" (which we have an ERROR state) and b. "Streams client is perfectly healthy, but it cannot get to the target brokers and hence cannot proceed", and this should also be distinguishable from c. "both Streams and brokers are healthy, there's just no data available for processing and hence cannot proceed"). And we want to have a way to notify the users about the second case b) distinguished from the others . 2. Following this, when I first thought about the solution I was thinking about adding a new state in the FSM of Kafka Streams, but after reviewing the code and the KIP, I felt this may be an overkill to complicate the FSM. Now I'm wondering if we can achieve the same thing with a single metric. For example: 2.a) we know that in Streams we always rely on consumer membership to allocate partitions to instances, which means that the heartbeat thread has to be working if the consumer wants to ever receive some data, what we can do is to let users monitor on this metric directly, e.g. if the heartbeat-rate drops to zero BUT the state is still in RUNNING it means we are in case b) above. 2.b) if we want to provide a streams-level metric out-of-the-box rather than letting users to monitor on consumer metrics, another idea is to leverage on existing "public Set assignment()" of KafkaConsumer, and record the time when it returns empty, meaning that nothing was assigned. And expose this as a boolean metric indicating nothing was assigned and hence we are likely in case b) above --- note this could also mean that we have fewer partitions than necessary so that some instance does not have any assignment indeed, which is not the same as b), but I feel consolidating these to cases with a single metric seem also fine. Guozhang On Wed, Apr 17, 2019 at 2:30 PM Richard Yu wrote: > Alright, so I made a few changes to the KIP. > I realized that there might be an easier way to give the user information > on the connection state of Kafka Streams. > In implementation, if one wishes to have DISCONNECTED as a state, then one > would have to factor in proper state transitions. > The other approach that is now outlined in the KIP. Instead, we could just > add a method which I think achieves the same effect. > If any of you thinks there is wrong with this approach, please let me know. > :) > > Cheers, > Richard > > On Wed, Apr 17, 2019 at 11:49 AM Richard Yu > wrote: > > > I just realized something. > > > > Hi Matthias, might need your input here. > > I realized that when implementing this change, as noted in the JIRA, we > > would need to "check the behaviour of the consumer" since its consumer's > > connection with broker that we are dealing with. > > > > So doesn't that mean we would also be dealing with consumer API changes > as > > well? 
> > I don't think consumer has any methods which would give us the state of a > > connection either. > > > > - Richard > > > > On Wed, Apr 17, 2019 at 8:43 AM Richard Yu > > wrote: > > > >> Hi Micheal, > >> > >> Yeah, those are some points I should've clarified. > >> No problem. Have got it done. > >> > >> > >> > >> On Wed, Apr 17, 2019 at 6:42 AM Michael Noll > >> wrote: > >> > >>> Richard, > >>> > >>> thanks for looking into this! > >>> > >>> However, I have some concerns. The KIP you created ( > >>> > >>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams > >>> ) > >>> doesn't yet address open questions such as the ones mentioned by > >>> Matthias: > >>> > >>> 1) What is the difference between DEAD and the proposed DISCONNECTED? > >>> This > >>> should be defined in the KIP. > >>> > >>> 2) Difference between your KIP and the JIRA ( > >>> https://issues.apache.org/jira/browse/KAFKA-6520
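As a rough illustration of Guozhang's option 2.a), the sketch below scans the metrics registry for the embedded consumer's heartbeat-rate and flags the "client healthy, brokers unreachable" case when the rate is zero while the state is still RUNNING. It assumes the consumer's coordinator metrics are visible through KafkaStreams#metrics(); if they are not in a given version, the same check can be run against the consumer's own metrics() map. The metric name and group follow the consumer's existing coordinator metrics and are not defined by this KIP.

    import java.util.Map;
    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.streams.KafkaStreams;

    public final class DisconnectProbe {

        // Returns true when the client looks healthy (RUNNING) but the embedded
        // consumer's heartbeat-rate has dropped to zero, i.e. case b) above.
        public static boolean looksDisconnected(KafkaStreams streams) {
            if (streams.state() != KafkaStreams.State.RUNNING) {
                return false; // some other problem; not the case we care about here
            }
            for (Map.Entry<MetricName, ? extends Metric> e : streams.metrics().entrySet()) {
                MetricName name = e.getKey();
                if ("heartbeat-rate".equals(name.name())
                        && "consumer-coordinator-metrics".equals(name.group())) {
                    Object value = e.getValue().metricValue();
                    if (value instanceof Number && ((Number) value).doubleValue() == 0.0d) {
                        return true;
                    }
                }
            }
            return false;
        }
    }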
Re: KIP-457: Add DISCONNECTED state to Kafka Streams
Hi Matthias and Hachikuji, Sorry for getting back to you so late. Currently on a trip, so I hadn't got the time to respond. Currently, I'm not sure which approach we should do ATM, considering that Guozhang posed multiple possibilities in the previous email.Do you have any preferences as to which approach we should take? It would greatly help in the implementation of the issue. Cheers,Richard On Thursday, June 13, 2019, 4:55:29 PM GMT+8, Richard Yu wrote: Hi Guozhang, Thanks for the input! Then I guess from the approach you have listed above, no API changes will be needed in Kafka consumer then. That will greatly simplify things, although when implementing these approaches, there might be some unexpected issues which might show up. Cheers,Richard On Thursday, June 13, 2019, 4:29:29 AM GMT+8, Guozhang Wang wrote: Hi Richard, Sorry for getting late on this, I've finally get some time to take a look at https://github.com/apache/kafka/pull/6594 as well as the KIP itself. Here are some thoughts: 1. The main motivation of this KIP is to be able to distinguish the case where a. "Streams client is in an unhealthy situation and hence cannot proceed" (which we have an ERROR state) and b. "Streams client is perfectly healthy, but it cannot get to the target brokers and hence cannot proceed", and this should also be distinguishable from c. "both Streams and brokers are healthy, there's just no data available for processing and hence cannot proceed"). And we want to have a way to notify the users about the second case b) distinguished from the others . 2. Following this, when I first thought about the solution I was thinking about adding a new state in the FSM of Kafka Streams, but after reviewing the code and the KIP, I felt this may be an overkill to complicate the FSM. Now I'm wondering if we can achieve the same thing with a single metric. For example: 2.a) we know that in Streams we always rely on consumer membership to allocate partitions to instances, which means that the heartbeat thread has to be working if the consumer wants to ever receive some data, what we can do is to let users monitor on this metric directly, e.g. if the heartbeat-rate drops to zero BUT the state is still in RUNNING it means we are in case b) above. 2.b) if we want to provide a streams-level metric out-of-the-box rather than letting users to monitor on consumer metrics, another idea is to leverage on existing "public Set assignment()" of KafkaConsumer, and record the time when it returns empty, meaning that nothing was assigned. And expose this as a boolean metric indicating nothing was assigned and hence we are likely in case b) above --- note this could also mean that we have fewer partitions than necessary so that some instance does not have any assignment indeed, which is not the same as b), but I feel consolidating these to cases with a single metric seem also fine. Guozhang On Wed, Apr 17, 2019 at 2:30 PM Richard Yu wrote: > Alright, so I made a few changes to the KIP. > I realized that there might be an easier way to give the user information > on the connection state of Kafka Streams. > In implementation, if one wishes to have DISCONNECTED as a state, then one > would have to factor in proper state transitions. > The other approach that is now outlined in the KIP. Instead, we could just > add a method which I think achieves the same effect. > If any of you thinks there is wrong with this approach, please let me know. 
> :) > > Cheers, > Richard > > On Wed, Apr 17, 2019 at 11:49 AM Richard Yu > wrote: > > > I just realized something. > > > > Hi Matthias, might need your input here. > > I realized that when implementing this change, as noted in the JIRA, we > > would need to "check the behaviour of the consumer" since its consumer's > > connection with broker that we are dealing with. > > > > So doesn't that mean we would also be dealing with consumer API changes > as > > well? > > I don't think consumer has any methods which would give us the state of a > > connection either. > > > > - Richard > > > > On Wed, Apr 17, 2019 at 8:43 AM Richard Yu > > wrote: > > > >> Hi Micheal, > >> > >> Yeah, those are some points I should've clarified. > >> No problem. Have got it done. > >> > >> > >> > >> On Wed, Apr 17, 2019 at 6:42 AM Michael Noll > >> wrote: > >> > >>> Richard, > >>> > >>> thanks for looking into this! > >>> > >>> However, I have some concerns. The KIP you created ( > >>> > >>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+statu
KIP Write Access
Hello, I wish to write a KIP. Could you grant me access? Thanks (Wiki username is yohan.richard.yu)
KIP-202
Hi, Please take a look at: https://cwiki.apache.org/confluence/display/KAFKA/KIP-202+Move+merge%28%29+from+StreamsBuilder+to+KStream Thanks
[Discuss] KIP-202 Move merge() from StreamsBuilder to KStream
Hi, Please take a look at: https://cwiki.apache.org/confluence/display/KAFKA/KIP-202+Move+merge%28%29+from+StreamsBuilder+to+KStream Thanks
Re: [Discuss] KIP-202 Move merge() from StreamsBuilder to KStream
I added StreamsBuilder under the assumption that InternalStreamsBuilder would be required to merge two streams. However, if that is not the case, then I would still need a couple of things: 1) An InternalStreamsBuilder instance to instantiate a new KStream 2) The merge_name that the merged streams will be given 3) Access to the corresponding InternalStreamsBuilder's InternalTopologyBuilder to add a processor (for the new KStream) All these parameters are associated with InternalStreamsBuilder, thus it is essential to merging the streams. We are left with three options (taking into account the restriction that InternalStreamsBuilder's reference scope is mostly limited to within the org.apache.kafka.streams.kstream.internals package): a) Find a way to pass InternalStreamsBuilder indirectly into the class (using StreamsBuilder). b) Find the matching InternalStreamsBuilder within the method that corresponds to the streams about to be merged. or c) Use the local InternalStreamsBuilder inherited from AbstractStream, assuming that it is the correct builder. From your suggestion, that would mean using option c mentioned earlier. This choice of implementation works, but it also carries the risk that the local InternalStreamsBuilder might not be the correct one (just something one might want to keep in mind, since I will change it). On Sun, Sep 17, 2017 at 12:06 AM, Matthias J. Sax wrote: > Hi Richard, > > Thanks a lot for the KIP! > > I have three questions: > - why is the new merge() method static? > - why does the new merge() method take StreamsBuilder as a parameter? > - did you think about Xavier's comment (see the JIRA in case you did > not notice it yet) about varargs vs adding some overloads to merge stream? > > My personal take is that merge() should not be static and not take > StreamsBuilder. The idea of the JIRA was to get a more natural API: > > // old > KStream merged = StreamsBuilder.merge(stream1, stream2); > // new > KStream merge = stream1.merge(stream2); > > > Having pointed out the second pattern, it should actually be fine to get > rid of varargs in merge() at all, as users could chain multiple calls > to merge() after each other: > > KStream multiMerged = stream1.merge(s2).merge(s3).merge(s4); > > > > > -Matthias > > On 9/16/17 9:36 PM, Richard Yu wrote: > > Hi, > > Please take a look at: > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > > 202+Move+merge%28%29+from+StreamsBuilder+to+KStream > > > > Thanks > > > >
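To make the "more natural API" concrete, here is a minimal sketch of how the proposed instance-method form would be used once merge() lives on KStream; the topic names are invented for illustration and are not part of the KIP, and chaining replaces the old varargs overload:

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;

    public class MergeSketch {
        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> s1 = builder.stream("orders-a");
            KStream<String, String> s2 = builder.stream("orders-b");
            KStream<String, String> s3 = builder.stream("orders-c");

            // proposed instance method: no StreamsBuilder argument, no varargs;
            // multiple streams are merged by chaining calls
            KStream<String, String> merged = s1.merge(s2).merge(s3);

            merged.to("orders-merged");
            System.out.println(builder.build().describe());
        }
    }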
Re: [Discuss] KIP-202 Move merge() from StreamsBuilder to KStream
KIP-202 has been changed according to the conditions of your suggestion. On Sun, Sep 17, 2017 at 8:51 AM, Richard Yu wrote: > I added StreamsBuilder under the assumption that InternalStreamBuilder > would be required to merge > two streams. However, if that is not the case, then I would still need a > couple of things: > > 1) An InternalStreamBuilder instance to instantiate a new KStream > > 2) The merge_name that the merged streams will be given > > 3) Need access to the corresponding InternalStreamBuilder's > InternalTopologyBuilder to add a processor (for the new KStreams) > > All these parameters are associated with InternalStreamsBuilder, thus it > is essential towards merging the streams. > We are left with three options (taking into account the restriction that > InternalStreamsBuilder's reference scope is mostly limited to within the > org.apache.kafka.streams.kstream.internals package): > > a) Find a way to pass InternalStreamsBuilder indirectly into the class. > (using StreamsBuilder) > > b) Find the matching InternalStreamBuilder within the method that > corresponds to the streams about to be merged. > > or c) Use the local InternalStreamsBuilder inherited from AbstractStream, > assuming that it is the correct builder > > From your suggestion, that would mean using the c option I mentioned > earlier. This choice of implementation works, but it could also include the > risk that the local InternalStreamsBuilder might not be the correct one > (just something one might want to keep in mind, since I will change it) > > On Sun, Sep 17, 2017 at 12:06 AM, Matthias J. Sax > wrote: > >> Hi Richard, >> >> Thanks a lot for the KIP! >> >> I have three question: >> - why is the new merge() method static? >> - why does the new merge() method take StreamsBuilder as a parameter? >> - did you think about Xavier's comment (see the JIRA in case you did >> not notice it yet) about varargs vs adding some overloads to merge stream? >> >> My personal take is that merge() should not be static and not take >> StreamsBuilder. The idea of the JIRA was to get a more natural API: >> >> // old >> KStream merged = StreamsBuilder.merge(stream1, stream2); >> // new >> KStream merge = stream1.merge(stream2); >> >> >> Having pointed out the second pattern, it should actually be fine to get >> rid of varargs in merger() at all, as users could chain multiple calls >> to merge() after each other: >> >> KStream multiMerged = stream1.merge(s2).merge(s3).merge(s4); >> >> >> >> >> -Matthias >> >> On 9/16/17 9:36 PM, Richard Yu wrote: >> > Hi, >> > Please take a look at: >> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP- >> > 202+Move+merge%28%29+from+StreamsBuilder+to+KStream >> > >> > Thanks >> > >> >> >
Re: [Discuss] KIP-202 Move merge() from StreamsBuilder to KStream
With regards to Xavier's comment, this practice I do no think applies to this PR. There is not much potential here for warnings to be thrown. Note that in StreamsBuilder's merge, their is no @SuppressWarnings("unchecked")--indicating that warnings is sparse, if not nonexistent. On Sun, Sep 17, 2017 at 9:10 AM, Richard Yu wrote: > KIP-202 has been changed according to the conditions of your suggestion. > > On Sun, Sep 17, 2017 at 8:51 AM, Richard Yu > wrote: > >> I added StreamsBuilder under the assumption that InternalStreamBuilder >> would be required to merge >> two streams. However, if that is not the case, then I would still need a >> couple of things: >> >> 1) An InternalStreamBuilder instance to instantiate a new KStream >> >> 2) The merge_name that the merged streams will be given >> >> 3) Need access to the corresponding InternalStreamBuilder's >> InternalTopologyBuilder to add a processor (for the new KStreams) >> >> All these parameters are associated with InternalStreamsBuilder, thus it >> is essential towards merging the streams. >> We are left with three options (taking into account the restriction that >> InternalStreamsBuilder's reference scope is mostly limited to within the >> org.apache.kafka.streams.kstream.internals package): >> >> a) Find a way to pass InternalStreamsBuilder indirectly into the class. >> (using StreamsBuilder) >> >> b) Find the matching InternalStreamBuilder within the method that >> corresponds to the streams about to be merged. >> >> or c) Use the local InternalStreamsBuilder inherited from AbstractStream, >> assuming that it is the correct builder >> >> From your suggestion, that would mean using the c option I mentioned >> earlier. This choice of implementation works, but it could also include the >> risk that the local InternalStreamsBuilder might not be the correct one >> (just something one might want to keep in mind, since I will change it) >> >> On Sun, Sep 17, 2017 at 12:06 AM, Matthias J. Sax >> wrote: >> >>> Hi Richard, >>> >>> Thanks a lot for the KIP! >>> >>> I have three question: >>> - why is the new merge() method static? >>> - why does the new merge() method take StreamsBuilder as a parameter? >>> - did you think about Xavier's comment (see the JIRA in case you did >>> not notice it yet) about varargs vs adding some overloads to merge >>> stream? >>> >>> My personal take is that merge() should not be static and not take >>> StreamsBuilder. The idea of the JIRA was to get a more natural API: >>> >>> // old >>> KStream merged = StreamsBuilder.merge(stream1, stream2); >>> // new >>> KStream merge = stream1.merge(stream2); >>> >>> >>> Having pointed out the second pattern, it should actually be fine to get >>> rid of varargs in merger() at all, as users could chain multiple calls >>> to merge() after each other: >>> >>> KStream multiMerged = stream1.merge(s2).merge(s3).merge(s4); >>> >>> >>> >>> >>> -Matthias >>> >>> On 9/16/17 9:36 PM, Richard Yu wrote: >>> > Hi, >>> > Please take a look at: >>> > >>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP- >>> > 202+Move+merge%28%29+from+StreamsBuilder+to+KStream >>> > >>> > Thanks >>> > >>> >>> >> >
Re: [Discuss] KIP-202 Move merge() from StreamsBuilder to KStream
Correction: When the current merge() method is called with multiple streams, a warning will be printed (or logged), but this should not hinder ability to read the log. There is a missing unchecked warning suppression for the old method. However, it is not high priority due to deprecation of the old merge() method. On Sun, Sep 17, 2017 at 9:37 AM, Richard Yu wrote: > With regards to Xavier's comment, this practice I do no think applies to > this PR. There is not much potential here for warnings to be thrown. Note > that in StreamsBuilder's merge, their is no > @SuppressWarnings("unchecked")--indicating > that warnings is sparse, if not nonexistent. > > > On Sun, Sep 17, 2017 at 9:10 AM, Richard Yu > wrote: > >> KIP-202 has been changed according to the conditions of your suggestion. >> >> On Sun, Sep 17, 2017 at 8:51 AM, Richard Yu >> wrote: >> >>> I added StreamsBuilder under the assumption that InternalStreamBuilder >>> would be required to merge >>> two streams. However, if that is not the case, then I would still need a >>> couple of things: >>> >>> 1) An InternalStreamBuilder instance to instantiate a new KStream >>> >>> 2) The merge_name that the merged streams will be given >>> >>> 3) Need access to the corresponding InternalStreamBuilder's >>> InternalTopologyBuilder to add a processor (for the new KStreams) >>> >>> All these parameters are associated with InternalStreamsBuilder, thus it >>> is essential towards merging the streams. >>> We are left with three options (taking into account the restriction that >>> InternalStreamsBuilder's reference scope is mostly limited to within the >>> org.apache.kafka.streams.kstream.internals package): >>> >>> a) Find a way to pass InternalStreamsBuilder indirectly into the class. >>> (using StreamsBuilder) >>> >>> b) Find the matching InternalStreamBuilder within the method that >>> corresponds to the streams about to be merged. >>> >>> or c) Use the local InternalStreamsBuilder inherited from >>> AbstractStream, assuming that it is the correct builder >>> >>> From your suggestion, that would mean using the c option I mentioned >>> earlier. This choice of implementation works, but it could also include the >>> risk that the local InternalStreamsBuilder might not be the correct one >>> (just something one might want to keep in mind, since I will change it) >>> >>> On Sun, Sep 17, 2017 at 12:06 AM, Matthias J. Sax >> > wrote: >>> >>>> Hi Richard, >>>> >>>> Thanks a lot for the KIP! >>>> >>>> I have three question: >>>> - why is the new merge() method static? >>>> - why does the new merge() method take StreamsBuilder as a parameter? >>>> - did you think about Xavier's comment (see the JIRA in case you did >>>> not notice it yet) about varargs vs adding some overloads to merge >>>> stream? >>>> >>>> My personal take is that merge() should not be static and not take >>>> StreamsBuilder. 
The idea of the JIRA was to get a more natural API: >>>> >>>> // old >>>> KStream merged = StreamsBuilder.merge(stream1, stream2); >>>> // new >>>> KStream merge = stream1.merge(stream2); >>>> >>>> >>>> Having pointed out the second pattern, it should actually be fine to get >>>> rid of varargs in merger() at all, as users could chain multiple calls >>>> to merge() after each other: >>>> >>>> KStream multiMerged = stream1.merge(s2).merge(s3).merge(s4); >>>> >>>> >>>> >>>> >>>> -Matthias >>>> >>>> On 9/16/17 9:36 PM, Richard Yu wrote: >>>> > Hi, >>>> > Please take a look at: >>>> > >>>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP- >>>> > 202+Move+merge%28%29+from+StreamsBuilder+to+KStream >>>> > >>>> > Thanks >>>> > >>>> >>>> >>> >> >
Re: [Discuss] KIP-202 Move merge() from StreamsBuilder to KStream
The discussion should not stay idle. Since this issue is so small, we should move it into the voting phase. On Sun, Sep 17, 2017 at 1:39 PM, Matthias J. Sax wrote: > Thanks for updating the KIP. > > You are of course right, that we internally need access to > InternalStreamBuilder, but that should not be too hard and effectively > be an internal implementation detail. > > > Two more comments: > > the new method should be > > > KStream merge(KStream stream); > > and not > > > KStream merge(KStream streams); > > as in the KIP? The prefix `` is not required for non-static methods > and it should be singular (not plural) as parameter name? > > Can you also add an explicit sentence, that the new method does not use > varargs anymore but a single KStream parameter (in contrast to the old > method). And mention that this is no limitation as calls to new merge() > can be chained. > > > > Thanks a lot! > > -Matthias > > > > On 9/17/17 10:32 AM, Richard Yu wrote: > > Correction: When the current merge() method is called with multiple > > streams, a warning will be printed (or logged), but this should not > hinder > > ability to read the log. > > There is a missing unchecked warning suppression for the old method. > > However, it is not high priority due to deprecation of the old merge() > > method. > > > > > > On Sun, Sep 17, 2017 at 9:37 AM, Richard Yu > > wrote: > > > >> With regards to Xavier's comment, this practice I do no think applies to > >> this PR. There is not much potential here for warnings to be thrown. > Note > >> that in StreamsBuilder's merge, their is no > @SuppressWarnings("unchecked")--indicating > >> that warnings is sparse, if not nonexistent. > >> > >> > >> On Sun, Sep 17, 2017 at 9:10 AM, Richard Yu > > >> wrote: > >> > >>> KIP-202 has been changed according to the conditions of your > suggestion. > >>> > >>> On Sun, Sep 17, 2017 at 8:51 AM, Richard Yu < > yohan.richard...@gmail.com> > >>> wrote: > >>> > >>>> I added StreamsBuilder under the assumption that InternalStreamBuilder > >>>> would be required to merge > >>>> two streams. However, if that is not the case, then I would still > need a > >>>> couple of things: > >>>> > >>>> 1) An InternalStreamBuilder instance to instantiate a new KStream > >>>> > >>>> 2) The merge_name that the merged streams will be given > >>>> > >>>> 3) Need access to the corresponding InternalStreamBuilder's > >>>> InternalTopologyBuilder to add a processor (for the new KStreams) > >>>> > >>>> All these parameters are associated with InternalStreamsBuilder, thus > it > >>>> is essential towards merging the streams. > >>>> We are left with three options (taking into account the restriction > that > >>>> InternalStreamsBuilder's reference scope is mostly limited to within > the > >>>> org.apache.kafka.streams.kstream.internals package): > >>>> > >>>> a) Find a way to pass InternalStreamsBuilder indirectly into the > class. > >>>> (using StreamsBuilder) > >>>> > >>>> b) Find the matching InternalStreamBuilder within the method that > >>>> corresponds to the streams about to be merged. > >>>> > >>>> or c) Use the local InternalStreamsBuilder inherited from > >>>> AbstractStream, assuming that it is the correct builder > >>>> > >>>> From your suggestion, that would mean using the c option I mentioned > >>>> earlier. 
This choice of implementation works, but it could also > include the > >>>> risk that the local InternalStreamsBuilder might not be the correct > one > >>>> (just something one might want to keep in mind, since I will change > it) > >>>> > >>>> On Sun, Sep 17, 2017 at 12:06 AM, Matthias J. Sax < > matth...@confluent.io > >>>>> wrote: > >>>> > >>>>> Hi Richard, > >>>>> > >>>>> Thanks a lot for the KIP! > >>>>> > >>>>> I have three question: > >>>>> - why is the new merge() method static? > >>>>> - why does the new merge() method take StreamsBuilder as a > parameter? > >>>>> - did you think about Xavier's comment (see the JIRA in case you did > >>>>> not notice it yet) about varargs vs adding some overloads to merge > >>>>> stream? > >>>>> > >>>>> My personal take is that merge() should not be static and not take > >>>>> StreamsBuilder. The idea of the JIRA was to get a more natural API: > >>>>> > >>>>> // old > >>>>> KStream merged = StreamsBuilder.merge(stream1, stream2); > >>>>> // new > >>>>> KStream merge = stream1.merge(stream2); > >>>>> > >>>>> > >>>>> Having pointed out the second pattern, it should actually be fine to > get > >>>>> rid of varargs in merger() at all, as users could chain multiple > calls > >>>>> to merge() after each other: > >>>>> > >>>>> KStream multiMerged = stream1.merge(s2).merge(s3).merge(s4); > >>>>> > >>>>> > >>>>> > >>>>> > >>>>> -Matthias > >>>>> > >>>>> On 9/16/17 9:36 PM, Richard Yu wrote: > >>>>>> Hi, > >>>>>> Please take a look at: > >>>>>> > >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP- > >>>>>> 202+Move+merge%28%29+from+StreamsBuilder+to+KStream > >>>>>> > >>>>>> Thanks > >>>>>> > >>>>> > >>>>> > >>>> > >>> > >> > > > >
[VOTE] KIP-202
Hello, I would like to start a VOTE thread on KIP-202. Thanks.
Re: [VOTE] KIP-202
KIP-202 Move merge() from StreamsBuilder to KStream. https://cwiki.apache.org/confluence/display/KAFKA/KIP-202+Move+merge%28%29+from+StreamsBuilder+to+KStream This is the link for the VOTE. On Mon, Sep 18, 2017 at 4:27 PM, Richard Yu wrote: > Hello, I would like to start a VOTE thread on KIP-202. > > Thanks. >
Re: [VOTE] KIP-202
The KIP has been changed to suit the 1.0.0 release. On Tue, Sep 19, 2017 at 6:24 AM, Damian Guy wrote: > +1 > > On Tue, 19 Sep 2017 at 14:15 Bill Bejeck wrote: > > > +1 > > > > -Bill > > > > On Tue, Sep 19, 2017 at 4:41 AM, Guozhang Wang > wrote: > > > > > Thanks for the KIP, +1. > > > > > > If we can make it in 1.0.0, I think we can just remove the merge() in > > > StreamsBuilder as it will only be introduced in 1.0.0; if we will add > it > > in > > > 1.1.0, then we indeed need to deprecate it. > > > > > > > > > Guozhang > > > > > > > > > On Tue, Sep 19, 2017 at 7:29 AM, Richard Yu < > yohan.richard...@gmail.com> > > > wrote: > > > > > > > KIP-202 Move merge() from StreamsBuilder to KStream. > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > > > > 202+Move+merge%28%29+from+StreamsBuilder+to+KStream > > > > > > > > This is the link for the VOTE. > > > > > > > > On Mon, Sep 18, 2017 at 4:27 PM, Richard Yu < > > yohan.richard...@gmail.com> > > > > wrote: > > > > > > > > > Hello, I would like to start a VOTE thread on KIP-202. > > > > > > > > > > Thanks. > > > > > > > > > > > > > > > > > > > > > -- > > > -- Guozhang > > > > > >
Re: [VOTE] KIP-202
It is not possible, more than likely we are going to wait until after the release. On Tue, Sep 19, 2017 at 11:02 AM, Matthias J. Sax wrote: > +1 > > But I think we cannot get it into 1.0 as KIP vote deadline passed > already. Or is it possible to get an exception from this? > > > -Matthias > > On 9/19/17 7:09 AM, Richard Yu wrote: > > Kip has been changed to suit 1.0.0 release. > > > > On Tue, Sep 19, 2017 at 6:24 AM, Damian Guy > wrote: > > > >> +1 > >> > >> On Tue, 19 Sep 2017 at 14:15 Bill Bejeck wrote: > >> > >>> +1 > >>> > >>> -Bill > >>> > >>> On Tue, Sep 19, 2017 at 4:41 AM, Guozhang Wang > >> wrote: > >>> > >>>> Thanks for the KIP, +1. > >>>> > >>>> If we can make it in 1.0.0, I think we can just remove the merge() in > >>>> StreamsBuilder as it will only be introduced in 1.0.0; if we will add > >> it > >>> in > >>>> 1.1.0, then we indeed need to deprecate it. > >>>> > >>>> > >>>> Guozhang > >>>> > >>>> > >>>> On Tue, Sep 19, 2017 at 7:29 AM, Richard Yu < > >> yohan.richard...@gmail.com> > >>>> wrote: > >>>> > >>>>> KIP-202 Move merge() from StreamsBuilder to KStream. > >>>>> > >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP- > >>>>> 202+Move+merge%28%29+from+StreamsBuilder+to+KStream > >>>>> > >>>>> This is the link for the VOTE. > >>>>> > >>>>> On Mon, Sep 18, 2017 at 4:27 PM, Richard Yu < > >>> yohan.richard...@gmail.com> > >>>>> wrote: > >>>>> > >>>>>> Hello, I would like to start a VOTE thread on KIP-202. > >>>>>> > >>>>>> Thanks. > >>>>>> > >>>>> > >>>> > >>>> > >>>> > >>>> -- > >>>> -- Guozhang > >>>> > >>> > >> > > > >
Re: [Discuss] KIP-202 Move merge() from StreamsBuilder to KStream
The PR should be ready. I have removed the old merge() method for 1.0.0. On Tue, Sep 19, 2017 at 4:22 PM, Guozhang Wang wrote: > I'd like to make an exception for this KIP if it's PR can get in before the > the code freeze date, as it's a low risk small KIP that is unlikely to > introduce regression. > > > Guozhang > > On Wed, Sep 20, 2017 at 2:01 AM, Matthias J. Sax > wrote: > > > @Damian, this KIP goes into 1.1 but not 1.0, so we need to go the > > deprecation way... > > > > I would be happy to get it into 1.0 and avoid the deprecation. But > > strictly speaking, the KIP vote deadline passed already... Not sure if > > there is any exception from this. > > > > > > -Matthias > > > > On 9/19/17 12:17 AM, Damian Guy wrote: > > > Hi Richard, > > > > > > Thanks for the KIP. Looks good, just one thing: we don't need to > > deprecate > > > StreamBuilder#merge as it has been added during this release cycle. It > > can > > > just be removed. > > > > > > Thanks, > > > Damian > > > > > > On Mon, 18 Sep 2017 at 23:22 Richard Yu > > wrote: > > > > > >> The discussion should not stay idle. Since this issue is so small, we > > >> should move it into the voting phase. > > >> > > >> On Sun, Sep 17, 2017 at 1:39 PM, Matthias J. Sax < > matth...@confluent.io > > > > > >> wrote: > > >> > > >>> Thanks for updating the KIP. > > >>> > > >>> You are of course right, that we internally need access to > > >>> InternalStreamBuilder, but that should not be too hard and > effectively > > >>> be an internal implementation detail. > > >>> > > >>> > > >>> Two more comments: > > >>> > > >>> the new method should be > > >>> > > >>>> KStream merge(KStream stream); > > >>> > > >>> and not > > >>> > > >>>> KStream merge(KStream streams); > > >>> > > >>> as in the KIP? The prefix `` is not required for non-static > > methods > > >>> and it should be singular (not plural) as parameter name? > > >>> > > >>> Can you also add an explicit sentence, that the new method does not > use > > >>> varargs anymore but a single KStream parameter (in contrast to the > old > > >>> method). And mention that this is no limitation as calls to new > merge() > > >>> can be chained. > > >>> > > >>> > > >>> > > >>> Thanks a lot! > > >>> > > >>> -Matthias > > >>> > > >>> > > >>> > > >>> On 9/17/17 10:32 AM, Richard Yu wrote: > > >>>> Correction: When the current merge() method is called with multiple > > >>>> streams, a warning will be printed (or logged), but this should not > > >>> hinder > > >>>> ability to read the log. > > >>>> There is a missing unchecked warning suppression for the old method. > > >>>> However, it is not high priority due to deprecation of the old > merge() > > >>>> method. > > >>>> > > >>>> > > >>>> On Sun, Sep 17, 2017 at 9:37 AM, Richard Yu < > > >> yohan.richard...@gmail.com> > > >>>> wrote: > > >>>> > > >>>>> With regards to Xavier's comment, this practice I do no think > applies > > >> to > > >>>>> this PR. There is not much potential here for warnings to be > thrown. > > >>> Note > > >>>>> that in StreamsBuilder's merge, their is no > > >>> @SuppressWarnings("unchecked")--indicating > > >>>>> that warnings is sparse, if not nonexistent. > > >>>>> > > >>>>> > > >>>>> On Sun, Sep 17, 2017 at 9:10 AM, Richard Yu < > > >> yohan.richard...@gmail.com > > >>>> > > >>>>> wrote: > > >>>>> > > >>>>>> KIP-202 has been changed according to the conditions of your > > >>> suggestion. > > >>>>>> > > >>>>>> On Sun, Sep 17, 2017 at 8:51 AM, Richard Yu < > > >>> yohan.richard...@g
[DISCUSS] KIP-515: Reorganize checkpoint system in log cleaner to be per partition
Hi all, A KIP has been written that proposes reorganizing the checkpoint file system in the log cleaner. If anybody wishes to comment, feel free to do so. :) https://cwiki.apache.org/confluence/display/KAFKA/KIP-515%3A+Reorganize+checkpoint+file+system+in+log+cleaner+to+be+per+partition The link above is for reference. Cheers, Richard
Re: [DISCUSS] KIP-515: Reorganize checkpoint system in log cleaner to be per partition
Hi Jun, Thanks for chipping in. :) The description you provided is pretty apt in describing the motivation of the KIP, so I will add it. I've made some changes to the KIP and outlined the basic approaches of what we have so far (basically changing the checkpoint file organization or incorporating an extra internal header field for a record). I will expand on them shortly. Any comments are appreciated! Cheers, Richard On Mon, Sep 9, 2019 at 3:10 PM Jun Rao wrote: > Hi, Richard, > > Thanks for drafting the KIP. A few comments below. > > 1. We need to provide a better motivation for the KIP. The goal of the KIP > is not to reorganize the checkpoint for log cleaning. It's just an > implementation detail. I was thinking that we could add sth like the > following in the Motivation/Problem section. > > "The idea of the configuration delete.retention.ms for compacted topics is > to prevent an application that has read a key to not see a subsequent > deletion of the key because it's physically removed too early. To solve > this problem, from the latest possible time (deleteHorizonMs) that an > application could have read a non tombstone key before a tombstone, we > preserve that tombstone for at least delete.retention.ms and require the > application to complete the reading of the tombstone by then. > > deleteHorizonMs is no later than the time when the cleaner has cleaned up > to the tombstone. After that time, no application can read a non-tombstone > key before the tombstone because they have all been cleaned away through > compaction. Since currently we don't explicitly store the time when a round > of cleaning completes, deleteHorizonMs is estimated by the last modified > time of the segment containing firstDirtyOffset. When merging multiple log > segments into a single one, the last modified time is inherited from the > last merged segment. So the last modified time of the newly merged segment > is actually not an accurate estimate of deleteHorizonMs. It could be > arbitrarily before (KAFKA-4545 <https://issues.apache.org/jira/browse/>) > or > after (KAFKA-8522 <https://issues.apache.org/jira/browse/KAFKA-8522>). The > former causes the tombstone to be deleted too early, which can cause an > application to miss the deletion of a key. The latter causes the tombstone > to be retained longer than needed and potentially forever." > > We probably want to change the title of the KIP accordingly. > > 2. The proposed implementation of the KIP is to remember the > firstDirtyOffset offset and the corresponding cleaning time in a checkpoint > file per partition and then use them to estimate deleteHorizonMs. It would > be useful to document the format of the new checkpoint file and how it will > be used during cleaning. Some examples will be helpful. > > 3. Thinking about this more. There is another way to solve this problem. We > could write the deleteHorizonMs for each tombstone as an internal header > field of the record (e.g., __deleteHorizonMs). That timestamp could be the > starting time of the log cleaner when the tombstone's offset is <= > firstDirtyOffset. We could use this timestamp to determine whether the > tombstone should be removed in subsequent rounds of cleaning. This way, we > can still keep the current per disk checkpoint file, which is more > efficient. Personally, I think this approach may be better. Could you > document this approach in the wiki as well so that we can discuss which one > to pick? 
> > > Jun > > > On Sun, Sep 1, 2019 at 7:45 PM Richard Yu > wrote: > > > Hi all, > > > > A KIP has been written that wishes to upgrade the checkpoint file system > in > > log cleaner. > > If anybody wishes to comment, feel free to do so. :) > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-515%3A+Reorganize+checkpoint+file+system+in+log+cleaner+to+be+per+partition > > Above is the link for reference. > > > > Cheers, > > Richard > > >
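As a back-of-the-envelope sketch of the difference Jun describes, compare where the delete horizon comes from today versus under either proposal (per-partition checkpoint file or internal per-record header); the method and variable names below are placeholders, not actual Kafka identifiers:

    public final class DeleteHorizonEstimates {

        // Today: the horizon is estimated from the last-modified time of the segment
        // holding firstDirtyOffset, which is inherited from the last merged segment
        // and can be arbitrarily early (KAFKA-4545) or late (KAFKA-8522).
        static long estimatedHorizonToday(long segmentLastModifiedMs, long deleteRetentionMs) {
            return segmentLastModifiedMs + deleteRetentionMs;
        }

        // Proposal: remember when a round of cleaning actually completed and base
        // the horizon on that time instead.
        static long estimatedHorizonProposed(long cleaningCompletedMs, long deleteRetentionMs) {
            return cleaningCompletedMs + deleteRetentionMs;
        }

        // A tombstone may only be physically removed once the horizon has passed.
        static boolean safeToDropTombstone(long nowMs, long horizonMs) {
            return nowMs > horizonMs;
        }
    }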
Re: [DISCUSS] KIP-515: Reorganize checkpoint system in log cleaner to be per partition
Hi Jason, That hadn't occurred to me. I think I missed your comment in the discussion, so I created this KIP only with resolving the problem regarding tombstones. Whats your thoughts? If the problem regarding transaction markers is a little too complex, then we can we just leave it out of the KIP and fix the tombstones issue. Cheers, Richard On Thu, Sep 19, 2019 at 8:47 AM Jason Gustafson wrote: > Hi Richard, > > Just reposting my comment from the JIRA: > > The underlying problem here also impacts the cleaning of transaction > markers. We use the same delete horizon in order to tell when it is safe to > remove the marker. If all the data from a transaction has been cleaned and > the delete horizon has passed enough time, then the marker is eligible for > deletion. > > However, I don't think the same approach that we're proposing to fix the > problem for tombstones will work transaction markers. What we need to track > is the timestamp when all the records from a transaction have been removed. > That is when we start the timer for deletion. But this would be different > for every transaction and there is no guarantee that earlier transactions > will be eligible for deletion before later ones. It all depends on the keys > written in the transaction. I don't see an obvious way to solve this > problem without some record-level bookkeeping, but I might be missing > something. > > Thanks, > Jason > > On Mon, Sep 9, 2019 at 7:21 PM Richard Yu > wrote: > > > Hi Jun, > > > > Thanks for chipping in. :) > > > > The description you provided is pretty apt in describing the motivation > of > > the KIP, so I will add it. I've made some changes to the KIP and outlined > > the basic approaches of what we have so far (basically changing the > > checkpoint file organization or incorporating an extra internal header > > field for a record). I will expand on them shortly. > > > > Any comments are appreciated! > > > > Cheers, > > Richard > > > > On Mon, Sep 9, 2019 at 3:10 PM Jun Rao wrote: > > > > > Hi, Richard, > > > > > > Thanks for drafting the KIP. A few comments below. > > > > > > 1. We need to provide a better motivation for the KIP. The goal of the > > KIP > > > is not to reorganize the checkpoint for log cleaning. It's just an > > > implementation detail. I was thinking that we could add sth like the > > > following in the Motivation/Problem section. > > > > > > "The idea of the configuration delete.retention.ms for compacted > topics > > is > > > to prevent an application that has read a key to not see a subsequent > > > deletion of the key because it's physically removed too early. To solve > > > this problem, from the latest possible time (deleteHorizonMs) that an > > > application could have read a non tombstone key before a tombstone, we > > > preserve that tombstone for at least delete.retention.ms and require > the > > > application to complete the reading of the tombstone by then. > > > > > > deleteHorizonMs is no later than the time when the cleaner has cleaned > up > > > to the tombstone. After that time, no application can read a > > non-tombstone > > > key before the tombstone because they have all been cleaned away > through > > > compaction. Since currently we don't explicitly store the time when a > > round > > > of cleaning completes, deleteHorizonMs is estimated by the last > modified > > > time of the segment containing firstDirtyOffset. When merging multiple > > log > > > segments into a single one, the last modified time is inherited from > the > > > last merged segment. 
So the last modified time of the newly merged > > segment > > > is actually not an accurate estimate of deleteHorizonMs. It could be > > > arbitrarily before (KAFKA-4545 <https://issues.apache.org/jira/browse/ > >) > > > or > > > after (KAFKA-8522 <https://issues.apache.org/jira/browse/KAFKA-8522>). > > The > > > former causes the tombstone to be deleted too early, which can cause an > > > application to miss the deletion of a key. The latter causes the > > tombstone > > > to be retained longer than needed and potentially forever." > > > > > > We probably want to change the title of the KIP accordingly. > > > > > > 2. The proposed implementation of the KIP is to remember the > > > firstDirtyOffset offset and the corresponding cleaning time in a > > checkpoint > > > file per
Re: [DISCUSS] KIP-515: Reorganize checkpoint system in log cleaner to be per partition
Hi Jason, That actually sounds like a pretty good idea to me. No doubt if we use this approach, then some comments need to be added that indicates this. But all things considered, I think its not bad at all. I definitely agree with you on that its a little hacky, but it works. Cheers, Richard On Tue, Sep 24, 2019 at 10:44 AM Jason Gustafson wrote: > Hi Richard, > > It would be unsatisfying to make a big change to the checkpointing logic in > order to handle only one case of this problem, right? > > I did have one idea about how to do this. It's a bit of a hack, but keep an > open mind ;). The basic problem is having somewhere to embed the delete > horizon for each batch. In the v2 format, each batch header contains two > timestamps: the base timestamp and the max timestamp. Each record in the > batch contains a timestamp delta which is relative to the base timestamp. > In other words, to get the record timestamp, you add the record delta to > the base timestamp. > > Typically there is no reason for the base timestamp to be different from > the timestamp of the first message, but this is not a strict requirement. > As long as you can get to the record timestamp by adding the base timestamp > and delta, then we are good. So the idea is to set the base timestamp to > the delete horizon and adjust the deltas accordingly. We could then use one > bit from the batch attributes to indicate when the base timestamp had been > set to the delete horizon. There would be no change to the batch max > timestamp, so indexing would not be affected by this change. > > So the logic would look something like this when cleaning the log. > > Case 1: Normal batch > > a. If delete horizon flag is set, then retain tombstones as long as the > current time is before the horizon. > b. If no delete horizon is set, then retain tombstones and set the delete > horizon in the cleaned batch to current time + > log.cleaner.delete.retention.ms. > > Case 2: Control batch > > a. If delete horizon flag is set, then retain the batch and the marker > as long as the current time is before the horizon. > b. If no delete horizon is set and there are no records remaining from the > transaction, then retain the marker and set the delete horizon in the > cleaned batch to current time + log.cleaner.delete.retention.ms. > > What do you think? > > -Jason > > > > On Thu, Sep 19, 2019 at 3:21 PM Richard Yu > wrote: > > > Hi Jason, > > > > That hadn't occurred to me. > > > > I think I missed your comment in the discussion, so I created this KIP > only > > with resolving the problem regarding tombstones. > > Whats your thoughts? If the problem regarding transaction markers is a > > little too complex, then we can we just leave it out of the KIP and fix > the > > tombstones issue. > > > > Cheers, > > Richard > > > > On Thu, Sep 19, 2019 at 8:47 AM Jason Gustafson > > wrote: > > > > > Hi Richard, > > > > > > Just reposting my comment from the JIRA: > > > > > > The underlying problem here also impacts the cleaning of transaction > > > markers. We use the same delete horizon in order to tell when it is > safe > > to > > > remove the marker. If all the data from a transaction has been cleaned > > and > > > the delete horizon has passed enough time, then the marker is eligible > > for > > > deletion. > > > > > > However, I don't think the same approach that we're proposing to fix > the > > > problem for tombstones will work transaction markers. 
What we need to > > track > > > is the timestamp when all the records from a transaction have been > > removed. > > > That is when we start the timer for deletion. But this would be > different > > > for every transaction and there is no guarantee that earlier > transactions > > > will be eligible for deletion before later ones. It all depends on the > > keys > > > written in the transaction. I don't see an obvious way to solve this > > > problem without some record-level bookkeeping, but I might be missing > > > something. > > > > > > Thanks, > > > Jason > > > > > > On Mon, Sep 9, 2019 at 7:21 PM Richard Yu > > > wrote: > > > > > > > Hi Jun, > > > > > > > > Thanks for chipping in. :) > > > > > > > > The description you provided is pretty apt in describing the > motivation > > > of > > > > the KIP, so I will add it. I've made some changes to the KIP and > > outlined
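To make the retention logic in Jason's message above concrete, here is a minimal sketch of the per-batch decision, assuming the delete horizon is carried in the batch's base timestamp as he describes. All names (CleanedBatch, shouldRetain, and so on) are illustrative stand-ins, not actual Kafka classes or APIs.

// Illustrative sketch only; CleanedBatch is a hypothetical stand-in for a
// v2 record batch header, not a real Kafka class.
final class CleanedBatch {
    final long baseTimestampMs;      // holds the delete horizon once the flag is set
    final boolean deleteHorizonFlag; // one bit borrowed from the batch attributes

    CleanedBatch(long baseTimestampMs, boolean deleteHorizonFlag) {
        this.baseTimestampMs = baseTimestampMs;
        this.deleteHorizonFlag = deleteHorizonFlag;
    }
}

final class TombstoneRetentionSketch {
    // Cases 1a/2a: horizon already stamped -> retain only while now < horizon.
    // Cases 1b/2b: no horizon yet -> retain this round; the cleaner would rewrite
    // the batch with horizon = now + log.cleaner.delete.retention.ms and set the flag.
    // (Case 2b additionally requires that no records remain from the transaction,
    // which is omitted here for brevity.)
    static boolean shouldRetain(CleanedBatch batch, long nowMs) {
        if (batch.deleteHorizonFlag)
            return nowMs < batch.baseTimestampMs;
        return true;
    }

    // The horizon to stamp into the cleaned batch when it has not been set yet.
    static long newDeleteHorizon(long nowMs, long deleteRetentionMs) {
        return nowMs + deleteRetentionMs;
    }
}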
Re: [DISCUSS] KIP-515: Reorganize checkpoint system in log cleaner to be per partition
Hi Jun, Jason, I've updated the KIP accordingly. Sorry for taking a while to get back to you guys. The general approach should be fairly well ironed out now. So if there aren't any other comments, I will get the vote started then. :) Cheers, Richard On Thu, Oct 10, 2019 at 3:41 PM Jun Rao wrote: > Hi, Jason, > > I agree that your approach is better since it's more general, more accurate > and simpler. The only thing is that it may not work for the old message > format. I am not sure how important it is since most users are probably > already on the new message format. Perhaps we can just document this > limitation. > > Hi, Richard, > > Could you update the KIP with Jason's approach? Also, it seems that KIP-515 > is already taken by another KIP. Could you use a new KIP number for this? > > Thanks, > > Jun > > On Fri, Sep 27, 2019 at 3:59 PM Richard Yu > wrote: > > > Hi Jason, > > > > That actually sounds like a pretty good idea to me. No doubt if we use > this > > approach, then some comments need to be added that indicate this. > > But all things considered, I think it's not bad at all. > > > > I definitely agree with you that it's a little hacky, but it works. > > > > Cheers, > > Richard > > > > On Tue, Sep 24, 2019 at 10:44 AM Jason Gustafson > > wrote: > > > > > Hi Richard, > > > > > > It would be unsatisfying to make a big change to the checkpointing > logic > > in > > > order to handle only one case of this problem, right? > > > > > > I did have one idea about how to do this. It's a bit of a hack, but > keep > > an > > > open mind ;). The basic problem is having somewhere to embed the delete > > > horizon for each batch. In the v2 format, each batch header contains > two > > > timestamps: the base timestamp and the max timestamp. Each record in > the > > > batch contains a timestamp delta which is relative to the base > timestamp. > > > In other words, to get the record timestamp, you add the record delta > to > > > the base timestamp. > > > > > > Typically there is no reason for the base timestamp to be different > from > > > the timestamp of the first message, but this is not a strict > requirement. > > > As long as you can get to the record timestamp by adding the base > > timestamp > > > and delta, then we are good. So the idea is to set the base timestamp > to > > > the delete horizon and adjust the deltas accordingly. We could then use > > one > > > bit from the batch attributes to indicate when the base timestamp had > > been > > > set to the delete horizon. There would be no change to the batch max > > > timestamp, so indexing would not be affected by this change. > > > > > > So the logic would look something like this when cleaning the log. > > > > > > Case 1: Normal batch > > > > > > a. If delete horizon flag is set, then retain tombstones as long as the > > > current time is before the horizon. > > > b. If no delete horizon is set, then retain tombstones and set the > delete > > > horizon in the cleaned batch to current time + > > > log.cleaner.delete.retention.ms. > > > > > > Case 2: Control batch > > > > > > a. If delete horizon flag is set, then retain the batch and the marker > > > as long as the current time is before the horizon. > > > b. If no delete horizon is set and there are no records remaining from > > the > > > transaction, then retain the marker and set the delete horizon in the > > > cleaned batch to current time + log.cleaner.delete.retention.ms. > > > > > > What do you think? 
> > > > > > -Jason > > > > > > > > > > > > On Thu, Sep 19, 2019 at 3:21 PM Richard Yu > > > > wrote: > > > > > > > Hi Jason, > > > > > > > > That hadn't occurred to me. > > > > > > > > I think I missed your comment in the discussion, so I created this KIP > > > only > > > > with resolving the problem regarding tombstones. > > > > What are your thoughts? If the problem regarding transaction markers is > a > > > > little too complex, then can we just leave it out of the KIP and > fix > > > the > > > > tombstones issue. > > > > > > > > Cheers, > > > > Richard > > > > > > > > On Thu, Sep 19, 2019 at 8:47 AM Jason
[VOTE] KIP-534: Retain tombstones for approximately delete.retention.ms milliseconds
Hi all, The discussion for KIP-534 seems to have concluded. So I wish to vote this in so that we can get it done. It's a small bug fix. :) Below is the KIP link: https://cwiki.apache.org/confluence/display/KAFKA/KIP-534%3A+Retain+tombstones+for+approximately+delete.retention.ms+milliseconds Cheers, Richard
Re: [VOTE] KIP-534: Retain tombstones for approximately delete.retention.ms milliseconds
Hi Jun, I've updated the link accordingly. :) Here is the updated KIP link: https://cwiki.apache.org/confluence/display/KAFKA/KIP-534%3A+Retain+tombstones+and+transaction+markers+for+approximately+delete.retention.ms+milliseconds Cheers, Richard On Mon, Oct 14, 2019 at 5:12 PM Jun Rao wrote: > Hi, Richard, > > Thanks for the KIP. Looks good to me overall. A few minor comments below. > > 1. Could you change the title from "Retain tombstones" to "Retain > tombstones and transaction markers" to make it more general? > > 2. Could you document which bit in the batch attribute will be used for the > new flag? The current format of the batch attribute is the following. > > * > - > * | Unused (6-15) | Control (5) | Transactional (4) | Timestamp Type > (3) | Compression Type (0-2) | > > * > - > > 3. Could you provide the reasons for the rejected proposals? For > proposal 1, one reason is that it doesn't cover the transaction > markers. For proposal 2, one reason is that the internal record header > could be exposed to the clients. > > > Jun > > > On Mon, Oct 14, 2019 at 4:42 PM Richard Yu > wrote: > > > Hi all, > > > > The discussion for KIP-534 seems to have concluded. > > So I wish to vote this in so that we can get it done. It's a small bug > fix. > > :) > > > > Below is the KIP link: > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-534%3A+Retain+tombstones+for+approximately+delete.retention.ms+milliseconds > > > > Cheers, > > Richard > > >
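For illustration, the attribute layout Jun quotes above could accommodate the new flag roughly as sketched below. The choice of bit 6 (the lowest currently unused bit) is only an assumption for this sketch, and the class is not the actual Kafka record-format code.

// Hypothetical sketch of the v2 batch attribute bits with a delete-horizon flag.
// Bit 6 is assumed here simply because it is the lowest unused bit.
final class BatchAttributesSketch {
    static final short COMPRESSION_CODEC_MASK = 0x07; // bits 0-2
    static final short TIMESTAMP_TYPE_MASK    = 0x08; // bit 3
    static final short TRANSACTIONAL_MASK     = 0x10; // bit 4
    static final short CONTROL_MASK           = 0x20; // bit 5
    static final short DELETE_HORIZON_MASK    = 0x40; // bit 6 (assumed placement)

    static boolean hasDeleteHorizon(short attributes) {
        return (attributes & DELETE_HORIZON_MASK) != 0;
    }

    static short withDeleteHorizon(short attributes) {
        return (short) (attributes | DELETE_HORIZON_MASK);
    }
}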
Re: [VOTE] KIP-534: Retain tombstones for approximately delete.retention.ms milliseconds
Hi all, Want to try to get this KIP wrapped up. So it would be great if we could get some votes. Cheers, Richard On Tue, Oct 15, 2019 at 12:58 PM Jun Rao wrote: > Hi, Richard, > > Thanks for the updated KIP. +1 from me. > > Jun > > On Tue, Oct 15, 2019 at 12:46 PM Richard Yu > wrote: > > > Hi Jun, > > > > I've updated the link accordingly. :) > > Here is the updated KIP link: > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-534%3A+Retain+tombstones+and+transaction+markers+for+approximately+delete.retention.ms+milliseconds > > > > Cheers, > > Richard > > > > On Mon, Oct 14, 2019 at 5:12 PM Jun Rao wrote: > > > > > Hi, Richard, > > > > > > Thanks for the KIP. Looks good to me overall. A few minor comments > below. > > > > > > 1. Could you change the title from "Retain tombstones" to "Retain > > > tombstones and transaction markers" to make it more general? > > > > > > 2. Could you document which bit in the batch attribute will be used for > > the > > > new flag? The current format of the batch attribute is the following. > > > > > > * > > > > > > - > > > * | Unused (6-15) | Control (5) | Transactional (4) | Timestamp Type > > > (3) | Compression Type (0-2) | > > > > > > * > > > > > > - > > > > > > 3. Could you provide the reasons for the rejected proposals? For > > > proposal 1, one reason is that it doesn't cover the transaction > > > markers. For proposal 2, one reason is that the internal record header > > > could be exposed to the clients. > > > > > > > > > Jun > > > > > > > > > On Mon, Oct 14, 2019 at 4:42 PM Richard Yu > > > > wrote: > > > > > > > Hi all, > > > > > > > > The discussion for KIP-534 seems to have concluded. > > > > So I wish to vote this in so that we can get it done. It's a small bug > > > fix. > > > > :) > > > > > > > > Below is the KIP link: > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-534%3A+Retain+tombstones+for+approximately+delete.retention.ms+milliseconds > > > > > > > > Cheers, > > > > Richard > > > > > > > > > >
Re: [VOTE] KIP-534: Retain tombstones for approximately delete.retention.ms milliseconds
Hi Guozhang, Your understanding is basically on point. I haven't looked into the details for what happens if we change the base timestamp and how it's calculated, so I'm not clear on how small (or big) the delta is. To be fair, would the delta size pose a big problem if it takes up more bytes to encode? Cheers, Richard On Wed, Oct 16, 2019 at 7:36 PM Guozhang Wang wrote: > Hello Richard, > > Thanks for the KIP, I just have one clarification regarding "So the idea is > to set the base timestamp to the delete horizon and adjust the deltas > accordingly." My understanding is that during compaction, for each > compacted new segment, we would set the base timestamp of each batch to the > delete horizon, which is the "current system time that cleaner has seen so > far", and adjust the delta timestamps of each of the inner records of the > batch (and practically the deltas will be all negative)? > > If that's the case, could we do some back of the envelope calculation on what's > the possible smallest case of deltas? Note that since we use varInt for > delta values for each record, the more negative the delta, the more bytes it would > take to encode. > > Guozhang > > On Wed, Oct 16, 2019 at 6:48 PM Jason Gustafson > wrote: > > > +1. Thanks Richard. > > > > On Wed, Oct 16, 2019 at 10:04 AM Richard Yu > > wrote: > > > > > Hi all, > > > > > > Want to try to get this KIP wrapped up. So it would be great if we can > > get > > > some votes. > > > > > > Cheers, > > > Richard > > > > > > On Tue, Oct 15, 2019 at 12:58 PM Jun Rao wrote: > > > > > > > Hi, Richard, > > > > > > > > Thanks for the updated KIP. +1 from me. > > > > > > > > Jun > > > > > > > > On Tue, Oct 15, 2019 at 12:46 PM Richard Yu < > > yohan.richard...@gmail.com> > > > > wrote: > > > > > > > > > Hi Jun, > > > > > > > > > > I've updated the link accordingly. :) > > > > > Here is the updated KIP link: > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-534%3A+Retain+tombstones+and+transaction+markers+for+approximately+delete.retention.ms+milliseconds > > > > > > > > > > Cheers, > > > > > Richard > > > > > > > > > > On Mon, Oct 14, 2019 at 5:12 PM Jun Rao wrote: > > > > > > > > > > > Hi, Richard, > > > > > > > > > > > > Thanks for the KIP. Looks good to me overall. A few minor > comments > > > > below. > > > > > > > > > > > > 1. Could you change the title from "Retain tombstones" to "Retain > > > > > > tombstones and transaction markers" to make it more general? > > > > > > > > > > > > 2. Could you document which bit in the batch attribute will be > used > > > for > > > > > the > > > > > > new flag? The current format of the batch attribute is the > > following. > > > > > > > > > > > > * > > > > > > > > > > > > > > > > > > > > > - > > > > > > * | Unused (6-15) | Control (5) | Transactional (4) | Timestamp > > Type > > > > > > (3) | Compression Type (0-2) | > > > > > > > > > > > > * > > > > > > > > > > > > > > > > > > > > > - > > > > > > > > > > > > 3. Could you provide the reasons for the rejected proposals? For > > > > > > proposal 1, one reason is that it doesn't cover the transaction > > > > > > markers. For proposal 2, one reason is that the internal record > > > header > > > > > > could be exposed to the clients. 
> > > > > > > > > > > > > > > > > > Jun > > > > > > > > > > > > > > > > > > On Mon, Oct 14, 2019 at 4:42 PM Richard Yu < > > > yohan.richard...@gmail.com > > > > > > > > > > > wrote: > > > > > > > > > > > > > Hi all, > > > > > > > > > > > > > > The discussion for KIP-534 seems to have concluded. > > > > > > > So I wish to vote this in so that we can get it done. Its a > small > > > bug > > > > > > fix. > > > > > > > :) > > > > > > > > > > > > > > Below is the KIP link: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-534%3A+Retain+tombstones+for+approximately+delete.retention.ms+milliseconds > > > > > > > > > > > > > > Cheers, > > > > > > > Richard > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > -- Guozhang >
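Spelling out the adjustment Guozhang is asking about: a record's absolute timestamp is the base timestamp plus its delta, so if the cleaner rewrites the base timestamp to the delete horizon, every delta has to be shifted by (old base - new base) to keep the absolute timestamps intact; since the horizon lies in the future, the shifted deltas come out negative. A rough sketch, with all names illustrative rather than actual Kafka code:

// Sketch of rewriting a batch's base timestamp to the delete horizon while
// preserving each record's absolute timestamp (the batch max timestamp is untouched).
final class DeltaShiftSketch {
    static long[] shiftDeltas(long oldBaseTimestampMs, long[] deltasMs, long deleteHorizonMs) {
        // Invariant per record: deleteHorizonMs + newDelta == oldBaseTimestampMs + oldDelta.
        long shift = oldBaseTimestampMs - deleteHorizonMs; // negative when the horizon is in the future
        long[] shifted = new long[deltasMs.length];
        for (int i = 0; i < deltasMs.length; i++)
            shifted[i] = deltasMs[i] + shift;
        return shifted;
    }
}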
Re: [VOTE] KIP-534: Retain tombstones for approximately delete.retention.ms milliseconds
Hi Guozhang, Jason, I've updated the KIP to include this warning as well. If there is anything else that we need for it, let me know. :) Otherwise, we should vote this KIP in. Cheers, Richard On Thu, Oct 17, 2019 at 10:41 AM Jason Gustafson wrote: > Hi Guozhang, > > It's a fair point. For control records, I think it's a non-issue since they > are tiny and not batched. So the only case where this might matter is large > batch deletions. I think the size difference is not a major issue itself, > but I think it's worth mentioning in the KIP the risk of exceeding the max > message size. I think the code should probably make this more of a soft > limit when cleaning. We have run into scenarios in the past as well where > recompression has actually increased message size. We may also want to be > able to upconvert messages to the new format in the future in the cleaner. > > -Jason > > > > On Thu, Oct 17, 2019 at 9:08 AM Guozhang Wang wrote: > > > Here's my understanding: when log compaction kicks in, the system time at > > the moment would be larger than the message timestamp to be compacted, so > > the modification on the batch timestamp would practically be increasing > its > > value, and hence the deltas for each inner message would be negative to > > maintain their actual timestamp. Depending on the time diff between the > > actual timestamp of the message and the time when log compaction happens, > > this negative delta can be large or small since it depends not only on > the > > cleaner thread wakeup frequency but also on the dirty ratio etc. > > > > With varInt encoding, the number of bytes needed to encode an int varies from > 1 > > to 5 bytes; before compaction, the deltas should be relatively small > > positive values compared with the base timestamp, and hence most likely 1 > > or 2 bytes are needed to encode; after compaction, the deltas could be > > relatively large negative values that may take more bytes to encode. > With a > > record batch of 512 records in practice, and suppose after compaction each record > > would take 2 more bytes for encoding deltas, that would be 1K more per > > batch. Usually it would not be too big of an issue with reasonably sized > > messages, but I just wanted to point out this as a potential regression. > > > > > > Guozhang > > > > On Wed, Oct 16, 2019 at 9:36 PM Richard Yu > > wrote: > > > Hi Guozhang, > > > > > > Your understanding is basically on point. > > > > > > I haven't looked into the details for what happens if we change the > base > > > timestamp and how it's calculated, so I'm not clear on how small (or big) the > delta > > > is. > > > To be fair, would the delta size pose a big problem if it takes up > > more > > > bytes to encode? > > > > > > Cheers, > > > Richard > > > > > > On Wed, Oct 16, 2019 at 7:36 PM Guozhang Wang > > wrote: > > > > > > > Hello Richard, > > > > > > > > Thanks for the KIP, I just have one clarification regarding "So the > > idea > > > is > > > > to set the base timestamp to the delete horizon and adjust the deltas > > > > accordingly." My understanding is that during compaction, for each > > > > compacted new segment, we would set the base timestamp of each batch to > > the > > > > delete horizon, which is the "current system time that cleaner has > seen > > > so > > > > far", and adjust the delta timestamps of each of the inner records of > > the > > > > batch (and practically the deltas will be all negative)? 
> > > > > > > > If that's the case, could we do some back of the envelope calculation on > > > what's > > > > the possible smallest case of deltas? Note that since we use varInt > for > > > > delta values for each record, the more negative the delta, the more bytes it > > would > > > > take to encode. > > > > > > > > Guozhang > > > > > > > > On Wed, Oct 16, 2019 at 6:48 PM Jason Gustafson > > > > wrote: > > > > > > > > > +1. Thanks Richard. > > > > > > > > > > On Wed, Oct 16, 2019 at 10:04 AM Richard Yu < > > > yohan.richard...@gmail.com> > > > > > wrote: > > > > > > > > > > > Hi all, > > > > > > > > > > > > Want to try to get this KIP wrapped up.
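Guozhang's back-of-the-envelope estimate above can be checked with a small size calculation. Timestamp deltas in the v2 record format are zigzag/varint encoded, so the byte cost grows with the magnitude of the delta regardless of sign. The helper below is a generic sketch of that size computation, not Kafka's actual ByteUtils code.

// Generic sketch of the encoded size of a zigzag varlong, as used for record
// timestamp deltas; not the actual Kafka implementation.
final class VarintSizeSketch {
    static int sizeOfZigZagVarlong(long value) {
        long zigZag = (value << 1) ^ (value >> 63); // small magnitudes of either sign stay small
        int bytes = 1;
        while ((zigZag & ~0x7FL) != 0) {
            bytes++;
            zigZag >>>= 7;
        }
        return bytes;
    }

    public static void main(String[] args) {
        // A delta of 50 ms (typical within a batch) fits in 1 byte, while a delta of
        // -86,400,000 ms (one day, the default delete.retention.ms) needs 4 bytes,
        // i.e. roughly 1.5 KB extra for a 512-record batch, in line with the estimate above.
        System.out.println(sizeOfZigZagVarlong(50L));          // 1
        System.out.println(sizeOfZigZagVarlong(-86_400_000L)); // 4
    }
}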