You're entirely right.  I'd forgotten that each instance would only read
a subset of the main topic.  Should have figured that out myself.  Thanks
for the sanity check! :)
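A small in-memory model shows why a local KTable is not enough here. This is a sketch under stated assumptions, not Kafka Streams code. It assumes one instance per partition, and it uses `String.hashCode()` as a stand-in for Kafka's real default partitioner, which hashes the serialized key with murmur2. `PartitionModel`, `partitionFor` and `keysSeenBy` are hypothetical names.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical in-memory model, not Kafka Streams code: each record lands in
// one partition chosen from its key, and each instance owns one partition.
final class PartitionModel {
    private PartitionModel() {}

    // Stand-in partitioner. Kafka's default partitioner hashes the serialized
    // key with murmur2; String.hashCode() is used here only for illustration.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // The keys an instance reads if it owns exactly partition `instance`.
    static Set<String> keysSeenBy(int instance, List<String> keys, int numPartitions) {
        Set<String> seen = new TreeSet<>();
        for (String key : keys) {
            if (partitionFor(key, numPartitions) == instance) {
                seen.add(key);
            }
        }
        return seen;
    }
}
```

With keys "a" through "f" and three partitions, instance 0 sees [c, f], instance 1 sees [a, d] and instance 2 sees [b, e]. No single instance can answer a lookup for every key, so the index has to be broadcast through a topic that every instance reads in full.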

> On Jun 2, 2017, at 3:41 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> 
> Makes sense now (considering the explanation from the other thread).
> 
> With regard to an "opt-in" optimization: we could simplify the API and
> hide some details, but it wouldn't buy you anything from an execution
> point of view. As you need all data on each instance, you need to
> somehow "broadcast" the information -- and Kafka Streams applications
> use topics to exchange data. Thus, we need a topic anyhow.
> 
> Does this make sense?
> 
> So your overall architecture seems to be sound to me.
> 
> 
> -Matthias
> 
> 
> On 6/2/17 2:37 PM, Steven Schlansker wrote:
>> 
>>> On Jun 2, 2017, at 2:21 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>> 
>>> Hi,
>>> 
>>> If you want to populate a GlobalKTable you can only do this by reading a
>>> topic. So the short answer to your headline question is: no, you cannot
>>> suppress the intermediate topic.
>> 
>> Bummer!  Maybe this is an opt-in optimization to consider later.
>> 
>>> 
>>> However, I am wondering what the purpose of your secondary index is, and
>>> why you are using a GlobalKTable for it. Maybe you can elaborate a
>>> little bit?
>> 
>> I elaborated on this a bit in the other thread.  I was trying to keep
>> separate problems separate, but maybe I just made things more confusing!
>> 
>> tl;dr is that the user requests values knowing K, but there is actually a
>> "hidden composite key" D that is relevant to the partitioning strategy.
>> 
>> The GlobalKTable allows you to look up K -> D, and then find the right
>> local KTable K,D -> V.
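The two-step lookup described above can be sketched with plain maps. This is a hypothetical model, not the Kafka Streams interactive-query API. `globalIndex` stands in for the GlobalKTable's store, a full copy on every instance. `localStore` stands in for this instance's share of the K,D -> V KTable. `TwoStepLookup` and `CompositeKey` are illustrative names. The sketch uses a Java record, so it assumes Java 16 or later.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical model of the K -> D -> V lookup; plain maps stand in for the
// GlobalKTable store and the local KTable store.
final class TwoStepLookup {
    // Composite key for the local store; a record supplies equals/hashCode.
    record CompositeKey(String k, String d) {}

    private final Map<String, String> globalIndex;        // K -> D, replicated everywhere
    private final Map<CompositeKey, String> localStore;   // (K, D) -> V, this instance only

    TwoStepLookup(Map<String, String> globalIndex, Map<CompositeKey, String> localStore) {
        this.globalIndex = globalIndex;
        this.localStore = localStore;
    }

    // Step 1 resolves the hidden key D; step 2 reads the local store.
    // Empty means K is unknown, or (K, D) is held by another instance.
    Optional<String> get(String k) {
        String d = globalIndex.get(k);
        if (d == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(localStore.get(new CompositeKey(k, d)));
    }
}
```

An empty result for a K that is present in the index means the K,D entry belongs to another instance, so the request would have to be routed to that instance.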
>> 
>>> 
>>> I am also wondering about this code snippet:
>>> 
>>>>> builder.stream(mainTopic)
>>>>>      .mapValues(...)
>>>>>      .to(secondaryIndex1)
>>> 
>>> Should it not be .map() that transforms (k,v) ->
>>> (v.getSecondaryKey1(),k) ? Just for my understanding what you are doing.
>>> 
>> 
>> In this case, the "externally visible" K needs additional information about
>> the destination D so that it can be partitioned correctly.  So the code looks
>> like:
>> 
>>        // TODO: sucks that this materializes an intermediate topic
>>        msgStream
>>                .mapValues(v -> v == null ? null : v.getResolvedDestination().toString())
>>                .to(Serdes.String(), Serdes.String(), DEST_INDEX);
>> 
>>        builder.globalTable(Serdes.String(), Serdes.String(), DEST_INDEX, DEST_INDEX);
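The snippet above maps a null value to null, so deleting a message produces a tombstone on DEST_INDEX. A table built from that topic then drops the key. The sketch below models that behaviour in plain Java. `Msg`, `Rec`, `toIndexValue` and `materialize` are hypothetical stand-ins, not Kafka Streams types. `toIndexValue` mirrors the `mapValues` lambda, with a plain String field in place of `getResolvedDestination().toString()`. It assumes Java 16 or later for records.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, not Kafka Streams code, of what a table built from
// DEST_INDEX ends up holding.
final class TombstoneModel {
    // Stand-in for the message type; the destination is a plain field here.
    record Msg(String resolvedDestination) {}

    // One record on the main stream; value == null means "deleted".
    record Rec(String key, Msg value) {}

    // Mirrors the mapValues lambda: null stays null (a tombstone).
    static String toIndexValue(Msg v) {
        return v == null ? null : v.resolvedDestination();
    }

    // Applies an ordered changelog the way a table treats it:
    // the last value wins, and a null value deletes the key.
    static Map<String, String> materialize(List<Rec> changelog) {
        Map<String, String> table = new HashMap<>();
        for (Rec rec : changelog) {
            String indexValue = toIndexValue(rec.value());
            if (indexValue == null) {
                table.remove(rec.key());
            } else {
                table.put(rec.key(), indexValue);
            }
        }
        return table;
    }
}
```

Without the null check, the lambda would throw a NullPointerException on deletes instead of forwarding them, and stale K -> D entries would stay in the index.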
>> 
>>> 
>>> -Matthias
>>> 
>>> 
>>> On 6/2/17 12:28 PM, Steven Schlansker wrote:
>>>> Hi everyone, another question for the list :)
>>>> 
>>>> I'm creating a cluster of KTables (and GlobalKTables) based on the same
>>>> input stream K,V.
>>>> 
>>>> It has a number of secondary indices (think of an RDBMS):
>>>> K1 -> K
>>>> K2 -> K
>>>> etc
>>>> 
>>>> These are all based on trivial mappings from my main stream, which also
>>>> feeds the K -> V StateStore.  Think one-liners like
>>>> v -> v.getSecondaryKey1()
>>>> 
>>>> Currently, for each one it seems that I have to do
>>>> 
>>>> builder.stream(mainTopic)
>>>>      .mapValues(...)
>>>>      .to(secondaryIndex1)
>>>> 
>>>> builder.globalTable(secondaryIndex1, secondaryIndexStore1);
>>>> 
>>>> Unfortunately, the intermediate "secondaryIndex1" topic is relatively
>>>> low value.  If my state stores are lost, I already have to read
>>>> through the mainTopic to recover the main state store.  While it's
>>>> doing that, I'd much rather it rebuild the GlobalKTable instance from
>>>> that data directly.  Then I could skip having this index in Kafka at
>>>> all; it's entirely redundant.  The data is already loaded and
>>>> deserialized for the benefit of another Processor.
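The paragraph above asks for every index to be rebuilt from the same replay of mainTopic, instead of from a separate index topic. That behaviour can be modelled in a few lines. This is a hypothetical sketch of the desired behaviour, not something the Kafka Streams version in this thread offers: as noted earlier in the thread, a GlobalKTable can only be populated by reading a topic. The sketch assumes each secondary key maps to at most one K, as in the K1 -> K indices above, and that a null value is a delete. `SinglePassRebuild`, `Rec`, `apply` and `replay` are illustrative names. It requires Java 16 or later for records.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model: one pass over the main topic feeds the K -> V store
// and every secondary index K_i -> K at the same time.
final class SinglePassRebuild {
    // One record from the main topic; value == null is a delete.
    record Rec(String key, String value) {}

    final Map<String, String> main = new HashMap<>();                  // K -> V
    final Map<String, Map<String, String>> indices = new HashMap<>();  // index name -> (K_i -> K)
    private final Map<String, Function<String, String>> extractors;    // index name -> (V -> K_i)

    SinglePassRebuild(Map<String, Function<String, String>> extractors) {
        this.extractors = extractors;
        extractors.keySet().forEach(name -> indices.put(name, new HashMap<>()));
    }

    // Applies one record to the main store and to every index.
    void apply(Rec r) {
        String old = r.value() == null ? main.remove(r.key()) : main.put(r.key(), r.value());
        for (Map.Entry<String, Function<String, String>> e : extractors.entrySet()) {
            Map<String, String> index = indices.get(e.getKey());
            if (old != null) {
                // Drop the stale entry, but only if it still points at this K.
                index.remove(e.getValue().apply(old), r.key());
            }
            if (r.value() != null) {
                index.put(e.getValue().apply(r.value()), r.key());
            }
        }
    }

    // Replays the whole topic once, as a restore would.
    void replay(List<Rec> topic) {
        topic.forEach(this::apply);
    }
}
```

A single replay keeps the main store and every index consistent, including when a record's secondary key changes. The stale index entry is removed only if it still points at the same K.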
>>>> 
>>>> Any thoughts?  Happy Friday,
>>>> Steven
>>>> 
>>> 
>> 
> 
