I am new to this group and found this subject interesting. It sounds like you want to implement a join between two tables derived from streams? Is there somewhere I can see the original requirement or proposal?
> On Sep 7, 2018, at 8:13 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>
> On 05.09.2018 22:17, Adam Bellemare wrote:
>> I'm currently testing a windowed store to hold the high-water mark.
>> By all indications this should work fine, with the caveat that it can
>> only resolve out-of-order arrival up to the size of the window (i.e.,
>> 24h, 72h, etc.). This would remove the possibility of the store growing
>> without bound.
>>
>> With regards to Jan's suggestion, I believe this is where we will have to
>> remain in disagreement. While I do not disagree with your statement that
>> there are likely to be additional joins in a real-world workflow, I do not
>> see how you can conclusively deal with out-of-order arrival of foreign-key
>> changes and subsequent joins. I have attempted what I think you have
>> proposed (without a high-water mark, using groupBy and reduce) and found
>> that if the foreign key changes too quickly, or the load on a stream thread
>> is too high, the joined messages will arrive out of order and be
>> incorrectly propagated, such that an intermediate event is represented as
>> the final event.
> Can you shed some light on your groupBy implementation? There must be some
> sort of flaw in it.
> I have a suspicion where it is; I would just like to confirm. The idea is
> bulletproof, and it must be an implementation mistake. I would like to
> clarify before we draw a conclusion.
>
>> Repartitioning the scattered events back to their original
>> partitions is the only way I know to conclusively deal with
>> out-of-order events in a given time frame, and to ensure that the data is
>> eventually consistent with the input events.
>>
>> If you have some code to share that illustrates your approach, I would be
>> very grateful, as it would remove any misunderstandings that I may have.
>
> Ah, okay, you were looking for my code. I don't have anything easily
> readable here, as it's bloated with OO patterns.
>
> It's trivial anyhow:
>
> @Override
> public T apply(K aggKey, V value, T aggregate)
> {
>     Map<U, V> currentStateAsMap = asMap(aggregate); // imaginary
>     U toModifyKey = mapper.apply(value);
>     // This is the place where people are actually going to have issues, and
>     // probably why you couldn't do it. We would need to find a solution here;
>     // I hadn't realized that yet.
>     // We propagate the field in the joiner, so that we can pick it up in an
>     // aggregate. You probably have not thought of this in your approach, right?
>     // I am very open to finding a generic solution here. In my honest opinion,
>     // KTableImpl.GroupBy is broken in that it loses the keys and only
>     // maintains the aggregate key.
>     // I abstracted it away back then, long before I was thinking of a
>     // one-to-many join. That is why I didn't realize its significance here.
>     // Opinions?
>
>     for (V m : current)
>     {
>         currentStateAsMap.put(mapper.apply(m), m);
>     }
>     if (isAdder)
>     {
>         currentStateAsMap.put(toModifyKey, value);
>     }
>     else
>     {
>         currentStateAsMap.remove(toModifyKey);
>         if (currentStateAsMap.isEmpty()) {
>             return null;
>         }
>     }
>     return asAggregateType(currentStateAsMap);
> }
>
>>
>> Thanks,
>>
>> Adam
>>
>> On Wed, Sep 5, 2018 at 3:35 PM, Jan Filipiak <jan.filip...@trivago.com>
>> wrote:
>>
>>> Thanks Adam for bringing Matthias up to speed!
>>>
>>> About the differences: I think re-keying back should be optional at best.
>>> I would say we return a KScatteredTable with reshuffle() returning
>>> KTable<originalKey,Joined> to make the backwards repartitioning optional.
>>> I am also strongly in favour of doing the out-of-order processing using
>>> groupBy instead of high-water mark tracking, simply because unbounded
>>> growth is scary, and it saves us the header stuff.
>>>
>>> I think the abstraction of always repartitioning back is just not so
>>> strong. The work has already been done before we partition back, and
>>> grouping by something else afterwards is really common.
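To make the aggregator above concrete, here is a minimal plain-Java sketch of the same adder/subtractor idea that runs without Kafka. It assumes, as Jan's comments suggest, that the joiner copies the foreign key into the joined value so the aggregator can recover the key that groupBy drops. `KeyPreservingAggregator`, `add` and `subtract` are hypothetical names; this is not the Kafka Streams `Aggregator` API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical adder/subtractor pair in the spirit of KGroupedTable#aggregate.
// Aggregate key: the original (left-table) key. Aggregate value: a map keyed
// by the foreign key that groupBy would otherwise drop, which the joiner is
// assumed to have copied into the joined value.
final class KeyPreservingAggregator<U, V> {
    private final Function<V, U> lostKeyOf; // extracts the propagated foreign key

    KeyPreservingAggregator(Function<V, U> lostKeyOf) {
        this.lostKeyOf = lostKeyOf;
    }

    // Adder: insert or replace the entry for this value's foreign key.
    Map<U, V> add(V value, Map<U, V> aggregate) {
        Map<U, V> next = new LinkedHashMap<>();
        if (aggregate != null) {
            next.putAll(aggregate);
        }
        next.put(lostKeyOf.apply(value), value);
        return next;
    }

    // Subtractor: remove the entry; an empty map becomes null, i.e. a delete.
    Map<U, V> subtract(V value, Map<U, V> aggregate) {
        if (aggregate == null) {
            return null;
        }
        Map<U, V> next = new LinkedHashMap<>(aggregate);
        next.remove(lostKeyOf.apply(value));
        return next.isEmpty() ? null : next;
    }
}
```

Because the aggregate holds one entry per foreign key, a join result for a new foreign key `B` can arrive before the delete for the old foreign key `A` without overwriting anything; the map simply holds both until the delete lands.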
>>>
>>> On 05.09.2018 13:49, Adam Bellemare wrote:
>>>
>>>> Hi Matthias
>>>>
>>>> Thank you for your feedback, I do appreciate it!
>>>>
>>>>> While name spacing would be possible, it would require deserializing
>>>>> user headers, which implies a runtime overhead. I would suggest not to
>>>>> namespace for now to avoid the overhead. If this becomes a problem in
>>>>> the future, we can still add name spacing later on.
>>>>>
>>>> Agreed. I will go with using a reserved string and document it.
>>>>
>>>>> My main concern about the design is the type of the result KTable: If I
>>>>> understood the proposal correctly,
>>>>
>>>> In your example, you have table1 and table2 swapped. Here is how it works
>>>> currently:
>>>>
>>>> 1) table1 has the records that contain the foreign key within their value.
>>>> table1 input stream: <a,(fk=A,bar=1)>, <b,(fk=A,bar=2)>, <c,(fk=B,bar=3)>
>>>> table2 input stream: <A,X>, <B,Y>
>>>>
>>>> 2) A value mapper is required to extract the foreign key.
>>>> table1 foreign key mapper: ( value => value.fk )
>>>>
>>>> The mapper is applied to each element in table1, and a new combined key
>>>> is made:
>>>> table1 mapped: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>, <B-c, (fk=B,bar=3)>
>>>>
>>>> 3) The rekeyed events are co-partitioned with table2:
>>>>
>>>> a) Stream thread with partition 0:
>>>> RepartitionedTable1: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>
>>>> Table2: <A,X>
>>>>
>>>> b) Stream thread with partition 1:
>>>> RepartitionedTable1: <B-c, (fk=B,bar=3)>
>>>> Table2: <B,Y>
>>>>
>>>> 4) From here, they can be joined together locally by applying the joiner
>>>> function.
>>>>
>>>> At this point, Jan's design and my design deviate. My design goes on to
>>>> repartition the data post-join and resolve out-of-order arrival of
>>>> records, finally returning the data keyed by just the original key.
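Adam's steps 1 to 4 can be simulated in plain Java to show why the combined key and the co-partitioning matter. This is a sketch under assumptions: `hashCode` modulo the partition count stands in for Kafka's partitioner, the `Left` value type and all method names are hypothetical, and no Kafka classes are used.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of steps 1-4; no Kafka classes are used.
final class ForeignKeyJoinSketch {
    // Hypothetical table1 value: a foreign key plus a payload (step 1).
    static final class Left {
        final String fk;
        final int bar;
        Left(String fk, int bar) { this.fk = fk; this.bar = bar; }
    }

    // Step 2: combined key "<foreignKey>-<primaryKey>", e.g. "A-a".
    static String combinedKey(String fk, String pk) {
        return fk + "-" + pk;
    }

    // Stand-in partitioner: both sides are routed by the foreign key only.
    // (Kafka's default partitioner hashes the serialized key with murmur2.)
    static int partitionFor(String fk, int numPartitions) {
        return Math.floorMod(fk.hashCode(), numPartitions);
    }

    static Map<String, String> join(Map<String, Left> table1,
                                    Map<String, String> table2,
                                    int numPartitions) {
        List<Map<String, Left>> left = new ArrayList<>();
        List<Map<String, String>> right = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            left.add(new LinkedHashMap<>());
            right.add(new LinkedHashMap<>());
        }
        // Step 3: rekey table1 and co-partition it with table2.
        table1.forEach((pk, v) ->
                left.get(partitionFor(v.fk, numPartitions)).put(combinedKey(v.fk, pk), v));
        table2.forEach((k, v) -> right.get(partitionFor(k, numPartitions)).put(k, v));

        // Step 4: each partition ("stream thread") joins using only its local data.
        Map<String, String> result = new LinkedHashMap<>();
        for (int p = 0; p < numPartitions; p++) {
            Map<String, String> localTable2 = right.get(p);
            left.get(p).forEach((ck, v) -> {
                String match = localTable2.get(v.fk);
                if (match != null) {
                    result.put(ck, "join(" + match + "," + v.bar + ")");
                }
            });
        }
        return result;
    }
}
```

With the sample input from step 1, the output holds `A-a`, `A-b` and `B-c` as three distinct keys, so both matches for `A` survive. Keying the result by `A` alone would let the second write overwrite the first, which is the concern Matthias raises further down the thread.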
>>>> I do not expose the
>>>> CombinedKey or any of the internals outside of the joinOnForeignKey
>>>> function. This does make for a larger footprint, but it removes from the
>>>> user all responsibility for resolving out-of-order arrivals and handling
>>>> CombinedKeys. I believe that this makes the function much easier to use.
>>>>
>>>> Let me know if this helps resolve your questions, and please feel free to
>>>> add anything else on your mind.
>>>>
>>>> Thanks again,
>>>> Adam
>>>>
>>>> On Tue, Sep 4, 2018 at 8:36 PM, Matthias J. Sax <matth...@confluent.io>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> I am just catching up on this thread. I did not read everything so far,
>>>>> but want to share a couple of initial thoughts:
>>>>>
>>>>> Headers: I think there is a fundamental difference between header usage
>>>>> in this KIP and KIP-258. For 258, we add headers to changelog topics that
>>>>> are owned by Kafka Streams, and nobody else is supposed to write into
>>>>> them. In fact, no user headers are written into the changelog topic, and
>>>>> thus there are no conflicts.
>>>>>
>>>>> Nevertheless, I don't see a big issue with using headers within Streams.
>>>>> As long as we document it, we can have some "reserved" header keys that
>>>>> users are not allowed to use when processing data with Kafka Streams.
>>>>> IMHO, this should be ok.
>>>>>
>>>>>> I think there is a safe way to avoid conflicts, since these headers are
>>>>>> only needed in internal topics (I think):
>>>>>> For internal and changelog topics, we can namespace all headers:
>>>>>> * user-defined headers are namespaced as "external." + headerKey
>>>>>> * internal headers are namespaced as "internal." + headerKey
>>>>>>
>>>>> While name spacing would be possible, it would require deserializing
>>>>> user headers, which implies a runtime overhead. I would suggest not to
>>>>> namespace for now to avoid the overhead. If this becomes a problem in
>>>>> the future, we can still add name spacing later on.
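The namespacing alternative quoted above can be sketched as follows, mainly to show where the overhead Matthias mentions comes from: every user header key has to be rewritten on the way into an internal topic and stripped again on the way out. Headers are modeled as a `Map<String, byte[]>` for brevity (Kafka's own header collection also allows duplicate keys), and `HeaderNamespaces` with its methods is a hypothetical helper, not part of Kafka Streams.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper illustrating the "external." / "internal." prefixes
// from the quoted proposal. Not a Kafka API.
final class HeaderNamespaces {
    static final String EXTERNAL = "external.";
    static final String INTERNAL = "internal.";

    // Writing to an internal topic: every header key gets a prefix,
    // so user and internal keys can never collide.
    static Map<String, byte[]> wrap(Map<String, byte[]> userHeaders,
                                    Map<String, byte[]> internalHeaders) {
        Map<String, byte[]> out = new LinkedHashMap<>();
        userHeaders.forEach((k, v) -> out.put(EXTERNAL + k, v));
        internalHeaders.forEach((k, v) -> out.put(INTERNAL + k, v));
        return out;
    }

    // Reading back: recover the user's headers by stripping the prefix
    // and dropping everything internal.
    static Map<String, byte[]> userHeaders(Map<String, byte[]> wrapped) {
        Map<String, byte[]> out = new LinkedHashMap<>();
        wrapped.forEach((k, v) -> {
            if (k.startsWith(EXTERNAL)) {
                out.put(k.substring(EXTERNAL.length()), v);
            }
        });
        return out;
    }
}
```

The reserved-key approach Adam settles on skips both loops: user headers pass through untouched, and only the reserved key is read or written.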
>>>>>
>>>>> My main concern about the design is the type of the result KTable: If I
>>>>> understood the proposal correctly,
>>>>>
>>>>> KTable<K1,V1> table1 = ...
>>>>> KTable<K2,V2> table2 = ...
>>>>>
>>>>> KTable<K1,V3> joinedTable = table1.join(table2,...);
>>>>>
>>>>> implies that the `joinedTable` has the same key as the left input table.
>>>>> IMHO, this does not work because if table2 contains multiple rows that
>>>>> join with a record in table1 (which is the main purpose of a foreign key
>>>>> join), the result table would only contain a single join result, not
>>>>> multiple.
>>>>>
>>>>> Example:
>>>>>
>>>>> table1 input stream: <A,X>
>>>>> table2 input stream: <a,(A,1)>, <b,(A,2)>
>>>>>
>>>>> We use the table2 value as a foreign key to the table1 key (i.e., "A"
>>>>> joins). If the result key is the same as the key of table1, this implies
>>>>> that the result can either be <A, join(X,1)> or <A, join(X,2)> but not
>>>>> both. Because they share the same key, whichever result record we emit
>>>>> later overwrites the previous result.
>>>>>
>>>>> This is the reason why Jan originally proposed to use a combination of
>>>>> both primary keys of the input tables as the key of the output table.
>>>>> This makes the keys of the output table unique, and we can store both in
>>>>> the output table:
>>>>>
>>>>> Result would be <A-a, join(X,1)>, <A-b, join(X,2)>
>>>>>
>>>>> Thoughts?
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 9/4/18 1:36 PM, Jan Filipiak wrote:
>>>>>
>>>>>> Just one remark here.
>>>>>> The high-water mark could be disregarded. The decision about the
>>>>>> forward depends on the size of the aggregated map.
>>>>>> Only 1-element maps would be unpacked and forwarded. 0-element maps
>>>>>> would be published as a delete. Any other count of map entries is in
>>>>>> the "waiting for correct deletes to arrive" state.
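Jan's forwarding rule translates almost directly into code. This is a minimal sketch, assuming the aggregate for one original key is a map from foreign key to joined value, as in his aggregator; `ForwardDecision`, `decide` and `unpack` are hypothetical names.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical encoding of the size-based forwarding rule.
final class ForwardDecision {
    enum Kind { FORWARD, DELETE, WAIT }

    static Kind decide(Map<?, ?> aggregate) {
        int size = aggregate == null ? 0 : aggregate.size();
        if (size == 1) {
            return Kind.FORWARD; // exactly one live join result: unpack it
        }
        if (size == 0) {
            return Kind.DELETE;  // nothing joins any more: publish a delete
        }
        return Kind.WAIT;        // deletes for old foreign keys still in flight
    }

    // The single value to emit downstream, present only for FORWARD.
    static <V> Optional<V> unpack(Map<?, V> aggregate) {
        if (decide(aggregate) != Kind.FORWARD) {
            return Optional.empty();
        }
        return Optional.ofNullable(aggregate.values().iterator().next());
    }
}
```

The `WAIT` case is what is meant to stand in for the high-water mark: while the delete for an old foreign key is still in flight, the map holds more than one entry and nothing is forwarded.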
>>>>>>
>>>>>> On 04.09.2018 21:29, Adam Bellemare wrote:
>>>>>>
>>>>>>> It does look like I could replace the second repartition store and
>>>>>>> the high-water store with a groupBy and reduce. However, it looks like
>>>>>>> I would still need to store the high-water value within the
>>>>>>> materialized store, to compare the arrival of out-of-order records
>>>>>>> (assuming my understanding of THIS is correct...). This in effect is
>>>>>>> the same as the design I have now, just with the two tables merged
>>>>>>> together.
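For comparison, here is a sketch of the high-water-mark check Adam describes, combined with the bounded retention from his windowed-store experiment earlier in the thread. It assumes each joined record carries the offset of the table1 update that produced it. `HighwaterFilter` is hypothetical, and the eviction loop is a simplified stand-in for a real windowed state store.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical high-water-mark filter for the repartition-back step.
// Entries older than retentionMs are evicted, which bounds the store but
// means late records arriving after eviction can no longer be detected.
final class HighwaterFilter {
    private static final class Mark {
        final long offset;       // highest source offset seen for the key
        final long recordedAtMs; // when that mark was last written
        Mark(long offset, long recordedAtMs) {
            this.offset = offset;
            this.recordedAtMs = recordedAtMs;
        }
    }

    private final Map<String, Mark> marks = new HashMap<>();
    private final long retentionMs;

    HighwaterFilter(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Returns true if the joined record should be forwarded, false if stale.
    boolean accept(String originalKey, long sourceOffset, long nowMs) {
        evictExpired(nowMs);
        Mark mark = marks.get(originalKey);
        if (mark != null && sourceOffset < mark.offset) {
            return false; // produced by an older table1 update: drop it
        }
        marks.put(originalKey, new Mark(sourceOffset, nowMs));
        return true;
    }

    private void evictExpired(long nowMs) {
        marks.values().removeIf(m -> nowMs - m.recordedAtMs > retentionMs);
    }

    int size() {
        return marks.size();
    }
}
```

Records with the same source offset are still forwarded, since a table2 update re-joins an unchanged table1 record. Once a key's mark has aged out, a late record for that key is accepted again, which corresponds to the 24h/72h limit Adam mentions.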