More specifically, if we replace the first parameter, the String store
name, with the store instance itself, would that be sufficient to cover
`StateRestoreNotification`?
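A minimal sketch of this suggestion follows. It assumes simplified types: `RestorableStore`, `StoreRestoreListener`, `DelegatingRestoreListener` and `FlagStore` are hypothetical names used only for illustration, not Kafka Streams APIs. If the listener receives the store instance rather than its name, `onRestoreStart` / `onRestoreEnd` can trigger the store's own setup and teardown as well as report progress.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical store abstraction for this sketch; not the Kafka Streams StateStore API.
interface RestorableStore {
    String name();

    // Store-specific setup before a bulk restore (e.g. switching to bulk-load options).
    void beginBulkRestore();

    // Store-specific teardown once the restore has finished.
    void endBulkRestore();
}

// Hypothetical listener variant whose first parameter is the store instance, not its name.
interface StoreRestoreListener {
    void onRestoreStart(RestorableStore store, long startingOffset, long endingOffset);

    void onRestoreEnd(RestorableStore store, long totalRestored);
}

// Reports progress and lets the store run its own setup/teardown from the same callbacks.
class DelegatingRestoreListener implements StoreRestoreListener {
    final List<String> log = new ArrayList<>();

    @Override
    public void onRestoreStart(RestorableStore store, long startingOffset, long endingOffset) {
        store.beginBulkRestore();
        log.add("start " + store.name() + " [" + startingOffset + ", " + endingOffset + "]");
    }

    @Override
    public void onRestoreEnd(RestorableStore store, long totalRestored) {
        store.endBulkRestore();
        log.add("end " + store.name() + " restored=" + totalRestored);
    }
}

// Minimal store that only records whether it is currently in bulk-restore mode.
class FlagStore implements RestorableStore {
    boolean bulkMode = false;

    @Override
    public String name() {
        return "counts-store";
    }

    @Override
    public void beginBulkRestore() {
        bulkMode = true;
    }

    @Override
    public void endBulkRestore() {
        bulkMode = false;
    }
}
```

The trade-off is that the store's setup now runs only if the registered listener chooses to call it, which is the concern behind keeping a separate interface that works regardless of the restore callback used.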

On Wed, Jun 21, 2017 at 7:13 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Bill,
>
> I'm wondering why we need `StateRestoreNotification` while we still
> have `StateRestoreListener`. Could the above setup be achieved just with
> `StateRestoreListener.onRestoreStart / onRestoreEnd`? I.e., it seems the
> latter can subsume any use cases intended for the former API.
>
> Guozhang
>
> On Mon, Jun 19, 2017 at 3:23 PM, Bill Bejeck <bbej...@gmail.com> wrote:
>
>> I'm going to update the KIP with a new interface, StateRestoreNotification,
>> containing two methods: startRestore and endRestore.
>>
>> While the naming is very similar to the methods already proposed on
>> StateRestoreListener, these new methods are not intended for user
>> notification of restore status. Instead, they are for internal use by
>> the state store, to perform any setup and teardown work required by a
>> batch restoration process.
>>
>> Here's one current use case: when using RocksDB, we should optimize for a
>> bulk load by setting Options.prepareForBulkLoad().
>>
>>    1. If the database has already been opened, we'll need to close it, set
>>    the "prepareForBulkLoad" option, and re-open the database.
>>    2. Once the restore is complete, we'll need to close and re-open the
>>    database with the "prepareForBulkLoad" option turned off.
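The two steps above could be sketched as follows. This is a dependency-free illustration, not the KIP's code. `StateRestoreNotification` uses the method names from the message. `BulkLoadAwareStore` is a hypothetical stand-in that records open/close events instead of calling RocksJava. A real store would build its RocksDB options with the bulk-load setting at the point where `open(true)` is called.

```java
import java.util.ArrayList;
import java.util.List;

// Interface shape as described in the message (startRestore / endRestore).
interface StateRestoreNotification {
    void startRestore();

    void endRestore();
}

// Hypothetical stand-in for a RocksDB-backed store: it logs open/close events
// instead of touching a real database, so the sketch runs without dependencies.
class BulkLoadAwareStore implements StateRestoreNotification {
    final List<String> events = new ArrayList<>();
    private boolean isOpen = false;
    private boolean bulkLoad = false;

    void open(boolean bulkLoadOptions) {
        // A real store would create its options here, enabling bulk load when requested.
        bulkLoad = bulkLoadOptions;
        isOpen = true;
        events.add("open(bulkLoad=" + bulkLoad + ")");
    }

    void close() {
        isOpen = false;
        events.add("close");
    }

    boolean isBulkLoad() {
        return bulkLoad;
    }

    @Override
    public void startRestore() {
        // Step 1: if the database is already open, close it and re-open with bulk-load options.
        if (isOpen) {
            close();
        }
        open(true);
    }

    @Override
    public void endRestore() {
        // Step 2: close and re-open with the bulk-load option turned off.
        close();
        open(false);
    }
}
```

Each transition costs a close and a re-open, so the sketch only pays that price at the start and end of a restore, not per batch.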
>>
>> While we mention the RocksDB use case above, this interface is not tied
>> to any particular implementation of a persistent state store.
>>
>> Additionally, a separate interface is needed so that any user can
>> implement the state restore notification feature regardless of which
>> state restore callback is used.
>>
>> I'll also remove the "getStateRestoreListener" method and stick with the
>> notion of a "global" restore listener for now.
>>
>> On Mon, Jun 19, 2017 at 1:05 PM, Bill Bejeck <bbej...@gmail.com> wrote:
>>
>> > Yes, it is; that was an oversight on my part. I'll remove it from the KIP.
>> >
>> >
>> > On Mon, Jun 19, 2017 at 12:48 PM, Matthias J. Sax <matth...@confluent.io>
>> > wrote:
>> >
>> >> Hi,
>> >>
>> >> I think for now it's good enough to start with a single global restore
>> >> listener. We can incrementally improve this later on if required. Of
>> >> course, if it's easy to do right away, we can also be more fine-grained.
>> >> But for KTable, we might want to add this after getting rid of all the
>> >> overloads we have atm.
>> >>
>> >> One question: what is the purpose of the parameter "endOffset" in
>> >> #onRestoreEnd()? Isn't this the same value as provided in
>> >> #onRestoreStart()?
>> >>
>> >>
>> >> -Matthias
>> >>
>> >>
>> >>
>> >> On 6/15/17 6:18 AM, Bill Bejeck wrote:
>> >> > Thinking more about the custom StateRestoreListener approach: having
>> >> > a get method on the interface will really only work for custom state
>> >> > stores.
>> >> >
>> >> > So we'll need to provide another way for users to set behavior on the
>> >> > provided state stores. The only option that comes to mind now is
>> >> > adding a parameter to the StateStoreSupplier as well.
>> >> >
>> >> >
>> >> > Bill
>> >> >
>> >> >
>> >> > On Wed, Jun 14, 2017 at 5:39 PM, Bill Bejeck <bbej...@gmail.com>
>> >> > wrote:
>> >> >
>> >> >> Guozhang,
>> >> >>
>> >> >> Thanks for the comments.
>> >> >>
>> >> >> 1. As for the granularity, I agree that having one global
>> >> >> StateRestoreListener could be restrictive. But I think it's
>> >> >> important to have a "setStateRestoreListener" on KafkaStreams, as
>> >> >> this allows users to define an anonymous instance that has access to
>> >> >> local scope for reporting purposes. This is similar to the pattern we
>> >> >> use for KafkaStreams.setStateListener.
>> >> >>
>> >> >> As an alternative, what if we add a method named
>> >> >> "getStateStoreListener" to the BatchingStateRestoreCallback
>> >> >> interface? Then, in an abstract adapter class, we return null from
>> >> >> getStateStoreListener. But if users want to supply a different
>> >> >> StateRestoreListener strategy per callback, they would simply
>> >> >> override the method to return an actual instance.
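A sketch of the adapter idea follows, assuming simplified hypothetical types. `Map.Entry<byte[], byte[]>` stands in for Kafka's `KeyValue`. `ListenerResolver` illustrates one way the runtime might prefer a per-callback listener and fall back to the global one set on KafkaStreams. Only the `getStateStoreListener` name comes from the message.

```java
import java.util.Collection;
import java.util.Map;

// Hypothetical, simplified listener shape for this sketch.
interface RestoreListenerSketch {
    void onRestoreStart(String storeName, long startingOffset, long endingOffset);
}

// Hypothetical callback carrying the proposed per-callback listener accessor.
interface BatchingCallbackWithListener {
    void restoreAll(Collection<Map.Entry<byte[], byte[]>> records);

    RestoreListenerSketch getStateStoreListener();
}

// Adapter: returning null means "no per-callback listener, use the global one".
abstract class BatchingCallbackAdapter implements BatchingCallbackWithListener {
    @Override
    public RestoreListenerSketch getStateStoreListener() {
        return null;
    }
}

final class ListenerResolver {
    private ListenerResolver() {
    }

    // Prefer the callback's own listener; otherwise fall back to the global listener.
    static RestoreListenerSketch resolve(BatchingCallbackWithListener callback,
                                         RestoreListenerSketch globalListener) {
        RestoreListenerSketch perCallback = callback.getStateStoreListener();
        return perCallback != null ? perCallback : globalListener;
    }
}
```

Since `RestoreListenerSketch` has a single abstract method, the global listener can be a lambda that captures local reporting state, which is the point item 1 makes about anonymous instances.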
>> >> >>
>> >> >> WDYT?
>> >> >>
>> >> >> 2. I'll make the required updates to pass in the ending offset at
>> >> >> the start, as well as the actual name of the state store.
>> >> >>
>> >> >> Bill
>> >> >>
>> >> >>
>> >> >> On Wed, Jun 14, 2017 at 3:53 PM, Guozhang Wang <wangg...@gmail.com>
>> >> >> wrote:
>> >> >>
>> >> >>> Thanks Bill for the updated wiki. I have a couple more comments:
>> >> >>>
>> >> >>> 1. Setting StateRestoreListener at the KafkaStreams granularity may
>> >> >>> not be sufficient, as in the listener callback we do not know which
>> >> >>> store is being restored right now. If the topic is a changelog
>> >> >>> topic, then from the `TopicPartition` we may be able to infer the
>> >> >>> state store name, but if the topic is a source topic read as a
>> >> >>> KTable, then we may not know which store is being restored; plus,
>> >> >>> forcing users to infer the state store name from the topic partition
>> >> >>> name would not be intuitive either. Also, the listener may be
>> >> >>> implemented differently for different stores, and setting a global
>> >> >>> listener would force users to branch on the topic-partition names,
>> >> >>> similar to what we did in the global timestamp extractor. On the
>> >> >>> other hand, I also agree that setting the listener at the per-store
>> >> >>> granularity may be a bit cumbersome, since users who want to
>> >> >>> override it for a specific store would need some exposed API, maybe
>> >> >>> on StateStoreSupplier. So I would love to hear other people's
>> >> >>> opinions.
>> >> >>>
>> >> >>> If we think that differently implemented restore callbacks will be
>> >> >>> less common, then I'd suggest at least replacing the
>> >> >>> `TopicPartition` parameter with the `String` store name and the
>> >> >>> `TaskId`.
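The branching burden in point 1 can be seen in a single global listener that routes by store name to per-store handlers. `StoreRestoreHandler` and `DispatchingRestoreListener` are hypothetical names. The sketch keys on the `String` store name as suggested and omits `TaskId` for brevity.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-store handler, keyed by store name rather than TopicPartition.
interface StoreRestoreHandler {
    void onRestoreStart(String storeName, long startingOffset, long endingOffset);
}

// One global listener that dispatches to per-store handlers by name,
// with a fallback for stores that have no dedicated handler.
class DispatchingRestoreListener implements StoreRestoreHandler {
    private final Map<String, StoreRestoreHandler> handlers = new HashMap<>();
    private final StoreRestoreHandler fallback;

    DispatchingRestoreListener(StoreRestoreHandler fallback) {
        this.fallback = fallback;
    }

    DispatchingRestoreListener register(String storeName, StoreRestoreHandler handler) {
        handlers.put(storeName, handler);
        return this;
    }

    @Override
    public void onRestoreStart(String storeName, long startingOffset, long endingOffset) {
        handlers.getOrDefault(storeName, fallback)
                .onRestoreStart(storeName, startingOffset, endingOffset);
    }
}
```

With a `TopicPartition` parameter instead, the lookup key would have to be derived from the topic name, which is the inference step the message argues users should not be forced into.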
>> >> >>>
>> >> >>> 2. I think we can pass in the `long endOffset` in the
>> >> >>> `onRestoreStart` function as well, since we will have read the
>> >> >>> endOffset already by then; otherwise users still cannot track the
>> >> >>> restoration progress (e.g., what percentage has been restored so
>> >> >>> far, to estimate how much longer they need to wait).
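A minimal progress tracker shows why the end offset is needed up front. It is a hypothetical helper, not part of the proposal. It assumes `endingOffset` is exclusive (the next offset to be written, like a log-end offset). It uses offsets rather than record counts so that gaps from log compaction do not skew the estimate.

```java
// Hypothetical progress tracker; assumes endingOffset is exclusive (like a log-end offset).
class RestoreProgressTracker {
    private final long startingOffset;
    private final long endingOffset;
    private long lastRestoredOffset;

    RestoreProgressTracker(long startingOffset, long endingOffset) {
        this.startingOffset = startingOffset;
        this.endingOffset = endingOffset;
        this.lastRestoredOffset = startingOffset - 1; // nothing restored yet
    }

    // Called once per restored batch with the offset of the batch's last record.
    void onBatchRestored(long batchEndOffset) {
        lastRestoredOffset = batchEndOffset;
    }

    // Uses offsets, not record counts, so compaction gaps do not skew the estimate.
    double percentComplete() {
        long total = endingOffset - startingOffset;
        if (total <= 0) {
            return 100.0; // nothing to restore
        }
        long done = lastRestoredOffset + 1 - startingOffset;
        return 100.0 * Math.min(done, total) / total;
    }
}
```

Without the end offset at start time, `total` is unknown until the restore finishes, so no percentage can be reported along the way.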
>> >> >>>
>> >> >>>
>> >> >>> Guozhang
>> >> >>>
>> >> >>>
>> >> >>>
>> >> >>> On Wed, Jun 14, 2017 at 12:25 PM, Bill Bejeck <bbej...@gmail.com>
>> >> >>> wrote:
>> >> >>>
>> >> >>>> Eno,
>> >> >>>>
>> >> >>>> Thanks for the comments.
>> >> >>>>
>> >> >>>> 1. As for having both restore and restoreAll, I kept the restore
>> >> >>>> method for backward compatibility, as that is what current
>> >> >>>> implementing classes use. However, as I think about it, having a
>> >> >>>> single restore method taking a collection makes things cleaner.
>> >> >>>> I'll wait for others to weigh in, but I'm leaning towards having a
>> >> >>>> single restore method.
>> >> >>>>
>> >> >>>> 2. The "onBatchRestored" method is for keeping track of the restore
>> >> >>>> process as we load records from each poll request.
>> >> >>>>
>> >> >>>>    For example, if the changelog contained 5000 records and
>> >> >>>> MAX_POLL_RECORDS is set to 1000, the "onBatchRestored" method would
>> >> >>>> get called 5 times, each time with the ending offset of the last
>> >> >>>> record in the batch and the count of the batch. I'll update the KIP
>> >> >>>> to add comments above the interface methods.
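The arithmetic in this example can be checked with a small simulation. `BatchRestoreListener` and `RestoreLoopSimulation` are hypothetical names. The sketch assumes a gap-free changelog starting at offset 0, and that every poll returns a full batch until the end offset is reached.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener method shape, following the description in the message.
interface BatchRestoreListener {
    void onBatchRestored(long batchEndOffset, long numRestored);
}

final class RestoreLoopSimulation {
    private RestoreLoopSimulation() {
    }

    // Simulates restoring offsets [startOffset, endOffset) in poll-sized batches,
    // assuming a dense changelog (no compaction gaps) and full polls.
    static void restore(long startOffset, long endOffset, int maxPollRecords,
                        BatchRestoreListener listener) {
        long next = startOffset;
        while (next < endOffset) {
            long count = Math.min(maxPollRecords, endOffset - next);
            long batchEnd = next + count - 1; // offset of the last record in this batch
            listener.onBatchRestored(batchEnd, count);
            next = batchEnd + 1;
        }
    }
}
```

With 5000 records and a batch size of 1000, the listener receives `(999, 1000)`, `(1999, 1000)`, `(2999, 1000)`, `(3999, 1000)` and `(4999, 1000)`.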
>> >> >>>>
>> >> >>>>
>> >> >>>> Thanks,
>> >> >>>> Bill
>> >> >>>>
>> >> >>>>
>> >> >>>> On Wed, Jun 14, 2017 at 11:49 AM, Eno Thereska <eno.there...@gmail.com>
>> >> >>>> wrote:
>> >> >>>>
>> >> >>>>> Thanks Bill,
>> >> >>>>>
>> >> >>>>> A couple of questions:
>> >> >>>>>
>> >> >>>>>
>> >> >>>>> 1. Why do we need both restore and restoreAll? Why can't we just
>> >> >>>>> have one that takes a collection (i.e., restore all)? Are there
>> >> >>>>> cases when people want to restore one at a time? In that case,
>> >> >>>>> they could still use restoreAll with just 1 record in the
>> >> >>>>> collection, right?
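Eno's suggestion can be sketched with a single collection-based method, where restoring one record is just a one-element collection. `CollectionRestoreCallback`, its `restoreOne` default method, and `CountingRestoreCallback` are hypothetical. `Map.Entry<byte[], byte[]>` stands in for Kafka's `KeyValue`.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical single-method callback: all restoration goes through restoreAll.
interface CollectionRestoreCallback {
    void restoreAll(Collection<Map.Entry<byte[], byte[]>> records);

    // Restoring a single record is just a one-element collection.
    default void restoreOne(byte[] key, byte[] value) {
        Map.Entry<byte[], byte[]> record = new SimpleEntry<>(key, value);
        restoreAll(Collections.singletonList(record));
    }
}

// Counts calls and records so the behavior is easy to check.
class CountingRestoreCallback implements CollectionRestoreCallback {
    int batches = 0;
    int records = 0;

    @Override
    public void restoreAll(Collection<Map.Entry<byte[], byte[]>> batch) {
        batches++;
        records += batch.size();
    }
}
```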
>> >> >>>>>
>> >> >>>>> 2. I don't quite get "onBatchRestored". Could you put a small
>> >> >>>>> comment on top of all three methods? An example might help here.
>> >> >>>>>
>> >> >>>>> Thanks
>> >> >>>>> Eno
>> >> >>>>>
>> >> >>>>>
>> >> >>>>>> On 8 Jun 2017, at 18:05, Bill Bejeck <bbej...@gmail.com> wrote:
>> >> >>>>>>
>> >> >>>>>> Guozhang, Damian thanks for the comments.
>> >> >>>>>>
>> >> >>>>>> Giving developers the ability to hook into StateStore recovery
>> >> >>>>>> phases was part of my original intent. However, the KIP in its
>> >> >>>>>> current state won't provide this functionality.
>> >> >>>>>>
>> >> >>>>>> As a result, I'll be doing a significant revision of KIP-167.
>> >> >>>>>> I'll be sure to incorporate all your comments in the new revision.
>> >> >>>>>>
>> >> >>>>>> Thanks,
>> >> >>>>>> Bill
>> >> >>>>>>
>> >> >>>>>> On Wed, Jun 7, 2017 at 5:24 AM, Damian Guy <damian....@gmail.com>
>> >> >>>>>> wrote:
>> >> >>>>>>
>> >> >>>>>>> I'm largely in agreement with what Guozhang has suggested, i.e.,
>> >> >>>>>>> StateRestoreContext shouldn't have any setters on it and also
>> >> >>>>>>> needs to have the end offset available so that people can use it
>> >> >>>>>>> to derive progress. Slightly differently, maybe the
>> >> >>>>>>> StateRestoreContext interface could be:
>> >> >>>>>>>
>> >> >>>>>>> long beginOffset()
>> >> >>>>>>> long endOffset()
>> >> >>>>>>> long currentOffset()
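A read-only context in this shape might look like the following. The `RestoreContextSnapshot` class and its `advanceTo` method are hypothetical. The idea is that the restore loop creates a new snapshot per batch, so user code can read offsets but never set them.

```java
// Read-only view with the three accessors listed above; users can query but not modify it.
interface RestoreContextView {
    long beginOffset();

    long endOffset();

    long currentOffset();
}

// Hypothetical immutable implementation: the library creates a new snapshot per batch
// instead of exposing setters.
final class RestoreContextSnapshot implements RestoreContextView {
    private final long beginOffset;
    private final long endOffset;
    private final long currentOffset;

    RestoreContextSnapshot(long beginOffset, long endOffset, long currentOffset) {
        this.beginOffset = beginOffset;
        this.endOffset = endOffset;
        this.currentOffset = currentOffset;
    }

    @Override
    public long beginOffset() {
        return beginOffset;
    }

    @Override
    public long endOffset() {
        return endOffset;
    }

    @Override
    public long currentOffset() {
        return currentOffset;
    }

    // Returns a new snapshot at a later position; this instance is left unchanged.
    RestoreContextSnapshot advanceTo(long newCurrentOffset) {
        return new RestoreContextSnapshot(beginOffset, endOffset, newCurrentOffset);
    }
}
```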
>> >> >>>>>>>
>> >> >>>>>>> One further thing: this currently doesn't give developers the
>> >> >>>>>>> ability to hook into this information if they are using the
>> >> >>>>>>> built-in StateStores. Is this something we should be considering?
>> >> >>>>>>>
>> >> >>>>>>>
>> >> >>>>>>> On Tue, 6 Jun 2017 at 23:32 Guozhang Wang <wangg...@gmail.com>
>> >> >>>>>>> wrote:
>> >> >>>>>>>
>> >> >>>>>>>> Thanks for the updated KIP Bill, I have a couple of comments:
>> >> >>>>>>>>
>> >> >>>>>>>> 1) I'm assuming beginRestore / endRestore is called only once
>> >> >>>>>>>> per store throughout the whole restoration process, and
>> >> >>>>>>>> restoreAll is called per batch. In that case, I feel we can pass
>> >> >>>>>>>> the StateRestoreContext as a second parameter to restoreAll and
>> >> >>>>>>>> to endRestore as well, letting the library set the corresponding
>> >> >>>>>>>> values and only letting users read them (since the collection of
>> >> >>>>>>>> key-value pairs does not contain offset information, users cannot
>> >> >>>>>>>> really set the offset anyway). The "lastOffsetRestored" would be
>> >> >>>>>>>> the starting offset when called on `beginRestore`.
>> >> >>>>>>>>
>> >> >>>>>>>> 2) Users who want to implement their own batch restoration
>> >> >>>>>>>> callbacks would now need to implement both `restore` and
>> >> >>>>>>>> `restoreAll`, either letting `restoreAll` call `restore` or
>> >> >>>>>>>> implementing the logic in `restoreAll` only and never calling
>> >> >>>>>>>> `restore`. Maybe we can provide two abstract implementations of
>> >> >>>>>>>> BatchingStateRestoreCallback that make beginRestore / endRestore
>> >> >>>>>>>> no-ops, with one implementing `restoreAll` to call an abstract
>> >> >>>>>>>> `restore`, and the other implementing `restore` to throw a "not
>> >> >>>>>>>> supported" exception while keeping `restoreAll` abstract.
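The two abstract implementations proposed in item 2 could look roughly like this. The types are simplified, hypothetical stand-ins: the `StateRestoreContext` parameter is omitted, and `Map.Entry<byte[], byte[]>` replaces `KeyValue`. `BatchingCallback` extending `PerRecordCallback` also mirrors the backward-compatibility approach of adding a new interface that extends the current one rather than changing it.

```java
import java.util.Collection;
import java.util.Map;

// Simplified stand-in for the existing per-record callback.
interface PerRecordCallback {
    void restore(byte[] key, byte[] value);
}

// Simplified stand-in for the batching callback; it extends the existing interface.
interface BatchingCallback extends PerRecordCallback {
    void restoreAll(Collection<Map.Entry<byte[], byte[]>> records);

    void beginRestore();

    void endRestore();
}

// Adapter 1: restoreAll loops over restore(); subclasses implement restore() only.
abstract class RecordByRecordAdapter implements BatchingCallback {
    @Override
    public void restoreAll(Collection<Map.Entry<byte[], byte[]>> records) {
        for (Map.Entry<byte[], byte[]> record : records) {
            restore(record.getKey(), record.getValue());
        }
    }

    @Override
    public void beginRestore() { } // no-op

    @Override
    public void endRestore() { } // no-op
}

// Adapter 2: restore() is unsupported; subclasses implement restoreAll() only.
abstract class BatchOnlyAdapter implements BatchingCallback {
    @Override
    public void restore(byte[] key, byte[] value) {
        throw new UnsupportedOperationException("use restoreAll for batch restoration");
    }

    @Override
    public void beginRestore() { } // no-op

    @Override
    public void endRestore() { } // no-op
}
```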
>> >> >>>>>>>>
>> >> >>>>>>>> 3) I think we can also return the "offset limit" in
>> >> >>>>>>>> StateRestoreContext, which is important for users to track the
>> >> >>>>>>>> restoration progress, since otherwise they could not tell what
>> >> >>>>>>>> percentage of the restoration has completed. I.e.:
>> >> >>>>>>>>
>> >> >>>>>>>> public interface BatchingStateRestoreCallback extends StateRestoreCallback {
>> >> >>>>>>>>
>> >> >>>>>>>>     void restoreAll(Collection<KeyValue<byte[], byte[]>> records,
>> >> >>>>>>>>                     StateRestoreContext restoreContext);
>> >> >>>>>>>>
>> >> >>>>>>>>     void beginRestore(StateRestoreContext restoreContext);
>> >> >>>>>>>>
>> >> >>>>>>>>     void endRestore(StateRestoreContext restoreContext);
>> >> >>>>>>>> }
>> >> >>>>>>>>
>> >> >>>>>>>> public interface StateRestoreContext {
>> >> >>>>>>>>
>> >> >>>>>>>>     long lastOffsetRestored();
>> >> >>>>>>>>
>> >> >>>>>>>>     long endOffsetToRestore();
>> >> >>>>>>>>
>> >> >>>>>>>>     int numberRestored();
>> >> >>>>>>>> }
>> >> >>>>>>>>
>> >> >>>>>>>>
>> >> >>>>>>>> Guozhang
>> >> >>>>>>>>
>> >> >>>>>>>>
>> >> >>>>>>>>
>> >> >>>>>>>> On Fri, Jun 2, 2017 at 9:16 AM, Bill Bejeck <bbej...@gmail.com>
>> >> >>>>>>>> wrote:
>> >> >>>>>>>>
>> >> >>>>>>>>> Guozhang, Matthias,
>> >> >>>>>>>>>
>> >> >>>>>>>>> Thanks for the comments. I have updated the KIP (and the JIRA
>> >> >>>>>>>>> title and description as well).
>> >> >>>>>>>>>
>> >> >>>>>>>>> I had thought about introducing a separate interface
>> >> >>>>>>>>> altogether, but extending the current one makes more sense.
>> >> >>>>>>>>>
>> >> >>>>>>>>> As for intermediate callbacks based on time or number of
>> >> >>>>>>>>> records, I think the latest update to the KIP addresses this
>> >> >>>>>>>>> point of querying for intermediate results, but it would be per
>> >> >>>>>>>>> restored batch.
>> >> >>>>>>>>>
>> >> >>>>>>>>> Thanks,
>> >> >>>>>>>>> Bill
>> >> >>>>>>>>>
>> >> >>>>>>>>>
>> >> >>>>>>>>>
>> >> >>>>>>>>>
>> >> >>>>>>>>>
>> >> >>>>>>>>> On Fri, Jun 2, 2017 at 8:36 AM, Jim Jagielski <j...@jagunet.com>
>> >> >>>>>>>>> wrote:
>> >> >>>>>>>>>
>> >> >>>>>>>>>>
>> >> >>>>>>>>>>> On Jun 2, 2017, at 12:54 AM, Matthias J. Sax <matth...@confluent.io>
>> >> >>>>>>>>>>> wrote:
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> With regard to backward compatibility, we should not change the
>> >> >>>>>>>>>>> current interface, but add a new interface that extends the
>> >> >>>>>>>>>>> current one.
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> ++1
>> >> >>>>>>>>>>
>> >> >>>>>>>>>>
>> >> >>>>>>>>>
>> >> >>>>>>>>
>> >> >>>>>>>>
>> >> >>>>>>>>
>> >> >>>>>>>> --
>> >> >>>>>>>> -- Guozhang
>> >> >>>>>>>>
>> >> >>>>>>>
>> >> >>>>>
>> >> >>>>>
>> >> >>>>
>> >> >>>
>> >> >>>
>> >> >>>
>> >> >>> --
>> >> >>> -- Guozhang
>> >> >>>
>> >> >>
>> >> >>
>> >> >
>> >>
>> >>
>> >
>>
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang
