I am new to this group and I found this subject interesting.  Sounds like you 
guys want to implement a join table of two streams? Is there somewhere I can 
see the original requirement or proposal?   

> On Sep 7, 2018, at 8:13 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:
> 
> 
> On 05.09.2018 22:17, Adam Bellemare wrote:
>> I'm currently testing using a Windowed Store to store the highwater mark.
>> By all indications this should work fine, with the caveat being that it can
>> only resolve out-of-order arrival for up to the size of the window (ie:
>> 24h, 72h, etc). This would remove the possibility of it being unbounded in
>> size.
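To make the window caveat concrete, here is a minimal sketch of a high-water check whose entries expire. It is not the code under test in the thread: `HighWaterSketch`, `accept`, and the in-memory map standing in for a windowed store are all hypothetical, and the offsets and timestamps are plain longs chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a windowed high-water store; not a Kafka Streams API.
final class HighWaterSketch {
    private final long windowMs;
    // key -> {highest offset seen, timestamp at which it was recorded}
    private final Map<String, long[]> store = new HashMap<>();

    HighWaterSketch(long windowMs) {
        this.windowMs = windowMs;
    }

    /** Returns true if the event should be forwarded, false if it is stale. */
    boolean accept(String key, long offset, long nowMs) {
        long[] seen = store.get(key);
        // Entries older than the window are treated as if they had been dropped.
        if (seen != null && nowMs - seen[1] > windowMs) {
            seen = null;
        }
        if (seen != null && offset < seen[0]) {
            return false; // out-of-order arrival caught inside the window
        }
        store.put(key, new long[] {offset, nowMs});
        return true;
    }

    public static void main(String[] args) {
        HighWaterSketch hw = new HighWaterSketch(100);
        System.out.println(hw.accept("a", 5, 0));   // first event for the key
        System.out.println(hw.accept("a", 3, 10));  // older offset, inside the window
        System.out.println(hw.accept("a", 3, 500)); // older offset, after expiry
    }
}
```

The last call shows the limitation described above: once the entry has aged out of the window, an old offset can no longer be recognised as stale.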
>> 
>> With regard to Jan's suggestion, I believe this is where we will have to
>> remain in disagreement. While I do not disagree with your statement that
>> additional joins are likely to be done in a real-world workflow, I do not
>> see how you can conclusively deal with out-of-order arrival of foreign-key
>> changes and subsequent joins. I have attempted what I think you have
>> proposed (without a high-water mark, using groupBy and reduce) and found
>> that if the foreign key changes too quickly, or the load on a stream thread
>> is too high, the joined messages will arrive out-of-order and be
>> incorrectly propagated, such that an intermediate event is represented as
>> the final event.
> Can you shed some light on your groupBy implementation? There must be some 
> sort of flaw in it.
> I have a suspicion where it is; I would just like to confirm. The idea is 
> bulletproof, so it must be
> an implementation mistake. I would like to clarify before we draw a 
> conclusion.
> 
>>  Repartitioning the scattered events back to their original
>> partitions is the only way I know how to conclusively deal with
>> out-of-order events in a given time frame, and to ensure that the data is
>> eventually consistent with the input events.
>> 
>> If you have some code to share that illustrates your approach, I would be
>> very grateful as it would remove any misunderstandings that I may have.
> 
> Ah okay, you were looking for my code. I don't have anything easily readable 
> here as it's bloated with OO patterns.
> 
> It's trivial anyhow:
> 
> @Override
>    public T apply(K aggKey, V value, T aggregate)
>    {
>        Map<U, V> currentStateAsMap = asMap(aggregate); << imaginary
>        U toModifyKey = mapper.apply(value);
>            << This is the place where people are actually going to have issues, and 
> why you probably couldn't do it. We would need to find a solution here. I 
> hadn't realized that yet.
>            << We propagate the field in the joiner so that we can pick it up 
> in an aggregate. You probably have not thought of this in your approach, right?
>            << I am very open to finding a generic solution here. In my honest 
> opinion, KTableImpl.GroupBy is broken in that it loses the keys and only 
> maintains the aggregate key.
>            << I abstracted it away back then, well before I was thinking of 
> a oneToMany join. That is why I didn't realize its significance here.
>            << Opinions?
> 
>        for (V m : current)
>        {
>            currentStateAsMap.put(mapper.apply(m), m);
>        }
>        if (isAdder)
>        {
>            currentStateAsMap.put(toModifyKey, value);
>        }
>        else
>        {
>            currentStateAsMap.remove(toModifyKey);
>            if(currentStateAsMap.isEmpty()){
>                return null;
>            }
>        }
>        return asAggregateType(currentStateAsMap);
>    }
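For readers who want something compilable, the snippet above can be approximated as follows. This is only a sketch under assumptions: the aggregate type `T` is modeled directly as a `Map<U, V>`, so the imaginary `asMap` and `asAggregateType` helpers disappear, and the class name `MapAggregatorSketch` and its constructor are hypothetical. It does not address the key-propagation problem raised in the inline remarks.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical standalone version of the adder/subtractor pair; the aggregate
// is a plain map from the mapped key (U) to the latest value (V).
final class MapAggregatorSketch<K, U, V> {
    private final Function<V, U> mapper; // extracts the per-entry key from a value
    private final boolean isAdder;       // true = adder, false = subtractor

    MapAggregatorSketch(Function<V, U> mapper, boolean isAdder) {
        this.mapper = mapper;
        this.isAdder = isAdder;
    }

    Map<U, V> apply(K aggKey, V value, Map<U, V> aggregate) {
        // Copy so the caller's aggregate is never mutated in place.
        Map<U, V> state = aggregate == null ? new HashMap<>() : new HashMap<>(aggregate);
        U toModifyKey = mapper.apply(value);
        if (isAdder) {
            state.put(toModifyKey, value);
        } else {
            state.remove(toModifyKey);
            if (state.isEmpty()) {
                return null; // an empty aggregate is published as a delete
            }
        }
        return state;
    }

    public static void main(String[] args) {
        Function<String, String> fk = v -> v.split(":")[0];
        MapAggregatorSketch<String, String, String> adder = new MapAggregatorSketch<>(fk, true);
        MapAggregatorSketch<String, String, String> subtractor = new MapAggregatorSketch<>(fk, false);
        Map<String, String> agg = adder.apply("a", "A:1", null);
        agg = adder.apply("a", "B:2", agg);       // foreign key changed from A to B
        agg = subtractor.apply("a", "A:1", agg);  // the old entry is retracted later
        System.out.println(agg);
    }
}
```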
> 
> 
> 
> 
> 
>> 
>> Thanks,
>> 
>> Adam
>> 
>> 
>> 
>> 
>> 
>> On Wed, Sep 5, 2018 at 3:35 PM, Jan Filipiak <jan.filip...@trivago.com>
>> wrote:
>> 
>>> Thanks, Adam, for bringing Matthias up to speed!
>>> 
>>> About the differences: I think re-keying back should be optional at best.
>>> I would say we return a KScatteredTable with reshuffle() returning
>>> KTable<originalKey,Joined> to make the backwards repartitioning optional.
>>> I am also very much in favour of doing the out-of-order processing using
>>> group by instead of high-water mark tracking, because unbounded growth is
>>> scary and it saves us the header stuff.
>>> 
>>> I think the abstraction of always repartitioning back is just not that
>>> strong. The work has been done before we partition back, and grouping
>>> by something else afterwards is really common.
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> On 05.09.2018 13:49, Adam Bellemare wrote:
>>> 
>>>> Hi Matthias
>>>> 
>>>> Thank you for your feedback, I do appreciate it!
>>>> 
>>>> While name spacing would be possible, it would require deserializing
>>>>> user headers, which implies a runtime overhead. I would suggest not
>>>>> namespacing for now to avoid the overhead. If this becomes a problem in
>>>>> the future, we can still add name spacing later on.
>>>>> 
>>>> Agreed. I will go with using a reserved string and document it.
>>>> 
>>>> 
>>>> 
>>>> My main concern about the design is the type of the result KTable: If I
>>>> understood the proposal correctly,
>>>> 
>>>> 
>>>> In your example, you have table1 and table2 swapped. Here is how it works
>>>> currently:
>>>> 
>>>> 1) table1 has the records that contain the foreign key within their value.
>>>> table1 input stream: <a,(fk=A,bar=1)>, <b,(fk=A,bar=2)>, <c,(fk=B,bar=3)>
>>>> table2 input stream: <A,X>, <B,Y>
>>>> 
>>>> 2) A Value mapper is required to extract the foreign key.
>>>> table1 foreign key mapper: ( value => value.fk )
>>>> 
>>>> The mapper is applied to each element in table1, and a new combined key is
>>>> made:
>>>> table1 mapped: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>, <B-c,
>>>> (fk=B,bar=3)>
>>>> 
>>>> 3) The rekeyed events are copartitioned with table2:
>>>> 
>>>> a) Stream Thread with Partition 0:
>>>> RepartitionedTable1: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>
>>>> Table2: <A,X>
>>>> 
>>>> b) Stream Thread with Partition 1:
>>>> RepartitionedTable1: <B-c, (fk=B,bar=3)>
>>>> Table2: <B,Y>
>>>> 
>>>> 4) From here, they can be joined together locally by applying the joiner
>>>> function.
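Steps 1 through 4 above can be mimicked with plain in-memory maps. The sketch below is an illustration only, not the proposed implementation: `ForeignKeyJoinSketch`, `Left`, `rekey`, `partitionFor`, and `joinLocally` are hypothetical names, the combined key is a simple `fk-key` string, and the hash-based partitioner is an assumption, so the partition numbers it yields need not match the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ForeignKeyJoinSketch {
    /** Hypothetical table1 value: the foreign key plus a payload field. */
    static final class Left {
        final String fk;
        final int bar;
        Left(String fk, int bar) { this.fk = fk; this.bar = bar; }
    }

    // Step 2: apply the foreign-key mapper and build the combined key "<fk>-<key>".
    static Map<String, Left> rekey(Map<String, Left> table1) {
        Map<String, Left> out = new LinkedHashMap<>();
        table1.forEach((key, value) -> out.put(value.fk + "-" + key, value));
        return out;
    }

    // Step 3: partition on the foreign-key part only, so every record with the
    // same fk lands next to the matching table2 row (assumed hash partitioner).
    static int partitionFor(String fk, int numPartitions) {
        return Math.floorMod(fk.hashCode(), numPartitions);
    }

    // Step 4: a local lookup by fk followed by the joiner function.
    static Map<String, String> joinLocally(Map<String, Left> rekeyed, Map<String, String> table2) {
        Map<String, String> out = new LinkedHashMap<>();
        rekeyed.forEach((combinedKey, left) -> {
            String right = table2.get(left.fk);
            if (right != null) {
                out.put(combinedKey, "join(" + right + "," + left.bar + ")");
            }
        });
        return out;
    }

    public static void main(String[] args) {
        Map<String, Left> table1 = new LinkedHashMap<>();
        table1.put("a", new Left("A", 1));
        table1.put("b", new Left("A", 2));
        table1.put("c", new Left("B", 3));
        Map<String, String> table2 = new LinkedHashMap<>();
        table2.put("A", "X");
        table2.put("B", "Y");
        joinLocally(rekey(table1), table2)
            .forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```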
>>>> 
>>>> 
>>>> 
>>>> At this point, Jan's design and my design deviate. My design goes on to
>>>> repartition the data post-join and resolve out-of-order arrival of
>>>> records, finally returning the data keyed by just the original key. I do
>>>> not expose the CombinedKey or any of the internals outside of the
>>>> joinOnForeignKey function. This does make for a larger footprint, but it
>>>> removes all responsibility for resolving out-of-order arrivals and
>>>> handling CombinedKeys from the user. I believe that this makes the
>>>> function much easier to use.
>>>> 
>>>> Let me know if this helps resolve your questions, and please feel free to
>>>> add anything else on your mind.
>>>> 
>>>> Thanks again,
>>>> Adam
>>>> 
>>>> 
>>>> 
>>>> 
>>>> On Tue, Sep 4, 2018 at 8:36 PM, Matthias J. Sax <matth...@confluent.io>
>>>> wrote:
>>>> 
>>>> Hi,
>>>>> I am just catching up on this thread. I have not read everything so far,
>>>>> but want to share a couple of initial thoughts:
>>>>> 
>>>>> 
>>>>> 
>>>>> Headers: I think there is a fundamental difference between header usage
>>>>> in this KIP and KIP-258. For 258, we add headers to changelog topics that
>>>>> are owned by Kafka Streams, and nobody else is supposed to write into
>>>>> them. In fact, no user headers are written into the changelog topic and
>>>>> thus, there are no conflicts.
>>>>> 
>>>>> Nevertheless, I don't see a big issue with using headers within Streams.
>>>>> As long as we document it, we can have some "reserved" header keys that
>>>>> users are not allowed to use when processing data with Kafka Streams.
>>>>> IMHO, this should be ok.
>>>>> 
>>>>> I think there is a safe way to avoid conflicts, since these headers are
>>>>>> only needed in internal topics (I think):
>>>>>> For internal and changelog topics, we can namespace all headers:
>>>>>> * user-defined headers are namespaced as "external." + headerKey
>>>>>> * internal headers are namespaced as "internal." + headerKey
>>>>>> 
>>>>> While name spacing would be possible, it would require deserializing
>>>>> user headers, which implies a runtime overhead. I would suggest not
>>>>> namespacing for now to avoid the overhead. If this becomes a problem in
>>>>> the future, we can still add name spacing later on.
>>>>> 
>>>>> 
>>>>> 
>>>>> My main concern about the design is the type of the result KTable: If I
>>>>> understood the proposal correctly,
>>>>> 
>>>>> KTable<K1,V1> table1 = ...
>>>>> KTable<K2,V2> table2 = ...
>>>>> 
>>>>> KTable<K1,V3> joinedTable = table1.join(table2,...);
>>>>> 
>>>>> implies that the `joinedTable` has the same key as the left input table.
>>>>> IMHO, this does not work because if table2 contains multiple rows that
>>>>> join with a record in table1 (which is the main purpose of a foreign key
>>>>> join), the result table would only contain a single join result, but not
>>>>> multiple.
>>>>> 
>>>>> Example:
>>>>> 
>>>>> table1 input stream: <A,X>
>>>>> table2 input stream: <a,(A,1)>, <b,(A,2)>
>>>>> 
>>>>> We use the table2 value as a foreign key to the table1 key (i.e., "A"
>>>>> joins). If the result key is the same as the key of table1, this implies
>>>>> that the result can either be <A, join(X,1)> or <A, join(X,2)> but not
>>>>> both. Because they share the same key, whatever result record we emit
>>>>> later overwrites the previous result.
>>>>> 
>>>>> This is the reason why Jan originally proposed to use a combination of
>>>>> both primary keys of the input tables as key of the output table. This
>>>>> makes the keys of the output table unique and we can store both in the
>>>>> output table:
>>>>> 
>>>>> Result would be <A-a, join(X,1)>, <A-b, join(X,2)>
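The overwrite described in the example can be reproduced with an ordinary map standing in for the result table. This is a minimal sketch only: `ResultKeySketch` and its two methods are hypothetical, and the join results are plain strings copied from the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// A map stands in for the result KTable: one value per key, last write wins.
final class ResultKeySketch {
    // Result keyed by the table1 key only: the second join result replaces the first.
    static Map<String, String> keyedByTable1Key() {
        Map<String, String> result = new LinkedHashMap<>();
        result.put("A", "join(X,1)");
        result.put("A", "join(X,2)"); // same key, so join(X,1) is lost
        return result;
    }

    // Result keyed by the combination of both primary keys: both results survive.
    static Map<String, String> keyedByCombinedKey() {
        Map<String, String> result = new LinkedHashMap<>();
        result.put("A-a", "join(X,1)");
        result.put("A-b", "join(X,2)");
        return result;
    }

    public static void main(String[] args) {
        System.out.println(keyedByTable1Key());
        System.out.println(keyedByCombinedKey());
    }
}
```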
>>>>> 
>>>>> 
>>>>> Thoughts?
>>>>> 
>>>>> 
>>>>> -Matthias
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> On 9/4/18 1:36 PM, Jan Filipiak wrote:
>>>>> 
>>>>>> Just one remark here.
>>>>>> The high-watermark could be disregarded. The decision about the forward
>>>>>> depends on the size of the aggregated map.
>>>>>> Only 1 element long maps would be unpacked and forwarded. 0 element maps
>>>>>> would be published as delete. Any other count
>>>>>> of map entries is in "waiting for correct deletes to arrive"-state.
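The forwarding rule in this remark amounts to a three-way decision on the size of the aggregated map. A minimal sketch, with the hypothetical names `ForwardDecisionSketch`, `Action`, and `decide`:

```java
import java.util.Map;

final class ForwardDecisionSketch {
    enum Action { FORWARD, DELETE, WAIT }

    // Decide what to do with an aggregated map based only on its size.
    static Action decide(Map<?, ?> aggregated) {
        if (aggregated.isEmpty()) {
            return Action.DELETE;  // zero entries: publish a delete
        }
        if (aggregated.size() == 1) {
            return Action.FORWARD; // exactly one entry: unpack and forward it
        }
        return Action.WAIT;        // more than one: still waiting for deletes to arrive
    }

    public static void main(String[] args) {
        System.out.println(decide(Map.of()));
        System.out.println(decide(Map.of("A", "join(X,1)")));
        System.out.println(decide(Map.of("A", "join(X,1)", "B", "join(Y,1)")));
    }
}
```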
>>>>>> 
>>>>>> On 04.09.2018 21:29, Adam Bellemare wrote:
>>>>>> 
>>>>>>> It does look like I could replace the second repartition store and
>>>>>>> highwater store with a groupBy and reduce.  However, it looks like I
>>>>>>> would still need to store the highwater value within the materialized
>>>>>>> store, to compare the arrival of out-of-order records (assuming my
>>>>>>> understanding of THIS is correct...). This in effect is the same as
>>>>>>> the design I have now, just with the two tables merged together.
> 
