Thanks Guozhang - I'll remove the joins. I agree we need to refactor TopologyBuilder, but I think we'll need another KIP for that.
Thanks,
Damian

On Fri, 30 Dec 2016 at 01:32 Guozhang Wang <wangg...@gmail.com> wrote:

> 1/2: Sounds good, let's remove the joins within KGlobalTable for now.
>
> 3. I see, makes sense.
>
> Unfortunately since TopologyBuilder is a public class we cannot separate
> its internal usage only functions like build / buildWithGlobalTables / etc
> with other user functions like stream / table / etc. We need to consider
> refactoring this interface sooner than later.
>
> 4/6. OK.
>
>
> Guozhang
>
>
> On Tue, Dec 20, 2016 at 2:16 PM, Damian Guy <damian....@gmail.com> wrote:
>
> > Hi Guozhang,
> >
> > Thanks for your input. Answers below, but i'm thinking we should remove
> > joins from GlobalKTables for the time being and re-visit if necessary in
> > the future.
> >
> > 1. with a global table the joins are never really materialized (at least
> > how i see it), rather they are just views on the existing global tables.
> > I've deliberately taken this approach so we don't have to create yet
> > another State Store and changelog topic etc. These all consume resources
> > that i believe are unnecessary. So, i don't really see the point of having
> > a materialize method. Further, one of the major benefits of joining two
> > global tables is being able to query them via Interactive Queries. For this
> > you need the name, so i think it makes sense to provide it with the join.
> >
> > 2. This has been discussed already in this thread (with Michael), and
> > outerJoin is deliberately not part of the KIP. To be able to join both
> > ways, as you suggest, requires that both inputs are able to map to the same
> > key. This is not always going to be possible, i.e., relationships can be
> > one way, so for that reason i felt it was best to not go down that path as
> > we'd not be able to resolve it at the time that
> > globalTable.join(otherGlobalTable,...) was called, and this would result in
> > possible confusion.
> > Also, to support this we'd need to physically
> > materialize a StateStore that represents the join (which i think is a waste
> > of resources), or, we'd need to provide another interface where we can map
> > from the key of the resulting global table to the keys of both of the
> > joined tables.
> >
> > 3. The intention is that the GlobalKTables are in a single topology that is
> > owned and updated by a single thread. So yes it is necessary that they can
> > be created separately.
> >
> > 4. Bootstrapping and maintaining of the state of GlobalKTables are done on
> > a single thread. This thread will run simultaneously with the current
> > StreamThreads. It doesn't make sense to move the bootstrapping of the
> > StandbyTasks to this thread as they are logically part of a StreamThread,
> > they are 'assigned' to the StreamThread. With GlobalKTables there is no
> > assignment as such, the thread just maintains all of them.
> >
> > 5. Yes i'll update the KIP - the state directory will be under the same
> > path as StreamsConfig.STATE_DIR_CONFIG, but it will be a specific
> > directory, i.e, global_state, rather than being a task directory.
> >
> > 6. The whole point of GlobalKTables is to have a copy of ALL of the data on
> > each node. I don't think it makes sense to be able to reset the starting
> > position.
> >
> > Thanks,
> > Damian
> >
> > On Tue, 20 Dec 2016 at 20:00 Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > One more thing to add:
> > >
> > > 6. For KGlobalTable, it is always bootstrapped from the beginning while for
> > > other KTables, we are enabling users to override their resetting position
> > > as in
> > >
> > > https://github.com/apache/kafka/pull/2007
> > >
> > > Should we consider doing the same for KGlobalTable as well?
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Dec 20, 2016 at 11:39 AM, Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > >
> > > > Thanks for the very well written proposal, and sorry for the very-late
> > > > review. I have a few comments here:
> > > >
> > > > 1. We are introducing a "queryableViewName" in the GlobalTable join
> > > > results, while I'm wondering if we should just add a more general function
> > > > like "materialize" to KTable and KGlobalTable with the name to be used in
> > > > queries?
> > > >
> > > > 2. For KGlobalTable's own "join" and "leftJoin": since we are only passing
> > > > the KeyValueMapper<K, V, K1> keyMapper it seems that for either case only
> > > > the left hand side will logically "trigger" the join, which is different to
> > > > KTable's join semantics. I'm wondering if it would be more consistent to
> > > > have them as:
> > > >
> > > >
> > > > <K1, V1, R> GlobalKTable<K, R> join(final GlobalKTable<K1, V1> other,
> > > >                                     final KeyValueMapper<K, V, K1> leftkeyMapper,
> > > >                                     final KeyValueMapper<K1, V1, K> rightkeyMapper,
> > > >                                     final ValueJoiner<V, V1, R> joiner,
> > > >                                     final String queryableViewName);
> > > >
> > > > <K1, V1, R> GlobalKTable<K, R> outerJoin(final GlobalKTable<K1, V1> other,
> > > >                                          final KeyValueMapper<K, V, K1> leftkeyMapper,
> > > >                                          final KeyValueMapper<K1, V1, K> rightkeyMapper,
> > > >                                          final ValueJoiner<V, V1, R> joiner,
> > > >                                          final String queryableViewName);
> > > >
> > > > <K1, V1, R> GlobalKTable<K, R> leftJoin(final GlobalKTable<K1, V1> other,
> > > >                                         final KeyValueMapper<K, V, K1> keyMapper,
> > > >                                         final ValueJoiner<V, V1, R> joiner,
> > > >                                         final String queryableViewName);
> > > >
> > > >
> > > > I.e. add another directional key mapper to join and also to outerJoin.
> > > >
> > > >
> > > > 3.
> > > > For "TopologyBuilder.buildGlobalStateTopology", is it necessary to
> > > > have a separate function from "TopologyBuilder.build" itself? With global
> > > > tables, are there any scenarios that we want to build the topology without
> > > > the embedded global tables (i.e. still calling "build")?
> > > >
> > > > 4. As for implementation, you mentioned that global table bootstrapping
> > > > will be done in another dedicated thread. Could we also consider moving the
> > > > logic of bootstrapping the standby-replica state stores into this thread as
> > > > well, which can then leverage on the existing "restoreConsumer" that does
> > > > not participate in the consumer group protocol? By doing this I think we
> > > > can still avoid thread-synchronization while making the logic more clear
> > > > (ideally the standby restoration does not really need to be part of the
> > > > stream thread's main loops).
> > > >
> > > > 5. Also for the global table's state directory, I'm assuming it will not
> > > > be under the per-task directory as it is per instance. But could you
> > > > elaborate a bit in the wiki about its directory as well? Also could we
> > > > consider adding https://issues.apache.org/jira/browse/KAFKA-3522 along
> > > > with this feature since we may need to change the directory path / storage
> > > > schema formats for these different types of stores moving forward.
> > > >
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Fri, Dec 9, 2016 at 4:21 AM, Damian Guy <damian....@gmail.com> wrote:
> > > >
> > > >> Thanks for the update Michael.
> > > >>
> > > >> I just wanted to add that there is one crucial piece of information that
> > > >> i've failed to add (I apologise).
> > > >>
> > > >> To me, the join between 2 Global Tables just produces a view on top of the
> > > >> underlying tables (this is the same as it works for KTables today).
> > > >> So that
> > > >> means there is no Physical StateStore that backs the join result, it is
> > > >> just a Virtual StateStore that knows how to resolve the join when it is
> > > >> required. I've deliberately taken this path so that we don't end up having
> > > >> yet another copy of the data, stored on local disk, and sent to another
> > > >> change-log topic. This also reduces the memory overhead from creating
> > > >> RocksDBStores and reduces load on the Thread based caches we have. So it is
> > > >> a resource optimization.
> > > >>
> > > >> So while it is technically possible to support outer joins, we would need
> > > >> to physically materialize the StateStore (and create a changelog-topic for
> > > >> it), or, we'd need to provide another interface where the user could map
> > > >> from the outerJoin key to both of the other table keys. This is because the
> > > >> key of the outerJoin table could be either the key of the lhs table, or the
> > > >> rhs tables, or something completely different.
> > > >>
> > > >> With this and what you have mentioned above in mind i think we should park
> > > >> outerJoin support for this KIP and re-visit if and when we need it in the
> > > >> future.
> > > >>
> > > >> I'll update the KIP with this.
> > > >>
> > > >> Thanks,
> > > >> Damian
> > > >>
> > > >> On Fri, 9 Dec 2016 at 09:53 Michael Noll <mich...@confluent.io> wrote:
> > > >>
> > > >> > Damian and I briefly chatted offline (thanks, Damian!), and here's the
> > > >> > summary of my thoughts and conclusion.
> > > >> >
> > > >> > TL;DR: Let's skip outer join support for global tables.
> > > >> >
> > > >> > In more detail:
> > > >> >
> > > >> > - We agreed that, technically, we can add OUTER JOIN support. However,
> > > >> > outer joins only work if certain preconditions are met.
> > > >> > The preconditions
> > > >> > are IMHO similar/the same as we have for the normal, partitioned KTables
> > > >> > (e.g. having matching keys and co-partitioned data for the tables), but in
> > > >> > the case of global tables the user would need to meet all these
> > > >> > preconditions in one big swing when specifying the params for the outer
> > > >> > join call. Even so, you'd only know at run-time whether the preconditions
> > > >> > were actually met properly.
> > > >> >
> > > >> > - Hence it's quite likely that users will be confused about these
> > > >> > preconditions and how to meet them, and -- from what we can tell -- use
> > > >> > cases / user demand for outer joins have been rare.
> > > >> >
> > > >> > - So, long story short, even though we could add outer join support we'd
> > > >> > suggest to skip it for global tables. If we subsequently learn that there
> > > >> > is a lot of user interest in that functionality, we still have the option
> > > >> > to add it in the future.
> > > >> >
> > > >> >
> > > >> > Best,
> > > >> > Michael
> > > >> >
> > > >> >
> > > >> > On Thu, Dec 8, 2016 at 6:31 PM, Damian Guy <damian....@gmail.com> wrote:
> > > >> >
> > > >> > > Hi Michael,
> > > >> > >
> > > >> > > I don't see how that helps?
> > > >> > >
> > > >> > > Let's say we have tables Person(id, device_id, name, ...), Device(id,
> > > >> > > person_id, type, ...), and both are keyed with same type. And we have a
> > > >> > > stream, that for the sake of simplicity, has both person_id and device_id
> > > >> > > (i know this is a bit contrived!)
> > > >> > > so our join
> > > >> > > person = builder.globalTable(...);
> > > >> > > device = builder.globalTable(...);
> > > >> > > personDevice = builder.outerJoin(device, ...);
> > > >> > >
> > > >> > > someStream = builder.stream(..);
> > > >> > > // which id do i use to join with? person.id? device.id?
> > > >> > > someStream.leftJoin(personDevice, ...)
> > > >> > >
> > > >> > > // Interactive Query on the view generated by the join of person and device
> > > >> > > personDeviceStore = streams.store("personDevice",...);
> > > >> > > // person.id? device.id?
> > > >> > > personDeviceStore.get(someId);
> > > >> > >
> > > >> > > We get records
> > > >> > > person id=1, device_id=2 ,...
> > > >> > > device id=2, person_id=1, ...
> > > >> > > stream person_id = 1, device_id = 2
> > > >> > >
> > > >> > > We could do the join between the GlobalTables both ways as each side could
> > > >> > > map to the other side's key, but when i'm accessing the resulting table,
> > > >> > > personDevice, what is the key? The person.id? the device.id? it can't be
> > > >> > > both of them.
> > > >> > >
> > > >> > > Thanks,
> > > >> > > Damian
> > > >> > >
> > > >> > >
> > > >> > > On Thu, 8 Dec 2016 at 15:51 Michael Noll <mich...@confluent.io> wrote:
> > > >> > >
> > > >> > > > The key type returned by both KeyValueMappers (in the current trunk
> > > >> > > > version, that type is named `R`) would need to be the same for this to
> > > >> > > > work.
> > > >> > > >
> > > >> > > >
> > > >> > > > On Wed, Dec 7, 2016 at 4:46 PM, Damian Guy <damian....@gmail.com> wrote:
> > > >> > > >
> > > >> > > > > Michael,
> > > >> > > > >
> > > >> > > > > We can only support outerJoin if both tables are keyed the same way.
> > > >> > > > > Let's
> > > >> > > > > say for example you can map both ways, however, the key for each table is
> > > >> > > > > of a different type. So t1 is long and t2 is string - what is the key type
> > > >> > > > > of the resulting GlobalKTable? So when you subsequently join to this table,
> > > >> > > > > and do a lookup on it, which key are you using?
> > > >> > > > >
> > > >> > > > > Thanks,
> > > >> > > > > Damian
> > > >> > > > >
> > > >> > > > > On Wed, 7 Dec 2016 at 14:31 Michael Noll <mich...@confluent.io> wrote:
> > > >> > > > >
> > > >> > > > > > Damian,
> > > >> > > > > >
> > > >> > > > > > yes, that makes sense.
> > > >> > > > > >
> > > >> > > > > > But I am still wondering: In your example, there's no prior knowledge "can
> > > >> > > > > > I map from t1->t2" that Streams can leverage for joining t1 and t2 other
> > > >> > > > > > than blindly relying on the user to provide an appropriate KeyValueMapper
> > > >> > > > > > for K1/V1 of t1 -> K2/V2 of t2. In other words, if we allow the user to
> > > >> > > > > > provide a KeyValueMapper from t1->t2 (Streams does not know at compile time
> > > >> > > > > > whether this mapping will actually work), then we can also allow the user
> > > >> > > > > > to provide a corresponding "reverse" mapper from t2->t1. That is, we could
> > > >> > > > > > say that an outer join between two global tables IS supported, but if and
> > > >> > > > > > only if the user provides two KeyValueMappers, one for t1->t2 and one for
> > > >> > > > > > t2->t1.
> > > >> > > > > >
> > > >> > > > > > The left join t1->t2 (which is supported in the KIP), in general, works
> > > >> > > > > > only because of the existence of the user-provided KeyValueMapper from
> > > >> > > > > > t1->t2. The outer join, as you point out, cannot be satisfied as easily
> > > >> > > > > > because Streams must know two mappers, t1->t2 plus t2->t1 -- otherwise the
> > > >> > > > > > outer join won't work.
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > > On Wed, Dec 7, 2016 at 3:04 PM, Damian Guy <damian....@gmail.com> wrote:
> > > >> > > > > >
> > > >> > > > > > > Hi Michael,
> > > >> > > > > > >
> > > >> > > > > > > Sure. Say we have 2 input topics t1 & t2 below:
> > > >> > > > > > > t1 {
> > > >> > > > > > >   int key;
> > > >> > > > > > >   string t2_id;
> > > >> > > > > > >   ...
> > > >> > > > > > > }
> > > >> > > > > > >
> > > >> > > > > > > t2 {
> > > >> > > > > > >   string key;
> > > >> > > > > > >   ..
> > > >> > > > > > > }
> > > >> > > > > > > If we create global tables out of these we'd get:
> > > >> > > > > > > GlobalKTable<Integer, ?> t1;
> > > >> > > > > > > GlobalKTable<String, ?> t2;
> > > >> > > > > > >
> > > >> > > > > > > So the join can only go in 1 direction, i.e, from t1 -> t2 as in order to
> > > >> > > > > > > perform the join we need to use a KeyValueMapper to extract the t2 key from
> > > >> > > > > > > the t1 value.
> > > >> > > > > > >
> > > >> > > > > > > Does that make sense?
> > > >> > > > > > >
> > > >> > > > > > > Thanks,
> > > >> > > > > > > Damian
> > > >> > > > > > >
> > > >> > > > > > > On Wed, 7 Dec 2016 at 10:44 Michael Noll <mich...@confluent.io> wrote:
> > > >> > > > > > >
> > > >> > > > > > > > > There is no outer-join for GlobalKTables as the tables may be keyed
> > > >> > > > > > > > > differently. So you need to use the key from the left side of the join
> > > >> > > > > > > > > along with the KeyValueMapper to resolve the right side of the join. This
> > > >> > > > > > > > > won't work the other way around.
> > > >> > > > > > > >
> > > >> > > > > > > > Care to elaborate why it won't work the other way around? If, for example,
> > > >> > > > > > > > we swapped the call from leftTable.join(rightTable) to
> > > >> > > > > > > > rightTable.join(leftTable), that join would work, too. Perhaps I am
> > > >> > > > > > > > missing something though. :-)
> > > >> > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > > > On Wed, Dec 7, 2016 at 10:39 AM, Damian Guy <damian....@gmail.com> wrote:
> > > >> > > > > > > >
> > > >> > > > > > > > > Hi Matthias,
> > > >> > > > > > > > >
> > > >> > > > > > > > > Thanks for the feedback.
> > > >> > > > > > > > >
> > > >> > > > > > > > > There is no outer-join for GlobalKTables as the tables may be keyed
> > > >> > > > > > > > > differently. So you need to use the key from the left side of the join
> > > >> > > > > > > > > along with the KeyValueMapper to resolve the right side of the join. This
> > > >> > > > > > > > > won't work the other way around.
> > > >> > > > > > > > >
> > > >> > > > > > > > > On the bootstrapping concern. If the application is failing before
> > > >> > > > > > > > > bootstrapping finishes, the problem is likely to be related to a terminal
> > > >> > > > > > > > > exception, i.e., running out of disk space, corrupt state stores etc. In
> > > >> > > > > > > > > these cases, we wouldn't want the application to continue. So i think this
> > > >> > > > > > > > > is ok.
> > > >> > > > > > > > >
> > > >> > > > > > > > > Thanks,
> > > >> > > > > > > > > Damian
> > > >> > > > > > > > >
> > > >> > > > > > > > > On Tue, 6 Dec 2016 at 21:56 Matthias J. Sax <matth...@confluent.io> wrote:
> > > >> > > > > > > > >
> > > >> > > > > > > > > > Thanks for the KIP Damian. Very nice motivating example!
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > A few comments:
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > - why is there no outer-join for GlobalKTables
> > > >> > > > > > > > > > - on bootstrapping GlobalKTable, could it happen that this never
> > > >> > > > > > > > > > finishes if the application fails before bootstrapping finishes and new
> > > >> > > > > > > > > > data gets written at the same time? Do we need to guard against this
> > > >> > > > > > > > > > (seems to be a very rare corner case, so maybe not required)?
> > > >> > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > -Matthias
> > > >> > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > On 12/6/16 2:09 AM, Damian Guy wrote:
> > > >> > > > > > > > > > > Hi all,
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > I would like to start the discussion on KIP-99:
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67633649
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > Looking forward to your feedback.
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > Thanks,
> > > >> > > > > > > > > > > Damian
> > > >> > > > > > > > > > >
> > > >> > > > > >
> > > >> > > > > > --
> > > >> > > > > > *Michael G. Noll*
> > > >> > > > > > Product Manager | Confluent
> > > >> > > > > > +1 650 453 5860 | @miguno <https://twitter.com/miguno>
> > > >> > > > > > Follow us: Twitter <https://twitter.com/ConfluentInc> | Blog
> > > >> > > > > > <http://www.confluent.io/blog>
> > > >> > > > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> > > --
> > > -- Guozhang
> > >
>
> --
> -- Guozhang
>
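
For reference, the point made in the thread about why the join is driven from one side only can be sketched in plain Java. This is only an illustration under stated assumptions: java.util.Map stands in for a global table, BiFunction stands in for KeyValueMapper and ValueJoiner, and the names GlobalTableJoinSketch, T1Row and leftJoin are hypothetical and not part of the Kafka Streams API. The result stays keyed by the left table's key, which is why the reverse direction (and an outer join) would need a second mapper or a different result key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical illustration only; plain maps stand in for global tables.
final class GlobalTableJoinSketch {

    // Hypothetical t1 value: it carries the key of the matching t2 row.
    static final class T1Row {
        final String t2Id;
        final String payload;

        T1Row(String t2Id, String payload) {
            this.t2Id = t2Id;
            this.payload = payload;
        }
    }

    // Left join driven by the left table: the key mapper derives the
    // right-table key from each left row, and the result is keyed by
    // the left key. Rows that exist only on the right never appear.
    static <K, V, K1, V1, R> Map<K, R> leftJoin(Map<K, V> left,
                                                Map<K1, V1> right,
                                                BiFunction<K, V, K1> keyMapper,
                                                BiFunction<V, V1, R> joiner) {
        Map<K, R> result = new HashMap<>();
        for (Map.Entry<K, V> entry : left.entrySet()) {
            K1 rightKey = keyMapper.apply(entry.getKey(), entry.getValue());
            V1 rightValue = right.get(rightKey); // null when there is no match
            result.put(entry.getKey(), joiner.apply(entry.getValue(), rightValue));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, T1Row> t1 = new HashMap<>();
        t1.put(1, new T1Row("a", "first"));
        t1.put(2, new T1Row("missing", "second"));

        Map<String, String> t2 = new HashMap<>();
        t2.put("a", "alpha");
        t2.put("b", "beta"); // no t1 row maps to this key

        Map<Integer, String> joined = leftJoin(
                t1, t2,
                (key, row) -> row.t2Id,                 // t1 -> t2 key
                (row, other) -> row.payload + "/" + other);

        System.out.println(joined);
    }
}
```

Note that the t2 row keyed "b" cannot be placed in the result at all: the result is keyed by Integer, and nothing maps a t2 row back to a t1 key.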