Thinking about this some more, I have another approach.  Leave the first
parameter of as String in the StateRestoreListener interface.

But we'll provide 2 default abstract classes one implementing
StateRestoreCallback and the other implementing the
BatchingStateRestoreCallback.  Both abstract classes will also implement
the StateRestoreListener interface with no-op methods provided for the
restore progress methods.

WDYT?

On Mon, Jun 26, 2017 at 10:13 AM, Bill Bejeck <bbej...@gmail.com> wrote:

> Guozhang,
>
> Thanks for the comments.
>
> I think that will work, but my concern is it might not be as clear to
> users that want to receive external notification of the restore progress
> separately (say for reporting purposes) and still send separate signals to
> the state store for resource management tasks.
>
> However I like this approach better and I have some ideas I can do in the
> implementation, so I'll update the KIP accordingly.
>
> Thanks,
> Bill
>
> On Wed, Jun 21, 2017 at 10:14 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
>
>> More specifically, if we can replace the first parameter from the String
>> store name to the store instance itself, would that be sufficient to
>> cover `
>> StateRestoreNotification`?
>>
>> On Wed, Jun 21, 2017 at 7:13 PM, Guozhang Wang <wangg...@gmail.com>
>> wrote:
>>
>> > Bill,
>> >
>> > I'm wondering why we need the `StateRestoreNotification` while still
>> > having `StateRestoreListener`, could the above setup achievable just
>> with
>> > `StateRestoreListener.onRestoreStart / onRestoreEnd`? I.e. it seems the
>> > later can subsume any use cases intended for the former API.
>> >
>> > Guozhang
>> >
>> > On Mon, Jun 19, 2017 at 3:23 PM, Bill Bejeck <bbej...@gmail.com> wrote:
>> >
>> >> I'm going to update the KIP with new interface StateRestoreNotification
>> >> containing two methods, startRestore and endRestore.
>> >>
>> >> While naming is very similar to methods already proposed on the
>> >> StateRestoreListener, the intent of these methods is not for user
>> >> notification of restore status.  Instead these new methods are for
>> >> internal
>> >> use by the state store to perform any required setup and teardown work
>> due
>> >> to a batch restoration process.
>> >>
>> >> Here's one current use case: when using RocksDB we should optimize for
>> a
>> >> bulk load by setting Options.prepareForBulkload().
>> >>
>> >>    1. If the database has already been opened, we'll need to close it,
>> set
>> >>    the "prepareForBulkload" and re-open the database.
>> >>    2. Once the restore is completed we'll need to close and re-open the
>> >>    database with the "prepareForBulkload" option turned off.
>> >>
>> >> While we are mentioning the RocksDB use case above, the addition of
>> this
>> >> interface is not specific to any specific implementation of a
>> persistent
>> >> state store.
>> >>
>> >> Additionally, a separate interface is needed so that any user can
>> >> implement
>> >> the state restore notification feature regardless of the state restore
>> >> callback used.
>> >>
>> >> I'll also remove the "getStateRestoreListener" method and stick with
>> the
>> >> notion of a "global" restore listener for now.
>> >>
>> >> On Mon, Jun 19, 2017 at 1:05 PM, Bill Bejeck <bbej...@gmail.com>
>> wrote:
>> >>
>> >> > Yes it is, more of an oversight on my part, I'll remove it from the
>> KIP.
>> >> >
>> >> >
>> >> > On Mon, Jun 19, 2017 at 12:48 PM, Matthias J. Sax <
>> >> matth...@confluent.io>
>> >> > wrote:
>> >> >
>> >> >> Hi,
>> >> >>
>> >> >> I thinks for now it's good enough to start with a single global
>> restore
>> >> >> listener. We can incrementally improve this later on if required. Of
>> >> >> course, if it's easy to do right away we can also be more fine
>> grained.
>> >> >> But for KTable, we might want to add this after getting rid of all
>> the
>> >> >> overloads we have atm.
>> >> >>
>> >> >> One question: what is the purpose of parameter "endOffset" in
>> >> >> #onRestoreEnd() -- isn't this the same value as provided in
>> >> >> #onRestoreStart() ?
>> >> >>
>> >> >>
>> >> >> -Matthias
>> >> >>
>> >> >>
>> >> >>
>> >> >> On 6/15/17 6:18 AM, Bill Bejeck wrote:
>> >> >> > Thinking about the custom StateRestoreListener approach and
>> having a
>> >> get
>> >> >> > method on the interface will really only work for custom state
>> >> stores.
>> >> >> >
>> >> >> > So we'll need to provide another way for users to set behavior
>> with
>> >> >> > provided state stores.  The only option that comes to mind now is
>> >> also
>> >> >> > adding a parameter to the StateStoreSupplier.
>> >> >> >
>> >> >> >
>> >> >> > Bill
>> >> >> >
>> >> >> >
>> >> >> > On Wed, Jun 14, 2017 at 5:39 PM, Bill Bejeck <bbej...@gmail.com>
>> >> wrote:
>> >> >> >
>> >> >> >> Guozhang,
>> >> >> >>
>> >> >> >> Thanks for the comments.
>> >> >> >>
>> >> >> >> 1.  As for the granularity, I agree that having one global
>> >> >> >> StateRestoreListener could be restrictive.  But I think it's
>> >> important
>> >> >> to
>> >> >> >> have a "setStateRestoreListener" on KafkaStreams as this allows
>> >> users
>> >> >> to
>> >> >> >> define an anonymous instance that has access to local scope for
>> >> >> reporting
>> >> >> >> purposes.  This is a similar pattern we use for
>> >> >> >> KafkaStreams.setStateListener.
>> >> >> >>
>> >> >> >> As an alternative, what if we add a method to the
>> >> >> BatchingStateRestoreCallback
>> >> >> >> interface named "getStateStoreListener".   Then in an abstract
>> >> adapter
>> >> >> >> class we return null from getStateStoreListener.   But if users
>> >> want to
>> >> >> >> supply a different StateRestoreListener strategy per callback
>> they
>> >> >> would
>> >> >> >> simply override the method to return an actual instance.
>> >> >> >>
>> >> >> >> WDYT?
>> >> >> >>
>> >> >> >> 2.  I'll make the required updates to pass in the ending offset
>> at
>> >> the
>> >> >> >> start as well as the actual name of the state store.
>> >> >> >>
>> >> >> >> Bill
>> >> >> >>
>> >> >> >>
>> >> >> >> On Wed, Jun 14, 2017 at 3:53 PM, Guozhang Wang <
>> wangg...@gmail.com>
>> >> >> wrote:
>> >> >> >>
>> >> >> >>> Thanks Bill for the updated wiki. I have a couple of more
>> comments:
>> >> >> >>>
>> >> >> >>> 1. Setting StateRestoreListener on the KafkaStreams granularity
>> may
>> >> >> not be
>> >> >> >>> sufficient, as in the listener callback we do not which store
>> it is
>> >> >> >>> restoring right now: if the topic is a changelog topic then from
>> >> the
>> >> >> >>> `TopicPartition` we may be able to infer the state store name,
>> but
>> >> if
>> >> >> the
>> >> >> >>> topic is the source topic read as a KTable then we may not know
>> >> which
>> >> >> >>> store
>> >> >> >>> it is restoring right now; plus forcing users to infer the state
>> >> store
>> >> >> >>> name
>> >> >> >>> from the topic partition name would not be intuitive as well.
>> Plus
>> >> for
>> >> >> >>> different stores the listener may be implemented differently,
>> and
>> >> >> setting
>> >> >> >>> a
>> >> >> >>> global listener would force users to branch on the
>> topic-partition
>> >> >> names,
>> >> >> >>> similarly to what we did in the global timestamp extractor. On
>> the
>> >> >> other
>> >> >> >>> hand, I also agree that setting the listener on the per-store
>> >> >> granularity
>> >> >> >>> may be a bit cumbersome since if users want to override it on a
>> >> >> specific
>> >> >> >>> store it needs to expose some APIs maybe at StateStoreSupplier.
>> So
>> >> >> would
>> >> >> >>> love to hear other people's opinions.
>> >> >> >>>
>> >> >> >>> If we think that different implemented restoring callback may be
>> >> less
>> >> >> >>> common, then I'd suggest at least replace the `TopicPartition`
>> >> >> parameter
>> >> >> >>> with the `String` store name and the `TaskId`?
>> >> >> >>>
>> >> >> >>> 2. I think we can pass in the `long endOffset` in the
>> >> `onRestoreStart`
>> >> >> >>> function as well, as we will have read the endOffset already by
>> >> then;
>> >> >> >>> otherwise users can still not be able to track the restoration
>> >> >> progress
>> >> >> >>> (e.g. how much percentage I have been restoring so far, to
>> estimate
>> >> >> on how
>> >> >> >>> long I still need to wait).
>> >> >> >>>
>> >> >> >>>
>> >> >> >>> Guozhang
>> >> >> >>>
>> >> >> >>>
>> >> >> >>>
>> >> >> >>> On Wed, Jun 14, 2017 at 12:25 PM, Bill Bejeck <
>> bbej...@gmail.com>
>> >> >> wrote:
>> >> >> >>>
>> >> >> >>>> Eno,
>> >> >> >>>>
>> >> >> >>>> Thanks for the comments.
>> >> >> >>>>
>> >> >> >>>> 1. As for having both restore and restoreAll, I kept the
>> restore
>> >> >> method
>> >> >> >>> for
>> >> >> >>>> backward compatibility as that is what is used by current
>> >> >> implementing
>> >> >> >>>> classes. However as I think about it makes things cleaner to
>> have
>> >> a
>> >> >> >>> single
>> >> >> >>>> restore method taking a collection. I'll wait for others to
>> weigh
>> >> in,
>> >> >> >>> but
>> >> >> >>>> I'm leaning towards having a single restore method.
>> >> >> >>>>
>> >> >> >>>> 2. The "onBatchRestored" method is for keeping track of the
>> >> restore
>> >> >> >>> process
>> >> >> >>>> as we load records from each poll request.
>> >> >> >>>>
>> >> >> >>>>    For example if the change log contained 5000 records and
>> >> >> >>>> MAX_POLL_RECORDS is set to 1000, the "onBatchRestored" method
>> >> would
>> >> >> get
>> >> >> >>>> called 5 times each time with the ending offset of the last
>> >> record in
>> >> >> >>> the
>> >> >> >>>> batch and the count    of the batch.   I'll update the KIP to
>> add
>> >> >> >>> comments
>> >> >> >>>> above the interface methods.
>> >> >> >>>>
>> >> >> >>>>
>> >> >> >>>> Thanks,
>> >> >> >>>> Bill
>> >> >> >>>>
>> >> >> >>>>
>> >> >> >>>> On Wed, Jun 14, 2017 at 11:49 AM, Eno Thereska <
>> >> >> eno.there...@gmail.com>
>> >> >> >>>> wrote:
>> >> >> >>>>
>> >> >> >>>>> Thanks Bill,
>> >> >> >>>>>
>> >> >> >>>>> A couple of questions:
>> >> >> >>>>>
>> >> >> >>>>>
>> >> >> >>>> 1. why do we need both restore and restoreAll, why can't we
>> just
>> >> have
>> >> >> >>> one,
>> >> >> >>>>> that takes a collection (i.e., restore all)? Are there cases
>> when
>> >> >> >>> people
>> >> >> >>>>> want to restore one at a time? In that case, they could still
>> use
>> >> >> >>>>> restoreAll with just 1 record in the collection right?
>> >> >> >>>>>
>> >> >> >>>>> 2. I don't quite get "onBatchRestored". Could you put a small
>> >> >> comment
>> >> >> >>> on
>> >> >> >>>>> top of all three methods. An example might help here.
>> >> >> >>>>>
>> >> >> >>>>> Thanks
>> >> >> >>>>> Eno
>> >> >> >>>>>
>> >> >> >>>>>
>> >> >> >>>>>> On 8 Jun 2017, at 18:05, Bill Bejeck <bbej...@gmail.com>
>> wrote:
>> >> >> >>>>>>
>> >> >> >>>>>> Guozhang, Damian thanks for the comments.
>> >> >> >>>>>>
>> >> >> >>>>>> Giving developers the ability to hook into StateStore
>> recovery
>> >> >> >>> phases
>> >> >> >>>> was
>> >> >> >>>>>> part of my original intent. However the state the KIP is in
>> now
>> >> >> >>> won't
>> >> >> >>>>>> provide this functionality.
>> >> >> >>>>>>
>> >> >> >>>>>> As a result I'll be doing a significant revision of KIP-167.
>> >> I'll
>> >> >> >>> be
>> >> >> >>>>> sure
>> >> >> >>>>>> to incorporate all your comments in the new revision.
>> >> >> >>>>>>
>> >> >> >>>>>> Thanks,
>> >> >> >>>>>> Bill
>> >> >> >>>>>>
>> >> >> >>>>>> On Wed, Jun 7, 2017 at 5:24 AM, Damian Guy <
>> >> damian....@gmail.com>
>> >> >> >>>> wrote:
>> >> >> >>>>>>
>> >> >> >>>>>>> I'm largely in agreement with what Guozhang has suggested,
>> >> i.e.,
>> >> >> >>>>>>> StateRestoreContext shouldn't have any setters on it and
>> also
>> >> need
>> >> >> >>> to
>> >> >> >>>>> have
>> >> >> >>>>>>> the end offset available such that people can use it derive
>> >> >> >>> progress.
>> >> >> >>>>>>> Slightly different, maybe the StateRestoreContext interface
>> >> could
>> >> >> >>> be:
>> >> >> >>>>>>>
>> >> >> >>>>>>> long beginOffset()
>> >> >> >>>>>>> long endOffset()
>> >> >> >>>>>>> long currentOffset()
>> >> >> >>>>>>>
>> >> >> >>>>>>> One further thing, this currently doesn't provide developers
>> >> the
>> >> >> >>>>> ability to
>> >> >> >>>>>>> hook into this information if they are using the built-in
>> >> >> >>> StateStores.
>> >> >> >>>>> Is
>> >> >> >>>>>>> this something we should be considering?
>> >> >> >>>>>>>
>> >> >> >>>>>>>
>> >> >> >>>>>>> On Tue, 6 Jun 2017 at 23:32 Guozhang Wang <
>> wangg...@gmail.com>
>> >> >> >>> wrote:
>> >> >> >>>>>>>
>> >> >> >>>>>>>> Thanks for the updated KIP Bill, I have a couple of
>> comments:
>> >> >> >>>>>>>>
>> >> >> >>>>>>>> 1) I'm assuming beginRestore / endRestore is called only
>> once
>> >> per
>> >> >> >>>> store
>> >> >> >>>>>>>> throughout the whole restoration process, and restoreAll is
>> >> >> called
>> >> >> >>>> per
>> >> >> >>>>>>>> batch. In that case I feel we can set the
>> StateRestoreContext
>> >> as
>> >> >> a
>> >> >> >>>>> second
>> >> >> >>>>>>>> parameter in restoreAll and in endRestore as well, and let
>> the
>> >> >> >>>> library
>> >> >> >>>>> to
>> >> >> >>>>>>>> set the corresponding values instead and only let users to
>> >> read
>> >> >> >>>> (since
>> >> >> >>>>>>> the
>> >> >> >>>>>>>> collection of key-value pairs do not contain offset
>> >> information
>> >> >> >>>> anyways
>> >> >> >>>>>>>> users cannot really set the offset). The
>> "lastOffsetRestored"
>> >> >> >>> would
>> >> >> >>>> be
>> >> >> >>>>>>> the
>> >> >> >>>>>>>> starting offset when called on `beginRestore`.
>> >> >> >>>>>>>>
>> >> >> >>>>>>>> 2) Users who wants to implement their own batch restoration
>> >> >> >>> callbacks
>> >> >> >>>>>>> would
>> >> >> >>>>>>>> now need to implement both `restore` and `restoreAll` while
>> >> they
>> >> >> >>>> either
>> >> >> >>>>>>> let
>> >> >> >>>>>>>> `restoreAll` to call `restore` or implement the logic in
>> >> >> >>> `restoreAll`
>> >> >> >>>>>>> only
>> >> >> >>>>>>>> and never call `restore`. Maybe we can provide two abstract
>> >> impl
>> >> >> >>> of
>> >> >> >>>>>>>> BatchingStateRestoreCallbacks which does beginRestore /
>> >> >> >>> endRestore as
>> >> >> >>>>>>>> no-ops, with one callback implementing `restoreAll` to call
>> >> >> >>> abstract
>> >> >> >>>>>>>> `restore` while the other implement `restore` to throw "not
>> >> >> >>> supported
>> >> >> >>>>>>>> exception" and keep `restoreAll` abstract.
>> >> >> >>>>>>>>
>> >> >> >>>>>>>> 3) I think we can also return the "offset limit" in
>> >> >> >>>>> StateRestoreContext,
>> >> >> >>>>>>>> which is important for users to track the restoration
>> progress
>> >> >> >>> since
>> >> >> >>>>>>>> otherwise they could not tell how many percent of
>> restoration
>> >> has
>> >> >> >>>>>>>> completed.  I.e.:
>> >> >> >>>>>>>>
>> >> >> >>>>>>>> public interface BatchingStateRestoreCallback extends
>> >> >> >>>>>>> StateRestoreCallback
>> >> >> >>>>>>>> {
>> >> >> >>>>>>>>
>> >> >> >>>>>>>>   void restoreAll(Collection<KeyValue<byte[], byte []>>
>> >> records,
>> >> >> >>>>>>>> StateRestoreContext
>> >> >> >>>>>>>> restoreContext);
>> >> >> >>>>>>>>
>> >> >> >>>>>>>>   void beginRestore(StateRestoreContext restoreContext);
>> >> >> >>>>>>>>
>> >> >> >>>>>>>>   void endRestore(StateRestoreContext restoreContext);
>> >> >> >>>>>>>> }
>> >> >> >>>>>>>>
>> >> >> >>>>>>>> public interface StateRestoreContext {
>> >> >> >>>>>>>>
>> >> >> >>>>>>>>  long lastOffsetRestored();
>> >> >> >>>>>>>>
>> >> >> >>>>>>>>  long endOffsetToRestore();
>> >> >> >>>>>>>>
>> >> >> >>>>>>>>  int numberRestored();
>> >> >> >>>>>>>> }
>> >> >> >>>>>>>>
>> >> >> >>>>>>>>
>> >> >> >>>>>>>> Guozhang
>> >> >> >>>>>>>>
>> >> >> >>>>>>>>
>> >> >> >>>>>>>>
>> >> >> >>>>>>>> On Fri, Jun 2, 2017 at 9:16 AM, Bill Bejeck <
>> >> bbej...@gmail.com>
>> >> >> >>>> wrote:
>> >> >> >>>>>>>>
>> >> >> >>>>>>>>> Guozhang, Matthias,
>> >> >> >>>>>>>>>
>> >> >> >>>>>>>>> Thanks for the comments.  I have updated the KIP, (JIRA
>> title
>> >> >> and
>> >> >> >>>>>>>>> description as well).
>> >> >> >>>>>>>>>
>> >> >> >>>>>>>>> I had thought about introducing a separate interface
>> >> altogether,
>> >> >> >>> but
>> >> >> >>>>>>>>> extending the current one makes more sense.
>> >> >> >>>>>>>>>
>> >> >> >>>>>>>>> As for intermediate callbacks based on time or number of
>> >> >> >>> records, I
>> >> >> >>>>>>> think
>> >> >> >>>>>>>>> the latest update to the KIP addresses this point of
>> querying
>> >> >> for
>> >> >> >>>>>>>>> intermediate results, but it would be per batch restored.
>> >> >> >>>>>>>>>
>> >> >> >>>>>>>>> Thanks,
>> >> >> >>>>>>>>> Bill
>> >> >> >>>>>>>>>
>> >> >> >>>>>>>>>
>> >> >> >>>>>>>>>
>> >> >> >>>>>>>>>
>> >> >> >>>>>>>>>
>> >> >> >>>>>>>>> On Fri, Jun 2, 2017 at 8:36 AM, Jim Jagielski <
>> >> j...@jagunet.com>
>> >> >> >>>>> wrote:
>> >> >> >>>>>>>>>
>> >> >> >>>>>>>>>>
>> >> >> >>>>>>>>>>> On Jun 2, 2017, at 12:54 AM, Matthias J. Sax <
>> >> >> >>>>>>> matth...@confluent.io>
>> >> >> >>>>>>>>>> wrote:
>> >> >> >>>>>>>>>>>
>> >> >> >>>>>>>>>>> With regard to backward compatibility, we should not
>> change
>> >> >> the
>> >> >> >>>>>>>> current
>> >> >> >>>>>>>>>>> interface, but add a new interface that extends the
>> current
>> >> >> >>> one.
>> >> >> >>>>>>>>>>>
>> >> >> >>>>>>>>>>
>> >> >> >>>>>>>>>> ++1
>> >> >> >>>>>>>>>>
>> >> >> >>>>>>>>>>
>> >> >> >>>>>>>>>
>> >> >> >>>>>>>>
>> >> >> >>>>>>>>
>> >> >> >>>>>>>>
>> >> >> >>>>>>>> --
>> >> >> >>>>>>>> -- Guozhang
>> >> >> >>>>>>>>
>> >> >> >>>>>>>
>> >> >> >>>>>
>> >> >> >>>>>
>> >> >> >>>>
>> >> >> >>>
>> >> >> >>>
>> >> >> >>>
>> >> >> >>> --
>> >> >> >>> -- Guozhang
>> >> >> >>>
>> >> >> >>
>> >> >> >>
>> >> >> >
>> >> >>
>> >> >>
>> >> >
>> >>
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>

Reply via email to