Hi Michael, comments inline:

> On 11 Apr 2017, at 03:25, Michael Noll <mich...@confluent.io> wrote:
>
> Thanks for the updates, Eno!
>
> In addition to what has already been said: We should also explicitly
> mention that this KIP is not touching GlobalKTable. I'm sure that some
> users will throw KTable and GlobalKTable into one conceptual "it's all
> tables!" bucket and then wonder how the KIP might affect global tables.
Good point, I'll add.

>
> Damian wrote:
>> I think if no store name is provided users would still be able to query the
>> store, just the store name would be some internally generated name. They
>> would be able to discover those names via the IQ API.
>
> I, too, think that users should be able to query a store even if its name
> was internally generated. After all, the data is already there /
> materialized.

Yes, there is nothing that will prevent users from querying internally generated stores, but they cannot assume a store will necessarily be queryable. So if it's there, they can query it. If it's not there, and they didn't provide a queryable name, they cannot complain and say "hey, where is my store". If they must absolutely be certain that a store is queryable, then they must provide a queryable name.

>
>
> Damian wrote:
>> I think for some stores it will make sense to not create a physical store, i.e.,
>> for things like `filter`, as this will save the rocksdb overhead. But I guess that
>> is more of an implementation detail.
>
> I think it would help if the KIP would clarify what we'd do in such a
> case. For example, if the user did not specify a store name for
> `KTable#filter` -- would it be queryable? If so, would this imply we'd
> always materialize the state store, or...?

I'll clarify in the KIP with some more examples. Materialization will be an internal concept. A store can be queryable whether it's materialized or not (e.g., through advanced implementations that compute the value of a filter on the fly, rather than materialize the answer).

Thanks,
Eno

