YW. :)

On 1/1/18 4:49 AM, Javier Holguera wrote:
> Hi Matthias,
>
> Thanks for clarifying the last point. I think I have the full picture of the
> situation now.
>
> I agree that the most likely scenario is one where the third party fails to
> serve all requests, so other instances of the app would be equally affected.
> The scenario that I have in mind is mostly one where one instance is slowed
> down while the others aren't. I imagine that it could happen if whatever
> partition strategy we have chosen for the topic ends up triggering a slow
> path on the third party. That instance would be repeatedly slow while the
> others would hit the happy path.
>
> I agree that calling a third party from a Streams app is not ideal. However,
> in this case we unfortunately can't pre-load the information. It's more like
> a lookup that we have to do against the third party, and we don't know the
> query criteria in advance.
>
> Thanks for taking the time to explain the whole flow.
>
> -----Original Message-----
> From: Matthias J. Sax [mailto:matth...@confluent.io]
> Sent: Thursday, December 28, 2017 6:58 PM
> To: users@kafka.apache.org
> Subject: Re: Kafka Streams - max.poll.interval.ms defaults to
> Integer.MAX_VALUE
>
>> I imagine that it means other consumers would not consume either until the
>> rebalance is completed successfully. Is that correct?
>
> Yes.
>
>>> If it is, then we will assign explicit, narrower values to
>>> max.poll.interval.ms to avoid halting the stream in that scenario.
>
> You can do that. After the 500 seconds have passed and the straggling instance
> calls `poll()` again, it will trigger a new rebalance and rejoin the group.
>
> However, I am wondering about the overall scenario: if the external third-party
> service slows down or is offline, would this not affect all your
> instances? So what do you gain?
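A simplified model of the rule Matthias describes may help: a member that has not called `poll()` within max.poll.interval.ms is treated as failed and leaves the group, and its next `poll()` rejoins and triggers another rebalance. The class PollIntervalModel and its methods are hypothetical; they only mirror the KIP-62 behavior as described in this thread and are not the KafkaConsumer implementation. With the Streams default of Integer.MAX_VALUE (about 24.8 days), the check effectively never fires.

```java
// Hypothetical, simplified model of the max.poll.interval.ms liveness rule
// (KIP-62). NOT the KafkaConsumer implementation; it only mirrors the
// behavior described in this thread.
public class PollIntervalModel {
    private final long maxPollIntervalMs;
    private long lastPollMs;
    private boolean inGroup = true;

    public PollIntervalModel(long maxPollIntervalMs, long nowMs) {
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.lastPollMs = nowMs;
    }

    // Stand-in for the background heartbeat thread's check: if the
    // application has not called poll() within maxPollIntervalMs, the
    // member leaves the group so the others can rebalance without it.
    public void heartbeatCheck(long nowMs) {
        if (inGroup && nowMs - lastPollMs > maxPollIntervalMs) {
            inGroup = false;
        }
    }

    // Stand-in for the application calling poll(). Returns true if this
    // call had to rejoin the group, i.e. it triggers another rebalance.
    public boolean poll(long nowMs) {
        heartbeatCheck(nowMs);
        boolean rejoined = !inGroup;
        inGroup = true;
        lastPollMs = nowMs;
        return rejoined;
    }

    public boolean isInGroup() {
        return inGroup;
    }

    public static void main(String[] args) {
        // Hypothetical 100 s limit; the straggler is blocked for 500 s.
        PollIntervalModel m = new PollIntervalModel(100_000L, 0L);
        m.heartbeatCheck(500_000L);
        System.out.println(m.isInGroup()); // false: dropped out of the group
        System.out.println(m.poll(500_000L)); // true: rejoins, new rebalance
    }
}
```

The model deliberately ignores session.timeout.ms: in this sketch the heartbeat thread is assumed alive, so only the poll interval decides membership.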
>
> Also note that if you "unblock" the rebalance, the records that are processed by
> the straggler will be reassigned to the remaining instances, and thus, I
> would assume that those get "stuck" too, as they need to call the same
> external service to process the data?
>
> In general, it is not recommended to call an external service within Kafka
> Streams if possible. It would be better to load the corresponding data into
> a topic and read it as a KTable to do a stream-table join. Not sure if this is
> feasible for your use case, though.
>
> -Matthias
>
> On 12/28/17 7:16 AM, Javier Holguera wrote:
>> Hi Matthias,
>>
>> Thanks for your detailed answer.
>>
>> I have one more question (sorry to be so annoying!). If we had a topology
>> making calls to a third party that takes, let's say, 5 seconds to process
>> a request, and we pull a batch of 20 records, the app would be unresponsive in
>> the event of a rebalance for up to 100 seconds (in the worst possible
>> scenario).
>>
>> If the third party slows down or even goes down and we time out after 25
>> seconds per call, the unresponsive period could grow to 500 seconds. I
>> imagine that it means other consumers would not consume either until the
>> rebalance is completed successfully. Is that correct?
>>
>> If it is, then we will assign explicit, narrower values to
>> max.poll.interval.ms to avoid halting the stream in that scenario.
>>
>> Thanks.
>>
>> ________________________________
>> From: Matthias J. Sax <matth...@confluent.io>
>> Sent: 27 December 2017 19:54:38
>> To: users@kafka.apache.org
>> Subject: Re: Kafka Streams - max.poll.interval.ms defaults to
>> Integer.MAX_VALUE
>>
>> It would -- however, this is not an issue for KafkaStreams as we make
>> sure the thread is either still alive or properly shut down.
>> Thus, if an error happens and the thread really dies, KafkaStreams ensures
>> that the heartbeat thread is stopped, and thus a rebalance would not block
>> forever, as the instance drops out of the group via the session timeout.
>>
>> And as long as a KafkaStreams instance is restoring state, the rebalance
>> should block by design.
>>
>> Note that all this no longer holds for newer versions of KafkaStreams.
>>
>> -Matthias
>>
>> On 12/27/17 6:55 AM, Javier Holguera wrote:
>>> Hi Matthias,
>>>
>>> Thanks for your answer. It makes a lot of sense.
>>>
>>> Just a follow-up question. KIP-62 says: "we give the client as much as
>>> max.poll.interval.ms to handle a batch of records, this is also the maximum
>>> time before a consumer can be expected to rejoin the group in the worst
>>> case". Does it mean that a broker would wait Integer.MAX_VALUE for a client
>>> to report in the event of a rebalance? That sounds improbable, so I must be
>>> missing something.
>>>
>>> Thanks.
>>>
>>> -----Original Message-----
>>> From: Matthias J. Sax [mailto:matth...@confluent.io]
>>> Sent: Friday, December 22, 2017 9:13 PM
>>> To: users@kafka.apache.org
>>> Subject: Re: Kafka Streams - max.poll.interval.ms defaults to
>>> Integer.MAX_VALUE
>>>
>>> The value was changed to make Streams applications robust against long state
>>> restore phases during a rebalance.
>>>
>>> I.e., it is targeted exactly at "fixing" point 2. If an application needs to restore
>>> state, this state restore might take longer than the max.poll.interval.ms
>>> parameter, and thus, even if the application is in a good state, it drops out
>>> of the group. This results in rebalance "storms". The consumer default of
>>> 300 seconds (5 minutes) is too small for most applications, and thus we set it to
>>> MAX_VALUE. If you have a good estimate of the maximum expected state restore
>>> time, you can safely set the timeout to an appropriate value.
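Javier's worst-case numbers (20 records per poll at 5 seconds each, or at a 25-second timeout each) and Matthias's advice to size the timeout from an estimate can be turned into a quick calculation. This is a sketch under stated assumptions: PollIntervalBudget, its method names, the expectedRestoreMs input and the percentage margin are hypothetical, and it assumes the records of a batch are processed one after another with no other significant work between two `poll()` calls.

```java
// Hypothetical helper (not a Kafka API): estimates the worst-case time
// between two poll() calls and derives a candidate max.poll.interval.ms.
public final class PollIntervalBudget {
    private PollIntervalBudget() {}

    // Worst case for one poll-loop iteration: every record in the batch
    // waits for the full per-record timeout of the external call.
    public static long worstCaseBatchMs(int recordsPerPoll, long perRecordTimeoutMs) {
        if (recordsPerPoll < 0 || perRecordTimeoutMs < 0) {
            throw new IllegalArgumentException("inputs must be non-negative");
        }
        return Math.multiplyExact((long) recordsPerPoll, perRecordTimeoutMs);
    }

    // True if one iteration is guaranteed to finish before the member is
    // considered failed.
    public static boolean fitsWithin(long worstCaseMs, long maxPollIntervalMs) {
        return worstCaseMs <= maxPollIntervalMs;
    }

    // Candidate timeout: the longest expected blocking phase (batch
    // processing, or state restore as described in the 12/22 message)
    // plus a percentage safety margin chosen by the operator.
    public static long candidateTimeoutMs(long worstCaseBatchMs, long expectedRestoreMs, int marginPercent) {
        long bound = Math.max(worstCaseBatchMs, expectedRestoreMs);
        return Math.addExact(bound, Math.multiplyExact(bound, (long) marginPercent) / 100);
    }

    public static void main(String[] args) {
        long normal = worstCaseBatchMs(20, 5_000L);    // 100000 ms
        long degraded = worstCaseBatchMs(20, 25_000L); // 500000 ms
        System.out.println(normal + " " + degraded);
        // 300000 ms is the plain KafkaConsumer default for max.poll.interval.ms.
        System.out.println(fitsWithin(degraded, 300_000L)); // false
        System.out.println(candidateTimeoutMs(degraded, 0L, 20)); // 600000
    }
}
```

With the thread's numbers, the degraded 500-second case would not fit within the plain consumer default, which is why an explicit value needs to account for the slow path, not just the happy path.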
>>>
>>> Note that in Kafka 0.11 and 1.0, Kafka Streams state restore was greatly
>>> improved, and it should not be an issue there to reduce the timeout
>>> accordingly.
>>>
>>> -Matthias
>>>
>>> On 12/20/17 7:14 AM, Javier Holguera wrote:
>>>> Hi,
>>>>
>>>> According to the documentation, "max.poll.interval.ms" defaults to
>>>> Integer.MAX_VALUE for Kafka Streams since 0.10.2.1.
>>>>
>>>> Considering that "max.poll.interval.ms" is:
>>>>
>>>> 1. A "processing timeout" that sets an upper limit for processing a
>>>> batch of records, AND
>>>> 2. The rebalance timeout that the client communicates to the
>>>> broker, according to KIP-62
>>>>
>>>> How do Kafka Streams applications detect slow consumers that are taking too
>>>> long to process a batch of messages? What replaces the existing mechanism,
>>>> with a smaller "max.poll.interval.ms", where the application willingly
>>>> abandons the consumer group when the timeout expires?
>>>>
>>>> From the broker's perspective, what does it mean that the application
>>>> communicates a "rebalance timeout" of Integer.MAX_VALUE? I imagine it
>>>> will not wait that long in a rebalance. What happens then?
>>>>
>>>> Thanks.
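If you do assign an explicit value instead of relying on the Streams default, one minimal way is to put the consumer setting into the Streams configuration, which Kafka Streams forwards to its internal consumers. The application id, bootstrap servers and the 600000 ms value below are hypothetical placeholders, and the KafkaStreams constructor call is omitted so the sketch compiles without the Kafka dependency.

```java
import java.util.Properties;

// Minimal sketch: a Kafka Streams configuration with an explicit
// max.poll.interval.ms. All concrete values are hypothetical placeholders.
public final class StreamsPollIntervalConfig {
    private StreamsPollIntervalConfig() {}

    public static Properties build(long maxPollIntervalMs) {
        // max.poll.interval.ms is an int-typed consumer config, so reject
        // values that would not fit (or are not positive).
        if (maxPollIntervalMs <= 0 || maxPollIntervalMs > Integer.MAX_VALUE) {
            throw new IllegalArgumentException(
                "max.poll.interval.ms out of range: " + maxPollIntervalMs);
        }
        Properties props = new Properties();
        props.setProperty("application.id", "my-streams-app");    // hypothetical
        props.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical
        // Consumer settings placed in the Streams configuration are passed
        // through to the consumers that Kafka Streams creates internally.
        props.setProperty("max.poll.interval.ms", Long.toString(maxPollIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build(600_000L);
        // In a real application: new KafkaStreams(topology, props).start();
        System.out.println(props.getProperty("max.poll.interval.ms"));
    }
}
```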