Thinking about this some more, I have another approach. Leave the first parameter as a String in the StateRestoreListener interface.
But we'll provide two default abstract classes, one implementing StateRestoreCallback and the other implementing BatchingStateRestoreCallback. Both abstract classes will also implement the StateRestoreListener interface, with no-op methods provided for the restore progress methods.

WDYT?

On Mon, Jun 26, 2017 at 10:13 AM, Bill Bejeck <bbej...@gmail.com> wrote:

> Guozhang,
>
> Thanks for the comments.
>
> I think that will work, but my concern is it might not be as clear to
> users that want to receive external notification of the restore progress
> separately (say for reporting purposes) and still send separate signals to
> the state store for resource management tasks.
>
> However I like this approach better and I have some ideas I can do in the
> implementation, so I'll update the KIP accordingly.
>
> Thanks,
> Bill
>
> On Wed, Jun 21, 2017 at 10:14 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
>
>> More specifically, if we can replace the first parameter from the String
>> store name to the store instance itself, would that be sufficient to
>> cover `StateRestoreNotification`?
>>
>> On Wed, Jun 21, 2017 at 7:13 PM, Guozhang Wang <wangg...@gmail.com>
>> wrote:
>>
>> > Bill,
>> >
>> > I'm wondering why we need the `StateRestoreNotification` while still
>> > having `StateRestoreListener`; could the above setup be achievable just
>> > with `StateRestoreListener.onRestoreStart / onRestoreEnd`? I.e. it seems
>> > the latter can subsume any use cases intended for the former API.
>> >
>> > Guozhang
>> >
>> > On Mon, Jun 19, 2017 at 3:23 PM, Bill Bejeck <bbej...@gmail.com> wrote:
>> >
>> >> I'm going to update the KIP with new interface StateRestoreNotification
>> >> containing two methods, startRestore and endRestore.
>> >>
>> >> While naming is very similar to methods already proposed on the
>> >> StateRestoreListener, the intent of these methods is not for user
>> >> notification of restore status.
>> >> Instead these new methods are for internal use by the state store to
>> >> perform any required setup and teardown work due to a batch
>> >> restoration process.
>> >>
>> >> Here's one current use case: when using RocksDB we should optimize for a
>> >> bulk load by setting Options.prepareForBulkLoad().
>> >>
>> >>    1. If the database has already been opened, we'll need to close it,
>> >>    set the "prepareForBulkLoad" option and re-open the database.
>> >>    2. Once the restore is completed we'll need to close and re-open the
>> >>    database with the "prepareForBulkLoad" option turned off.
>> >>
>> >> While we are mentioning the RocksDB use case above, the addition of this
>> >> interface is not specific to any specific implementation of a persistent
>> >> state store.
>> >>
>> >> Additionally, a separate interface is needed so that any user can
>> >> implement the state restore notification feature regardless of the
>> >> state restore callback used.
>> >>
>> >> I'll also remove the "getStateRestoreListener" method and stick with the
>> >> notion of a "global" restore listener for now.
>> >>
>> >> On Mon, Jun 19, 2017 at 1:05 PM, Bill Bejeck <bbej...@gmail.com> wrote:
>> >>
>> >> > Yes it is, more of an oversight on my part, I'll remove it from the
>> >> > KIP.
>> >> >
>> >> > On Mon, Jun 19, 2017 at 12:48 PM, Matthias J. Sax
>> >> > <matth...@confluent.io> wrote:
>> >> >
>> >> >> Hi,
>> >> >>
>> >> >> I think for now it's good enough to start with a single global
>> >> >> restore listener. We can incrementally improve this later on if
>> >> >> required. Of course, if it's easy to do right away we can also be
>> >> >> more fine grained. But for KTable, we might want to add this after
>> >> >> getting rid of all the overloads we have atm.
>> >> >>
>> >> >> One question: what is the purpose of parameter "endOffset" in
>> >> >> #onRestoreEnd() -- isn't this the same value as provided in
>> >> >> #onRestoreStart()?
>> >> >>
>> >> >> -Matthias
>> >> >>
>> >> >> On 6/15/17 6:18 AM, Bill Bejeck wrote:
>> >> >> > Thinking about the custom StateRestoreListener approach and having
>> >> >> > a get method on the interface will really only work for custom
>> >> >> > state stores.
>> >> >> >
>> >> >> > So we'll need to provide another way for users to set behavior with
>> >> >> > provided state stores. The only option that comes to mind now is
>> >> >> > also adding a parameter to the StateStoreSupplier.
>> >> >> >
>> >> >> > Bill
>> >> >> >
>> >> >> > On Wed, Jun 14, 2017 at 5:39 PM, Bill Bejeck <bbej...@gmail.com>
>> >> >> > wrote:
>> >> >> >
>> >> >> >> Guozhang,
>> >> >> >>
>> >> >> >> Thanks for the comments.
>> >> >> >>
>> >> >> >> 1. As for the granularity, I agree that having one global
>> >> >> >> StateRestoreListener could be restrictive. But I think it's
>> >> >> >> important to have a "setStateRestoreListener" on KafkaStreams as
>> >> >> >> this allows users to define an anonymous instance that has access
>> >> >> >> to local scope for reporting purposes. This is a similar pattern
>> >> >> >> we use for KafkaStreams.setStateListener.
>> >> >> >>
>> >> >> >> As an alternative, what if we add a method to the
>> >> >> >> BatchingStateRestoreCallback interface named
>> >> >> >> "getStateStoreListener". Then in an abstract adapter class we
>> >> >> >> return null from getStateStoreListener. But if users want to
>> >> >> >> supply a different StateRestoreListener strategy per callback they
>> >> >> >> would simply override the method to return an actual instance.
>> >> >> >>
>> >> >> >> WDYT?
>> >> >> >>
>> >> >> >> 2. I'll make the required updates to pass in the ending offset at
>> >> >> >> the start as well as the actual name of the state store.
>> >> >> >>
>> >> >> >> Bill
>> >> >> >>
>> >> >> >> On Wed, Jun 14, 2017 at 3:53 PM, Guozhang Wang <wangg...@gmail.com>
>> >> >> >> wrote:
>> >> >> >>
>> >> >> >>> Thanks Bill for the updated wiki. I have a couple of more comments:
>> >> >> >>>
>> >> >> >>> 1. Setting StateRestoreListener on the KafkaStreams granularity may
>> >> >> >>> not be sufficient, as in the listener callback we do not know which
>> >> >> >>> store it is restoring right now: if the topic is a changelog topic
>> >> >> >>> then from the `TopicPartition` we may be able to infer the state
>> >> >> >>> store name, but if the topic is the source topic read as a KTable
>> >> >> >>> then we may not know which store it is restoring right now; plus
>> >> >> >>> forcing users to infer the state store name from the topic
>> >> >> >>> partition name would not be intuitive as well. Plus for different
>> >> >> >>> stores the listener may be implemented differently, and setting a
>> >> >> >>> global listener would force users to branch on the topic-partition
>> >> >> >>> names, similarly to what we did in the global timestamp extractor.
>> >> >> >>> On the other hand, I also agree that setting the listener on the
>> >> >> >>> per-store granularity may be a bit cumbersome since if users want
>> >> >> >>> to override it on a specific store it needs to expose some APIs
>> >> >> >>> maybe at StateStoreSupplier. So would love to hear other people's
>> >> >> >>> opinions.
>> >> >> >>>
>> >> >> >>> If we think that different implemented restoring callback may be
>> >> >> >>> less common, then I'd suggest at least replace the `TopicPartition`
>> >> >> >>> parameter with the `String` store name and the `TaskId`?
>> >> >> >>>
>> >> >> >>> 2.
>> >> >> >>> I think we can pass in the `long endOffset` in the `onRestoreStart`
>> >> >> >>> function as well, as we will have read the endOffset already by
>> >> >> >>> then; otherwise users would still not be able to track the
>> >> >> >>> restoration progress (e.g. how much percentage I have been
>> >> >> >>> restoring so far, to estimate on how long I still need to wait).
>> >> >> >>>
>> >> >> >>> Guozhang
>> >> >> >>>
>> >> >> >>> On Wed, Jun 14, 2017 at 12:25 PM, Bill Bejeck <bbej...@gmail.com>
>> >> >> >>> wrote:
>> >> >> >>>
>> >> >> >>>> Eno,
>> >> >> >>>>
>> >> >> >>>> Thanks for the comments.
>> >> >> >>>>
>> >> >> >>>> 1. As for having both restore and restoreAll, I kept the restore
>> >> >> >>>> method for backward compatibility as that is what is used by
>> >> >> >>>> current implementing classes. However, as I think about it, it
>> >> >> >>>> makes things cleaner to have a single restore method taking a
>> >> >> >>>> collection. I'll wait for others to weigh in, but I'm leaning
>> >> >> >>>> towards having a single restore method.
>> >> >> >>>>
>> >> >> >>>> 2. The "onBatchRestored" method is for keeping track of the
>> >> >> >>>> restore process as we load records from each poll request.
>> >> >> >>>>
>> >> >> >>>> For example if the change log contained 5000 records and
>> >> >> >>>> MAX_POLL_RECORDS is set to 1000, the "onBatchRestored" method
>> >> >> >>>> would get called 5 times, each time with the ending offset of the
>> >> >> >>>> last record in the batch and the count of the batch. I'll update
>> >> >> >>>> the KIP to add comments above the interface methods.
>> >> >> >>>>
>> >> >> >>>> Thanks,
>> >> >> >>>> Bill
>> >> >> >>>>
>> >> >> >>>> On Wed, Jun 14, 2017 at 11:49 AM, Eno Thereska
>> >> >> >>>> <eno.there...@gmail.com> wrote:
>> >> >> >>>>
>> >> >> >>>>> Thanks Bill,
>> >> >> >>>>>
>> >> >> >>>>> A couple of questions:
>> >> >> >>>>>
>> >> >> >>>>> 1. why do we need both restore and restoreAll, why can't we just
>> >> >> >>>>> have one, that takes a collection (i.e., restore all)? Are there
>> >> >> >>>>> cases when people want to restore one at a time? In that case,
>> >> >> >>>>> they could still use restoreAll with just 1 record in the
>> >> >> >>>>> collection right?
>> >> >> >>>>>
>> >> >> >>>>> 2. I don't quite get "onBatchRestored". Could you put a small
>> >> >> >>>>> comment on top of all three methods. An example might help here.
>> >> >> >>>>>
>> >> >> >>>>> Thanks
>> >> >> >>>>> Eno
>> >> >> >>>>>
>> >> >> >>>>>> On 8 Jun 2017, at 18:05, Bill Bejeck <bbej...@gmail.com> wrote:
>> >> >> >>>>>>
>> >> >> >>>>>> Guozhang, Damian thanks for the comments.
>> >> >> >>>>>>
>> >> >> >>>>>> Giving developers the ability to hook into StateStore recovery
>> >> >> >>>>>> phases was part of my original intent. However the state the KIP
>> >> >> >>>>>> is in now won't provide this functionality.
>> >> >> >>>>>>
>> >> >> >>>>>> As a result I'll be doing a significant revision of KIP-167.
>> >> >> >>>>>> I'll be sure to incorporate all your comments in the new
>> >> >> >>>>>> revision.
>> >> >> >>>>>>
>> >> >> >>>>>> Thanks,
>> >> >> >>>>>> Bill
>> >> >> >>>>>>
>> >> >> >>>>>> On Wed, Jun 7, 2017 at 5:24 AM, Damian Guy <damian....@gmail.com>
>> >> >> >>>>>> wrote:
>> >> >> >>>>>>
>> >> >> >>>>>>> I'm largely in agreement with what Guozhang has suggested, i.e.,
>> >> >> >>>>>>> StateRestoreContext shouldn't have any setters on it and also
>> >> >> >>>>>>> needs to have the end offset available such that people can use
>> >> >> >>>>>>> it to derive progress. Slightly different, maybe the
>> >> >> >>>>>>> StateRestoreContext interface could be:
>> >> >> >>>>>>>
>> >> >> >>>>>>> long beginOffset()
>> >> >> >>>>>>> long endOffset()
>> >> >> >>>>>>> long currentOffset()
>> >> >> >>>>>>>
>> >> >> >>>>>>> One further thing, this currently doesn't provide developers the
>> >> >> >>>>>>> ability to hook into this information if they are using the
>> >> >> >>>>>>> built-in StateStores. Is this something we should be
>> >> >> >>>>>>> considering?
>> >> >> >>>>>>>
>> >> >> >>>>>>> On Tue, 6 Jun 2017 at 23:32 Guozhang Wang <wangg...@gmail.com>
>> >> >> >>>>>>> wrote:
>> >> >> >>>>>>>
>> >> >> >>>>>>>> Thanks for the updated KIP Bill, I have a couple of comments:
>> >> >> >>>>>>>>
>> >> >> >>>>>>>> 1) I'm assuming beginRestore / endRestore is called only once
>> >> >> >>>>>>>> per store throughout the whole restoration process, and
>> >> >> >>>>>>>> restoreAll is called per batch.
>> >> >> >>>>>>>> In that case I feel we can set the StateRestoreContext as a
>> >> >> >>>>>>>> second parameter in restoreAll and in endRestore as well, and
>> >> >> >>>>>>>> let the library set the corresponding values instead and only
>> >> >> >>>>>>>> let users read (since the collection of key-value pairs does
>> >> >> >>>>>>>> not contain offset information anyways, users cannot really set
>> >> >> >>>>>>>> the offset). The "lastOffsetRestored" would be the starting
>> >> >> >>>>>>>> offset when called on `beginRestore`.
>> >> >> >>>>>>>>
>> >> >> >>>>>>>> 2) Users who want to implement their own batch restoration
>> >> >> >>>>>>>> callbacks would now need to implement both `restore` and
>> >> >> >>>>>>>> `restoreAll` while they either let `restoreAll` call `restore`
>> >> >> >>>>>>>> or implement the logic in `restoreAll` only and never call
>> >> >> >>>>>>>> `restore`. Maybe we can provide two abstract impl of
>> >> >> >>>>>>>> BatchingStateRestoreCallbacks which do beginRestore /
>> >> >> >>>>>>>> endRestore as no-ops, with one callback implementing
>> >> >> >>>>>>>> `restoreAll` to call abstract `restore` while the other
>> >> >> >>>>>>>> implements `restore` to throw "not supported exception" and
>> >> >> >>>>>>>> keeps `restoreAll` abstract.
>> >> >> >>>>>>>>
>> >> >> >>>>>>>> 3) I think we can also return the "offset limit" in
>> >> >> >>>>>>>> StateRestoreContext, which is important for users to track the
>> >> >> >>>>>>>> restoration progress since otherwise they could not tell how
>> >> >> >>>>>>>> many percent of restoration has completed.
>> >> >> >>>>>>>> I.e.:
>> >> >> >>>>>>>>
>> >> >> >>>>>>>> public interface BatchingStateRestoreCallback extends StateRestoreCallback {
>> >> >> >>>>>>>>
>> >> >> >>>>>>>>    void restoreAll(Collection<KeyValue<byte[], byte[]>> records,
>> >> >> >>>>>>>>                    StateRestoreContext restoreContext);
>> >> >> >>>>>>>>
>> >> >> >>>>>>>>    void beginRestore(StateRestoreContext restoreContext);
>> >> >> >>>>>>>>
>> >> >> >>>>>>>>    void endRestore(StateRestoreContext restoreContext);
>> >> >> >>>>>>>> }
>> >> >> >>>>>>>>
>> >> >> >>>>>>>> public interface StateRestoreContext {
>> >> >> >>>>>>>>
>> >> >> >>>>>>>>    long lastOffsetRestored();
>> >> >> >>>>>>>>
>> >> >> >>>>>>>>    long endOffsetToRestore();
>> >> >> >>>>>>>>
>> >> >> >>>>>>>>    int numberRestored();
>> >> >> >>>>>>>> }
>> >> >> >>>>>>>>
>> >> >> >>>>>>>> Guozhang
>> >> >> >>>>>>>>
>> >> >> >>>>>>>> On Fri, Jun 2, 2017 at 9:16 AM, Bill Bejeck <bbej...@gmail.com>
>> >> >> >>>>>>>> wrote:
>> >> >> >>>>>>>>
>> >> >> >>>>>>>>> Guozhang, Matthias,
>> >> >> >>>>>>>>>
>> >> >> >>>>>>>>> Thanks for the comments. I have updated the KIP (JIRA title
>> >> >> >>>>>>>>> and description as well).
>> >> >> >>>>>>>>>
>> >> >> >>>>>>>>> I had thought about introducing a separate interface
>> >> >> >>>>>>>>> altogether, but extending the current one makes more sense.
>> >> >> >>>>>>>>>
>> >> >> >>>>>>>>> As for intermediate callbacks based on time or number of
>> >> >> >>>>>>>>> records, I think the latest update to the KIP addresses this
>> >> >> >>>>>>>>> point of querying for intermediate results, but it would be
>> >> >> >>>>>>>>> per batch restored.
>> >> >> >>>>>>>>>
>> >> >> >>>>>>>>> Thanks,
>> >> >> >>>>>>>>> Bill
>> >> >> >>>>>>>>>
>> >> >> >>>>>>>>> On Fri, Jun 2, 2017 at 8:36 AM, Jim Jagielski <j...@jagunet.com>
>> >> >> >>>>>>>>> wrote:
>> >> >> >>>>>>>>>
>> >> >> >>>>>>>>>>
>> >> >> >>>>>>>>>>> On Jun 2, 2017, at 12:54 AM, Matthias J. Sax
>> >> >> >>>>>>>>>>> <matth...@confluent.io> wrote:
>> >> >> >>>>>>>>>>>
>> >> >> >>>>>>>>>>> With regard to backward compatibility, we should not change
>> >> >> >>>>>>>>>>> the current interface, but add a new interface that extends
>> >> >> >>>>>>>>>>> the current one.
>> >> >> >>>>>>>>>>>
>> >> >> >>>>>>>>>>
>> >> >> >>>>>>>>>> ++1
>> >> >> >>>>>>>>>>
>> >> >> >>>>>>>>
>> >> >> >>>>>>>> --
>> >> >> >>>>>>>> -- Guozhang
>> >> >> >>>>>>>>
>> >> >> >>>
>> >> >> >>> --
>> >> >> >>> -- Guozhang
>> >> >> >>>
>> >
>> > --
>> > -- Guozhang
>> >
>>
>> --
>> -- Guozhang
>>
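
For reference, here is a rough sketch of the two abstract classes proposed at the top of this message, plus the 5000-record / 1000-per-poll example from earlier in the thread. Everything in it is an assumption for illustration: the interfaces are simplified local stand-ins (not the actual Kafka Streams types), the method signatures and the class names AbstractNotifyingRestoreCallback, AbstractNotifyingBatchingRestoreCallback, BulkLoadingRestoreCallback and RestoreSketch are hypothetical, and the "bulk load mode" is just a boolean standing in for reopening RocksDB with different options.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical, simplified stand-ins for the interfaces discussed in this
// thread. Signatures are assumptions for illustration, not the Kafka API.
interface StateRestoreCallback {
    void restore(byte[] key, byte[] value);
}

interface BatchingStateRestoreCallback extends StateRestoreCallback {
    void restoreAll(Collection<byte[][]> records); // each element is {key, value}
}

interface StateRestoreListener {
    void onRestoreStart(String storeName, long startOffset, long endOffset);
    void onBatchRestored(String storeName, long batchEndOffset, long numRestored);
    void onRestoreEnd(String storeName, long totalRestored);
}

// Adapter for single-record callbacks; the listener methods are no-ops.
abstract class AbstractNotifyingRestoreCallback
        implements StateRestoreCallback, StateRestoreListener {
    @Override public void onRestoreStart(String storeName, long startOffset, long endOffset) { }
    @Override public void onBatchRestored(String storeName, long batchEndOffset, long numRestored) { }
    @Override public void onRestoreEnd(String storeName, long totalRestored) { }
}

// Adapter for batching callbacks; the listener methods are no-ops and the
// single-record restore() delegates to restoreAll().
abstract class AbstractNotifyingBatchingRestoreCallback
        implements BatchingStateRestoreCallback, StateRestoreListener {
    @Override public void restore(byte[] key, byte[] value) {
        List<byte[][]> single = new ArrayList<>();
        single.add(new byte[][] {key, value});
        restoreAll(single);
    }
    @Override public void onRestoreStart(String storeName, long startOffset, long endOffset) { }
    @Override public void onBatchRestored(String storeName, long batchEndOffset, long numRestored) { }
    @Override public void onRestoreEnd(String storeName, long totalRestored) { }
}

// Example store callback: toggles a simulated bulk-load mode around the
// restore and reports progress against the end offset given at the start.
class BulkLoadingRestoreCallback extends AbstractNotifyingBatchingRestoreCallback {
    boolean bulkLoadMode = false;
    long expected = 0;
    long restored = 0;
    int batches = 0;

    @Override public void restoreAll(Collection<byte[][]> records) {
        restored += records.size(); // a real store would write the records here
    }
    @Override public void onRestoreStart(String storeName, long startOffset, long endOffset) {
        bulkLoadMode = true; // stand-in for reopening the store tuned for bulk load
        expected = endOffset - startOffset;
    }
    @Override public void onBatchRestored(String storeName, long batchEndOffset, long numRestored) {
        batches++;
        System.out.printf("%s: %d%% restored%n", storeName, 100 * restored / expected);
    }
    @Override public void onRestoreEnd(String storeName, long totalRestored) {
        bulkLoadMode = false; // stand-in for reopening the store with normal options
    }
}

class RestoreSketch {
    // Simulates the library driving a restore of `total` records in batches.
    static void simulate(AbstractNotifyingBatchingRestoreCallback callback,
                         String storeName, int total, int batchSize) {
        callback.onRestoreStart(storeName, 0, total);
        int done = 0;
        while (done < total) {
            int n = Math.min(batchSize, total - done);
            List<byte[][]> batch = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                batch.add(new byte[][] {new byte[] {1}, new byte[] {2}});
            }
            callback.restoreAll(batch);
            done += n;
            callback.onBatchRestored(storeName, done - 1, n);
        }
        callback.onRestoreEnd(storeName, total);
    }

    public static void main(String[] args) {
        simulate(new BulkLoadingRestoreCallback(), "example-store", 5000, 1000);
    }
}
```

With this shape a store only overrides the listener methods it cares about, and a single object receives both the records and the start/end signals, which is the point of having the abstract classes implement both interfaces.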