Good question about `StreamsRebalancingException` -- when this KIP was
started, KIP-535 was not on the horizon yet.

What I am wondering is whether we should allow people to opt in to
querying during a rebalance, or to be more precise, during a restore (if
a state store is not migrated, it will be up-to-date during a rebalance
and can be queried returning correct, ie, non-stale, data)?

Otherwise, if people want to get only correct results, ie, they never
want to query stale state, they have no way to implement it, because
they are always subject to a race condition.

For this case, we could have a `StateStoreIsRecoveringException` (or
similar) that is only thrown during a restore phase (and people can
opt-in / opt-out if this exception should be thrown or not, ie, if they
want to query stale state during recovery or not).

It's unclear to me though atm, how a user would opt-in/opt-out and what
the default should be (maybe better to throw the exception by default to
have strong consistency guarantees by default?)
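
To make the opt-in idea a bit more concrete, here is a rough sketch in
plain Java. It does not use the real Kafka Streams API: `QueryableStore`,
the `allowStaleReads` flag, and `setRestoring()` are hypothetical names
made up for illustration, and `StateStoreIsRecoveringException` is only
the exception proposed above, not an existing class. The sketch assumes
the strict behavior (throw during a restore) unless the caller opted in.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RecoveringStoreSketch {

    /** Hypothetical exception proposed in this thread; not an existing Kafka Streams class. */
    static class StateStoreIsRecoveringException extends RuntimeException {
        StateStoreIsRecoveringException(final String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for a store handle; not the real IQ API. */
    static class QueryableStore {
        private final Map<String, Long> data = new ConcurrentHashMap<>();
        private final boolean allowStaleReads; // hypothetical opt-in flag
        private volatile boolean restoring = false;

        QueryableStore(final boolean allowStaleReads) {
            this.allowStaleReads = allowStaleReads;
        }

        /** Marks the start or end of a (simulated) restore phase. */
        void setRestoring(final boolean restoring) {
            this.restoring = restoring;
        }

        void put(final String key, final long value) {
            data.put(key, value);
        }

        /** Returns the value or null; throws during a restore unless stale reads were opted into. */
        Long get(final String key) {
            if (restoring && !allowStaleReads) {
                throw new StateStoreIsRecoveringException("store is restoring; data may be stale");
            }
            return data.get(key);
        }
    }

    public static void main(final String[] args) {
        final QueryableStore strict = new QueryableStore(false);
        final QueryableStore lenient = new QueryableStore(true);
        strict.put("k", 1L);
        lenient.put("k", 1L);
        strict.setRestoring(true);
        lenient.setRestoring(true);

        // Opted in: the possibly stale value is returned during the restore.
        System.out.println("lenient read: " + lenient.get("k"));

        // Not opted in: the query is rejected until the restore finishes.
        try {
            strict.get("k");
        } catch (final StateStoreIsRecoveringException e) {
            System.out.println("strict read rejected: " + e.getMessage());
        }
    }
}
```

Whether such a flag would live in a config, on the store handle, or on
each query is exactly the open question; the sketch only shows the two
behaviors side by side.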


-Matthias


On 1/9/20 11:35 AM, Vinoth Chandar wrote:
> +1 on merging `StreamsNotRunningException` and 
> `StateStoreNotAvailableException`; both exceptions are fatal anyway. IMO it's 
> best to have these exceptions be about the state store (and not streams 
> state), for easier understanding. 
> 
> Additionally, KIP-535 allows for querying of state stores in rebalancing 
> state. So do we need the StreamsRebalancingException? 
> 
> 
> On 2020/01/09 03:38:11, "Matthias J. Sax" <matth...@confluent.io> wrote: 
>> Sorry that I dropped the ball on this...
>>
>> Thanks for updating the KIP. Overall LGTM now. Feel free to start a VOTE
>> thread.
>>
>> What is still unclear to me is, what we gain by having both
>> `StreamsNotRunningException` and `StateStoreNotAvailableException`. Both
>> exceptions are thrown when KafkaStreams is in state PENDING_SHUTDOWN /
>> NOT_RUNNING / ERROR. Hence, as a user, what do I gain from knowing whether
>> the state store is closed or not -- I can't query it anyway? Maybe I miss
>> something though?
>>
>>
>> -Matthias
>>
>>
>> On 11/3/19 6:07 PM, Vito Jeng wrote:
>>> Sorry for the late reply, thanks for the review.
>>>
>>>
>>>> About `StateStoreMigratedException`:
>>>>
>>>> Why is it only thrown if the state is REBALANCING? A store might be
>>>> migrated during a rebalance, and Kafka Streams might resume back to
>>>> RUNNING state and afterward somebody tries to use an old store handle.
>>>> Also, if state is REBALANCING, should we throw
>>>> `StreamThreadRebalancingException`? Hence, I think
>>>> `StateStoreMigratedException` does only make sense during `RUNNING` state.
>>>>
>>>
>>> Thank you for pointing this out; already updated.
>>>
>>>
>>>> Why do we need to distinguish between `KafkaStreamsNotRunningException`
>>>> and `StateStoreNotAvailableException`?
>>>>
>>>
>>> `KafkaStreamsNotRunningException` may have various causes. I think it
>>> would be helpful if the user could distinguish whether it was caused by
>>> the state store being closed.
>>> (Maybe I am wrong...)
>>>
>>>
>>>> Last, why do we distinguish between `KafkaStreams` instance and
>>>> `StreamsThread`? To me, it seems we should always refer to the instance,
>>>> because that is the level of granularity in which we enable/disable IQ atm.
>>>>
>>>
>>> Totally agree. Do you mean the naming of state store exceptions?
>>> I don't have a special reason to distinguish these two.
>>> Your suggestion looks more reasonable for the exception naming.
>>>
>>>
>>>> Last, for `StateStoreMigratedException`, I would add that a user needs to
>>>> rediscover the store and cannot blindly retry as the store handle is
>>>> invalid and a new store handle must be retrieved. That is a difference
>>>> to `StreamThreadRebalancingException` that allows for "blind" retries
>>>> that either resolve (if the store is still on the same instance after
>>>> rebalancing finishes, or changes to `StateStoreMigratedException` if the
>>>> store was migrated away during rebalancing).
>>>>
>>>
>>> Nice, it's great! Thank you.
>>>
>>>
>>> The KIP is already updated, please take a look. :)
>>>
>>>
>>>
>>> On Wed, Oct 23, 2019 at 1:48 PM Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>
>>>> Any update on this KIP?
>>>>
>>>> On 10/7/19 3:35 PM, Matthias J. Sax wrote:
>>>>> Sorry for the late reply. The 2.4 deadline kept us quite busy.
>>>>>
>>>>> About `StateStoreMigratedException`:
>>>>>
>>>>> Why is it only thrown if the state is REBALANCING? A store might be
>>>>> migrated during a rebalance, and Kafka Streams might resume back to
>>>>> RUNNING state and afterward somebody tries to use an old store handle.
>>>>> Also, if state is REBALANCING, should we throw
>>>>> `StreamThreadRebalancingException`? Hence, I think
>>>>> `StateStoreMigratedException` does only make sense during `RUNNING` state.
>>>>>
>>>>>
>>>>> Why do we need to distinguish between `KafkaStreamsNotRunningException`
>>>>> and `StateStoreNotAvailableException`?
>>>>>
>>>>>
>>>>> Last, why do we distinguish between `KafkaStreams` instance and
>>>>> `StreamsThread`? To me, it seems we should always refer to the instance,
>>>>> because that is the level of granularity in which we enable/disable IQ atm.
>>>>>
>>>>>
>>>>> Last, for `StateStoreMigratedException`, I would add that a user needs to
>>>>> rediscover the store and cannot blindly retry as the store handle is
>>>>> invalid and a new store handle must be retrieved. That is a difference
>>>>> to `StreamThreadRebalancingException` that allows for "blind" retries
>>>>> that either resolve (if the store is still on the same instance after
>>>>> rebalancing finishes, or changes to `StateStoreMigratedException` if the
>>>>> store was migrated away during rebalancing).
>>>>>
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 8/9/19 10:20 AM, Vito Jeng wrote:
>>>>>> My bad. The short link `https://shorturl.at/CDNT9` seems incorrect.
>>>>>>
>>>>>> Please use the following instead: https://shorturl.at/bkKQU
>>>>>>
>>>>>>
>>>>>> ---
>>>>>> Vito
>>>>>>
>>>>>>
>>>>>> On Fri, Aug 9, 2019 at 10:53 AM Vito Jeng <v...@is-land.com.tw> wrote:
>>>>>>
>>>>>>> Thanks, Matthias!
>>>>>>>
>>>>>>>> About `StreamThreadNotStartedException`:
>>>>>>>
>>>>>>> Thank you for the explanation. I agree with your opinion.
>>>>>>> `CompositeReadOnlyXxxStore#get()` would never throw
>>>>>>> `StreamThreadNotStartedException`.
>>>>>>>
>>>>>>> For the case where the corresponding thread crashes after we handed out
>>>>>>> the store handle, we may throw `KafkaStreamsNotRunningException` or
>>>>>>> `StateStoreMigratedException`.
>>>>>>> In `StreamThreadStateStoreProvider`, we would throw
>>>>>>> `KafkaStreamsNotRunningException` when the stream thread is not running
>>>>>>> (https://shorturl.at/CDNT9) or throw `StateStoreMigratedException` when
>>>>>>> the store is closed (https://shorturl.at/hrvAN). So I think we do not
>>>>>>> need to add a new type for this case. Does that make sense?
>>>>>>>
>>>>>>>
>>>>>>>> About `KafkaStreamsNotRunningException` vs
>>>>>>> `StreamThreadNotRunningException`:
>>>>>>>
>>>>>>> I understand your point. I renamed `StreamThreadNotRunningException` to
>>>>>>> `KafkaStreamsNotRunningException`.
>>>>>>>
>>>>>>>
>>>>>>> About checking unknown state store names:
>>>>>>> Thank you for the hint. I added a new type `UnknownStateStoreException`
>>>>>>> for this case.
>>>>>>>
>>>>>>>
>>>>>>>> Also, we should still have fatal exception
>>>>>>> `StateStoreNotAvailableException`? Not sure why you removed it?
>>>>>>>
>>>>>>> Thank you for pointing this out; I already added it again.
>>>>>>>
>>>>>>> The KIP is already updated, please take a look.
>>>>>>>
>>>>>>> ---
>>>>>>> Vito
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
