YW. :)

On 1/1/18 4:49 AM, Javier Holguera wrote:
> Hi Matthias,
> 
> Thanks for clarifying the last point. I think I got a full picture of the 
> situation now.
> 
> I agree that the most likely scenario is one where the third party fails to 
> serve all requests, so other instances of the app would be equally affected. 
> The scenario that I have in mind is mostly one where one instance is slowed 
> down while the others aren't. I imagine that it could happen if whatever 
> partition strategy we have chosen for the topic ends up triggering a slow 
> path on the third party. That instance would be repeatedly slow while the 
> others would hit the happy path.
> 
> I agree that calling a third party from a Streams app is not ideal. However, 
> in this case we can't pre-load the information unfortunately. It's more like 
> a lookup that we have to do against the third party and we don't know in 
> advance the query criteria.
> 
> Thanks for taking the time explaining the whole flow.
> 
> -----Original Message-----
> From: Matthias J. Sax [mailto:matth...@confluent.io] 
> Sent: Thursday, December 28, 2017 6:58 PM
> To: users@kafka.apache.org
> Subject: Re: Kafka Streams - max.poll.interval.ms defaults to 
> Integer.MAX_VALUE
> 
>> I imagine that it means other consumers would not consume either until the 
>> rebalance is completed successfully. Is that correct?
> 
> Yes.
> 
>>> If it is, then we will assign explicit, narrower values to 
>>> max.poll.timeout.ms to avoid halting the stream in that scenario.
> 
> You can do that. After the 500 seconds passed and the straggling instance 
> calls `poll()` again, it will trigger a new rebalance an rejoin the group.
> 
> However, I am wondering about the overall scenario: if the external third 
> party service slows down or is offline, would this not affect all your 
> instances? So what do you gain?
> 
> Also note, if you "unblock" the rebalance, the records that are processed by 
> the straggler, will be reassigned to the remaining instances, and thus, I 
> would assume that those get "stuck" too as they need to call the same 
> external service to process the data?
> 
> In general, it is not recommended to call an external service within Kafka 
> Streams if possible. It would be better, to load the corresponding data into 
> a topic and read as a KTable to do a stream-table join. Not sure if this is 
> feasible for your use-case though.
> 
> 
> -Matthias
> 
> 
> 
> On 12/28/17 7:16 AM, Javier Holguera wrote:
>> Hi Matthias,
>>
>>
>> Thanks for your detailed answer.
>>
>>
>> I have one more question (sorry to be so annoying!). If we had a topology 
>> making calls to a third party takes, let's say, 5 seconds to process 
>> requests and we pull a batch of 20 records, the app would be unresponsive in 
>> the event of a rebalance for up to 100 seconds (in the worst possible 
>> scenario).
>>
>>
>> If the third party slows down or even goes down and we timeout after 25 
>> seconds per call, the unresponsive period could grow to 500 seconds. I 
>> imagine that it means other consumers would not consume either until the 
>> rebalance is completed successfully. Is that correct?
>>
>>
>> If it is, then we will assign explicit, narrower values to 
>> max.poll.timeout.ms to avoid halting the stream in that scenario.
>>
>>
>> Thanks.
>>
>>
>> ________________________________
>> From: Matthias J. Sax <matth...@confluent.io>
>> Sent: 27 December 2017 19:54:38
>> To: users@kafka.apache.org
>> Subject: Re: Kafka Streams - max.poll.interval.ms defaults to 
>> Integer.MAX_VALUE
>>
>> It would -- however, this is not an issue for KafkaStreams as we make 
>> sure the thread is either still alive or properly shut down. Thus, if 
>> an error happens and the thread really dies, KafkaStreams ensures that 
>> the heartbeat thread is stopped and thus, a rebalance would not block 
>> forever as it drops out of the group via session-timeout.
>>
>> And as long as a KafkaStreams instance does restore, the rebalance 
>> should block by design.
>>
>> Note, that all this does not hold for newer versions of KafkaStreams 
>> anymore.
>>
>>
>> -Matthias
>>
>>
>>
>> On 12/27/17 6:55 AM, Javier Holguera wrote:
>>> Hi Matthias,
>>>
>>> Thanks for your answer. It makes a lot of sense.
>>>
>>> Just a follow-up question. KIP-62 says: "we give the client as much as 
>>> max.poll.interval.ms to handle a batch of records, this is also the maximum 
>>> time before a consumer can be expected to rejoin the group in the worst 
>>> case". Does it mean that a broker would wait Integer.MAX_VALUE for a client 
>>> to report in the event of a rebalance? That sounds improbable, so I must be 
>>> missing something.
>>>
>>> Thanks.
>>>
>>>
>>> -----Original Message-----
>>> From: Matthias J. Sax [mailto:matth...@confluent.io]
>>> Sent: Friday, December 22, 2017 9:13 PM
>>> To: users@kafka.apache.org
>>> Subject: Re: Kafka Streams - max.poll.interval.ms defaults to 
>>> Integer.MAX_VALUE
>>>
>>> The value was change to make Streams application robust against large state 
>>> restore phases during rebalance.
>>>
>>> Ie, it is targeted to exactly "fix" 2. If an application needs to restore 
>>> state, this state restore might take longer than the max.poll.interval.ms 
>>> parameter and thus, even if the application is in a good state it drops out 
>>> of the group. This results in rebalance "storms". The consumer default of 
>>> 30 seconds is too small for most applications and thus we set it to 
>>> MAX_VALUE -- if you have a good estimate on the max expected state restore 
>>> time, you can safely set the timeout to an appropriate value.
>>>
>>> Note, in Kafka 0.11 and 1.0 Kafka Streams state restore was largely 
>>> improved and it should not be an issue there to reduce the timeout 
>>> accordingly.
>>>
>>>
>>> -Matthias
>>>
>>> On 12/20/17 7:14 AM, Javier Holguera wrote:
>>>> Hi,
>>>>
>>>> According to the documentation, "max.poll.interval.ms" defaults to 
>>>> Integer.MAX_VALUE for Kafka Streams since 0.10.2.1.
>>>>
>>>> Considering that the "max.poll.interval.ms" is:
>>>>
>>>>   1.  A "processing timeout" to control an upper limit for processing a 
>>>> batch of records AND
>>>>   2.  The rebalance timeout that the client will communicate to the 
>>>> broker, according to KIP-62
>>>>
>>>> How do Kafka Streams application detect slow consumers that are taking too 
>>>> long to process a batch of messages? What replaces the existing mechanism 
>>>> with a smaller "max.poll.interval.ms" where the application will willingly 
>>>> abandon the consumer group when the timeout expires?
>>>>
>>>> From the broker perspective, what does it mean that the application 
>>>> communicates a "rebalance timeout" of Integer.MAX_VALUE? I can imagine it 
>>>> will not wait for that long in a rebalance. What happens then?
>>>>
>>>> Thanks.
>>>>
>>>
>>
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to