Makes sense now (considering the explanation from the other thread).

With regard to an "opt-in" optimization: we could simplify the API and
hide some details, but it wouldn't buy you anything from an execution
point of view. As you need all data on each instance, you need to
somehow "broadcast" the information -- and Kafka Streams applications
use topics to exchange data. Thus, we need a topic anyhow.
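
To illustrate the point, here is a toy sketch in plain Java, not Kafka
Streams code. All names are made up: a shared append-only list stands in
for the topic, and each "instance" replays it into its own local map, which
is roughly how every instance ends up with the full data set.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: a shared append-only list plays the role of the topic.
final class TopicBroadcastSketch {
    // The "topic": each record is appended once and is readable by everyone.
    private final List<String[]> log = new ArrayList<>();

    void send(String key, String value) {
        log.add(new String[] {key, value});
    }

    Instance newInstance() {
        return new Instance();
    }

    // One application instance with its own read offset and full local copy.
    final class Instance {
        private int offset = 0;
        private final Map<String, String> localTable = new HashMap<>();

        // Reads everything appended since the last call and applies it locally.
        void catchUp() {
            while (offset < log.size()) {
                String[] record = log.get(offset++);
                localTable.put(record[0], record[1]);
            }
        }

        String get(String key) {
            return localTable.get(key);
        }
    }

    public static void main(String[] args) {
        TopicBroadcastSketch topic = new TopicBroadcastSketch();
        Instance a = topic.newInstance();
        Instance b = topic.newInstance();
        topic.send("k1", "destA");
        a.catchUp();
        b.catchUp();
        System.out.println(a.get("k1") + " " + b.get("k1"));
    }
}
```

Without the shared log, instance b would have no way to learn about a
record that only instance a processed.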

Does this make sense?

So your overall architecture seems to be sound to me.
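
Just to restate the two-step lookup as I understand it, here is a minimal
sketch in plain Java. It is not Kafka Streams API: the class and method
names are hypothetical, one HashMap stands in for the GlobalKTable (K -> D)
and a map of maps stands in for the partitioned KTable ((K,D) -> V).

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the lookup path; names are illustrative only.
final class TwoStepLookupSketch {
    // Stands in for the GlobalKTable: K -> D, available on every instance.
    private final Map<String, String> destinationIndex = new HashMap<>();
    // Stands in for the partitioned KTable: one K -> V store per destination D.
    private final Map<String, Map<String, String>> storesByDestination = new HashMap<>();

    void put(String key, String destination, String value) {
        String previous = destinationIndex.put(key, destination);
        // If the key moved to a new destination, drop the stale entry.
        if (previous != null && !previous.equals(destination)) {
            Map<String, String> oldStore = storesByDestination.get(previous);
            if (oldStore != null) {
                oldStore.remove(key);
            }
        }
        storesByDestination
                .computeIfAbsent(destination, d -> new HashMap<>())
                .put(key, value);
    }

    // Step 1: resolve K -> D. Step 2: read (K,D) -> V from the store for D.
    String get(String key) {
        String destination = destinationIndex.get(key);
        if (destination == null) {
            return null;
        }
        Map<String, String> store = storesByDestination.get(destination);
        return store == null ? null : store.get(key);
    }

    public static void main(String[] args) {
        TwoStepLookupSketch sketch = new TwoStepLookupSketch();
        sketch.put("k1", "destA", "v1");
        System.out.println(sketch.get("k1"));
    }
}
```

In the real application the second step would go to whichever instance
hosts the partition for D; the sketch keeps everything in one process.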


-Matthias


On 6/2/17 2:37 PM, Steven Schlansker wrote:
> 
>> On Jun 2, 2017, at 2:21 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>
>> Hi,
>>
>> If you want to populate a GlobalKTable you can only do this by reading a
>> topic. So the short answer for your headline is: no, you cannot suppress
>> the intermediate topic.
> 
> Bummer!  Maybe this is an opt-in optimization to consider later.
> 
>>
>> However, I am wondering what the purpose of your secondary index is, and
>> why you are using a GlobalKTable for it. Maybe you can elaborate a
>> little bit?
> 
> Elaborated on this a bit in the other thread, I was trying to keep separate
> problems separate, but maybe I just made things more confusing!
> 
> tl;dr is that the user requests values knowing K, but there is actually a
> "hidden composite key" D that is relevant to the partitioning strategy.
> 
> The GlobalKTable allows you to look up K -> D, and then find the right local 
> KTable K,D -> V
> 
>>
>> I am also wondering about this code snippet:
>>
>>>> builder.stream(mainTopic)
>>>>       .mapValues(...)
>>>>       .to(secondaryIndex1)
>>
>> Should it not be .map() that transforms (k,v) ->
>> (v.getSecondaryKey1(),k) ? Just for my understanding what you are doing.
>>
> 
> In this case, the "externally visible" K needs additional information about
> the destination D so that it can be partitioned correctly.  So the code looks
> like:
> 
>         // TODO: sucks that this materializes an intermediate topic
>         msgStream
>                 .mapValues(v -> v == null ? null : v.getResolvedDestination().toString())
>                 .to(Serdes.String(), Serdes.String(), DEST_INDEX);
> 
>         builder.globalTable(Serdes.String(), Serdes.String(), DEST_INDEX, DEST_INDEX);
> 
>>
>> -Matthias
>>
>>
>> On 6/2/17 12:28 PM, Steven Schlansker wrote:
>>> Hi everyone, another question for the list :)
>>>
>>> I'm creating a cluster of KTable (and GlobalKTable) based off the same
>>> input stream K,V.
>>>
>>> It has a number of secondary indices (think like an RDBMS)
>>> K1 -> K
>>> K2 -> K
>>> etc
>>>
>>> These are all based off of trivial mappings from my main stream that also
>>> feeds the K -> V StateStore.  Think one liners like
>>> v -> v.getSecondaryKey1()
>>>
>>> Currently, for each one it seems that I have to do
>>>
>>> builder.stream(mainTopic)
>>>       .mapValues(...)
>>>       .to(secondaryIndex1)
>>>
>>> builder.globalTable(secondaryIndex1, secondaryIndexStore1);
>>>
>>> Unfortunately the intermediate "secondaryIndex1" topic is relatively
>>> low value.  In a case where my state stores are lost, I already have to
>>> read through the mainTopic to recover the main state store.  While it's
>>> doing that, I'd much rather it rebuild the GlobalKTable instance from that
>>> data directly.  Then I could skip having this index in Kafka at all, it's
>>> entirely redundant.  The data is already loaded and deserialized for the
>>> benefit of another Processor.
>>>
>>> Any thoughts?  Happy Friday,
>>> Steven
>>>
>>
> 
