Hmm... I'm not sure how this can achieve the "close the store before
restoration, re-open it with a slightly different config, and then
close and re-open the store for queries" pattern. You need to be able to
access the store object in order to do this, right?
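
To make the concern concrete, here is a rough sketch. Every type and method
name in it (ReopenableStore, StoreAwareRestoreListener, BulkLoadListener) is
a hypothetical stand-in, not the KIP's or Kafka's actual API. It only
illustrates that the close / re-open steps need the store instance itself,
which a callback that receives just the String store name cannot reach.

```java
import java.util.ArrayList;
import java.util.List;

public class RestoreListenerSketch {

    // Hypothetical stand-in for a persistent store that must be closed and
    // re-opened to switch between bulk-load and normal configuration.
    static class ReopenableStore {
        final String name;
        final List<String> log = new ArrayList<>();
        boolean isOpen = true;

        ReopenableStore(String name) {
            this.name = name;
        }

        void close() {
            isOpen = false;
            log.add("close");
        }

        void open(boolean bulkLoad) {
            isOpen = true;
            log.add(bulkLoad ? "open(bulk)" : "open(normal)");
        }
    }

    // Assumed listener shape: the first parameter is the store instance
    // rather than the String store name.
    interface StoreAwareRestoreListener {
        void onRestoreStart(ReopenableStore store, long startOffset, long endOffset);

        void onRestoreEnd(ReopenableStore store, long totalRestored);
    }

    // Re-opens the store in bulk-load mode before the restore and switches
    // it back to the normal configuration afterwards.
    static class BulkLoadListener implements StoreAwareRestoreListener {
        @Override
        public void onRestoreStart(ReopenableStore store, long startOffset, long endOffset) {
            if (store.isOpen) {
                store.close();
            }
            store.open(true);
        }

        @Override
        public void onRestoreEnd(ReopenableStore store, long totalRestored) {
            store.close();
            store.open(false);
        }
    }

    // Drives one restore cycle and returns the sequence of store operations.
    public static List<String> demo() {
        ReopenableStore store = new ReopenableStore("counts");
        StoreAwareRestoreListener listener = new BulkLoadListener();
        listener.onRestoreStart(store, 0L, 5000L);
        listener.onRestoreEnd(store, 5000L);
        return store.log;
    }

    public static void main(String[] args) {
        // prints [close, open(bulk), close, open(normal)]
        System.out.println(demo());
    }
}
```

With a String-only first parameter, BulkLoadListener would have nothing to
call close() or open() on unless it looked the store up some other way.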


Guozhang

On Mon, Jun 26, 2017 at 7:40 AM, Bill Bejeck <bbej...@gmail.com> wrote:

> Thinking about this some more, I have another approach.  Leave the first
> parameter as a String in the StateRestoreListener interface.
>
> But we'll provide two default abstract classes, one implementing
> StateRestoreCallback and the other implementing
> BatchingStateRestoreCallback.  Both abstract classes will also implement
> the StateRestoreListener interface, with no-op methods provided for the
> restore progress methods.
>
> WDYT?
>
> On Mon, Jun 26, 2017 at 10:13 AM, Bill Bejeck <bbej...@gmail.com> wrote:
>
> > Guozhang,
> >
> > Thanks for the comments.
> >
> > I think that will work, but my concern is that it might not be as clear
> > to users who want to receive external notification of the restore
> > progress separately (say, for reporting purposes) and still send separate
> > signals to the state store for resource management tasks.
> >
> > However, I like this approach better and I have some ideas for the
> > implementation, so I'll update the KIP accordingly.
> >
> > Thanks,
> > Bill
> >
> > On Wed, Jun 21, 2017 at 10:14 PM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> >
> >> More specifically, if we can change the first parameter from the String
> >> store name to the store instance itself, would that be sufficient to
> >> cover `StateRestoreNotification`?
> >>
> >> On Wed, Jun 21, 2017 at 7:13 PM, Guozhang Wang <wangg...@gmail.com>
> >> wrote:
> >>
> >> > Bill,
> >> >
> >> > I'm wondering why we need `StateRestoreNotification` while still
> >> > having `StateRestoreListener`. Could the above setup be achieved with
> >> > just `StateRestoreListener.onRestoreStart / onRestoreEnd`? I.e., it
> >> > seems the latter can subsume any use cases intended for the former API.
> >> >
> >> > Guozhang
> >> >
> >> > On Mon, Jun 19, 2017 at 3:23 PM, Bill Bejeck <bbej...@gmail.com>
> wrote:
> >> >
> >> >> I'm going to update the KIP with a new interface,
> >> >> StateRestoreNotification, containing two methods, startRestore and
> >> >> endRestore.
> >> >>
> >> >> While the naming is very similar to methods already proposed on the
> >> >> StateRestoreListener, the intent of these methods is not user
> >> >> notification of restore status.  Instead, these new methods are for
> >> >> internal use by the state store to perform any setup and teardown
> >> >> work required by a batch restoration process.
> >> >>
> >> >> Here's one current use case: when using RocksDB we should optimize
> >> >> for a bulk load by setting Options.prepareForBulkLoad().
> >> >>
> >> >>    1. If the database has already been opened, we'll need to close
> >> >>    it, set "prepareForBulkLoad" and re-open the database.
> >> >>    2. Once the restore is completed, we'll need to close and re-open
> >> >>    the database with the "prepareForBulkLoad" option turned off.
> >> >>
> >> >> While we mention the RocksDB use case above, the addition of this
> >> >> interface is not tied to any particular implementation of a
> >> >> persistent state store.
> >> >>
> >> >> Additionally, a separate interface is needed so that any user can
> >> >> implement the state restore notification feature regardless of the
> >> >> state restore callback used.
> >> >>
> >> >> I'll also remove the "getStateRestoreListener" method and stick with
> >> >> the notion of a "global" restore listener for now.
> >> >>
> >> >> On Mon, Jun 19, 2017 at 1:05 PM, Bill Bejeck <bbej...@gmail.com>
> >> wrote:
> >> >>
> >> >> > Yes it is; it was more of an oversight on my part. I'll remove it
> >> >> > from the KIP.
> >> >> >
> >> >> >
> >> >> > On Mon, Jun 19, 2017 at 12:48 PM, Matthias J. Sax <
> >> >> matth...@confluent.io>
> >> >> > wrote:
> >> >> >
> >> >> >> Hi,
> >> >> >>
> >> >> >> I think for now it's good enough to start with a single global
> >> >> >> restore listener. We can incrementally improve this later on if
> >> >> >> required. Of course, if it's easy to do right away we can also be
> >> >> >> more fine-grained. But for KTable, we might want to add this after
> >> >> >> getting rid of all the overloads we have atm.
> >> >> >>
> >> >> >> One question: what is the purpose of parameter "endOffset" in
> >> >> >> #onRestoreEnd() -- isn't this the same value as provided in
> >> >> >> #onRestoreStart() ?
> >> >> >>
> >> >> >>
> >> >> >> -Matthias
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> On 6/15/17 6:18 AM, Bill Bejeck wrote:
> >> >> >> > Thinking about it more, the custom StateRestoreListener approach
> >> >> >> > with a get method on the interface will really only work for
> >> >> >> > custom state stores.
> >> >> >> >
> >> >> >> > So we'll need to provide another way for users to set behavior
> >> >> >> > with the provided state stores.  The only option that comes to
> >> >> >> > mind now is also adding a parameter to the StateStoreSupplier.
> >> >> >> >
> >> >> >> >
> >> >> >> > Bill
> >> >> >> >
> >> >> >> >
> >> >> >> > On Wed, Jun 14, 2017 at 5:39 PM, Bill Bejeck <bbej...@gmail.com> wrote:
> >> >> >> >
> >> >> >> >> Guozhang,
> >> >> >> >>
> >> >> >> >> Thanks for the comments.
> >> >> >> >>
> >> >> >> >> 1.  As for the granularity, I agree that having one global
> >> >> >> >> StateRestoreListener could be restrictive.  But I think it's
> >> >> >> >> important to have a "setStateRestoreListener" on KafkaStreams,
> >> >> >> >> as this allows users to define an anonymous instance that has
> >> >> >> >> access to local scope for reporting purposes.  This is similar
> >> >> >> >> to the pattern we use for KafkaStreams.setStateListener.
> >> >> >> >>
> >> >> >> >> As an alternative, what if we add a method named
> >> >> >> >> "getStateStoreListener" to the BatchingStateRestoreCallback
> >> >> >> >> interface?  Then in an abstract adapter class we return null
> >> >> >> >> from getStateStoreListener.  But if users want to supply a
> >> >> >> >> different StateRestoreListener strategy per callback, they
> >> >> >> >> would simply override the method to return an actual instance.
> >> >> >> >>
> >> >> >> >> WDYT?
> >> >> >> >>
> >> >> >> >> 2.  I'll make the required updates to pass in the ending offset
> >> >> >> >> at the start, as well as the actual name of the state store.
> >> >> >> >>
> >> >> >> >> Bill
> >> >> >> >>
> >> >> >> >>
> >> >> >> >> On Wed, Jun 14, 2017 at 3:53 PM, Guozhang Wang <
> >> wangg...@gmail.com>
> >> >> >> wrote:
> >> >> >> >>
> >> >> >> >>> Thanks Bill for the updated wiki. I have a couple more comments:
> >> >> >> >>>
> >> >> >> >>> 1. Setting StateRestoreListener at the KafkaStreams granularity
> >> >> >> >>> may not be sufficient, as in the listener callback we do not
> >> >> >> >>> know which store is being restored right now: if the topic is a
> >> >> >> >>> changelog topic then from the `TopicPartition` we may be able
> >> >> >> >>> to infer the state store name, but if the topic is a source
> >> >> >> >>> topic read as a KTable then we may not know which store is
> >> >> >> >>> being restored; besides, forcing users to infer the state
> >> >> >> >>> store name from the topic partition name would not be
> >> >> >> >>> intuitive. Also, the listener may be implemented differently
> >> >> >> >>> for different stores, and setting a global listener would
> >> >> >> >>> force users to branch on the topic-partition names, similar
> >> >> >> >>> to what we did in the global timestamp extractor. On the other
> >> >> >> >>> hand, I also agree that setting the listener at the per-store
> >> >> >> >>> granularity may be a bit cumbersome, since overriding it on a
> >> >> >> >>> specific store would require exposing some APIs, maybe on
> >> >> >> >>> StateStoreSupplier. So I would love to hear other people's
> >> >> >> >>> opinions.
> >> >> >> >>>
> >> >> >> >>> If we think that implementing the restore callback differently
> >> >> >> >>> per store may be less common, then I'd suggest at least
> >> >> >> >>> replacing the `TopicPartition` parameter with the `String`
> >> >> >> >>> store name and the `TaskId`?
> >> >> >> >>>
> >> >> >> >>> 2. I think we can pass the `long endOffset` into the
> >> >> >> >>> `onRestoreStart` function as well, as we will have read the
> >> >> >> >>> endOffset already by then; otherwise users still cannot track
> >> >> >> >>> the restoration progress (e.g. what percentage has been
> >> >> >> >>> restored so far, to estimate how long they still need to
> >> >> >> >>> wait).
> >> >> >> >>>
> >> >> >> >>>
> >> >> >> >>> Guozhang
> >> >> >> >>>
> >> >> >> >>>
> >> >> >> >>>
> >> >> >> >>> On Wed, Jun 14, 2017 at 12:25 PM, Bill Bejeck <
> >> bbej...@gmail.com>
> >> >> >> wrote:
> >> >> >> >>>
> >> >> >> >>>> Eno,
> >> >> >> >>>>
> >> >> >> >>>> Thanks for the comments.
> >> >> >> >>>>
> >> >> >> >>>> 1. As for having both restore and restoreAll, I kept the
> >> >> >> >>>> restore method for backward compatibility, as that is what is
> >> >> >> >>>> used by current implementing classes. However, as I think
> >> >> >> >>>> about it, it makes things cleaner to have a single restore
> >> >> >> >>>> method taking a collection. I'll wait for others to weigh in,
> >> >> >> >>>> but I'm leaning towards having a single restore method.
> >> >> >> >>>>
> >> >> >> >>>> 2. The "onBatchRestored" method is for keeping track of the
> >> >> >> >>>> restore process as we load records from each poll request.
> >> >> >> >>>>
> >> >> >> >>>>    For example, if the change log contained 5000 records and
> >> >> >> >>>> MAX_POLL_RECORDS is set to 1000, the "onBatchRestored" method
> >> >> >> >>>> would get called 5 times, each time with the ending offset of
> >> >> >> >>>> the last record in the batch and the count of the batch.  I'll
> >> >> >> >>>> update the KIP to add comments above the interface methods.
> >> >> >> >>>>
> >> >> >> >>>>
> >> >> >> >>>> Thanks,
> >> >> >> >>>> Bill
> >> >> >> >>>>
> >> >> >> >>>>
> >> >> >> >>>> On Wed, Jun 14, 2017 at 11:49 AM, Eno Thereska <
> >> >> >> eno.there...@gmail.com>
> >> >> >> >>>> wrote:
> >> >> >> >>>>
> >> >> >> >>>>> Thanks Bill,
> >> >> >> >>>>>
> >> >> >> >>>>> A couple of questions:
> >> >> >> >>>>>
> >> >> >> >>>>>
> >> >> >> >>>>> 1. Why do we need both restore and restoreAll? Why can't we
> >> >> >> >>>>> just have one that takes a collection (i.e., restoreAll)? Are
> >> >> >> >>>>> there cases when people want to restore one at a time? In
> >> >> >> >>>>> that case, they could still use restoreAll with just 1
> >> >> >> >>>>> record in the collection, right?
> >> >> >> >>>>>
> >> >> >> >>>>> 2. I don't quite get "onBatchRestored". Could you put a small
> >> >> >> >>>>> comment on top of all three methods? An example might help
> >> >> >> >>>>> here.
> >> >> >> >>>>>
> >> >> >> >>>>> Thanks
> >> >> >> >>>>> Eno
> >> >> >> >>>>>
> >> >> >> >>>>>
> >> >> >> >>>>>> On 8 Jun 2017, at 18:05, Bill Bejeck <bbej...@gmail.com>
> >> wrote:
> >> >> >> >>>>>>
> >> >> >> >>>>>> Guozhang, Damian thanks for the comments.
> >> >> >> >>>>>>
> >> >> >> >>>>>> Giving developers the ability to hook into StateStore
> >> >> >> >>>>>> recovery phases was part of my original intent. However,
> >> >> >> >>>>>> the KIP in its current state won't provide this
> >> >> >> >>>>>> functionality.
> >> >> >> >>>>>>
> >> >> >> >>>>>> As a result, I'll be doing a significant revision of
> >> >> >> >>>>>> KIP-167. I'll be sure to incorporate all your comments in
> >> >> >> >>>>>> the new revision.
> >> >> >> >>>>>>
> >> >> >> >>>>>> Thanks,
> >> >> >> >>>>>> Bill
> >> >> >> >>>>>>
> >> >> >> >>>>>> On Wed, Jun 7, 2017 at 5:24 AM, Damian Guy <
> >> >> damian....@gmail.com>
> >> >> >> >>>> wrote:
> >> >> >> >>>>>>
> >> >> >> >>>>>>> I'm largely in agreement with what Guozhang has suggested,
> >> >> >> >>>>>>> i.e., StateRestoreContext shouldn't have any setters on it
> >> >> >> >>>>>>> and also needs to have the end offset available so that
> >> >> >> >>>>>>> people can use it to derive progress. Slightly different,
> >> >> >> >>>>>>> maybe the StateRestoreContext interface could be:
> >> >> >> >>>>>>>
> >> >> >> >>>>>>> long beginOffset()
> >> >> >> >>>>>>> long endOffset()
> >> >> >> >>>>>>> long currentOffset()
> >> >> >> >>>>>>>
> >> >> >> >>>>>>> One further thing: this currently doesn't give developers
> >> >> >> >>>>>>> the ability to hook into this information if they are using
> >> >> >> >>>>>>> the built-in StateStores. Is this something we should be
> >> >> >> >>>>>>> considering?
> >> >> >> >>>>>>>
> >> >> >> >>>>>>>
> >> >> >> >>>>>>> On Tue, 6 Jun 2017 at 23:32 Guozhang Wang <
> >> wangg...@gmail.com>
> >> >> >> >>> wrote:
> >> >> >> >>>>>>>
> >> >> >> >>>>>>>> Thanks for the updated KIP Bill, I have a couple of
> >> comments:
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>> 1) I'm assuming beginRestore / endRestore is called only
> >> >> >> >>>>>>>> once per store throughout the whole restoration process,
> >> >> >> >>>>>>>> and restoreAll is called per batch. In that case I feel we
> >> >> >> >>>>>>>> can pass the StateRestoreContext as a second parameter in
> >> >> >> >>>>>>>> restoreAll and in endRestore as well, and let the library
> >> >> >> >>>>>>>> set the corresponding values and only let users read them
> >> >> >> >>>>>>>> (since the collection of key-value pairs does not contain
> >> >> >> >>>>>>>> offset information anyway, users cannot really set the
> >> >> >> >>>>>>>> offset). The "lastOffsetRestored" would be the starting
> >> >> >> >>>>>>>> offset when called on `beginRestore`.
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>> 2) Users who want to implement their own batch restoration
> >> >> >> >>>>>>>> callbacks would now need to implement both `restore` and
> >> >> >> >>>>>>>> `restoreAll`, and they would either let `restoreAll` call
> >> >> >> >>>>>>>> `restore` or implement the logic in `restoreAll` only and
> >> >> >> >>>>>>>> never call `restore`. Maybe we can provide two abstract
> >> >> >> >>>>>>>> impls of BatchingStateRestoreCallback which implement
> >> >> >> >>>>>>>> beginRestore / endRestore as no-ops, with one implementing
> >> >> >> >>>>>>>> `restoreAll` to call an abstract `restore`, while the other
> >> >> >> >>>>>>>> implements `restore` to throw a "not supported" exception
> >> >> >> >>>>>>>> and keeps `restoreAll` abstract.
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>> 3) I think we can also return the "offset limit" in
> >> >> >> >>>>>>>> StateRestoreContext, which is important for users to track
> >> >> >> >>>>>>>> the restoration progress, since otherwise they could not
> >> >> >> >>>>>>>> tell what percentage of the restoration has completed.
> >> >> >> >>>>>>>> I.e.:
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>> public interface BatchingStateRestoreCallback
> >> >> >> >>>>>>>>     extends StateRestoreCallback {
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>>   void restoreAll(Collection<KeyValue<byte[], byte[]>> records,
> >> >> >> >>>>>>>>                   StateRestoreContext restoreContext);
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>>   void beginRestore(StateRestoreContext restoreContext);
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>>   void endRestore(StateRestoreContext restoreContext);
> >> >> >> >>>>>>>> }
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>> public interface StateRestoreContext {
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>>  long lastOffsetRestored();
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>>  long endOffsetToRestore();
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>>  int numberRestored();
> >> >> >> >>>>>>>> }
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>> Guozhang
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>> On Fri, Jun 2, 2017 at 9:16 AM, Bill Bejeck <
> >> >> bbej...@gmail.com>
> >> >> >> >>>> wrote:
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>>> Guozhang, Matthias,
> >> >> >> >>>>>>>>>
> >> >> >> >>>>>>>>> Thanks for the comments.  I have updated the KIP (JIRA
> >> >> >> >>>>>>>>> title and description as well).
> >> >> >> >>>>>>>>>
> >> >> >> >>>>>>>>> I had thought about introducing a separate interface
> >> >> >> >>>>>>>>> altogether, but extending the current one makes more
> >> >> >> >>>>>>>>> sense.
> >> >> >> >>>>>>>>>
> >> >> >> >>>>>>>>> As for intermediate callbacks based on time or number of
> >> >> >> >>>>>>>>> records, I think the latest update to the KIP addresses
> >> >> >> >>>>>>>>> this point of querying for intermediate results, but it
> >> >> >> >>>>>>>>> would be per batch restored.
> >> >> >> >>>>>>>>>
> >> >> >> >>>>>>>>> Thanks,
> >> >> >> >>>>>>>>> Bill
> >> >> >> >>>>>>>>>
> >> >> >> >>>>>>>>>
> >> >> >> >>>>>>>>>
> >> >> >> >>>>>>>>>
> >> >> >> >>>>>>>>>
> >> >> >> >>>>>>>>> On Fri, Jun 2, 2017 at 8:36 AM, Jim Jagielski <
> >> >> j...@jagunet.com>
> >> >> >> >>>>> wrote:
> >> >> >> >>>>>>>>>
> >> >> >> >>>>>>>>>>
> >> >> >> >>>>>>>>>>> On Jun 2, 2017, at 12:54 AM, Matthias J. Sax <
> >> >> >> >>>>>>> matth...@confluent.io>
> >> >> >> >>>>>>>>>> wrote:
> >> >> >> >>>>>>>>>>>
> >> >> >> >>>>>>>>>>> With regard to backward compatibility, we should not
> >> >> >> >>>>>>>>>>> change the current interface, but add a new interface
> >> >> >> >>>>>>>>>>> that extends the current one.
> >> >> >> >>>>>>>>>>>
> >> >> >> >>>>>>>>>>
> >> >> >> >>>>>>>>>> ++1
> >> >> >> >>>>>>>>>>
> >> >> >> >>>>>>>>>>
> >> >> >> >>>>>>>>>
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>> --
> >> >> >> >>>>>>>> -- Guozhang
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>
> >> >> >> >>>>>
> >> >> >> >>>>>
> >> >> >> >>>>
> >> >> >> >>>
> >> >> >> >>>
> >> >> >> >>>
> >> >> >> >>> --
> >> >> >> >>> -- Guozhang
> >> >> >> >>>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >
> >> >> >>
> >> >> >>
> >> >> >
> >> >>
> >> >
> >> >
> >> >
> >> > --
> >> > -- Guozhang
> >> >
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
>



-- 
-- Guozhang