>
> -Michael
>
>
>
>
> On Tue, Apr 11, 2017 at 9:14 AM, Damian Guy <damian....@gmail.com> wrote:
>
>> Hi Eno,
>>
>> Thanks for the update. I agree with what Matthias said. I wonder if the KIP
>> should talk less about materialization and more about querying? After all,
>> that is what is being provided from an end-user's perspective.
>>
>> I think if no store name is provided users would still be able to query the
>> store, just the store name would be some internally generated name. They
>> would be able to discover those names via the IQ API
>>
>> I think for some stores it will make sense to not create a physical store,
>> i.e., for things like `filter`, as this will save the rocksdb overhead. But
>> I guess that is more of an implementation detail.
>>
>> Cheers,
>> Damian
>>
>> On Tue, 11 Apr 2017 at 00:36 Eno Thereska <eno.there...@gmail.com> wrote:
>>
>>> Hi Matthias,
>>>
>>>> However, this still forces users to provide a name for a store that we
>>>> must materialize, even if users are not interested in querying the
>>>> stores. Thus, I would like to have overloads for all currently existing
>>>> methods having a mandatory storeName parameter, with overloads that do
>>>> not require the storeName parameter.
>>>
>>>
>>> Oh yeah, absolutely, this is part of the KIP. I guess I didn't make it
>>> clear, I'll clarify.
>>>
>>> Thanks
>>> Eno
>>>
>>>
>>>> On 10 Apr 2017, at 16:00, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>
>>>> Thanks for pushing this KIP Eno.
>>>>
>>>> The update gives a very clear description about the scope, that is super
>>>> helpful for the discussion!
>>>>
>>>> - To put it into my own words, the KIP focus is on enabling querying of all
>>>> KTables.
>>>> ** The ability to query a store is determined by providing a name for
>>>> the store.
>>>> ** At the same time, providing a name -- and thus making a store
>>>> queryable -- does not say anything about an actual materialization (ie,
>>>> being queryable and being materialized are orthogonal).
>>>>
>>>>
>>>> I like this overall a lot. However, I would go one step further.
>>>> Right now, you suggest to add new overload methods that allow users to specify
>>>> a storeName -- if `null` is provided and the store is not materialized,
>>>> we ignore it completely -- if `null` is provided but the store must be
>>>> materialized we generate an internal name. So far so good.
>>>>
>>>> However, this still forces users to provide a name for a store that we
>>>> must materialize, even if users are not interested in querying the
>>>> stores. Thus, I would like to have overloads for all currently existing
>>>> methods having a mandatory storeName parameter, with overloads that do
>>>> not require the storeName parameter.
>>>>
>>>> Otherwise, we would still have some methods with an optional storeName
>>>> parameter and other methods with a mandatory storeName parameter -- thus,
>>>> still some inconsistency.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 4/9/17 8:35 AM, Eno Thereska wrote:
>>>>> Hi there,
>>>>>
>>>>> I've now done a V2 of the KIP, that hopefully addresses the feedback in this discussion thread:
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-114%3A+KTable+materialization+and+improved+semantics
>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-114:+KTable+materialization+and+improved+semantics>.
>>>>> Notable changes:
>>>>>
>>>>> - clearly outline what is in the scope of the KIP and what is not. We ran into the issue where lots of useful, but somewhat tangential discussions came up on interactive queries, declarative DSL etc. The exact scope of this KIP is spelled out.
>>>>> - decided to go with overloaded methods, not .materialize(), to stay within the spirit of the current declarative DSL.
>>>>> - clarified the deprecation plan
>>>>> - listed part of the discussion we had under rejected alternatives
>>>>>
>>>>> If you have any further feedback on this, let's continue on this thread.
>>>>>
>>>>> Thank you
>>>>> Eno
>>>>>
>>>>>
>>>>>> On 1 Feb 2017, at 09:04, Eno Thereska <eno.there...@gmail.com> wrote:
>>>>>>
>>>>>> Thanks everyone! I think it's time to do a V2 on the KIP so I'll do that and we can see how it looks and continue the discussion from there. Stay tuned.
>>>>>>
>>>>>> Thanks
>>>>>> Eno
>>>>>>
>>>>>>> On 30 Jan 2017, at 17:23, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I think Eno's separation is very clear and helpful. In order to
>>>>>>> streamline this discussion, I would suggest we focus back on point (1)
>>>>>>> only, as this is the original KIP question.
>>>>>>>
>>>>>>> Even if I started the DSL design discussion somehow, because I thought it
>>>>>>> might be helpful to resolve both in a single shot, I feel that we have
>>>>>>> too many options about DSL design and we should split it up in two
>>>>>>> steps. This will have the disadvantage that we will change the API
>>>>>>> twice, but still, I think it will be a more focused discussion.
>>>>>>>
>>>>>>> I just had another look at the KIP, and it proposes 3 changes:
>>>>>>>
>>>>>>> 1. add .materialized() -> IIRC it was suggested to name this
>>>>>>> .materialize() though (can you maybe update the KIP Eno?)
>>>>>>> 2. remove print(), writeAsText(), and foreach()
>>>>>>> 3. rename toStream() to toKStream()
>>>>>>>
>>>>>>>
>>>>>>> I completely agree with (2) -- not sure about (3) though because
>>>>>>> KStreamBuilder also has .stream() and .table() as methods.
>>>>>>>
>>>>>>> However, we might want to introduce a KStream#toTable() -- this was
>>>>>>> requested multiple times -- might also be part of a different KIP.
>>>>>>>
>>>>>>>
>>>>>>> Thus, we end up with (1). I would suggest to do a step backward here and
>>>>>>> instead of a discussion how to express the changes in the DSL (new
>>>>>>> overload, new methods...) we should discuss what the actual change
>>>>>>> should be.
>>>>>>> Like (1) materialize all KTable all the time (2) allow the user
>>>>>>> to force a materialization to enable querying the KTable (3) allow for
>>>>>>> queryable non-materialized KTable.
>>>>>>>
>>>>>>> One more question is, if we want to allow a user-forced materialization
>>>>>>> only as a local store without changelog, or both (together /
>>>>>>> independently)? We got some requests like this already.
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>>
>>>>>>> On 1/30/17 3:50 AM, Jan Filipiak wrote:
>>>>>>>> Hi Eno,
>>>>>>>>
>>>>>>>> thanks for putting into different points. I want to put a few remarks
>>>>>>>> inline.
>>>>>>>>
>>>>>>>> Best Jan
>>>>>>>>
>>>>>>>> On 30.01.2017 12:19, Eno Thereska wrote:
>>>>>>>>> So I think there are several important discussion threads that are
>>>>>>>>> emerging here. Let me try to tease them apart:
>>>>>>>>>
>>>>>>>>> 1. inconsistency in what is materialized and what is not, what is
>>>>>>>>> queryable and what is not. I think we all agree there is some
>>>>>>>>> inconsistency there and this will be addressed with any of the
>>>>>>>>> proposed approaches. Addressing the inconsistency is the point of the
>>>>>>>>> original KIP.
>>>>>>>>>
>>>>>>>>> 2. the exact API for materializing a KTable. We can specify 1) a
>>>>>>>>> "store name" (as we do today) or 2) have a ".materialize[d]" call or
>>>>>>>>> 3) get a handle from a KTable ".getQueryHandle" or 4) have a builder
>>>>>>>>> construct. So we have discussed 4 options. It is important to remember
>>>>>>>>> in this discussion that IQ is not designed for just local queries, but
>>>>>>>>> also for distributed queries. In all cases an identifying name/id is
>>>>>>>>> needed for the store that the user is interested in querying. So we
>>>>>>>>> end up with a discussion on who provides the name, the user (as done
>>>>>>>>> today) or if it is generated automatically (as Jan suggests, as I
>>>>>>>>> understand it).
If it is generated automatically we need a way to >>>>>>>>> expose these auto-generated names to the users and link them to >> the >>>>>>>>> KTables they care to query. >>>>>>>> Hi, the last sentence is what I currently arguing against. The user >>>>>>>> would never see a stringtype indentifier name or anything. All he >>> gets >>>>>>>> is the queryHandle if he executes a get(K) that will be an >>> interactive >>>>>>>> query get. with all the finding the right servers that currently >>> have a >>>>>>>> copy of this underlying store stuff going on. The nice part is that >>> if >>>>>>>> someone retrieves a queryHandle, you know that you have to >>> materialized >>>>>>>> (if you are not already) as queries will be coming. Taking away the >>>>>>>> confusion mentioned in point 1 IMO. >>>>>>>>> >>>>>>>>> 3. The exact boundary between the DSL, that is the processing >>>>>>>>> language, and the storage/IQ queries, and how we jump from one to >>> the >>>>>>>>> other. This is mostly for how we get a handle on a store (so it's >>>>>>>>> related to point 2), rather than for how we query the store. I >> think >>>>>>>>> we all agree that we don't want to limit ways one can query a >> store >>>>>>>>> (e.g., using gets or range queries etc) and the query APIs are not >>> in >>>>>>>>> the scope of the DSL. >>>>>>>> Does the IQ work with range currently? The range would have to be >>>>>>>> started on all stores and then merged by maybe the client. Range >>> force a >>>>>>>> flush to RocksDB currently so I am sure you would get a performance >>> hit >>>>>>>> right there. Time-windows might be okay, but I am not sure if the >>> first >>>>>>>> version should offer the user range access. >>>>>>>>> >>>>>>>>> 4. The nature of the DSL and whether its declarative enough, or >>>>>>>>> flexible enough. Damian made the point that he likes the builder >>>>>>>>> pattern since users can specify, per KTable, things like caching >> and >>>>>>>>> logging needs. 
His observation (as I understand it) is that the >>>>>>>>> processor API (PAPI) is flexible but doesn't provide any help at >> all >>>>>>>>> to users. The current DSL provides declarative abstractions, but >>> it's >>>>>>>>> not fine-grained enough. This point is much broader than the KIP, >>> but >>>>>>>>> discussing it in this KIPs context is ok, since we don't want to >>> make >>>>>>>>> small piecemeal changes and then realise we're not in the spot we >>> want >>>>>>>>> to be. >>>>>>>> This is indeed much broader. My guess here is that's why both API's >>>>>>>> exists and helping the users to switch back and forth might be a >>> thing. >>>>>>>>> >>>>>>>>> Feel free to pitch in if I have misinterpreted something. >>>>>>>>> >>>>>>>>> Thanks >>>>>>>>> Eno >>>>>>>>> >>>>>>>>> >>>>>>>>>> On 30 Jan 2017, at 10:22, Jan Filipiak <jan.filip...@trivago.com >>> >>> wrote: >>>>>>>>>> >>>>>>>>>> Hi Eno, >>>>>>>>>> >>>>>>>>>> I have a really hard time understanding why we can't. From my >> point >>>>>>>>>> of view everything could be super elegant DSL only + public api >> for >>>>>>>>>> the PAPI-people as already exist. >>>>>>>>>> >>>>>>>>>> The above aproach implementing a .get(K) on KTable is foolisch in >>> my >>>>>>>>>> opinion as it would be to late to know that materialisation would >>> be >>>>>>>>>> required. >>>>>>>>>> But having an API that allows to indicate I want to query this >>> table >>>>>>>>>> and then wrapping the say table's processorname can work out >> really >>>>>>>>>> really nice. The only obstacle I see is people not willing to >> spend >>>>>>>>>> the additional time in implementation and just want a quick shot >>>>>>>>>> option to make it work. 
>>>>>>>>>>
>>>>>>>>>> For me it would look like this:
>>>>>>>>>>
>>>>>>>>>> table = builder.table()
>>>>>>>>>> filteredTable = table.filter()
>>>>>>>>>> rawHandle = table.getQueryHandle() // Does the materialisation, really all names possible but I'd rather hide the implication that it materializes
>>>>>>>>>> filteredHandle = filteredTable.getQueryHandle() // this would _not_ materialize again of course, the source or the aggregator would stay the only materialized processors
>>>>>>>>>> streams = new streams(builder)
>>>>>>>>>>
>>>>>>>>>> This middle part is highly flexible. I could imagine forcing the user
>>>>>>>>>> to do something like this. This implies to the user that his streams
>>>>>>>>>> need to be running, instead of propagating the missing initialisation
>>>>>>>>>> back by exceptions. Also, whether the user is forced to pass the
>>>>>>>>>> appropriate streams instance back can change.
>>>>>>>>>> I think it's possible to build multiple streams out of one topology
>>>>>>>>>> so it would be easiest to implement as well. This is just what I maybe
>>>>>>>>>> had liked the most
>>>>>>>>>>
>>>>>>>>>> streams.start();
>>>>>>>>>> rawHandle.prepare(streams)
>>>>>>>>>> filteredHandle.prepare(streams)
>>>>>>>>>>
>>>>>>>>>> later the users can do
>>>>>>>>>>
>>>>>>>>>> V value = rawHandle.get(K)
>>>>>>>>>> V value = filteredHandle.get(K)
>>>>>>>>>>
>>>>>>>>>> This could free DSL users from anything like storenames and how and
>>>>>>>>>> what to materialize. Can someone indicate what the problem would be
>>>>>>>>>> implementing it like this.
>>>>>>>>>> Yes I am aware that the current IQ API will not support querying by
>>>>>>>>>> KTableProcessorName instead of statestoreName.
But I think that >>> had >>>>>>>>>> to change if you want it to be intuitive >>>>>>>>>> IMO you gotta apply the filter read time >>>>>>>>>> >>>>>>>>>> Looking forward to your opinions >>>>>>>>>> >>>>>>>>>> Best Jan >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> #DeathToIQMoreAndBetterConnectors >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On 30.01.2017 10:42, Eno Thereska wrote: >>>>>>>>>>> Hi there, >>>>>>>>>>> >>>>>>>>>>> The inconsistency will be resolved, whether with materialize or >>>>>>>>>>> overloaded methods. >>>>>>>>>>> >>>>>>>>>>> With the discussion on the DSL & stores I feel we've gone in a >>>>>>>>>>> slightly different tangent, which is worth discussing >> nonetheless. >>>>>>>>>>> We have entered into an argument around the scope of the DSL. >> The >>>>>>>>>>> DSL has been designed primarily for processing. The DSL does not >>>>>>>>>>> dictate ways to access state stores or what hind of queries to >>>>>>>>>>> perform on them. Hence, I see the mechanism for accessing >> storage >>> as >>>>>>>>>>> decoupled from the DSL. >>>>>>>>>>> >>>>>>>>>>> We could think of ways to get store handles from part of the >> DSL, >>>>>>>>>>> like the KTable abstraction. However, subsequent queries will be >>>>>>>>>>> store-dependent and not rely on the DSL, hence I'm not sure we >> get >>>>>>>>>>> any grand-convergence DSL-Store here. So I am arguing that the >>>>>>>>>>> current way of getting a handle on state stores is fine. >>>>>>>>>>> >>>>>>>>>>> Thanks >>>>>>>>>>> Eno >>>>>>>>>>> >>>>>>>>>>>> On 30 Jan 2017, at 03:56, Guozhang Wang <wangg...@gmail.com> >>> wrote: >>>>>>>>>>>> >>>>>>>>>>>> Thinking loud here about the API options (materialize v.s. >>> overloaded >>>>>>>>>>>> functions) and its impact on IQ: >>>>>>>>>>>> >>>>>>>>>>>> 1. 
The first issue of the current DSL is that, there is >>>>>>>>>>>> inconsistency upon >>>>>>>>>>>> whether / how KTables should be materialized: >>>>>>>>>>>> >>>>>>>>>>>> a) in many cases the library HAS TO materialize KTables no >>>>>>>>>>>> matter what, >>>>>>>>>>>> e.g. KStream / KTable aggregation resulted KTables, and hence >> we >>>>>>>>>>>> enforce >>>>>>>>>>>> users to provide store names and throw RTE if it is null; >>>>>>>>>>>> b) in some other cases, the KTable can be materialized or not; >>> for >>>>>>>>>>>> example in KStreamBuilder.table(), store names can be nullable >>> and >>>>>>>>>>>> in which >>>>>>>>>>>> case the KTable would not be materialized; >>>>>>>>>>>> c) in some other cases, the KTable will never be materialized, >>> for >>>>>>>>>>>> example KTable.filter() resulted KTables, and users have no >>> options to >>>>>>>>>>>> enforce them to be materialized; >>>>>>>>>>>> d) this is related to a), where some KTables are required to >> be >>>>>>>>>>>> materialized, but we do not enforce users to provide a state >>> store >>>>>>>>>>>> name, >>>>>>>>>>>> e.g. KTables involved in joins; a RTE will be thrown not >>>>>>>>>>>> immediately but >>>>>>>>>>>> later in this case. >>>>>>>>>>>> >>>>>>>>>>>> 2. The second issue is related to IQ, where state stores are >>>>>>>>>>>> accessed by >>>>>>>>>>>> their state stores; so only those KTable's that have >>> user-specified >>>>>>>>>>>> state >>>>>>>>>>>> stores will be queryable. But because of 1) above, many stores >>> may >>>>>>>>>>>> not be >>>>>>>>>>>> interested to users for IQ but they still need to provide a >>>>>>>>>>>> (dummy?) state >>>>>>>>>>>> store name for them; while on the other hand users cannot query >>>>>>>>>>>> some state >>>>>>>>>>>> stores, e.g. the ones generated by KTable.filter() as there is >> no >>>>>>>>>>>> APIs for >>>>>>>>>>>> them to specify a state store name. >>>>>>>>>>>> >>>>>>>>>>>> 3. 
We are aware from user feedbacks that such backend details >>> would be >>>>>>>>>>>> better be abstracted away from the DSL layer, where app >>> developers >>>>>>>>>>>> should >>>>>>>>>>>> just focus on processing logic, while state stores along with >>> their >>>>>>>>>>>> changelogs etc would better be in a different mechanism; same >>>>>>>>>>>> arguments >>>>>>>>>>>> have been discussed for serdes / windowing triggers as well. >> For >>>>>>>>>>>> serdes >>>>>>>>>>>> specifically, we had a very long discussion about it and >>> concluded >>>>>>>>>>>> that, at >>>>>>>>>>>> least in Java7, we cannot completely abstract serde away in the >>>>>>>>>>>> DSL, so we >>>>>>>>>>>> choose the other extreme to enforce users to be completely >> aware >>> of >>>>>>>>>>>> the >>>>>>>>>>>> serde requirements when some KTables may need to be >> materialized >>> vis >>>>>>>>>>>> overloaded API functions. While for the state store names, I >> feel >>>>>>>>>>>> it is a >>>>>>>>>>>> different argument than serdes (details below). >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> So to me, for either materialize() v.s. overloaded functions >>>>>>>>>>>> directions, >>>>>>>>>>>> the first thing I'd like to resolve is the inconsistency issue >>>>>>>>>>>> mentioned >>>>>>>>>>>> above. So in either case: KTable materialization will not be >>> affect >>>>>>>>>>>> by user >>>>>>>>>>>> providing state store name or not, but will only be decided by >>> the >>>>>>>>>>>> library >>>>>>>>>>>> when it is necessary. More specifically, only join operator and >>>>>>>>>>>> builder.table() resulted KTables are not always materialized, >> but >>>>>>>>>>>> are still >>>>>>>>>>>> likely to be materialized lazily (e.g. when participated in a >>> join >>>>>>>>>>>> operator). >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> For overloaded functions that would mean: >>>>>>>>>>>> >>>>>>>>>>>> a) we have an overloaded function for ALL operators that could >>>>>>>>>>>> result >>>>>>>>>>>> in a KTable, and allow it to be null (i.e. 
for the function >>> without >>>>>>>>>>>> this >>>>>>>>>>>> param it is null by default); >>>>>>>>>>>> b) null-state-store-name do not indicate that a KTable would >>>>>>>>>>>> not be >>>>>>>>>>>> materialized, but that it will not be used for IQ at all >>> (internal >>>>>>>>>>>> state >>>>>>>>>>>> store names will be generated when necessary). >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> For materialize() that would mean: >>>>>>>>>>>> >>>>>>>>>>>> a) we will remove state store names from ALL operators that >>> could >>>>>>>>>>>> result in a KTable. >>>>>>>>>>>> b) KTables that not calling materialized do not indicate that >> a >>>>>>>>>>>> KTable >>>>>>>>>>>> would not be materialized, but that it will not be used for IQ >>> at all >>>>>>>>>>>> (internal state store names will be generated when necessary). >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> Again, in either ways the API itself does not "hint" about >>> anything >>>>>>>>>>>> for >>>>>>>>>>>> materializing a KTable or not at all; it is still purely >>> determined >>>>>>>>>>>> by the >>>>>>>>>>>> library when parsing the DSL for now. >>>>>>>>>>>> >>>>>>>>>>>> Following these thoughts, I feel that 1) we should probably >>> change >>>>>>>>>>>> the name >>>>>>>>>>>> "materialize" since it may be misleading to users as what >>> actually >>>>>>>>>>>> happened >>>>>>>>>>>> behind the scene, to e.g. Damian suggested >> "queryableStore(String >>>>>>>>>>>> storeName)", >>>>>>>>>>>> which returns a QueryableStateStore, and can replace the >>>>>>>>>>>> `KafkaStreams.store` function; 2) comparing those two options >>>>>>>>>>>> assuming we >>>>>>>>>>>> get rid of the misleading function name, I personally favor not >>>>>>>>>>>> adding more >>>>>>>>>>>> overloading functions as it keeps the API simpler. 
>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> Guozhang >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On Sat, Jan 28, 2017 at 2:32 PM, Jan Filipiak >>>>>>>>>>>> <jan.filip...@trivago.com> >>>>>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Hi, >>>>>>>>>>>>> >>>>>>>>>>>>> thanks for your mail, felt like this can clarify some things! >>> The >>>>>>>>>>>>> thread >>>>>>>>>>>>> unfortunately split but as all branches close in on what my >>>>>>>>>>>>> suggestion was >>>>>>>>>>>>> about Ill pick this to continue >>>>>>>>>>>>> >>>>>>>>>>>>> Of course only the table the user wants to query would be >>>>>>>>>>>>> materialized. >>>>>>>>>>>>> (retrieving the queryhandle implies materialisation). So In >> the >>>>>>>>>>>>> example of >>>>>>>>>>>>> KTable::filter if you call >>>>>>>>>>>>> getIQHandle on both tables only the one source that is there >>> would >>>>>>>>>>>>> materialize and the QueryHandleabstraction would make sure it >>> gets >>>>>>>>>>>>> mapped >>>>>>>>>>>>> and filtered and what not uppon read as usual. >>>>>>>>>>>>> >>>>>>>>>>>>> Of Course the Object you would retrieve would maybe only wrap >>> the >>>>>>>>>>>>> storeName / table unique identifier and a way to access the >>> streams >>>>>>>>>>>>> instance and then basically uses the same mechanism that is >>>>>>>>>>>>> currently used. >>>>>>>>>>>>> From my point of view this is the least confusing way for DSL >>>>>>>>>>>>> users. If >>>>>>>>>>>>> its to tricky to get a hand on the streams instance one could >>> ask >>>>>>>>>>>>> the user >>>>>>>>>>>>> to pass it in before executing queries, therefore making sure >>> the >>>>>>>>>>>>> streams >>>>>>>>>>>>> instance has been build. >>>>>>>>>>>>> >>>>>>>>>>>>> The effort to implement this is indeed some orders of >> magnitude >>>>>>>>>>>>> higher >>>>>>>>>>>>> than the overloaded materialized call. As long as I could help >>>>>>>>>>>>> getting a >>>>>>>>>>>>> different view I am happy. 
>>>>>>>>>>>>> >>>>>>>>>>>>> Best Jan >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> On 28.01.2017 09:36, Eno Thereska wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Hi Jan, >>>>>>>>>>>>>> >>>>>>>>>>>>>> I understand your concern. One implication of not passing any >>>>>>>>>>>>>> store name >>>>>>>>>>>>>> and just getting an IQ handle is that all KTables would need >>> to be >>>>>>>>>>>>>> materialised. Currently the store name (or proposed >>>>>>>>>>>>>> .materialize() call) >>>>>>>>>>>>>> act as hints on whether to materialise the KTable or not. >>>>>>>>>>>>>> Materialising >>>>>>>>>>>>>> every KTable can be expensive, although there are some tricks >>> one >>>>>>>>>>>>>> can play, >>>>>>>>>>>>>> e.g., have a virtual store rather than one backed by a Kafka >>> topic. >>>>>>>>>>>>>> >>>>>>>>>>>>>> However, even with the above, after getting an IQ handle, the >>>>>>>>>>>>>> user would >>>>>>>>>>>>>> still need to use IQ APIs to query the state. As such, we >> would >>>>>>>>>>>>>> still >>>>>>>>>>>>>> continue to be outside the original DSL so this wouldn't >>> address >>>>>>>>>>>>>> your >>>>>>>>>>>>>> original concern. >>>>>>>>>>>>>> >>>>>>>>>>>>>> So I read this suggestion as simplifying the APIs by removing >>> the >>>>>>>>>>>>>> store >>>>>>>>>>>>>> name, at the cost of having to materialise every KTable. It's >>>>>>>>>>>>>> definitely an >>>>>>>>>>>>>> option we'll consider as part of this KIP. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Thanks >>>>>>>>>>>>>> Eno >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> On 28 Jan 2017, at 06:49, Jan Filipiak < >>> jan.filip...@trivago.com> >>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>> Hi Exactly >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I know it works from the Processor API, but my suggestion >>> would >>>>>>>>>>>>>>> prevent >>>>>>>>>>>>>>> DSL users dealing with storenames what so ever. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> In general I am pro switching between DSL and Processor API >>>>>>>>>>>>>>> easily. 
(In >>>>>>>>>>>>>>> my Stream applications I do this a lot with reflection and >>>>>>>>>>>>>>> instanciating >>>>>>>>>>>>>>> KTableImpl) Concerning this KIP all I say is that there >> should >>>>>>>>>>>>>>> be a DSL >>>>>>>>>>>>>>> concept of "I want to expose this __KTable__. This can be a >>>>>>>>>>>>>>> Method like >>>>>>>>>>>>>>> KTable::retrieveIQHandle():InteractiveQueryHandle, the >> table >>>>>>>>>>>>>>> would know >>>>>>>>>>>>>>> to materialize, and the user had a reference to the "store >>> and the >>>>>>>>>>>>>>> distributed query mechanism by the Interactive Query Handle" >>>>>>>>>>>>>>> under the hood >>>>>>>>>>>>>>> it can use the same mechanism as the PIP people again. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I hope you see my point J >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Best Jan >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> #DeathToIQMoreAndBetterConnectors :) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On 27.01.2017 21:59, Matthias J. Sax wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Jan, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> the IQ feature is not limited to Streams DSL but can also >> be >>>>>>>>>>>>>>>> used for >>>>>>>>>>>>>>>> Stores used in PAPI. Thus, we need a mechanism that does >> work >>>>>>>>>>>>>>>> for PAPI >>>>>>>>>>>>>>>> and DSL. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Nevertheless I see your point and I think we could provide >> a >>>>>>>>>>>>>>>> better API >>>>>>>>>>>>>>>> for KTable stores including the discovery of remote shards >> of >>>>>>>>>>>>>>>> the same >>>>>>>>>>>>>>>> KTable. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> @Michael: Yes, right now we do have a lot of overloads and >> I >>> am >>>>>>>>>>>>>>>> not a >>>>>>>>>>>>>>>> big fan of those -- I would rather prefer a builder >> pattern. 
>>>>>>>>>>>>>>>> But that might be a different discussion (nevertheless, if we would aim
>>>>>>>>>>>>>>>> for an API rework, we should get the changes with regard to stores right
>>>>>>>>>>>>>>>> from the beginning on, in order to avoid a redesign later on.)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> something like:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> stream.groupByKey()
>>>>>>>>>>>>>>>>       .window(TimeWindow.of(5000))
>>>>>>>>>>>>>>>>       .aggregate(...)
>>>>>>>>>>>>>>>>       .withAggValueSerde(new CustomTypeSerde())
>>>>>>>>>>>>>>>>       .withStoreName("storeName");
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> (This would also reduce JavaDoc redundancy -- maybe a personal pain
>>>>>>>>>>>>>>>> point right now :))
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 1/27/17 11:10 AM, Jan Filipiak wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Yeah,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Maybe my bad that I refuse to look into IQ as I don't find them anywhere
>>>>>>>>>>>>>>>>> close to being interesting. The problem IMO is that people need to know
>>>>>>>>>>>>>>>>> the store name, so we are working on different levels to achieve a
>>>>>>>>>>>>>>>>> single goal.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> What is your people's opinion on having a method on KTable that returns
>>>>>>>>>>>>>>>>> them something like a key-value store? There are of course problems like
>>>>>>>>>>>>>>>>> "it can't be used before the stream threads are going and group membership
>>>>>>>>>>>>>>>>> is established..."
but the benefit would be that for the >>> user >>>>>>>>>>>>>>>>> there is >>>>>>>>>>>>>>>>> a >>>>>>>>>>>>>>>>> consistent way of saying "Hey I need it materialized as >>>>>>>>>>>>>>>>> querries gonna >>>>>>>>>>>>>>>>> be comming" + already get a Thing that he can execute the >>>>>>>>>>>>>>>>> querries on >>>>>>>>>>>>>>>>> in >>>>>>>>>>>>>>>>> 1 step. >>>>>>>>>>>>>>>>> What I think is unintuitive here is you need to say >>>>>>>>>>>>>>>>> materialize on this >>>>>>>>>>>>>>>>> Ktable and then you go somewhere else and find its store >>> name >>>>>>>>>>>>>>>>> and then >>>>>>>>>>>>>>>>> you go to the kafkastreams instance and ask for the store >>> with >>>>>>>>>>>>>>>>> this >>>>>>>>>>>>>>>>> name. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> So one could the user help to stay in DSL land and >> therefore >>>>>>>>>>>>>>>>> maybe >>>>>>>>>>>>>>>>> confuse him less. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Best Jan >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> #DeathToIQMoreAndBetterConnectors :) >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On 27.01.2017 16:51, Damian Guy wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I think Jan is saying that they don't always need to be >>>>>>>>>>>>>>>>>> materialized, >>>>>>>>>>>>>>>>>> i.e., >>>>>>>>>>>>>>>>>> filter just needs to apply the ValueGetter, it doesn't >>> need yet >>>>>>>>>>>>>>>>>> another >>>>>>>>>>>>>>>>>> physical state store. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Fri, 27 Jan 2017 at 15:49 Michael Noll < >>> mich...@confluent.io> >>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Like Damian, and for the same reasons, I am more in favor >>> of >>>>>>>>>>>>>>>>>>> overloading >>>>>>>>>>>>>>>>>>> methods rather than introducing `materialize()`. >>>>>>>>>>>>>>>>>>> FWIW, we already have a similar API setup for e.g. >>>>>>>>>>>>>>>>>>> `KTable#through(topicName, stateStoreName)`. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> A related but slightly different question is what e.g. 
Jan Filipiak mentioned earlier in this thread:
I think we need to explain more clearly why KIP-114 doesn't propose the seemingly simpler solution of always materializing tables/state stores.

