All, I've updated the KIP and I'd like to start a final round of discussion with an eye towards starting a vote soon, maybe on Monday.
Thanks,
Bill

On Mon, Jun 26, 2017 at 7:26 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hmm... I'm not sure how this can achieve the "close the store before restoration, re-open it with a slightly different config, and then close and re-open the store for queries" pattern. You need to be able to access the store object in order to do this, right?

Guozhang

On Mon, Jun 26, 2017 at 7:40 AM, Bill Bejeck <bbej...@gmail.com> wrote:

Thinking about this some more, I have another approach. Leave the first parameter in the StateRestoreListener interface as a String.

But we'll provide two default abstract classes, one implementing StateRestoreCallback and the other implementing BatchingStateRestoreCallback. Both abstract classes will also implement the StateRestoreListener interface, with no-op implementations provided for the restore progress methods.

WDYT?

On Mon, Jun 26, 2017 at 10:13 AM, Bill Bejeck <bbej...@gmail.com> wrote:

Guozhang,

Thanks for the comments.

I think that will work, but my concern is that it might not be as clear to users who want to receive external notification of the restore progress separately (say, for reporting purposes) and still send separate signals to the state store for resource management tasks.

However, I like this approach better and I have some ideas for the implementation, so I'll update the KIP accordingly.

Thanks,
Bill

On Wed, Jun 21, 2017 at 10:14 PM, Guozhang Wang <wangg...@gmail.com> wrote:

More specifically, if we replace the first parameter, the String store name, with the store instance itself, would that be sufficient to cover `StateRestoreNotification`?
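Bill's Jun 26 proposal (two abstract classes that implement a restore callback plus StateRestoreListener with no-op progress methods) might look roughly like the sketch below. All types here are hypothetical, simplified stand-ins that mirror names from this thread: `KeyValue` is a minimal local class, and the listener signatures are assumptions, not the final Kafka Streams API. The sketch also assumes, as with the existing restore callback, that a store supplies its own callback instance; a store that extends one of the adapters then receives the start and end notifications on itself, which is one way the "access to the store object" concern could be met.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical minimal key-value pair, standing in for Kafka's KeyValue.
final class KeyValue<K, V> {
    final K key;
    final V value;
    KeyValue(K key, V value) { this.key = key; this.value = value; }
}

// Hypothetical, simplified versions of the interfaces discussed in the thread.
interface StateRestoreCallback {
    void restore(byte[] key, byte[] value);
}

interface BatchingStateRestoreCallback extends StateRestoreCallback {
    void restoreAll(Collection<KeyValue<byte[], byte[]>> records);
}

interface StateRestoreListener {
    void onRestoreStart(String storeName, long startingOffset, long endingOffset);
    void onBatchRestored(String storeName, long batchEndOffset, long numRestored);
    void onRestoreEnd(String storeName, long totalRestored);
}

// Adapter for single-record callbacks: progress methods are no-ops, so
// subclasses implement restore() and override only the hooks they need.
abstract class AbstractNotifyingRestoreCallback
        implements StateRestoreCallback, StateRestoreListener {
    @Override public void onRestoreStart(String storeName, long startingOffset, long endingOffset) { }
    @Override public void onBatchRestored(String storeName, long batchEndOffset, long numRestored) { }
    @Override public void onRestoreEnd(String storeName, long totalRestored) { }
}

// Adapter for batching callbacks: same no-op progress methods; restore() is
// rejected because restoreAll() is the intended entry point.
abstract class AbstractNotifyingBatchingRestoreCallback
        implements BatchingStateRestoreCallback, StateRestoreListener {
    @Override public void restore(byte[] key, byte[] value) {
        throw new UnsupportedOperationException("use restoreAll()");
    }
    @Override public void onRestoreStart(String storeName, long startingOffset, long endingOffset) { }
    @Override public void onBatchRestored(String storeName, long batchEndOffset, long numRestored) { }
    @Override public void onRestoreEnd(String storeName, long totalRestored) { }
}

// Hypothetical store acting as its own restore callback, so the start/end
// hooks can manage the store's resources (modelled here as a flag).
class DemoStore extends AbstractNotifyingBatchingRestoreCallback {
    final Map<String, String> data = new HashMap<>();
    final List<String> events = new ArrayList<>();
    boolean bulkMode = false;

    @Override public void onRestoreStart(String storeName, long startingOffset, long endingOffset) {
        bulkMode = true;
        events.add("start " + storeName);
    }

    @Override public void restoreAll(Collection<KeyValue<byte[], byte[]>> records) {
        for (KeyValue<byte[], byte[]> kv : records) {
            data.put(new String(kv.key, StandardCharsets.UTF_8), new String(kv.value, StandardCharsets.UTF_8));
        }
        events.add("batch " + records.size() + (bulkMode ? " (bulk)" : ""));
    }

    @Override public void onRestoreEnd(String storeName, long totalRestored) {
        bulkMode = false;
        events.add("end " + storeName + " " + totalRestored);
    }
}

class AdapterDemo {
    static KeyValue<byte[], byte[]> kv(String k, String v) {
        return new KeyValue<>(k.getBytes(StandardCharsets.UTF_8), v.getBytes(StandardCharsets.UTF_8));
    }

    // Drives the store in the order a restoring task might (hypothetical sequence).
    static DemoStore run() {
        DemoStore store = new DemoStore();
        store.onRestoreStart("counts", 0L, 1L);
        store.restoreAll(List.of(kv("a", "1"), kv("b", "2")));
        store.onBatchRestored("counts", 1L, 2L); // inherited no-op
        store.onRestoreEnd("counts", 2L);
        return store;
    }

    public static void main(String[] args) {
        System.out.println(run().events); // [start counts, batch 2 (bulk), end counts 2]
    }
}
```

Rejecting `restore()` in the batching adapter mirrors one of the two adapter variants Guozhang suggested in his Jun 6 message; leaving it abstract would be the alternative.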
On Wed, Jun 21, 2017 at 7:13 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Bill,

I'm wondering why we need `StateRestoreNotification` while still having `StateRestoreListener`. Couldn't the above setup be achieved just with `StateRestoreListener.onRestoreStart / onRestoreEnd`? I.e., it seems the latter can subsume any use cases intended for the former API.

Guozhang

On Mon, Jun 19, 2017 at 3:23 PM, Bill Bejeck <bbej...@gmail.com> wrote:

I'm going to update the KIP with a new interface, StateRestoreNotification, containing two methods: startRestore and endRestore.

While the naming is very similar to methods already proposed on StateRestoreListener, these methods are not intended for notifying users of the restore status. Instead, they are for internal use by the state store, to perform any setup and teardown work required by a batch restoration process.

Here's one current use case: when using RocksDB we should optimize for a bulk load by setting Options.prepareForBulkLoad().

1. If the database has already been opened, we'll need to close it, set "prepareForBulkLoad", and re-open the database.
2. Once the restore is completed, we'll need to close and re-open the database with the "prepareForBulkLoad" option turned off.

While we mention the RocksDB use case above, this interface is not tied to any particular implementation of a persistent state store.
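The two steps above can be sketched as a store implementing the proposed `StateRestoreNotification`. To stay self-contained, the sketch does not use RocksDB: `BulkLoadAwareStore` and `BulkLoadDemo` are hypothetical and only record the store's open/close transitions and whether the bulk-load setting is on. In a RocksDB-backed store, re-opening "with bulk load" would correspond to building the `Options` with `prepareForBulkLoad()` before opening the database.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface from the proposal: hooks for the store itself, not for users.
interface StateRestoreNotification {
    void startRestore();
    void endRestore();
}

// Hypothetical store that models the database handle instead of using RocksDB.
class BulkLoadAwareStore implements StateRestoreNotification {
    final List<String> log = new ArrayList<>();
    private boolean open;
    private boolean bulkLoad;

    void open() {
        open = true;
        log.add(bulkLoad ? "open(bulk-load)" : "open(default)");
    }

    void close() {
        if (open) { // only close a database that is actually open
            open = false;
            log.add("close");
        }
    }

    boolean isOpen() { return open; }
    boolean isBulkLoad() { return bulkLoad; }

    @Override
    public void startRestore() {
        close();          // step 1: close the database if it is already open,
        bulkLoad = true;  //         turn on the bulk-load setting,
        open();           //         and re-open it
    }

    @Override
    public void endRestore() {
        close();          // step 2: close once the restore has completed,
        bulkLoad = false; //         turn the bulk-load setting off,
        open();           //         and re-open for normal reads and writes
    }
}

class BulkLoadDemo {
    static List<String> run() {
        BulkLoadAwareStore store = new BulkLoadAwareStore();
        store.open();         // the store was already opened before restoration
        store.startRestore();
        // ... batches of changelog records would be written here ...
        store.endRestore();
        return store.log;
    }

    public static void main(String[] args) {
        System.out.println(run());
        // [open(default), close, open(bulk-load), close, open(default)]
    }
}
```

Because `close()` is guarded, the same `startRestore()` also works when the store has not been opened yet, which covers the "if the database has already been opened" condition in step 1.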
Additionally, a separate interface is needed so that any user can implement the state restore notification feature regardless of the state restore callback used.

I'll also remove the "getStateRestoreListener" method and stick with the notion of a "global" restore listener for now.

On Mon, Jun 19, 2017 at 1:05 PM, Bill Bejeck <bbej...@gmail.com> wrote:

Yes it is; that was more of an oversight on my part. I'll remove it from the KIP.

On Mon, Jun 19, 2017 at 12:48 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Hi,

I think for now it's good enough to start with a single global restore listener. We can incrementally improve this later on if required. Of course, if it's easy to do right away, we can also be more fine-grained. But for KTable, we might want to add this after getting rid of all the overloads we have atm.

One question: what is the purpose of the parameter "endOffset" in #onRestoreEnd()? Isn't this the same value as provided in #onRestoreStart()?

-Matthias

On 6/15/17 6:18 AM, Bill Bejeck wrote:

Thinking about the custom StateRestoreListener approach, having a get method on the interface will really only work for custom state stores.

So we'll need to provide another way for users to set behavior with the provided state stores. The only option that comes to mind now is also adding a parameter to the StateStoreSupplier.
Bill

On Wed, Jun 14, 2017 at 5:39 PM, Bill Bejeck <bbej...@gmail.com> wrote:

Guozhang,

Thanks for the comments.

1. As for the granularity, I agree that having one global StateRestoreListener could be restrictive. But I think it's important to have a "setStateRestoreListener" on KafkaStreams, as this allows users to define an anonymous instance that has access to local scope for reporting purposes. This is similar to the pattern we use for KafkaStreams.setStateListener.

As an alternative, what if we add a method named "getStateStoreListener" to the BatchingStateRestoreCallback interface? Then in an abstract adapter class we return null from getStateStoreListener. But if users want to supply a different StateRestoreListener strategy per callback, they would simply override the method to return an actual instance.

WDYT?

2. I'll make the required updates to pass in the ending offset at the start, as well as the actual name of the state store.

Bill

On Wed, Jun 14, 2017 at 3:53 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Thanks Bill for the updated wiki. I have a couple more comments:

1. Setting the StateRestoreListener at the KafkaStreams granularity may not be sufficient, as in the listener callback we do not know which store is being restored right now. If the topic is a changelog topic, we may be able to infer the state store name from the `TopicPartition`, but if the topic is a source topic read as a KTable, we may not know which store is being restored; plus, forcing users to infer the state store name from the topic partition name would not be intuitive. Also, the listener may be implemented differently for different stores, and setting a global listener would force users to branch on the topic-partition names, similar to what we did in the global timestamp extractor. On the other hand, I also agree that setting the listener at per-store granularity may be a bit cumbersome, since overriding it for a specific store would require exposing some APIs, maybe on StateStoreSupplier. So I would love to hear other people's opinions.

If we think that differently implemented restore callbacks may be less common, then I'd suggest at least replacing the `TopicPartition` parameter with the `String` store name and the `TaskId`?

2. I think we can pass in the `long endOffset` in the `onRestoreStart` function as well, since we will have read the endOffset already by then; otherwise users still cannot track the restoration progress (e.g., what percentage has been restored so far, to estimate how much longer they need to wait).

Guozhang

On Wed, Jun 14, 2017 at 12:25 PM, Bill Bejeck <bbej...@gmail.com> wrote:

Eno,

Thanks for the comments.

1. As for having both restore and restoreAll, I kept the restore method for backward compatibility, as that is what is used by current implementing classes. However, as I think about it, it makes things cleaner to have a single restore method taking a collection. I'll wait for others to weigh in, but I'm leaning towards having a single restore method.

2. The "onBatchRestored" method is for keeping track of the restore process as we load records from each poll request.

For example, if the changelog contained 5000 records and MAX_POLL_RECORDS is set to 1000, the "onBatchRestored" method would get called 5 times, each time with the ending offset of the last record in the batch and the count of the batch.
I'll update the KIP > to > > >> add > > >> >> >> >>> comments > > >> >> >> >>>> above the interface methods. > > >> >> >> >>>> > > >> >> >> >>>> > > >> >> >> >>>> Thanks, > > >> >> >> >>>> Bill > > >> >> >> >>>> > > >> >> >> >>>> > > >> >> >> >>>> On Wed, Jun 14, 2017 at 11:49 AM, Eno Thereska < > > >> >> >> eno.there...@gmail.com> > > >> >> >> >>>> wrote: > > >> >> >> >>>> > > >> >> >> >>>>> Thanks Bill, > > >> >> >> >>>>> > > >> >> >> >>>>> A couple of questions: > > >> >> >> >>>>> > > >> >> >> >>>>> > > >> >> >> >>>> 1. why do we need both restore and restoreAll, why can't we > > >> just > > >> >> have > > >> >> >> >>> one, > > >> >> >> >>>>> that takes a collection (i.e., restore all)? Are there > cases > > >> when > > >> >> >> >>> people > > >> >> >> >>>>> want to restore one at a time? In that case, they could > > still > > >> use > > >> >> >> >>>>> restoreAll with just 1 record in the collection right? > > >> >> >> >>>>> > > >> >> >> >>>>> 2. I don't quite get "onBatchRestored". Could you put a > > small > > >> >> >> comment > > >> >> >> >>> on > > >> >> >> >>>>> top of all three methods. An example might help here. > > >> >> >> >>>>> > > >> >> >> >>>>> Thanks > > >> >> >> >>>>> Eno > > >> >> >> >>>>> > > >> >> >> >>>>> > > >> >> >> >>>>>> On 8 Jun 2017, at 18:05, Bill Bejeck <bbej...@gmail.com> > > >> wrote: > > >> >> >> >>>>>> > > >> >> >> >>>>>> Guozhang, Damian thanks for the comments. > > >> >> >> >>>>>> > > >> >> >> >>>>>> Giving developers the ability to hook into StateStore > > >> recovery > > >> >> >> >>> phases > > >> >> >> >>>> was > > >> >> >> >>>>>> part of my original intent. However the state the KIP is > in > > >> now > > >> >> >> >>> won't > > >> >> >> >>>>>> provide this functionality. > > >> >> >> >>>>>> > > >> >> >> >>>>>> As a result I'll be doing a significant revision of > > KIP-167. > > >> >> I'll > > >> >> >> >>> be > > >> >> >> >>>>> sure > > >> >> >> >>>>>> to incorporate all your comments in the new revision. 
> > >> >> >> >>>>>> > > >> >> >> >>>>>> Thanks, > > >> >> >> >>>>>> Bill > > >> >> >> >>>>>> > > >> >> >> >>>>>> On Wed, Jun 7, 2017 at 5:24 AM, Damian Guy < > > >> >> damian....@gmail.com> > > >> >> >> >>>> wrote: > > >> >> >> >>>>>> > > >> >> >> >>>>>>> I'm largely in agreement with what Guozhang has > suggested, > > >> >> i.e., > > >> >> >> >>>>>>> StateRestoreContext shouldn't have any setters on it and > > >> also > > >> >> need > > >> >> >> >>> to > > >> >> >> >>>>> have > > >> >> >> >>>>>>> the end offset available such that people can use it > > derive > > >> >> >> >>> progress. > > >> >> >> >>>>>>> Slightly different, maybe the StateRestoreContext > > interface > > >> >> could > > >> >> >> >>> be: > > >> >> >> >>>>>>> > > >> >> >> >>>>>>> long beginOffset() > > >> >> >> >>>>>>> long endOffset() > > >> >> >> >>>>>>> long currentOffset() > > >> >> >> >>>>>>> > > >> >> >> >>>>>>> One further thing, this currently doesn't provide > > developers > > >> >> the > > >> >> >> >>>>> ability to > > >> >> >> >>>>>>> hook into this information if they are using the > built-in > > >> >> >> >>> StateStores. > > >> >> >> >>>>> Is > > >> >> >> >>>>>>> this something we should be considering? > > >> >> >> >>>>>>> > > >> >> >> >>>>>>> > > >> >> >> >>>>>>> On Tue, 6 Jun 2017 at 23:32 Guozhang Wang < > > >> wangg...@gmail.com> > > >> >> >> >>> wrote: > > >> >> >> >>>>>>> > > >> >> >> >>>>>>>> Thanks for the updated KIP Bill, I have a couple of > > >> comments: > > >> >> >> >>>>>>>> > > >> >> >> >>>>>>>> 1) I'm assuming beginRestore / endRestore is called > only > > >> once > > >> >> per > > >> >> >> >>>> store > > >> >> >> >>>>>>>> throughout the whole restoration process, and > restoreAll > > is > > >> >> >> called > > >> >> >> >>>> per > > >> >> >> >>>>>>>> batch. 
In that case I feel we can set the > > >> StateRestoreContext > > >> >> as > > >> >> >> a > > >> >> >> >>>>> second > > >> >> >> >>>>>>>> parameter in restoreAll and in endRestore as well, and > > let > > >> the > > >> >> >> >>>> library > > >> >> >> >>>>> to > > >> >> >> >>>>>>>> set the corresponding values instead and only let users > > to > > >> >> read > > >> >> >> >>>> (since > > >> >> >> >>>>>>> the > > >> >> >> >>>>>>>> collection of key-value pairs do not contain offset > > >> >> information > > >> >> >> >>>> anyways > > >> >> >> >>>>>>>> users cannot really set the offset). The > > >> "lastOffsetRestored" > > >> >> >> >>> would > > >> >> >> >>>> be > > >> >> >> >>>>>>> the > > >> >> >> >>>>>>>> starting offset when called on `beginRestore`. > > >> >> >> >>>>>>>> > > >> >> >> >>>>>>>> 2) Users who wants to implement their own batch > > restoration > > >> >> >> >>> callbacks > > >> >> >> >>>>>>> would > > >> >> >> >>>>>>>> now need to implement both `restore` and `restoreAll` > > while > > >> >> they > > >> >> >> >>>> either > > >> >> >> >>>>>>> let > > >> >> >> >>>>>>>> `restoreAll` to call `restore` or implement the logic > in > > >> >> >> >>> `restoreAll` > > >> >> >> >>>>>>> only > > >> >> >> >>>>>>>> and never call `restore`. Maybe we can provide two > > abstract > > >> >> impl > > >> >> >> >>> of > > >> >> >> >>>>>>>> BatchingStateRestoreCallbacks which does beginRestore / > > >> >> >> >>> endRestore as > > >> >> >> >>>>>>>> no-ops, with one callback implementing `restoreAll` to > > call > > >> >> >> >>> abstract > > >> >> >> >>>>>>>> `restore` while the other implement `restore` to throw > > "not > > >> >> >> >>> supported > > >> >> >> >>>>>>>> exception" and keep `restoreAll` abstract. 
3) I think we can also return the "offset limit" in StateRestoreContext, which is important for users to track the restoration progress, since otherwise they could not tell what percentage of the restoration has completed. I.e.:

    import java.util.Collection;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.processor.StateRestoreCallback;

    public interface BatchingStateRestoreCallback extends StateRestoreCallback {

        void restoreAll(Collection<KeyValue<byte[], byte[]>> records,
                        StateRestoreContext restoreContext);

        void beginRestore(StateRestoreContext restoreContext);

        void endRestore(StateRestoreContext restoreContext);
    }

    public interface StateRestoreContext {

        long lastOffsetRestored();

        long endOffsetToRestore();

        int numberRestored();
    }

Guozhang

On Fri, Jun 2, 2017 at 9:16 AM, Bill Bejeck <bbej...@gmail.com> wrote:

Guozhang, Matthias,

Thanks for the comments. I have updated the KIP (and the JIRA title and description as well).
I had thought about introducing a separate interface altogether, but extending the current one makes more sense.

As for intermediate callbacks based on time or number of records, I think the latest update to the KIP addresses this point of querying for intermediate results, but it would be per batch restored.

Thanks,
Bill

On Fri, Jun 2, 2017 at 8:36 AM, Jim Jagielski <j...@jagunet.com> wrote:

++1

On Jun 2, 2017, at 12:54 AM, Matthias J. Sax <matth...@confluent.io> wrote:

With regard to backward compatibility, we should not change the current interface, but add a new interface that extends the current one.