I'd say you can just call the vote.
That happens all the time, and if something comes up, it just goes back
to discussion.
I would not expect too much attention from yet another email in this
thread.
best Jan
On 09.10.2018 13:56, Adam Bellemare wrote:
Hello Contributors
I know that 2.1 is about to be released, but I do need to bump this to keep
visibility up. I am still intending to push this through once contributor
feedback is given.
Main points that need addressing:
1) Any way (or benefit) in structuring the current singular graph node into
multiple nodes? It has a whopping 25 parameters right now. I am a bit fuzzy
on how the optimizations are supposed to work, so I would appreciate any
help on this aspect.
2) Overall strategy for joining + resolving. This thread has much discussion
between Jan and me comparing the current highwater mark proposal with a groupBy
+ reduce proposal. I am of the opinion that we need to strictly handle any
chance of out-of-order data and leave none of it up to the consumer. Any
comments or suggestions here would also help.
3) Anything else that you see that would prevent this from moving to a vote?
Thanks
Adam
On Sun, Sep 30, 2018 at 10:23 AM Adam Bellemare <adam.bellem...@gmail.com>
wrote:
Hi Jan
With the Stores.windowStoreBuilder and Stores.persistentWindowStore, you
actually only need to specify the number of segments you want and how large
they are. To the best of my understanding, what happens is that the
segments are automatically rolled over as new data with new timestamps is
created. We use this exact functionality in some of the work done
internally at my company. For reference, this is the hopping windowed store.
https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#id21
In the code that I have provided, there are going to be two 24h segments.
When a record is put into the windowStore, it will be inserted at time T in
both segments. The two segments will always overlap by 12h. As time goes on
and new records are added (say at time T+12h+), the oldest segment will be
automatically deleted and a new segment created. The records are by default
inserted with the context.timestamp(), such that it is the record time, not
the clock time, which is used.
To the best of my understanding, the timestamps are retained when
restoring from the changelog.
Basically, this is a heavy-handed way to deal with TTL at a segment-level,
instead of at an individual record level.
On Tue, Sep 25, 2018 at 5:18 PM Jan Filipiak <jan.filip...@trivago.com>
wrote:
Will that work? I expected it to blow up with ClassCastException or
similar.
You would either have to specify the window you fetch/put or iterate
across all windows the key was found in, right?
I just hope the window store doesn't check stream-time under the hood;
that would be a questionable interface.
If it does: did you see my comment on checking all the windows earlier?
That would be needed to actually give reasonable time guarantees.
Best
On 25.09.2018 13:18, Adam Bellemare wrote:
Hi Jan
Check for "highwaterMat" in the PR. I only changed the state store, not
the ProcessorSupplier.
Thanks,
Adam
On Mon, Sep 24, 2018 at 2:47 PM, Jan Filipiak <jan.filip...@trivago.com
wrote:
On 24.09.2018 16:26, Adam Bellemare wrote:
@Guozhang
Thanks for the information. This is indeed something that will be extremely
useful for this KIP.
@Jan
Thanks for your explanations. That being said, I will not be moving ahead
with an implementation using the reshuffle/groupBy solution you propose.
However, if you wish to implement it yourself off of my current PR and
submit it as a competitive alternative, I would be more than happy to help
vet it as an alternate solution. As it stands right now, I do not really
have more time to invest in alternatives without a strong indication from
the binding voters of which they would prefer.
Hey, totally no worries. I think I personally gave up on the Streams DSL
some time ago, otherwise I would have pulled this KIP through already.
I am currently reimplementing my own DSL based on PAPI.
I will look at finishing up my PR with the windowed state store in the next
week or so, exercising it via tests, and then I will come back for final
discussions. In the meantime, I hope that any of the binding voters could
take a look at the KIP in the wiki. I have updated it according to the
latest plan:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
I have also updated the KIP PR to use a windowed store. This could be
replaced by the results of KIP-258 whenever they are completed.
https://github.com/apache/kafka/pull/5527
Thanks,
Adam
Is the HighWatermarkResolverProccessorsupplier already updated in the PR?
I expected it to change to Windowed<K>,Long. Am I missing something?
On Fri, Sep 14, 2018 at 2:24 PM, Guozhang Wang <wangg...@gmail.com>
wrote:
Correction on my previous email: KAFKA-5533 is the wrong link, as it is for
the corresponding changelog mechanisms. But as part of KIP-258 we do want to
have "handling out-of-order data for source KTable" such that instead of
blindly applying the updates to the materialized store, i.e. following offset
ordering, we will reject updates that are older than the current key's
timestamp, i.e. following timestamp ordering.
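
The timestamp-ordering idea described above can be sketched in plain Java. This is only an illustration under stated assumptions: `TimestampOrderedStore` is a hypothetical in-memory stand-in, not the KIP-258 store or any Kafka Streams API, and accepting updates with an equal timestamp is an assumption made here.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for a key-value store that also keeps a timestamp per key. */
final class TimestampOrderedStore<K, V> {

    /** A stored value together with the timestamp of the update that wrote it. */
    private static final class Entry<T> {
        final T value;
        final long timestamp;

        Entry(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<K, Entry<V>> store = new HashMap<>();

    /**
     * Applies the update unless it is older than the timestamp already stored
     * for the key. Returns true if the update was applied, false if rejected.
     * Equal timestamps are accepted (an assumption of this sketch).
     */
    boolean put(K key, V value, long timestamp) {
        Entry<V> current = store.get(key);
        if (current != null && timestamp < current.timestamp) {
            return false; // out-of-order update: follow timestamp ordering, not offset ordering
        }
        store.put(key, new Entry<>(value, timestamp));
        return true;
    }

    /** Returns the current value for the key, or null if none is stored. */
    V get(K key) {
        Entry<V> entry = store.get(key);
        return entry == null ? null : entry.value;
    }
}
```

An update that arrives later by offset but carries an older timestamp is dropped, so the store always reflects the newest timestamp seen per key.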
Guozhang
On Fri, Sep 14, 2018 at 11:21 AM, Guozhang Wang <wangg...@gmail.com>
wrote:
Hello Adam,
Thanks for the explanation. Regarding the final step (i.e. the high
watermark store, now altered to be replaced with a window store), I think
another ongoing KIP may actually help:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
This is for adding the timestamp into a key-value store (i.e. only for
non-windowed KTables), and one of its uses, as described in
https://issues.apache.org/jira/browse/KAFKA-5533, is that we can then
"reject" updates from the source topics if their timestamp is smaller than
the current key's latest update timestamp. I think it is very similar to
what you have in mind for high watermark based filtering; you only need to
make sure that the timestamps of the joining records are correctly
inherited through the whole topology to the final stage.
Note that this KIP is for key-value stores and hence non-windowed KTables
only, but for windowed KTables we do not really have good support for
their joins anyway (https://issues.apache.org/jira/browse/KAFKA-7107). I
think we can just consider non-windowed KTable-KTable non-key joins for
now, in which case KIP-258 should help.
Guozhang
On Wed, Sep 12, 2018 at 9:20 PM, Jan Filipiak <jan.filip...@trivago.com>
wrote:
On 11.09.2018 18:00, Adam Bellemare wrote:
Hi Guozhang
The current highwater mark implementation would grow endlessly based on the
primary key of the original event. It is a pair of (<this table primary key>,
<highest offset seen for that key>). This is used to differentiate between
late arrivals and new updates. My newest proposal would be to replace it
with a Windowed state store of Duration N. This would allow the same
behaviour, but cap the size based on time. This should allow for all
late-arriving events to be processed, and should be customizable by the
user to tailor to their own needs (i.e. perhaps just 10 minutes of window,
or perhaps 7 days...).
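
A minimal sketch of the (primary key, highest offset seen) bookkeeping described above, in plain Java. The class name `HighWaterMarkFilter` and its methods are hypothetical and are not taken from the PR; treating an equal offset as forwardable is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: tracks the highest offset seen per primary key. */
final class HighWaterMarkFilter<K> {

    // One entry per primary key ever seen; nothing is ever removed here,
    // which is the unbounded growth the message above is concerned about.
    private final Map<K, Long> highestOffsetByKey = new HashMap<>();

    /**
     * Returns true if the event is a new update and should be forwarded,
     * false if it is a late arrival with an offset below the high-water mark.
     */
    boolean shouldForward(K key, long offset) {
        Long highest = highestOffsetByKey.get(key);
        if (highest != null && offset < highest) {
            return false; // late arrival: a newer update was already seen
        }
        highestOffsetByKey.put(key, offset);
        return true;
    }

    /** Number of keys currently tracked. */
    int trackedKeys() {
        return highestOffsetByKey.size();
    }
}
```

Because the map keeps one entry per key indefinitely, its size is bounded only by the key space; capping retention by time, as proposed above, trades that for a limit on how late an event can be and still be resolved.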
Hi Adam, using time-based retention can do the trick here, even if I
would still like to see the automatic repartitioning be optional, since I
would just reshuffle again. With a windowed store I am a little bit sceptical
about how to determine the window. Essentially, one could run into problems
when a rapid change happens near a window border. I will check your
implementation in detail; if it's problematic, we could still check _all_
windows on read with not too bad a performance impact, I guess. I will let
you know if the implementation would be correct as is. I would not like to
assume that: offset(A) < offset(B) => timestamp(A) < timestamp(B). I think
we can't expect that.
@Jan
I believe I understand what you mean now - thanks for the diagram, it
did really help. You are correct that I do not have the original primary
key available, and I can see that if it was available then you would be
able to add and remove events from the Map. That being said, I encourage
you to finish your diagrams / charts just for clarity for everyone else.
Yeah 100%, this giphy thing is just really hard work, but I understand
the benefits for the rest. Sorry about the original primary key. We have
join and group-by implemented on our own in PAPI and are basically not using
any DSL (just the abstraction). I completely missed that it's not there in
the original DSL and just assumed it. Total brain mess-up on my end. I will
finish the chart as soon as I get a quiet evening this week.
My follow-up question for you is, won't the Map stay inside the State
Store indefinitely after all of the changes have propagated? Isn't this
effectively the same as a highwater mark state store?
The thing is that if the map is empty, the subtractor is going to return
`null` and the key is removed from the keyspace. But there is going to be a
store, 100%. The good thing is that I can use this store directly for
materialize() / enableSendingOldValues(); it is a regular store, satisfying
all guarantees needed for further groupBy / join. The windowed store is not
keeping the values, so for the next stateful operation we would
need to instantiate an extra store, or we have the window store also hold
the values.
Long story short: if we can flip in a custom group-by before
repartitioning to the original primary key, I think it would help the users
big time in building efficient apps. Given the original primary key issue, I
understand that we do not have a solid foundation to build on.
Leaving the primary key carry-along to the user is very unfortunate. I could
understand if the decision goes like that, but I do not think it's a good
decision.
Thanks
Adam
On Tue, Sep 11, 2018 at 10:07 AM, Prajakta Dumbre
<dumbreprajakta...@gmail.com> wrote:
please remove me from this group
On Tue, Sep 11, 2018 at 1:29 PM Jan Filipiak
<jan.filip...@trivago.com> wrote:
> Hi Adam,
>
> give me some time, I will make such a chart. Last time I didn't get along
> well with giphy and ruined all your charts.
> Hopefully I can get it done today.
>
> On 08.09.2018 16:00, Adam Bellemare wrote:
> > Hi Jan
> >
> > I have included a diagram of what I attempted on the KIP.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-GroupBy+Reduce/Aggregate
> >
> > I attempted this back at the start of my own implementation of this
> > solution, and since I could not get it to work I have since discarded the
> > code. At this point in time, if you wish to continue pursuing your
> > groupBy solution, I ask that you please create a diagram on the KIP
> > carefully explaining your solution. Please feel free to use the image I
> > just posted as a starting point. I am having trouble understanding your
> > explanations but I think that a carefully constructed diagram will clear
> > up any misunderstandings. Alternately, please post a comprehensive PR with
> > your solution. I can only guess at what you mean, and since I value my
> > own time as much as you value yours, I believe it is your responsibility
> > to provide an implementation instead of me trying to guess.
> >
> > Adam
> >
> > On Sat, Sep 8, 2018 at 8:00 AM, Jan Filipiak <jan.filip...@trivago.com>
> > wrote:
> >
> >> Hi James,
> >>
> >> nice to see you being interested. Kafka Streams at this point supports
> >> all sorts of joins as long as both streams have the same key.
> >> Adam is currently implementing a join where a KTable and a KTable can
> >> have a one-to-many relationship (1:n). We exploit that RocksDB is a
> >> datastore that keeps data sorted (at least it exposes an API to access
> >> the stored data in a sorted fashion).
> >>
> >> I think the technical caveats are well understood now and we are
> >> basically down to philosophy and API design (when Adam sees my newest
> >> message).
> >> I have a lengthy track record of losing those kinds of arguments within
> >> the streams community and I have no clue why. So I literally can't wait
> >> for you to churn through this thread and give your opinion on how we
> >> should design the return type of the oneToManyJoin and how much power we
> >> want to give to the user vs. "simplicity" (where simplicity isn't really
> >> that, as users still need to understand it, I argue).
> >>
> >> waiting for you to join in on the discussion
> >>
> >> Best Jan
> >>
> >>
> >>
> >> On 07.09.2018 15:49, James Kwan wrote:
> >>
> >>> I am new to this group and I found this subject interesting. Sounds
> >>> like you guys want to implement a join table of two streams? Is there
> >>> somewhere I can see the original requirement or proposal?
> >>>
> >>> On Sep 7, 2018, at 8:13 AM, Jan Filipiak <jan.filip...@trivago.com>
> >>> wrote:
> >>>>
> >>>>
> >>>> On 05.09.2018 22:17, Adam Bellemare wrote:
> >>>>
> >>>>> I'm currently testing using a Windowed Store to store the highwater
> >>>>> mark. By all indications this should work fine, with the caveat being
> >>>>> that it can only resolve out-of-order arrival for up to the size of
> >>>>> the window (i.e. 24h, 72h, etc). This would remove the possibility of
> >>>>> it being unbounded in size.
> >>>>>
> >>>>> With regards to Jan's suggestion, I believe this is where we will have
> >>>>> to remain in disagreement. While I do not disagree with your statement
> >>>>> about there likely being additional joins done in a real-world
> >>>>> workflow, I do not see how you can conclusively deal with out-of-order
> >>>>> arrival of foreign-key changes and subsequent joins. I have attempted
> >>>>> what I think you have proposed (without a high-water mark, using
> >>>>> groupBy and reduce) and found that if the foreign key changes too
> >>>>> quickly, or the load on a stream thread is too high, the joined
> >>>>> messages will arrive out-of-order and be incorrectly propagated, such
> >>>>> that an intermediate event is represented as the final event.
> >>>>>
> >>>> Can you shed some light on your groupBy implementation? There must be
> >>>> some sort of flaw in it.
> >>>> I have a suspicion where it is; I would just like to confirm. The idea
> >>>> is bulletproof and it must be an implementation mess-up. I would like
> >>>> to clarify before we draw a conclusion.
> >>>>
> >>>>> Repartitioning the scattered events back to their original
> >>>>> partitions is the only way I know how to conclusively deal with
> >>>>> out-of-order events in a given time frame, and to ensure that the data
> >>>>> is eventually consistent with the input events.
> >>>>>
> >>>>> If you have some code to share that illustrates your approach, I would
> >>>>> be very grateful as it would remove any misunderstandings that I may
> >>>>> have.
> >>>>>
> >>>> Ah okay, you were looking for my code. I don't have something easily
> >>>> readable here as it's bloated with OO patterns.
> >>>>
> >>>> It's trivial anyhow:
> >>>>
> >>>> @Override
> >>>> public T apply(K aggKey, V value, T aggregate)
> >>>> {
> >>>>     Map<U, V> currentStateAsMap = asMap(aggregate); // << imaginary
> >>>>     U toModifyKey = mapper.apply(value);
> >>>>     // << This is the place where people are actually going to have
> >>>>     //    issues, and why you probably couldn't do it. We would need to
> >>>>     //    find a solution here. I didn't realize that yet.
> >>>>     // << We propagate the field in the joiner, so that we can pick it
> >>>>     //    up in an aggregate. Probably you have not thought of this in
> >>>>     //    your approach, right?
> >>>>     // << I am very open to finding a generic solution here. In my
> >>>>     //    honest opinion it is broken in KTableImpl.GroupBy that it
> >>>>     //    loses the keys and only maintains the aggregate key.
> >>>>     // << I abstracted it away back then, way before I was thinking of
> >>>>     //    oneToMany join. That is why I didn't realize its significance
> >>>>     //    here.
> >>>>     // << Opinions?
> >>>>
> >>>>     for (V m : current)
> >>>>     {
> >>>>         currentStateAsMap.put(mapper.apply(m), m);
> >>>>     }
> >>>>     if (isAdder)
> >>>>     {
> >>>>         currentStateAsMap.put(toModifyKey, value);
> >>>>     }
> >>>>     else
> >>>>     {
> >>>>         currentStateAsMap.remove(toModifyKey);
> >>>>         if (currentStateAsMap.isEmpty()) {
> >>>>             return null;
> >>>>         }
> >>>>     }
> >>>>     return asAggregateType(currentStateAsMap);
> >>>> }
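
For readers who want to run the idea, here is a self-contained sketch of the adder/subtractor logic quoted above, in plain Java. It assumes the aggregate is itself a `Map` keyed by the original primary key, so the `asMap` / `asAggregateType` conversions (marked imaginary in the original) disappear. `MapAggregator` and its constructor arguments are hypothetical names and are not part of Kafka Streams.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical adder/subtractor that keeps one entry per original primary key. */
final class MapAggregator<U, V> {

    private final Function<V, U> mapper; // extracts the original primary key from a value
    private final boolean isAdder;       // true: adder, false: subtractor

    MapAggregator(Function<V, U> mapper, boolean isAdder) {
        this.mapper = mapper;
        this.isAdder = isAdder;
    }

    /**
     * Returns the updated aggregate. A subtractor returns null once the last
     * entry is removed, which stands for deleting the key from the store.
     */
    Map<U, V> apply(V value, Map<U, V> aggregate) {
        // Copy so that the caller's aggregate is never mutated.
        Map<U, V> state = aggregate == null ? new HashMap<>() : new HashMap<>(aggregate);
        U toModifyKey = mapper.apply(value);
        if (isAdder) {
            state.put(toModifyKey, value);
        } else {
            state.remove(toModifyKey);
            if (state.isEmpty()) {
                return null;
            }
        }
        return state;
    }
}
```

The sketch relies on the value still carrying the original primary key, which is exactly the point flagged in the comments of the quoted code.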
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> Thanks,
> >>>>> Adam
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Wed, Sep 5, 2018 at 3:35 PM, Jan Filipiak <jan.filip...@trivago.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Thanks Adam for bringing Matthias up to speed!
> >>>>>> About the differences: I think re-keying back should be optional at
> >>>>>> best. I would say we return a KScatteredTable with reshuffle()
> >>>>>> returning KTable<originalKey,Joined> to make the backwards
> >>>>>> repartitioning optional.
> >>>>>> I am also very much in favour of doing the out-of-order processing
> >>>>>> using group by instead of high-water mark tracking, just because
> >>>>>> unbounded growth is scary, and it saves us the header stuff.
> >>>>>>
> >>>>>> I think the abstraction of always repartitioning back is just not so
> >>>>>> strong. The work has been done before we partition back, and
> >>>>>> grouping by something else afterwards is really common.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On 05.09.2018 13:49, Adam Bellemare wrote:
> >>>>>>
> >>>>>>> Hi Matthias
> >>>>>>>
> >>>>>>> Thank you for your feedback, I do appreciate it!
> >>>>>>>
> >>>>>>>> While name spacing would be possible, it would require
> >>>>>>>> deserializing user headers, which implies a runtime overhead. I
> >>>>>>>> would suggest not namespacing for now to avoid the overhead. If
> >>>>>>>> this becomes a problem in the future, we can still add name
> >>>>>>>> spacing later on.
> >>>>>>>
> >>>>>>> Agreed. I will go with using a reserved string and document it.
> >>>>>>>
> >>>>>>>> My main concern about the design is the type of the result KTable:
> >>>>>>>> If I understood the proposal correctly,
> >>>>>>>
> >>>>>>> In your example, you have table1 and table2 swapped. Here is how it
> >>>>>>> works currently:
> >>>>>>>
> >>>>>>> 1) table1 has the records that contain the foreign key within their
> >>>>>>> value.
> >>>>>>> table1 input stream: <a,(fk=A,bar=1)>, <b,(fk=A,bar=2)>,
> >>>>>>> <c,(fk=B,bar=3)>
> >>>>>>> table2 input stream: <A,X>, <B,Y>
> >>>>>>>
> >>>>>>> 2) A Value mapper is required to extract the foreign key.
> >>>>>>> table1 foreign key mapper: ( value => value.fk )
> >>>>>>>
> >>>>>>> The mapper is applied to each element in table1, and a new combined
> >>>>>>> key is made:
> >>>>>>> table1 mapped: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>,
> >>>>>>> <B-c, (fk=B,bar=3)>
> >>>>>>>
> >>>>>>> 3) The rekeyed events are copartitioned with table2:
> >>>>>>>
> >>>>>>> a) Stream Thread with Partition 0:
> >>>>>>> RepartitionedTable1: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>
> >>>>>>> Table2: <A,X>
> >>>>>>>
> >>>>>>> b) Stream Thread with Partition 1:
> >>>>>>> RepartitionedTable1: <B-c, (fk=B,bar=3)>
> >>>>>>> Table2: <B,Y>
> >>>>>>>
> >>>>>>> 4) From here, they can be joined together locally by applying the
> >>>>>>> joiner function.
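
Steps 1 to 4 above can be traced with a small plain-Java sketch that reuses the sample data from the message. It is only an illustration: `ForeignKeyJoinSketch`, `Left` and the string format of the joined value are hypothetical, the co-partitioning of step 3 is reduced to a direct lookup by foreign key, and none of this is the PR's actual implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical walk-through of the rekey-and-join steps on in-memory maps. */
final class ForeignKeyJoinSketch {

    /** A table1 value: the foreign key plus a payload field. */
    static final class Left {
        final String fk;
        final int bar;

        Left(String fk, int bar) {
            this.fk = fk;
            this.bar = bar;
        }
    }

    /**
     * Rekeys every table1 record to the combined key "<fk>-<primaryKey>" and
     * joins it with the table2 record stored under the same foreign key.
     * Records without a matching table2 entry produce no output.
     */
    static Map<String, String> join(Map<String, Left> table1, Map<String, String> table2) {
        Map<String, String> joined = new LinkedHashMap<>();
        for (Map.Entry<String, Left> record : table1.entrySet()) {
            Left left = record.getValue();
            String combinedKey = left.fk + "-" + record.getKey(); // step 2: combined key
            String right = table2.get(left.fk);                   // step 3: same fk, same "partition"
            if (right != null) {
                joined.put(combinedKey, left.bar + "+" + right);  // step 4: stand-in joiner
            }
        }
        return joined;
    }
}
```

With the sample input, the result holds one entry per combined key (A-a, A-b, B-c), which is the state both designs share before they diverge.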
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> At this point, Jan's design and my design deviate. My design goes on
> >>>>>>> to repartition the data post-join and resolve out-of-order arrival of
> >>>>>>> records, finally returning the data keyed by just the original key.
> >>>>>>> I do not expose the CombinedKey or any of the internals outside of
> >>>>>>> the joinOnForeignKey function. This does make for a larger footprint,
> >>>>>>> but it removes all agency for resolving out-of-order arrivals and
> >>>>>>> handling CombinedKeys from the user. I believe that this makes the
> >>>>>>> function much easier to use.
> >>>>>>>
> >>>>>>> Let me know if this helps resolve your questions, and please feel
> >>>>>>> free to add anything else on your mind.
> >>>>>>>
> >>>>>>> Thanks again,
> >>>>>>> Adam
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Tue, Sep 4, 2018 at 8:36 PM, Matthias J. Sax
> >>>>>>> <matth...@confluent.io> wrote:
> >>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> I am just catching up on this thread. I did not read everything so
> >>>>>>>> far, but want to share a couple of initial thoughts:
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Headers: I think there is a fundamental difference between header
> >>>>>>>> usage in this KIP and KIP-258. For 258, we add headers to changelog
> >>>>>>>> topics that are owned by Kafka Streams and nobody else is supposed
> >>>>>>>> to write into them. In fact, no user headers are written into the
> >>>>>>>> changelog topic and thus, there are no conflicts.
> >>>>>>>>
> >>>>>>>> Nevertheless, I don't see a big issue with using headers within
> >>>>>>>> Streams. As long as we document it, we can have some "reserved"
> >>>>>>>> header keys that users are not allowed to use when processing data
> >>>>>>>> with Kafka Streams. IMHO, this should be ok.
> >>>>>>>>
> >>>>>>>>> I think there is a safe way to avoid conflicts, since these headers
> >>>>>>>>> are only needed in internal topics (I think):
> >>>>>>>>> For internal and changelog topics, we can namespace all headers:
> >>>>>>>>> * user-defined headers are namespaced as "external." + headerKey
> >>>>>>>>> * internal headers are namespaced as "internal." + headerKey
> >>>>>>>>>
> >>>>>>>> While name spacing would be possible, it would require
> >>>>>>>> deserializing user headers, which implies a runtime overhead. I
> >>>>>>>> would suggest not namespacing for now to avoid the overhead. If
> >>>>>>>> this becomes a problem in the future, we can still add name
> >>>>>>>> spacing later on.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> My main concern about the design is the type of the result KTable:
> >>>>>>>> If I understood the proposal correctly,
> >>>>>>>>
> >>>>>>>> KTable<K1,V1> table1 = ...
> >>>>>>>> KTable<K2,V2> table2 = ...
> >>>>>>>>
> >>>>>>>> KTable<K1,V3> joinedTable = table1.join(table2,...);
> >>>>>>>>
> >>>>>>>> implies that the `joinedTable` has the same key as the left input
> >>>>>>>> table. IMHO, this does not work because if table2 contains multiple
> >>>>>>>> rows that join with a record in table1 (which is the main purpose of
> >>>>>>>> a foreign key join), the result table would only contain a single
> >>>>>>>> join result, but not multiple.
> >>>>>>>>
> >>>>>>>> Example:
> >>>>>>>>
> >>>>>>>> table1 input stream: <A,X>
> >>>>>>>> table2 input stream: <a,(A,1)>, <b,(A,2)>
> >>>>>>>>
> >>>>>>>> We use the table2 value as a foreign key to the table1 key (i.e. "A"
> >>>>>>>> joins). If the result key is the same key as the key of table1, this
> >>>>>>>> implies that the result can either be <A, join(X,1)> or
> >>>>>>>> <A, join(X,2)> but not both. Because they share the same key,
> >>>>>>>> whatever result record we emit later overwrites the previous result.
> >>>>>>>>
> >>>>>>>> This is the reason why Jan originally proposed to use a combination
> >>>>>>>> of both primary keys of the input tables as the key of the output
> >>>>>>>> table. This makes the keys of the output table unique and we can
> >>>>>>>> store both in the output table:
> >>>>>>>>
> >>>>>>>> Result would be <A-a, join(X,1)>, <A-b, join(X,2)>
> >>>>>>>>
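
The overwrite problem in the example above can be reproduced with ordinary Java maps standing in for the result table. `ResultKeyDemo` and the literal strings are purely illustrative and come from the example values in the message, not from any Kafka Streams API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical demo of result-table keying; a Map stands in for the KTable. */
final class ResultKeyDemo {

    /** Results keyed by the table1 key only: the later result replaces the earlier one. */
    static Map<String, String> keyedByLeftKey() {
        Map<String, String> result = new LinkedHashMap<>();
        result.put("A", "join(X,1)");
        result.put("A", "join(X,2)"); // same key, so this overwrites join(X,1)
        return result;
    }

    /** Results keyed by the combination of both primary keys: both results survive. */
    static Map<String, String> keyedByCombinedKey() {
        Map<String, String> result = new LinkedHashMap<>();
        result.put("A-a", "join(X,1)");
        result.put("A-b", "join(X,2)");
        return result;
    }
}
```

A table behaves like the map here: one value per key, so only a key that is unique per joined pair can hold every join result.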
> >>>>>>>>
> >>>>>>>> Thoughts?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 9/4/18 1:36 PM, Jan Filipiak wrote:
> >>>>>>>>
> >>>>>>>>> Just one remark here.
> >>>>>>>>> The high-watermark could be disregarded. The decision about the
> >>>>>>>>> forward depends on the size of the aggregated map.
> >>>>>>>>> Only 1-element maps would be unpacked and forwarded. 0-element
> >>>>>>>>> maps would be published as a delete. Any other count of map entries
> >>>>>>>>> is in the "waiting for correct deletes to arrive" state.
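
The size-based rule in this remark can be written down as a small decision function. This is a sketch under assumptions: `ForwardDecision` and its `Action` names are hypothetical, and treating a `null` aggregate the same as an empty map is an assumption made here for illustration.

```java
import java.util.Map;

/** Hypothetical decision rule based on the size of the aggregated map. */
final class ForwardDecision {

    enum Action { FORWARD, DELETE, WAIT }

    /**
     * One entry: unpack and forward it. No entries: publish a delete.
     * More than one entry: still waiting for the correct deletes to arrive.
     */
    static Action decide(Map<?, ?> aggregated) {
        if (aggregated == null || aggregated.isEmpty()) {
            return Action.DELETE; // null is treated like an empty map (assumption)
        }
        if (aggregated.size() == 1) {
            return Action.FORWARD;
        }
        return Action.WAIT;
    }
}
```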
> >>>>>>>>>
> >>>>>>>>> On 04.09.2018 21:29, Adam Bellemare wrote:
> >>>>>>>>>
> >>>>>>>>>> It does look like I could replace the second repartition store and
> >>>>>>>>>> highwater store with a groupBy and reduce. However, it looks like
> >>>>>>>>>> I would still need to store the highwater value within the
> >>>>>>>>>> materialized store, to compare the arrival of out-of-order records
> >>>>>>>>>> (assuming my understanding of THIS is correct...). This in effect
> >>>>>>>>>> is the same as the design I have now, just with the two tables
> >>>>>>>>>> merged together.
> >>>>>>>>>
>
>
--
-- Guozhang