More specifically, if we replace the first parameter, the String store name, with the store instance itself, would that be sufficient to cover `StateRestoreNotification`?
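To make the question concrete, here is a minimal sketch of a listener whose start/end callbacks receive the store instance instead of its name. It is not the Kafka Streams API. Every type in it is hypothetical and defined locally:

- `Store`
- `BulkLoadableStore`
- `StoreAwareRestoreListener`
- `BulkLoadListener`

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal store abstraction (not Kafka's StateStore interface).
interface Store {
    String name();
}

// Hypothetical store that can switch a bulk-load mode on and off.
class BulkLoadableStore implements Store {
    private final String name;
    private boolean bulkMode = false;
    final List<String> events = new ArrayList<>();

    BulkLoadableStore(String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }

    void enableBulkMode() {
        bulkMode = true;
        events.add("bulk-on");
    }

    void disableBulkMode() {
        bulkMode = false;
        events.add("bulk-off");
    }

    boolean inBulkMode() {
        return bulkMode;
    }
}

// Hypothetical listener variant: the first parameter is the store instance, not its name.
interface StoreAwareRestoreListener {
    void onRestoreStart(Store store, long startOffset, long endOffset);

    void onRestoreEnd(Store store, long totalRestored);
}

// Uses the store reference to do setup/teardown work around a restore.
class BulkLoadListener implements StoreAwareRestoreListener {
    @Override
    public void onRestoreStart(Store store, long startOffset, long endOffset) {
        if (store instanceof BulkLoadableStore) {
            ((BulkLoadableStore) store).enableBulkMode();
        }
    }

    @Override
    public void onRestoreEnd(Store store, long totalRestored) {
        if (store instanceof BulkLoadableStore) {
            ((BulkLoadableStore) store).disableBulkMode();
        }
    }
}

class StoreAwareListenerDemo {
    public static void main(String[] args) {
        BulkLoadableStore store = new BulkLoadableStore("counts");
        StoreAwareRestoreListener listener = new BulkLoadListener();
        listener.onRestoreStart(store, 0L, 5000L);
        System.out.println(store.inBulkMode()); // true
        listener.onRestoreEnd(store, 5000L);
        System.out.println(store.events); // [bulk-on, bulk-off]
    }
}
```

Note that the listener has to know the concrete store type before it can do anything useful. That coupling is the trade-off against a hook implemented by the store itself.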
On Wed, Jun 21, 2017 at 7:13 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Bill,
>
> I'm wondering why we need the `StateRestoreNotification` while still
> having `StateRestoreListener`. Could the above setup be achieved just with
> `StateRestoreListener.onRestoreStart / onRestoreEnd`? I.e. it seems the
> latter can subsume any use cases intended for the former API.
>
> Guozhang
>
> On Mon, Jun 19, 2017 at 3:23 PM, Bill Bejeck <bbej...@gmail.com> wrote:
>
>> I'm going to update the KIP with a new interface, StateRestoreNotification,
>> containing two methods, startRestore and endRestore.
>>
>> While the naming is very similar to methods already proposed on the
>> StateRestoreListener, the intent of these methods is not user
>> notification of restore status. Instead, these new methods are for
>> internal use by the state store to perform any required setup and
>> teardown work due to a batch restoration process.
>>
>> Here's one current use case: when using RocksDB we should optimize for a
>> bulk load by setting Options.prepareForBulkLoad().
>>
>> 1. If the database has already been opened, we'll need to close it, set
>> the "prepareForBulkLoad" option and re-open the database.
>> 2. Once the restore is completed we'll need to close and re-open the
>> database with the "prepareForBulkLoad" option turned off.
>>
>> While we are mentioning the RocksDB use case above, the addition of this
>> interface is not specific to any particular implementation of a
>> persistent state store.
>>
>> Additionally, a separate interface is needed so that any user can
>> implement the state restore notification feature regardless of the state
>> restore callback used.
>>
>> I'll also remove the "getStateRestoreListener" method and stick with the
>> notion of a "global" restore listener for now.
>>
>> On Mon, Jun 19, 2017 at 1:05 PM, Bill Bejeck <bbej...@gmail.com> wrote:
>>
>>> Yes it is, more of an oversight on my part. I'll remove it from the KIP.
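Here is a minimal sketch of the `StateRestoreNotification` idea from Bill's June 19 message. It simulates the two steps of the RocksDB use case on a toy store. It assumes the interface has exactly the two no-argument methods `startRestore` and `endRestore`. `SimulatedPersistentStore` is hypothetical, and its `bulkLoad` flag only stands in for RocksDB's `Options.prepareForBulkLoad()`. No RocksDB calls are made.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Assumed shape of the proposed interface: hooks the store itself implements,
// called once before and once after a batch restoration.
interface StateRestoreNotification {
    void startRestore();

    void endRestore();
}

// Toy store that only simulates the close/re-open cycle from the RocksDB use case.
// The bulkLoad flag stands in for Options.prepareForBulkLoad(); no RocksDB is used.
class SimulatedPersistentStore implements StateRestoreNotification {
    private final Map<String, String> data = new HashMap<>();
    private boolean open = false;
    private boolean bulkLoad = false;
    final List<String> log = new ArrayList<>();

    void open() {
        open = true;
        log.add("open(bulkLoad=" + bulkLoad + ")");
    }

    void close() {
        open = false;
        log.add("close");
    }

    boolean isBulkLoad() {
        return bulkLoad;
    }

    int size() {
        return data.size();
    }

    void put(String key, String value) {
        if (!open) {
            throw new IllegalStateException("store is closed");
        }
        data.put(key, value);
    }

    @Override
    public void startRestore() {
        // Step 1: close if already open, enable bulk-load settings, re-open.
        if (open) {
            close();
        }
        bulkLoad = true;
        open();
    }

    @Override
    public void endRestore() {
        // Step 2: close and re-open with bulk-load settings turned off.
        close();
        bulkLoad = false;
        open();
    }
}

class NotificationDemo {
    public static void main(String[] args) {
        SimulatedPersistentStore store = new SimulatedPersistentStore();
        store.open();
        store.startRestore();
        store.put("k1", "v1");
        store.put("k2", "v2");
        store.endRestore();
        // [open(bulkLoad=false), close, open(bulkLoad=true), close, open(bulkLoad=false)]
        System.out.println(store.log);
    }
}
```

Because the hooks live on the store, they run regardless of which restore callback or listener the user registers. That is the argument Bill makes for a separate interface.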
>>>
>>> On Mon, Jun 19, 2017 at 12:48 PM, Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I think for now it's good enough to start with a single global restore
>>>> listener. We can incrementally improve this later on if required. Of
>>>> course, if it's easy to do right away we can also be more fine-grained.
>>>> But for KTable, we might want to add this after getting rid of all the
>>>> overloads we have atm.
>>>>
>>>> One question: what is the purpose of the parameter "endOffset" in
>>>> #onRestoreEnd()? Isn't this the same value as provided in
>>>> #onRestoreStart()?
>>>>
>>>> -Matthias
>>>>
>>>> On 6/15/17 6:18 AM, Bill Bejeck wrote:
>>>>> Thinking about the custom StateRestoreListener approach, having a get
>>>>> method on the interface will really only work for custom state stores.
>>>>>
>>>>> So we'll need to provide another way for users to set behavior with
>>>>> provided state stores. The only option that comes to mind now is also
>>>>> adding a parameter to the StateStoreSupplier.
>>>>>
>>>>> Bill
>>>>>
>>>>> On Wed, Jun 14, 2017 at 5:39 PM, Bill Bejeck <bbej...@gmail.com> wrote:
>>>>>
>>>>>> Guozhang,
>>>>>>
>>>>>> Thanks for the comments.
>>>>>>
>>>>>> 1. As for the granularity, I agree that having one global
>>>>>> StateRestoreListener could be restrictive. But I think it's important
>>>>>> to have a "setStateRestoreListener" on KafkaStreams, as this allows
>>>>>> users to define an anonymous instance that has access to local scope
>>>>>> for reporting purposes. This is similar to the pattern we use for
>>>>>> KafkaStreams.setStateListener.
>>>>>>
>>>>>> As an alternative, what if we add a method to the
>>>>>> BatchingStateRestoreCallback interface named "getStateStoreListener"?
>>>>>> Then in an abstract adapter class we return null from
>>>>>> getStateStoreListener.
>>>>>> But if users want to supply a different StateRestoreListener strategy
>>>>>> per callback, they would simply override the method to return an
>>>>>> actual instance.
>>>>>>
>>>>>> WDYT?
>>>>>>
>>>>>> 2. I'll make the required updates to pass in the ending offset at the
>>>>>> start as well as the actual name of the state store.
>>>>>>
>>>>>> Bill
>>>>>>
>>>>>> On Wed, Jun 14, 2017 at 3:53 PM, Guozhang Wang <wangg...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks Bill for the updated wiki. I have a couple more comments:
>>>>>>>
>>>>>>> 1. Setting the StateRestoreListener at the KafkaStreams granularity
>>>>>>> may not be sufficient, as in the listener callback we do not know
>>>>>>> which store is being restored. If the topic is a changelog topic, then
>>>>>>> from the `TopicPartition` we may be able to infer the state store
>>>>>>> name, but if the topic is the source topic read as a KTable, then we
>>>>>>> may not know which store is being restored. Plus, forcing users to
>>>>>>> infer the state store name from the topic partition name would not be
>>>>>>> intuitive either. Also, for different stores the listener may be
>>>>>>> implemented differently, and setting a global listener would force
>>>>>>> users to branch on the topic-partition names, similarly to what we did
>>>>>>> in the global timestamp extractor. On the other hand, I also agree
>>>>>>> that setting the listener at the per-store granularity may be a bit
>>>>>>> cumbersome, since if users want to override it on a specific store we
>>>>>>> would need to expose some APIs, maybe at StateStoreSupplier. So I
>>>>>>> would love to hear other people's opinions.
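Here is a sketch of the per-callback listener hook from Bill's point 1. His message names it `getStateStoreListener`; elsewhere in the thread it is called `getStateRestoreListener`. The hook was later dropped in favor of a single global listener.

The sketch assumes the runtime falls back to the global listener whenever the callback returns null. These names are all hypothetical:

- `RestoreListener`
- `ListenerAwareRestoreCallback`
- `AbstractListenerAwareRestoreCallback`
- `ListenerResolver`

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal listener (single method, so lambdas can implement it).
interface RestoreListener {
    void onRestoreStart(String storeName, long startOffset, long endOffset);
}

// Hypothetical callback carrying the proposed per-callback listener hook.
interface ListenerAwareRestoreCallback {
    void restore(byte[] key, byte[] value);

    // Returns a callback-specific listener, or null to use the global one.
    RestoreListener getStateRestoreListener();
}

// Adapter: no per-callback listener unless a subclass overrides the hook.
abstract class AbstractListenerAwareRestoreCallback implements ListenerAwareRestoreCallback {
    @Override
    public RestoreListener getStateRestoreListener() {
        return null;
    }
}

// Hypothetical runtime-side lookup: the callback's listener wins, else the global one.
final class ListenerResolver {
    private ListenerResolver() {
    }

    static RestoreListener resolve(ListenerAwareRestoreCallback callback, RestoreListener global) {
        RestoreListener specific = callback.getStateRestoreListener();
        return specific != null ? specific : global;
    }
}

class ResolverDemo {
    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        RestoreListener global = (store, start, end) -> seen.add("global:" + store);
        RestoreListener custom = (store, start, end) -> seen.add("custom:" + store);

        ListenerAwareRestoreCallback plain = new AbstractListenerAwareRestoreCallback() {
            @Override
            public void restore(byte[] key, byte[] value) {
            }
        };
        ListenerAwareRestoreCallback overriding = new AbstractListenerAwareRestoreCallback() {
            @Override
            public void restore(byte[] key, byte[] value) {
            }

            @Override
            public RestoreListener getStateRestoreListener() {
                return custom;
            }
        };

        ListenerResolver.resolve(plain, global).onRestoreStart("store-a", 0L, 100L);
        ListenerResolver.resolve(overriding, global).onRestoreStart("store-b", 0L, 100L);
        System.out.println(seen); // [global:store-a, custom:store-b]
    }
}
```

As Bill's 6/15 message notes, a hook like this only helps users who write their own callbacks. It does nothing for the provided state stores.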
>>>>>>>
>>>>>>> If we think that differently implemented restoring callbacks may be
>>>>>>> less common, then I'd suggest at least replacing the `TopicPartition`
>>>>>>> parameter with the `String` store name and the `TaskId`?
>>>>>>>
>>>>>>> 2. I think we can pass in the `long endOffset` in the `onRestoreStart`
>>>>>>> function as well, as we will have read the endOffset already by then.
>>>>>>> Otherwise users will still not be able to track the restoration
>>>>>>> progress (e.g. what percentage has been restored so far, to estimate
>>>>>>> how much longer they need to wait).
>>>>>>>
>>>>>>> Guozhang
>>>>>>>
>>>>>>> On Wed, Jun 14, 2017 at 12:25 PM, Bill Bejeck <bbej...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Eno,
>>>>>>>>
>>>>>>>> Thanks for the comments.
>>>>>>>>
>>>>>>>> 1. As for having both restore and restoreAll, I kept the restore
>>>>>>>> method for backward compatibility, as that is what is used by current
>>>>>>>> implementing classes. However, as I think about it, having a single
>>>>>>>> restore method taking a collection makes things cleaner. I'll wait
>>>>>>>> for others to weigh in, but I'm leaning towards having a single
>>>>>>>> restore method.
>>>>>>>>
>>>>>>>> 2. The "onBatchRestored" method is for keeping track of the restore
>>>>>>>> process as we load records from each poll request.
>>>>>>>>
>>>>>>>> For example, if the changelog contained 5000 records and
>>>>>>>> MAX_POLL_RECORDS is set to 1000, the "onBatchRestored" method would
>>>>>>>> get called 5 times, each time with the ending offset of the last
>>>>>>>> record in the batch and the count of the batch. I'll update the KIP
>>>>>>>> to add comments above the interface methods.
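Bill's `onBatchRestored` example (5000 records, MAX_POLL_RECORDS of 1000, five calls) can be combined with Guozhang's point 2 about passing the end offset to `onRestoreStart`. Together they give a progress-tracking listener, sketched below.

`RestoreProgressListener` is a local stand-in, and its signatures are assumptions. The percentage math assumes contiguous offsets starting at 0 and an exclusive end offset (one past the last record).

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in modeled on the listener methods discussed in the thread;
// the exact parameters are assumptions for this sketch.
interface RestoreProgressListener {
    void onRestoreStart(String storeName, long startOffset, long endOffset);

    void onBatchRestored(String storeName, long batchEndOffset, long numRestored);

    void onRestoreEnd(String storeName, long totalRestored);
}

// Records restoration progress (percent of the offset range) after every batch.
class PercentListener implements RestoreProgressListener {
    private long start;
    private long end;
    final List<Double> progress = new ArrayList<>();
    long total = -1;

    @Override
    public void onRestoreStart(String storeName, long startOffset, long endOffset) {
        start = startOffset;
        end = endOffset;
    }

    @Override
    public void onBatchRestored(String storeName, long batchEndOffset, long numRestored) {
        long span = end - start;
        // batchEndOffset is the offset of the batch's last record, hence the + 1;
        // endOffset is assumed exclusive (one past the last record).
        double pct = span == 0 ? 100.0 : 100.0 * (batchEndOffset - start + 1) / span;
        progress.add(pct);
    }

    @Override
    public void onRestoreEnd(String storeName, long totalRestored) {
        total = totalRestored;
    }
}

class RestoreSimulation {
    // Simulates restoring `records` contiguous records from offset 0,
    // delivered in batches of at most `maxPollRecords`.
    static void run(RestoreProgressListener listener, String store, long records, int maxPollRecords) {
        listener.onRestoreStart(store, 0L, records);
        long restored = 0;
        while (restored < records) {
            long batch = Math.min(maxPollRecords, records - restored);
            restored += batch;
            listener.onBatchRestored(store, restored - 1, batch);
        }
        listener.onRestoreEnd(store, restored);
    }

    public static void main(String[] args) {
        PercentListener listener = new PercentListener();
        run(listener, "counts-store", 5000, 1000);
        System.out.println(listener.progress); // [20.0, 40.0, 60.0, 80.0, 100.0]
        System.out.println(listener.total); // 5000
    }
}
```

Without the end offset at the start, the listener could still count restored records. It could not turn that count into a percentage, which is the gap Guozhang's point 2 describes.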
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Bill
>>>>>>>>
>>>>>>>> On Wed, Jun 14, 2017 at 11:49 AM, Eno Thereska <eno.there...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks Bill,
>>>>>>>>>
>>>>>>>>> A couple of questions:
>>>>>>>>>
>>>>>>>>> 1. Why do we need both restore and restoreAll? Why can't we just have
>>>>>>>>> one that takes a collection (i.e., restoreAll)? Are there cases when
>>>>>>>>> people want to restore one at a time? In that case, they could still
>>>>>>>>> use restoreAll with just 1 record in the collection, right?
>>>>>>>>>
>>>>>>>>> 2. I don't quite get "onBatchRestored". Could you put a small comment
>>>>>>>>> on top of all three methods? An example might help here.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Eno
>>>>>>>>>
>>>>>>>>>> On 8 Jun 2017, at 18:05, Bill Bejeck <bbej...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Guozhang, Damian, thanks for the comments.
>>>>>>>>>>
>>>>>>>>>> Giving developers the ability to hook into StateStore recovery
>>>>>>>>>> phases was part of my original intent. However, the KIP in its
>>>>>>>>>> current state won't provide this functionality.
>>>>>>>>>>
>>>>>>>>>> As a result I'll be doing a significant revision of KIP-167. I'll be
>>>>>>>>>> sure to incorporate all your comments in the new revision.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Bill
>>>>>>>>>>
>>>>>>>>>> On Wed, Jun 7, 2017 at 5:24 AM, Damian Guy <damian....@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> I'm largely in agreement with what Guozhang has suggested, i.e.,
>>>>>>>>>>> StateRestoreContext shouldn't have any setters on it and also needs
>>>>>>>>>>> to have the end offset available so that people can use it to
>>>>>>>>>>> derive progress.
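Eno's first question can be illustrated with a callback that exposes only a collection-based `restoreAll`. In that design, restoring a single record is just a one-element collection. `CollectionRestoreCallback` and `CountingCallback` are hypothetical. `Map.Entry` stands in for Kafka's `KeyValue` to keep the sketch self-contained.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical single-method callback: every restore goes through restoreAll.
// Map.Entry stands in for Kafka's KeyValue.
interface CollectionRestoreCallback {
    void restoreAll(Collection<Map.Entry<byte[], byte[]>> records);
}

// Counts invocations and records so the behavior is easy to check.
class CountingCallback implements CollectionRestoreCallback {
    int calls = 0;
    int records = 0;

    @Override
    public void restoreAll(Collection<Map.Entry<byte[], byte[]>> batch) {
        calls++;
        records += batch.size();
    }
}

class SingleMethodRestoreDemo {
    // Restoring one record is restoreAll with a one-element collection.
    static void restoreOne(CollectionRestoreCallback callback, byte[] key, byte[] value) {
        Map.Entry<byte[], byte[]> entry = new AbstractMap.SimpleEntry<>(key, value);
        callback.restoreAll(Collections.singletonList(entry));
    }

    public static void main(String[] args) {
        CountingCallback callback = new CountingCallback();
        restoreOne(callback, bytes("k0"), bytes("v0"));

        List<Map.Entry<byte[], byte[]>> batch = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            batch.add(new AbstractMap.SimpleEntry<>(bytes("k" + i), bytes("v" + i)));
        }
        callback.restoreAll(batch);

        System.out.println(callback.calls + " calls, " + callback.records + " records"); // 2 calls, 4 records
    }

    private static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }
}
```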
>>>>>>>>>>> Slightly different, maybe the StateRestoreContext interface could
>>>>>>>>>>> be:
>>>>>>>>>>>
>>>>>>>>>>> long beginOffset()
>>>>>>>>>>> long endOffset()
>>>>>>>>>>> long currentOffset()
>>>>>>>>>>>
>>>>>>>>>>> One further thing: this currently doesn't provide developers the
>>>>>>>>>>> ability to hook into this information if they are using the
>>>>>>>>>>> built-in StateStores. Is this something we should be considering?
>>>>>>>>>>>
>>>>>>>>>>> On Tue, 6 Jun 2017 at 23:32 Guozhang Wang <wangg...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the updated KIP Bill, I have a couple of comments:
>>>>>>>>>>>>
>>>>>>>>>>>> 1) I'm assuming beginRestore / endRestore is called only once per
>>>>>>>>>>>> store throughout the whole restoration process, and restoreAll is
>>>>>>>>>>>> called per batch. In that case I feel we can pass the
>>>>>>>>>>>> StateRestoreContext as a second parameter to restoreAll and to
>>>>>>>>>>>> endRestore as well, and let the library set the corresponding
>>>>>>>>>>>> values while users can only read them (since the collection of
>>>>>>>>>>>> key-value pairs does not contain offset information anyway, users
>>>>>>>>>>>> cannot really set the offset). The "lastOffsetRestored" would be
>>>>>>>>>>>> the starting offset when called on `beginRestore`.
>>>>>>>>>>>>
>>>>>>>>>>>> 2) Users who want to implement their own batch restoration
>>>>>>>>>>>> callbacks would now need to implement both `restore` and
>>>>>>>>>>>> `restoreAll`, and either let `restoreAll` call `restore` or
>>>>>>>>>>>> implement the logic in `restoreAll` only and never call `restore`.
>>>>>>>>>>>> Maybe we can provide two abstract implementations of
>>>>>>>>>>>> BatchingStateRestoreCallback that do beginRestore / endRestore as
>>>>>>>>>>>> no-ops. One would implement `restoreAll` to call an abstract
>>>>>>>>>>>> `restore`. The other would implement `restore` to throw a "not
>>>>>>>>>>>> supported" exception and keep `restoreAll` abstract.
>>>>>>>>>>>>
>>>>>>>>>>>> 3) I think we can also return the "offset limit" in
>>>>>>>>>>>> StateRestoreContext, which is important for users to track the
>>>>>>>>>>>> restoration progress, since otherwise they could not tell what
>>>>>>>>>>>> percentage of the restoration has completed. I.e.:
>>>>>>>>>>>>
>>>>>>>>>>>> public interface BatchingStateRestoreCallback extends StateRestoreCallback {
>>>>>>>>>>>>
>>>>>>>>>>>>     void restoreAll(Collection<KeyValue<byte[], byte[]>> records,
>>>>>>>>>>>>                     StateRestoreContext restoreContext);
>>>>>>>>>>>>
>>>>>>>>>>>>     void beginRestore(StateRestoreContext restoreContext);
>>>>>>>>>>>>
>>>>>>>>>>>>     void endRestore(StateRestoreContext restoreContext);
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> public interface StateRestoreContext {
>>>>>>>>>>>>
>>>>>>>>>>>>     long lastOffsetRestored();
>>>>>>>>>>>>
>>>>>>>>>>>>     long endOffsetToRestore();
>>>>>>>>>>>>
>>>>>>>>>>>>     int numberRestored();
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Jun 2, 2017 at 9:16 AM, Bill Bejeck <bbej...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Guozhang, Matthias,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the comments. I have updated the KIP (JIRA title and
>>>>>>>>>>>>> description as well).
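Guozhang's June 6 point 2 proposes two abstract implementations with no-op `beginRestore` / `endRestore`. They might look like the sketch below. The interfaces are redeclared locally from his proposal so the sketch compiles on its own, with `Map.Entry` standing in for `KeyValue`. The adapter class names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Local stand-ins redeclared from the proposal; Map.Entry replaces KeyValue.
interface StateRestoreContext {
    long lastOffsetRestored();

    long endOffsetToRestore();

    int numberRestored();
}

interface StateRestoreCallback {
    void restore(byte[] key, byte[] value);
}

interface BatchingStateRestoreCallback extends StateRestoreCallback {
    void restoreAll(Collection<Map.Entry<byte[], byte[]>> records, StateRestoreContext restoreContext);

    void beginRestore(StateRestoreContext restoreContext);

    void endRestore(StateRestoreContext restoreContext);
}

// Variant 1 (hypothetical name): subclasses implement restore(); restoreAll() loops over it.
abstract class RecordByRecordRestoreCallback implements BatchingStateRestoreCallback {
    @Override
    public void beginRestore(StateRestoreContext restoreContext) {
    }

    @Override
    public void endRestore(StateRestoreContext restoreContext) {
    }

    @Override
    public void restoreAll(Collection<Map.Entry<byte[], byte[]>> records, StateRestoreContext restoreContext) {
        for (Map.Entry<byte[], byte[]> record : records) {
            restore(record.getKey(), record.getValue());
        }
    }
}

// Variant 2 (hypothetical name): subclasses implement restoreAll(); restore() is unsupported.
abstract class BatchOnlyRestoreCallback implements BatchingStateRestoreCallback {
    @Override
    public void beginRestore(StateRestoreContext restoreContext) {
    }

    @Override
    public void endRestore(StateRestoreContext restoreContext) {
    }

    @Override
    public void restore(byte[] key, byte[] value) {
        throw new UnsupportedOperationException("single-record restore is not supported; use restoreAll");
    }
}

class AdapterDemo {
    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        RecordByRecordRestoreCallback perRecord = new RecordByRecordRestoreCallback() {
            @Override
            public void restore(byte[] key, byte[] value) {
                seen.add(new String(key, StandardCharsets.UTF_8));
            }
        };

        List<Map.Entry<byte[], byte[]>> batch = new ArrayList<>();
        batch.add(new AbstractMap.SimpleEntry<>("a".getBytes(StandardCharsets.UTF_8), "1".getBytes(StandardCharsets.UTF_8)));
        batch.add(new AbstractMap.SimpleEntry<>("b".getBytes(StandardCharsets.UTF_8), "2".getBytes(StandardCharsets.UTF_8)));

        // This adapter never reads the context, so null is passed for brevity.
        perRecord.restoreAll(batch, null);
        System.out.println(seen); // [a, b]
    }
}
```

With either adapter, a user only has to implement the one method they care about.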
>>>>>>>>>>>>>
>>>>>>>>>>>>> I had thought about introducing a separate interface altogether,
>>>>>>>>>>>>> but extending the current one makes more sense.
>>>>>>>>>>>>>
>>>>>>>>>>>>> As for intermediate callbacks based on time or number of records,
>>>>>>>>>>>>> I think the latest update to the KIP addresses this point of
>>>>>>>>>>>>> querying for intermediate results, but it would be per batch
>>>>>>>>>>>>> restored.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Jun 2, 2017 at 8:36 AM, Jim Jagielski <j...@jagunet.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Jun 2, 2017, at 12:54 AM, Matthias J. Sax
>>>>>>>>>>>>>>> <matth...@confluent.io> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> With regard to backward compatibility, we should not change the
>>>>>>>>>>>>>>> current interface, but add a new interface that extends the
>>>>>>>>>>>>>>> current one.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ++1
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> -- Guozhang
>>>>>>>
>>>>>>> --
>>>>>>> -- Guozhang
>
> --
> -- Guozhang

--
-- Guozhang
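Matthias's compatibility point is to add a new interface that extends the current one, so existing callbacks keep working unchanged. Here is a minimal sketch of that idea.

It assumes a hypothetical `RestoreDispatcher` on the restore side, which takes the batch path only when the callback opts in (checked via `instanceof`). The interfaces are simplified local stand-ins, not the Kafka classes.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Simplified local stand-in for the existing single-record callback, left unchanged.
interface StateRestoreCallback {
    void restore(byte[] key, byte[] value);
}

// The new interface extends the old one, so existing implementations still compile.
interface BatchingStateRestoreCallback extends StateRestoreCallback {
    void restoreAll(Collection<Map.Entry<byte[], byte[]>> records);
}

// Hypothetical restore-side dispatcher: batch path only if the callback opts in.
final class RestoreDispatcher {
    private RestoreDispatcher() {
    }

    static void restoreBatch(StateRestoreCallback callback, List<Map.Entry<byte[], byte[]>> batch) {
        if (callback instanceof BatchingStateRestoreCallback) {
            ((BatchingStateRestoreCallback) callback).restoreAll(batch);
        } else {
            for (Map.Entry<byte[], byte[]> record : batch) {
                callback.restore(record.getKey(), record.getValue());
            }
        }
    }
}

class DispatchDemo {
    public static void main(String[] args) {
        List<Map.Entry<byte[], byte[]>> batch = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            batch.add(new AbstractMap.SimpleEntry<>(new byte[] {(byte) i}, new byte[] {(byte) i}));
        }

        int[] legacyCalls = {0};
        int[] batchCalls = {0};

        // An existing, unmodified single-record callback.
        StateRestoreCallback legacy = (key, value) -> legacyCalls[0]++;

        // A callback written against the new interface.
        StateRestoreCallback batching = new BatchingStateRestoreCallback() {
            @Override
            public void restore(byte[] key, byte[] value) {
                throw new UnsupportedOperationException("use restoreAll");
            }

            @Override
            public void restoreAll(Collection<Map.Entry<byte[], byte[]>> records) {
                batchCalls[0]++;
            }
        };

        RestoreDispatcher.restoreBatch(legacy, batch);
        RestoreDispatcher.restoreBatch(batching, batch);
        System.out.println(legacyCalls[0] + " " + batchCalls[0]); // 3 1
    }
}
```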