Thanks for the clarification; it looks good to me now. On Wed, Nov 17, 2021 at 9:21 PM John Roesler <vvcep...@apache.org> wrote:
> Ah, sorry, Guozhang, > > It seems I was a bit too eager in starting the vote thread. > > 13: I think that makes perfect sense. I've updated the KIP. > > 14: Oof, I can't believe I overlooked those newer > exceptions. Some of them will become exceptions in IQv2, > whereas others will become individual partition QueryResult > failures. Here is an accounting of what will become of those > proposed exceptions: > > * StreamsNotStartedException: thrown when the stream thread > state is CREATED; the user can retry until the state is RUNNING. > > * StreamsRebalancingException: thrown when the stream thread is > not running and the stream state is REBALANCING. This exception > is no longer applicable. Regardless of the rebalancing > state of the store's task, the state will either be up to > the requested bound or not. > > * StateStoreMigratedException: thrown when the state store is > already closed and the stream state is RUNNING. This is a > per-partition failure, so it now maps to the > FailureReason.NOT_PRESENT failure. > > * StateStoreNotAvailableException: thrown when the state store is > closed and the stream state is PENDING_SHUTDOWN / NOT_RUNNING / > ERROR. I (subjectively) felt the name was ambiguous with > respect to the prior condition in which a store partition is > not locally available. This is replaced with the thrown > exception StreamsStoppedException (the JavaDoc states > that it is thrown when Streams is in any terminal state). > > * UnknownStateStoreException: thrown when passing an unknown > state store. This is still a thrown exception. > > * InvalidStateStorePartitionException: thrown when the user's > requested partition is not available on the stream instance. > If the partition actually does exist, then we will now > return a per-partition FailureReason.NOT_PRESENT. If the > requested partition is actually not present in the topology > at all, then we will return the per-partition > FailureReason.DOES_NOT_EXIST. > > Sorry for the oversight. The KIP has been updated.
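The accounting above splits errors into exceptions thrown for the whole request and failures reported per partition. Below is a minimal sketch of that split, assuming simplified stand-in types. `InstanceState`, `FailureMapping`, and the local exception classes are hypothetical illustrations, not the Kafka Streams API. The sketch also assumes that a partition index outside `[0, partitionsInTopology)` means "not present in the topology at all".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-ins for the Kafka Streams exceptions named above;
// the real classes are not imported, so this compiles on its own.
class StreamsNotStartedException extends RuntimeException {
    StreamsNotStartedException(String message) { super(message); }
}

class StreamsStoppedException extends RuntimeException {
    StreamsStoppedException(String message) { super(message); }
}

class UnknownStateStoreException extends RuntimeException {
    UnknownStateStoreException(String message) { super(message); }
}

// Simplified instance states; the real KafkaStreams.State enum has more values.
enum InstanceState { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

// The per-partition failure reasons mentioned in the accounting above.
enum FailureReason { NOT_PRESENT, DOES_NOT_EXIST }

final class FailureMapping {
    private FailureMapping() { }

    /**
     * Throws for problems that affect the whole request; otherwise returns a
     * failure reason for each requested partition that cannot be served here.
     * Partitions missing from the returned map would be answered normally.
     */
    static Map<Integer, FailureReason> check(
            InstanceState state,
            String storeName,
            Set<String> knownStores,
            Set<Integer> requestedPartitions,
            int partitionsInTopology,
            Set<Integer> locallyAvailablePartitions) {
        if (state == InstanceState.CREATED) {
            throw new StreamsNotStartedException("not started yet; retry once RUNNING");
        }
        if (state == InstanceState.PENDING_SHUTDOWN
                || state == InstanceState.NOT_RUNNING
                || state == InstanceState.ERROR) {
            throw new StreamsStoppedException("Streams is stopping or stopped: " + state);
        }
        if (!knownStores.contains(storeName)) {
            throw new UnknownStateStoreException("unknown store: " + storeName);
        }
        // REBALANCING deliberately throws nothing (the StreamsRebalancingException
        // case no longer applies); partitions are judged individually below.
        Map<Integer, FailureReason> failures = new HashMap<>();
        for (int partition : requestedPartitions) {
            if (partition < 0 || partition >= partitionsInTopology) {
                failures.put(partition, FailureReason.DOES_NOT_EXIST);
            } else if (!locallyAvailablePartitions.contains(partition)) {
                failures.put(partition, FailureReason.NOT_PRESENT);
            }
        }
        return failures;
    }
}
```

This design lets a request for several partitions succeed on the ones hosted locally while reporting the rest individually, instead of failing the whole call.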
> > Thanks, > -John > > On Wed, 2021-11-17 at 15:48 -0800, Guozhang Wang wrote: > > Thanks John. > > > > I made another pass on the KIP and overall it already looks pretty good. I > > just have a couple more minor comments: > > > > 13: What do you think about just removing the following function in > > QueryResult? > > > > // returns a failed query result because the caller requested a "latest" > > // bound, but the task was not active and running. > > public static <R> QueryResult<R> notActive(String currentState); > > > > Instead, just use `notUpToBound` for the case when a `latest` bound is > > requested but the node is not the active replica. My main motivation is > > to abstract the notion of active/standby away from the public APIs > > themselves, and hence to capture both this case and the > > normal "position bound not achieved" case in the same return signal, until later, > > when we find it is indeed necessary to separate them into different returns. > > > > 14: Regarding the possible exceptions being thrown from `query`, it seems > > more exception types are possible from KIP-216: > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors > > Should we include all of them in the javadocs? > > > > > > Guozhang > > > > > > > > On Wed, Nov 17, 2021 at 3:25 PM John Roesler <vvcep...@apache.org> wrote: > > > > > Thanks for the reply, Guozhang! > > > > > > I have updated the KIP to tie up the remaining points that > > > we have discussed. I really appreciate your time in refining > > > the proposal. I included a quick summary of the final state > > > of our discussion points below. > > > > > > Since it seems like this discussion thread is pretty > > > convergent, I'll go ahead and start the voting thread soon. > > > > > > Thanks again! > > > -John > > > > > > P.S.: the final state of our discussion points: > > > > > > 1.
 I removed serdesForStore from the proposal (and moved it > > > to Rejected Alternatives). > > > > > > 2. Thanks for that reference. I had overlooked that > > > implementation. I'd note that the ListValueStore is > > > currently only used in the KStream API, which doesn't > > > support queries at all. Due to its interface, it could > > > theoretically be used to materialize a KTable, though it has > > > no supplier provided in the typical Stores factory class. > > > > > > Regardless, I think that it would still be a similar story > > > to the Segmented store. The ListValueStore would simply > > > choose to terminate the query on its own and not delegate to > > > any of the wrapped KeyValue stores. It wouldn't matter that > > > the wrapped stores have a query-handling facility of their > > > own: if the wrapping store doesn't choose to delegate, the > > > wrapped store will not try to execute any queries. > > > > > > Specifically regarding the key transformation that these > > > "formatted" stores perform: when they handle the query, they > > > would have the ability to execute the query in any way that > > > makes sense OR to just reject the query if it doesn't make > > > sense. > > > > > > 3, 4: nothing to do > > > > > > 5: I updated the KIP to specify the exceptions that may be > > > thrown from `KafkaStreams#query` and to clarify that > > > per-partition failures will be reported as per-partition failed > > > QueryResult objects instead of thrown exceptions. That > > > allows us to successfully serve some partitions of the > > > request even if others fail. > > > > > > 6: I added a note that updating the metadata APIs is left > > > for future work. > > > > > > 7: nothing to do > > > > > > 8: I went with StateQueryRequest and StateQueryResponse. > > > > > > 9, 10: nothing to do. > > > > > > 11: Ah, I see. That's a good point, but it's not fundamental > > > to the framework. I think we can tackle it when we propose > > > the actual queries. > > > > > > 12: Cool.
 I went ahead and dropped the "serdesForStore" > > > method. I think you're onto something there, and we should > > > tackle it separately when we propose the actual queries. > > > > > > > > > > > > > > > On Tue, 2021-11-16 at 15:59 -0800, Guozhang Wang wrote: > > > > Thanks John! Some more thoughts inlined below. > > > > > > > > On Mon, Nov 15, 2021 at 10:07 PM John Roesler <vvcep...@apache.org> wrote: > > > > > > > > > Thanks for the review, Guozhang! > > > > > > > > > > 1. This is a great point. I fell into the age-old trap of > > > > > only considering the simplest store type and forgot about > > > > > this extra wrinkle of the "key schema" that we use in > > > > > Windowed and Session stores. > > > > > > > > > > Depending on how we want to forge ahead with our provided > > > > > queries, I think it can still work out ok. The simplest > > > > > solution is just to have windowed versions of our queries > > > > > for use on the windowed stores. That should work naively > > > > > because we're basically just preserving the existing > > > > > interactions. It might not be ideal in the long run, but at > > > > > least it lets us keep IQv2 orthogonal to other efforts to > > > > > simplify the stores themselves. > > > > > > > > > > If we do that, then it would actually be correct to go ahead > > > > > and just return the serdes that are present in the Metered > > > > > stores today. For example, if I have a Windowed store with > > > > > Integer keys, then the key serde I get from serdesForStore > > > > > would just be the IntegerSerde. The query I'd use the > > > > > serialized key with would be a RawWindowedKeyQuery, which > > > > > takes a byte[] key and a timestamp. Then, the low-level > > > > > store (the segmented store in this case) would have to take > > > > > the next step and apply its schema before making that last-mile > > > > > query.
 Note that this is precisely how fetch is implemented > > > > > today in RocksDBWindowStore: > > > > > > > > > > public byte[] fetch(final Bytes key, final long timestamp) { > > > > > return wrapped().get(WindowKeySchema.toStoreKeyBinary(key, > > > > > timestamp, seqnum)); > > > > > } > > > > > > > > > > In other words, if we set up our provided Query types to > > > > > stick close to the current store query methods, then > > > > > everything "should work out" (tm). > > > > > > > > > > I think where things start to get more complicated would be > > > > > if we wanted to expose the actual, raw, on-disk binary key > > > > > to the user, along with a serde that can interpret it. Then, > > > > > we would have to pack up the serde and the schema. If we go > > > > > down that road, then knowing which one (the key serde or the > > > > > windowed schema + the key serde) the user wants when they > > > > > ask for "the serde" would be a challenge. > > > > > > > > > > I'm actually thinking maybe we don't need to include the > > > > > serdesForStore method in this proposal, but instead leave it > > > > > for follow-on work (if desired) to add it along with raw > > > > > queries, since it's really only needed if you want raw > > > > > queries and (as you mentioned later) there may be better > > > > > ways to present the serdes, which is always easier to figure > > > > > out once there's a use case. > > > > > > > > > > > > > > > 2. Hmm, if I understand what you mean by the "formatted" > > > > > layer, is that the one supplied by the > > > > > WindowBytesStoreSupplier or SessionBytesStoreSupplier in > > > > > Materialized? I think the basic idea of this proposal is to > > > > > let whatever store gets supplied there be the "last stop" > > > > > for the query. > > > > > > > > > > For the case of our default windowed store, this is the > > > > > segmented RocksDB store.
 It's true that this store "wraps" a > > > > > bunch of segments, but it would be the segmented store's > > > > > responsibility to handle the query, not to defer to the > > > > > segments. This might mean different things for different > > > > > queries, but naively, I think it can just invoke the > > > > > current implementation of each of its methods. > > > > > > > > > > There might be future queries that require more > > > > > sophisticated responses, but we should be able to add new > > > > > queries for those, which have no restrictions on their > > > > > response types. For example, we could choose to respond to a > > > > > scan with a list of iterators, one for each segment. > > > > > > > > > > > > > > By `formatted` stores, I also mean the ListValueStore, which was added > > > > recently for stream-stream joins, for example. Its interface is a KV-store, > > > > but it disables same-key overwrites and instead keeps all the values of > > > > the same key as a list, and users can only delete old values by deleting > > > > the whole key-list (which of course deletes the new values as well). > > > > ListValueStore uses a KeyValueStore as its inner store, but converts the put > > > > call into an append. > > > > > > > > I think in the long run, we should have a different interface other than > > > > KVStore for this type, and the implementation would then be at the > > > > `formatted` store layer. That means `query` should always be > > > > implemented at the layer inside the logged store (which could be the innermost > > > > store or the `formatted` store), and the upper-level wrapped stores > > > > would delegate to the inner stores.
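Guozhang's description of ListValueStore semantics can be illustrated with a small in-memory sketch. A put appends instead of overwriting, and the only way to drop old values is to delete the whole key-list. `ListValueStoreSketch` is a hypothetical class backed by a `HashMap`. It is not the real Kafka Streams store, which, per the thread, uses a KeyValueStore as its inner store.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of "list-value" semantics; not the Kafka Streams class.
final class ListValueStoreSketch<K, V> {
    // Stand-in for the inner key-value store that the formatted layer wraps.
    private final Map<K, List<V>> inner = new HashMap<>();

    // A put never overwrites: the value is appended to the key's list.
    void put(K key, V value) {
        inner.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    // Reads return every value appended so far, oldest first.
    List<V> get(K key) {
        List<V> values = inner.get(key);
        return values == null ? Collections.emptyList() : Collections.unmodifiableList(values);
    }

    // Old values can only be dropped by deleting the whole key-list,
    // which also drops the newest values.
    void delete(K key) {
        inner.remove(key);
    }
}
```

Because a layer like this reinterprets what put and get mean, it is a natural place to terminate a query itself rather than delegate it unchanged to the wrapped store.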
> > > > > > > > As for serdes, here are some more second thoughts: generally speaking, it's > > > > more convenient for users to pass in the value as an object than as raw > > > > bytes. The only exception is if the query is not for an exact match but for a prefix (or > > > > suffix, though we do not have such cases today) match, in which case we > > > > would need the raw bytes in order to pass the prefixed bytes into the > > > > inner store. The returned value, though, could be preferred either as raw > > > > bytes or already deserialized. > > > > > > > > The composite serde mostly happens at the key, not much at the value > > > > (we only have the "value-timestamp" type, which needs a composite > > > > deserialization; all others are direct values). So I feel that a Query > > > > would be best represented with non-serialized parameters (i.e. `KeyQuery<K, > > > > V>`), while the query result could optionally be raw or deserialized with the > > > > serde class. > > > > > > > > > > > > > > > > > > 3. I agree the large switch (or if/else) (or Map) for query > > > > > dispatch is a concern. That's the thing I'm most worried > > > > > will become cumbersome. I think your idea is neat, though, > > > > > because a lot of our surface area is providing a bunch of > > > > > those different combinations of query attributes. I think if > > > > > we get a little meta, we can actually fold it into the > > > > > existing KIP. > > > > > > > > > > Rather than making Query any more restrictive, what we could > > > > > do is to follow your idea for the provided queries > > > > > we ship with Streams. Although I had been thinking we would > > > > > ship a KeyQuery, RangeQuery, etc., we could absolutely > > > > > compact those queries as much as possible so that there > > > > > are only a few queries along the dimensions you listed.
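To make the "compact the provided queries" idea concrete, here is a hypothetical sketch. A single query class carries the scope (single or range) and match-type (full or prefix) dimensions from Guozhang's list, and one store method dispatches on them. `CompactKeyQuery`, `SortedStoreSketch`, `Scope`, and `MatchType` are illustrative names only. Keys are plain `String`s in a `TreeMap` rather than serialized bytes, and the query-field and query-value-type dimensions are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical dimensions, loosely following the list in this thread.
enum Scope { SINGLE, RANGE }
enum MatchType { FULL, PREFIX }

// One compact query type whose behavior is selected by its dimensions,
// instead of a separate class for every combination.
final class CompactKeyQuery {
    final Scope scope;
    final MatchType match;
    final String from; // the key (SINGLE), lower bound (RANGE/FULL), or prefix (RANGE/PREFIX)
    final String to;   // inclusive upper bound, only used for RANGE/FULL

    private CompactKeyQuery(Scope scope, MatchType match, String from, String to) {
        this.scope = scope;
        this.match = match;
        this.from = from;
        this.to = to;
    }

    static CompactKeyQuery key(String key) { return new CompactKeyQuery(Scope.SINGLE, MatchType.FULL, key, null); }
    static CompactKeyQuery range(String from, String to) { return new CompactKeyQuery(Scope.RANGE, MatchType.FULL, from, to); }
    static CompactKeyQuery prefix(String prefix) { return new CompactKeyQuery(Scope.RANGE, MatchType.PREFIX, prefix, null); }
}

final class SortedStoreSketch<V> {
    private final NavigableMap<String, V> data = new TreeMap<>();

    void put(String key, V value) { data.put(key, value); }

    // A single dispatch point covers every dimension combination.
    List<V> execute(CompactKeyQuery q) {
        List<V> out = new ArrayList<>();
        if (q.scope == Scope.SINGLE) {
            V v = data.get(q.from);
            if (v != null) out.add(v);
        } else if (q.match == MatchType.FULL) {
            out.addAll(data.subMap(q.from, true, q.to, true).values());
        } else {
            for (Map.Entry<String, V> e : data.tailMap(q.from, true).entrySet()) {
                if (!e.getKey().startsWith(q.from)) break; // sorted keys: prefix matches are contiguous
                out.add(e.getValue());
            }
        }
        return out;
    }
}
```

Because the keys are sorted, all keys sharing a prefix are contiguous, so the prefix branch can stop at the first non-matching key.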
> > > > > > > > > > That way we can avoid blowing up the query space with our > > > > > own provided queries, but we can still keep the framework as > > > > > general as possible. > > > > > > > > > > > > > > Sounds good! > > > > > > > > > > > > > 4. I'm not sure, actually! I just thought it would be neat > > > > > to have. I know I've spent my fair share of time adding println > > > > > statements to Streams or stepping through the debugger when > > > > > something like that proposal would have done the job. > > > > > > > > > > So, I guess the answer is yes; I was just thinking of it as > > > > > a debugging/informational tool. I also think that if we want > > > > > to make it more structured in the future, we should be able > > > > > to evolve that part of the API without any major problems. > > > > > > > > > > > > > > > 5. That's another great point, and it's a miss on my part. > > > > > The short answer is that we'd simply throw whatever runtime > > > > > exceptions are appropriate, but I should and will document > > > > > what they will be. > > > > > > > > > > > > > > > 6. I do think those APIs need some attention, but I was > > > > > actually hoping to treat that as a separate scope for design > > > > > work later. I think that there shouldn't be any downside to > > > > > tackling them as orthogonal, but I agree people will wonder > > > > > about the relationship there, so I can update the KIP with > > > > > some notes about it. > > > > > > > > > > > > > > Thanks! I personally expect that these APIs will eventually be > > > > refactored as well as we stick with IQv2, and that > > > > `allLocalStorePartitionLags` will be deprecated in favor of Position. > > > > > > > > > > > > > > > > > > 7. Yes, I've always been a bit on the fence about whether to > > > > > bundle that in here. The only thing that made me keep it in > > > > > is that we'd actually have to deprecate the newly proposed > > > > > StateStore#query method if we want to add it in later.
 I.e., > > > > > we would just propose StateStore#query(query, executionInfo) > > > > > right now, but then deprecate it and add > > > > > StateStore#query(query, bound, executionInfo). > > > > > > > > > > Given that, it seems mildly better to just take the leap for > > > > > now, and if it turns out we can't actually implement it > > > > > nicely, then we can always drop it from the proposal after > > > > > the fact. > > > > > > > > > > That said, if that aspect is going to derail this KIP's > > > > > discussion, I think the lesser evil would indeed be to just > > > > > drop it now. So far, it seems like there have been some small > > > > > questions about it, but nothing that really takes us off > > > > > course. So, if you don't object, I think I'd like to keep it > > > > > in for a little while longer. > > > > > > > > > > > > > > That's a fair point; let's keep it in this KIP then. > > > > > > > > > > > > > > > > > > 8. Sure, I like that idea. The names are a bit cumbersome. > > > > > > > > > > 9. I had them as separate types so that we could more easily > > > > > inspect the query type. Otherwise, we'd just have to assume > > > > > the generic type is byte[] in the lower layer. I'm not > > > > > sure that's the right call, but it also seems like the flip > > > > > of a coin as to which is better. > > > > > > > > > > 10. The StateSerdes class that we have is internal. I used > > > > > it in the POC to save time, but I gave it a different name > > > > > in the KIP to make it clear that I'm proposing that we > > > > > create a proper public interface and not just expose the > > > > > internal one, which has a bunch of extra stuff in it. > > > > > > > > > > Then again, if I go ahead and drop the serdes from the > > > > > proposal entirely, we can worry about that another time! > > > > > > > > > > > > > > > 11. I think I might have a typo somewhere, because I'm not > > > > > following the question.
 The Query itself defines the result > > > > > type <R>; QueryResult is just a container wrapping that R > > > > > result, as well as the execution info, etc., per partition. > > > > > > > > > > For a KeyQuery, its signature is: > > > > > KeyQuery<K, V> implements Query<V> > > > > > > > > > > So, when you use that query, it does bind R to V, and the > > > > > result will be a QueryResult<V>. > > > > > > > > > > > > > > Cool, thanks. My main confusion comes from the inconsistency between key-query > > > > and scan-query. The former implements Query as: > > > > > > > > KeyQuery<K, V> implements Query<V>: => binds V to R, and K unbound > > > > > > > > Whereas the latter implements it as: > > > > > > > > ScanQuery<K, V> implements Query<KeyValueIterator<K, V>>: => binds > > > > KeyValueIterator<?, ?> to R, whereas K/V are both unbound > > > > > > > > > > > > > > > > > > > > > > 12. I considered doing exactly that. The reason I shied away > > > > > from it in general is that if you're going to have a "raw" > > > > > query API, you also need to know the key serde before you do > > > > > a query (otherwise you can't query at all!). So, bundling a > > > > > serde with the response only really applies to the value. > > > > > > > > > > > > > > See the other comment above: my thinking is actually that, for Query, we > > > > would (potentially always) prefer to have it in deserialized object > > > > format (except for partial match, which we can discuss separately), so we only > > > > need to consider whether the QueryResult should be in raw or in > > > > deserialized format. > > > > > > > > > > > > > It still might be a good idea, but since I was thinking I > > > > > already needed a separate discovery method for the key > > > > > serde, I might as well just keep the key and value > > > > > serdes together, rather than bundling the value serde with > > > > > each value.
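The generic binding discussed above can be sketched in a few lines. These are hypothetical, pared-down shapes meant only to show how each query type fixes the result type `R` that `QueryResult<R>` then carries. For example, `Iterator<Map.Entry<K, V>>` stands in for Kafka's `KeyValueIterator<K, V>`, and `SketchRunner` is an illustrative helper, not part of the KIP.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, pared-down versions of the IQv2 types under discussion.
interface Query<R> { }

// KeyQuery binds the result type R to V: one key in, one value out.
final class KeyQuery<K, V> implements Query<V> {
    final K key;

    private KeyQuery(K key) { this.key = key; }

    static <K, V> KeyQuery<K, V> withKey(K key) { return new KeyQuery<>(key); }
}

// ScanQuery binds R to an iterator of entries: many key-value pairs out.
// (Iterator<Map.Entry<K, V>> stands in for Kafka's KeyValueIterator<K, V>.)
final class ScanQuery<K, V> implements Query<Iterator<Map.Entry<K, V>>> { }

// QueryResult just carries whatever R the query declared, plus execution info.
final class QueryResult<R> {
    final R result;
    final List<String> executionInfo;

    QueryResult(R result, List<String> executionInfo) {
        this.result = result;
        this.executionInfo = executionInfo;
    }
}

final class SketchRunner {
    private SketchRunner() { }

    // The signature ties the result type to the query's R, so the compiler
    // rejects, e.g., a String result for a KeyQuery<String, Long>.
    static <R> QueryResult<R> wrap(Query<R> query, R value) {
        return new QueryResult<>(value, List.of("answered by " + query.getClass().getSimpleName()));
    }
}
```

With these shapes, a `KeyQuery<String, Long>` can only produce a `QueryResult<Long>`. A `ScanQuery<String, Long>` produces a `QueryResult<Iterator<Map.Entry<String, Long>>>`, which is the asymmetry Guozhang points out.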
> > > > > > > > > > I do think it would be neat to have queries that don't > > > > > deserialize the value by default and give you the option to > > > > > do it on demand, or maybe just de-structure some parts of > > > > > the value out (e.g. just reading the timestamp without > > > > > deserializing the rest of the value). But, now that I've > > > > > started to think about dropping the "raw" query design from > > > > > the scope of this KIP, I'm wondering if we can just consider > > > > > this use case later. It does seem plausible that we could > > > > > choose to bundle the serdes with the values for those > > > > > queries without needing a change in this KIP's framework, at > > > > > least. > > > > > > > > > > > > > > > Whew! Thanks again for the great thoughts. I'll make the > > > > > changes I mentioned tomorrow. Please let me know if you > > > > > disagree with any of my responses! > > > > > > > > > > Thanks, > > > > > -John > > > > > > > > > > On Mon, 2021-11-15 at 17:29 -0800, Guozhang Wang wrote: > > > > > > Hello John, > > > > > > > > > > > > Great, great, great writeup! :) And thank you for finally bringing this up. > > > > > > I have made a pass on the KIP as well as its POC PR; here are some > > > > > > initial thoughts: > > > > > > > > > > > > First are some meta ones: > > > > > > > > > > > > 1. Unfortunately, today the serdes do not only happen at the metered-store layer. > > > > > > For windowed / session stores, and also some newly added > > > > > > ones for stream-stream joins that are optimized for time-based range > > > > > > queries, for example, the serdes are actually composite across multiple layers. > > > > > > And the queries on the outer interface are also translated, with serde > > > > > > bytes wrapped / stripped along the way through the layers.
 To be more specific, today our > > > > > > store hierarchy is like this: > > > > > > > > > > > > metered * -> cached -> logged * -> formatted * (e.g. segmented, > > > > > > list-valued) -> inner (rocksdb, in-memory) > > > > > > > > > > > > and serdes today could happen at the layers marked with * above, where each layer > > > > > > stuffs a bit more into the query bytes as a prefix/suffix. This is not > > > > > > really by design or ideal, but the result of historically accumulated tech debt. > > > > > > There's a related JIRA ticket for it: > > > > > > https://issues.apache.org/jira/browse/KAFKA-13286. I guess my point is that > > > > > > we need to be a bit careful regarding how to implement > > > > > > `KafkaStreams#serdesForStore(storeName)`, as we may expect some bumpy roads > > > > > > moving forward. > > > > > > > > > > > > 2. Related to 1 above, I think we cannot always delegate the `query()` > > > > > > implementation to the `inner` store layer, since some serde, or even some > > > > > > computation logic, happens at the outer layers, especially the `formatted` layer. > > > > > > For example, besides the cached layer, the `formatted` layer also needs to > > > > > > make sure the `query` object is appropriately translated before > > > > > > handing it off downstream to the inner store, and also needs to translate > > > > > > the `queryResult` a bit while handing it upward in the hierarchy. > > > > > > > > > > > > 3. As we add more query types in IQv2, the inner store's `query` > > > > > > implementation may become very clumsy, with a large "switch" condition on > > > > > > all the possible query types.
 Although custom stores could consider only > > > > > > supporting a few, with a `default` case to ignore all others, built-in > > > > > > stores may still need to exhaust all possible types. I'm wondering if it's > > > > > > a good trade-off to make `Query` more restricted in extensibility in order > > > > > > to keep the query type space from exploding, e.g. if a Query could only be extended > > > > > > along some predefined dimensions like: > > > > > > > > > > > > * query-field: key, non-key (some field extractor from the value bytes needs > > > > > > to be provided) > > > > > > * query-scope: single, range > > > > > > * query-match-type (only useful for a range scope): prefix-match (e.g. > > > > > > for a range key query, only a prefix is provided, and all keys > > > > > > starting with this prefix should be returned), full-match > > > > > > * query-value-type: object, raw-bytes > > > > > > > > > > > > 4. What's the expected usage for the execution info? Is it only for logging > > > > > > purposes? If yes, then I think not enforcing any string format is fine, and > > > > > > the store layers can just attach any string information that they consider > > > > > > useful. > > > > > > > > > > > > 5. I do not find any specific proposals for exception handling; what would > > > > > > that look like? E.g. besides all the expected error cases like non-active, > > > > > > how would we communicate other unexpected error cases such as store closed, > > > > > > IO error, bad query parameters, etc.? > > > > > > > > > > > > 6. Since we do not deprecate any existing APIs in this KIP, it's a bit hard > > > > > > for readers to understand what is eventually going to be covered by IQv2.
> > > > > > For example, we know that `KafkaStreams#store` would eventually be gone, > > > > > > but what about `KafkaStreams#queryMetadataForKey`, > > > > > > `#streamsMetadataForStore`, and also `allLocalStorePartitionLags`? I think > > > > > > it would be great to describe the end state of the world with IQv2 even if the KIP > > > > > > itself does not deprecate anything yet. > > > > > > > > > > > > 7. It seems people are still a bit confused about the > > > > > > "Position/PositionBound" topics, and personally I think it's okay to > > > > > > exclude them from this KIP just to keep its (already large) scope smaller. > > > > > > Also, after we start implementing the KIP in full, we may learn new > > > > > > things while fighting the details in the weeds, and that would be a better > > > > > > time for us to consider new parameters such as bounds, but also cache > > > > > > bypassing and other potential features. > > > > > > > > > > > > Some minor ones: > > > > > > > > > > > > 8. What about just naming the new classes > > > > > > `StateQueryRequest/Result` or > > > > > > `StoreQueryRequest/Result`? The word "interactive" is for describing its > > > > > > semantics in docs, but I feel for class names we can use a more meaningful > > > > > > prefix. > > > > > > > > > > > > 9. Should RawKeyQuery extend `KeyQuery<byte[]>`, or > > > > > > directly implement `Query<byte[]>`? > > > > > > > > > > > > 10. Why do we need the new class "InteractiveQuerySerdes" alongside the > > > > > > existing classes? Your PR seems to just use `StateSerdes` directly. > > > > > > > > > > > > 11. Why do we have a new type parameter "R" in the QueryResult class in > > > > > > addition to "<K, V>"? Should R always be equal to V? > > > > > > > > > > > > 12.
 Related to 10/11 above, what about letting the QueryResult always > > > > > > return values as raw bytes, along with the serdes? Then it's up to > > > > > > the callers whether they want the bytes to be deserialized immediately or > > > > > > written somewhere and deserialized later. More specifically, > > > > > > we would only have a single function, KafkaStreams#query, and the > > > > > > QueryResult would be: > > > > > > > > > > > > InteractiveQueryResult<K, V> { > > > > > > public InteractiveQueryResult(Map<Integer /*partition*/, > > > > > > QueryResult<byte[]>> partitionResults); > > > > > > > > > > > > ... > > > > > > > > > > > > public StateSerdes<K, V> serdes(); > > > > > > } > > > > > > > > > > > > The result itself could also provide some built-in functions to do > > > > > > the deserialization when returning results, so that users' code would not get more > > > > > > complicated. The benefit is that we end up with a single function in > > > > > > `KafkaStreams`, and the inner store only ever needs to implement the raw > > > > > > query types. Of course, doing this would not be so easy given the fact > > > > > > described in 1) above, but I feel it would be a good way to first > > > > > > abstract away this tech debt, and then later resolve it in a single place. > > > > > > > > > > > > --------------- > > > > > > > > > > > > Again, congrats on the very nice proposal! Let me know what you think about > > > > > > my comments. > > > > > > > > > > > > Guozhang > > > > > > > > > > > > > > > > > > On Mon, Nov 15, 2021 at 2:52 PM John Roesler <vvcep...@apache.org> wrote: > > > > > > > > > > > > > Hi Patrick and Sagar, > > > > > > > > > > > > > > Thanks for the feedback! I'll just break out the questions > > > > > > > and address them one at a time. > > > > > > > > > > > > > > Patrick 1.
> > > > > > > The default bound that I'm proposing is only to let active > > > > > > > tasks answer queries (which is also the default with IQ > > > > > > > today). Therefore, calling getPositionBound() would return a > > > > > > > PositionBound for which isLatest() is true. > > > > > > > > > > > > > > Patrick 2. > > > > > > > I might have missed something in revision, but I'm not sure > > > > > > > what you're referring to exactly when you say they are > > > > > > > different. The IQRequest only has a PositionBound, and the > > > > > > > IQResponse only has a (concrete) Position, so I think they > > > > > > > are named accordingly (getPositionBound and getPosition). Am > > > > > > > I overlooking what you are talking about? > > > > > > > > > > > > > > Sagar 1. > > > > > > > I think you're talking about the KeyValueStore#get(key) > > > > > > > method? This is a really good question. I went ahead and > > > > > > > dropped in an addendum to the KeyQuery example to show how > > > > > > > you would run the query in today's API. Here's a caricature > > > > > > > of the two APIs: > > > > > > > > > > > > > > current: > > > > > > > KeyValueStore store = kafkaStreams.store( > > > > > > > "mystore", > > > > > > > keyValueStore()); > > > > > > > int value = store.get(key); > > > > > > > > > > > > > > proposed: > > > > > > > int value = kafkaStreams.query( > > > > > > > "mystore", > > > > > > > KeyQuery.withKey(key)); > > > > > > > > > > > > > > So, today we first get the store interface and then we > > > > > > > invoke the method, whereas under the proposal, we would instead > > > > > > > just ask KafkaStreams to execute the query on the store. In > > > > > > > addition to all the other stuff I said in the motivation, > > > > > > > one thing I think is neat about this API is that it means we > > > > > > > can reuse queries across stores.
 So, for example, we could > > > > > > > also use KeyQuery on WindowStores, even though there's no > > > > > > > common interface between WindowStore and KeyValueStore. > > > > > > > > > > > > > > In other words, stores can support any queries that make > > > > > > > sense and _not_ support any queries that don't make sense. > > > > > > > This gets into your second question... > > > > > > > > > > > > > > Sagar 2. > > > > > > > Very good question. Your experience with your KIP-614 > > > > > > > contribution was one of the things that made me want to > > > > > > > revise IQ to begin with. It seems like there's a really > > > > > > > stark gap between how straightforward it is to propose > > > > > > > a new store operation and how hard it is to actually > > > > > > > implement it, due to all those intervening > > > > > > > wrappers. > > > > > > > > > > > > > > There are two categories of wrappers to worry about: > > > > > > > - Facades: These only exist to disallow access to write > > > > > > > APIs, which are exposed through IQ today but shouldn't be > > > > > > > called. These are simply unnecessary under IQv2, since we > > > > > > > only run queries instead of returning the whole store. > > > > > > > - Store Layers: This is what you provided examples of. We > > > > > > > have store layers that let us compose features like > > > > > > > de/serialization and metering, changelogging, caching, etc. > > > > > > > A nice thing about this design is that we mostly don't have > > > > > > > to worry about those wrapper layers at all. Each of > > > > > > > these stores would simply delegate any query to lower layers > > > > > > > unless there is something they need to do. In my POC, I > > > > > > > simply added a delegating implementation to > > > > > > > WrappedStateStore, which meant that I didn't need to touch > > > > > > > most of the wrappers when I added a new query.
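The delegation scheme described above for WrappedStateStore can be sketched as follows. All names here (`Layer`, `WrappingLayer`, `CachingLayer`, `ChangeLoggingLayer`, `InnerStore`, `GetQuery`) are hypothetical. The sketch only shows that wrapper layers forward any query by default. As a result, a new query type needs code only in the innermost store and, optionally, in a layer such as the cache that wants to intercept it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical marker type for queries whose result type is R.
interface LayerQuery<R> { }

// A simple point lookup whose result is a String.
final class GetQuery implements LayerQuery<String> {
    final String key;
    GetQuery(String key) { this.key = key; }
}

interface Layer {
    // Empty means the key was absent or no layer handled this query type
    // (a simplification; a real API would distinguish the two).
    <R> Optional<R> query(LayerQuery<R> query);
}

// The innermost store: the "last stop", where queries are actually executed.
final class InnerStore implements Layer {
    private final Map<String, String> data = new HashMap<>();

    void put(String key, String value) { data.put(key, value); }

    @Override
    public <R> Optional<R> query(LayerQuery<R> query) {
        if (query instanceof GetQuery) {
            // GetQuery is a LayerQuery<String>, so R is String in this branch.
            @SuppressWarnings("unchecked")
            R value = (R) data.get(((GetQuery) query).key);
            return Optional.ofNullable(value);
        }
        return Optional.empty(); // unsupported query types are not handled
    }
}

// Base class for wrapper layers: forwards every query by default, so adding
// a new query type requires no change here.
abstract class WrappingLayer implements Layer {
    protected final Layer wrapped;

    WrappingLayer(Layer wrapped) { this.wrapped = wrapped; }

    @Override
    public <R> Optional<R> query(LayerQuery<R> query) {
        return wrapped.query(query);
    }
}

// Changelogging is irrelevant to reads, so it inherits pure delegation.
final class ChangeLoggingLayer extends WrappingLayer {
    ChangeLoggingLayer(Layer wrapped) { super(wrapped); }
}

// A caching layer intercepts only the queries it can answer from its cache.
final class CachingLayer extends WrappingLayer {
    private final Map<String, String> cache = new HashMap<>();

    CachingLayer(Layer wrapped) { super(wrapped); }

    void cachePut(String key, String value) { cache.put(key, value); }

    @Override
    public <R> Optional<R> query(LayerQuery<R> query) {
        if (query instanceof GetQuery && cache.containsKey(((GetQuery) query).key)) {
            @SuppressWarnings("unchecked")
            R hit = (R) cache.get(((GetQuery) query).key);
            return Optional.ofNullable(hit);
        }
        return super.query(query); // cache miss or other query type: delegate
    }
}
```

A stack such as `new CachingLayer(new ChangeLoggingLayer(inner))` answers cached keys at the top and lets everything else fall through to the inner store. The changelogging layer needs no query code at all.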
Here's what I think future contributors will have to worry about:
1. The basic query execution in the base byte stores (RocksDB and InMemory)
2. The Caching stores IF they want the query to be served from the cache
3. The Metered stores IF some serialization needs to be done for the query

And that's it! We should be able to add new queries without touching any other store layer besides those, and each one of those is involved because it has some specific reason to be.

Thanks again, Patrick and Sagar! Please let me know if I failed to address your questions, or if you have any more.

Thanks,
-John

On Mon, 2021-11-15 at 22:37 +0530, Sagar wrote:
Hi John,

Thanks for the great writeup! A couple of things I wanted to bring up (may or may not be relevant):

1) The sample implementation that you have presented for KeyQuery is very helpful. One thing which may be added to it is how it connects to the KeyValue.get(key) method. That's something that at least I couldn't totally figure out; not sure about others, though. I understand that it is out of the scope of the KIP to explain every query that IQ supports, but one implementation would help just to get a sense of how the changes would feel.
2) The other thing that I wanted to know is that StateStore on its own has a lot of implementations, and some of them are wrappers. So at what levels do users need to implement the query methods? For example, a MeteredKeyValueStore wraps a RocksDBStore and calls it internally through a wrapped call. Under the new changes, how would things look for the same KeyQuery?

Thanks!
Sagar.

On Mon, Nov 15, 2021 at 6:20 PM Patrick Stuedi <pstu...@confluent.io.invalid> wrote:
Hi John,

Thanks for submitting the KIP! One question I have is: assuming one instantiates InteractiveQueryRequest via withQuery, and then later calls getPositionBound, what will the result be? Also, I noticed that the Position-returning methods in InteractiveQueryRequest and InteractiveQueryResult are named differently; any particular reason?

Best,
Patrick

On Fri, Nov 12, 2021 at 12:29 AM John Roesler <vvcep...@apache.org> wrote:
Thanks for taking a look, Sophie!

Ah, that was a revision error. I had initially been planning an Optional<Set<Integer>> with Optional.empty() meaning to fetch all partitions, but then decided it was needlessly complex and changed it to the current proposal with two methods:

  boolean isAllPartitions();
  Set<Integer> getPartitions();

(getPartitions() would throw an exception if it's an "all partitions" request.)

I've corrected the javadoc and also documented the exception.

Thanks!
-John

On Thu, 2021-11-11 at 15:03 -0800, Sophie Blee-Goldman wrote:
Thanks John, I've been looking forward to this for a while now. It was pretty horrifying to learn how present-day IQ works (or rather, doesn't work) with custom state stores :/

One minor cosmetic point: in the InteractiveQueryRequest class, the #getPartitions method has a return type of Set<Integer>, but the javadocs refer to Optional. Not sure which is intended for this API, but if it is supposed to be the return type, do you perhaps mean for it to be Optional.empty() and Optional.of(non-empty set) rather than Optional.of(empty set) and Optional.of(non-empty set)?
On Thu, Nov 11, 2021 at 12:03 PM John Roesler <vvcep...@apache.org> wrote:
Hello again, all,

Just bumping this discussion on a new, more flexible Interactive Query API in Kafka Streams.

If there are no concerns, I'll go ahead and call a vote on Monday.

Thanks!
-John

On Tue, 2021-11-09 at 17:37 -0600, John Roesler wrote:
Hello all,

I'd like to start the discussion for KIP-796, which proposes a revamp of the Interactive Query APIs in Kafka Streams.

The proposal is here:
https://cwiki.apache.org/confluence/x/34xnCw

I look forward to your feedback!

Thank you,
-John

-- Guozhang