On Fri, Jan 27, 2017 at 4:38 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:

Hi,
Yeah it's confusing. Why shouldn't it be queryable by IQ? If you use the ValueGetter of Filter it will apply the filter and should be completely transparent as to whether another processor or IQ is accessing it. How can this new method help?

I cannot see the reason for the additional materialize method being required! Hence I suggest leaving it alone.
Regarding removing the others, I don't have strong opinions and it seems to be unrelated.

Best Jan

On 26.01.2017 20:48, Eno Thereska wrote:

Forwarding this thread to the users list too in case people would like to comment. It is also on the dev list.

Thanks
Eno

Begin forwarded message:

From: "Matthias J. Sax" <matth...@confluent.io>
Subject: Re: [DISCUSS] KIP-114: KTable materialization and improved semantics
Date: 24 January 2017 at 19:30:10 GMT
To: dev@kafka.apache.org
Reply-To: dev@kafka.apache.org

That's not what I meant by "huge impact".

I refer to the actions related to materializing a KTable: creating a RocksDB store and a changelog topic -- users should be aware of the runtime implications and this is better expressed by an explicit method call, rather than implicitly triggered by using a different overload of a method.

-Matthias

On 1/24/17 1:35 AM, Damian Guy wrote:

I think your definition of a huge impact and mine are rather different ;-P
Overloading a few methods is not really a huge impact IMO. It is also a sacrifice worth making for readability, usability of the API.

