> On Jun 2, 2017, at 2:21 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>
> Hi,
>
> If you want to populate a GlobalKTable you can only do this by reading a
> topic. So the short answer for your headline is: no, you cannot suppress
> the intermediate topic.

Bummer! Maybe this is an opt-in optimization to consider later.

>
> However, I am wondering what the purpose of your secondary index is, and
> why you are using a GlobalKTable for it. Maybe you can elaborate a
> little bit?

Elaborated on this a bit in the other thread. I was trying to keep separate
problems separate, but maybe I just made things more confusing!

tl;dr is that the user requests values knowing K, but there is actually a
"hidden composite key" D that is relevant to the partitioning strategy.

The GlobalKTable allows you to look up K -> D, and then find the right
local KTable entry (K,D) -> V.

>
> I am also wondering about this code snippet:
>
>>> builder.stream(mainTopic)
>>>        .mapValues(...)
>>>        .to(secondaryIndex1)
>
> Should it not be .map() that transforms (k,v) ->
> (v.getSecondaryKey1(),k) ? Just for my understanding what you are doing.
>

In this case, the "externally visible" K needs additional information about
the destination D so that it can be partitioned correctly. The key stays K
and only the value changes (V -> D), which is why it is mapValues() rather
than map().

So the code looks like:

    // TODO: sucks that this materializes an intermediate topic
    msgStream
        .mapValues(v -> v == null ? null : v.getResolvedDestination().toString())
        .to(Serdes.String(), Serdes.String(), DEST_INDEX);
    builder.globalTable(Serdes.String(), Serdes.String(), DEST_INDEX, DEST_INDEX);

>
> -Matthias
>
>
> On 6/2/17 12:28 PM, Steven Schlansker wrote:
>> Hi everyone, another question for the list :)
>>
>> I'm creating a cluster of KTables (and GlobalKTables) based off the same
>> input stream K,V.
>>
>> It has a number of secondary indices (think like an RDBMS)
>> K1 -> K
>> K2 -> K
>> etc
>>
>> These are all based off of trivial mappings from my main stream that also
>> feeds the K -> V StateStore. Think one-liners like v -> v.getSecondaryKey1()
>>
>> Currently, for each one it seems that I have to do
>>
>> builder.stream(mainTopic)
>>        .mapValues(...)
>>        .to(secondaryIndex1)
>>
>> builder.globalTable(secondaryIndex1, secondaryIndexStore1);
>>
>> Unfortunately the intermediate "secondaryIndex1" topic is relatively
>> low value. In a case where my state stores are lost, I already have to
>> read through the mainTopic to recover the main state store. While it's doing
>> that, I'd much rather it rebuild the GlobalKTable instance from that data
>> directly. Then I could skip having this index in Kafka at all; it's entirely
>> redundant. The data is already loaded and deserialized for the benefit of
>> another Processor.
>>
>> Any thoughts? Happy Friday,
>> Steven
>>
>
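
For readers following the thread, the two-step lookup described above (K -> D through a fully replicated index, then (K,D) -> V in a store partitioned by D) can be modeled with plain Java maps. This is only a toy in-memory sketch and does not use the Kafka Streams API: the class TwoStepLookup, its methods, and the hash-based partitioning rule are all assumptions made for illustration, with one HashMap standing in for the GlobalKTable and a list of HashMaps standing in for the partitioned local KTable.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Toy model of the K -> D -> (K,D) -> V lookup. Hypothetical; not Kafka Streams code. */
final class TwoStepLookup {

    /** Composite key (K, D) used by the partitioned store. */
    static final class CompositeKey {
        final String k;
        final String d;

        CompositeKey(String k, String d) {
            this.k = k;
            this.d = d;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof CompositeKey)) {
                return false;
            }
            CompositeKey other = (CompositeKey) o;
            return k.equals(other.k) && d.equals(other.d);
        }

        @Override
        public int hashCode() {
            return Objects.hash(k, d);
        }
    }

    // Stand-in for the GlobalKTable: K -> D, visible to every instance.
    private final Map<String, String> destinationIndex = new HashMap<>();

    // Stand-in for the partitioned KTable: one map per partition, keyed by (K, D).
    private final List<Map<CompositeKey, String>> partitions = new ArrayList<>();

    TwoStepLookup(int numPartitions) {
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new HashMap<>());
        }
    }

    // Assumed partitioning rule: the partition is chosen by the destination D alone.
    private int partitionFor(String d) {
        return Math.floorMod(d.hashCode(), partitions.size());
    }

    /** Records value v for key k at destination d, keeping the K -> D index in sync. */
    void put(String k, String d, String v) {
        String previous = destinationIndex.put(k, d);
        if (previous != null && !previous.equals(d)) {
            // The key moved to a new destination: drop the stale entry.
            partitions.get(partitionFor(previous)).remove(new CompositeKey(k, previous));
        }
        partitions.get(partitionFor(d)).put(new CompositeKey(k, d), v);
    }

    /** Step 1 of the lookup: K -> D, or null if k is unknown. */
    String destinationOf(String k) {
        return destinationIndex.get(k);
    }

    /** Full lookup: resolve D from K, then read (K, D) from the partition that owns D. */
    String get(String k) {
        String d = destinationIndex.get(k);
        if (d == null) {
            return null;
        }
        return partitions.get(partitionFor(d)).get(new CompositeKey(k, d));
    }
}
```

A caller only ever supplies K: after put("k1", "destA", "v1"), get("k1") resolves destA from the index and reads the value from whichever partition owns destA. In the real topology the index is fed from the DEST_INDEX topic rather than updated in the same call, which is exactly the intermediate topic the thread is asking about.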