Thanks for the very well-written proposal, and sorry for the very late review. I have a few comments here:
1. We are introducing a "queryableViewName" in the GlobalKTable join results, but I'm wondering whether we should instead add a more general function like "materialize" to KTable and GlobalKTable, taking the name to be used in queries?

2. For GlobalKTable's own "join" and "leftJoin": since we only pass a KeyValueMapper<K, V, K1> keyMapper, it seems that in either case only the left-hand side will logically "trigger" the join, which differs from KTable's join semantics. I'm wondering if it would be more consistent to have them as:

    <K1, V1, R> GlobalKTable<K, R> join(final GlobalKTable<K1, V1> other,
                                        final KeyValueMapper<K, V, K1> leftKeyMapper,
                                        final KeyValueMapper<K1, V1, K> rightKeyMapper,
                                        final ValueJoiner<V, V1, R> joiner,
                                        final String queryableViewName);

    <K1, V1, R> GlobalKTable<K, R> outerJoin(final GlobalKTable<K1, V1> other,
                                             final KeyValueMapper<K, V, K1> leftKeyMapper,
                                             final KeyValueMapper<K1, V1, K> rightKeyMapper,
                                             final ValueJoiner<V, V1, R> joiner,
                                             final String queryableViewName);

    <K1, V1, R> GlobalKTable<K, R> leftJoin(final GlobalKTable<K1, V1> other,
                                            final KeyValueMapper<K, V, K1> keyMapper,
                                            final ValueJoiner<V, V1, R> joiner,
                                            final String queryableViewName);

I.e., add a second, reverse-direction key mapper to join, and also to outerJoin.

3. For "TopologyBuilder.buildGlobalStateTopology", is it necessary to have a function separate from "TopologyBuilder.build" itself? With global tables, are there any scenarios in which we want to build the topology without the embedded global tables (i.e., still calling "build")?

4. As for the implementation, you mentioned that global table bootstrapping will be done in a separate, dedicated thread. Could we also consider moving the logic for bootstrapping the standby-replica state stores into this thread, which could then leverage the existing "restoreConsumer" that does not participate in the consumer group protocol? By doing this, I think we can still avoid thread synchronization while making the logic clearer (ideally, standby restoration does not need to be part of the stream thread's main loop).

5. Also, for the global table's state directory, I'm assuming it will not be under the per-task directory, as it is per instance. Could you elaborate a bit in the wiki on this directory as well? Also, could we consider adding https://issues.apache.org/jira/browse/KAFKA-3522 along with this feature, since we may need to change the directory path / storage schema formats for these different types of stores moving forward?

Guozhang

On Fri, Dec 9, 2016 at 4:21 AM, Damian Guy <damian....@gmail.com> wrote:

> Thanks for the update, Michael.
>
> I just wanted to add one crucial piece of information that I failed to include (I apologise).
>
> To me, the join between 2 Global Tables just produces a view on top of the underlying tables (the same as how it works for KTables today). That means there is no physical StateStore backing the join result; it is just a virtual StateStore that knows how to resolve the join when it is required. I've deliberately taken this path so that we don't end up with yet another copy of the data, stored on local disk and sent to another changelog topic. This also reduces the memory overhead of creating RocksDBStores and reduces the load on the thread-based caches we have. So it is a resource optimization.
>
> So while it is technically possible to support outer joins, we would need to either physically materialize the StateStore (and create a changelog topic for it), or provide another interface where the user could map from the outerJoin key to both of the other tables' keys. This is because the key of the outerJoin table could be the key of the lhs table, the key of the rhs table, or something completely different.
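Damian's description of the join result as a virtual store, resolved on read instead of being copied to local disk and a changelog topic, can be illustrated with a minimal plain-Java sketch. It assumes that ordinary `Map`s stand in for the global tables' state stores and that `java.util.function.BiFunction` stands in for `KeyValueMapper` and `ValueJoiner`; the class `VirtualJoinView` is hypothetical and is not part of the Kafka Streams API or the KIP. It models only the one-directional lookup (left key, then the mapped right key) that the proposed `join` relies on.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical sketch: a read-only "virtual" join view over two in-memory tables.
// Plain Maps stand in for global state stores; BiFunction stands in for
// KeyValueMapper and ValueJoiner. Nothing here is the Kafka Streams API.
public class VirtualJoinView<K, V, K1, V1, R> {
    private final Map<K, V> left;
    private final Map<K1, V1> right;
    private final BiFunction<K, V, K1> keyMapper; // left record -> right key
    private final BiFunction<V, V1, R> joiner;    // (left value, right value) -> result

    public VirtualJoinView(Map<K, V> left, Map<K1, V1> right,
                           BiFunction<K, V, K1> keyMapper,
                           BiFunction<V, V1, R> joiner) {
        this.left = left;
        this.right = right;
        this.keyMapper = keyMapper;
        this.joiner = joiner;
    }

    // Resolves the inner join at lookup time; no joined copy is stored anywhere.
    public R get(K key) {
        V leftValue = left.get(key);
        if (leftValue == null) {
            return null;
        }
        V1 rightValue = right.get(keyMapper.apply(key, leftValue));
        return rightValue == null ? null : joiner.apply(leftValue, rightValue);
    }

    public static void main(String[] args) {
        // t1 has Integer keys and values that hold a t2 key; t2 has String keys.
        Map<Integer, String> t1 = new HashMap<>();
        Map<String, String> t2 = new HashMap<>();
        t1.put(1, "a");
        t2.put("a", "alpha");

        VirtualJoinView<Integer, String, String, String, String> view =
            new VirtualJoinView<>(t1, t2, (k, v) -> v, (v, v1) -> v + "=" + v1);

        System.out.println(view.get(1)); // a=alpha
        t2.put("a", "ALPHA");            // update only an underlying table
        System.out.println(view.get(1)); // a=ALPHA, no re-materialization needed
    }
}
```

Because `get` resolves the join at lookup time, updates to either underlying map are visible immediately and no joined copy exists. The flip side is the one Damian raises: without a result key that leads to both sides, an outer join cannot be served by such a view.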
>
> With this and what you have mentioned above in mind, I think we should park outerJoin support for this KIP and revisit it if and when we need it in the future.
>
> I'll update the KIP with this.
>
> Thanks,
> Damian
>
> On Fri, 9 Dec 2016 at 09:53 Michael Noll <mich...@confluent.io> wrote:
>
> > Damian and I briefly chatted offline (thanks, Damian!), and here's the summary of my thoughts and conclusion.
> >
> > TL;DR: Let's skip outer join support for global tables.
> >
> > In more detail:
> >
> > - We agreed that, technically, we can add OUTER JOIN support. However, outer joins only work if certain preconditions are met. The preconditions are IMHO similar or identical to those we have for the normal, partitioned KTables (e.g. having matching keys and co-partitioned data for the tables), but in the case of global tables the user would need to meet all these preconditions in one big swing when specifying the params for the outer join call. Even so, you'd only know at run-time whether the preconditions were actually met properly.
> >
> > - Hence it's quite likely that users will be confused about these preconditions and how to meet them, and, from what we can tell, use cases / user demand for outer joins have been rare.
> >
> > - So, long story short, even though we could add outer join support, we'd suggest skipping it for global tables. If we subsequently learn that there is a lot of user interest in that functionality, we still have the option to add it in the future.
> >
> > Best,
> > Michael
> >
> > On Thu, Dec 8, 2016 at 6:31 PM, Damian Guy <damian....@gmail.com> wrote:
> >
> > > Hi Michael,
> > >
> > > I don't see how that helps.
> > >
> > > Let's say we have tables Person(id, device_id, name, ...) and Device(id, person_id, type, ...), both keyed with the same type. And we have a stream that, for the sake of simplicity, has both person_id and device_id (I know this is a bit contrived!). So our join:
> > >
> > > person = builder.globalTable(...);
> > > device = builder.globalTable(...);
> > > personDevice = person.outerJoin(device, ...);
> > >
> > > someStream = builder.stream(...);
> > > // which id do I use to join with? person.id? device.id?
> > > someStream.leftJoin(personDevice, ...);
> > >
> > > // Interactive Query on the view generated by the join of person and device
> > > personDeviceStore = streams.store("personDevice", ...);
> > > // person.id? device.id?
> > > personDeviceStore.get(someId);
> > >
> > > We get records:
> > > person id=1, device_id=2, ...
> > > device id=2, person_id=1, ...
> > > stream person_id=1, device_id=2
> > >
> > > We could do the join between the GlobalTables both ways, as each side could map to the other side's key, but when I'm accessing the resulting table, personDevice, what is the key? The person.id? The device.id? It can't be both of them.
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Thu, 8 Dec 2016 at 15:51 Michael Noll <mich...@confluent.io> wrote:
> > >
> > > > The key type returned by both KeyValueMappers (in the current trunk version, that type is named `R`) would need to be the same for this to work.
> > > >
> > > > On Wed, Dec 7, 2016 at 4:46 PM, Damian Guy <damian....@gmail.com> wrote:
> > > >
> > > > > Michael,
> > > > >
> > > > > We can only support outerJoin if both tables are keyed the same way. Let's say, for example, you can map both ways, but the key for each table is of a different type: t1 is long and t2 is string. What is the key type of the resulting GlobalKTable? When you subsequently join to this table and do a lookup on it, which key are you using?
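The key question in these messages can be made concrete with a small plain-Java sketch. It assumes one naive strategy for materializing an outer join: key each row by the id of the side it came from. Michael's two mappers (person to device id, device to person id) are modeled as `java.util.function.Function`s, and `Person`, `Device`, and `naiveOuterJoin` are hypothetical names, not Kafka Streams APIs. Even with both tables keyed by `Long`, a person id and a device id can collide, so the result has no single meaningful key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch (Java 16+ for records): why a materialized outer join of two
// global tables has no well-defined key, even when both tables share a key type and
// mappers exist in both directions. Plain Maps stand in for the tables.
public class OuterJoinKeyAmbiguity {
    record Person(long id, long deviceId, String name) {}
    record Device(long id, long personId, String type) {}

    // Naive, assumed strategy: key each output row by the id of the side it came from.
    static Map<Long, String> naiveOuterJoin(Map<Long, Person> persons,
                                            Map<Long, Device> devices,
                                            Function<Person, Long> personToDevice,
                                            Function<Device, Long> deviceToPerson) {
        Map<Long, String> result = new HashMap<>();
        // Left side: every person, joined with its device if one exists.
        for (Person p : persons.values()) {
            Device d = devices.get(personToDevice.apply(p));
            result.put(p.id(), p.name() + "/" + (d == null ? "null" : d.type()));
        }
        // Right side: devices whose mapped person does not exist.
        for (Device d : devices.values()) {
            if (!persons.containsKey(deviceToPerson.apply(d))) {
                // The only key available is the device id, a different key space.
                result.put(d.id(), "null/" + d.type());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, Person> persons = new HashMap<>();
        Map<Long, Device> devices = new HashMap<>();
        persons.put(1L, new Person(1L, 2L, "alice"));
        devices.put(2L, new Device(2L, 1L, "phone"));
        devices.put(1L, new Device(1L, 99L, "tablet")); // device with no matching person

        Map<Long, String> joined =
            naiveOuterJoin(persons, devices, Person::deviceId, Device::personId);
        // Key 1 meant "person 1" after the left pass, then "device 1" after the right pass.
        System.out.println(joined); // {1=null/tablet}
    }
}
```

Here person 1's joined row is silently overwritten by the unmatched device 1. Avoiding this needs either a separately chosen result key (the extra interface Damian mentions) or a physically materialized store with its own key design, which is the trade-off that led the thread to park outer joins.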
> > > > >
> > > > > Thanks,
> > > > > Damian
> > > > >
> > > > > On Wed, 7 Dec 2016 at 14:31 Michael Noll <mich...@confluent.io> wrote:
> > > > >
> > > > > > Damian,
> > > > > >
> > > > > > yes, that makes sense.
> > > > > >
> > > > > > But I am still wondering: in your example, there's no prior knowledge of "can I map from t1->t2" that Streams can leverage for joining t1 and t2, other than blindly relying on the user to provide an appropriate KeyValueMapper for K1/V1 of t1 -> K2/V2 of t2. In other words, if we allow the user to provide a KeyValueMapper from t1->t2 (Streams does not know at compile time whether this mapping will actually work), then we can also allow the user to provide a corresponding "reverse" mapper from t2->t1. That is, we could say that an outer join between two global tables IS supported, but if and only if the user provides two KeyValueMappers, one for t1->t2 and one for t2->t1.
> > > > > >
> > > > > > The left join t1->t2 (which is supported in the KIP), in general, works only because of the existence of the user-provided KeyValueMapper from t1->t2. The outer join, as you point out, cannot be satisfied as easily because Streams must know two mappers, t1->t2 plus t2->t1; otherwise the outer join won't work.
> > > > > >
> > > > > > On Wed, Dec 7, 2016 at 3:04 PM, Damian Guy <damian....@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Michael,
> > > > > > >
> > > > > > > Sure. Say we have 2 input topics, t1 & t2, below:
> > > > > > >
> > > > > > > t1 {
> > > > > > >   int key;
> > > > > > >   string t2_id;
> > > > > > >   ...
> > > > > > > }
> > > > > > >
> > > > > > > t2 {
> > > > > > >   string key;
> > > > > > >   ...
> > > > > > > }
> > > > > > >
> > > > > > > If we create global tables out of these, we'd get:
> > > > > > >
> > > > > > > GlobalKTable<Integer, ?> t1;
> > > > > > > GlobalKTable<String, ?> t2;
> > > > > > >
> > > > > > > So the join can only go in one direction, i.e., from t1 -> t2, because in order to perform the join we need to use a KeyValueMapper to extract the t2 key from the t1 value.
> > > > > > >
> > > > > > > Does that make sense?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Damian
> > > > > > >
> > > > > > > On Wed, 7 Dec 2016 at 10:44 Michael Noll <mich...@confluent.io> wrote:
> > > > > > >
> > > > > > > > > There is no outer-join for GlobalKTables as the tables may be keyed differently. So you need to use the key from the left side of the join along with the KeyValueMapper to resolve the right side of the join. This won't work the other way around.
> > > > > > > >
> > > > > > > > Care to elaborate why it won't work the other way around? If, for example, we swapped the call from leftTable.join(rightTable) to rightTable.join(leftTable), that join would work, too. Perhaps I am missing something though. :-)
> > > > > > > >
> > > > > > > > On Wed, Dec 7, 2016 at 10:39 AM, Damian Guy <damian....@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi Matthias,
> > > > > > > > >
> > > > > > > > > Thanks for the feedback.
> > > > > > > > >
> > > > > > > > > There is no outer-join for GlobalKTables as the tables may be keyed differently. So you need to use the key from the left side of the join along with the KeyValueMapper to resolve the right side of the join. This won't work the other way around.
> > > > > > > > >
> > > > > > > > > On the bootstrapping concern: if the application is failing before bootstrapping finishes, the problem is likely to be related to a terminal exception, e.g., running out of disk space, corrupt state stores, etc. In these cases, we wouldn't want the application to continue. So I think this is OK.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Damian
> > > > > > > > >
> > > > > > > > > On Tue, 6 Dec 2016 at 21:56 Matthias J. Sax <matth...@confluent.io> wrote:
> > > > > > > > >
> > > > > > > > > > Thanks for the KIP, Damian. Very nice motivating example!
> > > > > > > > > >
> > > > > > > > > > A few comments:
> > > > > > > > > >
> > > > > > > > > > - Why is there no outer-join for GlobalKTables?
> > > > > > > > > > - On bootstrapping GlobalKTable, could it happen that this never finishes if the application fails before bootstrapping finishes and new data gets written at the same time? Do we need to guard against this (it seems to be a very rare corner case, so maybe not required)?
> > > > > > > > > >
> > > > > > > > > > -Matthias
> > > > > > > > > >
> > > > > > > > > > On 12/6/16 2:09 AM, Damian Guy wrote:
> > > > > > > > > > > Hi all,
> > > > > > > > > > >
> > > > > > > > > > > I would like to start the discussion on KIP-99:
> > > > > > > > > > >
> > > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67633649
> > > > > > > > > > >
> > > > > > > > > > > Looking forward to your feedback.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Damian
> > > > > > >
> > > > > > --
> > > > > > *Michael G. Noll*
> > > > > > Product Manager | Confluent
> > > > > > +1 650 453 5860 | @miguno <https://twitter.com/miguno>
> > > > > > Follow us: Twitter <https://twitter.com/ConfluentInc> | Blog <http://www.confluent.io/blog>

--
-- Guozhang
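Guozhang's point 4 and the bootstrapping exchange between Matthias and Damian both concern a dedicated thread that restores the global tables before stream processing starts. The minimal sketch below uses only `java.util.concurrent` and assumes that stream threads block on a latch until the global thread finishes. It also assumes that any exception during restoration is treated as terminal (as with Damian's disk-full and corrupt-store examples), so startup aborts instead of continuing. `GlobalBootstrapSketch` and its methods are hypothetical, and the sketch does not model standby-replica restoration or the `restoreConsumer`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: global-table bootstrapping on a dedicated thread that must
// complete before stream threads start processing. A failure during bootstrap is
// treated as terminal and aborts startup. Not Kafka Streams' actual implementation.
public class GlobalBootstrapSketch {
    private final CountDownLatch bootstrapped = new CountDownLatch(1);
    private final AtomicReference<Throwable> failure = new AtomicReference<>();

    // Runs the supplied restore logic on its own thread.
    public Thread startGlobalThread(Runnable restoreGlobalStores) {
        Thread t = new Thread(() -> {
            try {
                restoreGlobalStores.run();
            } catch (Throwable e) {
                failure.set(e);           // e.g. out of disk space, corrupt store
            } finally {
                bootstrapped.countDown(); // release waiters on success or failure
            }
        }, "global-state-thread");
        t.start();
        return t;
    }

    // Called before stream threads start; blocks until the bootstrap has completed.
    public void awaitBootstrap() throws InterruptedException {
        bootstrapped.await();
        Throwable e = failure.get();
        if (e != null) {
            throw new IllegalStateException("global state bootstrap failed", e);
        }
    }

    public static void main(String[] args) throws Exception {
        GlobalBootstrapSketch ok = new GlobalBootstrapSketch();
        ok.startGlobalThread(() -> { /* restore global tables here */ });
        ok.awaitBootstrap();
        System.out.println("stream threads may start");

        GlobalBootstrapSketch bad = new GlobalBootstrapSketch();
        bad.startGlobalThread(() -> { throw new RuntimeException("disk full"); });
        try {
            bad.awaitBootstrap();
        } catch (IllegalStateException e) {
            System.out.println("startup aborted: " + e.getCause().getMessage());
        }
    }
}
```

Counting down in `finally` ensures that a failed bootstrap never leaves stream threads waiting forever; the failure is surfaced by `awaitBootstrap` instead.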