On Mon, 23 Jan 2017 at 17:55 Matthias J.
Sax <matth...@confluent.io> wrote:

I understand your argument, but do not agree with it.

Your first version (even if the "flow" is not as nice) is more explicit than the second version. Adding a stateStoreName parameter is quite implicit but has a huge impact -- thus, I prefer the rather more verbose but explicit version.

-Matthias

On 1/23/17 1:39 AM, Damian Guy wrote:

I'm not a fan of materialize. I think it interrupts the flow, i.e.,

table.mapValues(..).materialize().join(..).materialize()
compared to:
table.mapValues(..).join(..)

I know which one I prefer.
My preference is still to provide overloaded methods where people can specify the store names if they want, otherwise we just generate them.

On Mon, 23 Jan 2017 at 05:30 Matthias J. Sax <matth...@confluent.io> wrote:

Hi,
thanks for the KIP Eno!
Here are my 2 cents:

1) I like Guozhang's proposal about removing the store name from all KTable methods and generating internal names (however, I would do this as overloads). Furthermore, I would not force users to call .materialize() if they want to query a store, but add one more method .stateStoreName() that returns the store name if the KTable is materialized. Thus, .materialize() also does not necessarily need a storeName parameter (i.e., we should have some overloads here).

I would also not allow providing a null store name (to indicate no materialization if not necessary) but throw an exception.

