Hmm... I'm not sure how this can achieve the "close the store before restoration, re-open it with a slightly different config, and then close and re-open the store for querying" pattern. You need to be able to access the store object in order to do this, right?
Guozhang

On Mon, Jun 26, 2017 at 7:40 AM, Bill Bejeck <bbej...@gmail.com> wrote:

Thinking about this some more, I have another approach. Leave the first parameter of the StateRestoreListener interface as a String.

But we'll provide 2 default abstract classes, one implementing StateRestoreCallback and the other implementing BatchingStateRestoreCallback. Both abstract classes will also implement the StateRestoreListener interface, with no-op methods provided for the restore progress methods.

WDYT?

On Mon, Jun 26, 2017 at 10:13 AM, Bill Bejeck <bbej...@gmail.com> wrote:

Guozhang,

Thanks for the comments.

I think that will work, but my concern is that it might not be as clear to users who want to receive external notification of the restore progress separately (say, for reporting purposes) and still send separate signals to the state store for resource management tasks.

However, I like this approach better and I have some ideas I can use in the implementation, so I'll update the KIP accordingly.

Thanks,
Bill

On Wed, Jun 21, 2017 at 10:14 PM, Guozhang Wang <wangg...@gmail.com> wrote:

More specifically, if we replace the first parameter, the String store name, with the store instance itself, would that be sufficient to cover `StateRestoreNotification`?

On Wed, Jun 21, 2017 at 7:13 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Bill,

I'm wondering why we need `StateRestoreNotification` while still having `StateRestoreListener`. Could the above setup be achieved with just `StateRestoreListener.onRestoreStart / onRestoreEnd`? I.e., it seems the latter can subsume any use cases intended for the former API.
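Bill's June 26 proposal (keep the `String` store name, and ship two abstract classes that implement the restore callbacks plus `StateRestoreListener` with no-op progress methods) can be sketched roughly as below. Everything is declared locally so the block runs on its own: the listener signatures, the class names, and the choice to have the batching class extend the single-record one are assumptions for illustration, not the released Kafka Streams API. The hypothetical `ToyStore` shows one way the store-access concern could be handled: when the store author writes the callback as an inner object of the store, the progress hooks can reach the store's own state.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Local stand-ins for the interfaces under discussion; signatures are assumed
// for illustration and are not the released Kafka Streams API.
class NotifyingCallbackSketch {

    interface StateRestoreCallback {
        void restore(byte[] key, byte[] value);
    }

    interface BatchingStateRestoreCallback extends StateRestoreCallback {
        void restoreAll(Collection<Map.Entry<byte[], byte[]>> records);
    }

    // First parameter stays a String store name, as in the proposal.
    interface StateRestoreListener {
        void onRestoreStart(String storeName, long startingOffset, long endingOffset);
        void onBatchRestored(String storeName, long batchEndOffset, long numRestored);
        void onRestoreEnd(String storeName, long totalRestored);
    }

    // No-op progress methods, so subclasses override only what they need.
    abstract static class AbstractNotifyingRestoreCallback
            implements StateRestoreCallback, StateRestoreListener {
        @Override public void onRestoreStart(String storeName, long startingOffset, long endingOffset) { }
        @Override public void onBatchRestored(String storeName, long batchEndOffset, long numRestored) { }
        @Override public void onRestoreEnd(String storeName, long totalRestored) { }
    }

    // Batching variant: inherits the same no-op hooks (assumed design shortcut).
    abstract static class AbstractNotifyingBatchingRestoreCallback
            extends AbstractNotifyingRestoreCallback implements BatchingStateRestoreCallback { }

    // Hypothetical store: the callback is an inner object, so its hooks can
    // touch the store's own state (here, just an event log).
    static final class ToyStore {
        final List<String> events = new ArrayList<>();

        final AbstractNotifyingBatchingRestoreCallback callback =
                new AbstractNotifyingBatchingRestoreCallback() {
                    @Override public void restore(byte[] key, byte[] value) {
                        events.add("put");
                    }

                    @Override public void restoreAll(Collection<Map.Entry<byte[], byte[]>> records) {
                        for (Map.Entry<byte[], byte[]> record : records) {
                            restore(record.getKey(), record.getValue());
                        }
                    }

                    @Override public void onRestoreStart(String storeName, long startingOffset, long endingOffset) {
                        events.add("start:" + storeName);   // e.g. reopen with restore-time settings
                    }

                    @Override public void onRestoreEnd(String storeName, long totalRestored) {
                        events.add("end:" + totalRestored); // e.g. reopen with query-time settings
                    }
                };
    }

    static List<String> run() {
        ToyStore store = new ToyStore();
        List<Map.Entry<byte[], byte[]>> batch = new ArrayList<>();
        batch.add(new SimpleEntry<>(new byte[] {1}, new byte[] {10}));
        batch.add(new SimpleEntry<>(new byte[] {2}, new byte[] {20}));

        // Call order a restoring library might use (assumed, not specified in the thread).
        store.callback.onRestoreStart("my-store", 0L, 2L);
        store.callback.restoreAll(batch);
        store.callback.onBatchRestored("my-store", 1L, 2L);   // inherited no-op
        store.callback.onRestoreEnd("my-store", 2L);
        return store.events;
    }

    public static void main(String[] args) {
        System.out.println(run());   // [start:my-store, put, put, end:2]
    }
}
```

Only `onRestoreStart` and `onRestoreEnd` are overridden here; `onBatchRestored` falls through to the inherited no-op, which is the convenience the abstract classes are meant to provide.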
Guozhang

On Mon, Jun 19, 2017 at 3:23 PM, Bill Bejeck <bbej...@gmail.com> wrote:

I'm going to update the KIP with a new interface, StateRestoreNotification, containing two methods, startRestore and endRestore.

While the naming is very similar to methods already proposed on the StateRestoreListener, the intent of these methods is not user notification of restore status. Instead, these new methods are for internal use by the state store to perform any setup and teardown work required by a batch restoration process.

Here's one current use case: when using RocksDB we should optimize for a bulk load by setting Options.prepareForBulkLoad().

1. If the database has already been opened, we'll need to close it, set the "prepareForBulkLoad" option, and re-open the database.
2. Once the restore is completed, we'll need to close and re-open the database with the "prepareForBulkLoad" option turned off.

While we are mentioning the RocksDB use case above, the addition of this interface is not tied to any specific implementation of a persistent state store.

Additionally, a separate interface is needed so that any user can implement the state restore notification feature regardless of the state restore callback used.

I'll also remove the "getStateRestoreListener" method and stick with the notion of a "global" restore listener for now.

On Mon, Jun 19, 2017 at 1:05 PM, Bill Bejeck <bbej...@gmail.com> wrote:

Yes it is, more of an oversight on my part; I'll remove it from the KIP.

On Mon, Jun 19, 2017 at 12:48 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Hi,

I think for now it's good enough to start with a single global restore listener. We can incrementally improve this later on if required. Of course, if it's easy to do right away, we can also be more fine-grained. But for KTable, we might want to add this after getting rid of all the overloads we have atm.

One question: what is the purpose of parameter "endOffset" in #onRestoreEnd() -- isn't this the same value as provided in #onRestoreStart()?

-Matthias

On 6/15/17 6:18 AM, Bill Bejeck wrote:

Thinking about the custom StateRestoreListener approach: having a get method on the interface will really only work for custom state stores.

So we'll need to provide another way for users to set behavior with the provided state stores. The only option that comes to mind now is also adding a parameter to the StateStoreSupplier.

Bill

On Wed, Jun 14, 2017 at 5:39 PM, Bill Bejeck <bbej...@gmail.com> wrote:

Guozhang,

Thanks for the comments.

1. As for the granularity, I agree that having one global StateRestoreListener could be restrictive. But I think it's important to have a "setStateRestoreListener" on KafkaStreams, as this allows users to define an anonymous instance that has access to local scope for reporting purposes. This is similar to the pattern we use for KafkaStreams.setStateListener.
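The two RocksDB steps in Bill's June 19 message can be sketched as a store that implements the proposed `StateRestoreNotification` hooks. This is a minimal simulation under stated assumptions: `FakeDb`, `BulkLoadingStore`, and the `open`/`close` helpers are hypothetical, and the bulk-load setting is modelled as a boolean flag so the block runs without RocksDB. A real store would instead build RocksDB options and call `prepareForBulkLoad()` on them before reopening.

```java
import java.util.ArrayList;
import java.util.List;

class BulkLoadNotificationSketch {

    // Proposed internal hooks (names from the thread; exact shape assumed).
    interface StateRestoreNotification {
        void startRestore();
        void endRestore();
    }

    // Hypothetical stand-in for a RocksDB handle: it only remembers
    // whether it was opened with bulk-load settings.
    static final class FakeDb {
        final boolean bulkLoad;

        FakeDb(boolean bulkLoad) {
            this.bulkLoad = bulkLoad;
        }
    }

    static final class BulkLoadingStore implements StateRestoreNotification {
        final List<String> log = new ArrayList<>();
        private FakeDb db;   // null while the store is closed

        void open(boolean bulkLoad) {
            // A real store would build its RocksDB options here and call
            // prepareForBulkLoad() on them when bulkLoad is true.
            db = new FakeDb(bulkLoad);
            log.add(bulkLoad ? "open(bulk)" : "open(normal)");
        }

        void close() {
            if (db != null) {
                db = null;
                log.add("close");
            }
        }

        boolean isBulkLoading() {
            return db != null && db.bulkLoad;
        }

        @Override public void startRestore() {
            close();      // step 1: close if already open,
            open(true);   // then reopen with bulk-load settings
        }

        @Override public void endRestore() {
            close();      // step 2: close,
            open(false);  // then reopen with bulk-load settings turned off
        }
    }

    static List<String> run() {
        BulkLoadingStore store = new BulkLoadingStore();
        store.open(false);    // store was already open for queries
        store.startRestore();
        // restored batches would be written here
        store.endRestore();
        return store.log;
    }

    public static void main(String[] args) {
        System.out.println(run());   // [open(normal), close, open(bulk), close, open(normal)]
    }
}
```

Because `close()` is a no-op on a closed store, `startRestore()` covers both cases in step 1: a store that was already open and one that was not.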
As an alternative, what if we add a method to the BatchingStateRestoreCallback interface named "getStateStoreListener"? Then in an abstract adapter class we return null from getStateStoreListener. But if users want to supply a different StateRestoreListener strategy per callback, they would simply override the method to return an actual instance.

WDYT?

2. I'll make the required updates to pass in the ending offset at the start as well as the actual name of the state store.

Bill

On Wed, Jun 14, 2017 at 3:53 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Thanks Bill for the updated wiki. I have a couple more comments:

1. Setting StateRestoreListener at the KafkaStreams granularity may not be sufficient, as in the listener callback we do not know which store is being restored right now: if the topic is a changelog topic, then from the `TopicPartition` we may be able to infer the state store name, but if the topic is the source topic read as a KTable, then we may not know which store is being restored; plus, forcing users to infer the state store name from the topic partition name would not be intuitive either.
Plus, for different stores the listener may be implemented differently, and setting a global listener would force users to branch on the topic-partition names, similarly to what we did in the global timestamp extractor. On the other hand, I also agree that setting the listener at the per-store granularity may be a bit cumbersome, since if users want to override it on a specific store we would need to expose some APIs, maybe at StateStoreSupplier. So I would love to hear other people's opinions.

If we think that differently implemented restoring callbacks may be less common, then I'd suggest at least replacing the `TopicPartition` parameter with the `String` store name and the `TaskId`?

2. I think we can pass in the `long endOffset` in the `onRestoreStart` function as well, as we will have read the endOffset already by then; otherwise users would still not be able to track the restoration progress (e.g. what percentage has been restored so far, to estimate how long I still need to wait).

Guozhang

On Wed, Jun 14, 2017 at 12:25 PM, Bill Bejeck <bbej...@gmail.com> wrote:

Eno,

Thanks for the comments.

1. As for having both restore and restoreAll, I kept the restore method for backward compatibility, as that is what is used by current implementing classes.
However, as I think about it, it makes things cleaner to have a single restore method taking a collection. I'll wait for others to weigh in, but I'm leaning towards having a single restore method.

2. The "onBatchRestored" method is for keeping track of the restore process as we load records from each poll request.

For example, if the changelog contained 5000 records and MAX_POLL_RECORDS is set to 1000, the "onBatchRestored" method would get called 5 times, each time with the ending offset of the last record in the batch and the count of the batch. I'll update the KIP to add comments above the interface methods.

Thanks,
Bill

On Wed, Jun 14, 2017 at 11:49 AM, Eno Thereska <eno.there...@gmail.com> wrote:

Thanks Bill,

A couple of questions:

1. Why do we need both restore and restoreAll? Why can't we just have one that takes a collection (i.e., restore all)? Are there cases when people want to restore one at a time? In that case, they could still use restoreAll with just 1 record in the collection, right?

2. I don't quite get "onBatchRestored". Could you put a small comment on top of all three methods? An example might help here.
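Bill's 5000-record example, combined with Guozhang's request to pass the end offset to `onRestoreStart`, can be simulated as below. The loop stands in for the consumer's poll cycle, with MAX_POLL_RECORDS capping each batch. The listener shape, the changelog starting at offset 0, and treating `endingOffset` as exclusive are assumptions for illustration; the percentage is computed by the listener itself from the end offset it receives up front.

```java
import java.util.ArrayList;
import java.util.List;

class BatchRestoreLoopSketch {

    // Assumed listener shape: the end offset is known when restoration starts.
    interface StateRestoreListener {
        void onRestoreStart(String storeName, long startingOffset, long endingOffset);
        void onBatchRestored(String storeName, long batchEndOffset, long numRestored);
    }

    // Simulates restoring offsets [0, totalRecords) in chunks of at most
    // maxPollRecords, firing one onBatchRestored call per chunk.
    static void restore(String storeName, long totalRecords, int maxPollRecords,
                        StateRestoreListener listener) {
        listener.onRestoreStart(storeName, 0L, totalRecords);   // endingOffset treated as exclusive
        long next = 0;
        while (next < totalRecords) {
            long batchSize = Math.min(maxPollRecords, totalRecords - next);
            // this chunk would be handed to restoreAll(...) here
            long batchEndOffset = next + batchSize - 1;          // offset of the last record in the chunk
            listener.onBatchRestored(storeName, batchEndOffset, batchSize);
            next += batchSize;
        }
    }

    // Records "batchEndOffset/count/percentDone%" for each batch.
    static List<String> trace(long totalRecords, int maxPollRecords) {
        List<String> lines = new ArrayList<>();
        restore("store", totalRecords, maxPollRecords, new StateRestoreListener() {
            private long total;
            private long restored;

            @Override public void onRestoreStart(String storeName, long startingOffset, long endingOffset) {
                total = endingOffset - startingOffset;
            }

            @Override public void onBatchRestored(String storeName, long batchEndOffset, long numRestored) {
                restored += numRestored;
                lines.add(batchEndOffset + "/" + numRestored + "/" + (100 * restored / total) + "%");
            }
        });
        return lines;
    }

    public static void main(String[] args) {
        trace(5000, 1000).forEach(System.out::println);
    }
}
```

With 5000 records and a cap of 1000 this prints five lines, from `999/1000/20%` to `4999/1000/100%`, matching the five `onBatchRestored` calls described above. Without the end offset at start, the listener could count restored records but not turn them into a percentage.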
Thanks
Eno

On 8 Jun 2017, at 18:05, Bill Bejeck <bbej...@gmail.com> wrote:

Guozhang, Damian, thanks for the comments.

Giving developers the ability to hook into StateStore recovery phases was part of my original intent. However, the KIP in its current state won't provide this functionality.

As a result, I'll be doing a significant revision of KIP-167. I'll be sure to incorporate all your comments in the new revision.

Thanks,
Bill

On Wed, Jun 7, 2017 at 5:24 AM, Damian Guy <damian....@gmail.com> wrote:

I'm largely in agreement with what Guozhang has suggested, i.e., StateRestoreContext shouldn't have any setters on it and also needs to have the end offset available so that people can use it to derive progress. Slightly different, maybe the StateRestoreContext interface could be:

    long beginOffset()
    long endOffset()
    long currentOffset()

One further thing: this currently doesn't give developers the ability to hook into this information if they are using the built-in StateStores. Is this something we should be considering?
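Damian's getter-only `StateRestoreContext` can be sketched as a read-only interface backed by a mutable class that only the restoring code advances. `MutableRestoreContext`, `advanceTo`, and `fractionRestored` are hypothetical names, and treating `endOffset()` as exclusive is an assumption.

```java
class ReadOnlyContextSketch {

    // Getter-only shape suggested in the thread: no setters for users.
    interface StateRestoreContext {
        long beginOffset();
        long endOffset();
        long currentOffset();
    }

    // Hypothetical library-side implementation; only the restoring code advances it.
    static final class MutableRestoreContext implements StateRestoreContext {
        private final long begin;
        private final long end;
        private long current;

        MutableRestoreContext(long begin, long end) {
            this.begin = begin;
            this.end = end;
            this.current = begin;
        }

        // Not part of the user-facing interface.
        void advanceTo(long offset) {
            current = offset;
        }

        @Override public long beginOffset() { return begin; }
        @Override public long endOffset() { return end; }
        @Override public long currentOffset() { return current; }
    }

    // User code sees only the read-only view and derives progress from it.
    static double fractionRestored(StateRestoreContext ctx) {
        long span = ctx.endOffset() - ctx.beginOffset();
        return span <= 0 ? 1.0 : (double) (ctx.currentOffset() - ctx.beginOffset()) / span;
    }

    public static void main(String[] args) {
        MutableRestoreContext ctx = new MutableRestoreContext(100L, 1100L);
        ctx.advanceTo(350L);
        System.out.println(fractionRestored(ctx));   // 250 of 1000 offsets: 0.25
    }
}
```

Code typed against the interface has no setter to call; in an actual library the mutable class would also need to stay non-public so users could not simply downcast to it.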
On Tue, 6 Jun 2017 at 23:32 Guozhang Wang <wangg...@gmail.com> wrote:

Thanks for the updated KIP, Bill. I have a couple of comments:

1) I'm assuming beginRestore / endRestore is called only once per store throughout the whole restoration process, and restoreAll is called per batch. In that case I feel we can pass the StateRestoreContext as a second parameter to restoreAll and to endRestore as well, and let the library set the corresponding values while users can only read them (since the collection of key-value pairs does not contain offset information anyway, users cannot really set the offset). The "lastOffsetRestored" would be the starting offset when called on `beginRestore`.

2) Users who want to implement their own batch restoration callbacks would now need to implement both `restore` and `restoreAll`, and either have `restoreAll` call `restore` or implement the logic in `restoreAll` only and never call `restore`.
Maybe we can provide two abstract impls of BatchingStateRestoreCallback that treat beginRestore / endRestore as no-ops, with one implementing `restoreAll` to call the abstract `restore`, while the other implements `restore` to throw a "not supported" exception and keeps `restoreAll` abstract.

3) I think we can also return the "offset limit" in StateRestoreContext, which is important for users to track the restoration progress, since otherwise they could not tell what percentage of the restoration has completed. I.e.:

    public interface BatchingStateRestoreCallback extends StateRestoreCallback {

        void restoreAll(Collection<KeyValue<byte[], byte[]>> records,
                        StateRestoreContext restoreContext);

        void beginRestore(StateRestoreContext restoreContext);

        void endRestore(StateRestoreContext restoreContext);
    }

    public interface StateRestoreContext {

        long lastOffsetRestored();

        long endOffsetToRestore();

        int numberRestored();
    }

Guozhang

On Fri, Jun 2, 2017 at 9:16 AM, Bill Bejeck <bbej...@gmail.com> wrote:

Guozhang, Matthias,
Thanks for the comments. I have updated the KIP (JIRA title and description as well).

I had thought about introducing a separate interface altogether, but extending the current one makes more sense.

As for intermediate callbacks based on time or number of records, I think the latest update to the KIP addresses this point of querying for intermediate results, but it would be per batch restored.

Thanks,
Bill

On Fri, Jun 2, 2017 at 8:36 AM, Jim Jagielski <j...@jagunet.com> wrote:

On Jun 2, 2017, at 12:54 AM, Matthias J. Sax <matth...@confluent.io> wrote:

With regard to backward compatibility, we should not change the current interface, but add a new interface that extends the current one.
++1

--
-- Guozhang
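Guozhang's point 2 in his June 6 message (two abstract implementations of BatchingStateRestoreCallback with no-op beginRestore / endRestore), together with Matthias's point that the new interface should extend the current one rather than change it, can be sketched as follows. The interfaces are declared locally, `Map.Entry` stands in for Kafka's `KeyValue`, and the class names `RecordAtATimeRestoreCallback` and `BatchOnlyRestoreCallback` are hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

class AbstractBatchingCallbacksSketch {

    // Existing single-record interface, left unchanged.
    interface StateRestoreCallback {
        void restore(byte[] key, byte[] value);
    }

    interface StateRestoreContext {
        long lastOffsetRestored();
        long endOffsetToRestore();
        int numberRestored();
    }

    // New interface extends the old one; Map.Entry stands in for KeyValue.
    interface BatchingStateRestoreCallback extends StateRestoreCallback {
        void restoreAll(Collection<Map.Entry<byte[], byte[]>> records, StateRestoreContext restoreContext);
        void beginRestore(StateRestoreContext restoreContext);
        void endRestore(StateRestoreContext restoreContext);
    }

    // Variant 1: restoreAll delegates to the abstract per-record restore().
    abstract static class RecordAtATimeRestoreCallback implements BatchingStateRestoreCallback {
        @Override public void beginRestore(StateRestoreContext restoreContext) { }
        @Override public void endRestore(StateRestoreContext restoreContext) { }

        @Override public void restoreAll(Collection<Map.Entry<byte[], byte[]>> records,
                                         StateRestoreContext restoreContext) {
            for (Map.Entry<byte[], byte[]> record : records) {
                restore(record.getKey(), record.getValue());
            }
        }
    }

    // Variant 2: restoreAll stays abstract; single-record restore is rejected.
    abstract static class BatchOnlyRestoreCallback implements BatchingStateRestoreCallback {
        @Override public void beginRestore(StateRestoreContext restoreContext) { }
        @Override public void endRestore(StateRestoreContext restoreContext) { }

        @Override public void restore(byte[] key, byte[] value) {
            throw new UnsupportedOperationException("only restoreAll is supported");
        }
    }

    // Restores two records through variant 1 and returns how many reached restore().
    static int recordsSeenByVariant1() {
        final List<byte[]> keys = new ArrayList<>();
        RecordAtATimeRestoreCallback callback = new RecordAtATimeRestoreCallback() {
            @Override public void restore(byte[] key, byte[] value) {
                keys.add(key);
            }
        };
        List<Map.Entry<byte[], byte[]>> batch = new ArrayList<>();
        batch.add(new SimpleEntry<>(new byte[] {1}, new byte[] {10}));
        batch.add(new SimpleEntry<>(new byte[] {2}, new byte[] {20}));
        callback.restoreAll(batch, null);   // this variant never reads the context
        return keys.size();
    }

    // Returns true if variant 2 rejects a single-record restore.
    static boolean variant2RejectsSingleRecord() {
        BatchOnlyRestoreCallback callback = new BatchOnlyRestoreCallback() {
            @Override public void restoreAll(Collection<Map.Entry<byte[], byte[]>> records,
                                             StateRestoreContext restoreContext) { }
        };
        try {
            callback.restore(new byte[] {1}, new byte[] {10});
            return false;
        } catch (UnsupportedOperationException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(recordsSeenByVariant1());        // 2
        System.out.println(variant2RejectsSingleRecord());  // true
    }
}
```

Since `BatchingStateRestoreCallback` only adds methods on top of `StateRestoreCallback`, existing single-record callbacks keep compiling unchanged, and a user picks whichever abstract class matches the method they actually want to write.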