John,

Thanks a lot for the suggestions on refactoring the wiki. I agree with you
that the KIP should be easy to understand for anyone who reads it in the
future, and hence should provide a good summary of the user-facing
interfaces, as well as a rejected-alternatives section that briefly covers
"how we came a long way to this conclusion, and what we have argued,
disagreed, and agreed about, etc." so that readers do not need to dig into
the DISCUSS thread to get all the details. We can, of course, keep the
implementation details like "workflows" on the wiki page as an addendum
section, since they are closely related to the proposal.

Regarding your proposal in comment 6): that's a very interesting idea! Just
to confirm that I understand it correctly: the topology resulting from your
proposal is still the same as in the current proposal, where we will have 3
sub-topologies for this operator:

#1: rekey left table
   -> source from the left upstream, send to the rekey-processor to generate
the combined key, and then sink to the copartition topic.

#2: first-join with right table
   -> source from the right table upstream, materialize the right table.
   -> source from the copartition topic, materialize the rekeyed left
table, join with the right table, rekey back, and then sink to the
rekeyed-back topic.

#3: second join
   -> source from the rekeyed-back topic, materialize the rekeyed-back
table.
   -> source from the left upstream, materialize the left table, join with
the rekeyed-back table.

Sub-topologies #1 and #3 may be merged into a single sub-topology since
both of them read from the left table source stream. In this workflow, we
need to materialize 4 tables (left table in #3, right table in #2, rekeyed
left table in #2, rekeyed-back table in #3) and create 2 repartition topics
(copartition topic, rekeyed-back topic).
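
To double-check those counts, here is a minimal Python sketch that models
the three sub-topologies above as plain data. All names are made up for
illustration; this is not Kafka Streams code, and the "mergeable" rule is
just my shorthand for "reads from the same source".

```python
# Hypothetical model of the three sub-topologies; names are illustrative only.
SUB_TOPOLOGIES = {
    "rekey-left": {
        "sources": ["left-upstream"],
        "stores": [],
        "sinks": ["copartition-topic"],
    },
    "first-join": {
        "sources": ["right-upstream", "copartition-topic"],
        "stores": ["right-table", "rekeyed-left-table"],
        "sinks": ["rekeyed-back-topic"],
    },
    "second-join": {
        "sources": ["rekeyed-back-topic", "left-upstream"],
        "stores": ["rekeyed-back-table", "left-table"],
        "sinks": [],
    },
}


def materialized_stores(topologies):
    """Return every state store materialized by any sub-topology."""
    return sorted(s for t in topologies.values() for s in t["stores"])


def internal_topics(topologies):
    """Return topics written by one sub-topology and read by another."""
    written = {s for t in topologies.values() for s in t["sinks"]}
    read = {s for t in topologies.values() for s in t["sources"]}
    return sorted(written & read)


def mergeable(topologies, a, b):
    """Assumption of this sketch: sub-topologies sharing a source can merge."""
    return bool(set(topologies[a]["sources"]) & set(topologies[b]["sources"]))
```

Under these assumptions the model gives 4 stores and 2 internal topics, and
only #1 and #3 share a source.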

Compared with Adam's current proposal in the workflow overview, it has the
same number of materialized tables (left table, rekeyed left table, right
table, out-of-order resolver table) and the same number of internal topics
(two). The advantage is that on the copartition topic we can save bandwidth
by not sending the value, and in #2 the rekeyed left table is smaller since
we do not have any values to materialize. Is that right?
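
To make sure we are talking about the same thing, here is a rough sketch of
how I read the data flow, in plain Python with dicts standing in for the
tables. The function names are hypothetical and the stale-event check is my
interpretation of your comment 6); none of this is Kafka Streams code.

```python
def rekey_left(left):
    """Sub-topology #1: emit only the combined key (fk, pk), with no value."""
    return {(fk, pk): None for pk, (fk, _data) in left.items()}


def first_join(rekeyed, right):
    """Sub-topology #2: look up the right table by fk, then rekey back to pk.

    Each output event carries the fk it was joined on, so the final join
    can tell whether the event is still current.
    """
    return [(pk, (fk, right.get(fk))) for (fk, pk) in rekeyed]


def second_join(left, rekeyed_back_events):
    """Sub-topology #3: join with the left table, dropping stale events.

    An event is treated as stale when its fk no longer matches the fk
    currently stored in the left table for that pk.
    """
    result = {}
    for pk, (fk, right_value) in rekeyed_back_events:
        current = left.get(pk)
        if current is None or current[0] != fk:
            continue  # outdated event from an earlier foreign key
        result[pk] = (current, right_value)
    return result


# Example tables from comment 6).
LEFT = {1: ("A", "xyz"), 2: ("B", "asd")}
RIGHT = {"A": "EntityA", "B": "EntityB"}
RESULT = second_join(LEFT, first_join(rekey_left(LEFT), RIGHT))
```

With the example tables, RESULT maps 1 to ((A, xyz), EntityA) and 2 to
((B, asd), EntityB). If key 1 is later updated to (B, xyz), the events
(1: (A, null)) and (1: (B, EntityB)) give the same final result in either
arrival order, because the first one fails the fk comparison.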


Guozhang



On Wed, Dec 12, 2018 at 1:22 PM John Roesler <j...@confluent.io> wrote:

> Hi Adam,
>
> Given that the committers are all pretty busy right now, I think that it
> would help if you were to refactor the KIP a little to reduce the workload
> for reviewers.
>
> I'd recommend the following changes:
> * relocate all internal details to a section at the end called something
> like "Implementation Notes".
> * rewrite the rest of the KIP to be as succinct as possible and mention only
> publicly-facing API changes.
> ** for example, the interface that you've already listed there, as well as
> a textual description of the guarantees we'll be providing (join result is
> copartitioned with the LHS, and the join result is guaranteed correct)
>
> A good target would be that the whole main body of the KIP, including
> Status, Motivation, Proposal, Justification, and Rejected Alternatives all
> fit "above the fold" (i.e., all fit on the screen at a comfortable zoom
> level).
> I think the only real Rejected Alternative that bears mention at this point
> is KScatteredTable, which you could just include the executive summary on
> (no implementation details), and link to extra details in the
> Implementation Notes section.
>
> Taking a look at the wiki page, ~90% of the text there is internal detail,
> which is useful for the dubious, but doesn't need to be ratified in a vote
> (and would be subject to change without notice in the future anyway).
> There's also a lot of conflicting discussion, as you've very respectfully
> tried to preserve the original proposal from Jan while adding your own.
> Isolating all this information in a dedicated section at the bottom frees
> the voters up to focus on the public API part of the proposal, which is
> really all they need to consider.
>
> Plus, it'll be clear to future readers which parts of the document are
> enduring, and which parts are a snapshot of our implementation thinking at
> the time.
>
> I'm suggesting this because I suspect that the others haven't made time to
> review it partly because it seems daunting. If it seems like it would be a
> huge time investment to review, people will just keep putting it off. But
> if the KIP is a single page, then they'll be more inclined to give it a
> read.
>
> Honestly, I don't think the KIP itself is that controversial (apart from
> the scattered table thing (sorry, Jan) ). Most of the discussion has been
> around the implementation, which we can continue more effectively in a PR
> once the KIP has passed.
>
> How does that sound?
> -John
>
> On Mon, Dec 10, 2018 at 3:54 PM Adam Bellemare <adam.bellem...@gmail.com>
> wrote:
>
> > 1) I believe that the resolution mechanism John has proposed is
> sufficient
> > - it is clean and easy and doesn't require additional RocksDB stores,
> which
> > reduces the footprint greatly. I don't think we need to resolve based on
> > timestamp or offset anymore, but if we decide to do so, that would be
> within
> > the bounds of the existing API.
> >
> > 2) Is the current API sufficient, or does it need to be altered to go
> back
> > to vote?
> >
> > 3) KScatteredTable implementation can always be added in a future
> revision.
> > This API does not rule it out. This implementation of this function would
> > simply be replaced with `KScatteredTable.resolve()` while still
> maintaining
> > the existing API, thereby giving both features as Jan outlined earlier.
> > Would this work?
> >
> >
> > Thanks Guozhang, John and Jan
> >
> >
> >
> >
> > On Mon, Dec 10, 2018 at 10:39 AM John Roesler <j...@confluent.io> wrote:
> >
> > > Hi, all,
> > >
> > > >> In fact, we
> > > >> can just keep a single final-result store with timestamps and reject
> > > values
> > > >> that have a smaller timestamp, is that right?
> > >
> > > > Which is the correct output should at least be decided on the offset
> of
> > > > the original message.
> > >
> > > Thanks for this point, Jan.
> > >
> > > KIP-258 is merely to allow embedding the record timestamp  in the k/v
> > > store,
> > > as well as providing a storage-format upgrade path.
> > >
> > > I might have missed it, but I think we have yet to discuss whether it's
> > > safe
> > > or desirable just to swap topic-ordering out for timestamp-ordering.
> This
> > > is
> > > a very deep topic, and I think it would only pollute the current
> > > discussion.
> > >
> > > What Adam has proposed is safe, given the *current* ordering semantics
> > > of the system. If we can agree on his proposal, I think we can merge
> the
> > > feature well before the conversation about timestamp ordering even
> takes
> > > place, much less reaches a conclusion. In the mean time, it would seem
> to
> > > be unfortunate to have one join operator with different ordering
> > semantics
> > > from every other KTable operator.
> > >
> > > If and when that timestamp discussion takes place, many (all?) KTable
> > > operations
> > > will need to be updated, rendering the many:one join a small marginal
> > cost.
> > >
> > > And, just to plug it again, I proposed an algorithm above that I
> believe
> > > provides
> > > correct ordering without any additional metadata, and regardless of the
> > > ordering semantics. I didn't bring it up further, because I felt the
> KIP
> > > only needs
> > > to agree on the public API, and we can discuss the implementation at
> > > leisure in
> > > a PR...
> > >
> > > Thanks,
> > > -John
> > >
> > >
> > > On Mon, Dec 10, 2018 at 2:28 AM Jan Filipiak <jan.filip...@trivago.com
> >
> > > wrote:
> > >
> > > >
> > > >
> > > > On 10.12.2018 07:42, Guozhang Wang wrote:
> > > > > Hello Adam / Jan / John,
> > > > >
> > > > > Sorry for being late on this thread! I've finally got some time
> this
> > > > > weekend to cleanup a load of tasks on my queue (actually I've also
> > > > realized
> > > > > there are a bunch of other things I need to enqueue while cleaning
> > them
> > > > up
> > > > > --- sth I need to improve on my side). So here are my thoughts:
> > > > >
> > > > > Regarding the APIs: I like the current written API in the KIP. More
> > > > > generally I'd prefer to keep the 1) one-to-many join
> functionalities
> > as
> > > > > well as 2) other join types than inner as separate KIPs since 1)
> may
> > > > worth
> > > > > a general API refactoring that can benefit not only foreignkey
> joins
> > > but
> > > > > collocate joins as well (e.g. an extended proposal of
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
> > > > ),
> > > > > and I'm not sure if other join types would actually be needed
> (maybe
> > > left
> > > > > join still makes sense), so it's better to
> > > wait-for-people-to-ask-and-add
> > > > > than add-sth-that-no-one-uses.
> > > > >
> > > > > Regarding whether we enforce step 3) / 4) v.s. introducing a
> > > > > KScatteredTable for users to inject their own optimization: I'd
> > prefer
> > > to
> > > > > do the current option as-is, and my main rationale is for
> > optimization
> > > > > rooms inside the Streams internals and the API succinctness. For
> > > advanced
> > > > > users who may indeed prefer KScatteredTable and do their own
> > > > optimization,
> > > > > while it is too much of the work to use Processor API directly, I
> > think
> > > > we
> > > > > can still extend the current API to support it in the future if it
> > > > becomes
> > > > > necessary.
> > > >
> > > > no internal optimization potential. it's a myth
> > > >
> > > > ¯\_(ツ)_/¯
> > > >
> > > > :-)
> > > >
> > > > >
> > > > > Another note about step 4) resolving out-of-ordering data, as I
> > > mentioned
> > > > > before I think with KIP-258 (embedded timestamp with key-value
> store)
> > > we
> > > > > can actually make this step simpler than the current proposal. In
> > fact,
> > > > we
> > > > > can just keep a single final-result store with timestamps and
> reject
> > > > values
> > > > > that have a smaller timestamp, is that right?
> > > >
> > > > Which is the correct output should at least be decided on the offset
> of
> > > > the original message.
> > > >
> > > > >
> > > > >
> > > > > That's all I have in mind now. Again, great appreciation to Adam to
> > > make
> > > > > such HUGE progress on this KIP!
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Wed, Dec 5, 2018 at 2:40 PM Jan Filipiak <
> > jan.filip...@trivago.com>
> > > > > wrote:
> > > > >
> > > > >> If they don't find the time:
> > > > >> They usually take the opposite path from me :D
> > > > >> so the answer would be clear.
> > > > >>
> > > > >> hence my suggestion to vote.
> > > > >>
> > > > >>
> > > > >> On 04.12.2018 21:06, Adam Bellemare wrote:
> > > > >>> Hi Guozhang and Matthias
> > > > >>>
> > > > >>> I know both of you are quite busy, but we've gotten this KIP to a
> > > point
> > > > >>> where we need more guidance on the API (perhaps a bit of a
> > > tie-breaker,
> > > > >> if
> > > > >>> you will). If you have anyone else you may think should look at
> > this,
> > > > >>> please tag them accordingly.
> > > > >>>
> > > > >>> The scenario is as such:
> > > > >>>
> > > > >>> Current Option:
> > > > >>> API:
> > > > >>>
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-PublicInterfaces
> > > > >>> 1) Rekey the data to CombinedKey, and shuffles it to the
> partition
> > > with
> > > > >> the
> > > > >>> foreignKey (repartition 1)
> > > > >>> 2) Join the data
> > > > >>> 3) Shuffle the data back to the original node (repartition 2)
> > > > >>> 4) Resolve out-of-order arrival / race condition due to
> foreign-key
> > > > >> changes.
> > > > >>>
> > > > >>> Alternate Option:
> > > > >>> Perform #1 and #2 above, and return a KScatteredTable.
> > > > >>> - It would be keyed on a wrapped key function: <CombinedKey<KO,
> K>,
> > > VR>
> > > > >> (KO
> > > > >>> = Other Table Key, K = This Table Key, VR = Joined Result)
> > > > >>> - KScatteredTable.resolve() would perform #3 and #4 but
> otherwise a
> > > > user
> > > > >>> would be able to perform additional functions directly from the
> > > > >>> KScatteredTable (TBD - currently out of scope).
> > > > >>> - John's analysis 2-emails up is accurate as to the tradeoffs.
> > > > >>>
> > > > >>> Current Option is coded as-is. Alternate option is possible, but
> > will
> > > > >>> require for implementation details to be made in the API and some
> > > > >> exposure
> > > > >>> of new data structures into the API (ie: CombinedKey).
> > > > >>>
> > > > >>> I appreciate any insight into this.
> > > > >>>
> > > > >>> Thanks.
> > > > >>>
> > > > >>> On Tue, Dec 4, 2018 at 2:59 PM Adam Bellemare <
> > > > adam.bellem...@gmail.com>
> > > > >>> wrote:
> > > > >>>
> > > > >>>> Hi John
> > > > >>>>
> > > > >>>> Thanks for your feedback and assistance. I think your summary is
> > > > >> accurate
> > > > >>>> from my perspective. Additionally, I would like to add that
> there
> > > is a
> > > > >> risk
> > > > >>>> of inconsistent final states without performing the resolution.
> > This
> > > > is
> > > > >> a
> > > > >>>> major concern for me as most of the data I have dealt with is
> > > produced
> > > > >> by
> > > > >>>> relational databases. We have seen a number of cases where a
> user
> > in
> > > > the
> > > > >>>> Rails UI has modified the field (foreign key), realized they
> made
> > a
> > > > >>>> mistake, and then updated the field again with a new key. The
> > events
> > > > are
> > > > >>>> propagated out as they are produced, and as such we have had
> > > > real-world
> > > > >>>> cases where these inconsistencies were propagated downstream as
> > the
> > > > >> final
> > > > >>>> values due to the race conditions in the fanout of the data.
> > > > >>>>
> > > > >>>> This solution that I propose values correctness of the final
> > result
> > > > over
> > > > >>>> other factors.
> > > > >>>>
> > > > >>>> We could always move this function over to using a
> KScatteredTable
> > > > > >>>> implementation in the future, and simply deprecate this join
> > API
> > > in
> > > > >>>> time. I think I would like to hear more from some of the other
> > major
> > > > >>>> committers on which course of action they would think is best
> > before
> > > > any
> > > > >>>> more coding is done.
> > > > >>>>
> > > > >>>> Thanks again
> > > > >>>>
> > > > >>>> Adam
> > > > >>>>
> > > > >>>>
> > > > >>>> On Mon, Dec 3, 2018 at 8:24 PM John Roesler <j...@confluent.io>
> > > > wrote:
> > > > >>>>
> > > > >>>>> Hi Jan and Adam,
> > > > >>>>>
> > > > >>>>> Wow, thanks for doing that test, Adam. Those results are
> > > encouraging.
> > > > >>>>>
> > > > >>>>> Thanks for your performance experience as well, Jan. I agree
> that
> > > > >> avoiding
> > > > >>>>> unnecessary join outputs is especially important when the
> fan-out
> > > is
> > > > so
> > > > >>>>> high. I suppose this could also be built into the
> implementation
> > > > we're
> > > > >>>>> discussing, but it wouldn't have to be specified in the KIP
> > (since
> > > > >> it's an
> > > > >>>>> API-transparent optimization).
> > > > >>>>>
> > > > >>>>> As far as whether or not to re-repartition the data, I didn't
> > bring
> > > > it
> > > > >> up
> > > > >>>>> because it sounded like the two of you agreed to leave the KIP
> > > as-is,
> > > > >>>>> despite the disagreement.
> > > > >>>>>
> > > > >>>>> If you want my opinion, I feel like both approaches are
> > reasonable.
> > > > >>>>> It sounds like Jan values more the potential for developers to
> > > > optimize
> > > > >>>>> their topologies to re-use the intermediate nodes, whereas Adam
> > > > places
> > > > >>>>> more
> > > > >>>>> value on having a single operator that people can use without
> > extra
> > > > >> steps
> > > > >>>>> at the end.
> > > > >>>>>
> > > > >>>>> Personally, although I do find it exceptionally annoying when a
> > > > >> framework
> > > > >>>>> gets in my way when I'm trying to optimize something, it seems
> > > better
> > > > >> to
> > > > >>>>> go
> > > > >>>>> for a single operation.
> > > > >>>>> * Encapsulating the internal transitions gives us significant
> > > > latitude
> > > > >> in
> > > > >>>>> the implementation (for example, joining only at the end, not
> in
> > > the
> > > > >>>>> middle
> > > > >>>>> to avoid extra data copying and out-of-order resolution; how we
> > > > >> represent
> > > > >>>>> the first repartition keys (combined keys vs. value vectors),
> > > etc.).
> > > > >> If we
> > > > >>>>> publish something like a KScatteredTable with the
> > right-partitioned
> > > > >> joined
> > > > >>>>> data, then the API pretty much locks in the implementation as
> > well.
> > > > >>>>> * The API seems simpler to understand and use. I do mean
> "seems";
> > > if
> > > > >>>>> anyone
> > > > >>>>> wants to make the case that KScatteredTable is actually
> simpler,
> > I
> > > > >> think
> > > > >>>>> hypothetical usage code would help. From a relational algebra
> > > > >> perspective,
> > > > >>>>> it seems like KTable.join(KTable) should produce a new KTable
> in
> > > all
> > > > >>>>> cases.
> > > > >>>>> * That said, there might still be room in the API for a
> different
> > > > >>>>> operation
> > > > >>>>> like what Jan has proposed to scatter a KTable, and then do
> > things
> > > > like
> > > > >>>>> join, re-group, etc from there... I'm not sure; I haven't
> thought
> > > > >> through
> > > > >>>>> all the consequences yet.
> > > > >>>>>
> > > > >>>>> This is all just my opinion after thinking over the discussion
> so
> > > > >> far...
> > > > >>>>> -John
> > > > >>>>>
> > > > >>>>> On Mon, Dec 3, 2018 at 2:56 PM Adam Bellemare <
> > > > >> adam.bellem...@gmail.com>
> > > > >>>>> wrote:
> > > > >>>>>
> > > > >>>>>> Updated the PR to take into account John's feedback.
> > > > >>>>>>
> > > > >>>>>> I did some preliminary testing for the performance of the
> > > > prefixScan.
> > > > >> I
> > > > >>>>>> have attached the file, but I will also include the text in
> the
> > > body
> > > > >>>>> here
> > > > >>>>>> for archival purposes (I am not sure what happens to attached
> > > > files).
> > > > >> I
> > > > >>>>>> also updated the PR and the KIP accordingly.
> > > > >>>>>>
> > > > >>>>>> Summary: It scales exceptionally well for scanning large
> numbers
> > of
> > > > >>>>>> records. As Jan mentioned previously, the real issue would be
> > more
> > > > >>>>> around
> > > > >>>>>> processing the resulting records after obtaining them. For
> > > instance,
> > > > >> it
> > > > >>>>>> takes approximately ~80-120 mS to flush the buffer and a
> further
> > > > >>>>> ~35-85mS
> > > > >>>>>> to scan 27.5M records, obtaining matches for 2.5M of them.
> > > Iterating
> > > > >>>>>> through the records just to generate a simple count takes ~ 40
> > > times
> > > > >>>>> longer
> > > > >>>>>> than the flush + scan combined.
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>
> > > > >>
> > > >
> > >
> >
> ============================================================================================
> > > > >>>>>> Setup:
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>
> > > > >>
> > > >
> > >
> >
> ============================================================================================
> > > > >>>>>> Java 9 with default settings aside from a 512 MB heap
> (Xmx512m,
> > > > >> Xms512m)
> > > > > >>>>>> CPU: i7 2.2 GHz.
> > > > >>>>>>
> > > > >>>>>> Note: I am using a slightly-modified, directly-accessible
> Kafka
> > > > >> Streams
> > > > >>>>>> RocksDB
> > > > >>>>>> implementation (RocksDB.java, basically just avoiding the
> > > > >>>>>> ProcessorContext).
> > > > >>>>>> There are no modifications to the default RocksDB values
> > provided
> > > in
> > > > >> the
> > > > >>>>>> 2.1/trunk release.
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> keysize = 128 bytes
> > > > >>>>>> valsize = 512 bytes
> > > > >>>>>>
> > > > >>>>>> Step 1:
> > > > >>>>>> Write X positive matching events: (key = prefix + left-padded
> > > > >>>>>> auto-incrementing integer)
> > > > >>>>>> Step 2:
> > > > >>>>>> Write 10X negative matching events (key = left-padded
> > > > >> auto-incrementing
> > > > >>>>>> integer)
> > > > >>>>>> Step 3:
> > > > >>>>>> Perform flush
> > > > >>>>>> Step 4:
> > > > >>>>>> Perform prefixScan
> > > > >>>>>> Step 5:
> > > > >>>>>> Iterate through return Iterator and validate the count of
> > expected
> > > > >>>>> events.
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>
> > > > >>
> > > >
> > >
> >
> ============================================================================================
> > > > >>>>>> Results:
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>
> > > > >>
> > > >
> > >
> >
> ============================================================================================
> > > > >>>>>> X = 1k (11k events total)
> > > > >>>>>> Flush Time = 39 mS
> > > > >>>>>> Scan Time = 7 mS
> > > > >>>>>> 6.9 MB disk
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>
> > > > >>
> > > >
> > >
> >
> --------------------------------------------------------------------------------------------
> > > > >>>>>> X = 10k (110k events total)
> > > > >>>>>> Flush Time = 45 mS
> > > > >>>>>> Scan Time = 8 mS
> > > > >>>>>> 127 MB
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>
> > > > >>
> > > >
> > >
> >
> --------------------------------------------------------------------------------------------
> > > > >>>>>> X = 100k (1.1M events total)
> > > > >>>>>> Test1:
> > > > >>>>>> Flush Time = 60 mS
> > > > >>>>>> Scan Time = 12 mS
> > > > >>>>>> 678 MB
> > > > >>>>>>
> > > > >>>>>> Test2:
> > > > >>>>>> Flush Time = 45 mS
> > > > >>>>>> Scan Time = 7 mS
> > > > >>>>>> 576 MB
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>
> > > > >>
> > > >
> > >
> >
> --------------------------------------------------------------------------------------------
> > > > > >>>>>> X = 1M (11M events total)
> > > > >>>>>> Test1:
> > > > >>>>>> Flush Time = 52 mS
> > > > >>>>>> Scan Time = 19 mS
> > > > >>>>>> 7.2 GB
> > > > >>>>>>
> > > > >>>>>> Test2:
> > > > >>>>>> Flush Time = 84 mS
> > > > >>>>>> Scan Time = 34 mS
> > > > >>>>>> 9.1 GB
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>
> > > > >>
> > > >
> > >
> >
> --------------------------------------------------------------------------------------------
> > > > >>>>>> X = 2.5M (27.5M events total)
> > > > >>>>>> Test1:
> > > > >>>>>> Flush Time = 82 mS
> > > > >>>>>> Scan Time = 63 mS
> > > > >>>>>> 17GB - 276 sst files
> > > > >>>>>>
> > > > >>>>>> Test2:
> > > > >>>>>> Flush Time = 116 mS
> > > > >>>>>> Scan Time = 35 mS
> > > > >>>>>> 23GB - 361 sst files
> > > > >>>>>>
> > > > >>>>>> Test3:
> > > > >>>>>> Flush Time = 103 mS
> > > > >>>>>> Scan Time = 82 mS
> > > > >>>>>> 19 GB - 300 sst files
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>
> > > > >>
> > > >
> > >
> >
> --------------------------------------------------------------------------------------------
> > > > >>>>>>
> > > > >>>>>> I had to limit my testing on my laptop to X = 2.5M events. I
> > tried
> > > > to
> > > > >> go
> > > > >>>>>> to X = 10M (110M events) but RocksDB was going into the 100GB+
> > > range
> > > > >>>>> and my
> > > > >>>>>> laptop ran out of disk. More extensive testing could be done
> > but I
> > > > >>>>> suspect
> > > > >>>>>> that it would be in line with what we're seeing in the results
> > > > above.
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> At this point in time, I think the only major discussion point
> > is
> > > > >> really
> > > > >>>>>> around what Jan and I have disagreed on: repartitioning back +
> > > > >> resolving
> > > > >>>>>> potential out of order issues or leaving that up to the client
> > to
> > > > >>>>> handle.
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> Thanks folks,
> > > > >>>>>>
> > > > >>>>>> Adam
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> On Mon, Dec 3, 2018 at 4:34 AM Jan Filipiak <
> > > > jan.filip...@trivago.com
> > > > >>>
> > > > >>>>>> wrote:
> > > > >>>>>>
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> On 29.11.2018 15:14, John Roesler wrote:
> > > > >>>>>>>> Hi all,
> > > > >>>>>>>>
> > > > >>>>>>>> Sorry that this discussion petered out... I think the 2.1
> > > release
> > > > >>>>>>> caused an
> > > > >>>>>>>> extended distraction that pushed it off everyone's radar
> > (which
> > > > was
> > > > >>>>>>>> precisely Adam's concern). Personally, I've also had some
> > extend
> > > > >>>>>>>> distractions of my own that kept (and continue to keep) me
> > > > >>>>> preoccupied.
> > > > >>>>>>>>
> > > > >>>>>>>> However, calling for a vote did wake me up, so I guess Jan
> was
> > > on
> > > > >> the
> > > > >>>>>>> right
> > > > >>>>>>>> track!
> > > > >>>>>>>>
> > > > >>>>>>>> I've gone back and reviewed the whole KIP document and the
> > prior
> > > > >>>>>>>> discussion, and I'd like to offer a few thoughts:
> > > > >>>>>>>>
> > > > >>>>>>>> API Thoughts:
> > > > >>>>>>>>
> > > > >>>>>>>> 1. If I read the KIP right, you are proposing a many-to-one
> > > join.
> > > > >>>>> Could
> > > > >>>>>>> we
> > > > >>>>>>>> consider naming it manyToOneJoin? Or, if you prefer, flip
> the
> > > > design
> > > > >>>>>>> around
> > > > >>>>>>>> and make it a oneToManyJoin?
> > > > >>>>>>>>
> > > > >>>>>>>> The proposed name "joinOnForeignKey" disguises the join
> type,
> > > and
> > > > it
> > > > >>>>>>> seems
> > > > >>>>>>>> like it might trick some people into using it for a
> one-to-one
> > > > join.
> > > > >>>>>>> This
> > > > >>>>>>>> would work, of course, but it would be super inefficient
> > > compared
> > > > to
> > > > >>>>> a
> > > > >>>>>>>> simple rekey-and-join.
> > > > >>>>>>>>
> > > > >>>>>>>> 2. I might have missed it, but I don't think it's specified
> > > > whether
> > > > >>>>>>> it's an
> > > > >>>>>>>> inner, outer, or left join. I'm guessing an outer join, as
> > > > >>>>> (neglecting
> > > > >>>>>>> IQ),
> > > > >>>>>>>> the rest can be achieved by filtering or by handling it in
> the
> > > > >>>>>>> ValueJoiner.
> > > > >>>>>>>>
> > > > >>>>>>>> 3. The arg list to joinOnForeignKey doesn't look quite
> right.
> > > > >>>>>>>> 3a. Regarding Serialized: There are a few different
> paradigms
> > in
> > > > >>>>> play in
> > > > >>>>>>>> the Streams API, so it's confusing, but instead of three
> > > > Serialized
> > > > >>>>>>> args, I
> > > > >>>>>>>> think it would be better to have one that allows
> (optionally)
> > > > >> setting
> > > > >>>>>>> the 4
> > > > >>>>>>>> incoming serdes. The result serde is defined by the
> > > Materialized.
> > > > >> The
> > > > >>>>>>>> incoming serdes can be optional because they might already
> be
> > > > >>>>> available
> > > > >>>>>>> on
> > > > >>>>>>>> the source KTables, or the default serdes from the config
> > might
> > > be
> > > > >>>>>>>> applicable.
> > > > >>>>>>>>
> > > > >>>>>>>> 3b. Is the StreamPartitioner necessary? The other joins
> don't
> > > > allow
> > > > >>>>>>> setting
> > > > >>>>>>>> one, and it seems like it might actually be harmful, since
> the
> > > > rekey
> > > > >>>>>>>> operation needs to produce results that are co-partitioned
> > with
> > > > the
> > > > >>>>>>> "other"
> > > > >>>>>>>> KTable.
> > > > >>>>>>>>
> > > > >>>>>>>> 4. I'm fine with the "reserved word" header, but I didn't
> > > actually
> > > > >>>>>>> follow
> > > > >>>>>>>> what Matthias meant about namespacing requiring
> > "deserializing"
> > > > the
> > > > >>>>>>> record
> > > > >>>>>>>> header. The headers are already Strings, so I don't think
> that
> > > > >>>>>>>> deserialization is required. If we applied the namespace at
> > > source
> > > > >>>>> nodes
> > > > >>>>>>>> and stripped it at sink nodes, this would be practically no
> > > > >> overhead.
> > > > >>>>>>> The
> > > > >>>>>>>> advantage of the namespace idea is that no public API change
> > wrt
> > > > >>>>> headers
> > > > >>>>>>>> needs to happen, and no restrictions need to be placed on
> > users'
> > > > >>>>>>> headers.
> > > > >>>>>>>>
> > > > >>>>>>>> (Although I'm wondering if we can get away without the
> header
> > at
> > > > >>>>> all...
> > > > >>>>>>>> stay tuned)
> > > > >>>>>>>>
> > > > >>>>>>>> 5. I also didn't follow the discussion about the HWM table
> > > growing
> > > > >>>>>>> without
> > > > >>>>>>>> bound. As I read it, the HWM table is effectively
> implementing
> > > OCC
> > > > >> to
> > > > >>>>>>>> resolve the problem you noted with disordering when the
> rekey
> > is
> > > > >>>>>>>> reversed... particularly notable when the FK changes. As
> such,
> > > it
> > > > >>>>> only
> > > > >>>>>>>> needs to track the most recent "version" (the offset in the
> > > source
> > > > >>>>>>>> partition) of each key. Therefore, it should have the same
> > > number
> > > > of
> > > > >>>>>>> keys
> > > > >>>>>>>> as the source table at all times.
> > > > >>>>>>>>
> > > > >>>>>>>> I see that you are aware of KIP-258, which I think might be
> > > > relevant
> > > > >>>>> in
> > > > >>>>>>> a
> > > > >>>>>>>> couple of ways. One: it's just about storing the timestamp
> in
> > > the
> > > > >>>>> state
> > > > >>>>>>>> store, but the ultimate idea is to effectively use the
> > timestamp
> > > > as
> > > > >>>>> an
> > > > >>>>>>> OCC
> > > > >>>>>>>> "version" to drop disordered updates. You wouldn't want to
> use
> > > the
> > > > >>>>>>>> timestamp for this operation, but if you were to use a
> similar
> > > > >>>>>>> mechanism to
> > > > >>>>>>>> store the source offset in the store alongside the re-keyed
> > > > values,
> > > > >>>>> then
> > > > >>>>>>>> you could avoid a separate table.
> > > > >>>>>>>>
> > > > >>>>>>>> 6. You and Jan have been thinking about this for a long
> time,
> > so
> > > > >> I've
> > > > >>>>>>>> probably missed something here, but I'm wondering if we can
> > > avoid
> > > > >> the
> > > > >>>>>>> HWM
> > > > >>>>>>>> tracking at all and resolve out-of-order during a final join
> > > > >>>>> instead...
> > > > >>>>>>>>
> > > > >>>>>>>> Let's say we're joining a left table (Integer K: Letter FK,
> > > (other
> > > > >>>>>>> data))
> > > > >>>>>>>> to a right table (Letter K: (some data)).
> > > > >>>>>>>>
> > > > >>>>>>>> Left table:
> > > > >>>>>>>> 1: (A, xyz)
> > > > >>>>>>>> 2: (B, asd)
> > > > >>>>>>>>
> > > > >>>>>>>> Right table:
> > > > >>>>>>>> A: EntityA
> > > > >>>>>>>> B: EntityB
> > > > >>>>>>>>
> > > > >>>>>>>> We could do a rekey as you proposed with a combined key, but
> > not
> > > > >>>>>>>> propagating the value at all..
> > > > >>>>>>>> Rekey table:
> > > > >>>>>>>> A-1: (dummy value)
> > > > >>>>>>>> B-2: (dummy value)
> > > > >>>>>>>>
> > > > >>>>>>>> Which we then join with the right table to produce:
> > > > >>>>>>>> A-1: EntityA
> > > > >>>>>>>> B-2: EntityB
> > > > >>>>>>>>
> > > > >>>>>>>> Which gets rekeyed back:
> > > > >>>>>>>> 1: A, EntityA
> > > > >>>>>>>> 2: B, EntityB
> > > > >>>>>>>>
> > > > >>>>>>>> And finally we do the actual join:
> > > > >>>>>>>> Result table:
> > > > >>>>>>>> 1: ((A, xyz), EntityA)
> > > > >>>>>>>> 2: ((B, asd), EntityB)
> > > > >>>>>>>>
> > > > >>>>>>>> The thing is that in that last join, we have the opportunity
> > to
> > > > >>>>> compare
> > > > >>>>>>> the
> > > > >>>>>>>> current FK in the left table with the incoming PK of the
> right
> > > > >>>>> table. If
> > > > >>>>>>>> they don't match, we just drop the event, since it must be
> > > > outdated.
> > > > >>>>>>>>
> > > > >>>>>>>
> > > > >>>>>>>> In your KIP, you gave an example in which (1: A, xyz) gets
> > > updated
> > > > >> to
> > > > >>>>>>> (1:
> > > > >>>>>>>> B, xyz), ultimately yielding a conundrum about whether the
> > final
> > > > >>>>> state
> > > > >>>>>>>> should be (1: null) or (1: joined-on-B). With the algorithm
> > > above,
> > > > >>>>> you
> > > > >>>>>>>> would be considering (1: (B, xyz), (A, null)) vs (1: (B,
> xyz),
> > > (B,
> > > > >>>>>>>> EntityB)). It seems like this does give you enough
> information
> > > to
> > > > >>>>> make
> > > > >>>>>>> the
> > > > >>>>>>>> right choice, regardless of disordering.
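
The final-join check described above can be sketched with plain dictionaries. This is only an illustrative model, not Kafka Streams code: the names `left_table` and `final_join` and the tuple layout are assumptions introduced here. The left table is shown after row 1 has been updated from FK A to FK B.

```python
# Illustrative model only: a plain dict stands in for the materialized
# left table. It maps left PK -> (foreign key, other data).
left_table = {1: ("B", "xyz"), 2: ("B", "asd")}


def final_join(left_table, left_pk, right_pk, right_value):
    """Join one rekeyed-back event against the current left row.

    Returns the joined record, or None when the event is dropped because
    the left row is gone or its current FK no longer matches right_pk.
    """
    row = left_table.get(left_pk)
    if row is None:
        return None  # left row was deleted; nothing to join
    current_fk, _ = row
    if current_fk != right_pk:
        return None  # event was produced for an older FK; drop it
    return (row, right_value)


# Row 1 moved from FK A to FK B; both join results may arrive in any order.
stale = final_join(left_table, 1, "A", None)
fresh = final_join(left_table, 1, "B", "EntityB")
```

In this model the event keyed on the old FK (A) is discarded whichever order the two events arrive in, so only the B result survives.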
> > > > >>>>>>>
> > > > >>>>>>> Will check Adam's patch, but this should work. As mentioned
> > often
> > > I
> > > > am
> > > > >>>>>>> not convinced on partitioning back for the user
> automatically.
> > I
> > > > >> think
> > > > >>>>>>> this is the real performance eater ;)
> > > > >>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> 7. Last thought... I'm a little concerned about the
> > performance
> > > of
> > > > >>>>> the
> > > > >>>>>>>> range scans when records change in the right table. You've
> > said
> > > > that
> > > > >>>>>>> you've
> > > > >>>>>>>> been using the algorithm you presented in production for a
> > > while.
> > > > >> Can
> > > > >>>>>>> you
> > > > >>>>>>>> give us a sense of the performance characteristics you've
> > > > observed?
> > > > >>>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> Make it work, make it fast, make it beautiful. The topmost
> > thing
> > > > here
> > > > >>>>> is
> > > > >>>>>>> / was correctness. In practice I do not measure the
> performance
> > > of
> > > > >> the
> > > > >>>>>>> range scan. Usual cases I run this with is emitting 500k -
> 1M
> > > rows
> > > > >>>>>>> on a left hand side change. The range scan is just the work
> you
> > > > gotta
> > > > >>>>>>> do, also when you pack your data into different formats,
> > usually
> > > > the
> > > > >>>>>>> RocksDB performance is tied very closely to the size of the data and
> we
> > > > can't
> > > > >>>>>>> really change that. It is more important for users to prevent
> > > > useless
> > > > >>>>>>> updates to begin with. My left hand side is guarded to drop
> > > changes
> > > > >>>>> that
> > > > >>>>>>> are not going to change my join output.
> > > > >>>>>>>
> > > > >>>>>>> usually it's:
> > > > >>>>>>>
> > > > >>>>>>> drop unused fields and then don't forward if old.equals(new)
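
The guard described above (project away unused fields, then suppress the update if nothing relevant changed) might look like the following sketch. The function names and the record layout are hypothetical and serve only as illustration.

```python
def project(record, used_fields):
    """Keep only the fields the join output depends on."""
    return {name: record[name] for name in used_fields if name in record}


def should_forward(old, new, used_fields):
    """Forward a change only if the projected old and new values differ."""
    old_view = None if old is None else project(old, used_fields)
    new_view = None if new is None else project(new, used_fields)
    return old_view != new_view


# Hypothetical records: only "fk" and "name" feed the join output.
old = {"fk": "A", "name": "xyz", "last_seen": 100}
new = {"fk": "A", "name": "xyz", "last_seen": 200}
changed = {"fk": "B", "name": "xyz", "last_seen": 200}
```

Here `should_forward(old, new, ["fk", "name"])` is false because only an unused field changed, so the downstream range scan is never triggered for that update.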
> > > > >>>>>>>
> > > > >>>>>>> Regarding the performance of creating an iterator for
> > smaller
> > > > >>>>>>> fanouts, users can still just do a group by first then
> anyways.
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>>> I could only think of one alternative, but I'm not sure if
> > it's
> > > > >>>>> better
> > > > >>>>>>> or
> > > > >>>>>>>> worse... If the first re-key only needs to preserve the
> > original
> > > > >> key,
> > > > >>>>>>> as I
> > > > >>>>>>>> proposed in #6, then we could store a vector of keys in the
> > > value:
> > > > >>>>>>>>
> > > > >>>>>>>> Left table:
> > > > >>>>>>>> 1: A,...
> > > > >>>>>>>> 2: B,...
> > > > >>>>>>>> 3: A,...
> > > > >>>>>>>>
> > > > >>>>>>>> Gets re-keyed:
> > > > >>>>>>>> A: [1, 3]
> > > > >>>>>>>> B: [2]
> > > > >>>>>>>>
> > > > >>>>>>>> Then, the rhs part of the join would only need a regular
> > > > single-key
> > > > >>>>>>> lookup.
> > > > >>>>>>>> Of course we have to deal with the problem of large values,
> as
> > > > >>>>> there's
> > > > >>>>>>> no
> > > > >>>>>>>> bound on the number of lhs records that can reference rhs
> > > records.
> > > > >>>>>>> Offhand,
> > > > >>>>>>>> I'd say we could page the values, so when one row is past
> the
> > > > >>>>>>> threshold, we
> > > > >>>>>>>> append the key for the next page. Then in most cases, it
> would
> > > be
> > > > a
> > > > >>>>>>> single
> > > > >>>>>>>> key lookup, but for large fan-out updates, it would be one
> per
> > > > (max
> > > > >>>>>>> value
> > > > >>>>>>>> size)/(avg lhs key size).
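
A rough sketch of the paging idea above, using a dictionary as the re-key store. The page threshold is expressed here as a maximum number of keys per page rather than a byte size, and the `(fk, page_number)` store key is an assumption made for illustration.

```python
def build_pages(fk, lhs_keys, max_keys_per_page):
    """Split the lhs keys referencing `fk` into chained pages.

    Each page is stored under (fk, page_number). A page that is followed
    by another one carries the key of the next page.
    """
    store = {}
    chunks = [lhs_keys[i:i + max_keys_per_page]
              for i in range(0, len(lhs_keys), max_keys_per_page)] or [[]]
    for n, chunk in enumerate(chunks):
        next_key = (fk, n + 1) if n + 1 < len(chunks) else None
        store[(fk, n)] = (chunk, next_key)
    return store


def lookup(store, fk):
    """Return all lhs keys for `fk` and the number of store reads used."""
    keys, reads, page_key = [], 0, (fk, 0)
    while page_key is not None:
        entry = store.get(page_key)
        if entry is None:
            break  # no page stored under this key
        chunk, page_key = entry
        keys.extend(chunk)
        reads += 1
    return keys, reads


small = build_pages("A", [1, 3], max_keys_per_page=2)
large = build_pages("A", [0, 1, 2, 3, 4], max_keys_per_page=2)
```

With a small fan-out the lookup is a single read; with five keys and two keys per page it takes three reads, which mirrors the one-read-per-page cost described above.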
> > > > >>>>>>>>
> > > > >>>>>>>> This seems more complex, though... Plus, I think there's
> some
> > > > extra
> > > > >>>>>>>> tracking we'd need to do to know when to emit a retraction.
> > For
> > > > >>>>> example,
> > > > >>>>>>>> when record 1 is deleted, the re-key table would just have
> (A:
> > > > [3]).
> > > > >>>>>>> Some
> > > > >>>>>>>> kind of tombstone is needed so that the join result for 1
> can
> > > also
> > > > >> be
> > > > >>>>>>>> retracted.
> > > > >>>>>>>>
> > > > >>>>>>>> That's all!
> > > > >>>>>>>>
> > > > >>>>>>>> Thanks so much to both Adam and Jan for the thoughtful KIP.
> > > Sorry
> > > > >> the
> > > > >>>>>>>> discussion has been slow.
> > > > >>>>>>>> -John
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> On Fri, Oct 12, 2018 at 2:20 AM Jan Filipiak <
> > > > >>>>> jan.filip...@trivago.com>
> > > > >>>>>>>> wrote:
> > > > >>>>>>>>
> > > > >>>>>>>>> I'd say you can just call the vote.
> > > > >>>>>>>>>
> > > > >>>>>>>>> that happens all the time, and if something comes up, it
> just
> > > > goes
> > > > >>>>> back
> > > > >>>>>>>>> to discuss.
> > > > >>>>>>>>>
> > > > >>>>>>>>> would not expect too much attention with another
> email
> > > in
> > > > >>>>> this
> > > > >>>>>>>>> thread.
> > > > >>>>>>>>>
> > > > >>>>>>>>> best Jan
> > > > >>>>>>>>>
> > > > >>>>>>>>> On 09.10.2018 13:56, Adam Bellemare wrote:
> > > > >>>>>>>>>> Hello Contributors
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> I know that 2.1 is about to be released, but I do need to
> > bump
> > > > >>>>> this to
> > > > >>>>>>>>> keep
> > > > >>>>>>>>>> visibility up. I am still intending to push this through
> > once
> > > > >>>>>>> contributor
> > > > >>>>>>>>>> feedback is given.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Main points that need addressing:
> > > > >>>>>>>>>> 1) Any way (or benefit) in structuring the current
> singular
> > > > graph
> > > > >>>>> node
> > > > >>>>>>>>> into
> > > > >>>>>>>>>> multiple nodes? It has a whopping 25 parameters right
> now. I
> > > am
> > > > a
> > > > >>>>> bit
> > > > >>>>>>>>> fuzzy
> > > > >>>>>>>>>> on how the optimizations are supposed to work, so I would
> > > > >>>>> appreciate
> > > > >>>>>>> any
> > > > >>>>>>>>>> help on this aspect.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> 2) Overall strategy for joining + resolving. This thread
> has
> > > > much
> > > > >>>>>>>>> discourse
> > > > >>>>>>>>>> between Jan and me about the current highwater mark
> > proposal
> > > > and
> > > > >> a
> > > > >>>>>>>>> groupBy
> > > > >>>>>>>>>> + reduce proposal. I am of the opinion that we need to
> > > strictly
> > > > >>>>> handle
> > > > >>>>>>>>> any
> > > > >>>>>>>>>> chance of out-of-order data and leave none of it up to the
> > > > >>>>> consumer.
> > > > >>>>>>> Any
> > > > >>>>>>>>>> comments or suggestions here would also help.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> 3) Anything else that you see that would prevent this from
> > > > moving
> > > > >>>>> to a
> > > > >>>>>>>>> vote?
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Thanks
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Adam
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> On Sun, Sep 30, 2018 at 10:23 AM Adam Bellemare <
> > > > >>>>>>>>> adam.bellem...@gmail.com>
> > > > >>>>>>>>>> wrote:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> Hi Jan
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> With the Stores.windowStoreBuilder and
> > > > >>>>> Stores.persistentWindowStore,
> > > > >>>>>>> you
> > > > >>>>>>>>>>> actually only need to specify the amount of segments you
> > want
> > > > and
> > > > >>>>> how
> > > > >>>>>>>>> large
> > > > >>>>>>>>>>> they are. To the best of my understanding, what happens
> is
> > > that
> > > > >>>>> the
> > > > >>>>>>>>>>> segments are automatically rolled over as new data with
> new
> > > > >>>>>>> timestamps
> > > > >>>>>>>>> are
> > > > >>>>>>>>>>> created. We use this exact functionality in some of the
> > work
> > > > done
> > > > >>>>>>>>>>> internally at my company. For reference, this is the
> > hopping
> > > > >>>>> windowed
> > > > >>>>>>>>> store.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>
> > > > >>>>>
> > > > >>
> > > >
> > >
> >
> https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#id21
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> In the code that I have provided, there are going to be
> two
> > > 24h
> > > > >>>>>>>>> segments.
> > > > >>>>>>>>>>> When a record is put into the windowStore, it will be
> > > inserted
> > > > at
> > > > >>>>>>> time
> > > > >>>>>>>>> T in
> > > > >>>>>>>>>>> both segments. The two segments will always overlap by
> 12h.
> > > As
> > > > >>>>> time
> > > > >>>>>>>>> goes on
> > > > >>>>>>>>>>> and new records are added (say at time T+12h+), the
> oldest
> > > > >> segment
> > > > >>>>>>> will
> > > > >>>>>>>>> be
> > > > >>>>>>>>>>> automatically deleted and a new segment created. The
> > records
> > > > are
> > > > >>>>> by
> > > > >>>>>>>>> default
> > > > >>>>>>>>>>> inserted with the context.timestamp(), such that it is
> the
> > > > record
> > > > >>>>>>> time,
> > > > >>>>>>>>> not
> > > > >>>>>>>>>>> the clock time, which is used.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> To the best of my understanding, the timestamps are
> > retained
> > > > when
> > > > >>>>>>>>>>> restoring from the changelog.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Basically, this is a heavy-handed way to deal with TTL at a
> > > > >>>>>>> segment-level,
> > > > >>>>>>>>>>> instead of at an individual record level.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> On Tue, Sep 25, 2018 at 5:18 PM Jan Filipiak <
> > > > >>>>>>> jan.filip...@trivago.com>
> > > > >>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>> Will that work? I expected it to blow up with
> > > > ClassCastException
> > > > >>>>> or
> > > > >>>>>>>>>>>> similar.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> You either would have to specify the window you
> fetch/put
> > or
> > > > >>>>> iterate
> > > > >>>>>>>>>>>> across all windows the key was found in right?
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> I just hope the window-store doesn't check stream-time
> > under
> > > > the
> > > > >>>>>>> hood;
> > > > >>>>>>>>>>>> that would be a questionable interface.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> If it does: did you see my comment on checking all the
> > > windows
> > > > >>>>>>> earlier?
> > > > >>>>>>>>>>>> that would be needed to actually give reasonable time
> > > > guarantees.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Best
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> On 25.09.2018 13:18, Adam Bellemare wrote:
> > > > >>>>>>>>>>>>> Hi Jan
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Check for  " highwaterMat " in the PR. I only changed
> the
> > > > state
> > > > >>>>>>> store,
> > > > >>>>>>>>>>>> not
> > > > >>>>>>>>>>>>> the ProcessorSupplier.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Thanks,
> > > > >>>>>>>>>>>>> Adam
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> On Mon, Sep 24, 2018 at 2:47 PM, Jan Filipiak <
> > > > >>>>>>>>> jan.filip...@trivago.com
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> On 24.09.2018 16:26, Adam Bellemare wrote:
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> @Guozhang
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Thanks for the information. This is indeed something
> > that
> > > > >>>>> will be
> > > > >>>>>>>>>>>>>>> extremely
> > > > >>>>>>>>>>>>>>> useful for this KIP.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> @Jan
> > > > >>>>>>>>>>>>>>> Thanks for your explanations. That being said, I will
> > not
> > > > be
> > > > >>>>>>> moving
> > > > >>>>>>>>>>>> ahead
> > > > >>>>>>>>>>>>>>> with an implementation using reshuffle/groupBy
> solution
> > > as
> > > > >> you
> > > > >>>>>>>>>>>> propose.
> > > > >>>>>>>>>>>>>>> That being said, if you wish to implement it yourself
> > off
> > > > of
> > > > >>>>> my
> > > > >>>>>>>>>>>> current PR
> > > > >>>>>>>>>>>>>>> and submit it as a competitive alternative, I would
> be
> > > more
> > > > >>>>> than
> > > > >>>>>>>>>>>> happy to
> > > > >>>>>>>>>>>>>>> help vet that as an alternate solution. As it stands
> > > right
> > > > >>>>> now,
> > > > >>>>>>> I do
> > > > >>>>>>>>>>>> not
> > > > >>>>>>>>>>>>>>> really have more time to invest into alternatives
> > without
> > > > >>>>> there
> > > > >>>>>>>>> being
> > > > >>>>>>>>>>>> a
> > > > >>>>>>>>>>>>>>> strong indication from the binding voters which they
> > > would
> > > > >>>>>>> prefer.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Hey, total no worries. I think I personally gave up on
> > the
> > > > >>>>> streams
> > > > >>>>>>>>> DSL
> > > > >>>>>>>>>>>> for
> > > > >>>>>>>>>>>>>> some time already, otherwise I would have pulled this
> > KIP
> > > > >>>>> through
> > > > >>>>>>>>>>>> already.
> > > > >>>>>>>>>>>>>> I am currently reimplementing my own DSL based on
> PAPI.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> I will look at finishing up my PR with the windowed
> > state
> > > > >>>>> store
> > > > >>>>>>> in
> > > > >>>>>>>>> the
> > > > >>>>>>>>>>>>>>> next
> > > > >>>>>>>>>>>>>>> week or so, exercising it via tests, and then I will
> > come
> > > > >> back
> > > > >>>>>>> for
> > > > >>>>>>>>>>>> final
> > > > >>>>>>>>>>>>>>> discussions. In the meantime, I hope that any of the
> > > > binding
> > > > >>>>>>> voters
> > > > >>>>>>>>>>>> could
> > > > >>>>>>>>>>>>>>> take a look at the KIP in the wiki. I have updated it
> > > > >>>>> according
> > > > >>>>>>> to
> > > > >>>>>>>>> the
> > > > >>>>>>>>>>>>>>> latest plan:
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+
> > > > >>>>>>>>>>>>>>> Support+non-key+joining+in+KTable
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> I have also updated the KIP PR to use a windowed
> store.
> > > > This
> > > > >>>>>>> could
> > > > >>>>>>>>> be
> > > > >>>>>>>>>>>>>>> replaced by the results of KIP-258 whenever they are
> > > > >>>>> completed.
> > > > >>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/5527
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Thanks,
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Adam
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Is the HighWatermarkResolverProccessorsupplier already
> > > > updated
> > > > >>>>> in
> > > > >>>>>>> the
> > > > >>>>>>>>>>>> PR?
> > > > >>>>>>>>>>>>>> expected it to change to Windowed<K>,Long Missing
> > > something?
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> On Fri, Sep 14, 2018 at 2:24 PM, Guozhang Wang <
> > > > >>>>>>> wangg...@gmail.com>
> > > > >>>>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Correction on my previous email: KAFKA-5533 is the
> > wrong
> > > > >> link,
> > > > >>>>>>> as it
> > > > >>>>>>>>>>>> is
> > > > >>>>>>>>>>>>>>>> for
> > > > >>>>>>>>>>>>>>>> corresponding changelog mechanisms. But as part of
> > > KIP-258
> > > > >>>>> we do
> > > > >>>>>>>>>>>> want to
> > > > >>>>>>>>>>>>>>>> have "handling out-of-order data for source KTable"
> > such
> > > > >> that
> > > > >>>>>>>>>>>> instead of
> > > > >>>>>>>>>>>>>>>> blindly applying the updates to the materialized store,
> > > i.e.
> > > > >>>>>>> following
> > > > >>>>>>>>>>>>>>>> offset
> > > > >>>>>>>>>>>>>>>> ordering, we will reject updates that are older than
> > the
> > > > >>>>> current
> > > > >>>>>>>>>>>> key's
> > > > >>>>>>>>>>>>>>>> timestamps, i.e. following timestamp ordering.
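
The difference between offset ordering and timestamp ordering described above can be sketched as follows. This is a minimal model in which a dictionary stands in for a key-value store that also keeps each record's timestamp; `apply_update` is a hypothetical helper, not a Kafka Streams API.

```python
def apply_update(store, key, value, timestamp):
    """Apply an update only if it is not older than the stored one.

    store maps key -> (value, timestamp). Returns True when applied.
    """
    current = store.get(key)
    if current is not None and timestamp < current[1]:
        return False  # out-of-order update: reject it
    store[key] = (value, timestamp)
    return True


store = {}
results = [
    apply_update(store, "k", "v1", 10),
    apply_update(store, "k", "v3", 30),
    apply_update(store, "k", "v2", 20),  # later offset, older timestamp
]
```

Under pure offset ordering the store would end with `v2`; with the timestamp check it keeps `v3`.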
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Guozhang
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> On Fri, Sep 14, 2018 at 11:21 AM, Guozhang Wang <
> > > > >>>>>>>>> wangg...@gmail.com>
> > > > >>>>>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Hello Adam,
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> Thanks for the explanation. Regarding the final
> step
> > > > (i.e.
> > > > >>>>> the
> > > > >>>>>>>>> high
> > > > >>>>>>>>>>>>>>>>> watermark store, now altered to be replaced with a
> > > window
> > > > >>>>>>> store),
> > > > >>>>>>>>> I
> > > > >>>>>>>>>>>>>>>>> think
> > > > >>>>>>>>>>>>>>>>> another current on-going KIP may actually help:
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > >>>>>>>>>>>>>>>>> 258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> This is for adding the timestamp into a key-value
> > store
> > > > >>>>> (i.e.
> > > > >>>>>>> only
> > > > >>>>>>>>>>>> for
> > > > >>>>>>>>>>>>>>>>> non-windowed KTable), and then one of its usage, as
> > > > >>>>> described
> > > > >>>>>>> in
> > > > >>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-5533,
> is
> > > > that
> > > > >>>>> we
> > > > >>>>>>> can
> > > > >>>>>>>>>>>> then
> > > > >>>>>>>>>>>>>>>>> "reject" updates from the source topics if its
> > > timestamp
> > > > is
> > > > >>>>>>>>> smaller
> > > > >>>>>>>>>>>> than
> > > > >>>>>>>>>>>>>>>>> the current key's latest update timestamp. I think
> it
> > > is
> > > > >>>>> very
> > > > >>>>>>>>>>>> similar to
> > > > >>>>>>>>>>>>>>>>> what you have in mind for high watermark based
> > > filtering,
> > > > >>>>> while
> > > > >>>>>>>>> you
> > > > >>>>>>>>>>>> only
> > > > >>>>>>>>>>>>>>>>> need to make sure that the timestamps of the
> joining
> > > > >> records
> > > > >>>>>>> are
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> correctly
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> inherited through the whole topology to the final
> > stage.
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> Note that this KIP is for key-value store and hence
> > > > >>>>>>> non-windowed
> > > > >>>>>>>>>>>> KTables
> > > > >>>>>>>>>>>>>>>>> only, but for windowed KTables we do not really
> have
> > a
> > > > good
> > > > >>>>>>>>> support
> > > > >>>>>>>>>>>> for
> > > > >>>>>>>>>>>>>>>>> their joins anyways (
> > > > >>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-7107)
> > > > >>>>>>>>>>>>>>>>> I
> > > > >>>>>>>>>>>>>>>>> think we can just consider non-windowed
> KTable-KTable
> > > > >>>>> non-key
> > > > >>>>>>>>> joins
> > > > >>>>>>>>>>>> for
> > > > >>>>>>>>>>>>>>>>> now. In which case, KIP-258 should help.
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> Guozhang
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> On Wed, Sep 12, 2018 at 9:20 PM, Jan Filipiak <
> > > > >>>>>>>>>>>> jan.filip...@trivago.com
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> On 11.09.2018 18:00, Adam Bellemare wrote:
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> Hi Guozhang
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> Current highwater mark implementation would grow
> > > > >> endlessly
> > > > >>>>>>> based
> > > > >>>>>>>>>>>> on
> > > > >>>>>>>>>>>>>>>>>>> primary key of original event. It is a pair of
> > (<this
> > > > >>>>> table
> > > > >>>>>>>>>>>> primary
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> key>,
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> <highest offset seen for that key>). This is used
> to
> > > > >>>>>>> differentiate
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> between
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> late arrivals and new updates. My newest proposal
> > would
> > > > be
> > > > >>>>> to
> > > > >>>>>>>>>>>> replace
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> it
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> with a Windowed state store of Duration N. This
> would
> > > > allow
> > > > >>>>> the
> > > > >>>>>>>>> same
> > > > >>>>>>>>>>>>>>>>>>> behaviour, but cap the size based on time. This
> > > should
> > > > >>>>> allow
> > > > >>>>>>> for
> > > > >>>>>>>>>>>> all
> > > > >>>>>>>>>>>>>>>>>>> late-arriving events to be processed, and should
> be
> > > > >>>>>>> customizable
> > > > >>>>>>>>>>>> by
> > > > >>>>>>>>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>>>> user to tailor to their own needs (ie: perhaps
> just
> > > 10
> > > > >>>>>>> minutes
> > > > >>>>>>>>> of
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> window,
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> or perhaps 7 days...).
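
To make the trade-off above concrete, here is a small sketch of a high-water mark store whose entries expire after a retention period. The class name, the use of the highest record timestamp seen so far as the clock, and the expiry rule are all assumptions for illustration; this does not show how a Kafka Streams window store is implemented.

```python
class BoundedHighWaterMark:
    """Tracks primary key -> highest offset seen, expiring old entries."""

    def __init__(self, retention):
        self.retention = retention
        self.stream_time = 0  # highest record timestamp observed so far
        self.entries = {}     # pk -> (highest offset, record timestamp)

    def accept(self, pk, offset, timestamp):
        """Return True for a new update, False for a late arrival."""
        self.stream_time = max(self.stream_time, timestamp)
        cutoff = self.stream_time - self.retention
        # Drop entries whose record time fell out of the retention period.
        self.entries = {k: v for k, v in self.entries.items() if v[1] >= cutoff}
        seen = self.entries.get(pk)
        if seen is not None and offset < seen[0]:
            return False  # lower offset than already seen: late arrival
        self.entries[pk] = (offset, timestamp)
        return True


hwm = BoundedHighWaterMark(retention=10)
first = hwm.accept("k1", offset=5, timestamp=100)
late = hwm.accept("k1", offset=3, timestamp=101)
other = hwm.accept("k2", offset=1, timestamp=200)
after_expiry = hwm.accept("k1", offset=3, timestamp=201)
```

The store stays bounded by time, but once an entry has expired, a late arrival for that key can no longer be detected (`after_expiry` is accepted), which is why the retention period would need to be chosen by the user.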
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> Hi Adam, using time based retention can do the
> > trick
> > > > >> here.
> > > > >>>>>>> Even
> > > > >>>>>>>>>>>> if I
> > > > >>>>>>>>>>>>>>>>>> would still like to see the automatic
> repartitioning
> > > > >>>>> optional
> > > > >>>>>>>>>>>> since I
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> would
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> just reshuffle again. With windowed store I am a
> > little
> > > > bit
> > > > >>>>>>>>>>>> sceptical
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> about
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> how to determine the window. So essentially one
> could
> > > run
> > > > >>>>> into
> > > > >>>>>>>>>>>> problems
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> when
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> the rapid change happens near a window border. I
> will
> > > > check
> > > > >>>>> your
> > > > >>>>>>>>>>>>>>>>>> implementation in detail; if it's problematic, we
> > could
> > > > >>>>> still
> > > > >>>>>>>>> check
> > > > >>>>>>>>>>>>>>>>>> _all_
> > > > >>>>>>>>>>>>>>>>>> windows on read with not too bad a performance
> impact I
> > > > >> guess.
> > > > >>>>>>> Will
> > > > >>>>>>>>>>>> let
> > > > >>>>>>>>>>>>>>>>>> you
> > > > >>>>>>>>>>>>>>>>>> know if the implementation would be correct as
> is. I
> > > > >>>>> wouldn't
> > > > >>>>>>>>>>>> like
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> to
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> assume that: offset(A) < offset(B) =>
> timestamp(A)  <
> > > > >>>>>>>>> timestamp(B).
> > > > >>>>>>>>>>>> I
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> think
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> we can't expect that.
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> @Jan
> > > > >>>>>>>>>>>>>>>>>>> I believe I understand what you mean now - thanks
> > for
> > > > the
> > > > >>>>>>>>>>>> diagram, it
> > > > >>>>>>>>>>>>>>>>>>> did really help. You are correct that I do not
> have
> > > the
> > > > >>>>>>> original
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> primary
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> key available, and I can see that if it was
> available
> > > > then
> > > > >>>>> you
> > > > >>>>>>>>>>>> would be
> > > > >>>>>>>>>>>>>>>>>>> able to add and remove events from the Map. That
> > > being
> > > > >>>>> said,
> > > > >>>>>>> I
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> encourage
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> you to finish your diagrams / charts just for
> clarity
> > > for
> > > > >>>>>>> everyone
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> else.
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> Yeah 100%, this giphy thing is just really hard
> > work.
> > > > But
> > > > >>>>> I
> > > > >>>>>>>>>>>> understand
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> the benefits for the rest. Sorry about the
> original
> > > > >> primary
> > > > >>>>>>> key,
> > > > >>>>>>>>> We
> > > > >>>>>>>>>>>>>>>>>> have
> > > > >>>>>>>>>>>>>>>>>> join and Group by implemented our own in PAPI and
> > > > >> basically
> > > > >>>>>>> not
> > > > >>>>>>>>>>>> using
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> any
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> DSL (Just the abstraction). Completely missed that
> in
> > > > >>>>> original
> > > > >>>>>>> DSL
> > > > >>>>>>>>>>>> its
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> not
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> there and just assumed it. total brain mess up on
> my
> > > end.
> > > > >>>>> Will
> > > > >>>>>>>>>>>> finish
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> chart as soon as I get a quiet evening this week.
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> My follow up question for you is, won't the Map
> stay
> > > > >> inside
> > > > >>>>>>> the
> > > > >>>>>>>>>>>> State
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> Store indefinitely after all of the changes have
> > > > >>>>> propagated?
> > > > >>>>>>>>> Isn't
> > > > >>>>>>>>>>>>>>>>>>> this
> > > > >>>>>>>>>>>>>>>>>>> effectively the same as a highwater mark state
> > store?
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> Thing is that if the map is empty, subtractor is
> > > gonna
> > > > >>>>>>> return
> > > > >>>>>>>>>>>> `null`
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> and
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> the key is removed from the keyspace. But there is
> > > going
> > > > to
> > > > >>>>> be
> > > > >>>>>>> a
> > > > >>>>>>>>>>>> store
> > > > >>>>>>>>>>>>>>>>>> 100%, the good thing is that I can use this store
> > > > directly
> > > > >>>>> for
> > > > >>>>>>>>>>>>>>>>>> materialize() / enableSendingOldValues() is a
> > regular
> > > > >>>>> store,
> > > > >>>>>>>>>>>> satisfying
> > > > >>>>>>>>>>>>>>>>>> all guarantees needed for further groupby / join.
> The
> > > > >>>>> Windowed
> > > > >>>>>>>>>>>> store is
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> not
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> keeping the values, so for the next stateful
> > operation
> > > > we
> > > > >>>>>>> would
> > > > >>>>>>>>>>>>>>>>>> need to instantiate an extra store. or we have the
> > > > window
> > > > >>>>>>> store
> > > > >>>>>>>>>>>> also
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> have
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> the values then.
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> Long story short. if we can flip in a custom group
> > by
> > > > >>>>> before
> > > > >>>>>>>>>>>>>>>>>> repartitioning to the original primary key I think
> > it
> > > > >> would
> > > > >>>>>>> help
> > > > >>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> users
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> big time in building efficient apps. Given the
> > original
> > > > >>>>> primary
> > > > >>>>>>>>> key
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> issue I
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> understand that we do not have a solid foundation
> to
> > > > build
> > > > >>>>> on.
> > > > >>>>>>>>>>>>>>>>>> Leaving primary key carry along to the user. very
> > > > >>>>>>> unfortunate. I
> > > > >>>>>>>>>>>> could
> > > > >>>>>>>>>>>>>>>>>> understand the decision goes like that. I do not
> > think
> > > > it's
> > > > >>>>> a
> > > > >>>>>>> good
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> decision.
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> Thanks
> > > > >>>>>>>>>>>>>>>>>>> Adam
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> On Tue, Sep 11, 2018 at 10:07 AM, Prajakta
> Dumbre <
> > > > >>>>>>>>>>>>>>>>>>> dumbreprajakta...@gmail.com <mailto:
> > > > >>>>>>> dumbreprajakta...@gmail.com
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           please remove me from this group
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           On Tue, Sep 11, 2018 at 1:29 PM Jan
> > > Filipiak
> > > > >>>>>>>>>>>>>>>>>>>           <jan.filip...@trivago.com <mailto:
> > > > >>>>>>>>> jan.filip...@trivago.com
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           wrote:
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > Hi Adam,
> > > > >>>>>>>>>>>>>>>>>>>           >
> > > > >>>>>>>>>>>>>>>>>>>           > give me some time, will make such a
> > > chart.
> > > > >> last
> > > > >>>>>>> time i
> > > > >>>>>>>>>>>> didn't
> > > > >>>>>>>>>>>>>>>>>>>           get along
> > > > >>>>>>>>>>>>>>>>>>>           > well with giphy and ruined all your
> > > charts.
> > > > >>>>>>>>>>>>>>>>>>>           > Hopefully i can get it done today
> > > > >>>>>>>>>>>>>>>>>>>           >
> > > > >>>>>>>>>>>>>>>>>>>           > On 08.09.2018 16:00, Adam Bellemare
> > > wrote:
> > > > >>>>>>>>>>>>>>>>>>>           > > Hi Jan
> > > > >>>>>>>>>>>>>>>>>>>           > >
> > > > >>>>>>>>>>>>>>>>>>>           > > I have included a diagram of what I
> > > > >> attempted
> > > > >>>>> on
> > > > >>>>>>> the
> > > > >>>>>>>>>>>> KIP.
> > > > >>>>>>>>>>>>>>>>>>>           > >
> > > > >>>>>>>>>>>>>>>>>>>           >
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Su
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>> pport+non-key+joining+in+KTable#KIP-213Supportnon-keyjoining
> > > > >>>>>>>>>>>>>>>>>>> inKTable-GroupBy+Reduce/Aggregate
> > > > >>>>>>>>>>>>>>>>>>>           <
> > > > >>>>>>>>>>>>
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+S
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>> upport+non-key+joining+in+KTable#KIP-213Supportnon-keyjoinin
> > > > >>>>>>>>>>>>>>>>>>> ginKTable-GroupBy+Reduce/Aggregate>
> > > > >>>>>>>>>>>>>>>>>>>           > >
> > > > >>>>>>>>>>>>>>>>>>>           > > I attempted this back at the start
> of
> > > my
> > > > own
> > > > >>>>>>>>>>>> implementation
> > > > >>>>>>>>>>>>>>>>>>> of
> > > > >>>>>>>>>>>>>>>>>>>           this
> > > > >>>>>>>>>>>>>>>>>>>           > > solution, and since I could not get
> > it
> > > to
> > > > >>>>> work I
> > > > >>>>>>> have
> > > > >>>>>>>>>>>> since
> > > > >>>>>>>>>>>>>>>>>>>           discarded the
> > > > >>>>>>>>>>>>>>>>>>>           > > code. At this point in time, if you
> > > wish
> > > > to
> > > > >>>>>>> continue
> > > > >>>>>>>>>>>> pursuing
> > > > >>>>>>>>>>>>>>>>>>>           for your
> > > > >>>>>>>>>>>>>>>>>>>           > > groupBy solution, I ask that you
> > please
> > > > >>>>> create a
> > > > >>>>>>>>>>>> diagram on
> > > > >>>>>>>>>>>>>>>>>>>           the KIP
> > > > >>>>>>>>>>>>>>>>>>>           > > carefully explaining your solution.
> > > > Please
> > > > >>>>> feel
> > > > >>>>>>> free
> > > > >>>>>>>>> to
> > > > >>>>>>>>>>>> use
> > > > >>>>>>>>>>>>>>>>>>>           the image I
> > > > >>>>>>>>>>>>>>>>>>>           > > just posted as a starting point. I
> am
> > > > having
> > > > >>>>>>> trouble
> > > > >>>>>>>>>>>>>>>>>>>           understanding your
> > > > >>>>>>>>>>>>>>>>>>>           > > explanations but I think that a
> > > carefully
> > > > >>>>>>> constructed
> > > > >>>>>>>>>>>> diagram
> > > > >>>>>>>>>>>>>>>>>>>           will clear
> > > > >>>>>>>>>>>>>>>>>>>           > up
> > > > >>>>>>>>>>>>>>>>>>>           > > any misunderstandings. Alternately,
> > > > please
> > > > >>>>> post a
> > > > >>>>>>>>>>>>>>>>>>>           comprehensive PR with
> > > > >>>>>>>>>>>>>>>>>>>           > > your solution. I can only guess at
> > what
> > > > you
> > > > >>>>>>> mean, and
> > > > >>>>>>>>>>>> since I
> > > > >>>>>>>>>>>>>>>>>>>           value my
> > > > >>>>>>>>>>>>>>>>>>>           > own
> > > > >>>>>>>>>>>>>>>>>>>           > > time as much as you value yours, I
> > > > believe
> > > > >> it
> > > > >>>>> is
> > > > >>>>>>> your
> > > > >>>>>>>>>>>>>>>>>>>           responsibility to
> > > > >>>>>>>>>>>>>>>>>>>           > > provide an implementation instead
> of
> > me
> > > > >>>>> trying to
> > > > >>>>>>>>> guess.
> > > > >>>>>>>>>>>>>>>>>>>           > >
> > > > >>>>>>>>>>>>>>>>>>>           > > Adam
> > > > >>>>>>>>>>>>>>>>>>>           > >
> > > > >>>>>>>>>>>>>>>>>>>           > > On Sat, Sep 8, 2018 at 8:00 AM, Jan Filipiak
> > > > >>>>>>>>>>>>>>>>>>>           > > <jan.filip...@trivago.com> wrote:
> > > > >>>>>>>>>>>>>>>>>>>           > >
> > > > >>>>>>>>>>>>>>>>>>>           > >> Hi James,
> > > > >>>>>>>>>>>>>>>>>>>           > >>
> > > > >>>>>>>>>>>>>>>>>>>           > >> Nice to see you being interested. Kafka Streams at this point
> > > > >>>>>>>>>>>>>>>>>>>           > >> supports all sorts of joins as long as both streams have the
> > > > >>>>>>>>>>>>>>>>>>>           > >> same key. Adam is currently implementing a join where a
> > > > >>>>>>>>>>>>>>>>>>>           > >> KTable and a KTable can have a one-to-many relationship
> > > > >>>>>>>>>>>>>>>>>>>           > >> (1:n). We exploit that RocksDB is a datastore that keeps data
> > > > >>>>>>>>>>>>>>>>>>>           > >> sorted (or at least exposes an API to access the stored data
> > > > >>>>>>>>>>>>>>>>>>>           > >> in a sorted fashion).
> > > > >>>>>>>>>>>>>>>>>>>           > >>
> > > > >>>>>>>>>>>>>>>>>>>           > >> I think the technical caveats are well understood now and we
> > > > >>>>>>>>>>>>>>>>>>>           > >> are basically down to philosophy and API design (when Adam
> > > > >>>>>>>>>>>>>>>>>>>           > >> sees my newest message). I have a lengthy track record of
> > > > >>>>>>>>>>>>>>>>>>>           > >> losing those kinds of arguments within the Streams community
> > > > >>>>>>>>>>>>>>>>>>>           > >> and I have no clue why. So I literally can't wait for you to
> > > > >>>>>>>>>>>>>>>>>>>           > >> churn through this thread and give your opinion on how we
> > > > >>>>>>>>>>>>>>>>>>>           > >> should design the return type of the oneToManyJoin and how
> > > > >>>>>>>>>>>>>>>>>>>           > >> much power we want to give to the user vs. "simplicity"
> > > > >>>>>>>>>>>>>>>>>>>           > >> (where, I argue, simplicity isn't really that, as users still
> > > > >>>>>>>>>>>>>>>>>>>           > >> need to understand it).
> > > > >>>>>>>>>>>>>>>>>>>           > >>
> > > > >>>>>>>>>>>>>>>>>>>           > >> Waiting for you to join in on the discussion.
> > > > >>>>>>>>>>>>>>>>>>>           > >>
> > > > >>>>>>>>>>>>>>>>>>>           > >> Best Jan
> > > > >>>>>>>>>>>>>>>>>>>           > >>
> > > > >>>>>>>>>>>>>>>>>>>           > >>
> > > > >>>>>>>>>>>>>>>>>>>           > >>
> > > > >>>>>>>>>>>>>>>>>>>           > >> On 07.09.2018 15:49, James Kwan wrote:
> > > > >>>>>>>>>>>>>>>>>>>           > >>
> > > > >>>>>>>>>>>>>>>>>>>           > >>> I am new to this group and I found this subject interesting.
> > > > >>>>>>>>>>>>>>>>>>>           > >>> Sounds like you guys want to implement a join table of two
> > > > >>>>>>>>>>>>>>>>>>>           > >>> streams? Is there somewhere I can see the original
> > > > >>>>>>>>>>>>>>>>>>>           > >>> requirement or proposal?
> > > > >>>>>>>>>>>>>>>>>>>           > >>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>> On Sep 7, 2018, at 8:13 AM, Jan Filipiak
> > > > >>>>>>>>>>>>>>>>>>>           > >>> <jan.filip...@trivago.com> wrote:
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>> On 05.09.2018 22:17, Adam Bellemare wrote:
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>> I'm currently testing using a Windowed Store to store the
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>> highwater mark. By all indications this should work fine,
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>> with the caveat being that it can only resolve out-of-order
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>> arrival for up to the size of the window (ie: 24h, 72h,
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>> etc). This would remove the possibility of it being
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>> unbounded in size.
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>> With regards to Jan's suggestion, I believe this is where we
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>> will have to remain in disagreement. While I do not disagree
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>> with your statement about there likely being additional
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>> joins done in a real-world workflow, I do not see how you
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>> can conclusively deal with out-of-order arrival of
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>> foreign-key changes and subsequent joins. I have attempted
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>> what I think you have proposed (without a high-water mark,
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>> using groupBy and reduce) and found that if the foreign key
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>> changes too quickly, or the load on a stream thread is too
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>> high, the joined messages will arrive out-of-order and be
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>> incorrectly propagated, such that an intermediate event is
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>> represented as the final event.
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>> Can you shed some light on your groupBy implementation? There
> > > > >>>>>>>>>>>>>>>>>>>           > >>>> must be some sort of flaw in it. I have a suspicion where it
> > > > >>>>>>>>>>>>>>>>>>>           > >>>> is; I would just like to confirm. The idea is bulletproof, so
> > > > >>>>>>>>>>>>>>>>>>>           > >>>> it must be an implementation mess-up. I would like to clarify
> > > > >>>>>>>>>>>>>>>>>>>           > >>>> before we draw a conclusion.
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>> Repartitioning the scattered events back to their original
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>> partitions is the only way I know how to conclusively deal
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>> with out-of-order events in a given time frame, and to
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>> ensure that the data is eventually consistent with the input
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>> events.
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>> If you have some code to share that illustrates your
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>> approach, I would be very grateful as it would remove any
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>> misunderstandings that I may have.
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>> Ah okay, you were looking for my code. I don't have something
> > > > >>>>>>>>>>>>>>>>>>>           > >>>> easily readable here as it's bloated with OO patterns.
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>> It's trivial anyhow:
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>> @Override
> > > > >>>>>>>>>>>>>>>>>>>           > >>>> public T apply(K aggKey, V value, T aggregate)
> > > > >>>>>>>>>>>>>>>>>>>           > >>>> {
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>     Map<U, V> currentStateAsMap = asMap(aggregate); // imaginary
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>     U toModifyKey = mapper.apply(value);
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>     // This is the place where people are actually going to
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>     // have issues, and why you probably couldn't do it. We
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>     // would need to find a solution here. I didn't realize
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>     // that yet.
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>     // We propagate the field in the joiner, so that we can
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>     // pick it up in an aggregate. Probably you have not
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>     // thought of this in your approach, right?
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>     // I am very open to finding a generic solution here. In
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>     // my honest opinion, KTableImpl.GroupBy is broken in that
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>     // it loses the keys and only maintains the aggregate key.
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>     // I abstracted it away back then, way before I was
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>     // thinking of oneToMany join. That is why I didn't
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>     // realize its significance here.
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>     // Opinions?
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>     for (V m : current)
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>     {
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>         currentStateAsMap.put(mapper.apply(m), m);
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>     }
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>     if (isAdder)
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>     {
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>         currentStateAsMap.put(toModifyKey, value);
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>     }
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>     else
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>     {
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>         currentStateAsMap.remove(toModifyKey);
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>         if (currentStateAsMap.isEmpty()) {
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>             return null;
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>         }
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>     }
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>     return asAggregateType(currentStateAsMap);
> > > > >>>>>>>>>>>>>>>>>>>           > >>>> }
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>> Thanks,
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>> Adam
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>> On Wed, Sep 5, 2018 at 3:35 PM, Jan Filipiak
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>> <jan.filip...@trivago.com> wrote:
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> Thanks Adam for bringing Matthias up to speed on the
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> differences. I think re-keying back should be optional at
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> best. I would say we return a KScatteredTable with
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> reshuffle() returning KTable<originalKey,Joined> to make
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> the backwards repartitioning optional. I am also very much
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> in favour of doing the out-of-order processing using group
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> by instead of high-water mark tracking, because unbounded
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> growth is just scary, and it saves us the header stuff.
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> I think the abstraction of always repartitioning back is
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> just not that strong. The work has already been done before
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> we partition back, and grouping by something else
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> afterwards is really common.
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> On 05.09.2018 13:49, Adam Bellemare wrote:
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> Hi Matthias
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> Thank you for your feedback, I do appreciate it!
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> While name spacing would be possible, it would require
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> deserializing user headers, which implies a runtime
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> overhead. I would suggest not to namespace for now to
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> avoid the overhead. If this becomes a problem in the
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> future, we can still add name spacing later on.
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> Agreed. I will go with using a reserved string and
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> document it.
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> My main concern about the design is the type of the
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> result KTable: If I understood the proposal correctly,
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> In your example, you have table1 and table2 swapped. Here
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> is how it works currently:
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> 1) table1 has the records that contain the foreign key
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> within their value.
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> table1 input stream: <a,(fk=A,bar=1)>, <b,(fk=A,bar=2)>,
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> <c,(fk=B,bar=3)>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> table2 input stream: <A,X>, <B,Y>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> 2) A Value mapper is required to extract the foreign key.
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> table1 foreign key mapper: ( value => value.fk )
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> The mapper is applied to each element in table1, and a new
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> combined key is made:
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> table1 mapped: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>,
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> <B-c, (fk=B,bar=3)>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> 3) The rekeyed events are copartitioned with table2:
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> a) Stream Thread with Partition 0:
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> RepartitionedTable1: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> Table2: <A,X>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> b) Stream Thread with Partition 1:
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> RepartitionedTable1: <B-c, (fk=B,bar=3)>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> Table2: <B,Y>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> 4) From here, they can be joined together locally by
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> applying the joiner function.
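
[Illustration, not part of the quoted message] Steps 1) to 4) above can be
sketched with plain Java collections. This is only a rough, single-process
model: it does not use the Kafka Streams API, it does not model partitions or
stream threads, and the names ForeignKeyJoinSketch, Left, rekey and join, as
well as the "bar=...,<right value>" joiner output, are hypothetical choices
made for this example.

```java
import java.util.Map;
import java.util.TreeMap;

public class ForeignKeyJoinSketch {

    /** Hypothetical table1 value: a foreign key plus a payload field. */
    public static final class Left {
        public final String fk;
        public final int bar;

        public Left(String fk, int bar) {
            this.fk = fk;
            this.bar = bar;
        }
    }

    /** Step 2: re-key each table1 record to "<foreignKey>-<primaryKey>". */
    public static Map<String, Left> rekey(Map<String, Left> table1) {
        Map<String, Left> rekeyed = new TreeMap<>();
        for (Map.Entry<String, Left> e : table1.entrySet()) {
            rekeyed.put(e.getValue().fk + "-" + e.getKey(), e.getValue());
        }
        return rekeyed;
    }

    /** Steps 3-4: look up the table2 row by foreign key and apply a joiner. */
    public static Map<String, String> join(Map<String, Left> rekeyed,
                                           Map<String, String> table2) {
        Map<String, String> joined = new TreeMap<>();
        for (Map.Entry<String, Left> e : rekeyed.entrySet()) {
            String right = table2.get(e.getValue().fk);
            if (right != null) { // inner join: skip rows without a match
                joined.put(e.getKey(), "bar=" + e.getValue().bar + "," + right);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // The example records from step 1.
        Map<String, Left> table1 = new TreeMap<>();
        table1.put("a", new Left("A", 1));
        table1.put("b", new Left("A", 2));
        table1.put("c", new Left("B", 3));

        Map<String, String> table2 = new TreeMap<>();
        table2.put("A", "X");
        table2.put("B", "Y");

        System.out.println(join(rekey(table1), table2));
    }
}
```

The result is still keyed by the combined key (A-a, A-b, B-c); how and whether
to key it back to the original key is the design question discussed in the
rest of this thread.
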
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> At this point, Jan's design and my design deviate. My
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> design goes on to repartition the data post-join and
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> resolve out-of-order arrival of records, finally returning
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> the data keyed by just the original key. I do not expose
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> the CombinedKey or any of the internals outside of the
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> joinOnForeignKey function. This does make for a larger
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> footprint, but it removes all agency for resolving
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> out-of-order arrivals and handling CombinedKeys from the
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> user. I believe that this makes the function much easier
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> to use.
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> Let me know if this helps resolve your questions, and
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> please feel free to add anything else on your mind.
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> Thanks again,
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> Adam
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> On Tue, Sep 4, 2018 at 8:36 PM, Matthias J. Sax
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> <matth...@confluent.io> wrote:
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> Hi,
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> I am just catching up on this thread. I did not read
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> everything so far, but want to share a couple of initial
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> thoughts:
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> Headers: I think there is a fundamental difference
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> between header usage in this KIP and KIP-258. For 258, we
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> add headers to changelog topics that are owned by Kafka
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> Streams and nobody else is supposed to write into them.
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> In fact, no user headers are written into the changelog
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> topic and thus, there are no conflicts.
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> Nevertheless, I don't see a big issue with using headers
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> within Streams. As long as we document it, we can have
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> some "reserved" header keys, and users are not allowed to
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> use them when processing data with Kafka Streams. IMHO,
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> this should be ok.
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> I think there is a safe way to avoid conflicts, since
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> these headers are only needed in internal topics (I
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> think):
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> For internal and changelog topics, we can namespace all
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> headers:
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> * user-defined headers are namespaced as "external." +
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>   headerKey
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> * internal headers are namespaced as "internal." +
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>   headerKey
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> While name spacing would be possible, it would require
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> deserializing user headers, which implies a runtime
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> overhead. I would suggest not to namespace for now to
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> avoid the overhead. If this becomes a problem in the
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> future, we can still add name spacing later on.
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> My main concern about the design is the type of the result
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> KTable: If I understood the proposal correctly,
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> KTable<K1,V1> table1 = ...
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> KTable<K2,V2> table2 = ...
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> KTable<K1,V3> joinedTable = table1.join(table2,...);
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> implies that the `joinedTable` has the same key as the left
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> input table. IMHO, this does not work because if table2
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> contains multiple rows that join with a record in table1
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> (which is the main purpose of a foreign key join), the result
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> table would only contain a single join result, but not
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> multiple.
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> Example:
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> table1 input stream: <A,X>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> table2 input stream: <a,(A,1)>, <b,(A,2)>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> We use the table2 value as a foreign key to the table1 key
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> (i.e., "A" joins). If the result key is the same as the key
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> of table1, this implies that the result can either be
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> <A, join(X,1)> or <A, join(X,2)> but not both. Because they
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> share the same key, whatever result record we emit later
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> overwrites the previous result.
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> This is the reason why Jan originally proposed to use a
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> combination of both primary keys of the input tables as key
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> of the output table. This makes the keys of the output table
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> unique and we can store both in the output table:
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> Result would be <A-a, join(X,1)>, <A-b, join(X,2)>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
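
The <A,X> / <a,(A,1)>, <b,(A,2)> example above can be replayed with plain maps to see the overwrite. This is a minimal sketch, not the Kafka Streams implementation: `ForeignKeyJoinKeys`, `Right`, and the "-" separator in the combined key are illustrative assumptions, and the join value is rendered as a string.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the result table; maps stand in for KTables.
final class ForeignKeyJoinKeys {
    // A table2 row value: the foreign key into table1 plus a payload.
    static final class Right {
        final String foreignKey;
        final int payload;

        Right(String foreignKey, int payload) {
            this.foreignKey = foreignKey;
            this.payload = payload;
        }
    }

    // Joins table2 rows to table1 by foreign key. With combinedKey == false the
    // result is keyed by the table1 key only, so later results overwrite earlier ones.
    static Map<String, String> join(Map<String, String> table1,
                                    Map<String, Right> table2,
                                    boolean combinedKey) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, Right> row : table2.entrySet()) {
            Right right = row.getValue();
            String left = table1.get(right.foreignKey);
            if (left == null) {
                continue; // no matching table1 record, nothing to emit
            }
            String key = combinedKey
                    ? right.foreignKey + "-" + row.getKey()
                    : right.foreignKey;
            result.put(key, "join(" + left + "," + right.payload + ")");
        }
        return result;
    }
}
```

With the inputs from the example, keying by the table1 key alone leaves one entry under "A", while the combined key keeps both "A-a" and "A-b".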
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> Thoughts?
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> -Matthias
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> On 9/4/18 1:36 PM, Jan Filipiak wrote:
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> Just one remark here.
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> The high-watermark could be disregarded. The decision about
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> the forward depends on the size of the aggregated map.
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> Only 1 element long maps would be unpacked and forwarded.
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> 0 element maps would be published as delete. Any other count
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> of map entries is in "waiting for correct deletes to
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> arrive"-state.
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>
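
Jan's rule above, where the size of the aggregated map decides what is forwarded, might look like the sketch below. The names `ForwardDecision`, `Action`, `decide`, and `unpack` are hypothetical, and the key and value types of the aggregated map are left generic because the thread does not specify them.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the forwarding rule; not taken from the KIP code.
final class ForwardDecision {
    enum Action { FORWARD, DELETE, WAIT }

    // 0 entries: publish a delete. 1 entry: unpack and forward.
    // Anything else: still waiting for the correct deletes to arrive.
    static Action decide(Map<?, ?> aggregated) {
        switch (aggregated.size()) {
            case 0:
                return Action.DELETE;
            case 1:
                return Action.FORWARD;
            default:
                return Action.WAIT;
        }
    }

    // Returns the single value of a one-element map, or empty otherwise.
    static <K, V> Optional<V> unpack(Map<K, V> aggregated) {
        if (aggregated.size() != 1) {
            return Optional.empty();
        }
        return Optional.of(aggregated.values().iterator().next());
    }
}
```

Under this rule no separate high-watermark comparison is needed; the map size alone tells the processor whether the state has settled.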
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> On 04.09.2018 21:29, Adam Bellemare wrote:
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>> It does look like I could replace the second repartition
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>> store and highwater store with a groupBy and reduce.
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>> However, it looks like I would still need to store the
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>> highwater value within the materialized store, to compare
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>> the arrival of out-of-order records (assuming my
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>> understanding of THIS is correct...). This in effect is
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>> the same as the design I have now, just with the two
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>> tables merged together.
> > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           >
> > > > >>>>>>>>>>>>>>>>>>>           >
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> --
> > > > >>>>>>>>>>>>>>>>> -- Guozhang
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> --
> > > > >>>>>>>>>>>>>>>> -- Guozhang
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>
> > > > >>>>>>
> > > > >>>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>


-- 
-- Guozhang