This yields some simplification (see below).

2) I also like Guozhang's proposal about KStream#toTable()

3)

> 3. What will happen when you call materialize on KTable that is already materialized?
> Will it create another StateStore (providing the name is different), throw an Exception?

> Currently an exception is thrown, but see below.

If we follow approach (1) from Guozhang, there is no need to worry about a second materialization and also no exception needs to be thrown. A call to .materialize() basically sets a "materialized flag" (i.e., an idempotent operation) and sets a new name.

4)

> Rename toStream() to toKStream() for consistency.

> Not sure whether that is really required. We also use `KStreamBuilder#stream()` and `KStreamBuilder#table()`, for example, and don't care about the "K" prefix.

Eno's reply:

> I think changing it to `toKStream` would make it absolutely clear what we are converting it to.
> I'd say we should probably change the KStreamBuilder methods (but not in this KIP).

I would keep #toStream(). (see below)

5) We should not remove any methods but only deprecate them.

A general note:

I do not understand your comments in "Rejected Alternatives". You say "Have the KTable be the materialized view" was rejected. But your KIP actually does exactly this -- the changelog abstraction of KTable is secondary after those changes and the "view" abstraction is what a KTable is. And just to be clear, I like this a lot:

- it aligns with the name KTable
- it aligns with stream-table-duality
- it aligns with IQ

