Hi All

Sorry for the delay - holidays and all. I have since updated the KIP with
John's original suggestion and have pruned a number of the no longer
relevant diagrams. Any more comments would be welcomed, otherwise I will
look to kick off the vote again shortly.

Thanks
Adam

On Mon, Dec 17, 2018 at 7:06 PM Adam Bellemare <adam.bellem...@gmail.com>
wrote:

> Hi John and Guozhang
>
> Ah yes, I lost that in the mix! Thanks for the convergent solutions - I do
> think that the attachment that John included makes for a better design. It
> should also help with overall performance as very high-cardinality foreign
> keyed data (say millions of events with the same entity) will be able to
> leverage the multiple nodes for join functionality instead of having it all
> performed in one node. There is still a bottleneck in the right table
> having to propagate all those events, but with slimmer structures, less IO
> and no need to perform the join I think the throughput will be much higher
> in those scenarios.
>
> Okay, I am convinced. I will update the KIP accordingly to a Gliffy
> version of John's diagram and ensure that the example flow matches
> correctly. Then I can go back to working on the PR to match the diagram.
>
> Thanks both of you for all the help - very much appreciated.
>
> Adam
>
> On Mon, Dec 17, 2018 at 6:39 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Hi John,
>>
>> Just made a pass on your diagram (nice hand-drawing btw!), and obviously
>> we
>> are thinking about the same thing :) A neat difference that I like, is
>> that
>> in the pre-join repartition topic we can still send message in the format
>> of `K=k, V=(i=2)` while using "i" as the partition key in
>> StreamsPartition,
>> this way we do not need to even augment the key for the repartition topic,
>> but just do a projection on the foreign key part but trim all other
>> fields:
>> as long as we still materialize the store as `A-2` co-located with the
>> right KTable, that is fine.
>>
>> As I mentioned in my previous email, I also think this has a few
>> advantages
>> on saving over-the-wire bytes as well as disk bytes.
>>
>> Guozhang
>>
>>
>> On Mon, Dec 17, 2018 at 3:17 PM John Roesler <j...@confluent.io> wrote:
>>
>> > Hi Guozhang,
>> >
>> > Thanks for taking a look! I think Adam's already addressed your
>> questions
>> > as well as I could have.
>> >
>> > Hi Adam,
>> >
>> > Thanks for updating the KIP. It looks great, especially how all the
>> > need-to-know information is right at the top, followed by the details.
>> >
>> > Also, thanks for that high-level diagram. Actually, now that I'm looking
>> > at it, I think part of my proposal got lost in translation, although I
>> do
>> > think that what you have there is also correct.
>> >
>> > I sketched up a crude diagram based on yours and attached it to the KIP
>> > (I'm not sure if attached or inline images work on the mailing list):
>> >
>> https://cwiki.apache.org/confluence/download/attachments/74684836/suggestion.png
>> > . It's also attached to this email for convenience.
>> >
>> > Hopefully, you can see how it's intended to line up, and which parts are
>> > modified.
>> > At a high level, instead of performing the join on the right-hand side,
>> > we're essentially just registering interest, like "LHS key A wishes to
>> > receive updates for RHS key 2". Then, when there is a new "interest" or
>> any
>> > updates to the RHS records, it "broadcasts" its state back to the LHS
>> > records who are interested in it.
>> >
>> > Thus, instead of sending the LHS values to the RHS joiner workers and
>> then
>> > sending the join results back to the LHS worker to be co-partitioned and
>> > validated, we instead only send the LHS *keys* to the RHS workers and
>> then
>> > only the RHS k/v back to be joined by the LHS worker.
>> >
>> > I've been considering both your diagram and mine, and I *think* what I'm
>> > proposing has a few advantages.
>> >
>> > Here are some points of interest as you look at the diagram:
>> > * When we extract the foreign key and send it to the Pre-Join
>> Repartition
>> > Topic, we can send only the FK/PK pair. There's no need to worry about
>> > custom partitioner logic, since we can just use the foreign key plainly
>> as
>> > the repartition record key. Also, we save on transmitting the LHS value,
>> > since we only send its key in this step.
>> > * We also only need to store the RHSKey:LHSKey mapping in the
>> > MaterializedSubscriptionStore, saving on disk. We can use the same rocks
>> > key format you proposed and the same algorithm involving range scans
>> when
>> > the RHS records get updated.
>> > * Instead of joining on the right side, all we do is compose a
>> > re-repartition record so we can broadcast the RHS k/v pair back to the
>> > original LHS partition. (this is what the "rekey" node is doing)
>> > * Then, there is a special kind of Joiner that's co-resident in the same
>> > StreamTask as the LHS table, subscribed to the Post-Join Repartition
>> Topic.
>> > ** This Joiner is *not* triggered directly by any changes in the LHS
>> > KTable. Instead, LHS events indirectly trigger the join via the whole
>> > lifecycle.
>> > ** For each event arriving from the Post-Join Repartition Topic, the
>> > Joiner looks up the corresponding record in the LHS KTable. It validates
>> > the FK as you noted, discarding any inconsistent events. Otherwise, it
>> > unpacks the RHS K/V pair and invokes the ValueJoiner to obtain the join
>> > result
>> > ** Note that the Joiner itself is stateless, so materializing the join
>> > result is optional, just as with the 1:1 joins.
>> >
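To make the "registering interest" idea in the points above concrete, here is a minimal sketch of the RHS-side store. It is a toy model in Python, not Kafka Streams code: the class and function names are hypothetical, and a sorted in-memory list stands in for the ordered RocksDB keys that the range scan would run over.

```python
from bisect import bisect_left


class SubscriptionStore:
    """Toy stand-in for the RHS-side store of (foreign key, LHS key) pairs."""

    def __init__(self):
        # Sorted list of (fk, pk) tuples, mimicking an ordered key-value store.
        self._keys = []

    def subscribe(self, fk, pk):
        """Record that LHS key `pk` wants updates for RHS key `fk`."""
        entry = (fk, pk)
        i = bisect_left(self._keys, entry)
        if i == len(self._keys) or self._keys[i] != entry:
            self._keys.insert(i, entry)

    def subscribers(self, fk):
        """Range scan: every LHS key currently registered under `fk`."""
        # (fk,) sorts before every (fk, pk), so this finds the start of the range.
        i = bisect_left(self._keys, (fk,))
        found = []
        while i < len(self._keys) and self._keys[i][0] == fk:
            found.append(self._keys[i][1])
            i += 1
        return found


def on_rhs_update(store, fk, rhs_value):
    """Build one 'broadcast' record per interested LHS key (the rekey step)."""
    return [(pk, (fk, rhs_value)) for pk in store.subscribers(fk)]
```

Each returned record is keyed by the LHS key, so in this model it would land on the original LHS partition without any custom partitioner.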
>> > So in summary:
>> > * instead of transmitting the LHS keys and values to the right and the
>> > JoinResult back to the left, we only transmit the LHS keys to the right
>> and
>> > the RHS values to the left. Assuming the average RHS value is smaller
>> > than or equal to the average join result size, it's a clear win on
>> broker
>> > traffic. I think this is actually a reasonable assumption, which we can
>> > discuss more if you're suspicious.
>> > * we only need one copy of the data (the left and right tables need to
>> be
>> > materialized) and one extra copy of the PK:FK pairs in the Materialized
>> > Subscription Store. Materializing the join result is optional, just as
>> with
>> > the existing 1:1 joins.
>> > * we still need the fancy range-scan algorithm on the right to locate
>> all
>> > interested LHS keys when a RHS value is updated, but we don't need a
>> custom
>> > partitioner for either repartition topic (this is of course a
>> modification
>> > we could make to your version as well)
>> >
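The broker-traffic claim in the summary above can be checked with a rough per-record byte count. This is only a back-of-the-envelope model: all sizes are hypothetical, record headers and framing are ignored, and the function names are made up for the illustration.

```python
def bytes_current(fk, lhs_key, lhs_value, join_result):
    # Hop 1: combined key plus the full LHS value travel to the RHS worker.
    # Hop 2: the join result travels back, keyed by the LHS key.
    return (fk + lhs_key + lhs_value) + (lhs_key + join_result)


def bytes_proposed(fk, lhs_key, rhs_key, rhs_value):
    # Hop 1: only the FK/PK pair travels to the RHS worker.
    # Hop 2: the RHS key/value pair travels back, keyed by the LHS key.
    return (fk + lhs_key) + (lhs_key + rhs_key + rhs_value)


# Hypothetical sizes in bytes, chosen only for illustration.
sizes = dict(fk=16, lhs_key=16, lhs_value=500, rhs_key=16, rhs_value=300)
join_result = 800  # assumed to hold both the LHS and RHS values

current = bytes_current(sizes["fk"], sizes["lhs_key"], sizes["lhs_value"], join_result)
proposed = bytes_proposed(sizes["fk"], sizes["lhs_key"], sizes["rhs_key"], sizes["rhs_value"])
print(current, proposed)
```

In this model the saving per record is lhs_value + join_result - (rhs_key + rhs_value), so the proposed flow comes out ahead whenever the RHS key/value pair is smaller than the LHS value plus the join result.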
>> > How does this sound to you? (And did I miss anything?)
>> > -John
>> >
>> > On Mon, Dec 17, 2018 at 9:00 AM Adam Bellemare <
>> adam.bellem...@gmail.com>
>> > wrote:
>> >
>> >> Hi John & Guozhang
>> >>
>> >> @John & @Guozhang Wang <wangg...@gmail.com> - I have cleaned up the
>> KIP,
>> >> pruned much of what I wrote and put a simplified diagram near the top
>> to
>> >> illustrate the workflow. I encapsulated Jan's content at the bottom of
>> the
>> >> document. I believe it is simpler to read by far now.
>> >>
>> >> @Guozhang Wang <wangg...@gmail.com>:
>> >> > #1: rekey left table
>> >> >   -> source from the left upstream, send to rekey-processor to
>> generate
>> >> combined key, and then sink to copartition topic.
>> >> Correct.
>> >>
>> >> > #2: first-join with right table
>> >> >   -> source from the right table upstream, materialize the right
>> table.
>> >> >   -> source from the co-partition topic, materialize the rekeyed left
>> >> table, join with the right table, rekey back, and then sink to the
>> >> rekeyed-back topic.
>> >> Almost - I cleared up the KIP. We do not rekey back yet, as I need the
>> >> Foreign-Key value generated in #1 above to compare in the resolution
>> >> stage.
>> >>
>> >> > #3: second join
>> >> >    -> source from the rekeyed-back topic, materialize the rekeyed
>> back
>> >> table.
>> >> >   -> source from the left upstream, materialize the left table, join
>> >> with
>> >> the rekeyed back table.
>> >> Almost - As each event comes in, we just run it through a stateful
>> >> processor that checks the original ("This") KTable for the key. The
>> value
>> >> payload then has the foreignKeyExtractor applied again as in Part #1
>> >> above,
>> >> and gets the current foreign key. Then we compare it to the joined
>> event
>> >> that we are currently resolving. If they have the same foreign-key,
>> >> propagate the result out. If they don't, throw the event away.
>> >>
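The resolution check described above might look roughly like the following. Again this is a Python sketch rather than the PR's code: `resolve`, `lhs_table`, and `extract_fk` are hypothetical names, a dict stands in for the original ("This") KTable, and dropping events whose LHS record no longer exists is an assumption of the sketch.

```python
def resolve(lhs_table, lhs_key, event_fk, joined_value, extract_fk):
    """Return the joined value if it is still current, otherwise None."""
    current = lhs_table.get(lhs_key)
    if current is None:
        # Assumption: the LHS record was deleted in the meantime, so drop the event.
        return None
    if extract_fk(current) != event_fk:
        # The foreign key changed after this event was produced; it is stale.
        return None
    return joined_value
```

For example, with `lhs_table = {"A": {"fk": 2}}`, an event carrying foreign key 2 for key "A" is propagated, while one carrying foreign key 1 is thrown away.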
>> >> The end result is that we do need to materialize 2 additional tables
>> >> (left/this-combinedkey table, and the final Joined table) as I've
>> >> illustrated in the updated KIP. I hope the diagram clears it up a lot
>> >> better. Please let me know.
>> >>
>> >> Thanks again
>> >> Adam
>> >>
>> >>
>> >>
>> >>
>> >> On Sun, Dec 16, 2018 at 3:18 PM Guozhang Wang <wangg...@gmail.com>
>> wrote:
>> >>
>> >> > John,
>> >> >
>> >> > Thanks a lot for the suggestions on refactoring the wiki, I agree
>> with
>> >> you
>> >> > that we should consider the KIP proposal to be easily understood by
>> >> anyone
>> >> > in the future to read, and hence should provide a good summary on the
>> >> > user-facing interfaces, as well as rejected alternatives to represent
>> >> > briefly "how we came a long way to this conclusion, and what we have
>> >> > argued, disagreed, and agreed about, etc" so that readers do not
>> need to
>> >> > dig into the DISCUSS thread to get all the details. We can, of
>> course,
>> >> keep
>> >> > the implementation details like "workflows" on the wiki page as a
>> >> addendum
>> >> > section since it also has correlations.
>> >> >
>> >> > Regarding your proposal on comment 6): that's a very interesting
>> idea!
>> >> Just
>> >> > to clarify that I understand it fully correctly: the proposal's
>> >> resulting
>> >> > topology is still the same as the current proposal, where we will
>> have 3
>> >> > sub-topologies for this operator:
>> >> >
>> >> > #1: rekey left table
>> >> >    -> source from the left upstream, send to rekey-processor to
>> generate
>> >> > combined key, and then sink to copartition topic.
>> >> >
>> >> > #2: first-join with right table
>> >> >    -> source from the right table upstream, materialize the right
>> table.
>> >> >    -> source from the co-partition topic, materialize the rekeyed
>> left
>> >> > table, join with the right table, rekey back, and then sink to the
>> >> > rekeyed-back topic.
>> >> >
>> >> > #3: second join
>> >> >    -> source from the rekeyed-back topic, materialize the rekeyed
>> back
>> >> > table.
>> >> >    -> source from the left upstream, materialize the left table, join
>> >> with
>> >> > the rekeyed back table.
>> >> >
>> >> > Sub-topology #1 and #3 may be merged to a single sub-topology since
>> >> both of
>> >> > them read from the left table source stream. In this workflow, we
>> need
>> >> to
>> >> > materialize 4 tables (left table in #3, right table in #2, rekeyed
>> left
>> >> > table in #2, rekeyed-back table in #3), and 2 repartition topics
>> >> > (copartition topic, rekeyed-back topic).
>> >> >
>> >> > Compared with Adam's current proposal in the workflow overview, it
>> has
>> >> the
>> >> > same num.materialize tables (left table, rekeyed left table, right
>> >> table,
>> >> > out-of-ordering resolver table), and same num.internal topics (two).
>> The
>> >> > advantage is that on the copartition topic, we can save bandwidth by
>> not
>> >> > sending value, and in #2 the rekeyed left table is smaller since we
>> do
>> >> not
>> >> > have any values to materialize. Is that right?
>> >> >
>> >> >
>> >> > Guozhang
>> >> >
>> >> >
>> >> >
>> >> > On Wed, Dec 12, 2018 at 1:22 PM John Roesler <j...@confluent.io>
>> wrote:
>> >> >
>> >> > > Hi Adam,
>> >> > >
>> >> > > Given that the committers are all pretty busy right now, I think
>> that
>> >> it
>> >> > > would help if you were to refactor the KIP a little to reduce the
>> >> > workload
>> >> > > for reviewers.
>> >> > >
>> >> > > I'd recommend the following changes:
>> >> > > * relocate all internal details to a section at the end called
>> >> something
>> >> > > like "Implementation Notes" or something like that.
>> >> > > * rewrite the rest of the KIP to be as succinct as possible and
>> mention
>> >> > only
>> >> > > publicly-facing API changes.
>> >> > > ** for example, the interface that you've already listed there, as
>> >> well
>> >> > as
>> >> > > a textual description of the guarantees we'll be providing (join
>> >> result
>> >> > is
>> >> > > copartitioned with the LHS, and the join result is guaranteed
>> correct)
>> >> > >
>> >> > > A good target would be that the whole main body of the KIP,
>> including
>> >> > > Status, Motivation, Proposal, Justification, and Rejected
>> Alternatives
>> >> > all
>> >> > > fit "above the fold" (i.e., all fit on the screen at a comfortable
>> >> zoom
>> >> > > level).
>> >> > > I think the only real Rejected Alternative that bears mention at
>> this
>> >> > point
>> >> > > is KScatteredTable, which you could just include the executive
>> >> summary on
>> >> > > (no implementation details), and link to extra details in the
>> >> > > Implementation Notes section.
>> >> > >
>> >> > > Taking a look at the wiki page, ~90% of the text there is internal
>> >> > detail,
>> >> > > which is useful for the dubious, but doesn't need to be ratified
>> in a
>> >> > vote
>> >> > > (and would be subject to change without notice in the future
>> anyway).
>> >> > > There's also a lot of conflicting discussion, as you've very
>> >> respectfully
>> >> > > tried to preserve the original proposal from Jan while adding your
>> >> own.
>> >> > > Isolating all this information in a dedicated section at the bottom
>> >> frees
>> >> > > the voters up to focus on the public API part of the proposal,
>> which
>> >> is
>> >> > > really all they need to consider.
>> >> > >
>> >> > > Plus, it'll be clear to future readers which parts of the document
>> are
>> >> > > enduring, and which parts are a snapshot of our implementation
>> >> thinking
>> >> > at
>> >> > > the time.
>> >> > >
>> >> > > I'm suggesting this because I suspect that the others haven't made
>> >> time
>> >> > to
>> >> > > review it partly because it seems daunting. If it seems like it
>> would
>> >> be
>> >> > a
>> >> > > huge time investment to review, people will just keep putting it
>> off.
>> >> But
>> >> > > if the KIP is a single page, then they'll be more inclined to give
>> it
>> >> a
>> >> > > read.
>> >> > >
>> >> > > Honestly, I don't think the KIP itself is that controversial (apart
>> >> from
>> >> > > the scattered table thing (sorry, Jan) ). Most of the discussion
>> has
>> >> been
>> >> > > around the implementation, which we can continue more effectively
>> in
>> >> a PR
>> >> > > once the KIP has passed.
>> >> > >
>> >> > > How does that sound?
>> >> > > -John
>> >> > >
>> >> > > On Mon, Dec 10, 2018 at 3:54 PM Adam Bellemare <
>> >> adam.bellem...@gmail.com
>> >> > >
>> >> > > wrote:
>> >> > >
>> >> > > > 1) I believe that the resolution mechanism John has proposed is
>> >> > > sufficient
>> >> > > > - it is clean and easy and doesn't require additional RocksDB
>> >> stores,
>> >> > > which
>> >> > > > reduces the footprint greatly. I don't think we need to resolve
>> >> based
>> >> > on
>> >> > > > timestamp or offset anymore, but if we decide to do so, that
>> would be
>> >> > > within
>> >> > > > the bounds of the existing API.
>> >> > > >
>> >> > > > 2) Is the current API sufficient, or does it need to be altered
>> to
>> >> go
>> >> > > back
>> >> > > > to vote?
>> >> > > >
>> >> > > > 3) KScatteredTable implementation can always be added in a future
>> >> > > revision.
>> >> > > > This API does not rule it out. This implementation of this
>> function
>> >> > would
>> >> > > > simply be replaced with `KScatteredTable.resolve()` while still
>> >> > > maintaining
>> >> > > > the existing API, thereby giving both features as Jan outlined
>> >> earlier.
>> >> > > > Would this work?
>> >> > > >
>> >> > > >
>> >> > > > Thanks Guozhang, John and Jan
>> >> > > >
>> >> > > >
>> >> > > >
>> >> > > >
>> >> > > > On Mon, Dec 10, 2018 at 10:39 AM John Roesler <j...@confluent.io
>> >
>> >> > wrote:
>> >> > > >
>> >> > > > > Hi, all,
>> >> > > > >
>> >> > > > > >> In fact, we
>> >> > > > > >> can just keep a single final-result store with timestamps
>> and
>> >> > reject
>> >> > > > > values
>> >> > > > > >> that have a smaller timestamp, is that right?
>> >> > > > >
>> >> > > > > > Which is the correct output should at least be decided on the
>> >> > offset
>> >> > > of
>> >> > > > > > the original message.
>> >> > > > >
>> >> > > > > Thanks for this point, Jan.
>> >> > > > >
>> >> > > > > KIP-258 is merely to allow embedding the record timestamp in
>> the
>> >> k/v
>> >> > > > > store,
>> >> > > > > as well as providing a storage-format upgrade path.
>> >> > > > >
>> >> > > > > I might have missed it, but I think we have yet to discuss
>> whether
>> >> > it's
>> >> > > > > safe
>> >> > > > > or desirable just to swap topic-ordering out for
>> >> timestamp-ordering.
>> >> > > This
>> >> > > > > is
>> >> > > > > a very deep topic, and I think it would only pollute the
>> current
>> >> > > > > discussion.
>> >> > > > >
>> >> > > > > What Adam has proposed is safe, given the *current* ordering
>> >> > semantics
>> >> > > > > of the system. If we can agree on his proposal, I think we can
>> >> merge
>> >> > > the
>> >> > > > > feature well before the conversation about timestamp ordering
>> even
>> >> > > takes
>> >> > > > > place, much less reaches a conclusion. In the meantime, it
>> would
>> >> > seem
>> >> > > to
>> >> > > > > be unfortunate to have one join operator with different
>> ordering
>> >> > > > semantics
>> >> > > > > from every other KTable operator.
>> >> > > > >
>> >> > > > > If and when that timestamp discussion takes place, many (all?)
>> >> KTable
>> >> > > > > operations
>> >> > > > > will need to be updated, rendering the many:one join a small
>> >> marginal
>> >> > > > cost.
>> >> > > > >
>> >> > > > > And, just to plug it again, I proposed an algorithm above that
>> I
>> >> > > believe
>> >> > > > > provides
>> >> > > > > correct ordering without any additional metadata, and
>> regardless
>> >> of
>> >> > the
>> >> > > > > ordering semantics. I didn't bring it up further, because I
>> felt
>> >> the
>> >> > > KIP
>> >> > > > > only needs
>> >> > > > > to agree on the public API, and we can discuss the
>> implementation
>> >> at
>> >> > > > > leisure in
>> >> > > > > a PR...
>> >> > > > >
>> >> > > > > Thanks,
>> >> > > > > -John
>> >> > > > >
>> >> > > > >
>> >> > > > > On Mon, Dec 10, 2018 at 2:28 AM Jan Filipiak <
>> >> > jan.filip...@trivago.com
>> >> > > >
>> >> > > > > wrote:
>> >> > > > >
>> >> > > > > >
>> >> > > > > >
>> >> > > > > > On 10.12.2018 07:42, Guozhang Wang wrote:
>> >> > > > > > > Hello Adam / Jan / John,
>> >> > > > > > >
>> >> > > > > > > Sorry for being late on this thread! I've finally got some
>> >> time
>> >> > > this
>> >> > > > > > > weekend to clean up a load of tasks on my queue (actually
>> I've
>> >> > also
>> >> > > > > > realized
>> >> > > > > > > there are a bunch of other things I need to enqueue while
>> >> > cleaning
>> >> > > > them
>> >> > > > > > up
>> >> > > > > > > --- sth I need to improve on my side). So here are my
>> >> thoughts:
>> >> > > > > > >
>> >> > > > > > > Regarding the APIs: I like the current written API in the
>> KIP.
>> >> > More
>> >> > > > > > > generally I'd prefer to keep the 1) one-to-many join
>> >> > > functionalities
>> >> > > > as
>> >> > > > > > > well as 2) other join types than inner as separate KIPs
>> since
>> >> 1)
>> >> > > may
>> >> > > > > > worth
>> >> > > > > > > a general API refactoring that can benefit not only
>> foreignkey
>> >> > > joins
>> >> > > > > but
>> >> > > > > > > collocate joins as well (e.g. an extended proposal of
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
>> >> > > > > > ),
>> >> > > > > > > and I'm not sure if other join types would actually be
>> needed
>> >> > > (maybe
>> >> > > > > left
>> >> > > > > > > join still makes sense), so it's better to
>> >> > > > > wait-for-people-to-ask-and-add
>> >> > > > > > > than add-sth-that-no-one-uses.
>> >> > > > > > >
>> >> > > > > > > Regarding whether we enforce step 3) / 4) v.s. introducing
>> a
>> >> > > > > > > KScatteredTable for users to inject their own optimization:
>> >> I'd
>> >> > > > prefer
>> >> > > > > to
>> >> > > > > > > do the current option as-is, and my main rationale is for
>> >> > > > optimization
>> >> > > > > > > rooms inside the Streams internals and the API
>> succinctness.
>> >> For
>> >> > > > > advanced
>> >> > > > > > > users who may indeed prefer KScatteredTable and do their
>> own
>> >> > > > > > optimization,
>> >> > > > > > > while it is too much of the work to use Processor API
>> >> directly, I
>> >> > > > think
>> >> > > > > > we
>> >> > > > > > > can still extend the current API to support it in the
>> future
>> >> if
>> >> > it
>> >> > > > > > becomes
>> >> > > > > > > necessary.
>> >> > > > > >
>> >> > > > > > no internal optimization potential. it's a myth
>> >> > > > > >
>> >> > > > > > ¯\_(ツ)_/¯
>> >> > > > > >
>> >> > > > > > :-)
>> >> > > > > >
>> >> > > > > > >
>> >> > > > > > > Another note about step 4) resolving out-of-ordering data,
>> as
>> >> I
>> >> > > > > mentioned
>> >> > > > > > > before I think with KIP-258 (embedded timestamp with
>> key-value
>> >> > > store)
>> >> > > > > we
>> >> > > > > > > can actually make this step simpler than the current
>> >> proposal. In
>> >> > > > fact,
>> >> > > > > > we
>> >> > > > > > > can just keep a single final-result store with timestamps
>> and
>> >> > > reject
>> >> > > > > > values
>> >> > > > > > > that have a smaller timestamp, is that right?
>> >> > > > > >
>> >> > > > > > Which is the correct output should at least be decided on the
>> >> > offset
>> >> > > of
>> >> > > > > > the original message.
>> >> > > > > >
>> >> > > > > > >
>> >> > > > > > >
>> >> > > > > > > That's all I have in mind now. Again, great appreciation to
>> >> Adam
>> >> > to
>> >> > > > > make
>> >> > > > > > > such HUGE progress on this KIP!
>> >> > > > > > >
>> >> > > > > > >
>> >> > > > > > > Guozhang
>> >> > > > > > >
>> >> > > > > > > On Wed, Dec 5, 2018 at 2:40 PM Jan Filipiak <
>> >> > > > jan.filip...@trivago.com>
>> >> > > > > > > wrote:
>> >> > > > > > >
>> >> > > > > > >> If they don't find the time:
>> >> > > > > > >> They usually take the opposite path from me :D
>> >> > > > > > >> so the answer would be clear.
>> >> > > > > > >>
>> >> > > > > > >> hence my suggestion to vote.
>> >> > > > > > >>
>> >> > > > > > >>
>> >> > > > > > >> On 04.12.2018 21:06, Adam Bellemare wrote:
>> >> > > > > > >>> Hi Guozhang and Matthias
>> >> > > > > > >>>
>> >> > > > > > >>> I know both of you are quite busy, but we've gotten this
>> KIP
>> >> > to a
>> >> > > > > point
>> >> > > > > > >>> where we need more guidance on the API (perhaps a bit of
>> a
>> >> > > > > tie-breaker,
>> >> > > > > > >> if
>> >> > > > > > >>> you will). If you have anyone else you may think should
>> >> look at
>> >> > > > this,
>> >> > > > > > >>> please tag them accordingly.
>> >> > > > > > >>>
>> >> > > > > > >>> The scenario is as such:
>> >> > > > > > >>>
>> >> > > > > > >>> Current Option:
>> >> > > > > > >>> API:
>> >> > > > > > >>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-PublicInterfaces
>> >> > > > > > >>> 1) Rekey the data to CombinedKey, and shuffle it to the
>> >> > > partition
>> >> > > > > with
>> >> > > > > > >> the
>> >> > > > > > >>> foreignKey (repartition 1)
>> >> > > > > > >>> 2) Join the data
>> >> > > > > > >>> 3) Shuffle the data back to the original node
>> (repartition
>> >> 2)
>> >> > > > > > >>> 4) Resolve out-of-order arrival / race condition due to
>> >> > > foreign-key
>> >> > > > > > >> changes.
>> >> > > > > > >>>
>> >> > > > > > >>> Alternate Option:
>> >> > > > > > >>> Perform #1 and #2 above, and return a KScatteredTable.
>> >> > > > > > >>> - It would be keyed on a wrapped key function:
>> >> <CombinedKey<KO,
>> >> > > K>,
>> >> > > > > VR>
>> >> > > > > > >> (KO
>> >> > > > > > >>> = Other Table Key, K = This Table Key, VR = Joined
>> Result)
>> >> > > > > > >>> - KScatteredTable.resolve() would perform #3 and #4 but
>> >> > > otherwise a
>> >> > > > > > user
>> >> > > > > > >>> would be able to perform additional functions directly
>> from
>> >> the
>> >> > > > > > >>> KScatteredTable (TBD - currently out of scope).
>> >> > > > > > >>> - John's analysis 2-emails up is accurate as to the
>> >> tradeoffs.
>> >> > > > > > >>>
>> >> > > > > > >>> Current Option is coded as-is. Alternate option is
>> possible,
>> >> > but
>> >> > > > will
>> >> > > > > > >>> require implementation details to be exposed in the API
>> and
>> >> > some
>> >> > > > > > >> exposure
>> >> > > > > > >>> of new data structures into the API (ie: CombinedKey).
>> >> > > > > > >>>
>> >> > > > > > >>> I appreciate any insight into this.
>> >> > > > > > >>>
>> >> > > > > > >>> Thanks.
>> >> > > > > > >>>
>> >> > > > > > >>> On Tue, Dec 4, 2018 at 2:59 PM Adam Bellemare <
>> >> > > > > > adam.bellem...@gmail.com>
>> >> > > > > > >>> wrote:
>> >> > > > > > >>>
>> >> > > > > > >>>> Hi John
>> >> > > > > > >>>>
>> >> > > > > > >>>> Thanks for your feedback and assistance. I think your
>> >> summary
>> >> > is
>> >> > > > > > >> accurate
>> >> > > > > > >>>> from my perspective. Additionally, I would like to add
>> that
>> >> > > there
>> >> > > > > is a
>> >> > > > > > >> risk
>> >> > > > > > >>>> of inconsistent final states without performing the
>> >> > resolution.
>> >> > > > This
>> >> > > > > > is
>> >> > > > > > >> a
>> >> > > > > > >>>> major concern for me as most of the data I have dealt
>> with
>> >> is
>> >> > > > > produced
>> >> > > > > > >> by
>> >> > > > > > >>>> relational databases. We have seen a number of cases
>> where
>> >> a
>> >> > > user
>> >> > > > in
>> >> > > > > > the
>> >> > > > > > >>>> Rails UI has modified the field (foreign key), realized
>> >> they
>> >> > > made
>> >> > > > a
>> >> > > > > > >>>> mistake, and then updated the field again with a new
>> key.
>> >> The
>> >> > > > events
>> >> > > > > > are
>> >> > > > > > >>>> propagated out as they are produced, and as such we have
>> >> had
>> >> > > > > > real-world
>> >> > > > > > >>>> cases where these inconsistencies were propagated
>> >> downstream
>> >> > as
>> >> > > > the
>> >> > > > > > >> final
>> >> > > > > > >>>> values due to the race conditions in the fanout of the
>> >> data.
>> >> > > > > > >>>>
>> >> > > > > > >>>> This solution that I propose values correctness of the
>> >> final
>> >> > > > result
>> >> > > > > > over
>> >> > > > > > >>>> other factors.
>> >> > > > > > >>>>
>> >> > > > > > >>>> We could always move this function over to using a
>> >> > > KScatteredTable
>> >> > > > > > >>>> implementation in the future, and simply deprecate
>> this
>> >> > join
>> >> > > > API
>> >> > > > > in
>> >> > > > > > >>>> time. I think I would like to hear more from some of the
>> >> other
>> >> > > > major
>> >> > > > > > >>>> committers on which course of action they would think is
>> >> best
>> >> > > > before
>> >> > > > > > any
>> >> > > > > > >>>> more coding is done.
>> >> > > > > > >>>>
>> >> > > > > > >>>> Thanks again
>> >> > > > > > >>>>
>> >> > > > > > >>>> Adam
>> >> > > > > > >>>>
>> >> > > > > > >>>>
>> >> > > > > > >>>> On Mon, Dec 3, 2018 at 8:24 PM John Roesler <
>> >> > j...@confluent.io>
>> >> > > > > > wrote:
>> >> > > > > > >>>>
>> >> > > > > > >>>>> Hi Jan and Adam,
>> >> > > > > > >>>>>
>> >> > > > > > >>>>> Wow, thanks for doing that test, Adam. Those results
>> are
>> >> > > > > encouraging.
>> >> > > > > > >>>>>
>> >> > > > > > >>>>> Thanks for your performance experience as well, Jan. I
>> >> agree
>> >> > > that
>> >> > > > > > >> avoiding
>> >> > > > > > >>>>> unnecessary join outputs is especially important when
>> the
>> >> > > fan-out
>> >> > > > > is
>> >> > > > > > so
>> >> > > > > > >>>>> high. I suppose this could also be built into the
>> >> > > implementation
>> >> > > > > > we're
>> >> > > > > > >>>>> discussing, but it wouldn't have to be specified in the
>> >> KIP
>> >> > > > (since
>> >> > > > > > >> it's an
>> >> > > > > > >>>>> API-transparent optimization).
>> >> > > > > > >>>>>
>> >> > > > > > >>>>> As far as whether or not to re-repartition the data, I
>> >> didn't
>> >> > > > bring
>> >> > > > > > it
>> >> > > > > > >> up
>> >> > > > > > >>>>> because it sounded like the two of you agreed to leave
>> the
>> >> > KIP
>> >> > > > > as-is,
>> >> > > > > > >>>>> despite the disagreement.
>> >> > > > > > >>>>>
>> >> > > > > > >>>>> If you want my opinion, I feel like both approaches are
>> >> > > > reasonable.
>> >> > > > > > >>>>> It sounds like Jan values more the potential for
>> >> developers
>> >> > to
>> >> > > > > > optimize
>> >> > > > > > >>>>> their topologies to re-use the intermediate nodes,
>> whereas
>> >> > Adam
>> >> > > > > > places
>> >> > > > > > >>>>> more
>> >> > > > > > >>>>> value on having a single operator that people can use
>> >> without
>> >> > > > extra
>> >> > > > > > >> steps
>> >> > > > > > >>>>> at the end.
>> >> > > > > > >>>>>
>> >> > > > > > >>>>> Personally, although I do find it exceptionally
>> annoying
>> >> > when a
>> >> > > > > > >> framework
>> >> > > > > > >>>>> gets in my way when I'm trying to optimize something,
>> it
>> >> > seems
>> >> > > > > better
>> >> > > > > > >> to
>> >> > > > > > >>>>> go
>> >> > > > > > >>>>> for a single operation.
>> >> > > > > > >>>>> * Encapsulating the internal transitions gives us
>> >> significant
>> >> > > > > > latitude
>> >> > > > > > >> in
>> >> > > > > > >>>>> the implementation (for example, joining only at the
>> end,
>> >> not
>> >> > > in
>> >> > > > > the
>> >> > > > > > >>>>> middle
>> >> > > > > > >>>>> to avoid extra data copying and out-of-order
>> resolution;
>> >> how
>> >> > we
>> >> > > > > > >> represent
>> >> > > > > > >>>>> the first repartition keys (combined keys vs. value
>> >> vectors),
>> >> > > > > etc.).
>> >> > > > > > >> If we
>> >> > > > > > >>>>> publish something like a KScatteredTable with the
>> >> > > > right-partitioned
>> >> > > > > > >> joined
>> >> > > > > > >>>>> data, then the API pretty much locks in the
>> >> implementation as
>> >> > > > well.
>> >> > > > > > >>>>> * The API seems simpler to understand and use. I do
>> mean
>> >> > > "seems";
>> >> > > > > if
>> >> > > > > > >>>>> anyone
>> >> > > > > > >>>>> wants to make the case that KScatteredTable is actually
>> >> > > simpler,
>> >> > > > I
>> >> > > > > > >> think
>> >> > > > > > >>>>> hypothetical usage code would help. From a relational
>> >> algebra
>> >> > > > > > >> perspective,
>> >> > > > > > >>>>> it seems like KTable.join(KTable) should produce a new
>> >> KTable
>> >> > > in
>> >> > > > > all
>> >> > > > > > >>>>> cases.
>> >> > > > > > >>>>> * That said, there might still be room in the API for a
>> >> > > different
>> >> > > > > > >>>>> operation
>> >> > > > > > >>>>> like what Jan has proposed to scatter a KTable, and
>> then
>> >> do
>> >> > > > things
>> >> > > > > > like
>> >> > > > > > >>>>> join, re-group, etc from there... I'm not sure; I
>> haven't
>> >> > > thought
>> >> > > > > > >> through
>> >> > > > > > >>>>> all the consequences yet.
>> >> > > > > > >>>>>
>> >> > > > > > >>>>> This is all just my opinion after thinking over the
>> >> > discussion
>> >> > > so
>> >> > > > > > >> far...
>> >> > > > > > >>>>> -John
>> >> > > > > > >>>>>
>> >> > > > > > >>>>> On Mon, Dec 3, 2018 at 2:56 PM Adam Bellemare <
>> >> > > > > > >> adam.bellem...@gmail.com>
>> >> > > > > > >>>>> wrote:
>> >> > > > > > >>>>>
>> >> > > > > > >>>>>> Updated the PR to take into account John's feedback.
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>> I did some preliminary testing for the performance of the
>> >> > > > > > >>>>>> prefixScan. I have attached the file, but I will also include
>> >> > > > > > >>>>>> the text in the body here for archival purposes (I am not
>> >> > > > > > >>>>>> sure what happens to attached files). I also updated the PR
>> >> > > > > > >>>>>> and the KIP accordingly.
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>> Summary: It scales exceptionally well for scanning large
>> >> > > > > > >>>>>> numbers of records. As Jan mentioned previously, the real
>> >> > > > > > >>>>>> issue would be more around processing the resulting records
>> >> > > > > > >>>>>> after obtaining them. For instance, it takes roughly
>> >> > > > > > >>>>>> 80-120 ms to flush the buffer and a further 35-85 ms to scan
>> >> > > > > > >>>>>> 27.5M records, obtaining matches for 2.5M of them. Iterating
>> >> > > > > > >>>>>> through the records just to generate a simple count takes
>> >> > > > > > >>>>>> about 40 times longer than the flush + scan combined.
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>> ========================================================
>> >> > > > > > >>>>>> Setup:
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>> ========================================================
>> >> > > > > > >>>>>> Java 9 with default settings aside from a 512 MB heap
>> >> > > > > > >>>>>> (-Xmx512m, -Xms512m)
>> >> > > > > > >>>>>> CPU: i7 2.2 GHz.
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>> Note: I am using a slightly modified, directly accessible
>> >> > > > > > >>>>>> Kafka Streams RocksDB implementation (RocksDB.java, basically
>> >> > > > > > >>>>>> just avoiding the ProcessorContext).
>> >> > > > > > >>>>>> There are no modifications to the default RocksDB values
>> >> > > > > > >>>>>> provided in the 2.1/trunk release.
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>> keysize = 128 bytes
>> >> > > > > > >>>>>> valsize = 512 bytes
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>> Step 1:
>> >> > > > > > >>>>>> Write X positive matching events (key = prefix + left-padded
>> >> > > > > > >>>>>> auto-incrementing integer)
>> >> > > > > > >>>>>> Step 2:
>> >> > > > > > >>>>>> Write 10X negative matching events (key = left-padded
>> >> > > > > > >>>>>> auto-incrementing integer)
>> >> > > > > > >>>>>> Step 3:
>> >> > > > > > >>>>>> Perform flush
>> >> > > > > > >>>>>> Step 4:
>> >> > > > > > >>>>>> Perform prefixScan
>> >> > > > > > >>>>>> Step 5:
>> >> > > > > > >>>>>> Iterate through the returned Iterator and validate the count
>> >> > > > > > >>>>>> of expected events.
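
The five steps above can be approximated without RocksDB. What follows is only a minimal sketch, assuming an in-memory TreeMap as a stand-in for the sorted store; the names PrefixScanSketch, leftPad, prefixScan and run are hypothetical and are not taken from the PR. It illustrates why a prefix scan over sorted keys only has to touch the matching key range, however many non-matching keys are present.

```java
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical stand-in for the benchmark; a TreeMap replaces the RocksDB store.
final class PrefixScanSketch {

    // Left-pads with zeros so that lexicographic order matches numeric order.
    static String leftPad(int value, int width) {
        return String.format("%0" + width + "d", value);
    }

    // Returns a view of all entries whose key starts with the prefix.
    // Assumption: keys never contain Character.MAX_VALUE.
    static SortedMap<String, String> prefixScan(NavigableMap<String, String> store,
                                                String prefix) {
        return store.subMap(prefix, true, prefix + Character.MAX_VALUE, false);
    }

    // Steps 1, 2, 4 and 5 (there is nothing to flush for an in-memory map).
    static int run(int x) {
        NavigableMap<String, String> store = new TreeMap<>();
        for (int i = 0; i < x; i++) {
            store.put("prefix-" + leftPad(i, 10), "match");    // positive events
        }
        for (int i = 0; i < 10 * x; i++) {
            store.put(leftPad(i, 10), "no-match");             // negative events
        }
        int count = 0;
        for (String ignored : prefixScan(store, "prefix-").keySet()) {
            count++;                                           // validate by counting
        }
        return count;
    }
}
```

Calling PrefixScanSketch.run(x) should return x, since only the prefixed keys fall inside the scanned range.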
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>> ========================================================
>> >> > > > > > >>>>>> Results:
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>> ========================================================
>> >> > > > > > >>>>>> X = 1k (11k events total)
>> >> > > > > > >>>>>> Flush Time = 39 ms
>> >> > > > > > >>>>>> Scan Time = 7 ms
>> >> > > > > > >>>>>> 6.9 MB disk
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>> --------------------------------------------------------
>> >> > > > > > >>>>>> X = 10k (110k events total)
>> >> > > > > > >>>>>> Flush Time = 45 ms
>> >> > > > > > >>>>>> Scan Time = 8 ms
>> >> > > > > > >>>>>> 127 MB
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>> --------------------------------------------------------
>> >> > > > > > >>>>>> X = 100k (1.1M events total)
>> >> > > > > > >>>>>> Test1:
>> >> > > > > > >>>>>> Flush Time = 60 ms
>> >> > > > > > >>>>>> Scan Time = 12 ms
>> >> > > > > > >>>>>> 678 MB
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>> Test2:
>> >> > > > > > >>>>>> Flush Time = 45 ms
>> >> > > > > > >>>>>> Scan Time = 7 ms
>> >> > > > > > >>>>>> 576 MB
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>> --------------------------------------------------------
>> >> > > > > > >>>>>> X = 1M (11M events total)
>> >> > > > > > >>>>>> Test1:
>> >> > > > > > >>>>>> Flush Time = 52 ms
>> >> > > > > > >>>>>> Scan Time = 19 ms
>> >> > > > > > >>>>>> 7.2 GB
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>> Test2:
>> >> > > > > > >>>>>> Flush Time = 84 ms
>> >> > > > > > >>>>>> Scan Time = 34 ms
>> >> > > > > > >>>>>> 9.1 GB
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>> --------------------------------------------------------
>> >> > > > > > >>>>>> X = 2.5M (27.5M events total)
>> >> > > > > > >>>>>> Test1:
>> >> > > > > > >>>>>> Flush Time = 82 ms
>> >> > > > > > >>>>>> Scan Time = 63 ms
>> >> > > > > > >>>>>> 17 GB - 276 sst files
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>> Test2:
>> >> > > > > > >>>>>> Flush Time = 116 ms
>> >> > > > > > >>>>>> Scan Time = 35 ms
>> >> > > > > > >>>>>> 23 GB - 361 sst files
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>> Test3:
>> >> > > > > > >>>>>> Flush Time = 103 ms
>> >> > > > > > >>>>>> Scan Time = 82 ms
>> >> > > > > > >>>>>> 19 GB - 300 sst files
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>> --------------------------------------------------------
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>> I had to limit my testing on my laptop to X = 2.5M events. I
>> >> > > > > > >>>>>> tried to go to X = 10M (110M events) but RocksDB was going
>> >> > > > > > >>>>>> into the 100 GB+ range and my laptop ran out of disk. More
>> >> > > > > > >>>>>> extensive testing could be done, but I suspect that it would
>> >> > > > > > >>>>>> be in line with what we're seeing in the results above.
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>> At this point in time, I think the only major discussion
>> >> > > > > > >>>>>> point is really around what Jan and I have disagreed on:
>> >> > > > > > >>>>>> repartitioning back + resolving potential out-of-order issues,
>> >> > > > > > >>>>>> or leaving that up to the client to handle.
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>> Thanks folks,
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>> Adam
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>> On Mon, Dec 3, 2018 at 4:34 AM Jan Filipiak <
>> >> > > > > > jan.filip...@trivago.com
>> >> > > > > > >>>
>> >> > > > > > >>>>>> wrote:
>> >> > > > > > >>>>>>
>> >> > > > > > >>>>>>>
>> >> > > > > > >>>>>>>
>> >> > > > > > >>>>>>> On 29.11.2018 15:14, John Roesler wrote:
>> >> > > > > > >>>>>>>> Hi all,
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>> Sorry that this discussion petered out... I think the 2.1
>> >> > > > > > >>>>>>>> release caused an extended distraction that pushed it off
>> >> > > > > > >>>>>>>> everyone's radar (which was precisely Adam's concern).
>> >> > > > > > >>>>>>>> Personally, I've also had some extended distractions of my
>> >> > > > > > >>>>>>>> own that kept (and continue to keep) me preoccupied.
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>> However, calling for a vote did wake me up, so I
>> guess
>> >> Jan
>> >> > > was
>> >> > > > > on
>> >> > > > > > >> the
>> >> > > > > > >>>>>>> right
>> >> > > > > > >>>>>>>> track!
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>> I've gone back and reviewed the whole KIP document
>> and
>> >> the
>> >> > > > prior
>> >> > > > > > >>>>>>>> discussion, and I'd like to offer a few thoughts:
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>> API Thoughts:
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>> 1. If I read the KIP right, you are proposing a
>> >> > many-to-one
>> >> > > > > join.
>> >> > > > > > >>>>> Could
>> >> > > > > > >>>>>>> we
>> >> > > > > > >>>>>>>> consider naming it manyToOneJoin? Or, if you prefer,
>> >> flip
>> >> > > the
>> >> > > > > > design
>> >> > > > > > >>>>>>> around
>> >> > > > > > >>>>>>>> and make it a oneToManyJoin?
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>> The proposed name "joinOnForeignKey" disguises the
>> join
>> >> > > type,
>> >> > > > > and
>> >> > > > > > it
>> >> > > > > > >>>>>>> seems
>> >> > > > > > >>>>>>>> like it might trick some people into using it for a
>> >> > > one-to-one
>> >> > > > > > join.
>> >> > > > > > >>>>>>> This
>> >> > > > > > >>>>>>>> would work, of course, but it would be super
>> >> inefficient
>> >> > > > > compared
>> >> > > > > > to
>> >> > > > > > >>>>> a
>> >> > > > > > >>>>>>>> simple rekey-and-join.
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>> 2. I might have missed it, but I don't think it's
>> >> > specified
>> >> > > > > > whether
>> >> > > > > > >>>>>>> it's an
>> >> > > > > > >>>>>>>> inner, outer, or left join. I'm guessing an outer
>> >> join, as
>> >> > > > > > >>>>> (neglecting
>> >> > > > > > >>>>>>> IQ),
>> >> > > > > > >>>>>>>> the rest can be achieved by filtering or by handling
>> >> it in
>> >> > > the
>> >> > > > > > >>>>>>> ValueJoiner.
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>> 3. The arg list to joinOnForeignKey doesn't look
>> quite
>> >> > > right.
>> >> > > > > > >>>>>>>> 3a. Regarding Serialized: There are a few different
>> >> > > paradigms
>> >> > > > in
>> >> > > > > > >>>>> play in
>> >> > > > > > >>>>>>>> the Streams API, so it's confusing, but instead of
>> >> three
>> >> > > > > > Serialized
>> >> > > > > > >>>>>>> args, I
>> >> > > > > > >>>>>>>> think it would be better to have one that allows
>> >> > > (optionally)
>> >> > > > > > >> setting
>> >> > > > > > >>>>>>> the 4
>> >> > > > > > >>>>>>>> incoming serdes. The result serde is defined by the
>> >> > > > > Materialized.
>> >> > > > > > >> The
>> >> > > > > > >>>>>>>> incoming serdes can be optional because they might
>> >> already
>> >> > > be
>> >> > > > > > >>>>> available
>> >> > > > > > >>>>>>> on
>> >> > > > > > >>>>>>>> the source KTables, or the default serdes from the
>> >> config
>> >> > > > might
>> >> > > > > be
>> >> > > > > > >>>>>>>> applicable.
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>> 3b. Is the StreamPartitioner necessary? The other
>> joins
>> >> > > don't
>> >> > > > > > allow
>> >> > > > > > >>>>>>> setting
>> >> > > > > > >>>>>>>> one, and it seems like it might actually be harmful,
>> >> since
>> >> > > the
>> >> > > > > > rekey
>> >> > > > > > >>>>>>>> operation needs to produce results that are
>> >> co-partitioned
>> >> > > > with
>> >> > > > > > the
>> >> > > > > > >>>>>>> "other"
>> >> > > > > > >>>>>>>> KTable.
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>> 4. I'm fine with the "reserved word" header, but I
>> >> didn't
>> >> > > > > actually
>> >> > > > > > >>>>>>> follow
>> >> > > > > > >>>>>>>> what Matthias meant about namespacing requiring
>> >> > > > "deserializing"
>> >> > > > > > the
>> >> > > > > > >>>>>>> record
>> >> > > > > > >>>>>>>> header. The headers are already Strings, so I don't
>> >> think
>> >> > > that
>> >> > > > > > >>>>>>>> deserialization is required. If we applied the
>> >> namespace
>> >> > at
>> >> > > > > source
>> >> > > > > > >>>>> nodes
>> >> > > > > > >>>>>>>> and stripped it at sink nodes, this would be
>> >> practically
>> >> > no
>> >> > > > > > >> overhead.
>> >> > > > > > >>>>>>> The
>> >> > > > > > >>>>>>>> advantage of the namespace idea is that no public
>> API
>> >> > change
>> >> > > > wrt
>> >> > > > > > >>>>> headers
>> >> > > > > > >>>>>>>> needs to happen, and no restrictions need to be
>> placed
>> >> on
>> >> > > > users'
>> >> > > > > > >>>>>>> headers.
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>> (Although I'm wondering if we can get away without
>> the
>> >> > > header
>> >> > > > at
>> >> > > > > > >>>>> all...
>> >> > > > > > >>>>>>>> stay tuned)
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>> 5. I also didn't follow the discussion about the HWM
>> >> table
>> >> > > > > growing
>> >> > > > > > >>>>>>> without
>> >> > > > > > >>>>>>>> bound. As I read it, the HWM table is effectively
>> >> > > implementing
>> >> > > > > OCC
>> >> > > > > > >> to
>> >> > > > > > >>>>>>>> resolve the problem you noted with disordering when
>> the
>> >> > > rekey
>> >> > > > is
>> >> > > > > > >>>>>>>> reversed... particularly notable when the FK
>> changes.
>> >> As
>> >> > > such,
>> >> > > > > it
>> >> > > > > > >>>>> only
>> >> > > > > > >>>>>>>> needs to track the most recent "version" (the
>> offset in
>> >> > the
>> >> > > > > source
>> >> > > > > > >>>>>>>> partition) of each key. Therefore, it should have
>> the
>> >> same
>> >> > > > > number
>> >> > > > > > of
>> >> > > > > > >>>>>>> keys
>> >> > > > > > >>>>>>>> as the source table at all times.
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>> I see that you are aware of KIP-258, which I think
>> >> might
>> >> > be
>> >> > > > > > relevant
>> >> > > > > > >>>>> in
>> >> > > > > > >>>>>>> a
>> >> > > > > > >>>>>>>> couple of ways. One: it's just about storing the
>> >> timestamp
>> >> > > in
>> >> > > > > the
>> >> > > > > > >>>>> state
>> >> > > > > > >>>>>>>> store, but the ultimate idea is to effectively use
>> the
>> >> > > > timestamp
>> >> > > > > > as
>> >> > > > > > >>>>> an
>> >> > > > > > >>>>>>> OCC
>> >> > > > > > >>>>>>>> "version" to drop disordered updates. You wouldn't
>> >> want to
>> >> > > use
>> >> > > > > the
>> >> > > > > > >>>>>>>> timestamp for this operation, but if you were to
>> use a
>> >> > > similar
>> >> > > > > > >>>>>>> mechanism to
>> >> > > > > > >>>>>>>> store the source offset in the store alongside the
>> >> > re-keyed
>> >> > > > > > values,
>> >> > > > > > >>>>> then
>> >> > > > > > >>>>>>>> you could avoid a separate table.
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>> 6. You and Jan have been thinking about this for a
>> long
>> >> > > time,
>> >> > > > so
>> >> > > > > > >> I've
>> >> > > > > > >>>>>>>> probably missed something here, but I'm wondering
>> if we
>> >> > can
>> >> > > > > avoid
>> >> > > > > > >> the
>> >> > > > > > >>>>>>> HWM
>> >> > > > > > >>>>>>>> tracking at all and resolve out-of-order during a
>> final
>> >> > join
>> >> > > > > > >>>>> instead...
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>> Let's say we're joining a left table (Integer K:
>> Letter
>> >> > FK,
>> >> > > > > (other
>> >> > > > > > >>>>>>> data))
>> >> > > > > > >>>>>>>> to a right table (Letter K: (some data)).
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>> Left table:
>> >> > > > > > >>>>>>>> 1: (A, xyz)
>> >> > > > > > >>>>>>>> 2: (B, asd)
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>> Right table:
>> >> > > > > > >>>>>>>> A: EntityA
>> >> > > > > > >>>>>>>> B: EntityB
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>> We could do a rekey as you proposed with a combined key, but
>> >> > > > > > >>>>>>>> without propagating the value at all.
>> >> > > > > > >>>>>>>> Rekey table:
>> >> > > > > > >>>>>>>> A-1: (dummy value)
>> >> > > > > > >>>>>>>> B-2: (dummy value)
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>> Which we then join with the right table to produce:
>> >> > > > > > >>>>>>>> A-1: EntityA
>> >> > > > > > >>>>>>>> B-2: EntityB
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>> Which gets rekeyed back:
>> >> > > > > > >>>>>>>> 1: A, EntityA
>> >> > > > > > >>>>>>>> 2: B, EntityB
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>> And finally we do the actual join:
>> >> > > > > > >>>>>>>> Result table:
>> >> > > > > > >>>>>>>> 1: ((A, xyz), EntityA)
>> >> > > > > > >>>>>>>> 2: ((B, asd), EntityB)
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>> The thing is that in that last join, we have the opportunity
>> >> > > > > > >>>>>>>> to compare the current FK in the left table with the incoming
>> >> > > > > > >>>>>>>> PK of the right table. If they don't match, we just drop the
>> >> > > > > > >>>>>>>> event, since it must be outdated.
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>
>> >> > > > > > >>>>>>>> In your KIP, you gave an example in which (1: A, xyz) gets
>> >> > > > > > >>>>>>>> updated to (1: B, xyz), ultimately yielding a conundrum about
>> >> > > > > > >>>>>>>> whether the final state should be (1: null) or
>> >> > > > > > >>>>>>>> (1: joined-on-B). With the algorithm above, you would be
>> >> > > > > > >>>>>>>> considering (1: (B, xyz), (A, null)) vs
>> >> > > > > > >>>>>>>> (1: (B, xyz), (B, EntityB)). It seems like this does give you
>> >> > > > > > >>>>>>>> enough information to make the right choice, regardless of
>> >> > > > > > >>>>>>>> disordering.
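
The stale-event check from point 6 can be sketched in a few lines. This is only an illustration under stated assumptions: the class FinalJoinSketch and its methods are hypothetical, left keys are integers, foreign keys are strings, and a plain HashMap stands in for the left table's state store. It is not how the KIP or the PR implements the final join.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

// Hypothetical final-join step: drop results that were joined on an outdated FK.
final class FinalJoinSketch {

    // Current left-table state: primary key -> current foreign key.
    private final Map<Integer, String> currentForeignKey = new HashMap<>();

    void updateLeft(int leftKey, String foreignKey) {
        currentForeignKey.put(leftKey, foreignKey);
    }

    // Called when a rekeyed-back result arrives for leftKey, carrying the
    // right-table PK it was joined on. Returns empty if the result is stale.
    Optional<String> resolve(int leftKey, String joinedOnKey, String rightValue) {
        String current = currentForeignKey.get(leftKey);
        if (!Objects.equals(current, joinedOnKey)) {
            return Optional.empty();   // FK changed in the meantime: drop the event
        }
        return Optional.of(leftKey + ": (" + current + ", " + rightValue + ")");
    }
}
```

With the KIP example, after key 1 moves from A to B, a late result joined on A is dropped, while the result joined on B is emitted, regardless of the order in which the two arrive.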
>> >> > > > > > >>>>>>>
>> >> > > > > > >>>>>>> Will check Adam's patch, but this should work. As mentioned
>> >> > > > > > >>>>>>> often, I am not convinced about partitioning back for the
>> >> > > > > > >>>>>>> user automatically. I think this is the real performance
>> >> > > > > > >>>>>>> eater ;)
>> >> > > > > > >>>>>>>
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>> 7. Last thought... I'm a little concerned about the
>> >> > > > performance
>> >> > > > > of
>> >> > > > > > >>>>> the
>> >> > > > > > >>>>>>>> range scans when records change in the right table.
>> >> You've
>> >> > > > said
>> >> > > > > > that
>> >> > > > > > >>>>>>> you've
>> >> > > > > > >>>>>>>> been using the algorithm you presented in production
>> >> for a
>> >> > > > > while.
>> >> > > > > > >> Can
>> >> > > > > > >>>>>>> you
>> >> > > > > > >>>>>>>> give us a sense of the performance characteristics
>> >> you've
>> >> > > > > > observed?
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>
>> >> > > > > > >>>>>>> Make it work, make it fast, make it beautiful. The topmost
>> >> > > > > > >>>>>>> thing here is / was correctness. In practice I do not measure
>> >> > > > > > >>>>>>> the performance of the range scan. The usual case I run this
>> >> > > > > > >>>>>>> with is emitting 500k - 1M rows on a left hand side change.
>> >> > > > > > >>>>>>> The range scan is just the work you have to do. Also, when
>> >> > > > > > >>>>>>> you pack your data into different formats, RocksDB
>> >> > > > > > >>>>>>> performance is usually tightly tied to the size of the data,
>> >> > > > > > >>>>>>> and we can't really change that. It is more important for
>> >> > > > > > >>>>>>> users to prevent useless updates to begin with. My left hand
>> >> > > > > > >>>>>>> side is guarded to drop changes that are not going to change
>> >> > > > > > >>>>>>> my join output.
>> >> > > > > > >>>>>>>
>> >> > > > > > >>>>>>> usually it's:
>> >> > > > > > >>>>>>>
>> >> > > > > > >>>>>>> drop unused fields and then don't forward if
>> >> > old.equals(new)
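
The guard described here (project away unused fields, then skip the update if nothing relevant changed) could look roughly like the sketch below. The class ChangeGuardSketch, the map-based record representation and the field names are all assumptions made for illustration; they are not from Jan's code. Deletes (null values) are not handled.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical guard in front of the join: forward only relevant changes.
final class ChangeGuardSketch {

    private final Set<String> joinFields;   // fields the join output depends on
    private final Map<String, Map<String, String>> lastSeen = new HashMap<>();

    ChangeGuardSketch(Set<String> joinFields) {
        this.joinFields = joinFields;
    }

    // Drops every field the join does not use.
    private Map<String, String> project(Map<String, String> value) {
        Map<String, String> projected = new LinkedHashMap<>(value);
        projected.keySet().retainAll(joinFields);
        return projected;
    }

    // Returns true if the update should be forwarded downstream, i.e. the
    // projected value differs from the last projected value for this key.
    boolean shouldForward(String key, Map<String, String> newValue) {
        Map<String, String> projected = project(newValue);
        Map<String, String> previous = lastSeen.put(key, projected);
        return !projected.equals(previous);
    }
}
```

An update that only touches a field outside joinFields returns false and never reaches the range scan, which is the saving Jan describes.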
>> >> > > > > > >>>>>>>
>> >> > > > > > >>>>>>> Regarding the performance of creating an iterator for smaller
>> >> > > > > > >>>>>>> fanouts, users can still just do a group by first anyway.
>> >> > > > > > >>>>>>>
>> >> > > > > > >>>>>>>
>> >> > > > > > >>>>>>>
>> >> > > > > > >>>>>>>> I could only think of one alternative, but I'm not
>> >> sure if
>> >> > > > it's
>> >> > > > > > >>>>> better
>> >> > > > > > >>>>>>> or
>> >> > > > > > >>>>>>>> worse... If the first re-key only needs to preserve
>> the
>> >> > > > original
>> >> > > > > > >> key,
>> >> > > > > > >>>>>>> as I
>> >> > > > > > >>>>>>>> proposed in #6, then we could store a vector of
>> keys in
>> >> > the
>> >> > > > > value:
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>> Left table:
>> >> > > > > > >>>>>>>> 1: A,...
>> >> > > > > > >>>>>>>> 2: B,...
>> >> > > > > > >>>>>>>> 3: A,...
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>> Gets re-keyed:
>> >> > > > > > >>>>>>>> A: [1, 3]
>> >> > > > > > >>>>>>>> B: [2]
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>> Then, the rhs part of the join would only need a
>> >> regular
>> >> > > > > > single-key
>> >> > > > > > >>>>>>> lookup.
>> >> > > > > > >>>>>>>> Of course we have to deal with the problem of large
>> >> > values,
>> >> > > as
>> >> > > > > > >>>>> there's
>> >> > > > > > >>>>>>> no
>> >> > > > > > >>>>>>>> bound on the number of lhs records that can
>> reference
>> >> rhs
>> >> > > > > records.
>> >> > > > > > >>>>>>> Offhand,
>> >> > > > > > >>>>>>>> I'd say we could page the values, so when one row is
>> >> past
>> >> > > the
>> >> > > > > > >>>>>>> threshold, we
>> >> > > > > > >>>>>>>> append the key for the next page. Then in most
>> cases,
>> >> it
>> >> > > would
>> >> > > > > be
>> >> > > > > > a
>> >> > > > > > >>>>>>> single
>> >> > > > > > >>>>>>>> key lookup, but for large fan-out updates, it would
>> be
>> >> one
>> >> > > per
>> >> > > > > > (max
>> >> > > > > > >>>>>>> value
>> >> > > > > > >>>>>>>> size)/(avg lhs key size).
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>> This seems more complex, though... Plus, I think
>> >> there's
>> >> > > some
>> >> > > > > > extra
>> >> > > > > > >>>>>>>> tracking we'd need to do to know when to emit a
>> >> > retraction.
>> >> > > > For
>> >> > > > > > >>>>> example,
>> >> > > > > > >>>>>>>> when record 1 is deleted, the re-key table would
>> just
>> >> have
>> >> > > (A:
>> >> > > > > > [3]).
>> >> > > > > > >>>>>>> Some
>> >> > > > > > >>>>>>>> kind of tombstone is needed so that the join result
>> >> for 1
>> >> > > can
>> >> > > > > also
>> >> > > > > > >> be
>> >> > > > > > >>>>>>>> retracted.
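
The key-vector alternative and its retraction problem can be made concrete with a small sketch. Everything here is hypothetical (the class KeyVectorRekeySketch, integer left keys, string foreign keys, in-memory maps instead of state stores), and paging of large vectors is not modelled. The point it illustrates is that the previous foreign key has to be surfaced on every change so that a retraction can be emitted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical re-key table that stores a vector of left keys per foreign key.
final class KeyVectorRekeySketch {

    private final Map<Integer, String> leftForeignKeys = new HashMap<>();
    private final Map<String, SortedSet<Integer>> rekeyed = new TreeMap<>();

    // Upserts a left record; a null foreignKey means the record was deleted.
    // Returns the previous FK (or null), which a caller would need in order
    // to emit a retraction for the old join result.
    String update(int leftKey, String foreignKey) {
        String previous = (foreignKey == null)
                ? leftForeignKeys.remove(leftKey)
                : leftForeignKeys.put(leftKey, foreignKey);
        if (previous != null) {
            SortedSet<Integer> keys = rekeyed.get(previous);
            keys.remove(leftKey);
            if (keys.isEmpty()) {
                rekeyed.remove(previous);
            }
        }
        if (foreignKey != null) {
            rekeyed.computeIfAbsent(foreignKey, fk -> new TreeSet<>()).add(leftKey);
        }
        return previous;
    }

    // Single-key lookup used by the right-hand side of the join.
    SortedSet<Integer> lookup(String foreignKey) {
        return rekeyed.getOrDefault(foreignKey, new TreeSet<>());
    }
}
```

After inserting 1: A, 2: B and 3: A, lookup("A") holds keys 1 and 3; deleting record 1 leaves only 3 under A, and the returned previous FK is the only remaining signal that the result for 1 must be retracted.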
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>> That's all!
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>> Thanks so much to both Adam and Jan for the
>> thoughtful
>> >> > KIP.
>> >> > > > > Sorry
>> >> > > > > > >> the
>> >> > > > > > >>>>>>>> discussion has been slow.
>> >> > > > > > >>>>>>>> -John
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>> On Fri, Oct 12, 2018 at 2:20 AM Jan Filipiak <
>> >> > > > > > >>>>> jan.filip...@trivago.com>
>> >> > > > > > >>>>>>>> wrote:
>> >> > > > > > >>>>>>>>
>> >> > > > > > >>>>>>>>> I'd say you can just call the vote.
>> >> > > > > > >>>>>>>>>
>> >> > > > > > >>>>>>>>> That happens all the time, and if something comes up, it
>> >> > > > > > >>>>>>>>> just goes back to discussion.
>> >> > > > > > >>>>>>>>>
>> >> > > > > > >>>>>>>>> I would not expect too much attention from yet another email
>> >> > > > > > >>>>>>>>> in this thread.
>> >> > > > > > >>>>>>>>>
>> >> > > > > > >>>>>>>>> best Jan
>> >> > > > > > >>>>>>>>>
>> >> > > > > > >>>>>>>>> On 09.10.2018 13:56, Adam Bellemare wrote:
>> >> > > > > > >>>>>>>>>> Hello Contributors
>> >> > > > > > >>>>>>>>>>
>> >> > > > > > >>>>>>>>>> I know that 2.1 is about to be released, but I do
>> >> need
>> >> > to
>> >> > > > bump
>> >> > > > > > >>>>> this to
>> >> > > > > > >>>>>>>>> keep
>> >> > > > > > >>>>>>>>>> visibility up. I am still intending to push this
>> >> through
>> >> > > > once
>> >> > > > > > >>>>>>> contributor
>> >> > > > > > >>>>>>>>>> feedback is given.
>> >> > > > > > >>>>>>>>>>
>> >> > > > > > >>>>>>>>>> Main points that need addressing:
>> >> > > > > > >>>>>>>>>> 1) Any way (or benefit) in structuring the current
>> >> > > singular
>> >> > > > > > graph
>> >> > > > > > >>>>> node
>> >> > > > > > >>>>>>>>> into
>> >> > > > > > >>>>>>>>>> multiple nodes? It has a whopping 25 parameters
>> right
>> >> > > now. I
>> >> > > > > am
>> >> > > > > > a
>> >> > > > > > >>>>> bit
>> >> > > > > > >>>>>>>>> fuzzy
>> >> > > > > > >>>>>>>>>> on how the optimizations are supposed to work, so
>> I
>> >> > would
>> >> > > > > > >>>>> appreciate
>> >> > > > > > >>>>>>> any
>> >> > > > > > >>>>>>>>>> help on this aspect.
>> >> > > > > > >>>>>>>>>>
>> >> > > > > > >>>>>>>>>> 2) Overall strategy for joining + resolving. This thread has
>> >> > > > > > >>>>>>>>>> much discourse between Jan and me about the current
>> >> > > > > > >>>>>>>>>> highwater mark proposal versus a groupBy + reduce proposal.
>> >> > > > > > >>>>>>>>>> I am of the opinion that we need to strictly handle any
>> >> > > > > > >>>>>>>>>> chance of out-of-order data and leave none of it up to the
>> >> > > > > > >>>>>>>>>> consumer. Any comments or suggestions here would also help.
>> >> > > > > > >>>>>>>>>>
>> >> > > > > > >>>>>>>>>> 3) Anything else that you see that would prevent
>> this
>> >> > from
>> >> > > > > > moving
>> >> > > > > > >>>>> to a
>> >> > > > > > >>>>>>>>> vote?
>> >> > > > > > >>>>>>>>>>
>> >> > > > > > >>>>>>>>>> Thanks
>> >> > > > > > >>>>>>>>>>
>> >> > > > > > >>>>>>>>>> Adam
>> >> > > > > > >>>>>>>>>>
>> >> > > > > > >>>>>>>>>>
>> >> > > > > > >>>>>>>>>>
>> >> > > > > > >>>>>>>>>>
>> >> > > > > > >>>>>>>>>>
>> >> > > > > > >>>>>>>>>>
>> >> > > > > > >>>>>>>>>>
>> >> > > > > > >>>>>>>>>> On Sun, Sep 30, 2018 at 10:23 AM Adam Bellemare <
>> >> > > > > > >>>>>>>>> adam.bellem...@gmail.com>
>> >> > > > > > >>>>>>>>>> wrote:
>> >> > > > > > >>>>>>>>>>
>> >> > > > > > >>>>>>>>>>> Hi Jan
>> >> > > > > > >>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>> With the Stores.windowStoreBuilder and
>> >> > > > > > >>>>>>>>>>> Stores.persistentWindowStore, you actually only need to
>> >> > > > > > >>>>>>>>>>> specify the number of segments you want and how large they
>> >> > > > > > >>>>>>>>>>> are. To the best of my understanding, what happens is that
>> >> > > > > > >>>>>>>>>>> the segments are automatically rolled over as new data with
>> >> > > > > > >>>>>>>>>>> new timestamps is created. We use this exact functionality
>> >> > > > > > >>>>>>>>>>> in some of the work done internally at my company. For
>> >> > > > > > >>>>>>>>>>> reference, this is the hopping windowed store.
>> >> > > > > > >>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>> https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#id21
>> >> > > > > > >>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>> In the code that I have provided, there are going
>> >> to be
>> >> > > two
>> >> > > > > 24h
>> >> > > > > > >>>>>>>>> segments.
>> >> > > > > > >>>>>>>>>>> When a record is put into the windowStore, it
>> will
>> >> be
>> >> > > > > inserted
>> >> > > > > > at
>> >> > > > > > >>>>>>> time
>> >> > > > > > >>>>>>>>> T in
>> >> > > > > > >>>>>>>>>>> both segments. The two segments will always
>> overlap
>> >> by
>> >> > > 12h.
>> >> > > > > As
>> >> > > > > > >>>>> time
>> >> > > > > > >>>>>>>>> goes on
>> >> > > > > > >>>>>>>>>>> and new records are added (say at time T+12h+),
>> the
>> >> > > oldest
>> >> > > > > > >> segment
>> >> > > > > > >>>>>>> will
>> >> > > > > > >>>>>>>>> be
>> >> > > > > > >>>>>>>>>>> automatically deleted and a new segment created.
>> The
>> >> > > > records
>> >> > > > > > are
>> >> > > > > > >>>>> by
>> >> > > > > > >>>>>>>>> default
>> >> > > > > > >>>>>>>>>>> inserted with the context.timestamp(), such that
>> it
>> >> is
>> >> > > the
>> >> > > > > > record
>> >> > > > > > >>>>>>> time,
>> >> > > > > > >>>>>>>>> not
>> >> > > > > > >>>>>>>>>>> the clock time, which is used.
>> >> > > > > > >>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>> To the best of my understanding, the timestamps
>> are
>> >> > > > retained
>> >> > > > > > when
>> >> > > > > > >>>>>>>>>>> restoring from the changelog.
>> >> > > > > > >>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>> Basically, this is a heavy-handed way to deal with TTL at a
>> >> > > > > > >>>>>>>>>>> segment level, instead of at an individual record level.
>> >> > > > > > >>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>> On Tue, Sep 25, 2018 at 5:18 PM Jan Filipiak <
>> >> > > > > > >>>>>>> jan.filip...@trivago.com>
>> >> > > > > > >>>>>>>>>>> wrote:
>> >> > > > > > >>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>> Will that work? I expected it to blow up with a
>> >> > > > > > >>>>>>>>>>>> ClassCastException or similar.
>> >> > > > > > >>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>> You would either have to specify the window you fetch/put
>> >> > > > > > >>>>>>>>>>>> or iterate across all windows the key was found in, right?
>> >> > > > > > >>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>> I just hope the window store doesn't check stream-time
>> >> > > > > > >>>>>>>>>>>> under the hood; that would be a questionable interface.
>> >> > > > > > >>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>> If it does: did you see my comment on checking all the
>> >> > > > > > >>>>>>>>>>>> windows earlier? That would be needed to actually give
>> >> > > > > > >>>>>>>>>>>> reasonable time guarantees.
>> >> > > > > > >>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>> Best
>> >> > > > > > >>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>> On 25.09.2018 13:18, Adam Bellemare wrote:
>> >> > > > > > >>>>>>>>>>>>> Hi Jan
>> >> > > > > > >>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>> Check for  " highwaterMat " in the PR. I only
>> >> changed
>> >> > > the
>> >> > > > > > state
>> >> > > > > > >>>>>>> store,
>> >> > > > > > >>>>>>>>>>>> not
>> >> > > > > > >>>>>>>>>>>>> the ProcessorSupplier.
>> >> > > > > > >>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>> Thanks,
>> >> > > > > > >>>>>>>>>>>>> Adam
>> >> > > > > > >>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>> On Mon, Sep 24, 2018 at 2:47 PM, Jan Filipiak <
>> >> > > > > > >>>>>>>>> jan.filip...@trivago.com
>> >> > > > > > >>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>> wrote:
>> >> > > > > > >>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>> On 24.09.2018 16:26, Adam Bellemare wrote:
>> >> > > > > > >>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>> @Guozhang
>> >> > > > > > >>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>> Thanks for the information. This is indeed
>> >> > something
>> >> > > > that
>> >> > > > > > >>>>> will be
>> >> > > > > > >>>>>>>>>>>>>>> extremely
>> >> > > > > > >>>>>>>>>>>>>>> useful for this KIP.
>> >> > > > > > >>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>> @Jan
>> >> > > > > > >>>>>>>>>>>>>>> Thanks for your explanations. That being
>> said, I
>> >> > will
>> >> > > > not
>> >> > > > > > be
>> >> > > > > > >>>>>>> moving
>> >> > > > > > >>>>>>>>>>>> ahead
>> >> > > > > > >>>>>>>>>>>>>>> with an implementation using
>> reshuffle/groupBy
>> >> > > solution
>> >> > > > > as
>> >> > > > > > >> you
>> >> > > > > > >>>>>>>>>>>> propose.
>> >> > > > > > >>>>>>>>>>>>>>> That being said, if you wish to implement it
>> >> > yourself
>> >> > > > off
>> >> > > > > > of
>> >> > > > > > >>>>> my
>> >> > > > > > >>>>>>>>>>>> current PR
>> >> > > > > > >>>>>>>>>>>>>>> and submit it as a competitive alternative, I
>> >> would
>> >> > > be
>> >> > > > > more
>> >> > > > > > >>>>> than
>> >> > > > > > >>>>>>>>>>>> happy to
>> >> > > > > > >>>>>>>>>>>>>>> help vet that as an alternate solution. As it
>> >> > stands
>> >> > > > > right
>> >> > > > > > >>>>> now,
>> >> > > > > > >>>>>>> I do
>> >> > > > > > >>>>>>>>>>>> not
>> >> > > > > > >>>>>>>>>>>>>>> really have more time to invest into
>> >> alternatives
>> >> > > > without
>> >> > > > > > >>>>> there
>> >> > > > > > >>>>>>>>> being
>> >> > > > > > >>>>>>>>>>>> a
>> >> > > > > > >>>>>>>>>>>>>>> strong indication from the binding voters
>> which
>> >> > they
>> >> > > > > would
>> >> > > > > > >>>>>>> prefer.
>> >> > > > > > >>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>> Hey, total no worries. I think I personally
>> gave
>> >> up
>> >> > on
>> >> > > > the
>> >> > > > > > >>>>> streams
>> >> > > > > > >>>>>>>>> DSL
>> >> > > > > > >>>>>>>>>>>> for
>> >> > > > > > >>>>>>>>>>>>>> some time already, otherwise I would have
>> pulled
>> >> > this
>> >> > > > KIP
>> >> > > > > > >>>>> through
>> >> > > > > > >>>>>>>>>>>> already.
>> >> > > > > > >>>>>>>>>>>>>> I am currently reimplementing my own DSL
>> based on
>> >> > > PAPI.
>> >> > > > > > >>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>> I will look at finishing up my PR with the
>> >> windowed
>> >> > > > state
>> >> > > > > > >>>>> store
>> >> > > > > > >>>>>>> in
>> >> > > > > > >>>>>>>>> the
>> >> > > > > > >>>>>>>>>>>>>>> next
>> >> > > > > > >>>>>>>>>>>>>>> week or so, exercising it via tests, and
>> then I
>> >> > will
>> >> > > > come
>> >> > > > > > >> back
>> >> > > > > > >>>>>>> for
>> >> > > > > > >>>>>>>>>>>> final
>> >> > > > > > >>>>>>>>>>>>>>> discussions. In the meantime, I hope that
>> any of
>> >> > the
>> >> > > > > > binding
>> >> > > > > > >>>>>>> voters
>> >> > > > > > >>>>>>>>>>>> could
>> >> > > > > > >>>>>>>>>>>>>>> take a look at the KIP in the wiki. I have
>> >> updated
>> >> > it
>> >> > > > > > >>>>> according
>> >> > > > > > >>>>>>> to
>> >> > > > > > >>>>>>>>> the
>> >> > > > > > >>>>>>>>>>>>>>> latest plan:
>> >> > > > > > >>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>
>> >> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+
>> >> > > > > > >>>>>>>>>>>>>>> Support+non-key+joining+in+KTable
>> >> > > > > > >>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>> I have also updated the KIP PR to use a
>> windowed
>> >> > > store.
>> >> > > > > > This
>> >> > > > > > >>>>>>> could
>> >> > > > > > >>>>>>>>> be
>> >> > > > > > >>>>>>>>>>>>>>> replaced by the results of KIP-258 whenever
>> they
>> >> > are
>> >> > > > > > >>>>> completed.
>> >> > > > > > >>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/5527
>> >> > > > > > >>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>> Thanks,
>> >> > > > > > >>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>> Adam
>> >> > > > > > >>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>> Is the HighWatermarkResolverProccessorsupplier
>> >> > already
>> >> > > > > > updated
>> >> > > > > > >>>>> in
>> >> > > > > > >>>>>>> the
>> >> > > > > > >>>>>>>>>>>> PR?
>> >> > > > > > >>>>>>>>>>>>>> expected it to change to Windowed<K>,Long
>> Missing
>> >> > > > > something?
>> >> > > > > > >>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>> On Fri, Sep 14, 2018 at 2:24 PM, Guozhang
>> Wang <
>> >> > > > > > >>>>>>> wangg...@gmail.com>
>> >> > > > > > >>>>>>>>>>>>>>> wrote:
>> >> > > > > > >>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>> Correction on my previous email: KAFKA-5533
>> is
>> >> the
>> >> > > > wrong
>> >> > > > > > >> link,
>> >> > > > > > >>>>>>> as it
>> >> > > > > > >>>>>>>>>>>> is
>> >> > > > > > >>>>>>>>>>>>>>>> for
>> >> > > > > > >>>>>>>>>>>>>>>> corresponding changelog mechanisms. But as
>> >> part of
>> >> > > > > KIP-258
>> >> > > > > > >>>>> we do
>> >> > > > > > >>>>>>>>>>>> want to
>> >> > > > > > >>>>>>>>>>>>>>>> have "handling out-of-order data for source
>> >> > KTable"
>> >> > > > such
>> >> > > > > > >> that
>> >> > > > > > >>>>>>>>>>>> instead of
>> >> > > > > > >>>>>>>>>>>>>>>> blindly applying the updates to the
>> materialized
>> >> > store,
>> >> > > > > i.e.
>> >> > > > > > >>>>>>> following
>> >> > > > > > >>>>>>>>>>>>>>>> offset
>> >> > > > > > >>>>>>>>>>>>>>>> ordering, we will reject updates that are
>> older
>> >> > than
>> >> > > > the
>> >> > > > > > >>>>> current
>> >> > > > > > >>>>>>>>>>>> key's
>> >> > > > > > >>>>>>>>>>>>>>>> timestamps, i.e. following timestamp
>> ordering.
>> >> > > > > > >>>>>>>>>>>>>>>>
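For readers following along, the "reject updates older than the current key's timestamp" idea described above can be sketched roughly as below. This is only a minimal illustration under stated assumptions: the class name TimestampOrderedStore and its put/get methods are hypothetical, it uses a plain in-memory map, and it is not the KIP-258 implementation or any Kafka Streams API. Ties (equal timestamps) are accepted here, which is an arbitrary choice for the sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only: not the KIP-258 implementation and not a Kafka Streams API.
class TimestampOrderedStore<K, V> {

    // A value paired with the timestamp of the record that produced it.
    private static final class Stamped<V> {
        final V value;
        final long timestamp;

        Stamped(V value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<K, Stamped<V>> store = new HashMap<>();

    // Applies the update unless it is strictly older than the stored one.
    // Returns true if the update was applied, false if it was rejected.
    boolean put(K key, V value, long timestamp) {
        Stamped<V> current = store.get(key);
        if (current != null && timestamp < current.timestamp) {
            return false; // out-of-order record: follow timestamp ordering, not offset ordering
        }
        store.put(key, new Stamped<>(value, timestamp));
        return true;
    }

    // Returns the latest accepted value for the key, or null if none exists.
    V get(K key) {
        Stamped<V> current = store.get(key);
        return current == null ? null : current.value;
    }
}
```

With offset ordering, a record arriving later always overwrites the stored value; with the check above, a later-arriving record carrying an older timestamp is dropped instead.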
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>> Guozhang
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>> On Fri, Sep 14, 2018 at 11:21 AM, Guozhang
>> >> Wang <
>> >> > > > > > >>>>>>>>> wangg...@gmail.com>
>> >> > > > > > >>>>>>>>>>>>>>>> wrote:
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>> Hello Adam,
>> >> > > > > > >>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> Thanks for the explanation. Regarding the
>> >> final
>> >> > > step
>> >> > > > > > (i.e.
>> >> > > > > > >>>>> the
>> >> > > > > > >>>>>>>>> high
>> >> > > > > > >>>>>>>>>>>>>>>>> watermark store, now altered to be replaced
>> >> with
>> >> > a
>> >> > > > > window
>> >> > > > > > >>>>>>> store),
>> >> > > > > > >>>>>>>>> I
>> >> > > > > > >>>>>>>>>>>>>>>>> think
>> >> > > > > > >>>>>>>>>>>>>>>>> another current on-going KIP may actually
>> >> help:
>> >> > > > > > >>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>
>> >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> >> > > > > > >>>>>>>>>>>>>>>>>
>> >> > 258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
>> >> > > > > > >>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> This is for adding the timestamp into a
>> >> key-value
>> >> > > > store
>> >> > > > > > >>>>> (i.e.
>> >> > > > > > >>>>>>> only
>> >> > > > > > >>>>>>>>>>>> for
>> >> > > > > > >>>>>>>>>>>>>>>>> non-windowed KTable), and then one of its
>> >> usages,
>> >> > as
>> >> > > > > > >>>>> described
>> >> > > > > > >>>>>>> in
>> >> > > > > > >>>>>>>>>>>>>>>>>
>> >> https://issues.apache.org/jira/browse/KAFKA-5533
>> >> > ,
>> >> > > is
>> >> > > > > > that
>> >> > > > > > >>>>> we
>> >> > > > > > >>>>>>> can
>> >> > > > > > >>>>>>>>>>>> then
>> >> > > > > > >>>>>>>>>>>>>>>>> "reject" updates from the source topics if
>> its
>> >> > > > > timestamp
>> >> > > > > > is
>> >> > > > > > >>>>>>>>> smaller
>> >> > > > > > >>>>>>>>>>>> than
>> >> > > > > > >>>>>>>>>>>>>>>>> the current key's latest update timestamp.
>> I
>> >> > think
>> >> > > it
>> >> > > > > is
>> >> > > > > > >>>>> very
>> >> > > > > > >>>>>>>>>>>> similar to
>> >> > > > > > >>>>>>>>>>>>>>>>> what you have in mind for high watermark
>> based
>> >> > > > > filtering,
>> >> > > > > > >>>>> while
>> >> > > > > > >>>>>>>>> you
>> >> > > > > > >>>>>>>>>>>> only
>> >> > > > > > >>>>>>>>>>>>>>>>> need to make sure that the timestamps of
>> the
>> >> > > joining
>> >> > > > > > >> records
>> >> > > > > > >>>>>>> are
>> >> > > > > > >>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>> correctly
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> inherited through the whole topology to the
>> >> final
>> >> > > > stage.
>> >> > > > > > >>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> Note that this KIP is for key-value store
>> and
>> >> > hence
>> >> > > > > > >>>>>>> non-windowed
>> >> > > > > > >>>>>>>>>>>> KTables
>> >> > > > > > >>>>>>>>>>>>>>>>> only, but for windowed KTables we do not
>> >> really
>> >> > > have
>> >> > > > a
>> >> > > > > > good
>> >> > > > > > >>>>>>>>> support
>> >> > > > > > >>>>>>>>>>>> for
>> >> > > > > > >>>>>>>>>>>>>>>>> their joins anyways (
>> >> > > > > > >>>>>>>>>>>>
>> https://issues.apache.org/jira/browse/KAFKA-7107)
>> >> > > > > > >>>>>>>>>>>>>>>>> I
>> >> > > > > > >>>>>>>>>>>>>>>>> think we can just consider non-windowed
>> >> > > KTable-KTable
>> >> > > > > > >>>>> non-key
>> >> > > > > > >>>>>>>>> joins
>> >> > > > > > >>>>>>>>>>>> for
>> >> > > > > > >>>>>>>>>>>>>>>>> now. In which case, KIP-258 should help.
>> >> > > > > > >>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> Guozhang
>> >> > > > > > >>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> On Wed, Sep 12, 2018 at 9:20 PM, Jan
>> Filipiak
>> >> <
>> >> > > > > > >>>>>>>>>>>> jan.filip...@trivago.com
>> >> > > > > > >>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> wrote:
>> >> > > > > > >>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>> On 11.09.2018 18:00, Adam Bellemare wrote:
>> >> > > > > > >>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>> Hi Guozhang
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>> Current highwater mark implementation
>> would
>> >> > grow
>> >> > > > > > >> endlessly
>> >> > > > > > >>>>>>> based
>> >> > > > > > >>>>>>>>>>>> on
>> >> > > > > > >>>>>>>>>>>>>>>>>>> primary key of original event. It is a
>> pair
>> >> of
>> >> > > > (<this
>> >> > > > > > >>>>> table
>> >> > > > > > >>>>>>>>>>>> primary
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>> key>,
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> <highest offset seen for that key>). This
>> is
>> >> used
>> >> > > to
>> >> > > > > > >>>>>>> differentiate
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>> between
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> late arrivals and new updates. My newest
>> >> proposal
>> >> > > > would
>> >> > > > > > be
>> >> > > > > > >>>>> to
>> >> > > > > > >>>>>>>>>>>> replace
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>> it
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> with a Windowed state store of Duration N.
>> >> This
>> >> > > would
>> >> > > > > > allow
>> >> > > > > > >>>>> the
>> >> > > > > > >>>>>>>>> same
>> >> > > > > > >>>>>>>>>>>>>>>>>>> behaviour, but cap the size based on
>> time.
>> >> This
>> >> > > > > should
>> >> > > > > > >>>>> allow
>> >> > > > > > >>>>>>> for
>> >> > > > > > >>>>>>>>>>>> all
>> >> > > > > > >>>>>>>>>>>>>>>>>>> late-arriving events to be processed, and
>> >> > should
>> >> > > be
>> >> > > > > > >>>>>>> customizable
>> >> > > > > > >>>>>>>>>>>> by
>> >> > > > > > >>>>>>>>>>>>>>>>>>> the
>> >> > > > > > >>>>>>>>>>>>>>>>>>> user to tailor to their own needs (ie:
>> >> perhaps
>> >> > > just
>> >> > > > > 10
>> >> > > > > > >>>>>>> minutes
>> >> > > > > > >>>>>>>>> of
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>> window,
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> or perhaps 7 days...).
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
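As a rough illustration of the high-water mark idea discussed above (a pair of primary key and highest offset seen, capped by a time-based retention), here is a minimal sketch. The names HighWaterMarkFilter, isNewUpdate, expire and retentionMs are hypothetical and not taken from the PR. It also expires entries per key, whereas a real windowed store drops whole segments, so treat it as a simplification rather than the actual behaviour.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only: names and structure are assumptions, not code from the PR.
class HighWaterMarkFilter<K> {

    // key -> { highest offset seen, timestamp of the record that set it }
    private final Map<K, long[]> marks = new HashMap<>();
    private final long retentionMs;

    HighWaterMarkFilter(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Returns true for a new update, false for a late arrival
    // (an offset at or below the highest offset already seen for the key).
    boolean isNewUpdate(K key, long offset, long recordTimestampMs) {
        long[] mark = marks.get(key);
        if (mark != null && offset <= mark[0]) {
            return false;
        }
        marks.put(key, new long[] {offset, recordTimestampMs});
        return true;
    }

    // Drops marks older than the retention period so the store cannot grow endlessly.
    void expire(long streamTimeMs) {
        long cutoff = streamTimeMs - retentionMs;
        marks.values().removeIf(mark -> mark[1] < cutoff);
    }

    int size() {
        return marks.size();
    }
}
```

The trade-off is visible in the sketch: once a key's mark has expired, a late arrival for that key can no longer be told apart from a new update, so the retention period has to cover the expected lateness.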
>> >> > > > > > >>>>>>>>>>>>>>>>>>> Hi Adam, using time based retention can
>> do
>> >> the
>> >> > > > trick
>> >> > > > > > >> here.
>> >> > > > > > >>>>>>> Even
>> >> > > > > > >>>>>>>>>>>> if I
>> >> > > > > > >>>>>>>>>>>>>>>>>> would still like to see the automatic
>> >> > > repartitioning
>> >> > > > > > >>>>> optional
>> >> > > > > > >>>>>>>>>>>> since I
>> >> > > > > > >>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> would
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> just reshuffle again. With windowed store I
>> >> am a
>> >> > > > little
>> >> > > > > > bit
>> >> > > > > > >>>>>>>>>>>> sceptical
>> >> > > > > > >>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> about
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> how to determine the window. So essentially
>> one
>> >> > > could
>> >> > > > > run
>> >> > > > > > >>>>> into
>> >> > > > > > >>>>>>>>>>>> problems
>> >> > > > > > >>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> when
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> the rapid change happens near a window
>> >> border. I
>> >> > > will
>> >> > > > > > check
>> >> > > > > > >>>>> your
>> >> > > > > > >>>>>>>>>>>>>>>>>> implementation in detail, if it's
>> >> problematic, we
>> >> > > > could
>> >> > > > > > >>>>> still
>> >> > > > > > >>>>>>>>> check
>> >> > > > > > >>>>>>>>>>>>>>>>>> _all_
>> >> > > > > > >>>>>>>>>>>>>>>>>> windows on read with not too bad
>> performance
>> >> > > impact I
>> >> > > > > > >> guess.
>> >> > > > > > >>>>>>> Will
>> >> > > > > > >>>>>>>>>>>> let
>> >> > > > > > >>>>>>>>>>>>>>>>>> you
>> >> > > > > > >>>>>>>>>>>>>>>>>> know if the implementation would be
>> correct
>> >> as
>> >> > > is. I
>> >> > > > > > >>>>> would
>> >> > > > > > >>>>>>> not
>> >> > > > > > >>>>>>>>>>>> like
>> >> > > > > > >>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> to
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> assume that: offset(A) < offset(B) =>
>> >> > > timestamp(A)  <
>> >> > > > > > >>>>>>>>> timestamp(B).
>> >> > > > > > >>>>>>>>>>>> I
>> >> > > > > > >>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> think
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> we can't expect that.
>> >> > > > > > >>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>> @Jan
>> >> > > > > > >>>>>>>>>>>>>>>>>>> I believe I understand what you mean now
>> -
>> >> > thanks
>> >> > > > for
>> >> > > > > > the
>> >> > > > > > >>>>>>>>>>>> diagram, it
>> >> > > > > > >>>>>>>>>>>>>>>>>>> did really help. You are correct that I
>> do
>> >> not
>> >> > > have
>> >> > > > > the
>> >> > > > > > >>>>>>> original
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>> primary
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> key available, and I can see that if it was
>> >> > > available
>> >> > > > > > then
>> >> > > > > > >>>>> you
>> >> > > > > > >>>>>>>>>>>> would be
>> >> > > > > > >>>>>>>>>>>>>>>>>>> able to add and remove events from the
>> Map.
>> >> > That
>> >> > > > > being
>> >> > > > > > >>>>> said,
>> >> > > > > > >>>>>>> I
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>> encourage
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> you to finish your diagrams / charts just
>> for
>> >> > > clarity
>> >> > > > > for
>> >> > > > > > >>>>>>> everyone
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>> else.
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>> Yeah 100%, this Gliffy thing is just
>> really
>> >> hard
>> >> > > > work.
>> >> > > > > > But
>> >> > > > > > >>>>> I
>> >> > > > > > >>>>>>>>>>>> understand
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>> the benefits for the rest. Sorry about the
>> >> > > original
>> >> > > > > > >> primary
>> >> > > > > > >>>>>>> key,
>> >> > > > > > >>>>>>>>> We
>> >> > > > > > >>>>>>>>>>>>>>>>>> have
>> >> > > > > > >>>>>>>>>>>>>>>>>> join and Group by implemented our own in
>> PAPI
>> >> > and
>> >> > > > > > >> basically
>> >> > > > > > >>>>>>> not
>> >> > > > > > >>>>>>>>>>>> using
>> >> > > > > > >>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> any
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> DSL (Just the abstraction). Completely
>> missed
>> >> > that
>> >> > > in
>> >> > > > > > >>>>> original
>> >> > > > > > >>>>>>> DSL
>> >> > > > > > >>>>>>>>>>>> it's
>> >> > > > > > >>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> not
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> there and just assumed it. total brain mess
>> >> up on
>> >> > > my
>> >> > > > > end.
>> >> > > > > > >>>>> Will
>> >> > > > > > >>>>>>>>>>>> finish
>> >> > > > > > >>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> the
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> chart as soon as I get a quiet evening this
>> >> week.
>> >> > > > > > >>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>> My follow up question for you is, won't
>> the
>> >> Map
>> >> > > stay
>> >> > > > > > >> inside
>> >> > > > > > >>>>>>> the
>> >> > > > > > >>>>>>>>>>>> State
>> >> > > > > > >>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>> Store indefinitely after all of the
>> changes
>> >> > have
>> >> > > > > > >>>>> propagated?
>> >> > > > > > >>>>>>>>> Isn't
>> >> > > > > > >>>>>>>>>>>>>>>>>>> this
>> >> > > > > > >>>>>>>>>>>>>>>>>>> effectively the same as a highwater mark
>> >> state
>> >> > > > store?
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>> Thing is that if the map is empty,
>> >> subtractor
>> >> > is
>> >> > > > > gonna
>> >> > > > > > >>>>>>> return
>> >> > > > > > >>>>>>>>>>>> `null`
>> >> > > > > > >>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> and
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> the key is removed from the keyspace. But
>> >> there
>> >> > is
>> >> > > > > going
>> >> > > > > > to
>> >> > > > > > >>>>> be
>> >> > > > > > >>>>>>> a
>> >> > > > > > >>>>>>>>>>>> store
>> >> > > > > > >>>>>>>>>>>>>>>>>> 100%, the good thing is that I can use
>> this
>> >> > store
>> >> > > > > > directly
>> >> > > > > > >>>>> for
>> >> > > > > > >>>>>>>>>>>>>>>>>> materialize() / enableSendingOldValues()
>> is a
>> >> > > > regular
>> >> > > > > > >>>>> store,
>> >> > > > > > >>>>>>>>>>>> satisfying
>> >> > > > > > >>>>>>>>>>>>>>>>>> all guarantees needed for further groupby /
>> >> join.
>> >> > > The
>> >> > > > > > >>>>> Windowed
>> >> > > > > > >>>>>>>>>>>> store is
>> >> > > > > > >>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> not
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> keeping the values, so for the next
>> stateful
>> >> > > > operation
>> >> > > > > > we
>> >> > > > > > >>>>>>> would
>> >> > > > > > >>>>>>>>>>>>>>>>>> need to instantiate an extra store. or we
>> >> have
>> >> > the
>> >> > > > > > window
>> >> > > > > > >>>>>>> store
>> >> > > > > > >>>>>>>>>>>> also
>> >> > > > > > >>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> have
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> the values then.
>> >> > > > > > >>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>> Long story short. if we can flip in a
>> custom
>> >> > group
>> >> > > > by
>> >> > > > > > >>>>> before
>> >> > > > > > >>>>>>>>>>>>>>>>>> repartitioning to the original primary
>> key i
>> >> > think
>> >> > > > it
>> >> > > > > > >> would
>> >> > > > > > >>>>>>> help
>> >> > > > > > >>>>>>>>>>>> the
>> >> > > > > > >>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> users
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> big time in building efficient apps. Given
>> the
>> >> > > > original
>> >> > > > > > >>>>> primary
>> >> > > > > > >>>>>>>>> key
>> >> > > > > > >>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> issue I
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> understand that we do not have a solid
>> >> foundation
>> >> > > to
>> >> > > > > > build
>> >> > > > > > >>>>> on.
>> >> > > > > > >>>>>>>>>>>>>>>>>> Leaving primary key carry along to the
>> user.
>> >> > very
>> >> > > > > > >>>>>>> unfortunate. I
>> >> > > > > > >>>>>>>>>>>> could
>> >> > > > > > >>>>>>>>>>>>>>>>>> understand the decision goes like that. I
>> do
>> >> not
>> >> > > > think
>> >> > > > > > it's
>> >> > > > > > >>>>> a
>> >> > > > > > >>>>>>> good
>> >> > > > > > >>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> decision.
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>> Thanks
>> >> > > > > > >>>>>>>>>>>>>>>>>>> Adam
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>> On Tue, Sep 11, 2018 at 10:07 AM,
>> Prajakta
>> >> > > Dumbre <
>> >> > > > > > >>>>>>>>>>>>>>>>>>> dumbreprajakta...@gmail.com <mailto:
>> >> > > > > > >>>>>>> dumbreprajakta...@gmail.com
>> >> > > > > > >>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>> wrote:
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           please remove me from this
>> group
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           On Tue, Sep 11, 2018 at 1:29 PM
>> >> Jan
>> >> > > > > Filipiak
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           <jan.filip...@trivago.com
>> >> <mailto:
>> >> > > > > > >>>>>>>>> jan.filip...@trivago.com
>> >> > > > > > >>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           wrote:
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > Hi Adam,
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           >
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > give me some time, will make
>> >> such a
>> >> > > > > chart.
>> >> > > > > > >> last
>> >> > > > > > >>>>>>> time i
>> >> > > > > > >>>>>>>>>>>> didn't
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           get along
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > well with Gliffy and ruined
>> all
>> >> your
>> >> > > > > charts.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > Hopefully i can get it done
>> >> today
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           >
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > On 08.09.2018 16:00, Adam
>> >> Bellemare
>> >> > > > > wrote:
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > > Hi Jan
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > > I have included a diagram
>> of
>> >> > what I
>> >> > > > > > >> attempted
>> >> > > > > > >>>>> on
>> >> > > > > > >>>>>>> the
>> >> > > > > > >>>>>>>>>>>> KIP.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           >
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>
>> >> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Su
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>
>> >> pport+non-key+joining+in+KTable#KIP-213Supportnon-keyjoining
>> >> > > > > > >>>>>>>>>>>>>>>>>>> inKTable-GroupBy+Reduce/Aggregate
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           <
>> >> > > > > > >>>>>>>>>>>>
>> >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+S
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>
>> >> upport+non-key+joining+in+KTable#KIP-213Supportnon-keyjoinin
>> >> > > > > > >>>>>>>>>>>>>>>>>>> ginKTable-GroupBy+Reduce/Aggregate>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > > I attempted this back at
>> the
>> >> > start
>> >> > > of
>> >> > > > > my
>> >> > > > > > own
>> >> > > > > > >>>>>>>>>>>> implementation
>> >> > > > > > >>>>>>>>>>>>>>>>>>> of
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           this
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > > solution, and since I could
>> >> not
>> >> > get
>> >> > > > it
>> >> > > > > to
>> >> > > > > > >>>>> work I
>> >> > > > > > >>>>>>> have
>> >> > > > > > >>>>>>>>>>>> since
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           discarded the
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > > code. At this point in
>> time,
>> >> if
>> >> > you
>> >> > > > > wish
>> >> > > > > > to
>> >> > > > > > >>>>>>> continue
>> >> > > > > > >>>>>>>>>>>> pursuing
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           for your
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > > groupBy solution, I ask
>> that
>> >> you
>> >> > > > please
>> >> > > > > > >>>>> create a
>> >> > > > > > >>>>>>>>>>>> diagram on
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           the KIP
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > > carefully explaining your
>> >> > solution.
>> >> > > > > > Please
>> >> > > > > > >>>>> feel
>> >> > > > > > >>>>>>> free
>> >> > > > > > >>>>>>>>> to
>> >> > > > > > >>>>>>>>>>>> use
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           the image I
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > > just posted as a starting
>> >> point.
>> >> > I
>> >> > > am
>> >> > > > > > having
>> >> > > > > > >>>>>>> trouble
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           understanding your
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > > explanations but I think
>> that
>> >> a
>> >> > > > > carefully
>> >> > > > > > >>>>>>> constructed
>> >> > > > > > >>>>>>>>>>>> diagram
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           will clear
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > up
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > > any misunderstandings.
>> >> > Alternately,
>> >> > > > > > please
>> >> > > > > > >>>>> post a
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           comprehensive PR with
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > > your solution. I can only
>> >> guess
>> >> > at
>> >> > > > what
>> >> > > > > > you
>> >> > > > > > >>>>>>> mean, and
>> >> > > > > > >>>>>>>>>>>> since I
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           value my
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > own
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > > time as much as you value
>> >> yours,
>> >> > I
>> >> > > > > > believe
>> >> > > > > > >> it
>> >> > > > > > >>>>> is
>> >> > > > > > >>>>>>> your
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           responsibility to
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > > provide an implementation
>> >> instead
>> >> > > of
>> >> > > > me
>> >> > > > > > >>>>> trying to
>> >> > > > > > >>>>>>>>> guess.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > > Adam
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > > On Sat, Sep 8, 2018 at 8:00
>> >> AM,
>> >> > Jan
>> >> > > > > > Filipiak
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           <jan.filip...@trivago.com
>> >> <mailto:
>> >> > > > > > >>>>>>>>> jan.filip...@trivago.com
>> >> > > > > > >>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > > wrote:
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> Hi James,
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> Nice to see you being
>> >> > interested.
>> >> > > > > kafka
>> >> > > > > > >>>>>>> streams at
>> >> > > > > > >>>>>>>>>>>> this
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           point supports
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> all sorts of joins as
>> long as
>> >> > both
>> >> > > > > > streams
>> >> > > > > > >>>>> have
>> >> > > > > > >>>>>>> the
>> >> > > > > > >>>>>>>>>>>> same
>> >> > > > > > >>>>>>>>>>>>>>>>>>> key.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> Adam is currently
>> >> implementing a
>> >> > > > join
>> >> > > > > > >> where a
>> >> > > > > > >>>>>>> KTable
>> >> > > > > > >>>>>>>>>>>> and a
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           KTable can
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > have
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> a one-to-many relationship
>> >> > (1:n).
>> >> > > > We
>> >> > > > > > >> exploit
>> >> > > > > > >>>>>>> that
>> >> > > > > > >>>>>>>>>>>> rocksdb
>> >> > > > > > >>>>>>>>>>>>>>>>>>> is
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>> a
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>           > >> datastore that keeps data
>> >> sorted
>> >> > (At
>> >> > > > > least
>> >> > > > > > >>>>>>> exposes an
>> >> > > > > > >>>>>>>>>>>> API to
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           access the
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> stored data in a sorted
>> >> > fashion).
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> I think the technical
>> caveats
>> >> > are
>> >> > > > well
>> >> > > > > > >>>>>>> understood
>> >> > > > > > >>>>>>>>> now
>> >> > > > > > >>>>>>>>>>>> and we
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>> are
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>           > basically
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> down to philosophy and API
>> >> > Design
>> >> > > (
>> >> > > > > when
>> >> > > > > > >> Adam
>> >> > > > > > >>>>>>> sees
>> >> > > > > > >>>>>>>>> my
>> >> > > > > > >>>>>>>>>>>> newest
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           message).
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> I have a lengthy track
>> >> record of
>> >> > > > > losing
>> >> > > > > > >>>>> those
>> >> > > > > > >>>>>>> kinda
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           arguments within
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > the
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> streams community and I
>> have
>> >> no
>> >> > > clue
>> >> > > > > > why.
>> >> > > > > > >> So
>> >> > > > > > >>>>> I
>> >> > > > > > >>>>>>>>>>>> literally
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           can't wait for
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > you
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> to churn through this
>> thread
>> >> and
>> >> > > > give
>> >> > > > > > you
>> >> > > > > > >>>>>>> opinion on
>> >> > > > > > >>>>>>>>>>>> how we
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           should
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > design
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> the return type of the
>> >> > > oneToManyJoin
>> >> > > > > and
>> >> > > > > > >> how
>> >> > > > > > >>>>>>> much
>> >> > > > > > >>>>>>>>>>>> power we
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           want to give
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > to
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> the user vs "simplicity"
>> >> (where
>> >> > > > > > simplicity
>> >> > > > > > >>>>> isn't
>> >> > > > > > >>>>>>>>>>>> really that
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           as users
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > still
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> need to understand it I
>> >> argue)
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> waiting for you to join
>> in on
>> >> > the
>> >> > > > > > >> discussion
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> Best Jan
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> On 07.09.2018 15:49, James
>> >> Kwan
>> >> > > > wrote:
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>> I am new to this group
>> and I
>> >> > > found
>> >> > > > > this
>> >> > > > > > >>>>> subject
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           interesting.  Sounds
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > like
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>> you guys want to
>> implement a
>> >> > join
>> >> > > > > > table of
>> >> > > > > > >>>>> two
>> >> > > > > > >>>>>>>>>>>> streams? Is
>> >> > > > > > >>>>>>>>>>>>>>>>>>> there
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > somewhere
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>> I can see the original
>> >> > > requirement
>> >> > > > or
>> >> > > > > > >>>>> proposal?
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>> On Sep 7, 2018, at 8:13
>> AM,
>> >> Jan
>> >> > > > > > Filipiak
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           <jan.filip...@trivago.com
>> >> <mailto:
>> >> > > > > > >>>>>>>>> jan.filip...@trivago.com
>> >> > > > > > >>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> wrote:
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> On 05.09.2018 22:17,
>> Adam
>> >> > > > Bellemare
>> >> > > > > > >> wrote:
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> I'm currently testing
>> >> using a
>> >> > > > > > Windowed
>> >> > > > > > >>>>> Store
>> >> > > > > > >>>>>>> to
>> >> > > > > > >>>>>>>>>>>> store the
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           highwater
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> mark.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> By all indications this
>> >> > should
>> >> > > > work
>> >> > > > > > >> fine,
>> >> > > > > > >>>>>>> with
>> >> > > > > > >>>>>>>>> the
>> >> > > > > > >>>>>>>>>>>> caveat
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           being that
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > it
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> can
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> only resolve
>> out-of-order
>> >> > > arrival
>> >> > > > > > for up
>> >> > > > > > >>>>> to
>> >> > > > > > >>>>>>> the
>> >> > > > > > >>>>>>>>>>>> size of
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           the window
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > (ie:
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> 24h, 72h, etc). This
>> would
>> >> > > remove
>> >> > > > > the
>> >> > > > > > >>>>>>> possibility
>> >> > > > > > >>>>>>>>>>>> of it
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>> being
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>           > unbounded
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> in
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> size.
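[Editor's sketch] The high-water mark idea described above can be illustrated roughly as follows. This is only a minimal sketch, not the KIP's implementation: it assumes each update carries a monotonically increasing offset, and the class name `HighWaterMarkSketch` and method `accept` are hypothetical. A plain in-memory map stands in for the Windowed Store, so unlike the windowed variant it never expires entries.

```java
import java.util.HashMap;
import java.util.Map;

public class HighWaterMarkSketch {
    // Highest offset accepted so far, tracked per original key.
    // Assumption: a real implementation would use a windowed store so
    // that old entries expire and the state stays bounded in size.
    private final Map<String, Long> highWater = new HashMap<>();

    /** Returns true if the update should be propagated, false if it is stale. */
    public boolean accept(String key, long offset) {
        Long seen = highWater.get(key);
        if (seen != null && offset < seen) {
            return false; // an older update arrived late, so drop it
        }
        highWater.put(key, offset);
        return true;
    }
}
```

With a window of, say, 24h, an entry older than the window would be forgotten, which is why out-of-order arrival can only be resolved within the window size.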
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> With regards to Jan's
>> >> > > > suggestion, I
>> >> > > > > > >>>>> believe
>> >> > > > > > >>>>>>> this
>> >> > > > > > >>>>>>>>> is
>> >> > > > > > >>>>>>>>>>>> where
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           we will
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > have
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> to
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> remain in disagreement.
>> >> > While I
>> >> > > > do
>> >> > > > > > not
>> >> > > > > > >>>>>>> disagree
>> >> > > > > > >>>>>>>>>>>> with your
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           statement
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> about
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> there likely to be
>> >> additional
>> >> > > > joins
>> >> > > > > > done
>> >> > > > > > >>>>> in a
>> >> > > > > > >>>>>>>>>>>> real-world
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           workflow, I
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > do
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> not
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> see how you can
>> >> conclusively
>> >> > > deal
>> >> > > > > > with
>> >> > > > > > >>>>>>>>> out-of-order
>> >> > > > > > >>>>>>>>>>>>>>>>>>> arrival
>> >> > > > > > >>>>>>>>>>>>>>>>>>> of
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> foreign-key
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> changes and subsequent
>> >> > joins. I
>> >> > > > > have
>> >> > > > > > >>>>>>> attempted
>> >> > > > > > >>>>>>>>> what
>> >> > > > > > >>>>>>>>>>>> I
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           think you have
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> proposed (without a
>> >> > high-water,
>> >> > > > > using
>> >> > > > > > >>>>>>> groupBy and
>> >> > > > > > >>>>>>>>>>>> reduce)
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           and found
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> that if
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> the foreign key changes
>> >> too
>> >> > > > > quickly,
>> >> > > > > > or
>> >> > > > > > >>>>> the
>> >> > > > > > >>>>>>> load
>> >> > > > > > >>>>>>>>> on
>> >> > > > > > >>>>>>>>>>>> a
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           stream thread
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > is
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> too
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> high, the joined
>> messages
>> >> > will
>> >> > > > > arrive
>> >> > > > > > >>>>>>>>> out-of-order
>> >> > > > > > >>>>>>>>>>>> and be
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           incorrectly
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> propagated, such that
>> an
>> >> > > > > intermediate
>> >> > > > > > >>>>> event
>> >> > > > > > >>>>>>> is
>> >> > > > > > >>>>>>>>>>>>>>>>>>> represented
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           as the
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > final
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> event.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> Can you shed some light
>> on
>> >> > your
>> >> > > > > > groupBy
>> >> > > > > > >>>>>>>>>>>> implementation.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           There must be
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> some sort of flaw in it.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> I have a suspicion
>> where it
>> >> > is,
>> >> > > I
>> >> > > > > > would
>> >> > > > > > >>>>> just
>> >> > > > > > >>>>>>> like
>> >> > > > > > >>>>>>>>> to
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           confirm. The idea
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> is bullet proof and it
>> >> must be
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> an implementation mess
>> up.
>> >> I
>> >> > > would
>> >> > > > > > like
>> >> > > > > > >> to
>> >> > > > > > >>>>>>> clarify
>> >> > > > > > >>>>>>>>>>>> before
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           we draw a
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> conclusion.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>    Repartitioning the
>> >> > scattered
>> >> > > > > events
>> >> > > > > > >>>>> back to
>> >> > > > > > >>>>>>>>> their
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>> original
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>           > >>>>> partitions is the only
>> way I
>> >> > know
>> >> > > > how
>> >> > > > > > to
>> >> > > > > > >>>>>>>>> conclusively
>> >> > > > > > >>>>>>>>>>>> deal
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           with
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> out-of-order events in
>> a
>> >> > given
>> >> > > > time
>> >> > > > > > >> frame,
>> >> > > > > > >>>>>>> and to
>> >> > > > > > >>>>>>>>>>>> ensure
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           that the
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > data
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> is
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> eventually consistent
>> with
>> >> > the
>> >> > > > > input
>> >> > > > > > >>>>> events.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> If you have some code
>> to
>> >> > share
>> >> > > > that
>> >> > > > > > >>>>>>> illustrates
>> >> > > > > > >>>>>>>>> your
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           approach, I
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > would
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> be
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> very grateful as it
>> would
>> >> > > remove
>> >> > > > > any
>> >> > > > > > >>>>>>>>>>>> misunderstandings
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           that I may
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > have.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> ah okay you were looking
>> >> for
>> >> > my
>> >> > > > > code.
>> >> > > > > > I
>> >> > > > > > >>>>> don't
>> >> > > > > > >>>>>>> have
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           something easily
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> readable here as it's
>> >> bloated
>> >> > > with
>> >> > > > > > >>>>> OO-patterns.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> it's anyhow trivial:
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> @Override
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>      public T apply(K
>> >> aggKey,
>> >> > V
>> >> > > > > > value, T
>> >> > > > > > >>>>>>>>> aggregate)
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>      {
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>          Map<U, V>
>> >> > > > > currentStateAsMap =
>> >> > > > > > >>>>>>>>>>>> asMap(aggregate);
>> >> > > > > > >>>>>>>>>>>>>>>>>>> <<
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           imaginary
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>          U toModifyKey =
>> >> > > > > > >>>>> mapper.apply(value);
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>              << this is
>> the
>> >> > > place
>> >> > > > > > where
>> >> > > > > > >>>>> people
>> >> > > > > > >>>>>>>>>>>> actually
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           gonna have
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > issues
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> and why you probably
>> >> couldn't
>> >> > do
>> >> > > > it.
>> >> > > > > > we
>> >> > > > > > >>>>> would
>> >> > > > > > >>>>>>> need
>> >> > > > > > >>>>>>>>>>>> to find
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           a solution
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > here.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> I didn't realize that
>> yet.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>              << we
>> >> propagate
>> >> > the
>> >> > > > > > field in
>> >> > > > > > >>>>> the
>> >> > > > > > >>>>>>>>>>>> joiner, so
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           that we can
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > pick
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> it up in an aggregate.
>> >> > Probably
>> >> > > > you
>> >> > > > > > have
>> >> > > > > > >>>>> not
>> >> > > > > > >>>>>>>>> thought
>> >> > > > > > >>>>>>>>>>>> of
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           this in your
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> approach right?
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>              << I am
>> very
>> >> open
>> >> > > to
>> >> > > > > > find a
>> >> > > > > > >>>>>>> generic
>> >> > > > > > >>>>>>>>>>>> solution
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           here. In my
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> honest opinion this is
>> >> broken
>> >> > in
>> >> > > > > > >>>>>>>>> KTableImpl.GroupBy
>> >> > > > > > >>>>>>>>>>>> that
>> >> > > > > > >>>>>>>>>>>>>>>>>>> it
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           loses
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > the keys
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> and only maintains the
>> >> > aggregate
>> >> > > > > key.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>              << I
>> >> abstracted
>> >> > it
>> >> > > > away
>> >> > > > > > back
>> >> > > > > > >>>>>>> then way
>> >> > > > > > >>>>>>>>>>>> before
>> >> > > > > > >>>>>>>>>>>>>>>>>>> i
>> >> > > > > > >>>>>>>>>>>>>>>>>>> was
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > thinking
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> of oneToMany join. That
>> is
>> >> > why I
>> >> > > > > > didn't
>> >> > > > > > >>>>>>> realize
>> >> > > > > > >>>>>>>>> its
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           significance here.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>              <<
>> Opinions?
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>          for (V m :
>> >> current)
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>          {
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
>> >> > > > > currentStateAsMap.put(mapper.apply(m),
>> >> > > > > > >> m);
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>          }
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>          if (isAdder)
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>          {
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
>> >> > > currentStateAsMap.put(toModifyKey,
>> >> > > > > > >> value);
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>          }
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>          else
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>          {
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
>> >> > > > > currentStateAsMap.remove(toModifyKey);
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
>> >> > if(currentStateAsMap.isEmpty()){
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>                  return
>> >> null;
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>              }
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>          }
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>          return
>> >> > > > > > >>>>>>> asAggregateType(currentStateAsMap);
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>      }
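[Editor's sketch] For readers who find the quoted snippet hard to follow through the wrapping, here is a self-contained approximation of the same adder/subtractor idea. It is a sketch under assumptions, not Jan's actual code: the aggregate is taken to be a plain `Map` (so the `asMap` / `asAggregateType` conversions marked "imaginary" above disappear), and the class name `MapAggregatorSketch` is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class MapAggregatorSketch<U, V> {
    private final Function<V, U> mapper; // extracts the per-record key from a value
    private final boolean isAdder;       // true = adder, false = subtractor

    public MapAggregatorSketch(Function<V, U> mapper, boolean isAdder) {
        this.mapper = mapper;
        this.isAdder = isAdder;
    }

    /** Applies one value to the aggregate and returns the new aggregate. */
    public Map<U, V> apply(V value, Map<U, V> aggregate) {
        // Copy so that the caller's aggregate is never mutated.
        Map<U, V> state = (aggregate == null) ? new HashMap<>() : new HashMap<>(aggregate);
        U toModifyKey = mapper.apply(value);
        if (isAdder) {
            state.put(toModifyKey, value);
            return state;
        }
        state.remove(toModifyKey);
        // An empty aggregate is returned as null, as in the quoted snippet.
        return state.isEmpty() ? null : state;
    }
}
```

The adder and subtractor share one code path here; the `mapper` is the step the quoted comments flag as the place where the original key has to be carried through the joiner.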
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> Thanks,
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> Adam
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> On Wed, Sep 5, 2018 at
>> >> 3:35
>> >> > PM,
>> >> > > > Jan
>> >> > > > > > >>>>> Filipiak
>> >> > > > > > >>>>>>> <
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > jan.filip...@trivago.com
>> >> <mailto:
>> >> > > > > > >>>>>>>>> jan.filip...@trivago.com
>> >> > > > > > >>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> wrote:
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> Thanks Adam for
>> bringing
>> >> > > Matthias
>> >> > > > > to
>> >> > > > > > >>>>> speed!
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> about the
>> differences. I
>> >> > think
>> >> > > > > > >> re-keying
>> >> > > > > > >>>>>>> back
>> >> > > > > > >>>>>>>>>>>> should be
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           optional at
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> best.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> I would say we return
>> a
>> >> > > > > > KScatteredTable
>> >> > > > > > >>>>> with
>> >> > > > > > >>>>>>>>>>>> reshuffle()
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           returning
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>
>> >> KTable<originalKey,Joined>
>> >> > to
>> >> > > > make
>> >> > > > > > the
>> >> > > > > > >>>>>>> backwards
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           repartitioning
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> optional.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> I am also in a big
>> >> favour of
>> >> > > > doing
>> >> > > > > > the
>> >> > > > > > >>>>> out
>> >> > > > > > >>>>>>> of
>> >> > > > > > >>>>>>>>> order
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           processing using
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> group
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> by instead high water
>> >> mark
>> >> > > > > tracking.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> Just because unbounded
>> >> > growth
>> >> > > is
>> >> > > > > > just
>> >> > > > > > >>>>> scary
>> >> > > > > > >>>>>>> + It
>> >> > > > > > >>>>>>>>>>>> saves
>> >> > > > > > >>>>>>>>>>>>>>>>>>> us
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           the header
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> stuff.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> I think the
>> abstraction
>> >> of
>> >> > > > always
>> >> > > > > > >>>>>>> repartitioning
>> >> > > > > > >>>>>>>>>>>> back is
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           just not so
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> strong. Like the work
>> has
>> >> > been
>> >> > > > > done
>> >> > > > > > >>>>> before
>> >> > > > > > >>>>>>> we
>> >> > > > > > >>>>>>>>>>>> partition
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           back and
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> grouping
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> by something else
>> >> afterwards
>> >> > > is
>> >> > > > > > really
>> >> > > > > > >>>>>>> common.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> On 05.09.2018 13:49,
>> Adam
>> >> > > > > Bellemare
>> >> > > > > > >>>>> wrote:
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> Hi Matthias
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> Thank you for your
>> >> > feedback,
>> >> > > I
>> >> > > > do
>> >> > > > > > >>>>>>> appreciate
>> >> > > > > > >>>>>>>>> it!
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> While name spacing
>> >> would be
>> >> > > > > > possible,
>> >> > > > > > >> it
>> >> > > > > > >>>>>>> would
>> >> > > > > > >>>>>>>>>>>> require
>> >> > > > > > >>>>>>>>>>>>>>>>>>> to
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > deserialize
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> user headers what
>> >> implies
>> >> > a
>> >> > > > > > runtime
>> >> > > > > > >>>>>>> overhead.
>> >> > > > > > >>>>>>>>> I
>> >> > > > > > >>>>>>>>>>>> would
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           suggest to
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > not
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> namespace for now to
>> >> avoid
>> >> > > the
>> >> > > > > > >>>>> overhead.
>> >> > > > > > >>>>>>> If
>> >> > > > > > >>>>>>>>> this
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>> becomes a
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>           > problem in
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> the future, we can
>> >> still
>> >> > add
>> >> > > > > name
>> >> > > > > > >>>>> spacing
>> >> > > > > > >>>>>>>>> later
>> >> > > > > > >>>>>>>>>>>> on.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> Agreed. I will go
>> with
>> >> > > using a
>> >> > > > > > >> reserved
>> >> > > > > > >>>>>>> string
>> >> > > > > > >>>>>>>>>>>> and
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           document it.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> My main concern about
>> >> the
>> >> > > > design
>> >> > > > > is
>> >> > > > > > >> the
>> >> > > > > > >>>>>>> type of
>> >> > > > > > >>>>>>>>>>>> the
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           result KTable:
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > If
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> I
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> understood the
>> proposal
>> >> > > > > correctly,
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> In your example, you
>> >> have
>> >> > > > table1
>> >> > > > > > and
>> >> > > > > > >>>>> table2
>> >> > > > > > >>>>>>>>>>>> swapped.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           Here is how it
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> works
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> currently:
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> 1) table1 has the
>> >> records
>> >> > > that
>> >> > > > > > contain
>> >> > > > > > >>>>> the
>> >> > > > > > >>>>>>>>>>>> foreign key
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           within their
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> value.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> table1 input stream:
>> >> > > > > > <a,(fk=A,bar=1)>,
>> >> > > > > > >>>>>>>>>>>>>>>>>>> <b,(fk=A,bar=2)>,
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> <c,(fk=B,bar=3)>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> table2 input stream:
>> >> <A,X>,
>> >> > > > <B,Y>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> 2) A Value mapper is
>> >> > required
>> >> > > > to
>> >> > > > > > >> extract
>> >> > > > > > >>>>>>> the
>> >> > > > > > >>>>>>>>>>>> foreign
>> >> > > > > > >>>>>>>>>>>>>>>>>>> key.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> table1 foreign key
>> >> mapper:
>> >> > (
>> >> > > > > value
>> >> > > > > > =>
>> >> > > > > > >>>>>>> value.fk
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           <http://value.fk> )
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> The mapper is
>> applied to
>> >> > each
>> >> > > > > > element
>> >> > > > > > >> in
>> >> > > > > > >>>>>>>>> table1,
>> >> > > > > > >>>>>>>>>>>> and a
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           new combined
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> key is
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> made:
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> table1 mapped: <A-a,
>> >> > > > > (fk=A,bar=1)>,
>> >> > > > > > >>>>> <A-b,
>> >> > > > > > >>>>>>>>>>>>>>>>>>> (fk=A,bar=2)>,
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           <B-c,
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> (fk=B,bar=3)>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> 3) The rekeyed events
>> >> are
>> >> > > > > > >> copartitioned
>> >> > > > > > >>>>>>> with
>> >> > > > > > >>>>>>>>>>>> table2:
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> a) Stream Thread with
>> >> > > Partition
>> >> > > > > 0:
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> RepartitionedTable1:
>> >> <A-a,
>> >> > > > > > >>>>> (fk=A,bar=1)>,
>> >> > > > > > >>>>>>> <A-b,
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           (fk=A,bar=2)>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> Table2: <A,X>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> b) Stream Thread with
>> >> > > Partition
>> >> > > > > 1:
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> RepartitionedTable1:
>> >> <B-c,
>> >> > > > > > >> (fk=B,bar=3)>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> Table2: <B,Y>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> 4) From here, they
>> can
>> >> be
>> >> > > > joined
>> >> > > > > > >>>>> together
>> >> > > > > > >>>>>>>>> locally
>> >> > > > > > >>>>>>>>>>>> by
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           applying the
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> joiner
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> function.
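[Editor's sketch] Steps 1) to 4) above can be traced with a small in-memory example. This is only an illustration under stated assumptions, not the KIP's code: the class `ForeignKeyJoinSketch`, the `Left` value type and the string-concatenated combined key are hypothetical, and the co-partitioning of step 3 is not modeled (everything runs in one process).

```java
import java.util.Map;
import java.util.TreeMap;

public class ForeignKeyJoinSketch {
    /** A table1 value that carries its foreign key, e.g. (fk=A,bar=1). */
    public static final class Left {
        final String fk;
        final int bar;
        public Left(String fk, int bar) { this.fk = fk; this.bar = bar; }
    }

    /** Step 2: re-key every table1 record as "<foreignKey>-<originalKey>". */
    public static Map<String, Left> rekey(Map<String, Left> table1) {
        Map<String, Left> out = new TreeMap<>();
        for (Map.Entry<String, Left> e : table1.entrySet()) {
            out.put(e.getValue().fk + "-" + e.getKey(), e.getValue());
        }
        return out;
    }

    /** Step 4: join each re-keyed record with the table2 row for its foreign key. */
    public static Map<String, String> join(Map<String, Left> rekeyed,
                                           Map<String, String> table2) {
        Map<String, String> out = new TreeMap<>();
        for (Map.Entry<String, Left> e : rekeyed.entrySet()) {
            String right = table2.get(e.getValue().fk);
            if (right != null) { // inner join: skip records without a match
                out.put(e.getKey(), "(bar=" + e.getValue().bar + "," + right + ")");
            }
        }
        return out;
    }
}
```

Feeding in the table1 and table2 records from step 1 yields results keyed by A-a, A-b and B-c; the post-join repartitioning back to the original keys a, b and c is the part where the two designs differ.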
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> At this point, Jan's
>> >> design
>> >> > > and
>> >> > > > > my
>> >> > > > > > >>>>> design
>> >> > > > > > >>>>>>>>>>>> deviate. My
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           design goes
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > on
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> to
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> repartition the data
>> >> > > post-join
>> >> > > > > and
>> >> > > > > > >>>>> resolve
>> >> > > > > > >>>>>>>>>>>> out-of-order
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           arrival of
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> records,
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> finally returning the
>> >> data
>> >> > > > keyed
>> >> > > > > > by just
>> >> > > > > > >>>>> the
>> >> > > > > > >>>>>>>>>>>> original key.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           I do not
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> expose
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> the
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> CombinedKey or any of
>> >> the
>> >> > > > > internals
>> >> > > > > > >>>>>>> outside of
>> >> > > > > > >>>>>>>>> the
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           joinOnForeignKey
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> function. This does
>> make
>> >> > for
>> >> > > > > a larger
>> >> > > > > > >>>>>>> footprint,
>> >> > > > > > >>>>>>>>>>>> but it
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           removes all
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> agency
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> for resolving
>> >> out-of-order
>> >> > > > > arrivals
>> >> > > > > > >> and
>> >> > > > > > >>>>>>>>> handling
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           CombinedKeys from
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > the
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> user. I believe that
>> >> this
>> >> > > makes
>> >> > > > > the
>> >> > > > > > >>>>>>> function
>> >> > > > > > >>>>>>>>> much
>> >> > > > > > >>>>>>>>>>>>>>>>>>> easier
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           to use.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> Let me know if this
>> >> helps
>> >> > > > resolve
>> >> > > > > > your
>> >> > > > > > >>>>>>>>> questions,
>> >> > > > > > >>>>>>>>>>>> and
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           please feel
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> free to
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> add anything else on
>> >> your
>> >> > > mind.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> Thanks again,
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> Adam
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> On Tue, Sep 4, 2018
>> at
>> >> 8:36
>> >> > > PM,
>> >> > > > > > >>>>> Matthias J.
>> >> > > > > > >>>>>>>>> Sax <
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
>> matth...@confluent.io
>> >> > > <mailto:
>> >> > > > > > >>>>>>>>>>>> matth...@confluent.io>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> wrote:
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> Hi,
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> I am just catching
>> up
>> >> on
>> >> > > this
>> >> > > > > > >> thread. I
>> >> > > > > > >>>>>>> did
>> >> > > > > > >>>>>>>>> not
>> >> > > > > > >>>>>>>>>>>> read
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           everything so
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> far,
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> but want to share
>> >> a couple
>> >> > of
>> >> > > > > > initial
>> >> > > > > > >>>>>>> thoughts:
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> Headers: I think
>> there
>> >> is
>> >> > a
>> >> > > > > > >> fundamental
>> >> > > > > > >>>>>>>>>>>> difference
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           between header
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> usage
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> in this KIP and
>> KIP-258.
>> >> > For
>> >> > > > 258,
>> >> > > > > > we
>> >> > > > > > >> add
>> >> > > > > > >>>>>>>>> headers
>> >> > > > > > >>>>>>>>>>>> to
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           changelog topic
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> that
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> are owned by Kafka
>> >> Streams
>> >> > > and
>> >> > > > > > nobody
>> >> > > > > > >>>>>>> else is
>> >> > > > > > >>>>>>>>>>>> supposed
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           to write
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > into
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> them. In fact, no
>> user
>> >> > > headers
>> >> > > > > are
>> >> > > > > > >>>>> written
>> >> > > > > > >>>>>>> into
>> >> > > > > > >>>>>>>>>>>> the
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           changelog topic
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> and
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> thus, there are no
>> >> > > conflicts.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> Nevertheless, I
>> don't
>> >> see
>> >> > a
>> >> > > > big
>> >> > > > > > issue
>> >> > > > > > >>>>> with
>> >> > > > > > >>>>>>>>> using
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           headers within
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> Streams.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> As long as we
>> document
>> >> it,
>> >> > > we
>> >> > > > > can
>> >> > > > > > >> have
>> >> > > > > > >>>>>>> some
>> >> > > > > > >>>>>>>>>>>> "reserved"
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           header keys
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> and
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> users are not
>> allowed
>> >> to
>> >> > use them
>> >> > > > > when
>> >> > > > > > >>>>>>> processing
>> >> > > > > > >>>>>>>>>>>> data with
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           Kafka
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > Streams.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> IMHO, this should be
>> >> ok.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> I think there is a
>> safe
>> >> > way
>> >> > > to
>> >> > > > > > avoid
>> >> > > > > > >>>>>>>>> conflicts,
>> >> > > > > > >>>>>>>>>>>> since
>> >> > > > > > >>>>>>>>>>>>>>>>>>> these
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > headers
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> are
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> only needed in
>> >> internal
>> >> > > > topics
>> >> > > > > (I
>> >> > > > > > >>>>> think):
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> For internal and
>> >> > changelog
>> >> > > > > > topics,
>> >> > > > > > >> we
>> >> > > > > > >>>>> can
>> >> > > > > > >>>>>>>>>>>> namespace
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           all headers:
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> * user-defined
>> headers
>> >> > are
>> >> > > > > > >> namespaced
>> >> > > > > > >>>>> as
>> >> > > > > > >>>>>>>>>>>> "external."
>> >> > > > > > >>>>>>>>>>>>>>>>>>> +
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           headerKey
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> * internal headers
>> are
>> >> > > > > > namespaced as
>> >> > > > > > >>>>>>>>>>>> "internal." +
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           headerKey
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>
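A minimal sketch of the namespacing idea above, assuming headers are modelled as a plain map from key to bytes rather than through the Kafka headers API. The class and method names here are hypothetical and are not part of the KIP; only the "external." and "internal." prefixes come from the proposal.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, for illustration only.
class HeaderNamespacing {
    static final String EXTERNAL_PREFIX = "external.";
    static final String INTERNAL_PREFIX = "internal.";

    // Prefix every user-defined header key before the record is written
    // to an internal or changelog topic.
    static Map<String, byte[]> namespaceUserHeaders(Map<String, byte[]> userHeaders) {
        Map<String, byte[]> out = new LinkedHashMap<>();
        for (Map.Entry<String, byte[]> e : userHeaders.entrySet()) {
            out.put(EXTERNAL_PREFIX + e.getKey(), e.getValue());
        }
        return out;
    }

    // Build the key under which Streams would store one of its own headers.
    static String internalKey(String headerKey) {
        return INTERNAL_PREFIX + headerKey;
    }
}
```

With this scheme a user header named "trace" and an internal header named "trace" end up under different keys, so they cannot collide. The cost is that every user header key has to be rewritten on the way in and out.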
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> While name spacing
>> >> would
>> >> > be
>> >> > > > > > >> possible,
>> >> > > > > > >>>>> it
>> >> > > > > > >>>>>>>>> would
>> >> > > > > > >>>>>>>>>>>>>>>>>>> require
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>> to
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>           > >>>>>>>> deserialize
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> user headers, which
>> >> implies
>> >> > a
>> >> > > > > > runtime
>> >> > > > > > >>>>>>> overhead.
>> >> > > > > > >>>>>>>>> I
>> >> > > > > > >>>>>>>>>>>> would
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           suggest to
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > not
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> namespace for now to
>> >> avoid
>> >> > > the
>> >> > > > > > >>>>> overhead.
>> >> > > > > > >>>>>>> If
>> >> > > > > > >>>>>>>>> this
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>> becomes a
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>           > problem in
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> the future, we can
>> >> still
>> >> > add
>> >> > > > > name
>> >> > > > > > >>>>> spacing
>> >> > > > > > >>>>>>>>> later
>> >> > > > > > >>>>>>>>>>>> on.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> My main concern
>> about
>> >> the
>> >> > > > design
>> >> > > > > > is
>> >> > > > > > >> the
>> >> > > > > > >>>>>>> type
>> >> > > > > > >>>>>>>>> of
>> >> > > > > > >>>>>>>>>>>> the
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           result KTable:
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> If I
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> understood the
>> proposal
>> >> > > > > correctly,
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> KTable<K1,V1>
>> table1 =
>> >> ...
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> KTable<K2,V2>
>> table2 =
>> >> ...
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> KTable<K1,V3>
>> >> joinedTable
>> >> > =
>> >> > > > > > >>>>>>>>>>>> table1.join(table2,...);
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> implies that the
>> >> > > `joinedTable`
>> >> > > > > has
>> >> > > > > > >> the
>> >> > > > > > >>>>>>> same
>> >> > > > > > >>>>>>>>> key
>> >> > > > > > >>>>>>>>>>>> as the
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           left input
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> table.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> IMHO, this does not
>> >> work
>> >> > > > because
>> >> > > > > > if
>> >> > > > > > >>>>> table2
>> >> > > > > > >>>>>>>>>>>> contains
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           multiple rows
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> that
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> join with a record
>> in
>> >> > table1
>> >> > > > > > (which is
>> >> > > > > > >>>>> the
>> >> > > > > > >>>>>>> main
>> >> > > > > > >>>>>>>>>>>> purpose
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>> of
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> a
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > foreign
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> key
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> join), the result
>> table
>> >> > > would
>> >> > > > > only
>> >> > > > > > >>>>>>> contain a
>> >> > > > > > >>>>>>>>>>>> single
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           join result,
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > but
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> not
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> multiple.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> Example:
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> table1 input stream:
>> >> <A,X>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> table2 input stream:
>> >> > > > <a,(A,1)>,
>> >> > > > > > >>>>> <b,(A,2)>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> We use table2 value
>> as a
>> >> > > foreign
>> >> > > > > key
>> >> > > > > > to
>> >> > > > > > >>>>>>> table1
>> >> > > > > > >>>>>>>>> key
>> >> > > > > > >>>>>>>>>>>> (ie,
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           "A" joins).
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > If
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> the
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> result key is the
>> same
>> >> key
>> >> > > as
>> >> > > > > key
>> >> > > > > > of
>> >> > > > > > >>>>>>> table1,
>> >> > > > > > >>>>>>>>> this
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           implies that the
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> result can either be
>> >> <A,
>> >> > > > > > join(X,1)>
>> >> > > > > > >> or
>> >> > > > > > >>>>> <A,
>> >> > > > > > >>>>>>>>>>>> join(X,2)>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           but not
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > both.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> Because they share
>> the
>> >> same
>> >> > > > key,
>> >> > > > > > >>>>> whatever
>> >> > > > > > >>>>>>>>> result
>> >> > > > > > >>>>>>>>>>>> record
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           we emit
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > later,
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> overwrites the
>> previous
>> >> > > result.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> This is the reason
>> why
>> >> Jan
>> >> > > > > > originally
>> >> > > > > > >>>>>>> proposed
>> >> > > > > > >>>>>>>>>>>> to use
>> >> > > > > > >>>>>>>>>>>>>>>>>>> a
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > combination
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> of
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> both primary keys of
>> >> the
>> >> > > input
>> >> > > > > > tables
>> >> > > > > > >>>>> as
>> >> > > > > > >>>>>>> key
>> >> > > > > > >>>>>>>>> of
>> >> > > > > > >>>>>>>>>>>> the
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           output table.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> This
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> makes the keys of
>> the
>> >> > output
>> >> > > > > table
>> >> > > > > > >>>>> unique
>> >> > > > > > >>>>>>> and
>> >> > > > > > >>>>>>>>> we
>> >> > > > > > >>>>>>>>>>>> can
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           store both in
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> the
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> output table:
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> Result would be
>> <A-a,
>> >> > > > > join(X,1)>,
>> >> > > > > > >> <A-b,
>> >> > > > > > >>>>>>>>>>>> join(X,2)>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
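The overwrite problem in the example above can be reproduced with ordinary maps standing in for the tables. This is only a sketch under that assumption: it does not use the Kafka Streams API, the class and method names are hypothetical, and the "A-a" key format simply mirrors the notation used in the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration: maps stand in for KTables, and put() stands in
// for the upsert semantics of a table keyed by the result key.
class ForeignKeyJoinKeys {
    // Each right-hand row is {primaryKey, foreignKey, value}.
    static Map<String, String> joinKeyedByLeftKey(Map<String, String> left, String[][] right) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String[] r : right) {
            String leftValue = left.get(r[1]);
            if (leftValue != null) {
                // Same key for every match, so later results replace earlier ones.
                result.put(r[1], "join(" + leftValue + "," + r[2] + ")");
            }
        }
        return result;
    }

    static Map<String, String> joinKeyedByCombinedKey(Map<String, String> left, String[][] right) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String[] r : right) {
            String leftValue = left.get(r[1]);
            if (leftValue != null) {
                // Combining both primary keys keeps every match distinct.
                result.put(r[1] + "-" + r[0], "join(" + leftValue + "," + r[2] + ")");
            }
        }
        return result;
    }
}
```

Fed with table1 = <A,X> and table2 = <a,(A,1)>, <b,(A,2)>, the first method keeps a single entry for A, while the second keeps one entry each for A-a and A-b.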
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> Thoughts?
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> -Matthias
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> On 9/4/18 1:36 PM,
>> Jan
>> >> > > > Filipiak
>> >> > > > > > >> wrote:
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> Just one remark here.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> The high-watermark
>> >> could
>> >> > be
>> >> > > > > > >>>>> disregarded.
>> >> > > > > > >>>>>>> The
>> >> > > > > > >>>>>>>>>>>> decision
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           about the
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> forward
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> depends on the
>> size of
>> >> > the
>> >> > > > > > >> aggregated
>> >> > > > > > >>>>>>> map.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> Only 1 element long
>> >> maps
>> >> > > > would
>> >> > > > > be
>> >> > > > > > >>>>>>> unpacked
>> >> > > > > > >>>>>>>>> and
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           forwarded. 0
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > element
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> maps
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> would be published
>> as
>> >> > > delete.
>> >> > > > > Any
>> >> > > > > > >>>>> other
>> >> > > > > > >>>>>>> count
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> of map entries is
>> in
>> >> > > "waiting
>> >> > > > > for
>> >> > > > > > >>>>> correct
>> >> > > > > > >>>>>>>>>>>> deletes to
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > arrive"-state.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>
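The forwarding rule described above depends only on the size of the aggregated map, which can be sketched as follows. The enum and method names are hypothetical, and the key and value types of the map are assumptions; the thread does not specify them.

```java
import java.util.Map;

// Hypothetical sketch of the size-based forwarding decision.
class ForwardDecision {
    enum Action { FORWARD, DELETE, WAIT }

    static Action decide(Map<String, String> aggregated) {
        switch (aggregated.size()) {
            case 0:
                return Action.DELETE;   // empty map: publish a delete
            case 1:
                return Action.FORWARD;  // exactly one entry: unpack and forward it
            default:
                return Action.WAIT;     // several entries: wait for deletes to arrive
        }
    }
}
```

Under this rule no high-watermark comparison is needed, since the map contents alone determine whether a value may be emitted.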
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> On 04.09.2018
>> 21:29,
>> >> Adam
>> >> > > > > > Bellemare
>> >> > > > > > >>>>>>> wrote:
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> It does look like I
>> >> could
>> >> > > > > replace
>> >> > > > > > >> the
>> >> > > > > > >>>>>>> second
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           repartition store
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > and
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>> highwater store
>> with
>> >> a
>> >> > > > groupBy
>> >> > > > > > and
>> >> > > > > > >>>>>>> reduce.
>> >> > > > > > >>>>>>>>>>>> However,
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           it looks
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > like
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>> I
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>> would
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>> still need to
>> store
>> >> the
>> >> > > > > > highwater
>> >> > > > > > >>>>> value
>> >> > > > > > >>>>>>>>> within
>> >> > > > > > >>>>>>>>>>>> the
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           materialized
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>> store,
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>> to
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> compare the
>> arrival of
>> >> > > > > > out-of-order
>> >> > > > > > >>>>>>> records
>> >> > > > > > >>>>>>>>>>>> (assuming
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>> my
>> >> > > > > > >>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>           > >>>>>>>>> understanding
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> of
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> THIS is
>> correct...).
>> >> This
>> >> > > in
>> >> > > > > > effect
>> >> > > > > > >> is
>> >> > > > > > >>>>>>> the
>> >> > > > > > >>>>>>>>> same
>> >> > > > > > >>>>>>>>>>>> as
>> >> > > > > > >>>>>>>>>>>>>>>>>>> the
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           design I
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > have
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> now,
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> just with the two
>> >> tables
>> >> > > > merged
>> >> > > > > > >>>>> together.
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           >
>> >> > > > > > >>>>>>>>>>>>>>>>>>>           >
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>>
>> >> > > > > > >>>>>>>>>>>>>>>>> --
>> >> > > > > > >>>>>>>>>>>>>>>>> -- Guozhang
