You're entirely right. I'd forgotten that each instance would only read a subset of the main topic. Should have figured that out myself. Thanks for the sanity check! :)
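The point acknowledged in the reply, that each instance reads only a subset of the main topic, can be made concrete with a small plain-Java simulation. It uses no Kafka classes. The partitioner `partitionFor` (`Math.floorMod(key.hashCode(), numPartitions)`) and the round-robin `instanceFor` assignment are hypothetical stand-ins. Kafka's default partitioner instead uses a murmur2 hash of the serialized key, and the consumer group protocol decides the real assignment, so actual placement will differ. The conclusion holds either way: no single instance holds every key.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation; no Kafka classes are used.
class PartitionedViewSketch {

    // Hypothetical partitioner: a stand-in, NOT Kafka's murmur2-based default.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Hypothetical assignment: partition p is owned by instance p % numInstances.
    static int instanceFor(int partition, int numInstances) {
        return partition % numInstances;
    }

    // Keys whose records a given instance would read from the main topic.
    static List<String> keysSeenBy(int instance, List<String> keys,
                                   int numPartitions, int numInstances) {
        List<String> seen = new ArrayList<>();
        for (String key : keys) {
            int partition = partitionFor(key, numPartitions);
            if (instanceFor(partition, numInstances) == instance) {
                seen.add(key);
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("user-1", "user-2", "user-3", "user-4", "user-5", "user-6");
        int numPartitions = 4;
        int numInstances = 2;
        for (int i = 0; i < numInstances; i++) {
            // Each instance reads only its own partitions, so an index built
            // locally from the main topic would be missing the other keys.
            System.out.println("instance " + i + " reads "
                    + keysSeenBy(i, keys, numPartitions, numInstances));
        }
    }
}
```

With these inputs each instance ends up with three of the six keys. A GlobalKTable avoids this by having every instance consume all partitions of its source topic. As Matthias explains below, that is why the index has to exist as a topic in the first place.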
> On Jun 2, 2017, at 3:41 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>
> Makes sense now (considering the explanation from the other thread).
>
> With regard to an "opt-in" optimization: we could simplify the API and
> hide some details, but it wouldn't buy you anything from an execution
> point of view. As you need all data on each instance, you need to
> somehow "broadcast" the information -- and Kafka Streams applications
> use topics to exchange data. Thus, we need a topic anyhow.
>
> Does this make sense?
>
> So your overall architecture seems to be sound to me.
>
>
> -Matthias
>
>
> On 6/2/17 2:37 PM, Steven Schlansker wrote:
>>
>>> On Jun 2, 2017, at 2:21 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>
>>> Hi,
>>>
>>> If you want to populate a GlobalKTable you can only do this by reading a
>>> topic. So the short answer to your headline is: no, you cannot suppress
>>> the intermediate topic.
>>
>> Bummer! Maybe this is an opt-in optimization to consider later.
>>
>>>
>>> However, I am wondering what the purpose of your secondary index is, and
>>> why you are using a GlobalKTable for it. Maybe you can elaborate a
>>> little bit?
>>
>> I elaborated on this a bit in the other thread. I was trying to keep separate
>> problems separate, but maybe I just made things more confusing!
>>
>> tl;dr: the user requests values knowing K, but there is actually a
>> "hidden composite key" D that is relevant to the partitioning strategy.
>>
>> The GlobalKTable allows you to look up K -> D, and then find the right local
>> KTable (K,D) -> V.
>>
>>>
>>> I am also wondering about this code snippet:
>>>
>>>>> builder.stream(mainTopic)
>>>>>     .mapValues(...)
>>>>>     .to(secondaryIndex1)
>>>
>>> Should it not be .map() that transforms (k,v) ->
>>> (v.getSecondaryKey1(),k)? Just for my understanding of what you are doing.
>>>
>>
>> In this case, the "externally visible" K needs additional information about
>> the destination D so that it can be partitioned correctly. So the code looks
>> like:
>>
>> // TODO: sucks that this materializes an intermediate topic
>> msgStream
>>     .mapValues(v -> v == null ? null :
>>         v.getResolvedDestination().toString())
>>     .to(Serdes.String(), Serdes.String(), DEST_INDEX);
>>
>> builder.globalTable(Serdes.String(), Serdes.String(), DEST_INDEX,
>>     DEST_INDEX);
>>
>>>
>>> -Matthias
>>>
>>>
>>> On 6/2/17 12:28 PM, Steven Schlansker wrote:
>>>> Hi everyone, another question for the list :)
>>>>
>>>> I'm creating a cluster of KTables (and GlobalKTables) based off the same
>>>> input stream K,V.
>>>>
>>>> It has a number of secondary indices (think of an RDBMS):
>>>> K1 -> K
>>>> K2 -> K
>>>> etc.
>>>>
>>>> These are all based off of trivial mappings from my main stream that also
>>>> feeds the K -> V StateStore. Think one-liners like v ->
>>>> v.getSecondaryKey1().
>>>>
>>>> Currently, for each one it seems that I have to do:
>>>>
>>>> builder.stream(mainTopic)
>>>>     .mapValues(...)
>>>>     .to(secondaryIndex1)
>>>>
>>>> builder.globalTable(secondaryIndex1, secondaryIndexStore1);
>>>>
>>>> Unfortunately the intermediate "secondaryIndex1" topic is relatively
>>>> low value. In a case where my state stores are lost, I already have to
>>>> read through the mainTopic to recover the main state store. While it's
>>>> doing that, I'd much rather it rebuild the GlobalKTable instance from
>>>> that data directly. Then I could skip having this index in Kafka at all;
>>>> it's entirely redundant. The data is already loaded and deserialized for
>>>> the benefit of another Processor.
>>>>
>>>> Any thoughts? Happy Friday,
>>>> Steven
>>>>
>>>
>>
>