I would say that a KTable is a "view abstraction" (as materialization is optional).

-Matthias

On 1/22/17 5:05 PM, Guozhang Wang wrote:

Thanks for the KIP Eno, I have a few meta comments and a few detailed comments:

1. I like the materialize() function in general, but I would like to see how other KTable functions should be updated accordingly. For example, 1) KStreamBuilder.table(..) has a state store name parameter, and we will always materialize the KTable unless its state store name is set to null; 2) KTable.agg requires the result KTable to be materialized, and hence it also has a state store name; 3) KTable.join requires the joining table to be materialized.
And today we do not actually have a mechanism to enforce that, but will only throw an exception at runtime if it is not (e.g. if you have "builder.table("topic", null).join()" an RTE will be thrown).

I'd make an extended proposal just to kick off the discussion here: let's remove all the state store params in other KTable functions, and if in some cases a KTable has to be materialized (e.g. a KTable resulting from KXX.agg) and users do not call materialize(), then we treat it as "users are not interested in querying it at all" and hence use an internal name generated for the materialized KTable; i.e. although it is materialized the state store is not exposed to users.
And if users call materialize() afterwards but we have already decided to materialize it, we can replace the internal name with the user-provided name. Then from a user's point of view, if they ever want to query a KTable, they have to call materialize() with a given state store name. This approach has one awkwardness though, that the serdes and state store name params are not separated and could overlap (see detailed comment #2 below).

