Hi John,

Just made a pass on your diagram (nice hand-drawing btw!), and obviously we
are thinking about the same thing :) A neat difference that I like is that
in the pre-join repartition topic we can still send messages in the format
of `K=k, V=(i=2)` while using "i" as the partition key via a
StreamPartitioner. This way we do not even need to augment the key for the
repartition topic; we just project out the foreign-key part and trim all
other fields. As long as we still materialize the store as `A-2`, co-located
with the right KTable, that is fine.
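
To make this concrete, here is a rough sketch of such a partitioner (the
class name and value type below are just illustrative, and the exact
StreamPartitioner signature may differ slightly across versions):

    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.common.utils.Utils;
    import org.apache.kafka.streams.processor.StreamPartitioner;

    // Partitions the pre-join repartition topic by the foreign key ("i")
    // carried in the value, so the record key can stay as the LHS key "k".
    // LeftValue is a hypothetical value class with a String foreignKey field.
    public class ForeignKeyPartitioner
            implements StreamPartitioner<String, LeftValue> {

        @Override
        public Integer partition(final String topic, final String key,
                                 final LeftValue value, final int numPartitions) {
            final byte[] fk = value.foreignKey.getBytes(StandardCharsets.UTF_8);
            // Same hashing scheme as the default producer partitioner.
            return Utils.toPositive(Utils.murmur2(fk)) % numPartitions;
        }
    }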

As I mentioned in my previous email, I also think this has a few advantages
in saving over-the-wire bytes as well as disk bytes.

Guozhang


On Mon, Dec 17, 2018 at 3:17 PM John Roesler <j...@confluent.io> wrote:

> Hi Guozhang,
>
> Thanks for taking a look! I think Adam's already addressed your questions
> as well as I could have.
>
> Hi Adam,
>
> Thanks for updating the KIP. It looks great, especially how all the
> need-to-know information is right at the top, followed by the details.
>
> Also, thanks for that high-level diagram. Actually, now that I'm looking
> at it, I think part of my proposal got lost in translation, although I do
> think that what you have there is also correct.
>
> I sketched up a crude diagram based on yours and attached it to the KIP
> (I'm not sure if attached or inline images work on the mailing list):
> https://cwiki.apache.org/confluence/download/attachments/74684836/suggestion.png
> . It's also attached to this email for convenience.
>
> Hopefully, you can see how it's intended to line up, and which parts are
> modified.
> At a high level, instead of performing the join on the right-hand side,
> we're essentially just registering interest, like "LHS key A wishes to
> receive updates for RHS key 2". Then, when there is a new "interest" or any
> updates to the RHS records, it "broadcasts" its state back to the LHS
> records who are interested in it.
>
> Thus, instead of sending the LHS values to the RHS joiner workers and then
> sending the join results back to the LHS workers to be co-partitioned and
> validated, we instead send only the LHS *keys* to the RHS workers and then
> only the RHS k/v pairs back to be joined by the LHS worker.
>
> I've been considering both your diagram and mine, and I *think* what I'm
> proposing has a few advantages.
>
> Here are some points of interest as you look at the diagram:
> * When we extract the foreign key and send it to the Pre-Join Repartition
> Topic, we can send only the FK/PK pair. There's no need to worry about
> custom partitioner logic, since we can just use the foreign key plainly as
> the repartition record key. Also, we save on transmitting the LHS value,
> since we only send its key in this step.
> * We also only need to store the RHSKey:LHSKey mapping in the
> MaterializedSubscriptionStore, saving on disk. We can use the same rocks
> key format you proposed and the same algorithm involving range scans when
> the RHS records get updated.
> * Instead of joining on the right side, all we do is compose a
> re-repartition record so we can broadcast the RHS k/v pair back to the
> original LHS partition. (this is what the "rekey" node is doing)
> * Then, there is a special kind of Joiner that's co-resident in the same
> StreamTask as the LHS table, subscribed to the Post-Join Repartition Topic.
> ** This Joiner is *not* triggered directly by any changes in the LHS
> KTable. Instead, LHS events indirectly trigger the join via the whole
> lifecycle.
> ** For each event arriving from the Post-Join Repartition Topic, the
> Joiner looks up the corresponding record in the LHS KTable. It validates
> the FK as you noted, discarding any inconsistent events. Otherwise, it
> unpacks the RHS K/V pair and invokes the ValueJoiner to obtain the join
> result (sketched below).
> ** Note that the Joiner itself is stateless, so materializing the join
> result is optional, just as with the 1:1 joins.
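>
> To make that lifecycle concrete, the per-record logic of the Joiner would be
> roughly the following (pseudo-Java; all names here are hypothetical and only
> illustrate the flow):
>
>     // lhsKey: the LHS key carried by the post-join repartition record
>     // rhsKey/rhsValue: the RHS k/v pair carried by that record
>     void process(K lhsKey, KO rhsKey, VO rhsValue) {
>         V lhsValue = lhsTableStore.get(lhsKey);
>         if (lhsValue == null) {
>             return; // the LHS record was deleted in the meantime
>         }
>         // FK validation: drop events whose RHS key no longer matches the
>         // current foreign key of the LHS record
>         if (!foreignKeyExtractor.apply(lhsValue).equals(rhsKey)) {
>             return;
>         }
>         VR result = joiner.apply(lhsValue, rhsValue);
>         context.forward(lhsKey, result); // stateless; materialization optional
>     }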
>
> So in summary:
> * instead of transmitting the LHS keys and values to the right and the
> JoinResult back to the left, we only transmit the LHS keys to the right and
> the RHS values to the left. Assuming the average RHS value is smaller than
> or equal to the average join result size, it's a clear win on broker
> traffic. I think this is actually a reasonable assumption, which we can
> discuss more if you're skeptical.
> * we only need one copy of the data (the left and right tables need to be
> materialized) and one extra copy of the PK:FK pairs in the Materialized
> Subscription Store. Materializing the join result is optional, just as with
> the existing 1:1 joins.
> * we still need the fancy range-scan algorithm on the right to locate all
> interested LHS keys when a RHS value is updated, but we don't need a custom
> partitioner for either repartition topic (this is of course a modification
> we could make to your version as well)
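>
> To illustrate that last point, the range scan when an RHS record is updated
> would look roughly like this (pseudo-Java; the helper names and the
> assumption that the combined key serializes as <rhsKey><lhsKey> are mine):
>
>     // On an update to RHS key "2": find every LHS key that registered
>     // interest by scanning the subscription store for keys prefixed by "2",
>     // then broadcast the new RHS k/v (rhsKey, newRhsValue) back to each.
>     Bytes from = prefixStart(rhsKey); // e.g. serialized "2"
>     Bytes to = prefixEnd(rhsKey);     // e.g. serialized "2" plus 0xFF padding
>     try (KeyValueIterator<Bytes, byte[]> iter = store.range(from, to)) {
>         while (iter.hasNext()) {
>             KeyValue<Bytes, byte[]> entry = iter.next();
>             K lhsKey = extractLhsKey(entry.key);
>             // re-repartition record headed back to the LHS partition
>             context.forward(lhsKey, new RhsUpdate<>(rhsKey, newRhsValue));
>         }
>     }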
>
> How does this sound to you? (And did I miss anything?)
> -John
>
> On Mon, Dec 17, 2018 at 9:00 AM Adam Bellemare <adam.bellem...@gmail.com>
> wrote:
>
>> Hi John & Guozhang
>>
>> @John & @Guozhang Wang <wangg...@gmail.com> - I have cleaned up the KIP,
>> pruned much of what I wrote and put a simplified diagram near the top to
>> illustrate the workflow. I encapsulated Jan's content at the bottom of the
>> document. I believe it is far simpler to read now.
>>
>> @Guozhang Wang <wangg...@gmail.com>:
>> > #1: rekey left table
>> >   -> source from the left upstream, send to rekey-processor to generate
>> combined key, and then sink to copartition topic.
>> Correct.
>>
>> > #2: first-join with right table
>> >   -> source from the right table upstream, materialize the right table.
>> >   -> source from the co-partition topic, materialize the rekeyed left
>> table, join with the right table, rekey back, and then sink to the
>> rekeyed-back topic.
>> Almost - I cleared up the KIP. We do not rekey back yet, as I need the
>> Foreign-Key value generated in #1 above to compare in the resolution
>> stage.
>>
>> > #3: second join
>> >    -> source from the rekeyed-back topic, materialize the rekeyed back
>> table.
>> >   -> source from the left upstream, materialize the left table, join
>> with
>> the rekeyed back table.
>> Almost - as each event comes in, we just run it through a stateful
>> processor that checks the original ("This") KTable for the key. We then
>> apply the foreignKeyExtractor to the value payload again, as in Part #1
>> above, to get the current foreign key, and compare it to the foreign key of
>> the joined event that we are currently resolving. If they have the same
>> foreign key, we propagate the result out. If they don't, we throw the event
>> away.
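>>
>> Roughly, in pseudo-Java (all names here are illustrative only):
>>
>>     // joinedKey: the original "This" key; joined carries the foreign key it
>>     // was joined on, plus the joined payload.
>>     void resolve(K joinedKey, JoinedResult<KO, VR> joined) {
>>         V currentThisValue = thisTableStore.get(joinedKey);
>>         if (currentThisValue == null) {
>>             return; // the "This" row was deleted; drop the stale join result
>>         }
>>         KO currentForeignKey = foreignKeyExtractor.apply(currentThisValue);
>>         if (currentForeignKey.equals(joined.foreignKey)) {
>>             context.forward(joinedKey, joined.value); // still consistent
>>         }
>>         // else: the foreign key changed since the join; throw the event away
>>     }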
>>
>> The end result is that we do need to materialize 2 additional tables
>> (the left/this-combinedkey table, and the final joined table), as I've
>> illustrated in the updated KIP. I hope the diagram makes it much clearer.
>> Please let me know.
>>
>> Thanks again
>> Adam
>>
>>
>>
>>
>> On Sun, Dec 16, 2018 at 3:18 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>
>> > John,
>> >
>> > Thanks a lot for the suggestions on refactoring the wiki. I agree with you
>> > that the KIP proposal should be easy for anyone reading it in the future to
>> > understand, and hence should provide a good summary of the user-facing
>> > interfaces, as well as of the rejected alternatives, to briefly capture
>> > "how we came a long way to this conclusion, and what we have argued,
>> > disagreed, and agreed about, etc." so that readers do not need to dig into
>> > the DISCUSS thread to get all the details. We can, of course, keep
>> > implementation details like the "workflows" on the wiki page as an addendum
>> > section, since they are related material.
>> >
>> > Regarding your proposal on comment 6): that's a very interesting idea! Just
>> > to clarify that I understand it fully correctly: the proposal's resulting
>> > topology is still the same as the current proposal, where we will have 3
>> > sub-topologies for this operator:
>> >
>> > #1: rekey left table
>> >    -> source from the left upstream, send to rekey-processor to generate
>> > combined key, and then sink to copartition topic.
>> >
>> > #2: first-join with right table
>> >    -> source from the right table upstream, materialize the right table.
>> >    -> source from the co-partition topic, materialize the rekeyed left
>> > table, join with the right table, rekey back, and then sink to the
>> > rekeyed-back topic.
>> >
>> > #3: second join
>> >    -> source from the rekeyed-back topic, materialize the rekeyed back
>> > table.
>> >    -> source from the left upstream, materialize the left table, join
>> with
>> > the rekeyed back table.
>> >
>> > Sub-topologies #1 and #3 may be merged into a single sub-topology since both
>> > of them read from the left table source stream. In this workflow, we need to
>> > materialize 4 tables (left table in #3, right table in #2, rekeyed left
>> > table in #2, rekeyed-back table in #3), and we need 2 repartition topics
>> > (copartition topic, rekeyed-back topic).
>> >
>> > Compared with Adam's current proposal in the workflow overview, it has the
>> > same number of materialized tables (left table, rekeyed left table, right
>> > table, out-of-ordering resolver table) and the same number of internal
>> > topics (two). The advantage is that on the copartition topic we can save
>> > bandwidth by not sending the value, and in #2 the rekeyed left table is
>> > smaller since we do not have any values to materialize. Is that right?
>> >
>> >
>> > Guozhang
>> >
>> >
>> >
>> > On Wed, Dec 12, 2018 at 1:22 PM John Roesler <j...@confluent.io> wrote:
>> >
>> > > Hi Adam,
>> > >
>> > > Given that the committers are all pretty busy right now, I think that
>> it
>> > > would help if you were to refactor the KIP a little to reduce the
>> > workload
>> > > for reviewers.
>> > >
>> > > I'd recommend the following changes:
>> > > * relocate all internal details to a section at the end called something
>> > > like "Implementation Notes".
>> > > * rewrite the rest of the KIP to be as succinct as possible and mention
>> > > only publicly-facing API changes.
>> > > ** for example, the interface that you've already listed there, as
>> well
>> > as
>> > > a textual description of the guarantees we'll be providing (join
>> result
>> > is
>> > > copartitioned with the LHS, and the join result is guaranteed correct)
>> > >
>> > > A good target would be that the whole main body of the KIP, including
>> > > Status, Motivation, Proposal, Justification, and Rejected Alternatives
>> > all
>> > > fit "above the fold" (i.e., all fit on the screen at a comfortable
>> zoom
>> > > level).
>> > > I think the only real Rejected Alternative that bears mention at this
>> > > point is KScatteredTable, for which you could just include an executive
>> > > summary (no implementation details) and link to extra details in the
>> > > Implementation Notes section.
>> > >
>> > > Taking a look at the wiki page, ~90% of the text there is internal
>> > detail,
>> > > which is useful for the dubious, but doesn't need to be ratified in a
>> > vote
>> > > (and would be subject to change without notice in the future anyway).
>> > > There's also a lot of conflicting discussion, as you've very
>> respectfully
>> > > tried to preserve the original proposal from Jan while adding your
>> own.
>> > > Isolating all this information in a dedicated section at the bottom
>> frees
>> > > the voters up to focus on the public API part of the proposal, which
>> is
>> > > really all they need to consider.
>> > >
>> > > Plus, it'll be clear to future readers which parts of the document are
>> > > enduring, and which parts are a snapshot of our implementation
>> thinking
>> > at
>> > > the time.
>> > >
>> > > I'm suggesting this because I suspect that the others haven't made
>> time
>> > to
>> > > review it partly because it seems daunting. If it seems like it would
>> be
>> > a
>> > > huge time investment to review, people will just keep putting it off.
>> But
>> > > if the KIP is a single page, then they'll be more inclined to give it
>> a
>> > > read.
>> > >
>> > > Honestly, I don't think the KIP itself is that controversial (apart
>> from
>> > > the scattered table thing (sorry, Jan) ). Most of the discussion has
>> been
>> > > around the implementation, which we can continue more effectively in
>> a PR
>> > > once the KIP has passed.
>> > >
>> > > How does that sound?
>> > > -John
>> > >
>> > > On Mon, Dec 10, 2018 at 3:54 PM Adam Bellemare <
>> adam.bellem...@gmail.com
>> > >
>> > > wrote:
>> > >
>> > > > 1) I believe that the resolution mechanism John has proposed is
>> > > sufficient
>> > > > - it is clean and easy and doesn't require additional RocksDB
>> stores,
>> > > which
>> > > > reduces the footprint greatly. I don't think we need to resolve based
>> > > > on timestamp or offset anymore, but if we decide to do so, that would be
>> > > > within the bounds of the existing API.
>> > > >
>> > > > 2) Is the current API sufficient, or does it need to be altered to
>> go
>> > > back
>> > > > to vote?
>> > > >
>> > > > 3) A KScatteredTable implementation can always be added in a future
>> > > > revision. This API does not rule it out. The implementation of this
>> > > > function would simply be replaced with `KScatteredTable.resolve()` while
>> > > > still maintaining the existing API, thereby giving both features as Jan
>> > > > outlined earlier.
>> > > > Would this work?
>> > > >
>> > > >
>> > > > Thanks Guozhang, John and Jan
>> > > >
>> > > >
>> > > >
>> > > >
>> > > > On Mon, Dec 10, 2018 at 10:39 AM John Roesler <j...@confluent.io>
>> > wrote:
>> > > >
>> > > > > Hi, all,
>> > > > >
>> > > > > >> In fact, we
>> > > > > >> can just keep a single final-result store with timestamps and
>> > reject
>> > > > > values
>> > > > > >> that have a smaller timestamp, is that right?
>> > > > >
>> > > > > > Which output is correct should at least be decided based on the
>> > > > > > offset of the original message.
>> > > > >
>> > > > > Thanks for this point, Jan.
>> > > > >
>> > > > > KIP-258 is merely to allow embedding the record timestamp in the k/v
>> > > > > store, as well as providing a storage-format upgrade path.
>> > > > >
>> > > > > I might have missed it, but I think we have yet to discuss whether
>> > > > > it's safe or desirable just to swap topic-ordering out for
>> > > > > timestamp-ordering. This is a very deep topic, and I think it would
>> > > > > only pollute the current discussion.
>> > > > >
>> > > > > What Adam has proposed is safe, given the *current* ordering
>> > semantics
>> > > > > of the system. If we can agree on his proposal, I think we can
>> merge
>> > > the
>> > > > > feature well before the conversation about timestamp ordering even
>> > > takes
>> > > > > place, much less reaches a conclusion. In the meantime, it would
>> > > > > seem to
>> > > > > be unfortunate to have one join operator with different ordering
>> > > > semantics
>> > > > > from every other KTable operator.
>> > > > >
>> > > > > If and when that timestamp discussion takes place, many (all?)
>> KTable
>> > > > > operations
>> > > > > will need to be updated, rendering the many:one join a small
>> marginal
>> > > > cost.
>> > > > >
>> > > > > And, just to plug it again, I proposed an algorithm above that I
>> > > believe
>> > > > > provides
>> > > > > correct ordering without any additional metadata, and regardless
>> of
>> > the
>> > > > > ordering semantics. I didn't bring it up further, because I felt
>> the
>> > > KIP
>> > > > > only needs
>> > > > > to agree on the public API, and we can discuss the implementation
>> at
>> > > > > leisure in
>> > > > > a PR...
>> > > > >
>> > > > > Thanks,
>> > > > > -John
>> > > > >
>> > > > >
>> > > > > On Mon, Dec 10, 2018 at 2:28 AM Jan Filipiak <
>> > jan.filip...@trivago.com
>> > > >
>> > > > > wrote:
>> > > > >
>> > > > > >
>> > > > > >
>> > > > > > On 10.12.2018 07:42, Guozhang Wang wrote:
>> > > > > > > Hello Adam / Jan / John,
>> > > > > > >
>> > > > > > > Sorry for being late on this thread! I've finally got some time this
>> > > > > > > weekend to clean up a load of tasks on my queue (actually I've also
>> > > > > > > realized there are a bunch of other things I need to enqueue while
>> > > > > > > cleaning them up --- something I need to improve on my side). So here
>> > > > > > > are my thoughts:
>> > > > > > >
>> > > > > > > Regarding the APIs: I like the API as currently written in the KIP.
>> > > > > > > More generally, I'd prefer to keep 1) the one-to-many join
>> > > > > > > functionality as well as 2) join types other than inner as separate
>> > > > > > > KIPs, since 1) may be worth a general API refactoring that can benefit
>> > > > > > > not only foreign-key joins but co-located joins as well (e.g. an
>> > > > > > > extended proposal of
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
>> > > > > > > ), and I'm not sure if other join types would actually be needed
>> > > > > > > (maybe left join still makes sense), so it's better to
>> > > > > > > wait-for-people-to-ask-and-add than add-something-that-no-one-uses.
>> > > > > > >
>> > > > > > > Regarding whether we enforce steps 3) / 4) vs. introducing a
>> > > > > > > KScatteredTable for users to inject their own optimization: I'd prefer
>> > > > > > > to do the current option as-is, and my main rationale is the room for
>> > > > > > > optimization inside the Streams internals and the succinctness of the
>> > > > > > > API. For advanced users who may indeed prefer KScatteredTable and do
>> > > > > > > their own optimization, for whom it is too much work to use the
>> > > > > > > Processor API directly, I think we can still extend the current API to
>> > > > > > > support it in the future if it becomes necessary.
>> > > > > >
>> > > > > > no internal optimization potential. it's a myth
>> > > > > >
>> > > > > > ¯\_(ツ)_/¯
>> > > > > >
>> > > > > > :-)
>> > > > > >
>> > > > > > >
>> > > > > > > Another note about step 4), resolving out-of-order data: as I
>> > > > > > > mentioned before, I think with KIP-258 (embedded timestamp with
>> > > > > > > key-value store) we can actually make this step simpler than the
>> > > > > > > current proposal. In fact, we can just keep a single final-result
>> > > > > > > store with timestamps and reject values that have a smaller
>> > > > > > > timestamp, is that right?
>> > > > > >
>> > > > > > Which output is correct should at least be decided based on the
>> > > > > > offset of the original message.
>> > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > That's all I have in mind now. Again, great appreciation to Adam for
>> > > > > > > making such HUGE progress on this KIP!
>> > > > > > >
>> > > > > > >
>> > > > > > > Guozhang
>> > > > > > >
>> > > > > > > On Wed, Dec 5, 2018 at 2:40 PM Jan Filipiak <
>> > > > jan.filip...@trivago.com>
>> > > > > > > wrote:
>> > > > > > >
>> > > > > > >> If they don't find the time:
>> > > > > > >> They usually take the opposite path from me :D
>> > > > > > >> so the answer would be clear.
>> > > > > > >>
>> > > > > > >> hence my suggestion to vote.
>> > > > > > >>
>> > > > > > >>
>> > > > > > >> On 04.12.2018 21:06, Adam Bellemare wrote:
>> > > > > > >>> Hi Guozhang and Matthias
>> > > > > > >>>
>> > > > > > >>> I know both of you are quite busy, but we've gotten this KIP
>> > to a
>> > > > > point
>> > > > > > >>> where we need more guidance on the API (perhaps a bit of a
>> > > > > tie-breaker,
>> > > > > > >> if
>> > > > > > >>> you will). If there is anyone else you think should look at this,
>> > > > > > >>> please tag them accordingly.
>> > > > > > >>>
>> > > > > > >>> The scenario is as such:
>> > > > > > >>>
>> > > > > > >>> Current Option:
>> > > > > > >>> API:
>> > > > > > >>>
>> > > > > > >>
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-PublicInterfaces
>> > > > > > >>> 1) Rekey the data to CombinedKey, and shuffle it to the partition
>> > > > > > >>> with the foreignKey (repartition 1)
>> > > > > > >>> 2) Join the data
>> > > > > > >>> 3) Shuffle the data back to the original node (repartition 2)
>> > > > > > >>> 4) Resolve out-of-order arrival / race conditions due to foreign-key
>> > > > > > >>> changes.
>> > > > > > >>>
>> > > > > > >>> Alternate Option:
>> > > > > > >>> Perform #1 and #2 above, and return a KScatteredTable.
>> > > > > > >>> - It would be keyed on a wrapped key: <CombinedKey<KO, K>, VR>
>> > > > > > >>> (KO = Other Table Key, K = This Table Key, VR = Joined Result)
>> > > > > > >>> - KScatteredTable.resolve() would perform #3 and #4, but otherwise a
>> > > > > > >>> user would be able to perform additional functions directly from the
>> > > > > > >>> KScatteredTable (TBD - currently out of scope).
>> > > > > > >>> - John's analysis two emails up is accurate as to the tradeoffs.
>> > > > > > >>>
>> > > > > > >>> The Current Option is coded as-is. The Alternate Option is possible,
>> > > > > > >>> but will require implementation details to be surfaced in the API and
>> > > > > > >>> some new data structures to be exposed in the API (i.e. CombinedKey).
>> > > > > > >>>
>> > > > > > >>> I appreciate any insight into this.
>> > > > > > >>>
>> > > > > > >>> Thanks.
>> > > > > > >>>
>> > > > > > >>> On Tue, Dec 4, 2018 at 2:59 PM Adam Bellemare <
>> > > > > > adam.bellem...@gmail.com>
>> > > > > > >>> wrote:
>> > > > > > >>>
>> > > > > > >>>> Hi John
>> > > > > > >>>>
>> > > > > > >>>> Thanks for your feedback and assistance. I think your
>> summary
>> > is
>> > > > > > >> accurate
>> > > > > > >>>> from my perspective. Additionally, I would like to add that
>> > > there
>> > > > > is a
>> > > > > > >> risk
>> > > > > > >>>> of inconsistent final states without performing the
>> > resolution.
>> > > > This
>> > > > > > is
>> > > > > > >> a
>> > > > > > >>>> major concern for me as most of the data I have dealt with
>> is
>> > > > > produced
>> > > > > > >> by
>> > > > > > >>>> relational databases. We have seen a number of cases where
>> a
>> > > user
>> > > > in
>> > > > > > the
>> > > > > > >>>> Rails UI has modified the field (foreign key), realized
>> they
>> > > made
>> > > > a
>> > > > > > >>>> mistake, and then updated the field again with a new key.
>> The
>> > > > events
>> > > > > > are
>> > > > > > >>>> propagated out as they are produced, and as such we have
>> had
>> > > > > > real-world
>> > > > > > >>>> cases where these inconsistencies were propagated
>> downstream
>> > as
>> > > > the
>> > > > > > >> final
>> > > > > > >>>> values due to the race conditions in the fanout of the
>> data.
>> > > > > > >>>>
>> > > > > > >>>> This solution that I propose values correctness of the
>> final
>> > > > result
>> > > > > > over
>> > > > > > >>>> other factors.
>> > > > > > >>>>
>> > > > > > >>>> We could always move this function over to using a KScatteredTable
>> > > > > > >>>> implementation in the future, and simply deprecate this join API in
>> > > > > > >>>> time. I think I would like to hear more from some of the other major
>> > > > > > >>>> committers on which course of action they think is best before any
>> > > > > > >>>> more coding is done.
>> > > > > > >>>>
>> > > > > > >>>> Thanks again
>> > > > > > >>>>
>> > > > > > >>>> Adam
>> > > > > > >>>>
>> > > > > > >>>>
>> > > > > > >>>> On Mon, Dec 3, 2018 at 8:24 PM John Roesler <
>> > j...@confluent.io>
>> > > > > > wrote:
>> > > > > > >>>>
>> > > > > > >>>>> Hi Jan and Adam,
>> > > > > > >>>>>
>> > > > > > >>>>> Wow, thanks for doing that test, Adam. Those results are
>> > > > > encouraging.
>> > > > > > >>>>>
>> > > > > > >>>>> Thanks for your performance experience as well, Jan. I
>> agree
>> > > that
>> > > > > > >> avoiding
>> > > > > > >>>>> unnecessary join outputs is especially important when the
>> > > fan-out
>> > > > > is
>> > > > > > so
>> > > > > > >>>>> high. I suppose this could also be built into the
>> > > implementation
>> > > > > > we're
>> > > > > > >>>>> discussing, but it wouldn't have to be specified in the
>> KIP
>> > > > (since
>> > > > > > >> it's an
>> > > > > > >>>>> API-transparent optimization).
>> > > > > > >>>>>
>> > > > > > >>>>> As far as whether or not to re-repartition the data, I
>> didn't
>> > > > bring
>> > > > > > it
>> > > > > > >> up
>> > > > > > >>>>> because it sounded like the two of you agreed to leave the
>> > KIP
>> > > > > as-is,
>> > > > > > >>>>> despite the disagreement.
>> > > > > > >>>>>
>> > > > > > >>>>> If you want my opinion, I feel like both approaches are
>> > > > reasonable.
>> > > > > > >>>>> It sounds like Jan values more the potential for
>> developers
>> > to
>> > > > > > optimize
>> > > > > > >>>>> their topologies to re-use the intermediate nodes, whereas
>> > Adam
>> > > > > > places
>> > > > > > >>>>> more
>> > > > > > >>>>> value on having a single operator that people can use
>> without
>> > > > extra
>> > > > > > >> steps
>> > > > > > >>>>> at the end.
>> > > > > > >>>>>
>> > > > > > >>>>> Personally, although I do find it exceptionally annoying
>> > when a
>> > > > > > >> framework
>> > > > > > >>>>> gets in my way when I'm trying to optimize something, it
>> > seems
>> > > > > better
>> > > > > > >> to
>> > > > > > >>>>> go
>> > > > > > >>>>> for a single operation.
>> > > > > > >>>>> * Encapsulating the internal transitions gives us
>> significant
>> > > > > > latitude
>> > > > > > >> in
>> > > > > > >>>>> the implementation (for example, joining only at the end,
>> not
>> > > in
>> > > > > the
>> > > > > > >>>>> middle
>> > > > > > >>>>> to avoid extra data copying and out-of-order resolution;
>> how
>> > we
>> > > > > > >> represent
>> > > > > > >>>>> the first repartition keys (combined keys vs. value
>> vectors),
>> > > > > etc.).
>> > > > > > >> If we
>> > > > > > >>>>> publish something like a KScatteredTable with the
>> > > > right-partitioned
>> > > > > > >> joined
>> > > > > > >>>>> data, then the API pretty much locks in the
>> implementation as
>> > > > well.
>> > > > > > >>>>> * The API seems simpler to understand and use. I do mean
>> > > "seems";
>> > > > > if
>> > > > > > >>>>> anyone
>> > > > > > >>>>> wants to make the case that KScatteredTable is actually
>> > > simpler,
>> > > > I
>> > > > > > >> think
>> > > > > > >>>>> hypothetical usage code would help. From a relational
>> algebra
>> > > > > > >> perspective,
>> > > > > > >>>>> it seems like KTable.join(KTable) should produce a new
>> KTable
>> > > in
>> > > > > all
>> > > > > > >>>>> cases.
>> > > > > > >>>>> * That said, there might still be room in the API for a
>> > > different
>> > > > > > >>>>> operation
>> > > > > > >>>>> like what Jan has proposed to scatter a KTable, and then
>> do
>> > > > things
>> > > > > > like
>> > > > > > >>>>> join, re-group, etc from there... I'm not sure; I haven't
>> > > thought
>> > > > > > >> through
>> > > > > > >>>>> all the consequences yet.
>> > > > > > >>>>>
>> > > > > > >>>>> This is all just my opinion after thinking over the
>> > discussion
>> > > so
>> > > > > > >> far...
>> > > > > > >>>>> -John
>> > > > > > >>>>>
>> > > > > > >>>>> On Mon, Dec 3, 2018 at 2:56 PM Adam Bellemare <
>> > > > > > >> adam.bellem...@gmail.com>
>> > > > > > >>>>> wrote:
>> > > > > > >>>>>
>> > > > > > >>>>>> Updated the PR to take into account John's feedback.
>> > > > > > >>>>>>
>> > > > > > >>>>>> I did some preliminary testing for the performance of the
>> > > > > > prefixScan.
>> > > > > > >> I
>> > > > > > >>>>>> have attached the file, but I will also include the text
>> in
>> > > the
>> > > > > body
>> > > > > > >>>>> here
>> > > > > > >>>>>> for archival purposes (I am not sure what happens to
>> > attached
>> > > > > > files).
>> > > > > > >> I
>> > > > > > >>>>>> also updated the PR and the KIP accordingly.
>> > > > > > >>>>>>
>> > > > > > >>>>>> Summary: It scales exceptionally well for scanning large numbers of
>> > > > > > >>>>>> records. As Jan mentioned previously, the real issue would be more
>> > > > > > >>>>>> around processing the resulting records after obtaining them. For
>> > > > > > >>>>>> instance, it takes approximately ~80-120 ms to flush the buffer and
>> > > > > > >>>>>> a further ~35-85 ms to scan 27.5M records, obtaining matches for
>> > > > > > >>>>>> 2.5M of them. Iterating through the records just to generate a
>> > > > > > >>>>>> simple count takes ~40 times longer than the flush + scan combined.
>> > > > > > >>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>
>> > > > > > >>
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> ============================================================================================
>> > > > > > >>>>>> Setup:
>> > > > > > >>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>
>> > > > > > >>
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> ============================================================================================
>> > > > > > >>>>>> Java 9 with default settings aside from a 512 MB heap
>> > > (Xmx512m,
>> > > > > > >> Xms512m)
>> > > > > > >>>>>> CPU: i7 2.2 GHz.
>> > > > > > >>>>>>
>> > > > > > >>>>>> Note: I am using a slightly-modified, directly-accessible
>> > > Kafka
>> > > > > > >> Streams
>> > > > > > >>>>>> RocksDB
>> > > > > > >>>>>> implementation (RocksDB.java, basically just avoiding the
>> > > > > > >>>>>> ProcessorContext).
>> > > > > > >>>>>> There are no modifications to the default RocksDB values
>> > > > provided
>> > > > > in
>> > > > > > >> the
>> > > > > > >>>>>> 2.1/trunk release.
>> > > > > > >>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>> keysize = 128 bytes
>> > > > > > >>>>>> valsize = 512 bytes
>> > > > > > >>>>>>
>> > > > > > >>>>>> Step 1:
>> > > > > > >>>>>> Write X positive matching events: (key = prefix +
>> > left-padded
>> > > > > > >>>>>> auto-incrementing integer)
>> > > > > > >>>>>> Step 2:
>> > > > > > >>>>>> Write 10X negative matching events (key = left-padded
>> > > > > > >> auto-incrementing
>> > > > > > >>>>>> integer)
>> > > > > > >>>>>> Step 3:
>> > > > > > >>>>>> Perform flush
>> > > > > > >>>>>> Step 4:
>> > > > > > >>>>>> Perform prefixScan
>> > > > > > >>>>>> Step 5:
>> > > > > > >>>>>> Iterate through the returned Iterator and validate the count of
>> > > > > > >>>>>> expected events.
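>> > > > > > >>>>>>
>> > > > > > >>>>>> For reference, the core of that procedure, sketched directly against
>> > > > > > >>>>>> the RocksDB Java API rather than the modified Streams store (sizes
>> > > > > > >>>>>> and the key()/startsWith() helpers are illustrative only):
>> > > > > > >>>>>>
>> > > > > > >>>>>>     RocksDB.loadLibrary();
>> > > > > > >>>>>>     try (Options opts = new Options().setCreateIfMissing(true);
>> > > > > > >>>>>>          RocksDB db = RocksDB.open(opts, "/tmp/prefix-scan-bench")) {
>> > > > > > >>>>>>         byte[] value = new byte[512];
>> > > > > > >>>>>>         // x = X from the setup above
>> > > > > > >>>>>>         for (long i = 0; i < x; i++)        // Step 1: positive matches
>> > > > > > >>>>>>             db.put(key("prefix-", i), value);
>> > > > > > >>>>>>         for (long i = 0; i < 10 * x; i++)   // Step 2: negative matches
>> > > > > > >>>>>>             db.put(key("", i), value);
>> > > > > > >>>>>>         db.flush(new FlushOptions().setWaitForFlush(true));  // Step 3
>> > > > > > >>>>>>         byte[] prefix = "prefix-".getBytes(StandardCharsets.UTF_8);
>> > > > > > >>>>>>         long count = 0;
>> > > > > > >>>>>>         try (RocksIterator it = db.newIterator()) {          // Step 4
>> > > > > > >>>>>>             it.seek(prefix);
>> > > > > > >>>>>>             while (it.isValid() && startsWith(it.key(), prefix)) {
>> > > > > > >>>>>>                 count++;                                     // Step 5
>> > > > > > >>>>>>                 it.next();
>> > > > > > >>>>>>             }
>> > > > > > >>>>>>         }
>> > > > > > >>>>>>     }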
>> > > > > > >>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>
>> > > > > > >>
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> ============================================================================================
>> > > > > > >>>>>> Results:
>> > > > > > >>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>
>> > > > > > >>
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> ============================================================================================
>> > > > > > >>>>>> X = 1k (11k events total)
>> > > > > > >>>>>> Flush Time = 39 ms
>> > > > > > >>>>>> Scan Time = 7 ms
>> > > > > > >>>>>> 6.9 MB disk
>> > > > > > >>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>
>> > > > > > >>
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> --------------------------------------------------------------------------------------------
>> > > > > > >>>>>> X = 10k (110k events total)
>> > > > > > >>>>>> Flush Time = 45 ms
>> > > > > > >>>>>> Scan Time = 8 ms
>> > > > > > >>>>>> 127 MB
>> > > > > > >>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>
>> > > > > > >>
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> --------------------------------------------------------------------------------------------
>> > > > > > >>>>>> X = 100k (1.1M events total)
>> > > > > > >>>>>> Test1:
>> > > > > > >>>>>> Flush Time = 60 ms
>> > > > > > >>>>>> Scan Time = 12 ms
>> > > > > > >>>>>> 678 MB
>> > > > > > >>>>>>
>> > > > > > >>>>>> Test2:
>> > > > > > >>>>>> Flush Time = 45 ms
>> > > > > > >>>>>> Scan Time = 7 ms
>> > > > > > >>>>>> 576 MB
>> > > > > > >>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>
>> > > > > > >>
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> --------------------------------------------------------------------------------------------
>> > > > > > >>>>>> X = 1M (11M events total)
>> > > > > > >>>>>> Test1:
>> > > > > > >>>>>> Flush Time = 52 ms
>> > > > > > >>>>>> Scan Time = 19 ms
>> > > > > > >>>>>> 7.2 GB
>> > > > > > >>>>>>
>> > > > > > >>>>>> Test2:
>> > > > > > >>>>>> Flush Time = 84 ms
>> > > > > > >>>>>> Scan Time = 34 ms
>> > > > > > >>>>>> 9.1 GB
>> > > > > > >>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>
>> > > > > > >>
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> --------------------------------------------------------------------------------------------
>> > > > > > >>>>>> X = 2.5M (27.5M events total)
>> > > > > > >>>>>> Test1:
>> > > > > > >>>>>> Flush Time = 82 ms
>> > > > > > >>>>>> Scan Time = 63 ms
>> > > > > > >>>>>> 17GB - 276 sst files
>> > > > > > >>>>>>
>> > > > > > >>>>>> Test2:
>> > > > > > >>>>>> Flush Time = 116 ms
>> > > > > > >>>>>> Scan Time = 35 ms
>> > > > > > >>>>>> 23GB - 361 sst files
>> > > > > > >>>>>>
>> > > > > > >>>>>> Test3:
>> > > > > > >>>>>> Flush Time = 103 ms
>> > > > > > >>>>>> Scan Time = 82 ms
>> > > > > > >>>>>> 19 GB - 300 sst files
>> > > > > > >>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>
>> > > > > > >>
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> --------------------------------------------------------------------------------------------
>> > > > > > >>>>>>
>> > > > > > >>>>>> I had to limit my testing on my laptop to X = 2.5M
>> events. I
>> > > > tried
>> > > > > > to
>> > > > > > >> go
>> > > > > > >>>>>> to X = 10M (110M events) but RocksDB was going into the
>> > 100GB+
>> > > > > range
>> > > > > > >>>>> and my
>> > > > > > >>>>>> laptop ran out of disk. More extensive testing could be
>> done
>> > > > but I
>> > > > > > >>>>> suspect
>> > > > > > >>>>>> that it would be in line with what we're seeing in the
>> > results
>> > > > > > above.
>> > > > > > >>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>> At this point in time, I think the only major discussion
>> > point
>> > > > is
>> > > > > > >> really
>> > > > > > >>>>>> around what Jan and I have disagreed on: repartitioning
>> > back +
>> > > > > > >> resolving
>> > > > > > >>>>>> potential out of order issues or leaving that up to the
>> > client
>> > > > to
>> > > > > > >>>>> handle.
>> > > > > > >>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>> Thanks folks,
>> > > > > > >>>>>>
>> > > > > > >>>>>> Adam
>> > > > > > >>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>> On Mon, Dec 3, 2018 at 4:34 AM Jan Filipiak <
>> > > > > > jan.filip...@trivago.com
>> > > > > > >>>
>> > > > > > >>>>>> wrote:
>> > > > > > >>>>>>
>> > > > > > >>>>>>>
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> On 29.11.2018 15:14, John Roesler wrote:
>> > > > > > >>>>>>>> Hi all,
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> Sorry that this discussion petered out... I think the
>> 2.1
>> > > > > release
>> > > > > > >>>>>>> caused an
>> > > > > > >>>>>>>> extended distraction that pushed it off everyone's
>> radar
>> > > > (which
>> > > > > > was
>> > > > > > >>>>>>>> precisely Adam's concern). Personally, I've also had some extended
>> > > > > > >>>>>>>> distractions of my own that kept (and continue to keep) me
>> > > > > > >>>>>>>> preoccupied.
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> However, calling for a vote did wake me up, so I guess
>> Jan
>> > > was
>> > > > > on
>> > > > > > >> the
>> > > > > > >>>>>>> right
>> > > > > > >>>>>>>> track!
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> I've gone back and reviewed the whole KIP document and
>> the
>> > > > prior
>> > > > > > >>>>>>>> discussion, and I'd like to offer a few thoughts:
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> API Thoughts:
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> 1. If I read the KIP right, you are proposing a
>> > many-to-one
>> > > > > join.
>> > > > > > >>>>> Could
>> > > > > > >>>>>>> we
>> > > > > > >>>>>>>> consider naming it manyToOneJoin? Or, if you prefer,
>> flip
>> > > the
>> > > > > > design
>> > > > > > >>>>>>> around
>> > > > > > >>>>>>>> and make it a oneToManyJoin?
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> The proposed name "joinOnForeignKey" disguises the join
>> > > type,
>> > > > > and
>> > > > > > it
>> > > > > > >>>>>>> seems
>> > > > > > >>>>>>>> like it might trick some people into using it for a
>> > > one-to-one
>> > > > > > join.
>> > > > > > >>>>>>> This
>> > > > > > >>>>>>>> would work, of course, but it would be super
>> inefficient
>> > > > > compared
>> > > > > > to
>> > > > > > >>>>> a
>> > > > > > >>>>>>>> simple rekey-and-join.
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> 2. I might have missed it, but I don't think it's
>> > specified
>> > > > > > whether
>> > > > > > >>>>>>> it's an
>> > > > > > >>>>>>>> inner, outer, or left join. I'm guessing an outer
>> join, as
>> > > > > > >>>>> (neglecting
>> > > > > > >>>>>>> IQ),
>> > > > > > >>>>>>>> the rest can be achieved by filtering or by handling
>> it in
>> > > the
>> > > > > > >>>>>>> ValueJoiner.
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> 3. The arg list to joinOnForeignKey doesn't look quite
>> > > right.
>> > > > > > >>>>>>>> 3a. Regarding Serialized: There are a few different
>> > > paradigms
>> > > > in
>> > > > > > >>>>> play in
>> > > > > > >>>>>>>> the Streams API, so it's confusing, but instead of
>> three
>> > > > > > Serialized
>> > > > > > >>>>>>> args, I
>> > > > > > >>>>>>>> think it would be better to have one that allows
>> > > (optionally)
>> > > > > > >> setting
>> > > > > > >>>>>>> the 4
>> > > > > > >>>>>>>> incoming serdes. The result serde is defined by the
>> > > > > Materialized.
>> > > > > > >> The
>> > > > > > >>>>>>>> incoming serdes can be optional because they might
>> already
>> > > be
>> > > > > > >>>>> available
>> > > > > > >>>>>>> on
>> > > > > > >>>>>>>> the source KTables, or the default serdes from the
>> config
>> > > > might
>> > > > > be
>> > > > > > >>>>>>>> applicable.
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> 3b. Is the StreamPartitioner necessary? The other joins
>> > > don't
>> > > > > > allow
>> > > > > > >>>>>>> setting
>> > > > > > >>>>>>>> one, and it seems like it might actually be harmful,
>> since
>> > > the
>> > > > > > rekey
>> > > > > > >>>>>>>> operation needs to produce results that are
>> co-partitioned
>> > > > with
>> > > > > > the
>> > > > > > >>>>>>> "other"
>> > > > > > >>>>>>>> KTable.
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> 4. I'm fine with the "reserved word" header, but I
>> didn't
>> > > > > actually
>> > > > > > >>>>>>> follow
>> > > > > > >>>>>>>> what Matthias meant about namespacing requiring
>> > > > "deserializing"
>> > > > > > the
>> > > > > > >>>>>>> record
>> > > > > > >>>>>>>> header. The headers are already Strings, so I don't
>> think
>> > > that
>> > > > > > >>>>>>>> deserialization is required. If we applied the
>> namespace
>> > at
>> > > > > source
>> > > > > > >>>>> nodes
>> > > > > > >>>>>>>> and stripped it at sink nodes, this would be
>> practically
>> > no
>> > > > > > >> overhead.
>> > > > > > >>>>>>> The
>> > > > > > >>>>>>>> advantage of the namespace idea is that no public API
>> > change
>> > > > wrt
>> > > > > > >>>>> headers
>> > > > > > >>>>>>>> needs to happen, and no restrictions need to be placed
>> on
>> > > > users'
>> > > > > > >>>>>>> headers.
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> (Although I'm wondering if we can get away without the
>> > > header
>> > > > at
>> > > > > > >>>>> all...
>> > > > > > >>>>>>>> stay tuned)
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> 5. I also didn't follow the discussion about the HWM
>> table
>> > > > > growing
>> > > > > > >>>>>>> without
>> > > > > > >>>>>>>> bound. As I read it, the HWM table is effectively
>> > > implementing
>> > > > > OCC
>> > > > > > >> to
>> > > > > > >>>>>>>> resolve the problem you noted with disordering when the
>> > > rekey
>> > > > is
>> > > > > > >>>>>>>> reversed... particularly notable when the FK changes.
>> As
>> > > such,
>> > > > > it
>> > > > > > >>>>> only
>> > > > > > >>>>>>>> needs to track the most recent "version" (the offset in
>> > the
>> > > > > source
>> > > > > > >>>>>>>> partition) of each key. Therefore, it should have the
>> same
>> > > > > number
>> > > > > > of
>> > > > > > >>>>>>> keys
>> > > > > > >>>>>>>> as the source table at all times.
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> I see that you are aware of KIP-258, which I think
>> might
>> > be
>> > > > > > relevant
>> > > > > > >>>>> in
>> > > > > > >>>>>>> a
>> > > > > > >>>>>>>> couple of ways. One: it's just about storing the
>> timestamp
>> > > in
>> > > > > the
>> > > > > > >>>>> state
>> > > > > > >>>>>>>> store, but the ultimate idea is to effectively use the
>> > > > timestamp
>> > > > > > as
>> > > > > > >>>>> an
>> > > > > > >>>>>>> OCC
>> > > > > > >>>>>>>> "version" to drop disordered updates. You wouldn't
>> want to
>> > > use
>> > > > > the
>> > > > > > >>>>>>>> timestamp for this operation, but if you were to use a
>> > > similar
>> > > > > > >>>>>>> mechanism to
>> > > > > > >>>>>>>> store the source offset in the store alongside the
>> > re-keyed
>> > > > > > values,
>> > > > > > >>>>> then
>> > > > > > >>>>>>>> you could avoid a separate table.
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> 6. You and Jan have been thinking about this for a long
>> > > time,
>> > > > so
>> > > > > > >> I've
>> > > > > > >>>>>>>> probably missed something here, but I'm wondering if we
>> > can
>> > > > > avoid
>> > > > > > >> the
>> > > > > > >>>>>>> HWM
>> > > > > > >>>>>>>> tracking at all and resolve out-of-order during a final
>> > join
>> > > > > > >>>>> instead...
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> Let's say we're joining a left table (Integer K: Letter
>> > FK,
>> > > > > (other
>> > > > > > >>>>>>> data))
>> > > > > > >>>>>>>> to a right table (Letter K: (some data)).
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> Left table:
>> > > > > > >>>>>>>> 1: (A, xyz)
>> > > > > > >>>>>>>> 2: (B, asd)
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> Right table:
>> > > > > > >>>>>>>> A: EntityA
>> > > > > > >>>>>>>> B: EntityB
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> We could do a rekey as you proposed with a combined
>> key,
>> > but
>> > > > not
>> > > > > > >>>>>>>> propagating the value at all..
>> > > > > > >>>>>>>> Rekey table:
>> > > > > > >>>>>>>> A-1: (dummy value)
>> > > > > > >>>>>>>> B-2: (dummy value)
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> Which we then join with the right table to produce:
>> > > > > > >>>>>>>> A-1: EntityA
>> > > > > > >>>>>>>> B-2: EntityB
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> Which gets rekeyed back:
>> > > > > > >>>>>>>> 1: A, EntityA
>> > > > > > >>>>>>>> 2: B, EntityB
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> And finally we do the actual join:
>> > > > > > >>>>>>>> Result table:
>> > > > > > >>>>>>>> 1: ((A, xyz), EntityA)
>> > > > > > >>>>>>>> 2: ((B, asd), EntityB)
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> The thing is that in that last join, we have the
>> > opportunity
>> > > > to
>> > > > > > >>>>> compare
>> > > > > > >>>>>>> the
>> > > > > > >>>>>>>> current FK in the left table with the incoming PK of
>> the
>> > > right
>> > > > > > >>>>> table. If
>> > > > > > >>>>>>>> they don't match, we just drop the event, since it
>> must be
>> > > > > > outdated.
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>
>> > > > > > >>>>>>>> In your KIP, you gave an example in which (1: A, xyz)
>> gets
>> > > > > updated
>> > > > > > >> to
>> > > > > > >>>>>>> (1:
>> > > > > > >>>>>>>> B, xyz), ultimately yielding a conundrum about whether
>> the
>> > > > final
>> > > > > > >>>>> state
>> > > > > > >>>>>>>> should be (1: null) or (1: joined-on-B). With the
>> > algorithm
>> > > > > above,
>> > > > > > >>>>> you
>> > > > > > >>>>>>>> would be considering (1: (B, xyz), (A, null)) vs (1:
>> (B,
>> > > xyz),
>> > > > > (B,
>> > > > > > >>>>>>>> EntityB)). It seems like this does give you enough
>> > > information
>> > > > > to
>> > > > > > >>>>> make
>> > > > > > >>>>>>> the
>> > > > > > >>>>>>>> right choice, regardless of disordering.
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> Will check Adam's patch, but this should work. As mentioned often, I
>> > > > > > >>>>>>> am not convinced about partitioning back for the user automatically.
>> > > > > > >>>>>>> I think this is the real performance eater ;)
>> > > > > > >>>>>>>
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> 7. Last thought... I'm a little concerned about the
>> > > > performance
>> > > > > of
>> > > > > > >>>>> the
>> > > > > > >>>>>>>> range scans when records change in the right table.
>> You've
>> > > > said
>> > > > > > that
>> > > > > > >>>>>>> you've
>> > > > > > >>>>>>>> been using the algorithm you presented in production
>> for a
>> > > > > while.
>> > > > > > >> Can
>> > > > > > >>>>>>> you
>> > > > > > >>>>>>>> give us a sense of the performance characteristics
>> you've
>> > > > > > observed?
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> Make it work, make it fast, make it beautiful. The
>> topmost
>> > > > thing
>> > > > > > here
>> > > > > > >>>>> is
>> > > > > > >>>>>>> / was correctness. In practice I do not measure the
>> > > performance
>> > > > > of
>> > > > > > >> the
>> > > > > > >>>>>>> range scan. Typical cases I run this with emit 500k - 1M rows on a
>> > > > > > >>>>>>> left-hand-side change. The range scan is just the work you have to
>> > > > > > >>>>>>> do; even when you pack your data into different formats, the RocksDB
>> > > > > > >>>>>>> performance is usually tightly coupled to the size of the data, and
>> > > > > > >>>>>>> we can't really change that. It is more important for users to
>> > > > > > >>>>>>> prevent useless updates to begin with. My left-hand side is guarded
>> > > > > > >>>>>>> to drop changes that are not going to change my join output.
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> usually it's:
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> drop unused fields and then don't forward if
>> > old.equals(new)
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> Regarding the performance of creating an iterator for smaller
>> > > > > > >>>>>>> fanouts, users can still just do a groupBy first anyway.
>> > > > > > >>>>>>>
>> > > > > > >>>>>>>
>> > > > > > >>>>>>>
>> > > > > > >>>>>>>> I could only think of one alternative, but I'm not
>> sure if
>> > > > it's
>> > > > > > >>>>> better
>> > > > > > >>>>>>> or
>> > > > > > >>>>>>>> worse... If the first re-key only needs to preserve the
>> > > > original
>> > > > > > >> key,
>> > > > > > >>>>>>> as I
>> > > > > > >>>>>>>> proposed in #6, then we could store a vector of keys in
>> > the
>> > > > > value:
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> Left table:
>> > > > > > >>>>>>>> 1: A,...
>> > > > > > >>>>>>>> 2: B,...
>> > > > > > >>>>>>>> 3: A,...
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> Gets re-keyed:
>> > > > > > >>>>>>>> A: [1, 3]
>> > > > > > >>>>>>>> B: [2]
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> Then, the rhs part of the join would only need a
>> regular
>> > > > > > single-key
>> > > > > > >>>>>>> lookup.
>> > > > > > >>>>>>>> Of course we have to deal with the problem of large
>> > values,
>> > > as
>> > > > > > >>>>> there's
>> > > > > > >>>>>>> no
>> > > > > > >>>>>>>> bound on the number of lhs records that can reference
>> rhs
>> > > > > records.
>> > > > > > >>>>>>> Offhand,
>> > > > > > >>>>>>>> I'd say we could page the values, so when one row is
>> past
>> > > the
>> > > > > > >>>>>>> threshold, we
>> > > > > > >>>>>>>> append the key for the next page. Then in most cases,
>> it
>> > > would
>> > > > > be
>> > > > > > a
>> > > > > > >>>>>>> single
>> > > > > > >>>>>>>> key lookup, but for large fan-out updates, it would be
>> one
>> > > per
>> > > > > > (max
>> > > > > > >>>>>>> value
>> > > > > > >>>>>>>> size)/(avg lhs key size).
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> This seems more complex, though... Plus, I think
>> there's
>> > > some
>> > > > > > extra
>> > > > > > >>>>>>>> tracking we'd need to do to know when to emit a
>> > retraction.
>> > > > For
>> > > > > > >>>>> example,
>> > > > > > >>>>>>>> when record 1 is deleted, the re-key table would just
>> have
>> > > (A:
>> > > > > > [3]).
>> > > > > > >>>>>>> Some
>> > > > > > >>>>>>>> kind of tombstone is needed so that the join result
>> for 1
>> > > can
>> > > > > also
>> > > > > > >> be
>> > > > > > >>>>>>>> retracted.
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> That's all!
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> Thanks so much to both Adam and Jan for the thoughtful
>> > KIP.
>> > > > > Sorry
>> > > > > > >> the
>> > > > > > >>>>>>>> discussion has been slow.
>> > > > > > >>>>>>>> -John
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> On Fri, Oct 12, 2018 at 2:20 AM Jan Filipiak <
>> > > > > > >>>>> jan.filip...@trivago.com>
>> > > > > > >>>>>>>> wrote:
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>>> I'd say you can just call the vote.
>> > > > > > >>>>>>>>>
>> > > > > > >>>>>>>>> That happens all the time, and if something comes up, it just goes
>> > > > > > >>>>>>>>> back to discussion.
>> > > > > > >>>>>>>>>
>> > > > > > >>>>>>>>> I would not expect too much attention from yet another email in this
>> > > > > > >>>>>>>>> thread.
>> > > > > > >>>>>>>>>
>> > > > > > >>>>>>>>> Hence my suggestion to vote.
>> > > > > > >>>>>>>>>
>> > > > > > >>>>>>>>> On 09.10.2018 13:56, Adam Bellemare wrote:
>> > > > > > >>>>>>>>>> Hello Contributors
>> > > > > > >>>>>>>>>>
>> > > > > > >>>>>>>>>> I know that 2.1 is about to be released, but I do
>> need
>> > to
>> > > > bump
>> > > > > > >>>>> this to
>> > > > > > >>>>>>>>> keep
>> > > > > > >>>>>>>>>> visibility up. I am still intending to push this
>> through
>> > > > once
>> > > > > > >>>>>>> contributor
>> > > > > > >>>>>>>>>> feedback is given.
>> > > > > > >>>>>>>>>>
>> > > > > > >>>>>>>>>> Main points that need addressing:
>> > > > > > >>>>>>>>>> 1) Is there any way (or benefit) to structuring the current singular
>> > > > > > >>>>>>>>>> graph node into multiple nodes? It has a whopping 25 parameters
>> > > > > > >>>>>>>>>> right now. I am a bit fuzzy on how the optimizations are supposed to
>> > > > > > >>>>>>>>>> work, so I would appreciate any help on this aspect.
>> > > > > > >>>>>>>>>>
>> > > > > > >>>>>>>>>> 2) Overall strategy for joining + resolving. This thread has much
>> > > > > > >>>>>>>>>> discourse between Jan and me about the current highwater mark
>> > > > > > >>>>>>>>>> proposal versus a groupBy + reduce proposal. I am of the opinion
>> > > > > > >>>>>>>>>> that we need to strictly handle any chance of out-of-order data and
>> > > > > > >>>>>>>>>> leave none of it up to the consumer. Any comments or suggestions
>> > > > > > >>>>>>>>>> here would also help.
>> > > > > > >>>>>>>>>>
>> > > > > > >>>>>>>>>> 3) Anything else that you see that would prevent this
>> > from
>> > > > > > moving
>> > > > > > >>>>> to a
>> > > > > > >>>>>>>>> vote?
>> > > > > > >>>>>>>>>>
>> > > > > > >>>>>>>>>> Thanks
>> > > > > > >>>>>>>>>>
>> > > > > > >>>>>>>>>> Adam
>> > > > > > >>>>>>>>>>
>> > > > > > >>>>>>>>>>
>> > > > > > >>>>>>>>>>
>> > > > > > >>>>>>>>>>
>> > > > > > >>>>>>>>>>
>> > > > > > >>>>>>>>>>
>> > > > > > >>>>>>>>>>
>> > > > > > >>>>>>>>>> On Sun, Sep 30, 2018 at 10:23 AM Adam Bellemare <
>> > > > > > >>>>>>>>> adam.bellem...@gmail.com>
>> > > > > > >>>>>>>>>> wrote:
>> > > > > > >>>>>>>>>>
>> > > > > > >>>>>>>>>>> Hi Jan
>> > > > > > >>>>>>>>>>>
>> > > > > > >>>>>>>>>>> With the Stores.windowStoreBuilder and
>> > > > > > >>>>> Stores.persistentWindowStore,
>> > > > > > >>>>>>> you
>> > > > > > >>>>>>>>>>> actually only need to specify the number of segments you want and
>> > > > > > >>>>>>>>>>> how large they are. To the best of my understanding, what happens
>> > > > > > >>>>>>>>>>> is that the segments are automatically rolled over as new data
>> > > > > > >>>>>>>>>>> with new timestamps is created. We use this exact functionality in
>> > > > > > >>>>>>>>>>> some of
>> the
>> > > > work
>> > > > > > done
>> > > > > > >>>>>>>>>>> internally at my company. For reference, this is the
>> > > > hopping
>> > > > > > >>>>> windowed
>> > > > > > >>>>>>>>> store.
>> > > > > > >>>>>>>>>>>
>> > > > > > >>>>>>>>>>>
>> > > > > > >>>>>>>>>
>> > > > > > >>>>>>>
>> > > > > > >>>>>
>> > > > > > >>
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#id21
>> > > > > > >>>>>>>>>>>
>> > > > > > >>>>>>>>>>> In the code that I have provided, there are going
>> to be
>> > > two
>> > > > > 24h
>> > > > > > >>>>>>>>> segments.
>> > > > > > >>>>>>>>>>> When a record is put into the windowStore, it will
>> be
>> > > > > inserted
>> > > > > > at
>> > > > > > >>>>>>> time
>> > > > > > >>>>>>>>> T in
>> > > > > > >>>>>>>>>>> both segments. The two segments will always overlap
>> by
>> > > 12h.
>> > > > > As
>> > > > > > >>>>> time
>> > > > > > >>>>>>>>> goes on
>> > > > > > >>>>>>>>>>> and new records are added (say at time T+12h+), the
>> > > oldest
>> > > > > > >> segment
>> > > > > > >>>>>>> will
>> > > > > > >>>>>>>>> be
>> > > > > > >>>>>>>>>>> automatically deleted and a new segment created. The
>> > > > records
>> > > > > > are
>> > > > > > >>>>> by
>> > > > > > >>>>>>>>> default
>> > > > > > >>>>>>>>>>> inserted with the context.timestamp(), such that it
>> is
>> > > the
>> > > > > > record
>> > > > > > >>>>>>> time,
>> > > > > > >>>>>>>>> not
>> > > > > > >>>>>>>>>>> the clock time, which is used.
>> > > > > > >>>>>>>>>>>
>> > > > > > >>>>>>>>>>> To the best of my understanding, the timestamps are
>> > > > retained
>> > > > > > when
>> > > > > > >>>>>>>>>>> restoring from the changelog.
>> > > > > > >>>>>>>>>>>
>> > > > > > >>>>>>>>>>> Basically, this is a heavy-handed way to deal with TTL at a
>> > > > > > >>>>>>>>>>> segment level, instead of at an individual record level.
>> > > > > > >>>>>>>>>>>
>> > > > > > >>>>>>>>>>> On Tue, Sep 25, 2018 at 5:18 PM Jan Filipiak <
>> > > > > > >>>>>>> jan.filip...@trivago.com>
>> > > > > > >>>>>>>>>>> wrote:
>> > > > > > >>>>>>>>>>>
>> > > > > > >>>>>>>>>>>> Will that work? I expected it to blow up with
>> > > > > > ClassCastException
>> > > > > > >>>>> or
>> > > > > > >>>>>>>>>>>> similar.
>> > > > > > >>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>> You would either have to specify the window you fetch/put, or
>> > > > > > >>>>>>>>>>>> iterate across all windows the key was found in, right?
>> > > > > > >>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>> I just hope the window-store doesn't check stream-time under the
>> > > > > > >>>>>>>>>>>> hood; that would be a questionable interface.
>> > > > > > >>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>> If it does: did you see my comment on checking all the windows
>> > > > > > >>>>>>>>>>>> earlier? That would be needed to actually give reasonable time
>> > > > > > >>>>>>>>>>>> guarantees.
>> > > > > > >>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>> Best
>> > > > > > >>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>> On 25.09.2018 13:18, Adam Bellemare wrote:
>> > > > > > >>>>>>>>>>>>> Hi Jan
>> > > > > > >>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>> Check for  " highwaterMat " in the PR. I only
>> changed
>> > > the
>> > > > > > state
>> > > > > > >>>>>>> store,
>> > > > > > >>>>>>>>>>>> not
>> > > > > > >>>>>>>>>>>>> the ProcessorSupplier.
>> > > > > > >>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>> Thanks,
>> > > > > > >>>>>>>>>>>>> Adam
>> > > > > > >>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>> On Mon, Sep 24, 2018 at 2:47 PM, Jan Filipiak <
>> > > > > > >>>>>>>>> jan.filip...@trivago.com
>> > > > > > >>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>> wrote:
>> > > > > > >>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>>>
>> On 24.09.2018 16:26, Adam Bellemare wrote:
>>
>> > @Guozhang
>> >
>> > Thanks for the information. This is indeed something that will be
>> > extremely useful for this KIP.
>> >
>> > @Jan
>> > Thanks for your explanations. That being said, I will not be moving ahead
>> > with an implementation using the reshuffle/groupBy solution you propose.
>> > However, if you wish to implement it yourself off of my current PR and
>> > submit it as a competitive alternative, I would be more than happy to
>> > help vet that as an alternate solution. As it stands right now, I do not
>> > really have more time to invest into alternatives without there being a
>> > strong indication from the binding voters which they would prefer.
>>
>> Hey, total no worries. I think I personally gave up on the streams DSL for
>> some time already, otherwise I would have pulled this KIP through already.
>> I am currently reimplementing my own DSL based on PAPI.
>>
>> > I will look at finishing up my PR with the windowed state store in the
>> > next week or so, exercising it via tests, and then I will come back for
>> > final discussions. In the meantime, I hope that any of the binding voters
>> > could take a look at the KIP in the wiki. I have updated it according to
>> > the latest plan:
>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
>> >
>> > I have also updated the KIP PR to use a windowed store. This could be
>> > replaced by the results of KIP-258 whenever they are completed.
>> > https://github.com/apache/kafka/pull/5527
>> >
>> > Thanks,
>> > Adam
>>
>> Is the HighWatermarkResolverProccessorsupplier already updated in the PR?
>> I expected it to change to Windowed<K>,Long. Am I missing something?
>>
>> On Fri, Sep 14, 2018 at 2:24 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>> Correction on my previous email: KAFKA-5533 is the wrong link, as it is for
>> the corresponding changelog mechanisms. But as part of KIP-258 we do want
>> to have "handling out-of-order data for source KTable", such that instead
>> of blindly applying the updates to the materialized store, i.e. following
>> offset ordering, we will reject updates that are older than the current
>> key's timestamp, i.e. following timestamp ordering.
>>
>> Guozhang
>>
>> On Fri, Sep 14, 2018 at 11:21 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>> Hello Adam,
>>
>> Thanks for the explanation. Regarding the final step (i.e. the high
>> watermark store, now altered to be replaced with a window store), I think
>> another current on-going KIP may actually help:
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
>>
>> This is for adding the timestamp into a key-value store (i.e. only for
>> non-windowed KTables), and then one of its usages, as described in
>> https://issues.apache.org/jira/browse/KAFKA-5533, is that we can then
>> "reject" updates from the source topics if their timestamp is smaller than
>> the current key's latest update timestamp. I think it is very similar to
>> what you have in mind for high-watermark-based filtering, while you only
>> need to make sure that the timestamps of the joining records are correctly
>> inherited through the whole topology to the final stage.
>>
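>> (Sketched out, the timestamp-based rejection described above is roughly the
>> following; TimestampedValue is only an illustrative holder here, not the
>> eventual KIP-258 API.)
>>
>>     import org.apache.kafka.streams.state.KeyValueStore;
>>
>>     class TimestampedValue<V> {
>>         final V value;
>>         final long timestamp;
>>         TimestampedValue(V value, long timestamp) {
>>             this.value = value;
>>             this.timestamp = timestamp;
>>         }
>>     }
>>
>>     class TimestampOrderingSketch {
>>         // Apply an update only if it is not older than the latest update
>>         // already materialized for the key (timestamp ordering instead of
>>         // offset ordering).
>>         static <K, V> boolean maybeUpdate(
>>                 KeyValueStore<K, TimestampedValue<V>> store,
>>                 K key, V newValue, long recordTimestamp) {
>>             TimestampedValue<V> current = store.get(key);
>>             if (current != null && recordTimestamp < current.timestamp) {
>>                 return false;  // out-of-order: reject rather than overwrite
>>             }
>>             store.put(key, new TimestampedValue<>(newValue, recordTimestamp));
>>             return true;
>>         }
>>     }
>>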
>> Note that this KIP is for key-value stores and hence non-windowed KTables
>> only, but for windowed KTables we do not really have good support for
>> their joins anyways (https://issues.apache.org/jira/browse/KAFKA-7107), so
>> I think we can just consider non-windowed KTable-KTable non-key joins for
>> now. In which case, KIP-258 should help.
>>
>> Guozhang
>>
>> On Wed, Sep 12, 2018 at 9:20 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>
>> On 11.09.2018 18:00, Adam Bellemare wrote:
>>
>> > Hi Guozhang
>> >
>> > The current highwater mark implementation would grow endlessly based on
>> > the primary key of the original event. It is a pair of (<this table
>> > primary key>, <highest offset seen for that key>). This is used to
>> > differentiate between late arrivals and new updates. My newest proposal
>> > would be to replace it with a windowed state store of duration N. This
>> > would allow the same behaviour, but cap the size based on time. This
>> > should allow for all late-arriving events to be processed, and should be
>> > customizable by the user to tailor to their own needs (ie: perhaps just
>> > 10 minutes of window, or perhaps 7 days...).
>>
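>> > (As a sketch, that check amounts to something like the following; a plain
>> > key-value store is shown here, whereas the proposal above would back it
>> > with a windowed store so that old entries age out.)
>> >
>> >     import org.apache.kafka.streams.state.KeyValueStore;
>> >
>> >     class HighwaterCheckSketch {
>> >         // True if this record is a genuinely new update for the key,
>> >         // false if it is a late arrival that has already been superseded.
>> >         static <K> boolean isNewUpdate(KeyValueStore<K, Long> highwaterStore,
>> >                                        K primaryKey, long recordOffset) {
>> >             Long highestSeen = highwaterStore.get(primaryKey);
>> >             if (highestSeen != null && recordOffset <= highestSeen) {
>> >                 return false;
>> >             }
>> >             highwaterStore.put(primaryKey, recordOffset);
>> >             return true;
>> >         }
>> >     }
>> >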
>> Hi Adam, using time-based retention can do the trick here, even if I would
>> still like to see the automatic repartitioning be optional, since I would
>> just reshuffle again. With a windowed store I am a little bit sceptical
>> about how to determine the window, so essentially one could run into
>> problems when a rapid change happens near a window border. I will check
>> your implementation in detail; if it's problematic, we could still check
>> _all_ windows on read without too bad a performance impact, I guess. Will
>> let you know if the implementation would be correct as is. I would not
>> like to assume that offset(A) < offset(B) => timestamp(A) < timestamp(B).
>> I think we can't expect that.
>>
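>> (For what it's worth, checking _all_ retained windows on read is cheap to
>> express; a sketch, with placeholder types:)
>>
>>     import org.apache.kafka.streams.state.WindowStore;
>>     import org.apache.kafka.streams.state.WindowStoreIterator;
>>
>>     class ScanAllWindowsSketch {
>>         // Look across every window still retained for the key, not just
>>         // the one matching the record's timestamp, and keep the highest
>>         // offset seen.
>>         static Long highestAcrossAllWindows(WindowStore<String, Long> store,
>>                                             String key) {
>>             Long max = null;
>>             try (WindowStoreIterator<Long> iter =
>>                          store.fetch(key, 0L, Long.MAX_VALUE)) {
>>                 while (iter.hasNext()) {
>>                     long candidate = iter.next().value;
>>                     if (max == null || candidate > max) {
>>                         max = candidate;
>>                     }
>>                 }
>>             }
>>             return max;  // null if the key has aged out of every segment
>>         }
>>     }
>>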
>> > @Jan
>> > I believe I understand what you mean now - thanks for the diagram, it
>> > did really help. You are correct that I do not have the original primary
>> > key available, and I can see that if it was available then you would be
>> > able to add and remove events from the Map. That being said, I encourage
>> > you to finish your diagrams / charts, just for clarity for everyone else.
>>
>> Yeah 100%, this giphy thing is just really hard work. But I understand the
>> benefits for the rest. Sorry about the original primary key; we have join
>> and groupBy implemented on our own in PAPI and are basically not using any
>> DSL (just the abstraction). I completely missed that in the original DSL
>> it's not there and just assumed it. Total brain mess-up on my end. Will
>> finish the chart as soon as I get a quiet evening this week.
>>
>> > My follow-up question for you is, won't the Map stay inside the State
>> > Store indefinitely after all of the changes have propagated? Isn't this
>> > effectively the same as a highwater mark state store?
>>
>> Thing is that if the map is empty, the subtractor is going to return
>> `null` and the key is removed from the keyspace. But there is going to be
>> a store, 100%. The good thing is that I can use this store directly for
>> materialize() / enableSendingOldValues(); it is a regular store,
>> satisfying all guarantees needed for further groupBy / join. The windowed
>> store is not keeping the values, so for the next stateful operation we
>> would need to instantiate an extra store, or we have the window store also
>> keep the values.
>>
>> Long story short: if we can flip in a custom groupBy before repartitioning
>> to the original primary key, I think it would help the users big time in
>> building efficient apps. Given the original primary key issue I understand
>> that we do not have a solid foundation to build on; leaving primary-key
>> carry-along to the user is very unfortunate. I could understand the
>> decision going like that, but I do not think it is a good decision.
>>
>> > Thanks
>> > Adam
>>
>> On Tue, Sep 11, 2018 at 10:07 AM, Prajakta Dumbre
>> <dumbreprajakta...@gmail.com> wrote:
>>
>>     please remove me from this group
>>
>> On Tue, Sep 11, 2018 at 1:29 PM Jan Filipiak <jan.filip...@trivago.com>
>> wrote:
>>
>> Hi Adam,
>>
>> give me some time, will make such a chart. Last time I didn't get along
>> well with giphy and ruined all your charts. Hopefully I can get it done
>> today.
>>
>> On 08.09.2018 16:00, Adam Bellemare wrote:
>>
>> Hi Jan
>>
>> I have included a diagram of what I attempted on the KIP:
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-GroupBy+Reduce/Aggregate
>>
>> I attempted this back at the start of my own implementation of this
>> solution, and since I could not get it to work I have since discarded the
>> code. At this point in time, if you wish to continue pursuing your groupBy
>> solution, I ask that you please create a diagram on the KIP carefully
>> explaining your solution. Please feel free to use the image I just posted
>> as a starting point. I am having trouble understanding your explanations,
>> but I think that a carefully constructed diagram will clear up any
>> misunderstandings. Alternately, please post a comprehensive PR with your
>> solution. I can only guess at what you mean, and since I value my own time
>> as much as you value yours, I believe it is your responsibility to provide
>> an implementation instead of me trying to guess.
>>
>> Adam
>>
>> On Sat, Sep 8, 2018 at 8:00 AM, Jan Filipiak <jan.filip...@trivago.com>
>> wrote:
>>
>> Hi James,
>>
>> nice to see you being interested. Kafka Streams at this point supports all
>> sorts of joins as long as both streams have the same key. Adam is
>> currently implementing a join where a KTable and a KTable can have a
>> one-to-many relationship (1:n). We exploit that RocksDB is a datastore
>> that keeps data sorted (at least it exposes an API to access the stored
>> data in a sorted fashion).
>>
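>> (That sorted access is what a range scan over a combined key relies on: put
>> the foreign key first in the key, and all matching entries come back
>> contiguously. A sketch, with made-up store and key layouts:)
>>
>>     import org.apache.kafka.streams.KeyValue;
>>     import org.apache.kafka.streams.state.KeyValueIterator;
>>     import org.apache.kafka.streams.state.KeyValueStore;
>>
>>     class RangeScanSketch {
>>         // Iterate all entries whose combined key starts with the given
>>         // foreign key, assuming keys serialize as "FK|PK" so that one FK's
>>         // entries sort contiguously.
>>         static void forEachMatch(KeyValueStore<String, String> store,
>>                                  String foreignKey) {
>>             try (KeyValueIterator<String, String> iter =
>>                          store.range(foreignKey + "|", foreignKey + "|\uffff")) {
>>                 while (iter.hasNext()) {
>>                     KeyValue<String, String> entry = iter.next();
>>                     // entry.key = combined key, entry.value = left-side record
>>                 }
>>             }
>>         }
>>     }
>>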
>> I think the technical caveats are well understood now and we are basically
>> down to philosophy and API design (when Adam sees my newest message). I
>> have a lengthy track record of losing those kinds of arguments within the
>> streams community and I have no clue why. So I literally can't wait for
>> you to churn through this thread and give your opinion on how we should
>> design the return type of the oneToManyJoin and how much power we want to
>> give to the user vs "simplicity" (where simplicity isn't really that, as
>> users still need to understand it, I argue).
>>
>> Waiting for you to join in on the discussion.
>>
>> Best Jan
>>
>> On 07.09.2018 15:49, James Kwan wrote:
>>
>> I am new to this group and I found this subject interesting. Sounds like
>> you guys want to implement a join table of two streams? Is there somewhere
>> I can see the original requirement or proposal?
>>
>> On Sep 7, 2018, at 8:13 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>
>> On 05.09.2018 22:17, Adam Bellemare wrote:
>>
>> > I'm currently testing using a Windowed Store to store the highwater
>> > mark. By all indications this should work fine, with the caveat being
>> > that it can only resolve out-of-order arrival for up to the size of the
>> > window (ie: 24h, 72h, etc). This would remove the possibility of it
>> > being unbounded in size.
>> >
>> > With regards to Jan's suggestion, I believe this is where we will have
>> > to remain in disagreement. While I do not disagree with your statement
>> > about there likely being additional joins done in a real-world workflow,
>> > I do not see how you can conclusively deal with out-of-order arrival of
>> > foreign-key changes and subsequent joins. I have attempted what I think
>> > you have proposed (without a high-water mark, using groupBy and reduce)
>> > and found that if the foreign key changes too quickly, or the load on a
>> > stream thread is too high, the joined messages will arrive out-of-order
>> > and be incorrectly propagated, such that an intermediate event is
>> > represented as the final event.
>>
>> Can you shed some light on your groupBy implementation? There must be
>> some sort of flaw in it. I have a suspicion where it is; I would just like
>> to confirm. The idea is bullet proof and it must be an implementation
>> mess-up. I would like to clarify before we draw a conclusion.
>>
>> > Repartitioning the scattered events back to their original partitions is
>> > the only way I know how to conclusively deal with out-of-order events in
>> > a given time frame, and to ensure that the data is eventually consistent
>> > with the input events.
>> >
>> > If you have some code to share that illustrates your approach, I would
>> > be very grateful as it would remove any misunderstandings that I may
>> > have.
>>
>> Ah okay, you were looking for my code. I don't have something easily
>> readable here as it's bloated with OO-patterns. It's anyhow trivial:
>>
>>     @Override
>>     public T apply(K aggKey, V value, T aggregate)
>>     {
>>         // asMap() is imaginary: expose the current aggregate as a Map<U, V>
>>         Map<U, V> currentStateAsMap = asMap(aggregate);
>>         U toModifyKey = mapper.apply(value);
>>         // << this is the place where people are actually going to have
>>         //    issues, and why you probably couldn't do it. We would need to
>>         //    find a solution here; I didn't realize that yet.
>>         // << we propagate the field in the joiner, so that we can pick it
>>         //    up in an aggregate. Probably you have not thought of this in
>>         //    your approach, right?
>>         // << I am very open to finding a generic solution here. In my
>>         //    honest opinion this is broken in KTableImpl.GroupBy: it loses
>>         //    the keys and only maintains the aggregate key.
>>         // << I abstracted it away back then, way before I was thinking of
>>         //    the oneToMany join. That is why I didn't realize its
>>         //    significance here.
>>         // << Opinions?
>>
>>         // 'current' stands for the values already held in the aggregate
>>         for (V m : current)
>>         {
>>             currentStateAsMap.put(mapper.apply(m), m);
>>         }
>>         if (isAdder)
>>         {
>>             currentStateAsMap.put(toModifyKey, value);
>>         }
>>         else
>>         {
>>             currentStateAsMap.remove(toModifyKey);
>>             if (currentStateAsMap.isEmpty()) {
>>                 return null;
>>             }
>>         }
>>         return asAggregateType(currentStateAsMap);
>>     }
>>
>> > Thanks,
>> > Adam
>>
>> On Wed, Sep 5, 2018 at 3:35 PM, Jan Filipiak <jan.filip...@trivago.com>
>> wrote:
>>
>> Thanks Adam for bringing Matthias up to speed about the differences! I
>> think re-keying back should be optional at best. I would say we return a
>> KScatteredTable, with reshuffle() returning KTable<originalKey,Joined> to
>> make the backwards repartitioning optional.
>>
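>> (Something along these lines as the API shape; these signatures are purely
>> hypothetical, just to make the idea concrete:)
>>
>>     import org.apache.kafka.streams.kstream.KTable;
>>
>>     // Hypothetical sketch of the proposed return type, not an existing API.
>>     interface KScatteredTable<KLeft, VJoined> {
>>         // The join result stays keyed on the internal combined key, so a
>>         // further groupBy / aggregate can be applied without first
>>         // repartitioning back. (Other operations elided.)
>>
>>         // Repartition back to the original left-hand key, only when the
>>         // caller actually needs it.
>>         KTable<KLeft, VJoined> reshuffle();
>>     }
>>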
>> I am also in big favour of doing the out-of-order processing using groupBy
>> instead of high-water-mark tracking, just because unbounded growth is
>> scary, plus it saves us the header stuff.
>>
>> I think the abstraction of always repartitioning back is just not so
>> strong. Like, the work has been done before we partition back, and
>> grouping by something else afterwards is really common.
>>
>> On 05.09.2018 13:49, Adam Bellemare wrote:
>>
>> Hi Matthias
>>
>> Thank you for your feedback, I do appreciate it!
>>
>> > While name spacing would be possible, it would require deserializing
>> > user headers, which implies a runtime overhead. I would suggest no
>> > namespacing for now to avoid the overhead. If this becomes a problem in
>> > the future, we can still add name spacing later on.
>>
>> Agreed. I will go with using a reserved string and document it.
>>
>> > My main concern about the design is the type of the result KTable: if I
>> > understood the proposal correctly,
>>
>> In your example, you have table1 and table2 swapped. Here is how it works
>> currently:
>>
>> 1) table1 has the records that contain the foreign key within their value.
>>    table1 input stream: <a,(fk=A,bar=1)>, <b,(fk=A,bar=2)>, <c,(fk=B,bar=3)>
>>    table2 input stream: <A,X>, <B,Y>
>>
>> 2) A value mapper is required to extract the foreign key.
>>    table1 foreign key mapper: ( value => value.fk )
>>
>>    The mapper is applied to each element in table1, and a new combined key
>>    is made:
>>    table1 mapped: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>, <B-c, (fk=B,bar=3)>
>>
>> 3) The rekeyed events are copartitioned with table2:
>>
>>    a) Stream Thread with Partition 0:
>>       RepartitionedTable1: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>
>>       Table2: <A,X>
>>
>>    b) Stream Thread with Partition 1:
>>       RepartitionedTable1: <B-c, (fk=B,bar=3)>
>>       Table2: <B,Y>
>>
>> 4) From here, they can be joined together locally by applying the joiner
>>    function.
>>
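>> (Step 2 in code form, roughly; CombinedKey and the field names below are
>> illustrative stand-ins for whatever the implementation actually uses:)
>>
>>     // Build the combined key <FK, PK> so records group and sort by the
>>     // foreign key first, co-partitioned with table2.
>>     class CombinedKey<FK, PK> {
>>         final FK foreignKey;
>>         final PK primaryKey;
>>         CombinedKey(FK foreignKey, PK primaryKey) {
>>             this.foreignKey = foreignKey;
>>             this.primaryKey = primaryKey;
>>         }
>>     }
>>
>>     class Table1Value {
>>         String fk;   // e.g. "A"
>>         int bar;     // e.g. 1
>>     }
>>
>>     class ForeignKeyExtraction {
>>         // <a,(fk=A,bar=1)>  becomes  <CombinedKey(A, a), (fk=A,bar=1)>
>>         static CombinedKey<String, String> rekey(String primaryKey,
>>                                                  Table1Value value) {
>>             return new CombinedKey<>(value.fk, primaryKey);
>>         }
>>     }
>>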
>> At this point, Jan's design and my design deviate. My design goes on to
>> repartition the data post-join and resolve out-of-order arrival of
>> records, finally returning the data keyed on just the original key. I do
>> not expose the CombinedKey or any of the internals outside of the
>> joinOnForeignKey function. This does make for a larger footprint, but it
>> removes all agency for resolving out-of-order arrivals and handling
>> CombinedKeys from the user. I believe that this makes the function much
>> easier to use.
>>
>> Let me know if this helps resolve your questions, and please feel free to
>> add anything else on your mind.
>>
>> Thanks again,
>> Adam
>>
On Tue, Sep 4, 2018 at 8:36 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Hi,
I am just catching up on this thread. I did not read everything so far, but
want to share a couple of initial thoughts:
Headers: I think there is a fundamental difference between header usage in
this KIP and KIP-258. For KIP-258, we add headers to changelog topics that
are owned by Kafka Streams and that nobody else is supposed to write into.
In fact, no user headers are written into the changelog topic and thus there
are no conflicts.
Nevertheless, I don't see a big issue with using headers within Streams. As
long as we document it, we can have some "reserved" header keys that users
are not allowed to use when processing data with Kafka Streams. IMHO, this
should be ok.
I think there is a safe way to avoid conflicts, since these headers are only
needed in internal topics (I think): for internal and changelog topics, we
can namespace all headers:
* user-defined headers are namespaced as "external." + headerKey
* internal headers are namespaced as "internal." + headerKey
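As a rough sketch of what that namespacing could look like (assuming the
standard Kafka Headers API; the prefixes are just the ones proposed above,
and the helper itself is hypothetical):

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;

// Hypothetical helper: prefix header keys so user headers and Streams-internal
// headers cannot collide. Not part of the KIP; it only illustrates the idea above.
public final class HeaderNamespacingSketch {
    private static final String EXTERNAL_PREFIX = "external.";
    private static final String INTERNAL_PREFIX = "internal.";

    // Copy all user-supplied headers under the "external." namespace.
    public static Headers namespaceUserHeaders(Headers userHeaders) {
        Headers namespaced = new RecordHeaders();
        for (Header h : userHeaders) {
            namespaced.add(EXTERNAL_PREFIX + h.key(), h.value());
        }
        return namespaced;
    }

    // Add a Streams-internal header under the "internal." namespace.
    public static void addInternalHeader(Headers headers, String key, byte[] value) {
        headers.add(INTERNAL_PREFIX + key, value);
    }
}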
While namespacing would be possible, it would require deserializing user
headers, which implies a runtime overhead. I would suggest no namespacing
for now to avoid the overhead. If this becomes a problem in the future, we
can still add namespacing later on.
My main concern about the design is the type of the result KTable: if I
understood the proposal correctly,

KTable<K1,V1> table1 = ...
KTable<K2,V2> table2 = ...

KTable<K1,V3> joinedTable = table1.join(table2,...);

implies that the `joinedTable` has the same key as the left input table.
IMHO, this does not work, because if table2 contains multiple rows that join
with a record in table1 (which is the main purpose of a foreign key join),
the result table would only contain a single join result, not multiple.
Example:

table1 input stream: <A,X>
table2 input stream: <a,(A,1)>, <b,(A,2)>

We use the table2 value as a foreign key to the table1 key (ie, "A" joins).
If the result key is the same as the key of table1, this implies that the
result can either be <A, join(X,1)> or <A, join(X,2)> but not both. Because
they share the same key, whatever result record we emit later overwrites the
previous result.
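To make the overwrite explicit, here is a tiny illustration (a plain Java map
standing in for the upsert semantics of a KTable; the values are placeholders):

import java.util.HashMap;
import java.util.Map;

// A KTable is an upsert log keyed by the record key, so two join results that
// share key "A" cannot coexist -- the later one replaces the earlier one.
public class KeyCollisionSketch {
    public static void main(String[] args) {
        Map<String, String> resultTable = new HashMap<>();
        resultTable.put("A", "join(X,1)");
        resultTable.put("A", "join(X,2)");   // overwrites join(X,1)
        System.out.println(resultTable);     // {A=join(X,2)} -- only one result survives
    }
}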
This is the reason why Jan originally proposed to use a combination of both
primary keys of the input tables as the key of the output table. This makes
the keys of the output table unique, and we can store both results in the
output table:

Result would be <A-a, join(X,1)>, <A-b, join(X,2)>
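A minimal sketch of such a combined key (an assumed shape, not the KIP's
exact class):

import java.util.Objects;

// Combining the primary keys of both input tables (e.g. "A" from table1 and
// "a" or "b" from table2) makes each join result unique, so <A-a, join(X,1)>
// and <A-b, join(X,2)> can coexist in the output table.
public class CombinedKeySketch<K1, K2> {
    public final K1 leftKey;    // e.g. "A"
    public final K2 rightKey;   // e.g. "a" or "b"

    public CombinedKeySketch(K1 leftKey, K2 rightKey) {
        this.leftKey = leftKey;
        this.rightKey = rightKey;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof CombinedKeySketch)) return false;
        CombinedKeySketch<?, ?> that = (CombinedKeySketch<?, ?>) o;
        return Objects.equals(leftKey, that.leftKey)
            && Objects.equals(rightKey, that.rightKey);
    }

    @Override
    public int hashCode() {
        return Objects.hash(leftKey, rightKey);
    }
}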
Thoughts?

-Matthias
On 9/4/18 1:36 PM, Jan Filipiak wrote:
Just one remark here: the high-watermark could be disregarded. The decision
about the forward depends on the size of the aggregated map. Only maps of
exactly one element would be unpacked and forwarded. Zero-element maps would
be published as a delete. Any other count of map entries is in a "waiting
for correct deletes to arrive" state.
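Expressed as a small sketch (names and types are illustrative, not taken
from an actual implementation):

import java.util.Map;

// Sketch of the forwarding rule described above: a one-entry aggregated map
// is unpacked and forwarded, an empty map is forwarded as a delete
// (tombstone), and anything else waits until the matching deletes arrive.
public class ForwardDecisionSketch {
    enum Decision { FORWARD_SINGLE, FORWARD_DELETE, WAIT_FOR_DELETES }

    static Decision decide(Map<?, ?> aggregatedMap) {
        if (aggregatedMap.isEmpty()) {
            return Decision.FORWARD_DELETE;      // 0 entries -> publish a delete
        }
        if (aggregatedMap.size() == 1) {
            return Decision.FORWARD_SINGLE;      // 1 entry -> unpack and forward it
        }
        return Decision.WAIT_FOR_DELETES;        // otherwise wait for correct deletes
    }
}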
On 04.09.2018 21:29, Adam Bellemare wrote:
It does look like I could replace the second repartition store and highwater
store with a groupBy and reduce. However, it looks like I would still need
to store the highwater value within the materialized store, to compare
against the arrival of out-of-order records (assuming my understanding of
THIS is correct...). This is in effect the same as the design I have now,
just with the two tables merged together.
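For what it's worth, a sketch of what keeping the highwater value inside the
reduced store could look like, assuming the value is wrapped upstream so it
carries its source offset (the ValueWithOffset wrapper below is hypothetical):

// Hypothetical adder logic for a groupBy().reduce(): the wrapper carries the
// source offset, and the reducer keeps whichever record has the higher
// offset, which effectively stores the highwater mark inside the materialized
// store instead of a separate highwater store.
public class HighwaterReduceSketch {
    static final class ValueWithOffset<V> {
        final V value;
        final long offset;
        ValueWithOffset(V value, long offset) {
            this.value = value;
            this.offset = offset;
        }
    }

    // Drop out-of-order arrivals: only accept the incoming record if its
    // offset is at least as high as the one already stored.
    static <V> ValueWithOffset<V> keepLatest(ValueWithOffset<V> current,
                                             ValueWithOffset<V> incoming) {
        return incoming.offset >= current.offset ? incoming : current;
    }
}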
-- 
-- Guozhang
