Good question about `StreamsRebalancingException` -- when this KIP was started, KIP-535 was not on the horizon yet.
What I am wondering is whether we should allow people to opt in to querying during a rebalance, or to be more precise, during a restore. (If a state store is not migrated, it will be up-to-date during a rebalance and can be queried, returning correct, i.e., non-stale, data.) Otherwise, if people want to get only correct results, i.e., they never want to query stale state, they have no way to implement it, because they are always subject to a race condition.

For this case, we could have a `StateStoreIsRecoveringException` (or similar) that is only thrown during a restore phase (and people can opt in or opt out of whether this exception should be thrown, i.e., whether they want to query stale state during recovery or not).

It's unclear to me at the moment, though, how a user would opt in or opt out and what the default should be (maybe better to throw the exception by default, to have strong consistency guarantees by default?).


-Matthias

On 1/9/20 11:35 AM, Vinoth Chandar wrote:
> +1 on merging `StreamsNotRunningException` and
> `StateStoreNotAvailableException`, both exceptions are fatal anyway. IMO it's
> best to have these exceptions be about the state store (and not streams
> state), for easier understanding.
>
> Additionally, KIP-535 allows for querying of state stores in rebalancing
> state. So do we need the StreamsRebalancingException?
>
>
> On 2020/01/09 03:38:11, "Matthias J. Sax" <matth...@confluent.io> wrote:
>> Sorry that I dropped the ball on this...
>>
>> Thanks for updating the KIP. Overall LGTM now. Feel free to start a VOTE
>> thread.
>>
>> What is still unclear to me is what we gain by having both
>> `StreamsNotRunningException` and `StateStoreNotAvailableException`. Both
>> exceptions are thrown when KafkaStreams is in state PENDING_SHUTDOWN /
>> NOT_RUNNING / ERROR. Hence, as a user, what do I gain by knowing whether the
>> state store is closed or not -- I can't query it anyway? Maybe I am missing
>> something, though?
>>
>>
>> -Matthias
>>
>>
>> On 11/3/19 6:07 PM, Vito Jeng wrote:
>>> Sorry for the late reply, thanks for the review.
>>>
>>>
>>>> About `StateStoreMigratedException`:
>>>>
>>>> Why is it only thrown if the state is REBALANCING? A store might be
>>>> migrated during a rebalance, and Kafka Streams might resume back to
>>>> RUNNING state and afterward somebody tries to use an old store handle.
>>>> Also, if state is REBALANCING, should we throw
>>>> `StreamThreadRebalancingException`? Hence, I think
>>>> `StateStoreMigratedException` does only make sense during `RUNNING` state.
>>>>
>>>
>>> Thank you for pointing this out; it is already updated.
>>>
>>>
>>>> Why do we need to distinguish between `KafkaStreamsNotRunningException`
>>>> and `StateStoreNotAvailableException`?
>>>>
>>>
>>> `KafkaStreamsNotRunningException` may be caused by various reasons. I think
>>> it would be helpful if the user could distinguish whether it is caused by
>>> the state store being closed.
>>> (Maybe I am wrong...)
>>>
>>>
>>>> Last, why do we distinguish between `KafkaStreams` instance and
>>>> `StreamsThread`? To me, it seems we should always refer to the instance,
>>>> because that is the level of granularity in which we enable/disable IQ atm.
>>>>
>>>
>>> Totally agree. Do you mean the naming of the state store exceptions?
>>> I don't have any special reason to distinguish these two.
>>> Your suggestion looks more reasonable for the exception naming.
>>>
>>>
>>>> Last, for `StateStoreMigratedException`, I would add that a user needs to
>>>> rediscover the store and cannot blindly retry as the store handle is
>>>> invalid and a new store handle must be retrieved. That is a difference
>>>> to `StreamThreadRebalancingException` that allows for "blind" retries
>>>> that either resolve (if the store is still on the same instance after
>>>> rebalancing finishes) or change to `StateStoreMigratedException` (if the
>>>> store was migrated away during rebalancing).
>>>>
>>>
>>> Nice, it's great! Thank you.
>>>
>>>
>>> The KIP is already updated, please take a look. :)
>>>
>>>
>>>
>>> On Wed, Oct 23, 2019 at 1:48 PM Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>
>>>> Any update on this KIP?
>>>>
>>>> On 10/7/19 3:35 PM, Matthias J. Sax wrote:
>>>>> Sorry for the late reply. The 2.4 deadline kept us quite busy.
>>>>>
>>>>> About `StateStoreMigratedException`:
>>>>>
>>>>> Why is it only thrown if the state is REBALANCING? A store might be
>>>>> migrated during a rebalance, and Kafka Streams might resume back to
>>>>> RUNNING state and afterward somebody tries to use an old store handle.
>>>>> Also, if state is REBALANCING, should we throw
>>>>> `StreamThreadRebalancingException`? Hence, I think
>>>>> `StateStoreMigratedException` does only make sense during `RUNNING` state.
>>>>>
>>>>>
>>>>> Why do we need to distinguish between `KafkaStreamsNotRunningException`
>>>>> and `StateStoreNotAvailableException`?
>>>>>
>>>>>
>>>>> Last, why do we distinguish between `KafkaStreams` instance and
>>>>> `StreamsThread`? To me, it seems we should always refer to the instance,
>>>>> because that is the level of granularity in which we enable/disable IQ atm.
>>>>>
>>>>>
>>>>> Last, for `StateStoreMigratedException`, I would add that a user needs to
>>>>> rediscover the store and cannot blindly retry as the store handle is
>>>>> invalid and a new store handle must be retrieved. That is a difference
>>>>> to `StreamThreadRebalancingException` that allows for "blind" retries
>>>>> that either resolve (if the store is still on the same instance after
>>>>> rebalancing finishes) or change to `StateStoreMigratedException` (if the
>>>>> store was migrated away during rebalancing).
>>>>>
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 8/9/19 10:20 AM, Vito Jeng wrote:
>>>>>> My bad. The short link `https://shorturl.at/CDNT9` seems incorrect.
>>>>>>
>>>>>> Please use the following instead: https://shorturl.at/bkKQU
>>>>>>
>>>>>>
>>>>>> ---
>>>>>> Vito
>>>>>>
>>>>>>
>>>>>> On Fri, Aug 9, 2019 at 10:53 AM Vito Jeng <v...@is-land.com.tw> wrote:
>>>>>>
>>>>>>> Thanks, Matthias!
>>>>>>>
>>>>>>>> About `StreamThreadNotStartedException`:
>>>>>>>
>>>>>>> Thank you for the explanation. I agree with your opinion.
>>>>>>> `CompositeReadOnlyXxxStore#get()` would never throw
>>>>>>> `StreamThreadNotStartedException`.
>>>>>>>
>>>>>>> For the case where the corresponding thread crashes after we handed out
>>>>>>> the store handle, we may throw `KafkaStreamsNotRunningException` or
>>>>>>> `StateStoreMigratedException`.
>>>>>>> In `StreamThreadStateStoreProvider`, we would throw
>>>>>>> `KafkaStreamsNotRunningException` when the stream thread is not running
>>>>>>> (https://shorturl.at/CDNT9) or throw `StateStoreMigratedException` when
>>>>>>> the store is closed (https://shorturl.at/hrvAN). So I think we do not
>>>>>>> need to add a new type for this case. Does that make sense?
>>>>>>>
>>>>>>>
>>>>>>>> About `KafkaStreamsNotRunningException` vs
>>>>>>>> `StreamThreadNotRunningException`:
>>>>>>>
>>>>>>> I understand your point. I renamed `StreamThreadNotRunningException` to
>>>>>>> `KafkaStreamsNotRunningException`.
>>>>>>>
>>>>>>>
>>>>>>>> About checking unknown state store names:
>>>>>>>
>>>>>>> Thank you for the hint. I added a new type `UnknownStateStoreException`
>>>>>>> for this case.
>>>>>>>
>>>>>>>
>>>>>>>> Also, we should still have fatal exception
>>>>>>>> `StateStoreNotAvailableException`? Not sure why you remove it?
>>>>>>>
>>>>>>> Thank you for pointing this out; I have added it again.
>>>>>>>
>>>>>>> The KIP is already updated, please take a look.
>>>>>>>
>>>>>>> ---
>>>>>>> Vito
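
To make the retry semantics discussed in this thread concrete, here is a minimal sketch of how a caller might tell a "blind" retry apart from a rediscover-then-retry. Everything in it is an assumption for illustration: the exception classes are local stand-ins named after the types proposed in the thread (not the real Kafka Streams classes), and `StoreHandle`, `query`, `discover`, and `demo` are hypothetical names.

```java
import java.util.function.Supplier;

public class IqRetrySketch {

    // Hypothetical stand-ins for the exception types discussed in the thread.
    // They are NOT the real Kafka Streams classes.
    public static class StreamsRebalancingException extends RuntimeException {}
    public static class StateStoreMigratedException extends RuntimeException {}
    public static class StateStoreNotAvailableException extends RuntimeException {}

    /** Hypothetical read-only store handle. */
    public interface StoreHandle {
        String get(String key);
    }

    /**
     * Queries a store, retrying up to maxAttempts times.
     * The discover supplier is an assumed hook that returns a fresh handle.
     */
    public static String query(Supplier<StoreHandle> discover, String key, int maxAttempts) {
        StoreHandle handle = discover.get();
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return handle.get(key);
            } catch (StreamsRebalancingException e) {
                // Blind retry: the same handle may work again once the
                // rebalance finishes (a real caller would back off here).
            } catch (StateStoreMigratedException e) {
                // The old handle is invalid: rediscover before retrying.
                handle = discover.get();
            }
            // StateStoreNotAvailableException is not caught: it is treated
            // as fatal and propagates to the caller.
        }
        throw new IllegalStateException("gave up after " + maxAttempts + " attempts");
    }

    /** Simulates one rebalance, then a migration, then a successful read. */
    public static String demo() {
        int[] discoveries = {0};
        int[] calls = {0};
        Supplier<StoreHandle> discover = () -> {
            int generation = ++discoveries[0];
            return key -> {
                calls[0]++;
                if (generation == 1 && calls[0] == 1) {
                    throw new StreamsRebalancingException();
                }
                if (generation == 1) {
                    throw new StateStoreMigratedException();
                }
                return "value-for-" + key;
            };
        };
        String value = query(discover, "k", 5);
        return value + " discoveries=" + discoveries[0] + " calls=" + calls[0];
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this simulation the first call is retried on the same handle, the second call forces a rediscovery, and the third call succeeds on the new handle. The fatal case is deliberately left uncaught, matching the view in the thread that retrying cannot help there.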