2. This step does not need to be included in this KIP, but just as a reference / future work: as we have discussed before, we may enforce materializing the KTables resulting from KTable.join as well in the future.
If we do that, then:

a) KTables resulting from KXX.agg are always materialized;
b) KTable.agg requires the aggregating KTable to always be materialized (otherwise we would not know the old value);
c) KTables resulting from KTable.join are always materialized, and so are the joining KTables;
d) the materialization of KTables resulting from KTable.filter/mapValues depends on their parent's materialization;

By recursive induction all KTables are actually always materialized, and then the effect of "materialize()" is just to specify the state store names. In this scenario, we do not need to send Change<V> in repartition topics within joins any more, but only for repartition topics within aggregations.
Instead, we can just send a "tombstone" without the old value and we do not need to calculate joins twice (one more time when the old value is received).

3. I'm wondering if it is worthwhile to add a "KStream#toTable()" function which is interpreted as a dummy-aggregation where the new value always replaces the old value. I have seen a couple of use cases of this, for example, users want to read a changelog topic, apply some filters, and then materialize it into a KTable with state stores without creating duplicated changelog topics. With materialize() and toTable I'd imagine users can specify sth.
like:

"
KStream stream = builder.stream("topic1").filter(..);
KTable table = stream.toTable(..);
table.materialize("state1");
"

And the library in this case could set store "state1"'s changelog topic to be "topic1", and apply the filter on the fly while (re-)storing its state by reading from this topic, instead of creating a second changelog topic like "appID-state1-changelog" which is a semi-duplicate of "topic1".

Detailed:

1. I'm +1 with Michael regarding "#toStream"; actually I was