Thank you for your reply.

I have attached my first attempt at writing a KIP and I was wondering if
you could review it and share your thoughts.

Going forward I would like to create this KIP. I was wondering whom I
should ask to get the necessary permissions on the wiki. Username:
winkelman.kyle



On Fri, Apr 21, 2017 at 3:15 PM, Eno Thereska <eno.there...@gmail.com>
wrote:

> Hi Kyle,
>
> Sorry for the delay in replying. I think it's worth doing a KIP for this
> one. One super helpful thing with KIPs is to list a few more scenarios that
> would benefit from this approach. In particular it seems the main benefit
> is from reducing the number of state stores. Does this necessarily reduce
> the number of IOs to the stores (number of puts/gets), or the extra space
> overheads with multiple stores. Quantifying that a bit would help.
>
> To answer your original questions:
>
> >The problem I am having with this approach is understanding if there is a
> race condition. Obviously the source topics would be copartitioned. But
> would it be multithreaded and possibly cause one of the processors to grab
> patient 1 at the same time a different processor has grabbed patient 1?
>
>
> I don't think there will be a problem here. A processor cannot be accessed
> by multiple threads in Kafka Streams.
>
>
> >My understanding is that for each partition there would be a single
> complete set of processors and a new incoming record would go completely
> through the processor topology from a source node to a sink node before the
> next one is sent through. Is this correct?
>
> This is mostly true, however if caching is enabled (for dedupping, see
> KIP-63), then a record may reside in a cache before going to the sink.
> Meanwhile another record can come in. So multiple records can be in the
> topology at the same time.
>
> Thanks
> Eno
>
>
>
>
>
> On Fri, Apr 14, 2017 at 8:16 PM, Kyle Winkelman <winkelman.k...@gmail.com>
> wrote:
>
>> Eno,
>> Thanks for the response. The figure was just a restatement of my
>> questions. I have made an attempt at a low level processor and it appears
>> to work but it isn't very pretty and was hoping for something at the
>> streams api level.
>>
>> I have written some code to show an example of how I see the Cogroup
>> working in kafka.
>>
>> First the KGroupedStream would have a cogroup method that takes the
>> initializer and the aggregator for that specific KGroupedStream. This would
>> return a KCogroupedStream that has 2 methods one to add more
>> KGroupedStream, Aggregator pairs and one to complete the construction and
>> return a KTable.
>>
>> builder.stream("topic").groupByKey ().cogroup(Initializer, Aggregator,
>> aggValueSerde, storeName).cogroup(groupedStream1,
>> Aggregator1).cogroup(groupedStream2, Aggregator2).aggregate();
>>
>> Behind the scenes we create a KStreamAggregate for each KGroupedStream,
>> Aggregator pair. Then a final pass through processor to pass on the
>> aggregate values. This gives us a KTable backed by a single store that is
>> used in all of the processors.
>>
>> Please let me know if this is something you think would add value to
>> kafka streams. And I will try to create a KIP to foster more communication.
>>
>> You can take a look at what I have. I think it's missing a fair amount
>> but it's a good start. I took the doAggregate method in KGroupedStream as
>> my starting point and expanded on it for multiple streams:
>> https://github.com/KyleWinkelman/kafka/tree/cogroup
>>
>>
>
KIP-? - Kafka-Streams Cogroup


Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Released: <Kafka Version>
Please keep the discussion on the mailing list rather than commenting on the 
wiki (wiki discussions get unwieldy fast).


Motivation
When multiple streams aggregate together to form a single large object (eg. A 
shopping website may have a cart stream, a wish list stream, and a purchases 
stream. Together they make up a Customer.), it is very difficult to accomodate 
this in the Kafka-Streams DSL. It generally requires you to group and aggregate 
all of the streams to KTables then make multiple outerjoin calls to end up with 
a KTable with your desired object. This will create a state store for each 
stream and a long chain of ValueJoiners that each new record must go through to 
get to the final object.
Creating a cogroup method where you use a single state store will:
    1. Reduce the number of gets from state stores. With the multiple joins 
when a new value comes into any of the streams a chain reaction happens where 
ValueGetters keep calling ValueGetters until we have accessed all state stores.
    2. Slight performance increase. As described above all ValueGetters are 
called also causing all ValueJoiners to be called forcing a recalculation of 
the current joined value of all other streams, impacting performance.


Public Interfaces
KGroupedStream { //Possibly add support for Windows and Sessions as well.
    ...
    <T> KCogroupedStream<K, T> cogroup(final Initializer<T> initializer,
                                       final Aggregator<? super K, ? super V, 
T> aggregator,
                                       final Serde<T> aggValueSerde,
                                       final String storeName);
                                       
    <T> KCogroupedStream<K, T> cogroup(final Initializer<T> initializer,
                                       final Aggregator<? super K, ? super V, 
T> aggregator,
                                       final StateStoreSupplier<?> 
storeSupplier);
}

public interface KCogroupedStream<K, V> {

    <T> KCogroupedStream<K, V> cogroup(KGroupedStream<K, T> groupedStream, 
Aggregator<? super K, ? super T, V> aggregator);

    KTable<K, V> aggregate();
}

Expected use:
KTable<K, T> cogroupedTable = groupedStream1.cogroup(initializer, aggregator1, 
aggValueSerde, "aggValue")
        .cogroup(groupedStream2, aggregator2)
        .cogroup(groupedStream3, aggregator3)
        ...
        .cogroup(groupedStreamN, aggregatorN)
        .aggregate();

Proposed Changes
1. Construct the above Public Interfaces.
2. Create an internal.KCogroupedStreamImpl that will keep track of the 
StateStoreSupplier, Initializer, and Pairs of (KGroupedStream, Aggregator).
3. Model the aggregate method of internal.KCogroupedStream after the 
doAggregate method of KGroupedStream by forcing the KGroupedStreams to 
repartitionIfRequired and adding the KStreamAggregate Processor for each 
KGroupedStream. Additionally add a KStreamCogroup processor and ensure all 
sources are copartitioned and processors have access to the state store.
4. Create a KStreamCogroup that will passthrough all outputs from the 
KStreamAggregate. KStreamCogroup must also be a KStreamAggProcessorSupplier; it 
will keep track of all of its parents KStreamAggregates in case it needs to 
enableSendingOldValues and it can have one of them create a 
KTableValueGetterSupplier if view is called.


Compatibility, Deprecation, and Migration Plan
Users must upgrade to new version if they want to use this functionality.


Test Plan
Integration Test similar to KStreamAggregationIntegrationTest


Rejected Alternatives

Reply via email to