Bill,

I'm wondering why we need `StateRestoreNotification` when we already have `StateRestoreListener`. Couldn't the setup above be achieved with just `StateRestoreListener.onRestoreStart` / `onRestoreEnd`? That is, it seems the latter could subsume any use case intended for the former API.
Guozhang

On Mon, Jun 19, 2017 at 3:23 PM, Bill Bejeck <bbej...@gmail.com> wrote:

> I'm going to update the KIP with a new interface, StateRestoreNotification, containing two methods: startRestore and endRestore.
>
> While the naming is very similar to methods already proposed on the StateRestoreListener, these methods are not intended for notifying users of restore status. Instead, they are for internal use by the state store, to perform any setup and teardown work required by a batch restoration process.
>
> Here's one current use case: when using RocksDB, we should optimize for a bulk load by setting Options.prepareForBulkLoad().
>
> 1. If the database has already been opened, we'll need to close it, set the "prepareForBulkLoad" option, and re-open the database.
> 2. Once the restore is completed, we'll need to close and re-open the database with the "prepareForBulkLoad" option turned off.
>
> Although we mention the RocksDB use case above, this interface is not specific to any particular implementation of a persistent state store.
>
> Additionally, a separate interface is needed so that any user can implement the state restore notification feature regardless of the state restore callback used.
>
> I'll also remove the "getStateRestoreListener" method and stick with the notion of a "global" restore listener for now.
>
> On Mon, Jun 19, 2017 at 1:05 PM, Bill Bejeck <bbej...@gmail.com> wrote:
>
> > Yes it is; more of an oversight on my part. I'll remove it from the KIP.
> >
> > On Mon, Jun 19, 2017 at 12:48 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >
> >> Hi,
> >>
> >> I think for now it's good enough to start with a single global restore listener. We can incrementally improve this later on if required. Of course, if it's easy to do right away, we can also be more fine-grained. But for KTable, we might want to add this after getting rid of all the overloads we have atm.
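To make the proposed split concrete, here is a minimal sketch of a store reacting to `startRestore()` / `endRestore()` notifications by switching into and out of a bulk-load mode, following the two steps in Bill's Jun 19 message. `StateRestoreNotification` only mirrors the shape described there and is not a released API; `SimulatedPersistentStore` and its event log are hypothetical and never touch RocksDB. A real RocksDB-backed store would presumably build fresh `Options` (calling `prepareForBulkLoad()` for the restore phase) before each re-open.

```java
import java.util.ArrayList;
import java.util.List;

class RestoreNotificationDemo {
    public static void main(String[] args) {
        SimulatedPersistentStore store = new SimulatedPersistentStore();
        store.startRestore();                       // batch restore begins
        store.put(new byte[] {1}, new byte[] {2});  // restored records written in bulk-load mode
        store.endRestore();                         // batch restore finished
        System.out.println(store.events());
    }
}

// Hypothetical interface modeled on the StateRestoreNotification proposal in this thread.
interface StateRestoreNotification {
    void startRestore();

    void endRestore();
}

// Hypothetical stand-in for a persistent store: it only records what a real store
// would do (close, change options, re-open) instead of calling RocksDB.
class SimulatedPersistentStore implements StateRestoreNotification {
    private boolean open = true;          // assume the store was opened before restoring
    private boolean bulkLoadMode = false;
    private final List<String> events = new ArrayList<>();

    private void reopen(boolean bulkLoad) {
        if (open) {                       // step 1/2: close the database if it is open
            open = false;
            events.add("close");
        }
        bulkLoadMode = bulkLoad;          // change the option only while closed
        open = true;                      // re-open with the new option
        events.add(bulkLoad ? "open(bulk)" : "open(normal)");
    }

    @Override
    public void startRestore() {
        reopen(true);
    }

    @Override
    public void endRestore() {
        reopen(false);
    }

    void put(byte[] key, byte[] value) {
        if (!open) {
            throw new IllegalStateException("store is closed");
        }
        events.add("put");
    }

    boolean isBulkLoadMode() {
        return bulkLoadMode;
    }

    List<String> events() {
        return events;
    }
}
```

Running `main` prints `[close, open(bulk), put, close, open(normal)]`. The same two hooks could just as well be driven from `StateRestoreListener.onRestoreStart` / `onRestoreEnd`, which is the question at the top of this thread; the sketch only differs in that the store itself owns the callback rather than a user-registered listener.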
> >>
> >> One question: what is the purpose of the parameter "endOffset" in #onRestoreEnd()? Isn't this the same value as provided in #onRestoreStart()?
> >>
> >> -Matthias
> >>
> >> On 6/15/17 6:18 AM, Bill Bejeck wrote:
> >> > Thinking about the custom StateRestoreListener approach, having a get method on the interface will really only work for custom state stores.
> >> >
> >> > So we'll need to provide another way for users to set behavior with the provided state stores. The only option that comes to mind now is also adding a parameter to the StateStoreSupplier.
> >> >
> >> > Bill
> >> >
> >> > On Wed, Jun 14, 2017 at 5:39 PM, Bill Bejeck <bbej...@gmail.com> wrote:
> >> >
> >> >> Guozhang,
> >> >>
> >> >> Thanks for the comments.
> >> >>
> >> >> 1. As for the granularity, I agree that having one global StateRestoreListener could be restrictive. But I think it's important to have a "setStateRestoreListener" on KafkaStreams, as this allows users to define an anonymous instance that has access to local scope for reporting purposes. This is the same pattern we use for KafkaStreams.setStateListener.
> >> >>
> >> >> As an alternative, what if we add a method named "getStateStoreListener" to the BatchingStateRestoreCallback interface? Then, in an abstract adapter class, we return null from getStateStoreListener. But if users want to supply a different StateRestoreListener strategy per callback, they would simply override the method to return an actual instance.
> >> >>
> >> >> WDYT?
> >> >>
> >> >> 2. I'll make the required updates to pass in the ending offset at the start, as well as the actual name of the state store.
> >> >>
> >> >> Bill
> >> >>
> >> >> On Wed, Jun 14, 2017 at 3:53 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >> >>
> >> >>> Thanks Bill for the updated wiki.
> >> >>> I have a couple more comments:
> >> >>>
> >> >>> 1. Setting the StateRestoreListener at the KafkaStreams granularity may not be sufficient, as in the listener callback we do not know which store is being restored right now. If the topic is a changelog topic, we may be able to infer the state store name from the `TopicPartition`; but if the topic is a source topic read as a KTable, we may not know which store is being restored. Forcing users to infer the state store name from the topic partition name would not be intuitive either. Moreover, the listener may be implemented differently for different stores, and a global listener would force users to branch on the topic-partition names, similar to what we did with the global timestamp extractor. On the other hand, I also agree that setting the listener at per-store granularity may be a bit cumbersome, since overriding it for a specific store would require exposing some APIs, maybe on StateStoreSupplier. So I would love to hear other people's opinions.
> >> >>>
> >> >>> If we think that differently implemented restore callbacks will be less common, then I'd suggest at least replacing the `TopicPartition` parameter with the `String` store name and the `TaskId`.
> >> >>>
> >> >>> 2. I think we can pass in the `long endOffset` in the `onRestoreStart` function as well, since we will have read the endOffset by then; otherwise users will still not be able to track the restoration progress (e.g. what percentage has been restored so far, to estimate how much longer they need to wait).
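Guozhang's point 2 is about deriving a percentage once the end offset is known at start time. The sketch below assumes a hypothetical listener shape that takes a store name (as suggested in point 1) instead of a `TopicPartition`; it also assumes `endingOffset` is exclusive, like a log end offset, and that `batchEndOffset` is the offset of the last record in the batch. None of these signatures are taken from a released Kafka API.

```java
class RestoreProgressDemo {
    public static void main(String[] args) {
        ProgressReportingListener listener = new ProgressReportingListener();
        listener.onRestoreStart("counts-store", 0L, 5000L);
        listener.onBatchRestored("counts-store", 999L, 1000L);
        System.out.println(listener.percentComplete()); // prints 20.0
    }
}

// Hypothetical listener: store name instead of TopicPartition, end offset known up front.
interface StateRestoreListener {
    void onRestoreStart(String storeName, long startingOffset, long endingOffset);

    void onBatchRestored(String storeName, long batchEndOffset, long numRestored);

    void onRestoreEnd(String storeName, long totalRestored);
}

class ProgressReportingListener implements StateRestoreListener {
    private long startingOffset;
    private long endingOffset;   // assumed exclusive
    private double percentComplete;

    @Override
    public void onRestoreStart(String storeName, long startingOffset, long endingOffset) {
        this.startingOffset = startingOffset;
        this.endingOffset = endingOffset;
        this.percentComplete = 0.0;
    }

    @Override
    public void onBatchRestored(String storeName, long batchEndOffset, long numRestored) {
        long toRestore = endingOffset - startingOffset;
        long restoredSoFar = batchEndOffset + 1 - startingOffset;
        // Guard against an empty changelog, where there is nothing to restore.
        percentComplete = toRestore <= 0 ? 100.0 : 100.0 * restoredSoFar / toRestore;
    }

    @Override
    public void onRestoreEnd(String storeName, long totalRestored) {
        percentComplete = 100.0;
    }

    double percentComplete() {
        return percentComplete;
    }
}
```

This measures progress in offsets rather than records; a compacted changelog can contain offset gaps, so the figure is a position estimate, which is usually enough to answer "how much longer do I need to wait".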
> >> >>>
> >> >>> Guozhang
> >> >>>
> >> >>> On Wed, Jun 14, 2017 at 12:25 PM, Bill Bejeck <bbej...@gmail.com> wrote:
> >> >>>
> >> >>>> Eno,
> >> >>>>
> >> >>>> Thanks for the comments.
> >> >>>>
> >> >>>> 1. As for having both restore and restoreAll, I kept the restore method for backward compatibility, as that is what current implementing classes use. However, as I think about it, having a single restore method that takes a collection makes things cleaner. I'll wait for others to weigh in, but I'm leaning towards having a single restore method.
> >> >>>>
> >> >>>> 2. The "onBatchRestored" method is for keeping track of the restore process as we load records from each poll request.
> >> >>>>
> >> >>>> For example, if the changelog contained 5000 records and MAX_POLL_RECORDS is set to 1000, the "onBatchRestored" method would get called 5 times, each time with the ending offset of the last record in the batch and the count of the batch. I'll update the KIP to add comments above the interface methods.
> >> >>>>
> >> >>>> Thanks,
> >> >>>> Bill
> >> >>>>
> >> >>>> On Wed, Jun 14, 2017 at 11:49 AM, Eno Thereska <eno.there...@gmail.com> wrote:
> >> >>>>
> >> >>>>> Thanks Bill,
> >> >>>>>
> >> >>>>> A couple of questions:
> >> >>>>>
> >> >>>>> 1. Why do we need both restore and restoreAll? Why can't we just have one that takes a collection (i.e., restoreAll)? Are there cases when people want to restore one at a time? In that case, they could still use restoreAll with just one record in the collection, right?
> >> >>>>>
> >> >>>>> 2. I don't quite get "onBatchRestored". Could you put a short comment on top of all three methods? An example might help here.
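Bill's 5000-record example can be replayed with a small simulation. This is a sketch under stated assumptions: `BatchRestoredListener` and `RestoreLoopSimulation` are hypothetical names, offsets start at 0, and every poll is assumed to return a full batch. MAX_POLL_RECORDS presumably refers to the consumer's `max.poll.records` setting, which is only an upper bound, so real polls may return fewer records.

```java
import java.util.ArrayList;
import java.util.List;

class BatchRestoreDemo {
    public static void main(String[] args) {
        List<long[]> calls = new ArrayList<>();
        RestoreLoopSimulation.restore(5000L, 1000, (endOffset, count) -> calls.add(new long[] {endOffset, count}));
        for (long[] call : calls) {
            System.out.println("onBatchRestored(endOffset=" + call[0] + ", count=" + call[1] + ")");
        }
    }
}

// Hypothetical single-method listener used only for this illustration.
interface BatchRestoredListener {
    void onBatchRestored(long batchEndOffset, long batchCount);
}

final class RestoreLoopSimulation {
    private RestoreLoopSimulation() {}

    // Replays offsets [0, totalRecords) in chunks of at most maxPollRecords,
    // assuming every poll returns a full batch until the log is exhausted.
    static void restore(long totalRecords, int maxPollRecords, BatchRestoredListener listener) {
        long nextOffset = 0L;
        while (nextOffset < totalRecords) {
            long count = Math.min((long) maxPollRecords, totalRecords - nextOffset);
            long lastOffsetInBatch = nextOffset + count - 1;
            // A real restore would write the batch into the state store here.
            listener.onBatchRestored(lastOffsetInBatch, count);
            nextOffset += count;
        }
    }
}
```

With 5000 records and a batch size of 1000, the listener fires 5 times, with end offsets 999, 1999, 2999, 3999 and 4999, each reporting a count of 1000.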
> >> >>>>>
> >> >>>>> Thanks
> >> >>>>> Eno
> >> >>>>>
> >> >>>>>> On 8 Jun 2017, at 18:05, Bill Bejeck <bbej...@gmail.com> wrote:
> >> >>>>>>
> >> >>>>>> Guozhang, Damian, thanks for the comments.
> >> >>>>>>
> >> >>>>>> Giving developers the ability to hook into StateStore recovery phases was part of my original intent. However, the KIP in its current state won't provide this functionality.
> >> >>>>>>
> >> >>>>>> As a result, I'll be doing a significant revision of KIP-167. I'll be sure to incorporate all your comments in the new revision.
> >> >>>>>>
> >> >>>>>> Thanks,
> >> >>>>>> Bill
> >> >>>>>>
> >> >>>>>> On Wed, Jun 7, 2017 at 5:24 AM, Damian Guy <damian....@gmail.com> wrote:
> >> >>>>>>
> >> >>>>>>> I'm largely in agreement with what Guozhang has suggested, i.e., StateRestoreContext shouldn't have any setters on it, and it also needs to have the end offset available so that people can use it to derive progress. Slightly differently, maybe the StateRestoreContext interface could be:
> >> >>>>>>>
> >> >>>>>>> long beginOffset()
> >> >>>>>>> long endOffset()
> >> >>>>>>> long currentOffset()
> >> >>>>>>>
> >> >>>>>>> One further thing: this currently doesn't give developers the ability to hook into this information if they are using the built-in StateStores. Is this something we should be considering?
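Damian's variant of `StateRestoreContext` is read-only: the library fills in the values and user code only reads them. One way to get that property is an interface with getters only, backed by an immutable snapshot that the restore loop replaces after each batch. The sketch assumes `currentOffset()` is the next offset to restore and `endOffset()` is exclusive; `ImmutableRestoreContext`, `advancedTo`, and `RestoreContextReader` are hypothetical names for illustration.

```java
class RestoreContextDemo {
    public static void main(String[] args) {
        ImmutableRestoreContext ctx = new ImmutableRestoreContext(100L, 600L, 100L);
        System.out.println(RestoreContextReader.remaining(ctx)); // prints 500
        ctx = ctx.advancedTo(350L); // the restore loop, not user code, produces the next snapshot
        System.out.println(RestoreContextReader.remaining(ctx)); // prints 250
    }
}

// Hypothetical read-only view following the methods suggested in this thread: getters only.
interface StateRestoreContext {
    long beginOffset();

    long endOffset();

    long currentOffset();
}

// Library-side implementation: all fields are final, so code holding a
// StateRestoreContext reference can read it but never modify it.
final class ImmutableRestoreContext implements StateRestoreContext {
    private final long beginOffset;
    private final long endOffset;
    private final long currentOffset;

    ImmutableRestoreContext(long beginOffset, long endOffset, long currentOffset) {
        this.beginOffset = beginOffset;
        this.endOffset = endOffset;
        this.currentOffset = currentOffset;
    }

    @Override
    public long beginOffset() {
        return beginOffset;
    }

    @Override
    public long endOffset() {
        return endOffset;
    }

    @Override
    public long currentOffset() {
        return currentOffset;
    }

    // Package-private: intended to be called only by the restore loop.
    ImmutableRestoreContext advancedTo(long newCurrentOffset) {
        return new ImmutableRestoreContext(beginOffset, endOffset, newCurrentOffset);
    }
}

// User-side helper: derives progress purely from the getters.
final class RestoreContextReader {
    private RestoreContextReader() {}

    static long remaining(StateRestoreContext ctx) {
        return Math.max(0L, ctx.endOffset() - ctx.currentOffset());
    }
}
```

Returning a new snapshot instead of mutating one means a callback that keeps a reference to an earlier context still sees consistent values, at the cost of one small allocation per batch.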
> >> >>>>>>>
> >> >>>>>>> On Tue, 6 Jun 2017 at 23:32 Guozhang Wang <wangg...@gmail.com> wrote:
> >> >>>>>>>
> >> >>>>>>>> Thanks for the updated KIP Bill, I have a couple of comments:
> >> >>>>>>>>
> >> >>>>>>>> 1) I'm assuming beginRestore / endRestore is called only once per store throughout the whole restoration process, and restoreAll is called per batch. In that case, I feel we can pass the StateRestoreContext as a second parameter to restoreAll and to endRestore as well, and let the library set the corresponding values while users only read them (since the collection of key-value pairs does not contain offset information anyway, users cannot really set the offset). The "lastOffsetRestored" would be the starting offset when called on `beginRestore`.
> >> >>>>>>>>
> >> >>>>>>>> 2) Users who want to implement their own batch restoration callbacks would now need to implement both `restore` and `restoreAll`, and then either let `restoreAll` call `restore`, or implement the logic in `restoreAll` only and never call `restore`. Maybe we can provide two abstract implementations of BatchingStateRestoreCallback that do beginRestore / endRestore as no-ops: one implementing `restoreAll` to call the abstract `restore`, and the other implementing `restore` to throw a "not supported" exception while keeping `restoreAll` abstract.
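The two abstract implementations from Guozhang's point 2 could look roughly like this. It is a sketch, not the KIP's final API: `java.util.Map.Entry` stands in for Kafka's `KeyValue`, the `StateRestoreContext` parameter is omitted for brevity, and the adapter class names are hypothetical.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

class RestoreAdaptersDemo {
    public static void main(String[] args) {
        List<Map.Entry<byte[], byte[]>> batch = new ArrayList<>();
        batch.add(new AbstractMap.SimpleEntry<>(new byte[] {1}, new byte[] {10}));
        batch.add(new AbstractMap.SimpleEntry<>(new byte[] {2}, new byte[] {20}));

        // Variant 1: the user writes only restore(); restoreAll() loops over it.
        final int[] restored = {0};
        BatchingStateRestoreCallback perRecord = new RecordByRecordRestoreCallback() {
            @Override
            public void restore(byte[] key, byte[] value) {
                restored[0]++;
            }
        };
        perRecord.restoreAll(batch);
        System.out.println("records restored one at a time: " + restored[0]);

        // Variant 2: the user writes only restoreAll().
        BatchingStateRestoreCallback batchOnly = new BatchOnlyRestoreCallback() {
            @Override
            public void restoreAll(Collection<Map.Entry<byte[], byte[]>> records) {
                System.out.println("batch of " + records.size() + " restored at once");
            }
        };
        batchOnly.restoreAll(batch);
    }
}

// Hypothetical interfaces for illustration (StateRestoreContext parameter omitted).
interface StateRestoreCallback {
    void restore(byte[] key, byte[] value);
}

interface BatchingStateRestoreCallback extends StateRestoreCallback {
    void restoreAll(Collection<Map.Entry<byte[], byte[]>> records);

    void beginRestore();

    void endRestore();
}

// Adapter 1: restoreAll() delegates to the abstract restore(); hooks are no-ops.
abstract class RecordByRecordRestoreCallback implements BatchingStateRestoreCallback {
    @Override
    public void restoreAll(Collection<Map.Entry<byte[], byte[]>> records) {
        for (Map.Entry<byte[], byte[]> record : records) {
            restore(record.getKey(), record.getValue());
        }
    }

    @Override
    public void beginRestore() {}

    @Override
    public void endRestore() {}
}

// Adapter 2: restore() is unsupported and restoreAll() stays abstract; hooks are no-ops.
abstract class BatchOnlyRestoreCallback implements BatchingStateRestoreCallback {
    @Override
    public void restore(byte[] key, byte[] value) {
        throw new UnsupportedOperationException("single-record restore is not supported; use restoreAll");
    }

    @Override
    public void beginRestore() {}

    @Override
    public void endRestore() {}
}
```

Either adapter leaves the user exactly one method to write, which is the point of Guozhang's suggestion; Eno's question about dropping `restore` entirely would make the first adapter unnecessary.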
> >> >>>>>>>>
> >> >>>>>>>> 3) I think we can also return the "offset limit" in StateRestoreContext, which is important for users to track the restoration progress, since otherwise they cannot tell what percentage of the restoration has completed. I.e.:
> >> >>>>>>>>
> >> >>>>>>>> public interface BatchingStateRestoreCallback extends StateRestoreCallback {
> >> >>>>>>>>
> >> >>>>>>>>     void restoreAll(Collection<KeyValue<byte[], byte[]>> records, StateRestoreContext restoreContext);
> >> >>>>>>>>
> >> >>>>>>>>     void beginRestore(StateRestoreContext restoreContext);
> >> >>>>>>>>
> >> >>>>>>>>     void endRestore(StateRestoreContext restoreContext);
> >> >>>>>>>> }
> >> >>>>>>>>
> >> >>>>>>>> public interface StateRestoreContext {
> >> >>>>>>>>
> >> >>>>>>>>     long lastOffsetRestored();
> >> >>>>>>>>
> >> >>>>>>>>     long endOffsetToRestore();
> >> >>>>>>>>
> >> >>>>>>>>     int numberRestored();
> >> >>>>>>>> }
> >> >>>>>>>>
> >> >>>>>>>> Guozhang
> >> >>>>>>>>
> >> >>>>>>>> On Fri, Jun 2, 2017 at 9:16 AM, Bill Bejeck <bbej...@gmail.com> wrote:
> >> >>>>>>>>
> >> >>>>>>>>> Guozhang, Matthias,
> >> >>>>>>>>>
> >> >>>>>>>>> Thanks for the comments. I have updated the KIP (and the JIRA title and description as well).
> >> >>>>>>>>>
> >> >>>>>>>>> I had thought about introducing a separate interface altogether, but extending the current one makes more sense.
> >> >>>>>>>>>
> >> >>>>>>>>> As for intermediate callbacks based on time or number of records, I think the latest update to the KIP addresses this point of querying for intermediate results, though it would be per batch restored.
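Extending the current interface rather than changing it keeps existing callbacks compiling, and the library can decide per callback whether to hand over whole batches. The sketch below assumes a hypothetical library-side `RestoreDispatcher` that checks `instanceof`; `StateRestoreCallback` mirrors the existing single-method `restore(byte[] key, byte[] value)` shape, `Map.Entry` stands in for `KeyValue`, and the context parameter is omitted for brevity.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

class RestoreDispatchDemo {
    public static void main(String[] args) {
        List<Map.Entry<byte[], byte[]>> batch = new ArrayList<>();
        batch.add(new AbstractMap.SimpleEntry<>(new byte[] {1}, new byte[] {1}));
        batch.add(new AbstractMap.SimpleEntry<>(new byte[] {2}, new byte[] {2}));

        // A pre-existing callback written as a lambda keeps compiling unchanged.
        StateRestoreCallback legacy = (key, value) -> System.out.println("legacy restore of one record");
        RestoreDispatcher.deliver(legacy, batch);

        // A new-style callback opts in to batches by implementing the extended interface.
        StateRestoreCallback batching = new BatchingStateRestoreCallback() {
            @Override
            public void restore(byte[] key, byte[] value) {
                throw new UnsupportedOperationException();
            }

            @Override
            public void restoreAll(Collection<Map.Entry<byte[], byte[]>> records) {
                System.out.println("batched restore of " + records.size() + " records");
            }
        };
        RestoreDispatcher.deliver(batching, batch);
    }
}

// Stand-in for the existing single-method callback interface, left untouched.
interface StateRestoreCallback {
    void restore(byte[] key, byte[] value);
}

// New interface that extends the current one instead of modifying it.
interface BatchingStateRestoreCallback extends StateRestoreCallback {
    void restoreAll(Collection<Map.Entry<byte[], byte[]>> records);
}

// Hypothetical library-side dispatch: batches go to restoreAll() when the callback
// supports it; otherwise records are replayed one at a time through restore().
final class RestoreDispatcher {
    private RestoreDispatcher() {}

    static void deliver(StateRestoreCallback callback, List<Map.Entry<byte[], byte[]>> batch) {
        if (callback instanceof BatchingStateRestoreCallback) {
            ((BatchingStateRestoreCallback) callback).restoreAll(batch);
        } else {
            for (Map.Entry<byte[], byte[]> record : batch) {
                callback.restore(record.getKey(), record.getValue());
            }
        }
    }
}
```

Because `StateRestoreCallback` still has a single abstract method, lambda-based implementations written before the change remain valid; only code that wants batching has to implement the new interface.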
> >> >>>>>>>>>
> >> >>>>>>>>> Thanks,
> >> >>>>>>>>> Bill
> >> >>>>>>>>>
> >> >>>>>>>>> On Fri, Jun 2, 2017 at 8:36 AM, Jim Jagielski <j...@jagunet.com> wrote:
> >> >>>>>>>>>
> >> >>>>>>>>>>> On Jun 2, 2017, at 12:54 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> With regard to backward compatibility, we should not change the current interface, but add a new interface that extends the current one.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>>> ++1
> >> >>>>>>>>
> >> >>>>>>>> --
> >> >>>>>>>> -- Guozhang
> >> >>>
> >> >>> --
> >> >>> -- Guozhang

--
-- Guozhang