Hi John & Guozhang

@John & @Guozhang Wang <wangg...@gmail.com> - I have cleaned up the KIP,
pruned much of what I wrote and put a simplified diagram near the top to
illustrate the workflow. I collected Jan's content at the bottom of the
document. I believe it is now far simpler to read.

@Guozhang Wang <wangg...@gmail.com>:
> #1: rekey left table
>   -> source from the left upstream, send to rekey-processor to generate
> combined key, and then sink to copartition topic.
Correct.

> #2: first-join with right table
>   -> source from the right table upstream, materialize the right table.
>   -> source from the co-partition topic, materialize the rekeyed left
> table, join with the right table, rekey back, and then sink to the
> rekeyed-back topic.
Almost. I have clarified this in the KIP. We do not rekey back yet, as I need
the Foreign-Key value generated in #1 above to compare in the resolution stage.

> #3: second join
>    -> source from the rekeyed-back topic, materialize the rekeyed back
> table.
>   -> source from the left upstream, materialize the left table, join with
> the rekeyed back table.
Almost. As each event comes in, we just run it through a stateful
processor that checks the original ("This") KTable for the key. The
foreignKeyExtractor is then applied to the value payload again, as in #1
above, to get the current foreign key. We then compare it to the foreign key
of the joined event that we are currently resolving. If they are the same,
propagate the result out. If they differ, throw the event away.
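Here is a minimal sketch of the resolution check described above. It is written in plain Java rather than against the Kafka Streams Processor API. `ForeignKeyResolver` and `resolve` are hypothetical names. A `Map` stands in for the materialized "This" KTable. Discarding a result whose left-hand record has since been deleted is an assumption that is not stated above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;

/**
 * Hypothetical sketch: forward a joined result only if the foreign key it was joined on
 * still matches the foreign key extracted from the current value in the "This" table.
 */
public class ForeignKeyResolver<K, V, KO, VR> {
    private final Map<K, V> thisTable;                 // stand-in for the materialized "This" KTable
    private final Function<V, KO> foreignKeyExtractor; // the same extractor used in step #1

    public ForeignKeyResolver(Map<K, V> thisTable, Function<V, KO> foreignKeyExtractor) {
        this.thisTable = thisTable;
        this.foreignKeyExtractor = foreignKeyExtractor;
    }

    /** Returns the result to propagate, or empty if the event is stale and should be thrown away. */
    public Optional<VR> resolve(K key, KO joinedForeignKey, VR joinedResult) {
        V current = thisTable.get(key);
        if (current == null) {
            return Optional.empty(); // assumption: record was deleted, nothing left to join against
        }
        KO currentForeignKey = foreignKeyExtractor.apply(current);
        return Objects.equals(currentForeignKey, joinedForeignKey)
                ? Optional.ofNullable(joinedResult)
                : Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, String> orders = new HashMap<>();
        orders.put("order-1", "customer-B"); // the FK was changed from customer-A to customer-B
        ForeignKeyResolver<String, String, String, String> resolver =
                new ForeignKeyResolver<>(orders, value -> value);
        System.out.println(resolver.resolve("order-1", "customer-A", "stale")); // Optional.empty
        System.out.println(resolver.resolve("order-1", "customer-B", "fresh")); // Optional[fresh]
    }
}
```

Because the check re-reads the current value at resolution time, a result produced under an old foreign key is dropped even if it arrives after the newer result.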

The end result is that we do need to materialize 2 additional tables
(left/this-combinedkey table, and the final Joined table) as I've
illustrated in the updated KIP. I hope the diagram makes this much clearer.
Please let me know.

Thanks again
Adam




On Sun, Dec 16, 2018 at 3:18 PM Guozhang Wang <wangg...@gmail.com> wrote:

> John,
>
> Thanks a lot for the suggestions on refactoring the wiki. I agree with you
> that the KIP proposal should be easy for any future reader to understand,
> and hence should provide a good summary of the user-facing interfaces, as
> well as rejected alternatives that briefly capture "how we came a long way
> to this conclusion, and what we have argued, disagreed, and agreed about,
> etc." so that readers do not need to dig into the DISCUSS thread to get all
> the details. We can, of course, keep implementation details like
> "workflows" on the wiki page as an addendum section, since they are also
> related.
>
> Regarding your proposal on comment 6): that's a very interesting idea! Just
> to make sure I understand it correctly: the resulting topology of the
> proposal is still the same as the current proposal, where we will have 3
> sub-topologies for this operator:
>
> #1: rekey left table
>    -> source from the left upstream, send to rekey-processor to generate
> combined key, and then sink to copartition topic.
>
> #2: first-join with right table
>    -> source from the right table upstream, materialize the right table.
>    -> source from the co-partition topic, materialize the rekeyed left
> table, join with the right table, rekey back, and then sink to the
> rekeyed-back topic.
>
> #3: second join
>    -> source from the rekeyed-back topic, materialize the rekeyed back
> table.
>    -> source from the left upstream, materialize the left table, join with
> the rekeyed back table.
>
> Sub-topology #1 and #3 may be merged into a single sub-topology since both of
> them read from the left table source stream. In this workflow, we need to
> materialize 4 tables (left table in #3, right table in #2, rekeyed left
> table in #2, rekeyed-back table in #3), and 2 repartition topics
> (copartition topic, rekeyed-back topic).
>
> Compared with Adam's current proposal in the workflow overview, it has the
> same number of materialized tables (left table, rekeyed left table, right
> table, out-of-ordering resolver table), and the same number of internal
> topics (two). The advantage is that on the copartition topic, we can save
> bandwidth by not sending values, and in #2 the rekeyed left table is smaller
> since we do not have any values to materialize. Is that right?
>
>
> Guozhang
>
>
>
> On Wed, Dec 12, 2018 at 1:22 PM John Roesler <j...@confluent.io> wrote:
>
> > Hi Adam,
> >
> > Given that the committers are all pretty busy right now, I think that it
> > would help if you were to refactor the KIP a little to reduce the workload
> > for reviewers.
> >
> > I'd recommend the following changes:
> > * relocate all internal details to a section at the end called something
> > like "Implementation Notes".
> > * rewrite the rest of the KIP to be as succinct as possible and mention only
> > publicly-facing API changes.
> > ** for example, the interface that you've already listed there, as well as
> > a textual description of the guarantees we'll be providing (the join result
> > is copartitioned with the LHS, and the join result is guaranteed correct)
> >
> > A good target would be that the whole main body of the KIP, including
> > Status, Motivation, Proposal, Justification, and Rejected Alternatives, all
> > fit "above the fold" (i.e., all fit on the screen at a comfortable zoom
> > level).
> > I think the only real Rejected Alternative that bears mention at this point
> > is KScatteredTable, for which you could include just the executive summary
> > (no implementation details), and link to extra details in the
> > Implementation Notes section.
> >
> > Taking a look at the wiki page, ~90% of the text there is internal detail,
> > which is useful for the dubious, but doesn't need to be ratified in a vote
> > (and would be subject to change without notice in the future anyway).
> > There's also a lot of conflicting discussion, as you've very respectfully
> > tried to preserve the original proposal from Jan while adding your own.
> > Isolating all this information in a dedicated section at the bottom frees
> > the voters up to focus on the public API part of the proposal, which is
> > really all they need to consider.
> >
> > Plus, it'll be clear to future readers which parts of the document are
> > enduring, and which parts are a snapshot of our implementation thinking at
> > the time.
> >
> > I'm suggesting this because I suspect that the others haven't made time to
> > review it partly because it seems daunting. If it seems like it would be a
> > huge time investment to review, people will just keep putting it off. But
> > if the KIP is a single page, then they'll be more inclined to give it a
> > read.
> >
> > Honestly, I don't think the KIP itself is that controversial (apart from
> > the scattered table thing (sorry, Jan) ). Most of the discussion has been
> > around the implementation, which we can continue more effectively in a PR
> > once the KIP has passed.
> >
> > How does that sound?
> > -John
> >
> > On Mon, Dec 10, 2018 at 3:54 PM Adam Bellemare <adam.bellem...@gmail.com>
> > wrote:
> >
> > > 1) I believe that the resolution mechanism John has proposed is
> > > sufficient: it is clean and easy and doesn't require additional RocksDB
> > > stores, which reduces the footprint greatly. I don't think we need to
> > > resolve based on timestamp or offset anymore, but if we decide to do that,
> > > it would be within the bounds of the existing API.
> > >
> > > 2) Is the current API sufficient, or does it need to be altered to go back
> > > to vote?
> > >
> > > 3) A KScatteredTable implementation can always be added in a future
> > > revision. This API does not rule it out. The implementation of this
> > > function would simply be replaced with `KScatteredTable.resolve()` while
> > > still maintaining the existing API, thereby giving both features as Jan
> > > outlined earlier. Would this work?
> > >
> > >
> > > Thanks Guozhang, John and Jan
> > >
> > >
> > >
> > >
> > > On Mon, Dec 10, 2018 at 10:39 AM John Roesler <j...@confluent.io> wrote:
> > >
> > > > Hi, all,
> > > >
> > > > >> In fact, we
> > > > >> can just keep a single final-result store with timestamps and reject
> > > > >> values that have a smaller timestamp, is that right?
> > > >
> > > > > Which output is correct should at least be decided based on the
> > > > > offset of the original message.
> > > >
> > > > Thanks for this point, Jan.
> > > >
> > > > KIP-258 is merely to allow embedding the record timestamp in the k/v
> > > > store, as well as providing a storage-format upgrade path.
> > > >
> > > > I might have missed it, but I think we have yet to discuss whether it's
> > > > safe or desirable just to swap out topic-ordering for
> > > > timestamp-ordering. This is a very deep topic, and I think it would only
> > > > pollute the current discussion.
> > > >
> > > > What Adam has proposed is safe, given the *current* ordering semantics
> > > > of the system. If we can agree on his proposal, I think we can merge the
> > > > feature well before the conversation about timestamp ordering even takes
> > > > place, much less reaches a conclusion. In the meantime, it would seem to
> > > > be unfortunate to have one join operator with different ordering
> > > > semantics from every other KTable operator.
> > > >
> > > > If and when that timestamp discussion takes place, many (all?) KTable
> > > > operations will need to be updated, rendering the many:one join a small
> > > > marginal cost.
> > > >
> > > > And, just to plug it again, I proposed an algorithm above that I believe
> > > > provides correct ordering without any additional metadata, and
> > > > regardless of the ordering semantics. I didn't bring it up further,
> > > > because I felt the KIP only needs to agree on the public API, and we can
> > > > discuss the implementation at leisure in a PR...
> > > >
> > > > Thanks,
> > > > -John
> > > >
> > > >
> > > > On Mon, Dec 10, 2018 at 2:28 AM Jan Filipiak <jan.filip...@trivago.com>
> > > > wrote:
> > > >
> > > > >
> > > > >
> > > > > On 10.12.2018 07:42, Guozhang Wang wrote:
> > > > > > Hello Adam / Jan / John,
> > > > > >
> > > > > > Sorry for being late on this thread! I've finally got some time this
> > > > > > weekend to clean up a load of tasks on my queue (actually I've also
> > > > > > realized there are a bunch of other things I need to enqueue while
> > > > > > cleaning them up, something I need to improve on my side). So here
> > > > > > are my thoughts:
> > > > > >
> > > > > > Regarding the APIs: I like the current written API in the KIP. More
> > > > > > generally I'd prefer to keep 1) one-to-many join functionality as
> > > > > > well as 2) join types other than inner as separate KIPs, since 1) may
> > > > > > be worth a general API refactoring that can benefit not only
> > > > > > foreign-key joins but co-located joins as well (e.g. an extended
> > > > > > proposal of
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
> > > > > > ), and I'm not sure if other join types would actually be needed
> > > > > > (maybe left join still makes sense), so it's better to
> > > > > > wait-for-people-to-ask-and-add than add-sth-that-no-one-uses.
> > > > > >
> > > > > > Regarding whether we enforce step 3) / 4) vs. introducing a
> > > > > > KScatteredTable for users to inject their own optimization: I'd
> > > > > > prefer to do the current option as-is, and my main rationale is room
> > > > > > for optimization inside the Streams internals and API succinctness.
> > > > > > For advanced users who may indeed prefer KScatteredTable and do
> > > > > > their own optimization, while it is too much work to use the
> > > > > > Processor API directly, I think we can still extend the current API
> > > > > > to support it in the future if it becomes necessary.
> > > > >
> > > > > no internal optimization potential. it's a myth
> > > > >
> > > > > ¯\_(ツ)_/¯
> > > > >
> > > > > :-)
> > > > >
> > > > > >
> > > > > > Another note about step 4), resolving out-of-order data: as I
> > > > > > mentioned before, I think with KIP-258 (embedded timestamp with
> > > > > > key-value store) we can actually make this step simpler than the
> > > > > > current proposal. In fact, we can just keep a single final-result
> > > > > > store with timestamps and reject values that have a smaller
> > > > > > timestamp, is that right?
> > > > >
> > > > > Which output is correct should at least be decided based on the offset
> > > > > of the original message.
> > > > >
> > > > > >
> > > > > >
> > > > > > That's all I have in mind now. Again, great appreciation to Adam for
> > > > > > making such HUGE progress on this KIP!
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Wed, Dec 5, 2018 at 2:40 PM Jan Filipiak <jan.filip...@trivago.com>
> > > > > > wrote:
> > > > > >
> > > > > >> If they don't find the time:
> > > > > >> They usually take the opposite path from me :D
> > > > > >> so the answer would be clear.
> > > > > >>
> > > > > >> hence my suggestion to vote.
> > > > > >>
> > > > > >>
> > > > > >> On 04.12.2018 21:06, Adam Bellemare wrote:
> > > > > >>> Hi Guozhang and Matthias
> > > > > >>>
> > > > > >>> I know both of you are quite busy, but we've gotten this KIP to a
> > > > > >>> point where we need more guidance on the API (perhaps a bit of a
> > > > > >>> tie-breaker, if you will). If there is anyone else you think should
> > > > > >>> look at this, please tag them accordingly.
> > > > > >>>
> > > > > >>> The scenario is as follows:
> > > > > >>>
> > > > > >>> Current Option:
> > > > > >>> API:
> > > > > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-PublicInterfaces
> > > > > >>> 1) Rekey the data to CombinedKey, and shuffle it to the partition
> > > > > >>> with the foreignKey (repartition 1)
> > > > > >>> 2) Join the data
> > > > > >>> 3) Shuffle the data back to the original node (repartition 2)
> > > > > >>> 4) Resolve out-of-order arrival / race condition due to foreign-key
> > > > > >>> changes.
> > > > > >>>
> > > > > >>> Alternate Option:
> > > > > >>> Perform #1 and #2 above, and return a KScatteredTable.
> > > > > >>> - It would be keyed on a wrapped key function: <CombinedKey<KO, K>,
> > > > > >>> VR> (KO = Other Table Key, K = This Table Key, VR = Joined Result)
> > > > > >>> - KScatteredTable.resolve() would perform #3 and #4, but otherwise a
> > > > > >>> user would be able to perform additional functions directly from the
> > > > > >>> KScatteredTable (TBD - currently out of scope).
> > > > > >>> - John's analysis 2 emails up is accurate as to the tradeoffs.
> > > > > >>>
> > > > > >>> Current Option is coded as-is. Alternate Option is possible, but will
> > > > > >>> require implementation details to be exposed in the API, along with
> > > > > >>> some new data structures (i.e. CombinedKey).
> > > > > >>>
> > > > > >>> I appreciate any insight into this.
> > > > > >>>
> > > > > >>> Thanks.
> > > > > >>>
> > > > > >>> On Tue, Dec 4, 2018 at 2:59 PM Adam Bellemare <adam.bellem...@gmail.com>
> > > > > >>> wrote:
> > > > > >>>
> > > > > >>>> Hi John
> > > > > >>>>
> > > > > >>>> Thanks for your feedback and assistance. I think your summary is
> > > > > >>>> accurate from my perspective. Additionally, I would like to add that
> > > > > >>>> there is a risk of inconsistent final states without performing the
> > > > > >>>> resolution. This is a major concern for me, as most of the data I
> > > > > >>>> have dealt with is produced by relational databases. We have seen a
> > > > > >>>> number of cases where a user in the Rails UI has modified the field
> > > > > >>>> (foreign key), realized they made a mistake, and then updated the
> > > > > >>>> field again with a new key. The events are propagated out as they
> > > > > >>>> are produced, and as such we have had real-world cases where these
> > > > > >>>> inconsistencies were propagated downstream as the final values due
> > > > > >>>> to the race conditions in the fanout of the data.
> > > > > >>>>
> > > > > >>>> The solution that I propose values correctness of the final result
> > > > > >>>> over other factors.
> > > > > >>>>
> > > > > >>>> We could always move this function over to a KScatteredTable
> > > > > >>>> implementation in the future, and simply deprecate this join API in
> > > > > >>>> time. I would like to hear more from some of the other major
> > > > > >>>> committers on which course of action they think is best before any
> > > > > >>>> more coding is done.
> > > > > >>>>
> > > > > >>>> Thanks again
> > > > > >>>>
> > > > > >>>> Adam
> > > > > >>>>
> > > > > >>>>
> > > > > >>>> On Mon, Dec 3, 2018 at 8:24 PM John Roesler <j...@confluent.io>
> > > > > >>>> wrote:
> > > > > >>>>
> > > > > >>>>> Hi Jan and Adam,
> > > > > >>>>>
> > > > > >>>>> Wow, thanks for doing that test, Adam. Those results are
> > > > > >>>>> encouraging.
> > > > > >>>>>
> > > > > >>>>> Thanks for sharing your performance experience as well, Jan. I agree
> > > > > >>>>> that avoiding unnecessary join outputs is especially important when
> > > > > >>>>> the fan-out is so high. I suppose this could also be built into the
> > > > > >>>>> implementation we're discussing, but it wouldn't have to be
> > > > > >>>>> specified in the KIP (since it's an API-transparent optimization).
> > > > > >>>>>
> > > > > >>>>> As far as whether or not to re-repartition the data, I didn't bring
> > > > > >>>>> it up because it sounded like the two of you agreed to leave the KIP
> > > > > >>>>> as-is, despite the disagreement.
> > > > > >>>>>
> > > > > >>>>> If you want my opinion, I feel like both approaches are reasonable.
> > > > > >>>>> It sounds like Jan places more value on the potential for developers
> > > > > >>>>> to optimize their topologies to re-use the intermediate nodes,
> > > > > >>>>> whereas Adam places more value on having a single operator that
> > > > > >>>>> people can use without extra steps at the end.
> > > > > >>>>>
> > > > > >>>>> Personally, although I do find it exceptionally annoying when a
> > > > > >>>>> framework gets in my way when I'm trying to optimize something, it
> > > > > >>>>> seems better to go for a single operation.
> > > > > >>>>> * Encapsulating the internal transitions gives us significant
> > > > > >>>>> latitude in the implementation (for example, joining only at the
> > > > > >>>>> end, not in the middle, to avoid extra data copying and out-of-order
> > > > > >>>>> resolution; how we represent the first repartition keys (combined
> > > > > >>>>> keys vs. value vectors), etc.). If we publish something like a
> > > > > >>>>> KScatteredTable with the right-partitioned joined data, then the API
> > > > > >>>>> pretty much locks in the implementation as well.
> > > > > >>>>> * The API seems simpler to understand and use. I do mean "seems"; if
> > > > > >>>>> anyone wants to make the case that KScatteredTable is actually
> > > > > >>>>> simpler, I think hypothetical usage code would help. From a
> > > > > >>>>> relational algebra perspective, it seems like KTable.join(KTable)
> > > > > >>>>> should produce a new KTable in all cases.
> > > > > >>>>> * That said, there might still be room in the API for a different
> > > > > >>>>> operation like what Jan has proposed to scatter a KTable, and then
> > > > > >>>>> do things like join, re-group, etc. from there... I'm not sure; I
> > > > > >>>>> haven't thought through all the consequences yet.
> > > > > >>>>>
> > > > > >>>>> This is all just my opinion after thinking over the
> discussion
> > so
> > > > > >> far...
> > > > > >>>>> -John
> > > > > >>>>>
> > > > > >>>>> On Mon, Dec 3, 2018 at 2:56 PM Adam Bellemare <adam.bellem...@gmail.com>
> > > > > >>>>> wrote:
> > > > > >>>>>
> > > > > >>>>>> Updated the PR to take into account John's feedback.
> > > > > >>>>>>
> > > > > >>>>>> I did some preliminary testing of the performance of the
> > > > > >>>>>> prefixScan. I have attached the file, but I will also include the
> > > > > >>>>>> text in the body here for archival purposes (I am not sure what
> > > > > >>>>>> happens to attached files). I also updated the PR and the KIP
> > > > > >>>>>> accordingly.
> > > > > >>>>>>
> > > > > >>>>>> Summary: It scales exceptionally well for scanning large numbers of
> > > > > >>>>>> records. As Jan mentioned previously, the real issue would be more
> > > > > >>>>>> around processing the resulting records after obtaining them. For
> > > > > >>>>>> instance, it takes approximately 80-120 ms to flush the buffer and
> > > > > >>>>>> a further 35-85 ms to scan 27.5M records, obtaining matches for
> > > > > >>>>>> 2.5M of them. Iterating through the records just to generate a
> > > > > >>>>>> simple count takes ~40 times longer than the flush + scan combined.
> > > > > >>>>>>
> > > > > >>>>>> ============================================================================================
> > > > > >>>>>> Setup:
> > > > > >>>>>> ============================================================================================
> > > > > >>>>>> Java 9 with default settings aside from a 512 MB heap (Xmx512m, Xms512m)
> > > > > >>>>>> CPU: i7 2.2 GHz.
> > > > > >>>>>>
> > > > > >>>>>> Note: I am using a slightly-modified, directly-accessible Kafka
> > > > > >>>>>> Streams RocksDB implementation (RocksDB.java, basically just
> > > > > >>>>>> avoiding the ProcessorContext). There are no modifications to the
> > > > > >>>>>> default RocksDB values provided in the 2.1/trunk release.
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>> keysize = 128 bytes
> > > > > >>>>>> valsize = 512 bytes
> > > > > >>>>>>
> > > > > >>>>>> Step 1:
> > > > > >>>>>> Write X positive matching events (key = prefix + left-padded
> > > > > >>>>>> auto-incrementing integer)
> > > > > >>>>>> Step 2:
> > > > > >>>>>> Write 10X negative matching events (key = left-padded
> > > > > >>>>>> auto-incrementing integer)
> > > > > >>>>>> Step 3:
> > > > > >>>>>> Perform flush
> > > > > >>>>>> Step 4:
> > > > > >>>>>> Perform prefixScan
> > > > > >>>>>> Step 5:
> > > > > >>>>>> Iterate through the returned Iterator and validate the count of
> > > > > >>>>>> expected events.
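The key layout in the steps above can be mimicked in memory as a rough sketch. It uses a `java.util.TreeMap` in place of the modified RocksDB store, so it says nothing about the timings reported below. `PrefixScanSketch`, the `PREFIX` value, the 12-digit padding and the 16-byte placeholder value are hypothetical. The flush in Step 3 has no in-memory equivalent.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * In-memory stand-in (TreeMap, not RocksDB) for the benchmark layout:
 * X keys share a common prefix, 10X keys do not, and a prefix scan must return exactly X.
 */
public class PrefixScanSketch {
    static final String PREFIX = "fk-0001|"; // hypothetical; the benchmark's actual prefix is not given

    static String leftPad(int n) {
        return String.format("%012d", n); // left-padded so lexicographic order matches numeric order
    }

    static NavigableMap<String, byte[]> populate(int x) {
        NavigableMap<String, byte[]> store = new TreeMap<>();
        byte[] value = new byte[16]; // small placeholder (the benchmark used 512-byte values)
        for (int i = 0; i < x; i++) {
            store.put(PREFIX + leftPad(i), value); // Step 1: positive matches
        }
        for (int i = 0; i < 10 * x; i++) {
            store.put(leftPad(i), value);          // Step 2: negative matches
        }
        return store;
    }

    static int prefixScanCount(NavigableMap<String, byte[]> store, String prefix) {
        int count = 0;
        // Step 4: seek to the first key >= prefix, then iterate while keys still share the prefix.
        for (Map.Entry<String, byte[]> e : store.tailMap(prefix, true).entrySet()) {
            if (!e.getKey().startsWith(prefix)) {
                break;
            }
            count++; // Step 5: count the matches so they can be validated
        }
        return count;
    }

    public static void main(String[] args) {
        NavigableMap<String, byte[]> store = populate(1_000);
        System.out.println(prefixScanCount(store, PREFIX)); // 1000
    }
}
```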
> > > > > >>>>>>
> > > > > >>>>>> ============================================================================================
> > > > > >>>>>> Results:
> > > > > >>>>>> ============================================================================================
> > > > > >>>>>> X      Total events  Run    Flush time  Scan time  Disk usage
> > > > > >>>>>> 1k     11k           -      39 ms       7 ms       6.9 MB
> > > > > >>>>>> 10k    110k          -      45 ms       8 ms       127 MB
> > > > > >>>>>> 100k   1.1M          Test1  60 ms       12 ms      678 MB
> > > > > >>>>>>                      Test2  45 ms       7 ms       576 MB
> > > > > >>>>>> 1M     11M           Test1  52 ms       19 ms      7.2 GB
> > > > > >>>>>>                      Test2  84 ms       34 ms      9.1 GB
> > > > > >>>>>> 2.5M   27.5M         Test1  82 ms       63 ms      17 GB (276 SST files)
> > > > > >>>>>>                      Test2  116 ms      35 ms      23 GB (361 SST files)
> > > > > >>>>>>                      Test3  103 ms      82 ms      19 GB (300 SST files)
> > > > > >>>>>> --------------------------------------------------------------------------------------------
> > > > > >>>>>>
> > > > > >>>>>> I had to limit my testing on my laptop to X = 2.5M events. I tried
> > > > > >>>>>> to go to X = 10M (110M events), but RocksDB was going into the
> > > > > >>>>>> 100GB+ range and my laptop ran out of disk. More extensive testing
> > > > > >>>>>> could be done, but I suspect that it would be in line with what
> > > > > >>>>>> we're seeing in the results above.
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>> At this point in time, I think the only major discussion point is
> > > > > >>>>>> really around what Jan and I have disagreed on: repartitioning back
> > > > > >>>>>> + resolving potential out-of-order issues, or leaving that up to the
> > > > > >>>>>> client to handle.
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>> Thanks folks,
> > > > > >>>>>>
> > > > > >>>>>> Adam
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>> On Mon, Dec 3, 2018 at 4:34 AM Jan Filipiak <jan.filip...@trivago.com>
> > > > > >>>>>> wrote:
> > > > > >>>>>>
> > > > > >>>>>>>
> > > > > >>>>>>>
> > > > > >>>>>>> On 29.11.2018 15:14, John Roesler wrote:
> > > > > >>>>>>>> Hi all,
> > > > > >>>>>>>>
> > > > > >>>>>>>> Sorry that this discussion petered out... I think the 2.1 release
> > > > > >>>>>>>> caused an extended distraction that pushed it off everyone's radar
> > > > > >>>>>>>> (which was precisely Adam's concern). Personally, I've also had
> > > > > >>>>>>>> some extended distractions of my own that kept (and continue to
> > > > > >>>>>>>> keep) me preoccupied.
> > > > > >>>>>>>>
> > > > > >>>>>>>> However, calling for a vote did wake me up, so I guess Jan was on
> > > > > >>>>>>>> the right track!
> > > > > >>>>>>>>
> > > > > >>>>>>>> I've gone back and reviewed the whole KIP document and the prior
> > > > > >>>>>>>> discussion, and I'd like to offer a few thoughts:
> > > > > >>>>>>>>
> > > > > >>>>>>>> API Thoughts:
> > > > > >>>>>>>>
> > > > > >>>>>>>> 1. If I read the KIP right, you are proposing a many-to-one join.
> > > > > >>>>>>>> Could we consider naming it manyToOneJoin? Or, if you prefer, flip
> > > > > >>>>>>>> the design around and make it a oneToManyJoin?
> > > > > >>>>>>>>
> > > > > >>>>>>>> The proposed name "joinOnForeignKey" disguises the join type, and
> > > > > >>>>>>>> it seems like it might trick some people into using it for a
> > > > > >>>>>>>> one-to-one join. This would work, of course, but it would be super
> > > > > >>>>>>>> inefficient compared to a simple rekey-and-join.
> > > > > >>>>>>>>
> > > > > >>>>>>>> 2. I might have missed it, but I don't think it's specified
> > > > > >>>>>>>> whether it's an inner, outer, or left join. I'm guessing an outer
> > > > > >>>>>>>> join, as (neglecting IQ) the rest can be achieved by filtering or
> > > > > >>>>>>>> by handling it in the ValueJoiner.
> > > > > >>>>>>>>
> > > > > >>>>>>>> 3. The arg list to joinOnForeignKey doesn't look quite right.
> > > > > >>>>>>>> 3a. Regarding Serialized: There are a few different paradigms in
> > > > > >>>>>>>> play in the Streams API, so it's confusing, but instead of three
> > > > > >>>>>>>> Serialized args, I think it would be better to have one that allows
> > > > > >>>>>>>> (optionally) setting the 4 incoming serdes. The result serde is
> > > > > >>>>>>>> defined by the Materialized. The incoming serdes can be optional
> > > > > >>>>>>>> because they might already be available on the source KTables, or
> > > > > >>>>>>>> the default serdes from the config might be applicable.
> > > > > >>>>>>>>
> > > > > >>>>>>>> 3b. Is the StreamPartitioner necessary? The other joins don't
> > > > > >>>>>>>> allow setting one, and it seems like it might actually be harmful,
> > > > > >>>>>>>> since the rekey operation needs to produce results that are
> > > > > >>>>>>>> co-partitioned with the "other" KTable.
> > > > > >>>>>>>>
> > > > > >>>>>>>> 4. I'm fine with the "reserved word" header, but I didn't actually
> > > > > >>>>>>>> follow what Matthias meant about namespacing requiring
> > > > > >>>>>>>> "deserializing" the record header. The header keys are already
> > > > > >>>>>>>> Strings, so I don't think that deserialization is required. If we
> > > > > >>>>>>>> applied the namespace at source nodes and stripped it at sink
> > > > > >>>>>>>> nodes, this would be practically no overhead. The advantage of the
> > > > > >>>>>>>> namespace idea is that no public API change wrt headers needs to
> > > > > >>>>>>>> happen, and no restrictions need to be placed on users' headers.
> > > > > >>>>>>>>
> > > > > >>>>>>>> (Although I'm wondering if we can get away without the header at
> > > > > >>>>>>>> all... stay tuned)
> > > > > >>>>>>>>
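One way to picture the namespacing idea in point 4, as a Python sketch: user header keys get a prefix when a record enters the topology (source node) and lose it when the record leaves (sink node), so an internal header can never collide with a user's header. Only the String keys are touched; the byte values are never deserialized. The prefix `user.` and the internal key name are assumptions for illustration; headers are modelled as a list of (key, value) pairs because Kafka allows repeated keys.

```python
from typing import List, Tuple

Headers = List[Tuple[str, bytes]]  # Kafka-style: String keys, byte[] values, duplicates allowed

USER_PREFIX = "user."              # hypothetical namespace for user headers
INTERNAL_KEY = "__fk_join_offset"  # hypothetical internal (reserved) header


def at_source(headers: Headers) -> Headers:
    """Namespace every user header when the record enters the topology."""
    return [(USER_PREFIX + k, v) for k, v in headers]


def at_sink(headers: Headers) -> Headers:
    """Drop internal headers and strip the namespace before writing out."""
    return [(k[len(USER_PREFIX):], v) for k, v in headers if k.startswith(USER_PREFIX)]


incoming = [("__fk_join_offset", b"user-owned"), ("trace", b"abc")]
internal = at_source(incoming) + [(INTERNAL_KEY, b"42")]
# The user's header that happens to share the internal name is now distinct.
assert ("user.__fk_join_offset", b"user-owned") in internal
print(at_sink(internal))  # [('__fk_join_offset', b'user-owned'), ('trace', b'abc')]
```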
> > > > > >>>>>>>> 5. I also didn't follow the discussion about the HWM table growing
> > > > > >>>>>>>> without bound. As I read it, the HWM table is effectively implementing
> > > > > >>>>>>>> OCC (optimistic concurrency control) to resolve the problem you noted
> > > > > >>>>>>>> with disordering when the rekey is reversed... particularly notable
> > > > > >>>>>>>> when the FK changes. As such, it only needs to track the most recent
> > > > > >>>>>>>> "version" (the offset in the source partition) of each key.
> > > > > >>>>>>>> Therefore, it should have the same number of keys as the source table
> > > > > >>>>>>>> at all times.
> > > > > >>>>>>>>
> > > > > >>>>>>>> I see that you are aware of KIP-258, which I think might be relevant
> > > > > >>>>>>>> in a couple of ways. One: it's just about storing the timestamp in the
> > > > > >>>>>>>> state store, but the ultimate idea is to effectively use the timestamp
> > > > > >>>>>>>> as an OCC "version" to drop disordered updates. You wouldn't want to
> > > > > >>>>>>>> use the timestamp for this operation, but if you were to use a similar
> > > > > >>>>>>>> mechanism to store the source offset in the store alongside the
> > > > > >>>>>>>> re-keyed values, then you could avoid a separate table.
> > > > > >>>>>>>>
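The OCC reading of the HWM table in point 5, combined with the suggestion to keep the source offset next to the re-keyed value, can be sketched as a single store mapping each original key to (offset, value). This is a simplified Python model under stated assumptions (a single source partition, so offsets are comparable per key; the class and method names are hypothetical), not the PR's implementation. Because only the latest version per key is kept, the store never holds more keys than the source table.

```python
from typing import Dict, Optional, Tuple


class VersionedStore:
    """Keeps, per original key, the value together with the source offset it came from."""

    def __init__(self) -> None:
        self._data: Dict[int, Tuple[int, Optional[str]]] = {}

    def put_if_newer(self, key: int, offset: int, value: Optional[str]) -> bool:
        """Apply the update only if its offset is newer than the stored one.

        Returns True if applied, False if the update was disordered and dropped.
        """
        current = self._data.get(key)
        if current is not None and offset <= current[0]:
            return False
        self._data[key] = (offset, value)
        return True

    def get(self, key: int) -> Optional[str]:
        entry = self._data.get(key)
        return None if entry is None else entry[1]

    def __len__(self) -> int:
        return len(self._data)


store = VersionedStore()
store.put_if_newer(1, offset=10, value="A-joined")
store.put_if_newer(1, offset=12, value="B-joined")
applied = store.put_if_newer(1, offset=10, value="A-joined")  # late copy of the old version
print(applied, store.get(1), len(store))  # False B-joined 1
```

A delete could be kept as `(offset, None)` so that a stale update arriving afterwards is still rejected; when such entries may be purged is left open here.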
> > > > > >>>>>>>> 6. You and Jan have been thinking about this for a long time, so I've
> > > > > >>>>>>>> probably missed something here, but I'm wondering if we can avoid the
> > > > > >>>>>>>> HWM tracking altogether and resolve out-of-order updates during a
> > > > > >>>>>>>> final join instead...
> > > > > >>>>>>>>
> > > > > >>>>>>>> Let's say we're joining a left table (Integer K: Letter FK, (other data))
> > > > > >>>>>>>> to a right table (Letter K: (some data)).
> > > > > >>>>>>>>
> > > > > >>>>>>>> Left table:
> > > > > >>>>>>>> 1: (A, xyz)
> > > > > >>>>>>>> 2: (B, asd)
> > > > > >>>>>>>>
> > > > > >>>>>>>> Right table:
> > > > > >>>>>>>> A: EntityA
> > > > > >>>>>>>> B: EntityB
> > > > > >>>>>>>>
> > > > > >>>>>>>> We could do a rekey as you proposed with a combined key, but without
> > > > > >>>>>>>> propagating the value at all:
> > > > > >>>>>>>> Rekey table:
> > > > > >>>>>>>> A-1: (dummy value)
> > > > > >>>>>>>> B-2: (dummy value)
> > > > > >>>>>>>>
> > > > > >>>>>>>> Which we then join with the right table to produce:
> > > > > >>>>>>>> A-1: EntityA
> > > > > >>>>>>>> B-2: EntityB
> > > > > >>>>>>>>
> > > > > >>>>>>>> Which gets rekeyed back:
> > > > > >>>>>>>> 1: A, EntityA
> > > > > >>>>>>>> 2: B, EntityB
> > > > > >>>>>>>>
> > > > > >>>>>>>> And finally we do the actual join:
> > > > > >>>>>>>> Result table:
> > > > > >>>>>>>> 1: ((A, xyz), EntityA)
> > > > > >>>>>>>> 2: ((B, asd), EntityB)
> > > > > >>>>>>>>
> > > > > >>>>>>>> The thing is that in that last join, we have the opportunity to compare
> > > > > >>>>>>>> the current FK in the left table with the incoming PK of the right
> > > > > >>>>>>>> table. If they don't match, we just drop the event, since it must be
> > > > > >>>>>>>> outdated.
> > > > > >>>>>>>>
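The four steps of the example above (rekey to a combined key without the value, join with the right table, rekey back, final join with an FK check) can be modelled in batch form with Python dicts standing in for KTables. This is only a sketch: it ignores incremental updates, partitioning and serialization.

```python
from typing import Dict, Tuple

# Example data from the discussion: left (Integer K -> (Letter FK, other data)),
# right (Letter K -> some data).
left: Dict[int, Tuple[str, str]] = {1: ("A", "xyz"), 2: ("B", "asd")}
right: Dict[str, str] = {"A": "EntityA", "B": "EntityB"}

# Step 1: rekey to a combined key (FK, PK), dropping the value entirely.
rekeyed = {(fk, pk): None for pk, (fk, _data) in left.items()}

# Step 2: join the combined-key table with the right table on the FK part.
joined = {(fk, pk): right[fk] for (fk, pk) in rekeyed if fk in right}

# Step 3: rekey back to the original PK, keeping the FK that was joined on.
rekeyed_back = {pk: (fk, rhs) for (fk, pk), rhs in joined.items()}

# Step 4: final join against the current left table; drop results whose FK
# no longer matches the left record's current FK (they must be outdated).
result = {
    pk: (left[pk], rhs)
    for pk, (fk, rhs) in rekeyed_back.items()
    if pk in left and left[pk][0] == fk
}
print(result)  # {1: (('A', 'xyz'), 'EntityA'), 2: (('B', 'asd'), 'EntityB')}
```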
> > > > > >>>>>>>
> > > > > >>>>>>>> In your KIP, you gave an example in which (1: A, xyz) gets updated to
> > > > > >>>>>>>> (1: B, xyz), ultimately yielding a conundrum about whether the final
> > > > > >>>>>>>> state should be (1: null) or (1: joined-on-B). With the algorithm above,
> > > > > >>>>>>>> you would be considering (1: (B, xyz), (A, null)) vs
> > > > > >>>>>>>> (1: (B, xyz), (B, EntityB)). It seems like this does give you enough
> > > > > >>>>>>>> information to make the right choice, regardless of disordering.
> > > > > >>>>>>>
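To see why the final join copes with disordering in the (1: A, xyz) → (1: B, xyz) example, here is a small event-at-a-time Python sketch of the final-join check: the joined results for the old and the new FK are fed in both orders against the left table's current state. The function name and event shape are hypothetical, and the left table is held fixed for simplicity.

```python
from typing import Dict, List, Optional, Tuple

Left = Dict[int, Tuple[str, str]]        # PK -> (FK, other data)
Joined = Tuple[int, str, Optional[str]]  # (PK, FK the result was computed for, rhs value)


def final_join(left: Left, events: List[Joined]) -> Dict[int, Tuple[Tuple[str, str], Optional[str]]]:
    """Apply rekeyed-back join results in arrival order, dropping outdated ones.

    A result is outdated if the FK it was computed for differs from the left
    record's current FK; only matching results update the output.
    """
    output = {}
    for pk, fk, rhs in events:
        current = left.get(pk)
        if current is None or current[0] != fk:
            continue  # stale: the left record has moved to another FK (or was deleted)
        output[pk] = (current, rhs)
    return output


left_now: Left = {1: ("B", "xyz")}  # record 1 was updated from FK A to FK B
stale = (1, "A", None)              # computed while record 1 still pointed at A
fresh = (1, "B", "EntityB")         # computed after the update

in_order = final_join(left_now, [stale, fresh])
disordered = final_join(left_now, [fresh, stale])
print(in_order == disordered, in_order[1])  # True (('B', 'xyz'), 'EntityB')
```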
> > > > > >>>>>>> Will check Adam's patch, but this should work. As I have mentioned
> > > > > >>>>>>> often, I am not convinced about partitioning back for the user
> > > > > >>>>>>> automatically. I think this is the real performance eater ;)
> > > > > >>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>>> 7. Last thought... I'm a little concerned about the performance of the
> > > > > >>>>>>>> range scans when records change in the right table. You've said that
> > > > > >>>>>>>> you've been using the algorithm you presented in production for a
> > > > > >>>>>>>> while. Can you give us a sense of the performance characteristics
> > > > > >>>>>>>> you've observed?
> > > > > >>>>>>>>
> > > > > >>>>>>>
> > > > > >>>>>>> Make it work, make it fast, make it beautiful. The topmost concern here
> > > > > >>>>>>> is (and was) correctness. In practice I do not measure the performance
> > > > > >>>>>>> of the range scan. The usual cases I run this with emit 500k to 1M rows
> > > > > >>>>>>> on a left-hand-side change. The range scan is just the work you've got
> > > > > >>>>>>> to do; even if you pack your data into different formats, RocksDB
> > > > > >>>>>>> performance is usually tightly bound to the size of the data, and we
> > > > > >>>>>>> can't really change that. It is more important for users to prevent
> > > > > >>>>>>> useless updates to begin with. My left-hand side is guarded to drop
> > > > > >>>>>>> changes that are not going to change my join output.
> > > > > >>>>>>>
> > > > > >>>>>>> Usually it's:
> > > > > >>>>>>>
> > > > > >>>>>>> drop unused fields and then don't forward if old.equals(new)
> > > > > >>>>>>>
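Jan's guard ("drop unused fields and then don't forward if old.equals(new)") can be sketched as a projection followed by an equality check. The record shape and the set of join-relevant fields below are hypothetical; in a real topology this check would sit in front of the join, wherever both the old and the new value are available.

```python
from typing import Any, Dict, Iterable, Optional

Record = Dict[str, Any]


def project(record: Optional[Record], fields: Iterable[str]) -> Optional[Record]:
    """Keep only the fields the join output depends on."""
    if record is None:
        return None
    return {f: record.get(f) for f in fields}


def should_forward(old: Optional[Record], new: Optional[Record], fields: Iterable[str]) -> bool:
    """Forward a change only if it alters a join-relevant field."""
    fields = list(fields)  # may be iterated twice below
    return project(old, fields) != project(new, fields)


JOIN_FIELDS = ["fk", "name"]  # hypothetical: the fields the join actually uses
old = {"fk": "A", "name": "x", "last_seen": 1}
new = {"fk": "A", "name": "x", "last_seen": 2}  # only an unused field changed
print(should_forward(old, new, JOIN_FIELDS))                 # False
print(should_forward(old, {**new, "fk": "B"}, JOIN_FIELDS))  # True
```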
> > > > > >>>>>>> Regarding the performance of creating an iterator for smaller fan-outs,
> > > > > >>>>>>> users can still just do a groupBy first anyway.
> > > > > >>>>>>>
> > > > > >>>>>>>
> > > > > >>>>>>>
> > > > > >>>>>>>> I could only think of one alternative, but I'm not sure if it's better
> > > > > >>>>>>>> or worse... If the first re-key only needs to preserve the original
> > > > > >>>>>>>> key, as I proposed in #6, then we could store a vector of keys in the
> > > > > >>>>>>>> value:
> > > > > >>>>>>>>
> > > > > >>>>>>>> Left table:
> > > > > >>>>>>>> 1: A,...
> > > > > >>>>>>>> 2: B,...
> > > > > >>>>>>>> 3: A,...
> > > > > >>>>>>>>
> > > > > >>>>>>>> Gets re-keyed:
> > > > > >>>>>>>> A: [1, 3]
> > > > > >>>>>>>> B: [2]
> > > > > >>>>>>>>
> > > > > >>>>>>>> Then, the rhs part of the join would only need a regular single-key
> > > > > >>>>>>>> lookup. Of course we have to deal with the problem of large values, as
> > > > > >>>>>>>> there's no bound on the number of lhs records that can reference rhs
> > > > > >>>>>>>> records. Offhand, I'd say we could page the values, so when one row is
> > > > > >>>>>>>> past the threshold, we append the key for the next page. Then in most
> > > > > >>>>>>>> cases, it would be a single key lookup, but for large fan-out updates,
> > > > > >>>>>>>> it would be one lookup per (max value size)/(avg lhs key size)
> > > > > >>>>>>>> referencing lhs keys.
> > > > > >>>>>>>>
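A rough Python model of the paged vector-of-keys idea: each FK maps to a chain of pages holding at most `page_size` left-hand keys, and each page records the key of the next page. For simplicity the page size is counted in keys rather than bytes, and the page address `(fk, page_no)` is an assumption made only for this sketch. An rhs update for an FK referenced by n lhs keys then needs ceil(n / page_size) single-key lookups.

```python
from typing import Dict, List, Optional, Tuple

PageKey = Tuple[str, int]  # hypothetical page address: (foreign key, page number)


class PagedIndex:
    def __init__(self, page_size: int) -> None:
        self.page_size = page_size
        # page key -> (lhs keys on this page, key of the next page or None)
        self.pages: Dict[PageKey, Tuple[List[int], Optional[PageKey]]] = {}

    def add(self, fk: str, pk: int) -> None:
        """Append pk to the last page for fk, opening a new page when it is full."""
        page_key: PageKey = (fk, 0)
        while True:
            keys, nxt = self.pages.setdefault(page_key, ([], None))
            if len(keys) < self.page_size:
                keys.append(pk)
                return
            if nxt is None:
                nxt = (fk, page_key[1] + 1)
                self.pages[page_key] = (keys, nxt)  # link the full page to a new one
            page_key = nxt

    def lookup(self, fk: str) -> Tuple[List[int], int]:
        """Return all lhs keys for fk and the number of single-key lookups it took."""
        result: List[int] = []
        lookups = 0
        page_key: Optional[PageKey] = (fk, 0)
        while page_key is not None and page_key in self.pages:
            keys, page_key = self.pages[page_key]
            result.extend(keys)
            lookups += 1
        return result, lookups


index = PagedIndex(page_size=2)
for pk, fk in [(1, "A"), (2, "B"), (3, "A"), (4, "A"), (5, "A"), (6, "A")]:
    index.add(fk, pk)
print(index.lookup("A"))  # ([1, 3, 4, 5, 6], 3)
print(index.lookup("B"))  # ([2], 1)
```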
> > > > > >>>>>>>> This seems more complex, though... Plus, I think there's some extra
> > > > > >>>>>>>> tracking we'd need to do to know when to emit a retraction. For
> > > > > >>>>>>>> example, when record 1 is deleted, the re-key table would just have
> > > > > >>>>>>>> (A: [3]). Some kind of tombstone is needed so that the join result for
> > > > > >>>>>>>> 1 can also be retracted.
> > > > > >>>>>>>>
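The retraction problem can be made concrete with a small Python model of the (non-paged) re-key table: deleting record 1 only shrinks A's key list to [3], so the deletion must also emit an explicit tombstone for key 1. The function and event shapes are hypothetical; `None` plays the role of a tombstone.

```python
from typing import Dict, List, Optional, Tuple

Emitted = Tuple[int, Optional[str]]  # (lhs key, join result, or None as a tombstone)


def delete_left(index: Dict[str, List[int]], left: Dict[int, str], pk: int) -> List[Emitted]:
    """Remove pk from the left table (PK -> FK) and from the FK -> [PKs] index.

    Without the explicit tombstone, the old join result for pk would never be
    retracted, because the index update alone (A: [1, 3] -> A: [3]) carries
    nothing about key pk.
    """
    fk = left.pop(pk, None)
    if fk is None:
        return []
    keys = index[fk]
    keys.remove(pk)
    if not keys:
        del index[fk]
    return [(pk, None)]


left = {1: "A", 2: "B", 3: "A"}
index = {"A": [1, 3], "B": [2]}
events = delete_left(index, left, 1)
print(index, events)  # {'A': [3], 'B': [2]} [(1, None)]
```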
> > > > > >>>>>>>> That's all!
> > > > > >>>>>>>>
> > > > > >>>>>>>> Thanks so much to both Adam and Jan for the thoughtful KIP. Sorry the
> > > > > >>>>>>>> discussion has been slow.
> > > > > >>>>>>>> -John
> > > > > >>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>>> On Fri, Oct 12, 2018 at 2:20 AM Jan Filipiak <jan.filip...@trivago.com> wrote:
> > > > > >>>>>>>>
> > > > > >>>>>>>>> I'd say you can just call the vote.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> That happens all the time, and if something comes up, it just goes
> > > > > >>>>>>>>> back to discussion.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> I would not expect too much attention from yet another email in this
> > > > > >>>>>>>>> thread.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> Best, Jan
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> On 09.10.2018 13:56, Adam Bellemare wrote:
> > > > > >>>>>>>>>> Hello Contributors
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> I know that 2.1 is about to be released, but I do need to bump this to
> > > > > >>>>>>>>>> keep visibility up. I am still intending to push this through once
> > > > > >>>>>>>>>> contributor feedback is given.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Main points that need addressing:
> > > > > >>>>>>>>>> 1) Is there any way (or benefit) to restructuring the current single
> > > > > >>>>>>>>>> graph node into multiple nodes? It has a whopping 25 parameters right
> > > > > >>>>>>>>>> now. I am a bit fuzzy on how the optimizations are supposed to work,
> > > > > >>>>>>>>>> so I would appreciate any help on this aspect.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> 2) Overall strategy for joining + resolving. This thread has much
> > > > > >>>>>>>>>> discourse between Jan and me about the current highwater mark proposal
> > > > > >>>>>>>>>> versus a groupBy + reduce proposal. I am of the opinion that we need to
> > > > > >>>>>>>>>> strictly handle any chance of out-of-order data and leave none of it up
> > > > > >>>>>>>>>> to the consumer. Any comments or suggestions here would also help.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> 3) Anything else that you see that would prevent this from moving to a
> > > > > >>>>>>>>>> vote?
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Thanks
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Adam
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> On Sun, Sep 30, 2018 at 10:23 AM Adam Bellemare <adam.bellem...@gmail.com> wrote:
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> Hi Jan
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> With the Stores.windowStoreBuilder and Stores.persistentWindowStore, you
> > > > > >>>>>>>>>>> actually only need to specify the number of segments you want and how
> > > > > >>>>>>>>>>> large they are. To the best of my understanding, what happens is that
> > > > > >>>>>>>>>>> the segments are automatically rolled over as new data with new
> > > > > >>>>>>>>>>> timestamps is written. We use this exact functionality in some of the
> > > > > >>>>>>>>>>> work done internally at my company. For reference, this is the hopping
> > > > > >>>>>>>>>>> windowed store.
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#id21
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> In the code that I have provided, there are going to be two 24h
> > > > > >>>>>>>>>>> segments. When a record is put into the windowStore, it will be
> > > > > >>>>>>>>>>> inserted at time T in both segments. The two segments will always
> > > > > >>>>>>>>>>> overlap by 12h. As time goes on and new records are added (say at time
> > > > > >>>>>>>>>>> T+12h+), the oldest segment will be automatically deleted and a new
> > > > > >>>>>>>>>>> segment created. The records are by default inserted with the
> > > > > >>>>>>>>>>> context.timestamp(), such that it is the record time, not the clock
> > > > > >>>>>>>>>>> time, which is used.
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> To the best of my understanding, the timestamps are retained when
> > > > > >>>>>>>>>>> restoring from the changelog.
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> Basically, this is a heavy-handed way to deal with TTL at a segment
> > > > > >>>>>>>>>>> level, instead of at an individual record level.
> > > > > >>>>>>>>>>>
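To illustrate what "TTL at a segment level" means, here is a simplified Python model: records land in time-bucketed segments chosen by record timestamp (not wall-clock time), and whole segments are dropped once stream time has moved past the retention period. The segment interval, retention rule and class name are all hypothetical, and this is not meant to mirror the internals of Kafka's window store. It also reflects Jan's point below that a key can live in several segments, so a read has to check all live segments.

```python
from typing import Dict, Optional


class SegmentedTtlStore:
    """Key-value store whose entries expire a whole segment at a time."""

    def __init__(self, segment_interval: int, retention: int) -> None:
        self.segment_interval = segment_interval
        self.retention = retention
        self.stream_time = 0
        self.segments: Dict[int, Dict[str, int]] = {}  # segment id -> {key: value}

    def put(self, key: str, value: int, timestamp: int) -> None:
        """Insert using the record timestamp; too-late records expire immediately."""
        self.stream_time = max(self.stream_time, timestamp)
        segment_id = timestamp // self.segment_interval
        self.segments.setdefault(segment_id, {})[key] = value
        self._expire()

    def _expire(self) -> None:
        # Drop every segment whose newest possible timestamp is older than the retention window.
        oldest_live = self.stream_time - self.retention
        for segment_id in list(self.segments):
            segment_end = (segment_id + 1) * self.segment_interval - 1
            if segment_end < oldest_live:
                del self.segments[segment_id]

    def get_latest(self, key: str) -> Optional[int]:
        """A key may sit in several segments, so check all live ones, newest first."""
        for segment_id in sorted(self.segments, reverse=True):
            if key in self.segments[segment_id]:
                return self.segments[segment_id][key]
        return None


HOUR = 3600
store = SegmentedTtlStore(segment_interval=12 * HOUR, retention=24 * HOUR)
store.put("k1", 10, timestamp=1 * HOUR)
store.put("k1", 11, timestamp=13 * HOUR)  # same key, newer segment
print(store.get_latest("k1"))             # 11
store.put("k2", 5, timestamp=40 * HOUR)   # stream time jumps; segment 0 expires
print(sorted(store.segments))             # [1, 3]
```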
> > > > > >>>>>>>>>>> On Tue, Sep 25, 2018 at 5:18 PM Jan Filipiak <jan.filip...@trivago.com> wrote:
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>>> Will that work? I expected it to blow up with ClassCastException or
> > > > > >>>>>>>>>>>> similar.
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> You would either have to specify the window you fetch/put or iterate
> > > > > >>>>>>>>>>>> across all windows the key was found in, right?
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> I just hope the window store doesn't check stream-time under the hood;
> > > > > >>>>>>>>>>>> that would be a questionable interface.
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> If it does: did you see my comment on checking all the windows
> > > > > >>>>>>>>>>>> earlier? That would be needed to actually give reasonable time
> > > > > >>>>>>>>>>>> guarantees.
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> Best
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> On 25.09.2018 13:18, Adam Bellemare wrote:
> > > > > >>>>>>>>>>>>> Hi Jan
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> Check for "highwaterMat" in the PR. I only changed the state store,
> > > > > >>>>>>>>>>>>> not the ProcessorSupplier.
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> Thanks,
> > > > > >>>>>>>>>>>>> Adam
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> On Mon, Sep 24, 2018 at 2:47 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> On 24.09.2018 16:26, Adam Bellemare wrote:
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> @Guozhang
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> Thanks for the information. This is indeed something that will be
> > > > > >>>>>>>>>>>>>>> extremely useful for this KIP.
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> @Jan
> > > > > >>>>>>>>>>>>>>> Thanks for your explanations. That said, I will not be moving ahead
> > > > > >>>>>>>>>>>>>>> with an implementation using the reshuffle/groupBy solution you
> > > > > >>>>>>>>>>>>>>> propose. However, if you wish to implement it yourself off of my
> > > > > >>>>>>>>>>>>>>> current PR and submit it as a competing alternative, I would be more
> > > > > >>>>>>>>>>>>>>> than happy to help vet it as an alternate solution. As it stands right
> > > > > >>>>>>>>>>>>>>> now, I do not really have more time to invest into alternatives without
> > > > > >>>>>>>>>>>>>>> a strong indication from the binding voters about which they would
> > > > > >>>>>>>>>>>>>>> prefer.
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Hey, no worries at all. I personally gave up on the Streams DSL some
> > > > > >>>>>>>>>>>>>> time ago; otherwise I would have pulled this KIP through already. I am
> > > > > >>>>>>>>>>>>>> currently reimplementing my own DSL based on the PAPI.
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> I will look at finishing up my PR with the windowed state store in the
> > > > > >>>>>>>>>>>>>>> next week or so, exercising it via tests, and then I will come back for
> > > > > >>>>>>>>>>>>>>> final discussions. In the meantime, I hope that any of the binding
> > > > > >>>>>>>>>>>>>>> voters could take a look at the KIP in the wiki. I have updated it
> > > > > >>>>>>>>>>>>>>> according to the latest plan:
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> I have also updated the KIP PR to use a windowed store. This could be
> > > > > >>>>>>>>>>>>>>> replaced by the results of KIP-258 whenever they are completed.
> > > > > >>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/5527
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> Thanks,
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> Adam
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Is the HighWatermarkResolverProccessorsupplier already updated in the
> > > > > >>>>>>>>>>>>>> PR? I expected it to change to Windowed<K>,Long. Am I missing
> > > > > >>>>>>>>>>>>>> something?
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> On Fri, Sep 14, 2018 at 2:24 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> Correction on my previous email: KAFKA-5533 is the wrong link, as it is
> > > > > >>>>>>>>>>>>>>>> for corresponding changelog mechanisms. But as part of KIP-258 we do
> > > > > >>>>>>>>>>>>>>>> want to have "handling out-of-order data for source KTable" such that,
> > > > > >>>>>>>>>>>>>>>> instead of blindly applying the updates to the materialized store (i.e.
> > > > > >>>>>>>>>>>>>>>> following offset ordering), we will reject updates that are older than
> > > > > >>>>>>>>>>>>>>>> the current key's timestamp (i.e. following timestamp ordering).
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> Guozhang
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> On Fri, Sep 14, 2018 at 11:21 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> Hello Adam,
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> Thanks for the explanation. Regarding the final step (i.e. the high
> > > > > >>>>>>>>>>>>>>>>> watermark store, now altered to be replaced with a window store), I
> > > > > >>>>>>>>>>>>>>>>> think another ongoing KIP may actually help:
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> This is for adding the timestamp into a key-value store (i.e. only for
> > > > > >>>>>>>>>>>>>>>>> non-windowed KTables), and then one of its uses, as described in
> > > > > >>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-5533, is that we can then
> > > > > >>>>>>>>>>>>>>>>> "reject" updates from the source topics if their timestamp is smaller
> > > > > >>>>>>>>>>>>>>>>> than the current key's latest update timestamp. I think it is very
> > > > > >>>>>>>>>>>>>>>>> similar to what you have in mind for high-watermark-based filtering,
> > > > > >>>>>>>>>>>>>>>>> while you only need to make sure that the timestamps of the joining
> > > > > >>>>>>>>>>>>>>>>> records are correctly inherited through the whole topology to the final
> > > > > >>>>>>>>>>>>>>>>> stage.
> > > > > >>>>>>>>>>>>>>>>>
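A minimal Python sketch of the timestamp-based rejection described above: the store keeps each key's latest update timestamp and ignores any update with a smaller one, i.e. it follows timestamp ordering rather than offset ordering. The class is hypothetical and only models the idea. Because, as Jan notes further down the thread, offset order and timestamp order need not agree, the example ends up keeping a different value than blindly applying updates in offset order would.

```python
from typing import Dict, Optional, Tuple


class TimestampedKeyValueStore:
    """Keeps (value, timestamp) per key and rejects out-of-order updates."""

    def __init__(self) -> None:
        self._data: Dict[str, Tuple[Optional[str], int]] = {}

    def put(self, key: str, value: Optional[str], timestamp: int) -> bool:
        """Apply the update unless it is older than the key's latest timestamp."""
        current = self._data.get(key)
        if current is not None and timestamp < current[1]:
            return False  # rejected: older than what we already hold
        self._data[key] = (value, timestamp)
        return True

    def get(self, key: str) -> Optional[str]:
        entry = self._data.get(key)
        return None if entry is None else entry[0]


updates = [("k", "v1", 100), ("k", "v2", 90)]  # offset order; v2 has an older timestamp
store = TimestampedKeyValueStore()
accepted = [store.put(k, v, ts) for k, v, ts in updates]
offset_order_result = updates[-1][1]  # what blindly applying in offset order would keep
print(accepted, store.get("k"), offset_order_result)  # [True, False] v1 v2
```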
> > > > > >>>>>>>>>>>>>>>>> Note that this KIP is for key-value stores and hence non-windowed
> > > > > >>>>>>>>>>>>>>>>> KTables only, but for windowed KTables we do not really have good
> > > > > >>>>>>>>>>>>>>>>> support for their joins anyway
> > > > > >>>>>>>>>>>>>>>>> (https://issues.apache.org/jira/browse/KAFKA-7107), so I think we can
> > > > > >>>>>>>>>>>>>>>>> just consider non-windowed KTable-KTable non-key joins for now, in which
> > > > > >>>>>>>>>>>>>>>>> case KIP-258 should help.
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> Guozhang
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> On Wed, Sep 12, 2018 at 9:20 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> On 11.09.2018 18:00, Adam Bellemare wrote:
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> Hi Guozhang
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> The current highwater mark implementation would grow endlessly, keyed on
> > > > > >>>>>>>>>>>>>>>>>>> the primary key of the original event. It is a pair of (<this table
> > > > > >>>>>>>>>>>>>>>>>>> primary key>, <highest offset seen for that key>). This is used to
> > > > > >>>>>>>>>>>>>>>>>>> differentiate between late arrivals and new updates. My newest proposal
> > > > > >>>>>>>>>>>>>>>>>>> would be to replace it with a Windowed state store of Duration N. This
> > > > > >>>>>>>>>>>>>>>>>>> would allow the same behaviour, but cap the size based on time. This
> > > > > >>>>>>>>>>>>>>>>>>> should allow for all late-arriving events to be processed, and should
> > > > > >>>>>>>>>>>>>>>>>>> be customizable by the user to tailor to their own needs (i.e. perhaps
> > > > > >>>>>>>>>>>>>>>>>>> just 10 minutes of window, or perhaps 7 days...).
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> Hi Adam, using time-based retention can do the trick here, even if I
> > > > > >>>>>>>>>>>>>>>>>> would still like to see the automatic repartitioning made optional,
> > > > > >>>>>>>>>>>>>>>>>> since I would just reshuffle again. With a windowed store I am a little
> > > > > >>>>>>>>>>>>>>>>>> bit sceptical about how to determine the window. So essentially one
> > > > > >>>>>>>>>>>>>>>>>> could run into problems when a rapid change happens near a window
> > > > > >>>>>>>>>>>>>>>>>> border. I will check your implementation in detail; if it's
> > > > > >>>>>>>>>>>>>>>>>> problematic, we could still check _all_ windows on read with a
> > > > > >>>>>>>>>>>>>>>>>> not-too-bad performance impact, I guess. Will let you know if the
> > > > > >>>>>>>>>>>>>>>>>> implementation would be correct as is. I would not like to assume that
> > > > > >>>>>>>>>>>>>>>>>> offset(A) < offset(B) => timestamp(A) < timestamp(B). I think we can't
> > > > > >>>>>>>>>>>>>>>>>> expect that.
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> @Jan
> > > > > >>>>>>>>>>>>>>>>>>> I believe I understand what you mean now - thanks for the diagram, it
> > > > > >>>>>>>>>>>>>>>>>>> really did help. You are correct that I do not have the original
> > > > > >>>>>>>>>>>>>>>>>>> primary key available, and I can see that if it were available then
> > > > > >>>>>>>>>>>>>>>>>>> you would be able to add and remove events from the Map. That being
> > > > > >>>>>>>>>>>>>>>>>>> said, I encourage you to finish your diagrams / charts just for clarity
> > > > > >>>>>>>>>>>>>>>>>>> for everyone else.
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> Yeah 100%, this giphy thing is just really hard work, but I understand
> > > > > >>>>>>>>>>>>>>>>>> the benefits for the rest. Sorry about the original primary key: we
> > > > > >>>>>>>>>>>>>>>>>> have implemented our own join and groupBy in the PAPI and basically
> > > > > >>>>>>>>>>>>>>>>>> don't use any DSL (just the abstraction). I completely missed that it's
> > > > > >>>>>>>>>>>>>>>>>> not there in the original DSL and just assumed it. Total brain mix-up
> > > > > >>>>>>>>>>>>>>>>>> on my end. Will finish the chart as soon as I get a quiet evening this
> > > > > >>>>>>>>>>>>>>>>>> week.
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> My follow-up question for you is: won't the Map stay inside the State
> > > > > >>>>>>>>>>>>>>>>>>> Store indefinitely after all of the changes have propagated? Isn't this
> > > > > >>>>>>>>>>>>>>>>>>> effectively the same as a highwater mark state store?
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> The thing is that if the map is empty, the subtractor is going to
> > > > > >>>>>>>>>>>>>>>>>> return `null` and the key is removed from the keyspace. But there is
> > > > > >>>>>>>>>>>>>>>>>> going to be a store, 100%. The good thing is that I can use this store
> > > > > >>>>>>>>>>>>>>>>>> directly for materialize() / enableSendingOldValues(); it is a regular
> > > > > >>>>>>>>>>>>>>>>>> store, satisfying all guarantees needed for further groupBy / join. The
> > > > > >>>>>>>>>>>>>>>>>> windowed store is not keeping the values, so for the next stateful
> > > > > >>>>>>>>>>>>>>>>>> operation we would need to instantiate an extra store, or have the
> > > > > >>>>>>>>>>>>>>>>>> window store also hold the values.
> > > > > >>>>>>>>>>>>>>>>>>
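Jan's groupBy + reduce alternative relies on an aggregate that is a map from original primary key to joined value, where the subtractor returns null once the map is empty so the key disappears from the store. A hedged Python model of that adder/subtractor pair follows; the function names and aggregate shape are hypothetical, and a plain dict stands in for the state store, with `None` acting as a delete.

```python
from typing import Dict, Optional

Agg = Optional[Dict[int, str]]  # original PK -> joined value; None means "key removed"


def adder(pk: int, value: str, agg: Agg) -> Agg:
    new = dict(agg or {})
    new[pk] = value
    return new


def subtractor(pk: int, agg: Agg) -> Agg:
    new = dict(agg or {})
    new.pop(pk, None)
    return new or None  # empty map -> None, so the key leaves the keyspace


store: Dict[str, Dict[int, str]] = {}


def write(key: str, agg: Agg) -> None:
    """Write the new aggregate; None acts as a delete, like a tombstone."""
    if agg is None:
        store.pop(key, None)
    else:
        store[key] = agg


write("A", adder(1, "1-joined-A", store.get("A")))
write("A", adder(3, "3-joined-A", store.get("A")))
write("A", subtractor(1, store.get("A")))
print(store)  # {'A': {3: '3-joined-A'}}
write("A", subtractor(3, store.get("A")))
print(store)  # {}
```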
> > > > > >>>>>>>>>>>>>>>>>> Long story short: if we can flip in a custom groupBy before
> > > > > >>>>>>>>>>>>>>>>>> repartitioning to the original primary key, I think it would help users
> > > > > >>>>>>>>>>>>>>>>>> big time in building efficient apps. Given the original primary key
> > > > > >>>>>>>>>>>>>>>>>> issue, I understand that we do not have a solid foundation to build on.
> > > > > >>>>>>>>>>>>>>>>>> Leaving the primary key for the user to carry along is very
> > > > > >>>>>>>>>>>>>>>>>> unfortunate. I can understand why the decision went that way, but I do
> > > > > >>>>>>>>>>>>>>>>>> not think it's a good decision.
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> Thanks
> > > > > >>>>>>>>>>>>>>>>>>> Adam
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>> On Tue, Sep 11, 2018 at 10:07 AM, Prajakta Dumbre <dumbreprajakta...@gmail.com>
> > > > > >>>>>>>>>>>>>>>>>>>>> wrote:
> > > > > >>>>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>           please remove me from this group
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>           On Tue, Sep 11, 2018 at 1:29 PM Jan Filipiak <jan.filip...@trivago.com>
> > > > > >>>>>>>>>>>>>>>>>>>>>           wrote:
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>           > Hi Adam,
> > > > > >>>>>>>>>>>>>>>>>>>           >
> > > > > >>>>>>>>>>>>>>>>>>>>>           > Give me some time, I will make such a chart. Last time I didn't get
> > > > > >>>>>>>>>>>>>>>>>>>>>           > along well with giphy and ruined all your charts.
> > > > > >>>>>>>>>>>>>>>>>>>>>           > Hopefully I can get it done today.
> > > > > >>>>>>>>>>>>>>>>>>>           >
> > > > > >>>>>>>>>>>>>>>>>>>>>           > On 08.09.2018 16:00, Adam Bellemare wrote:
> > > > > >>>>>>>>>>>>>>>>>>>           > > Hi Jan
> > > > > >>>>>>>>>>>>>>>>>>>           > >
> > > > > >>>>>>>>>>>>>>>>>>>>>           > > I have included a diagram of what I attempted on the KIP.
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >
> > > > > >>>>>>>>>>>>>>>>>>>>>           > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-GroupBy+Reduce/Aggregate
> > > > > >>>>>>>>>>>>>>>>>>>           > >
> > > > > >>>>>>>>>>>>>>>>>>>>>           > > I attempted this back at the start of my own implementation of this
> > > > > >>>>>>>>>>>>>>>>>>>>>           > > solution, and since I could not get it to work I have since discarded
> > > > > >>>>>>>>>>>>>>>>>>>>>           > > the code. At this point in time, if you wish to continue pursuing your
> > > > > >>>>>>>>>>>>>>>>>>>>>           > > groupBy solution, I ask that you please create a diagram on the KIP
> > > > > >>>>>>>>>>>>>>>>>>>>>           > > carefully explaining your solution. Please feel free to use the image I
> > > > > >>>>>>>>>>>>>>>>>>>>>           > > just posted as a starting point. I am having trouble understanding your
> > > > > >>>>>>>>>>>>>>>>>>>>>           > > explanations, but I think that a carefully constructed diagram will clear
> > > > > >>>>>>>>>>>>>>>>>>>>>           > > up any misunderstandings. Alternately, please post a comprehensive PR
> > > > > >>>>>>>>>>>>>>>>>>>>>           > > with your solution. I can only guess at what you mean, and since I value
> > > > > >>>>>>>>>>>>>>>>>>>>>           > > my own time as much as you value yours, I believe it is your
> > > > > >>>>>>>>>>>>>>>>>>>>>           > > responsibility to provide an implementation instead of me trying to
> > > > > >>>>>>>>>>>>>>>>>>>>>           > > guess.
> > > > > >>>>>>>>>>>>>>>>>>>           > >
> > > > > >>>>>>>>>>>>>>>>>>>           > > Adam
> > > > > >>>>>>>>>>>>>>>>>>>           > >
> > > > > >>>>>>>>>>>>>>>>>>>>>           > > On Sat, Sep 8, 2018 at 8:00 AM, Jan Filipiak <jan.filip...@trivago.com>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > > wrote:
> > > > > >>>>>>>>>>>>>>>>>>>           > >
> > > > > >>>>>>>>>>>>>>>>>>>           > >> Hi James,
> > > > > >>>>>>>>>>>>>>>>>>>           > >>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >> Nice to see you being interested. Kafka Streams at this point supports
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >> all sorts of joins as long as both streams have the same key.
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >> Adam is currently implementing a join where a KTable and a KTable can
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >> have a one-to-many relationship (1:n). We exploit that RocksDB is a
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >> datastore that keeps data sorted (at least it exposes an API to access
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >> the stored data in a sorted fashion).
> > > > > >>>>>>>>>>>>>>>>>>>           > >>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >> I think the technical caveats are well understood now and we are
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >> basically down to philosophy and API design (when Adam sees my newest
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >> message). I have a lengthy track record of losing these kinds of
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >> arguments within the Streams community and I have no clue why. So I
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >> literally can't wait for you to churn through this thread and give your
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >> opinion on how we should design the return type of the oneToManyJoin and
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >> how much power we want to give to the user vs. "simplicity" (where, I
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >> argue, the simplicity isn't really there, as users still need to
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >> understand it).
> > > > > >>>>>>>>>>>>>>>>>>>           > >>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >> Waiting for you to join in on the discussion.
> > > > > >>>>>>>>>>>>>>>>>>>           > >>
> > > > > >>>>>>>>>>>>>>>>>>>           > >> Best Jan
> > > > > >>>>>>>>>>>>>>>>>>>           > >>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >> On 07.09.2018 15:49, James Kwan wrote:
> > > > > >>>>>>>>>>>>>>>>>>>           > >>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>> I am new to this group and I found this subject interesting. Sounds
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>> like you guys want to implement a join table of two streams? Is there
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>> somewhere I can see the original requirement or proposal?
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>> On Sep 7, 2018, at 8:13 AM, Jan Filipiak <jan.filip...@trivago.com>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>> wrote:
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>> On 05.09.2018 22:17, Adam Bellemare wrote:
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>> I'm currently testing using a Windowed Store to store the highwater
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>> mark. By all indications this should work fine, with the caveat being
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>> that it can only resolve out-of-order arrival for up to the size of the
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>> window (e.g. 24h, 72h). This would remove the possibility of it being
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>> unbounded in size.
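A minimal sketch of the bounded high-water-mark idea, assuming each update carries a per-key offset that increases monotonically. This is plain Java, not the Kafka Streams `WindowStore` API; `ExpiringHighWaterMark` and its methods are hypothetical. Marks that have not been updated within the retention period are evicted, which bounds the state but means a very late event for an evicted key can no longer be recognized as stale, which is exactly the caveat described above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch (not the Kafka Streams WindowStore API): a per-key high-water mark
// whose entries expire after a retention period, bounding the state kept per key.
final class ExpiringHighWaterMark {

    private static final class Mark {
        final long offset;
        final long lastUpdatedMs;

        Mark(long offset, long lastUpdatedMs) {
            this.offset = offset;
            this.lastUpdatedMs = lastUpdatedMs;
        }
    }

    private final long retentionMs;
    private final Map<String, Mark> marks = new HashMap<>();

    ExpiringHighWaterMark(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Returns true if the update should be propagated, false if it is older than one already seen.
    boolean accept(String key, long offset, long nowMs) {
        // Forget marks older than the retention period; late events for those keys pass unchecked
        marks.values().removeIf(m -> nowMs - m.lastUpdatedMs > retentionMs);
        Mark current = marks.get(key);
        if (current != null && offset < current.offset) {
            return false; // out-of-order arrival inside the window: drop it
        }
        marks.put(key, new Mark(offset, nowMs));
        return true;
    }

    int size() {
        return marks.size();
    }
}
```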
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>> With regards to Jan's suggestion, I believe this is where we will have
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>> to remain in disagreement. While I do not disagree with your statement
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>> that there are likely to be additional joins done in a real-world
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>> workflow, I do not see how you can conclusively deal with out-of-order
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>> arrival of foreign-key changes and subsequent joins. I have attempted
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>> what I think you have proposed (without a high-water mark, using
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>> groupBy and reduce) and found that if the foreign key changes too
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>> quickly, or the load on a stream thread is too high, the joined messages
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>> will arrive out-of-order and be incorrectly propagated, such that an
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>> intermediate event is represented as the final event.
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>> Can you shed some light on your groupBy implementation? There must be
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>> some sort of flaw in it. I have a suspicion where it is; I would just
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>> like to confirm. The idea is bulletproof, so it must be an
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>> implementation mess-up. I would like to clarify before we draw a
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>> conclusion.
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>> Repartitioning the scattered events back to their original partitions
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>> is the only way I know how to conclusively deal with out-of-order
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>> events in a given time frame, and to ensure that the data is eventually
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>> consistent with the input events.
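The resolution step that this repartitioning enables can be sketched as follows, assuming each joined result arrives back at its original key's partition together with the foreign key it was computed from. `ForeignKeyResolver` and its methods are hypothetical names, and the `HashMap` stands in for the materialized original table. The current value's foreign key is re-extracted, and the joined result is kept only if it matches the foreign key that produced it; otherwise it is thrown away.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical sketch of the resolution step; the HashMap stands in for the materialized
// original ("this") table, keyed by the original primary key.
final class ForeignKeyResolver<K, V, FK> {

    private final Map<K, V> thisTable = new HashMap<>();
    private final Function<V, FK> foreignKeyExtractor;

    ForeignKeyResolver(Function<V, FK> foreignKeyExtractor) {
        this.foreignKeyExtractor = foreignKeyExtractor;
    }

    // Applied for every update to the original table
    void upsert(K key, V value) {
        thisTable.put(key, value);
    }

    void delete(K key) {
        thisTable.remove(key);
    }

    // Called for each joined result that has been repartitioned back to its original key.
    // Returns the result if it was computed with the current foreign key, otherwise empty.
    <R> Optional<R> resolve(K key, FK foreignKeyUsed, R joined) {
        V current = thisTable.get(key);
        if (current == null) {
            return Optional.empty(); // the original row no longer exists
        }
        FK currentFk = foreignKeyExtractor.apply(current);
        if (!Objects.equals(currentFk, foreignKeyUsed)) {
            return Optional.empty(); // stale: the foreign key changed after this join was computed
        }
        return Optional.ofNullable(joined);
    }
}
```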
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>> If you have some code to share that illustrates your approach, I would
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>> be very grateful, as it would remove any misunderstandings that I may
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>> have.
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>> Ah okay, you were looking for my code. I don't have anything easily
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>> readable here, as it's bloated with OO patterns.
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>> It's trivial anyhow:
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>> @Override
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>> public T apply(K aggKey, V value, T aggregate)
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>> {
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>     Map<U, V> currentStateAsMap = asMap(aggregate); // << imaginary
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>     U toModifyKey = mapper.apply(value);
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>         // << this is the place where people are actually going to have
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>         //    issues and why you probably couldn't do it. We would need to
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>         //    find a solution here. I didn't realize that yet.
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>         // << we propagate the field in the joiner, so that we can pick it
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>         //    up in an aggregate. Probably you have not thought of this in
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>         //    your approach, right?
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>         // << I am very open to finding a generic solution here. In my
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>         //    honest opinion this is broken in KTableImpl.GroupBy, in that it
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>         //    loses the keys and only maintains the aggregate key.
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>         // << I abstracted it away back then, way before I was thinking of
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>         //    the oneToMany join. That is why I didn't realize its
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>         //    significance here.
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>         // << Opinions?
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>     for (V m : current)
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>     {
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>         currentStateAsMap.put(mapper.apply(m), m);
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>     }
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>     if (isAdder)
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>     {
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>         currentStateAsMap.put(toModifyKey, value);
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>     }
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>     else
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>     {
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>         currentStateAsMap.remove(toModifyKey);
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>         if (currentStateAsMap.isEmpty()) {
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>             return null;
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>         }
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>     }
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>     return asAggregateType(currentStateAsMap);
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>> }
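For readers who want something that compiles, here is a self-contained sketch of the same adder/subtractor logic. It is an interpretation, not Jan's implementation: the aggregate is assumed to be a `Map<U, V>` directly, so the imaginary `asMap`/`asAggregateType` conversions and the loop over `current` become unnecessary. `MapAggregator` is a hypothetical name and does not implement the Kafka Streams `Aggregator` interface.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of the adder/subtractor above with the aggregate stored as Map<U, V>.
final class MapAggregator<K, U, V> {

    private final Function<V, U> mapper; // extracts the key identifying a value inside the aggregate
    private final boolean isAdder;

    MapAggregator(Function<V, U> mapper, boolean isAdder) {
        this.mapper = mapper;
        this.isAdder = isAdder;
    }

    // aggKey is unused here; it is kept only to mirror the apply(aggKey, value, aggregate) shape.
    // Returns the new aggregate, or null once the subtractor has removed the last entry.
    Map<U, V> apply(K aggKey, V value, Map<U, V> aggregate) {
        // Copy instead of mutating, so the previous aggregate is left untouched
        Map<U, V> currentStateAsMap = new HashMap<>();
        if (aggregate != null) {
            currentStateAsMap.putAll(aggregate);
        }
        U toModifyKey = mapper.apply(value);
        if (isAdder) {
            currentStateAsMap.put(toModifyKey, value);
        } else {
            currentStateAsMap.remove(toModifyKey);
            if (currentStateAsMap.isEmpty()) {
                return null;
            }
        }
        return Collections.unmodifiableMap(currentStateAsMap);
    }
}
```

Returning `null` once the subtractor empties the map matches the point made earlier in the thread: when the map is empty the subtractor returns `null` and the key is removed from the keyspace.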
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>> Thanks,
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> Adam
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>> On Wed, Sep 5, 2018 at 3:35 PM, Jan Filipiak <jan.filip...@trivago.com>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>> wrote:
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>> Thanks Adam for bringing Matthias up to speed about the differences!
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>> I think re-keying back should be optional at best. I would say we
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>> return a KScatteredTable with reshuffle() returning
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>> KTable<originalKey,Joined> to make the backwards repartitioning
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>> optional. I am also very much in favour of doing the out-of-order
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>> processing using groupBy instead of high-water-mark tracking, simply
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>> because unbounded growth is scary, and it also saves us the header
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>> stuff.
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>> I think the abstraction of always repartitioning back is just not that
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>> strong: the work has already been done before we partition back, and
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>> grouping by something else afterwards is really common.
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>> On 05.09.2018 13:49, Adam Bellemare wrote:
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> Hi Matthias
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>>> Thank you for your feedback, I do appreciate it!
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>>>> While name spacing would be possible, it would require deserializing
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>>>> user headers, which implies a runtime overhead. I would suggest no
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>>>> namespace for now to avoid the overhead. If this becomes a problem in
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>>>> the future, we can still add name spacing later on.
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>>> Agreed. I will go with using a reserved string and document it.
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>>>> My main concern about the design is the type of the result KTable: If
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>>>> I understood the proposal correctly,
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>>> In your example, you have table1 and table2 swapped. Here is how it
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>>> works currently:
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>>> 1) table1 has the records that contain the foreign key within their
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>>> value.
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>>> table1 input stream: <a,(fk=A,bar=1)>, <b,(fk=A,bar=2)>, <c,(fk=B,bar=3)>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>>> table2 input stream: <A,X>, <B,Y>
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>>> 2) A value mapper is required to extract the foreign key.
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>>> table1 foreign key mapper: ( value => value.fk )
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>>> The mapper is applied to each element in table1, and a new combined
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>>> key is made:
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>>> table1 mapped: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>, <B-c, (fk=B,bar=3)>
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>>> 3) The rekeyed events are copartitioned with table2:
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>>> a) Stream Thread with Partition 0:
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>>> RepartitionedTable1: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>>> Table2: <A,X>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>>> b) Stream Thread with Partition 1:
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>>> RepartitionedTable1: <B-c, (fk=B,bar=3)>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>>> Table2: <B,Y>
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>>> 4) From here, they can be joined together locally by applying the
> > > > > >>>>>>>>>>>>>>>>>>>>>           > >>>>>>> joiner function.
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
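Step 4 as a sketch: within one stream thread, each rekeyed table1 row looks up table2 by the foreign-key prefix of its combined key, then applies the joiner.

Assumptions:

- plain maps stand in for the two local state stores;
- inner-join semantics;
- the `"<fk>-<pk>"` key encoding is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;

class LocalJoinSketch {
    // Joins one partition's repartitioned table1 (keyed "<fk>-<pk>") with the
    // same partition of table2 (keyed "<fk>").
    static <V1, V2, R> Map<String, R> joinLocally(Map<String, V1> repartitionedTable1,
                                                  Map<String, V2> table2,
                                                  BiFunction<V1, V2, R> joiner) {
        Map<String, R> joined = new LinkedHashMap<>();
        for (Map.Entry<String, V1> e : repartitionedTable1.entrySet()) {
            String fk = e.getKey().substring(0, e.getKey().indexOf('-'));
            V2 right = table2.get(fk);
            if (right != null) { // inner join: rows without a match are skipped
                joined.put(e.getKey(), joiner.apply(e.getValue(), right));
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // Partition 0 from the example, with table1 values reduced to "bar".
        Map<String, Integer> t1 = new LinkedHashMap<>();
        t1.put("A-a", 1);
        t1.put("A-b", 2);
        Map<String, String> t2 = Map.of("A", "X");
        Map<String, String> joined = joinLocally(t1, t2, (bar, x) -> x + bar);
        System.out.println(joined); // {A-a=X1, A-b=X2}
    }
}
```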
> > At this point, Jan's design and my design deviate. My design goes on to
> > repartition the data post-join and resolve out-of-order arrival of
> > records, finally returning the data keyed by just the original key. I do
> > not expose the CombinedKey or any of the internals outside of the
> > joinOnForeignKey function. This does make for a larger footprint, but it
> > relieves the user of all responsibility for resolving out-of-order
> > arrivals and handling CombinedKeys. I believe that this makes the
> > function much easier to use.
> >
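One way to resolve out-of-order arrivals once join results are repartitioned back to the original key. This is a sketch under two assumptions:

- each joined result carries the foreign key that produced it;
- the resolver holds a local copy of table1.

A result is emitted only if re-applying the foreign-key extractor to table1's current value yields that same foreign key. Otherwise the result is stale and dropped.

`JoinedResult`, `upsertTable1` and `resolve` are hypothetical names, not part of any Kafka Streams API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

class ResolverSketch {
    // Hypothetical join result after repartitioning back to the table1 key:
    // it remembers which foreign key produced it.
    static final class JoinedResult {
        final String foreignKey;
        final String value;

        JoinedResult(String foreignKey, String value) {
            this.foreignKey = foreignKey;
            this.value = value;
        }
    }

    // Local view of table1 on this partition, keyed by primary key.
    private final Map<String, String> table1 = new HashMap<>();
    private final Function<String, String> fkExtractor;

    ResolverSketch(Function<String, String> fkExtractor) {
        this.fkExtractor = fkExtractor;
    }

    void upsertTable1(String primaryKey, String value) {
        table1.put(primaryKey, value);
    }

    // Emits the result only if table1's current value still points at the
    // foreign key that produced it; a missing table1 row also drops it.
    Optional<String> resolve(String primaryKey, JoinedResult result) {
        String current = table1.get(primaryKey);
        if (current == null) {
            return Optional.empty();
        }
        String currentFk = fkExtractor.apply(current);
        return currentFk.equals(result.foreignKey)
                ? Optional.of(result.value)
                : Optional.empty();
    }
}
```

For example, suppose key `a` was updated from `A,1` to `B,1`, with an extractor that takes the part before the comma. A late result produced from foreign key `A` is then discarded, while the result produced from `B` is emitted.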
> > Let me know if this helps resolve your questions, and please feel free to
> > add anything else on your mind.
> >
> > Thanks again,
> > Adam
> >
> > On Tue, Sep 4, 2018 at 8:36 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > Hi,
> > >
> > > I am just catching up on this thread. I did not read everything so far,
> > > but want to share a couple of initial thoughts:
> > >
> > > Headers: I think there is a fundamental difference between header usage
> > > in this KIP and KIP-258. For 258, we add headers to changelog topics
> > > that are owned by Kafka Streams and nobody else is supposed to write
> > > into them. In fact, no user headers are written into the changelog
> > > topic and thus, there are no conflicts.
> > >
> > > Nevertheless, I don't see a big issue with using headers within Streams.
> > > As long as we document it, we can have some "reserved" header keys that
> > > users are not allowed to use when processing data with Kafka Streams.
> > > IMHO, this should be ok.
> > >
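If the reserved keys were enforced rather than only documented, a check could look like the sketch below.

Assumptions:

- the key names are hypothetical, since the thread does not name any;
- header keys are modeled as a plain `List<String>` rather than Kafka's `Headers` type.

```java
import java.util.List;
import java.util.Set;

class ReservedHeaderSketch {
    // Hypothetical reserved header keys; the real names are not specified in this thread.
    static final Set<String> RESERVED = Set.of("__streams.fk", "__streams.version");

    // Rejects user-supplied header keys that collide with a reserved key.
    static void validateUserHeaders(List<String> headerKeys) {
        for (String key : headerKeys) {
            if (RESERVED.contains(key)) {
                throw new IllegalArgumentException(
                        "Header key is reserved for Kafka Streams: " + key);
            }
        }
    }
}
```

Note that such a check would itself touch every header on every record. Documenting the reserved keys, as suggested above, avoids that per-record cost.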
> > > > I think there is a safe way to avoid conflicts, since these headers
> > > > are only needed in internal topics (I think):
> > > > For internal and changelog topics, we can namespace all headers:
> > > > * user-defined headers are namespaced as "external." + headerKey
> > > > * internal headers are namespaced as "internal." + headerKey
> > >
> > > While namespacing would be possible, it would require deserializing
> > > user headers, which implies a runtime overhead. I would suggest not to
> > > namespace for now to avoid the overhead. If this becomes a problem in
> > > the future, we can still add namespacing later on.
> > >
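A sketch of the namespacing scheme quoted above, mainly to make the cost Matthias describes visible. Every user header has to be rewritten when a record is written to an internal topic, and rewritten again when it is read back.

Headers are modeled as a list of key/value entries. `SimpleImmutableEntry` is used rather than `Map.entry`, because Kafka allows repeated keys and null values. The method names are hypothetical.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class HeaderNamespaceSketch {
    static final String EXTERNAL = "external.";
    static final String INTERNAL = "internal.";

    // Writing to an internal or changelog topic: every user header key gets the
    // "external." prefix and every internal header key the "internal." prefix.
    static List<Map.Entry<String, byte[]>> toInternalTopic(
            List<Map.Entry<String, byte[]>> userHeaders,
            List<Map.Entry<String, byte[]>> internalHeaders) {
        List<Map.Entry<String, byte[]>> out = new ArrayList<>();
        for (Map.Entry<String, byte[]> h : userHeaders) {
            out.add(new SimpleImmutableEntry<>(EXTERNAL + h.getKey(), h.getValue()));
        }
        for (Map.Entry<String, byte[]> h : internalHeaders) {
            out.add(new SimpleImmutableEntry<>(INTERNAL + h.getKey(), h.getValue()));
        }
        return out;
    }

    // Reading back: only "external." headers are handed to user code, with the
    // prefix stripped again; "internal." headers stay hidden.
    static List<Map.Entry<String, byte[]>> userView(List<Map.Entry<String, byte[]>> stored) {
        List<Map.Entry<String, byte[]>> out = new ArrayList<>();
        for (Map.Entry<String, byte[]> h : stored) {
            if (h.getKey().startsWith(EXTERNAL)) {
                out.add(new SimpleImmutableEntry<>(
                        h.getKey().substring(EXTERNAL.length()), h.getValue()));
            }
        }
        return out;
    }
}
```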
> > > My main concern about the design is the type of the result KTable: If I
> > > understood the proposal correctly,
> > >
> > > KTable<K1,V1> table1 = ...
> > > KTable<K2,V2> table2 = ...
> > >
> > > KTable<K1,V3> joinedTable = table1.join(table2,...);
> > >
> > > implies that the `joinedTable` has the same key as the left input
> > > table. IMHO, this does not work because if table2 contains multiple rows
> > > that join with a record in table1 (which is the main purpose of a
> > > foreign key join), the result table would only contain a single join
> > > result, but not multiple.
> > >
> > > Example:
> > >
> > > table1 input stream: <A,X>
> > > table2 input stream: <a,(A,1)>, <b,(A,2)>
> > >
> > > We use table2's value as a foreign key to table1's key (ie, "A" joins).
> > > If the result key is the same as the key of table1, this implies that
> > > the result can either be <A, join(X,1)> or <A, join(X,2)> but not both.
> > > Because they share the same key, whatever result record we emit later
> > > overwrites the previous result.
> > >
> > > This is the reason why Jan originally proposed to use a combination of
> > > both primary keys of the input tables as the key of the output table.
> > > This makes the keys of the output table unique and we can store both in
> > > the output table:
> > >
> > > Result would be <A-a, join(X,1)>, <A-b, join(X,2)>
> > >
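Matthias's example can be replayed with plain maps standing in for the result KTable. Keying results by the table1 key alone keeps only the last join result, while the combined key keeps both.

The `Row` class, the `useCombinedKey` flag and the `"join(X,n)"` string standing in for the joiner output are illustrative assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ResultKeySketch {
    // Illustrative table2 value: a foreign key into table1 plus a payload.
    static final class Row {
        final String fk;
        final int payload;

        Row(String fk, int payload) {
            this.fk = fk;
            this.payload = payload;
        }
    }

    // Joins each table2 row with the table1 row its foreign key points at.
    // With useCombinedKey == false the result is keyed by the table1 key only,
    // so a later result for the same table1 key overwrites the earlier one.
    static Map<String, String> join(Map<String, String> table1,
                                    Map<String, Row> table2,
                                    boolean useCombinedKey) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, Row> e : table2.entrySet()) {
            Row row = e.getValue();
            String left = table1.get(row.fk);
            if (left == null) {
                continue; // no matching table1 row
            }
            String key = useCombinedKey ? row.fk + "-" + e.getKey() : row.fk;
            result.put(key, "join(" + left + "," + row.payload + ")");
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> table1 = Map.of("A", "X");
        Map<String, Row> table2 = new LinkedHashMap<>();
        table2.put("a", new Row("A", 1));
        table2.put("b", new Row("A", 2));
        System.out.println(join(table1, table2, false)); // {A=join(X,2)}
        System.out.println(join(table1, table2, true));  // {A-a=join(X,1), A-b=join(X,2)}
    }
}
```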
> > > Thoughts?
> > >
> > > -Matthias
> > >
> > > On 9/4/18 1:36 PM, Jan Filipiak wrote:
> > >
> > > > Just one remark here.
> > > > The high-watermark could be disregarded. The decision about the
> > > > forward depends on the size of the aggregated map. Only maps with
> > > > exactly one element would be unpacked and forwarded. Empty maps would
> > > > be published as deletes. Any other count of map entries is in the
> > > > "waiting for correct deletes to arrive" state.
> > > >
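Jan's forwarding rule, sketched as a decision on the size of the aggregated map. The thread does not spell out what the map holds. The sketch assumes it collects, per output key, the join results that have not yet been retracted by a delete. `Action` and `decide` are hypothetical names.

```java
import java.util.Map;

class MapSizeForwardSketch {
    enum Action { FORWARD, DELETE, WAIT }

    // Decides what to emit for one key based only on how many entries the
    // aggregated map currently holds; no high-watermark is consulted.
    static Action decide(Map<?, ?> aggregated) {
        switch (aggregated.size()) {
            case 0:
                return Action.DELETE;  // publish a delete for the key
            case 1:
                return Action.FORWARD; // unpack the single entry and forward it
            default:
                return Action.WAIT;    // wait for the correct deletes to arrive
        }
    }
}
```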
> > > > On 04.09.2018 21:29, Adam Bellemare wrote:
> > > >
> > > > > It does look like I could replace the second repartition store and
> > > > > highwater store with a groupBy and reduce. However, it looks like I
> > > > > would still need to store the highwater value within the
> > > > > materialized store, to compare the arrival of out-of-order records
> > > > > (assuming my understanding of THIS is correct...). This in effect is
> > > > > the same as the design I have now, just with the two tables merged
> > > > > together.
> > > > >
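A sketch of the merged store Adam describes. One entry holds both the latest value and its highwater mark, so out-of-order records can be detected on arrival.

The highwater value is assumed to be a per-key `long` sequence number, for example a source offset. The thread does not say what the highwater is measured in.

```java
import java.util.HashMap;
import java.util.Map;

class HighwaterSketch {
    // One store entry holding both the latest value and its highwater mark,
    // i.e. the value store and the highwater store merged together.
    static final class Entry {
        final long highwater;
        final String value;

        Entry(long highwater, String value) {
            this.highwater = highwater;
            this.value = value;
        }
    }

    private final Map<String, Entry> store = new HashMap<>();

    // Accepts a record unless its sequence number is lower than the highwater
    // already stored for the key; such out-of-order records are dropped.
    boolean offer(String key, long sequence, String value) {
        Entry current = store.get(key);
        if (current != null && sequence < current.highwater) {
            return false;
        }
        store.put(key, new Entry(sequence, value));
        return true;
    }

    String valueFor(String key) {
        Entry e = store.get(key);
        return e == null ? null : e.value;
    }
}
```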
> > > > > >>>>>>>>>>>>>>>>> --
> > > > > >>>>>>>>>>>>>>>>> -- Guozhang
> --
> -- Guozhang
>
