Will that work? I expected it to blow up with a ClassCastException or similar.

You would either have to specify the window you fetch/put, or iterate across all windows the key was found in, right?

I just hope the window store doesn't check stream-time under the hood; that would be a questionable interface.

If it does: did you see my earlier comment on checking all the windows?
That would be needed to actually give reasonable time guarantees.

Best



On 25.09.2018 13:18, Adam Bellemare wrote:
Hi Jan

Check for  " highwaterMat " in the PR. I only changed the state store, not
the ProcessorSupplier.

Thanks,
Adam

On Mon, Sep 24, 2018 at 2:47 PM, Jan Filipiak <jan.filip...@trivago.com>
wrote:



On 24.09.2018 16:26, Adam Bellemare wrote:

@Guozhang

Thanks for the information. This is indeed something that will be
extremely useful for this KIP.

@Jan
Thanks for your explanations. That being said, I will not be moving ahead
with an implementation using the reshuffle/groupBy solution you propose.
If you wish to implement it yourself off of my current PR and submit it as
a competing alternative, I would be more than happy to help vet it. As it
stands right now, I do not really have more time to invest in alternatives
without a strong indication from the binding voters of which they would
prefer.


Hey, no worries at all. I think I personally gave up on the Streams DSL
some time ago, otherwise I would have pulled this KIP through already.
I am currently reimplementing my own DSL based on PAPI.


I will look at finishing up my PR with the windowed state store in the
next week or so, exercising it via tests, and then I will come back for
final discussions. In the meantime, I hope that any of the binding voters
could take a look at the KIP in the wiki. I have updated it according to
the latest plan:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable

I have also updated the KIP PR to use a windowed store. This could be
replaced by the results of KIP-258 whenever they are completed.
https://github.com/apache/kafka/pull/5527

Thanks,

Adam


Is the HighWatermarkResolverProccessorsupplier already updated in the PR?
I expected it to change to Windowed<K>,Long. Am I missing something?





On Fri, Sep 14, 2018 at 2:24 PM, Guozhang Wang <wangg...@gmail.com>
wrote:

Correction on my previous email: KAFKA-5533 is the wrong link, as it is
for the corresponding changelog mechanisms. But as part of KIP-258 we do
want to have "handling out-of-order data for source KTable" such that,
instead of blindly applying the updates to the materialized store, i.e.
following offset ordering, we will reject updates that are older than the
current key's
timestamps, i.e. following timestamp ordering.
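
A minimal sketch of the timestamp-ordered update rule described above, written in plain Java rather than against the Kafka Streams API. The class name `TimestampedStore` and the in-memory `HashMap` are assumptions for illustration only; KIP-258 itself is about RocksDB-backed stores, and how it treats equal timestamps is not stated in this thread (here an equal timestamp is accepted).

```java
import java.util.HashMap;
import java.util.Map;

/** Toy key-value store that applies updates per key in timestamp order (hypothetical). */
final class TimestampedStore<K, V> {
    private static final class Entry<T> {
        final T value;
        final long timestamp;
        Entry(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<K, Entry<V>> store = new HashMap<>();

    /** Returns true if the update was applied, false if it was rejected as older. */
    boolean put(K key, V value, long timestamp) {
        Entry<V> current = store.get(key);
        if (current != null && timestamp < current.timestamp) {
            return false; // out-of-order update: keep the newer value
        }
        store.put(key, new Entry<>(value, timestamp));
        return true;
    }

    V get(K key) {
        Entry<V> e = store.get(key);
        return e == null ? null : e.value;
    }

    public static void main(String[] args) {
        TimestampedStore<String, String> s = new TimestampedStore<>();
        s.put("a", "v2", 200L);
        s.put("a", "v1", 100L); // arrives later by offset, but is older by timestamp
        System.out.println(s.get("a")); // prints "v2"
    }
}
```

With offset ordering the second `put` would have overwritten "v2"; with timestamp ordering it is dropped.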


Guozhang

On Fri, Sep 14, 2018 at 11:21 AM, Guozhang Wang <wangg...@gmail.com>
wrote:

Hello Adam,

Thanks for the explanation. Regarding the final step (i.e. the high
watermark store, now altered to be replaced with a window store), I think
another ongoing KIP may actually help:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB


This is for adding the timestamp into a key-value store (i.e. only for
non-windowed KTables), and one of its usages, as described in
https://issues.apache.org/jira/browse/KAFKA-5533, is that we can then
"reject" updates from the source topics if their timestamp is smaller than
the current key's latest update timestamp. I think it is very similar to
what you have in mind for high watermark based filtering; you only need to
make sure that the timestamps of the joining records are correctly
inherited through the whole topology to the final stage.

Note that this KIP is for key-value stores and hence non-windowed KTables
only, but for windowed KTables we do not really have good support for
their joins anyway (https://issues.apache.org/jira/browse/KAFKA-7107), so
I think we can just consider non-windowed KTable-KTable non-key joins for
now. In which case, KIP-258 should help.



Guozhang



On Wed, Sep 12, 2018 at 9:20 PM, Jan Filipiak <jan.filip...@trivago.com>
wrote:


On 11.09.2018 18:00, Adam Bellemare wrote:

Hi Guozhang

The current highwater mark implementation would grow endlessly based on
the primary key of the original event. It is a pair of (<this table
primary key>, <highest offset seen for that key>). This is used to
differentiate between late arrivals and new updates. My newest proposal
would be to replace it with a Windowed state store of Duration N. This
would allow the same behaviour, but cap the size based on time. This
should allow for all late-arriving events to be processed, and should be
customizable by the user to tailor to their own needs (ie: perhaps just 10
minutes of window,
or perhaps 7 days...).
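
A minimal sketch of the (primary key, highest offset seen) bookkeeping described above, in plain Java. `HighWaterMarkFilter` is a hypothetical name, not a class from the PR, and treating an equal offset as a duplicate is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical high-water mark filter: one (key, highest offset) pair per key. */
final class HighWaterMarkFilter<K> {
    private final Map<K, Long> highestOffsetByKey = new HashMap<>();

    /** Returns true for a new update, false for a late (or duplicate) arrival. */
    boolean accept(K key, long offset) {
        Long seen = highestOffsetByKey.get(key);
        if (seen != null && offset <= seen) {
            return false;
        }
        highestOffsetByKey.put(key, offset);
        return true;
    }

    /** Entries are never removed, so this grows with the number of distinct keys. */
    int size() {
        return highestOffsetByKey.size();
    }

    public static void main(String[] args) {
        HighWaterMarkFilter<String> f = new HighWaterMarkFilter<>();
        System.out.println(f.accept("a", 10L)); // true: first update for "a"
        System.out.println(f.accept("a", 7L));  // false: late arrival
        System.out.println(f.size());           // 1
    }
}
```

The `size()` method makes the growth concern visible: nothing ever evicts a key, which is what the time-bounded window store is meant to cap.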

Hi Adam, using time-based retention can do the trick here, even if I
would still like to see the automatic repartitioning made optional, since
I would just reshuffle again. With a windowed store I am a little bit
sceptical about how to determine the window. Essentially, one could run
into problems when a rapid change happens near a window border. I will
check your implementation in detail; if it's problematic, we could still
check _all_ windows on read, with not too bad a performance impact, I
guess. Will let you know if the implementation would be correct as is. I
would not like to assume that
offset(A) < offset(B) => timestamp(A) < timestamp(B). I think
we can't expect that.
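
The window-border concern can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not the Kafka Streams `WindowStore`: the class `WindowedHighWaterMarks`, the tumbling-window layout, and the method names are all hypothetical, and timestamps are assumed to be non-negative.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.TreeMap;

/** Hypothetical tumbling-window store of per-key high-water marks. */
final class WindowedHighWaterMarks<K> {
    private final long windowSizeMs;
    // window start timestamp -> (key -> highest offset recorded in that window)
    private final TreeMap<Long, Map<K, Long>> windows = new TreeMap<>();

    WindowedHighWaterMarks(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    private long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    void put(K key, long offset, long timestampMs) {
        windows.computeIfAbsent(windowStart(timestampMs), w -> new HashMap<>())
               .merge(key, offset, Long::max);
    }

    /** Looks only at the window that contains timestampMs. */
    OptionalLong fetchSingleWindow(K key, long timestampMs) {
        Map<K, Long> window = windows.get(windowStart(timestampMs));
        Long offset = window == null ? null : window.get(key);
        return offset == null ? OptionalLong.empty() : OptionalLong.of(offset);
    }

    /** Checks all retained windows and returns the highest offset seen for the key. */
    OptionalLong fetchAllWindows(K key) {
        return windows.values().stream()
                .map(window -> window.get(key))
                .filter(Objects::nonNull)
                .mapToLong(Long::longValue)
                .max();
    }

    /** Time-based retention: drops every window that starts before the window of timestampMs. */
    void expireBefore(long timestampMs) {
        windows.headMap(windowStart(timestampMs), false).clear();
    }
}
```

With a 1000 ms window, an update at offset 5 and timestamp 999 lands in window 0. A late record at offset 3 and timestamp 1001 maps to window 1000, so a single-window lookup finds nothing and would accept it, while the all-windows lookup still returns 5. The same example shows a higher offset carrying a lower timestamp.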



@Jan
I believe I understand what you mean now - thanks for the diagram, it
did really help. You are correct that I do not have the original primary
key available, and I can see that if it was available then you would be
able to add and remove events from the Map. That being said, I encourage
you to finish your diagrams / charts just for clarity for everyone else.


Yeah, 100%, this giphy thing is just really hard work. But I understand
the benefits for the rest. Sorry about the original primary key. We have
join and group by implemented on our own in PAPI and are basically not
using any DSL (just the abstraction). I completely missed that in the
original DSL it's not there and just assumed it. Total brain mess-up on my
end. Will finish the chart as soon as I get a quiet evening this week.

My follow up question for you is, won't the Map stay inside the State
Store indefinitely after all of the changes have propagated? Isn't this
effectively the same as a highwater mark state store?

The thing is that if the map is empty, the subtractor is going to return
`null` and the key is removed from the keyspace. But there is going to be
a store, 100%. The good thing is that I can use this store directly for
materialize() / enableSendingOldValues(); it is a regular store,
satisfying all guarantees needed for a further groupBy / join. The
windowed store is not keeping the values, so for the next stateful
operation we would need to instantiate an extra store. Or we would have
the window store also hold
the values then.
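
A minimal sketch of the map-based adder/subtractor described above, in plain Java. `MapAggregator` is a hypothetical name; the mapper is assumed to recover the original primary key from the value, which, as discussed in this thread, the DSL does not provide out of the box.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical aggregator that keeps one map entry per original primary key. */
final class MapAggregator<U, V> {
    private final Function<V, U> mapper; // assumed: extracts the original primary key from the value
    private final boolean isAdder;       // true = adder, false = subtractor

    MapAggregator(Function<V, U> mapper, boolean isAdder) {
        this.mapper = mapper;
        this.isAdder = isAdder;
    }

    /** Returns the new aggregate, or null when the map becomes empty (the key is then removed). */
    Map<U, V> apply(V value, Map<U, V> aggregate) {
        Map<U, V> state = aggregate == null ? new HashMap<>() : new HashMap<>(aggregate);
        U toModifyKey = mapper.apply(value);
        if (isAdder) {
            state.put(toModifyKey, value);
        } else {
            state.remove(toModifyKey);
            if (state.isEmpty()) {
                return null;
            }
        }
        return state;
    }

    public static void main(String[] args) {
        Function<String, String> pk = v -> v.split(":")[0]; // values look like "<pk>:<payload>"
        MapAggregator<String, String> adder = new MapAggregator<>(pk, true);
        MapAggregator<String, String> subtractor = new MapAggregator<>(pk, false);

        Map<String, String> agg = adder.apply("a:1", null);
        agg = adder.apply("b:2", agg);
        agg = subtractor.apply("a:1", agg);
        System.out.println(agg);                           // {b=b:2}
        System.out.println(subtractor.apply("b:2", agg));  // null
    }
}
```

Returning `null` from the subtractor is what lets the store shrink again once all entries for an aggregate key have been removed.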

Long story short: if we can flip in a custom group by before
repartitioning to the original primary key, I think it would help the
users big time in building efficient apps. Given the original primary key
issue, I understand that we do not have a solid foundation to build on.
Leaving the primary key for the user to carry along is very unfortunate.
I could understand if the decision goes like that. I do not think it's a
good decision.






Thanks
Adam






On Tue, Sep 11, 2018 at 10:07 AM, Prajakta Dumbre
<dumbreprajakta...@gmail.com> wrote:


      please remove me from this group

      On Tue, Sep 11, 2018 at 1:29 PM Jan Filipiak
      <jan.filip...@trivago.com> wrote:

      > Hi Adam,
      >
      > give me some time, will make such a chart. Last time I didn't get
      > along well with giphy and ruined all your charts.
      > Hopefully I can get it done today.
      >
      > On 08.09.2018 16:00, Adam Bellemare wrote:
      > > Hi Jan
      > >
      > > I have included a diagram of what I attempted on the KIP.
      > >
      >
      > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-GroupBy+Reduce/Aggregate
      > >
      > > I attempted this back at the start of my own implementation of this
      > > solution, and since I could not get it to work I have since discarded
      > > the code. At this point in time, if you wish to continue pursuing
      > > your groupBy solution, I ask that you please create a diagram on the
      > > KIP carefully explaining your solution. Please feel free to use the
      > > image I just posted as a starting point. I am having trouble
      > > understanding your explanations but I think that a carefully
      > > constructed diagram will clear up any misunderstandings. Alternately,
      > > please post a comprehensive PR with your solution. I can only guess at
      > > what you mean, and since I value my own time as much as you value
      > > yours, I believe it is your responsibility to provide an
      > > implementation instead of me trying to guess.
      > >
      > > Adam
      > >
      > >
      > >
      > >
      > >
      > >
      > >
      > >
      > >
      > > On Sat, Sep 8, 2018 at 8:00 AM, Jan Filipiak
      > > <jan.filip...@trivago.com> wrote:
      > >
      > >> Hi James,
      > >>
      > >> nice to see you being interested. Kafka Streams at this point
      > >> supports all sorts of joins as long as both streams have the same
      > >> key. Adam is currently implementing a join where a KTable and a
      > >> KTable can have a one-to-many relationship (1:n). We exploit that
      > >> RocksDB is a datastore that keeps data sorted (at least it exposes
      > >> an API to access the stored data in a sorted fashion).
      > >>
      > >> I think the technical caveats are well understood now and we are
      > >> basically down to philosophy and API design (when Adam sees my
      > >> newest message). I have a lengthy track record of losing those kinds
      > >> of arguments within the streams community and I have no clue why. So
      > >> I literally can't wait for you to churn through this thread and give
      > >> your opinion on how we should design the return type of the
      > >> oneToManyJoin and how much power we want to give to the user vs
      > >> "simplicity" (where simplicity isn't really that, as users still
      > >> need to understand it, I argue).
      > >>
      > >> Waiting for you to join in on the discussion.
      > >>
      > >> Best Jan
      > >>
      > >>
      > >>
      > >> On 07.09.2018 15:49, James Kwan wrote:
      > >>
      > >>> I am new to this group and I found this subject interesting.
      > >>> Sounds like you guys want to implement a join table of two streams?
      > >>> Is there somewhere I can see the original requirement or proposal?
      > >>>
      > >>>> On Sep 7, 2018, at 8:13 AM, Jan Filipiak
      > >>>> <jan.filip...@trivago.com> wrote:
      > >>>>
      > >>>>
      > >>>> On 05.09.2018 22:17, Adam Bellemare wrote:
      > >>>>
      > >>>>> I'm currently testing using a Windowed Store to store the
      > >>>>> highwater mark. By all indications this should work fine, with
      > >>>>> the caveat being that it can only resolve out-of-order arrival
      > >>>>> for up to the size of the window (ie: 24h, 72h, etc). This would
      > >>>>> remove the possibility of it being unbounded in size.
      > >>>>>
      > >>>>> With regards to Jan's suggestion, I believe this is where we
      > >>>>> will have to remain in disagreement. While I do not disagree with
      > >>>>> your statement about there likely being additional joins done in
      > >>>>> a real-world workflow, I do not see how you can conclusively deal
      > >>>>> with out-of-order arrival of foreign-key changes and subsequent
      > >>>>> joins. I have attempted what I think you have proposed (without a
      > >>>>> high-water, using groupBy and reduce) and found that if the
      > >>>>> foreign key changes too quickly, or the load on a stream thread
      > >>>>> is too high, the joined messages will arrive out-of-order and be
      > >>>>> incorrectly propagated, such that an intermediate event is
      > >>>>> represented as the final event.
      > >>>>>
      > >>>> Can you shed some light on your groupBy implementation? There
      > >>>> must be some sort of flaw in it. I have a suspicion where it is; I
      > >>>> would just like to confirm. The idea is bulletproof, so it must be
      > >>>> an implementation mess-up. I would like to clarify before we draw
      > >>>> a conclusion.
      > >>>>
      > >>>>> Repartitioning the scattered events back to their original
      > >>>>> partitions is the only way I know how to conclusively deal with
      > >>>>> out-of-order events in a given time frame, and to ensure that the
      > >>>>> data is eventually consistent with the input events.
      > >>>>>
      > >>>>> If you have some code to share that illustrates your approach, I
      > >>>>> would be very grateful as it would remove any misunderstandings
      > >>>>> that I may have.
      > >>>>>
      > >>>> Ah okay, you were looking for my code. I don't have something
      > >>>> easily readable here as it's bloated with OO patterns.
      > >>>>
      > >>>> It's trivial anyhow:
      > >>>>
      > >>>> @Override
      > >>>>      public T apply(K aggKey, V value, T aggregate)
      > >>>>      {
      > >>>>          Map<U, V> currentStateAsMap = asMap(aggregate); << imaginary
      > >>>>          U toModifyKey = mapper.apply(value);
      > >>>>              << this is the place where people are actually going
      > >>>>                 to have issues and why you probably couldn't do it.
      > >>>>                 We would need to find a solution here. I didn't
      > >>>>                 realize that yet.
      > >>>>              << we propagate the field in the joiner, so that we
      > >>>>                 can pick it up in an aggregate. Probably you have
      > >>>>                 not thought of this in your approach, right?
      > >>>>              << I am very open to finding a generic solution here.
      > >>>>                 In my honest opinion it is broken in
      > >>>>                 KTableImpl.GroupBy that it loses the keys and only
      > >>>>                 maintains the aggregate key.
      > >>>>              << I abstracted it away back then, way before I was
      > >>>>                 thinking of the oneToMany join. That is why I
      > >>>>                 didn't realize its significance here.
      > >>>>              << Opinions?
      > >>>>
      > >>>>          for (V m : current)
      > >>>>          {
      > >>>>              currentStateAsMap.put(mapper.apply(m), m);
      > >>>>          }
      > >>>>          if (isAdder)
      > >>>>          {
      > >>>>              currentStateAsMap.put(toModifyKey, value);
      > >>>>          }
      > >>>>          else
      > >>>>          {
      > >>>>              currentStateAsMap.remove(toModifyKey);
      > >>>>              if (currentStateAsMap.isEmpty()) {
      > >>>>                  return null;
      > >>>>              }
      > >>>>          }
      > >>>>          return asAggregateType(currentStateAsMap);
      > >>>>      }
      > >>>>
      > >>>>
      > >>>>
      > >>>>
      > >>>>
      > >>>> Thanks,
      > >>>>> Adam
      > >>>>>
      > >>>>>
      > >>>>>
      > >>>>>
      > >>>>>
      > >>>>> On Wed, Sep 5, 2018 at 3:35 PM, Jan Filipiak
      > >>>>> <jan.filip...@trivago.com> wrote:
      > >>>>>
      > >>>>>> Thanks Adam for bringing Matthias up to speed!
      > >>>>>> About the differences: I think re-keying back should be optional
      > >>>>>> at best. I would say we return a KScatteredTable with reshuffle()
      > >>>>>> returning KTable<originalKey,Joined> to make the backwards
      > >>>>>> repartitioning optional.
      > >>>>>> I am also very much in favour of doing the out-of-order
      > >>>>>> processing using group by instead of high water mark tracking,
      > >>>>>> just because unbounded growth is scary, and it saves us the
      > >>>>>> header stuff.
      > >>>>>>
      > >>>>>> I think the abstraction of always repartitioning back is just
      > >>>>>> not so strong. The work has been done before we partition back,
      > >>>>>> and grouping by something else afterwards is really common.
      > >>>>>>
      > >>>>>>
      > >>>>>>
      > >>>>>>
      > >>>>>>
      > >>>>>>
      > >>>>>> On 05.09.2018 13:49, Adam Bellemare wrote:
      > >>>>>>
      > >>>>>>> Hi Matthias
      > >>>>>>> Thank you for your feedback, I do appreciate it!
      > >>>>>>>
      > >>>>>>>> While name spacing would be possible, it would require
      > >>>>>>>> deserializing user headers, which implies a runtime overhead.
      > >>>>>>>> I would suggest not to namespace for now to avoid the
      > >>>>>>>> overhead. If this becomes a problem in the future, we can
      > >>>>>>>> still add name spacing later on.
      > >>>>>>>
      > >>>>>>> Agreed. I will go with using a reserved string and document it.
      > >>>>>>>
      > >>>>>>>> My main concern about the design is the type of the result
      > >>>>>>>> KTable: If I understood the proposal correctly,
      > >>>>>>>
      > >>>>>>> In your example, you have table1 and table2 swapped. Here is how
      > >>>>>>> it works currently:
      > >>>>>>>
      > >>>>>>> 1) table1 has the records that contain the foreign key within
      > >>>>>>> their value.
      > >>>>>>> table1 input stream: <a,(fk=A,bar=1)>, <b,(fk=A,bar=2)>,
      > >>>>>>> <c,(fk=B,bar=3)>
      > >>>>>>> table2 input stream: <A,X>, <B,Y>
      > >>>>>>>
      > >>>>>>> 2) A value mapper is required to extract the foreign key.
      > >>>>>>> table1 foreign key mapper: ( value => value.fk )
      > >>>>>>>
      > >>>>>>> The mapper is applied to each element in table1, and a new
      > >>>>>>> combined key is made:
      > >>>>>>> table1 mapped: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>,
      > >>>>>>> <B-c, (fk=B,bar=3)>
      > >>>>>>>
      > >>>>>>> 3) The rekeyed events are copartitioned with table2:
      > >>>>>>>
      > >>>>>>> a) Stream Thread with Partition 0:
      > >>>>>>> RepartitionedTable1: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>
      > >>>>>>> Table2: <A,X>
      > >>>>>>>
      > >>>>>>> b) Stream Thread with Partition 1:
      > >>>>>>> RepartitionedTable1: <B-c, (fk=B,bar=3)>
      > >>>>>>> Table2: <B,Y>
      > >>>>>>>
      > >>>>>>> 4) From here, they can be joined together locally by applying
      > >>>>>>> the joiner function.
      > >>>>>>>
      > >>>>>>>
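
The rekeying and copartitioning in steps 2 and 3 above can be sketched in plain Java. This is an illustration under assumptions: `CombinedKeyRepartitionSketch` is a hypothetical name, rows are modelled as `{fk, bar}` string arrays, and `String.hashCode()` stands in for the real partitioner, so the partition numbers will not match the example in the email.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class CombinedKeyRepartitionSketch {
    /** The partition is chosen from the foreign key only, so "A-a" and "A-b" land with table2's "A". */
    static int partitionFor(String foreignKey, int numPartitions) {
        return Math.floorMod(foreignKey.hashCode(), numPartitions);
    }

    /** table1: pk -> {fk, bar}. Returns, per partition, rows rekeyed as "<fk>-<pk>" -> bar. */
    static List<Map<String, String>> repartition(Map<String, String[]> table1, int numPartitions) {
        List<Map<String, String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new LinkedHashMap<>());
        }
        for (Map.Entry<String, String[]> row : table1.entrySet()) {
            String fk = row.getValue()[0];           // step 2: foreign key mapper
            String combinedKey = fk + "-" + row.getKey();
            partitions.get(partitionFor(fk, numPartitions)).put(combinedKey, row.getValue()[1]);
        }
        return partitions;
    }

    public static void main(String[] args) {
        Map<String, String[]> table1 = new LinkedHashMap<>();
        table1.put("a", new String[] {"A", "1"});
        table1.put("b", new String[] {"A", "2"});
        table1.put("c", new String[] {"B", "3"});
        System.out.println(repartition(table1, 2));
    }
}
```

Because the partition depends only on the foreign-key part of the combined key, every rekeyed table1 row ends up on the same partition as the table2 row it joins with, which is what makes the local join in step 4 possible.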
      > >>>>>>>
      > >>>>>>> At this point, Jan's design and my design deviate. My design
      > >>>>>>> goes on to repartition the data post-join and resolve
      > >>>>>>> out-of-order arrival of records, finally returning the data
      > >>>>>>> keyed by just the original key. I do not expose the CombinedKey
      > >>>>>>> or any of the internals outside of the joinOnForeignKey
      > >>>>>>> function. This does make for a larger footprint, but it removes
      > >>>>>>> all agency for resolving out-of-order arrivals and handling
      > >>>>>>> CombinedKeys from the user. I believe that this makes the
      > >>>>>>> function much easier to use.
      > >>>>>>>
      > >>>>>>> Let me know if this helps resolve your questions, and please
      > >>>>>>> feel free to add anything else on your mind.
      > >>>>>>>
      > >>>>>>> Thanks again,
      > >>>>>>> Adam
      > >>>>>>>
      > >>>>>>>
      > >>>>>>>
      > >>>>>>>
      > >>>>>>> On Tue, Sep 4, 2018 at 8:36 PM, Matthias J. Sax
      > >>>>>>> <matth...@confluent.io> wrote:
      > >>>>>>>
      > >>>>>>>> Hi,
      > >>>>>>>>
      > >>>>>>>> I am just catching up on this thread. I did not read
      > >>>>>>>> everything so far, but want to share a couple of initial
      > >>>>>>>> thoughts:
      > >>>>>>>>
      > >>>>>>>>
      > >>>>>>>>
      > >>>>>>>> Headers: I think there is a fundamental difference between
      > >>>>>>>> header usage in this KIP and KIP-258. For 258, we add headers
      > >>>>>>>> to changelog topics that are owned by Kafka Streams and nobody
      > >>>>>>>> else is supposed to write into them. In fact, no user headers
      > >>>>>>>> are written into the changelog topic and thus, there are no
      > >>>>>>>> conflicts.
      > >>>>>>>>
      > >>>>>>>> Nevertheless, I don't see a big issue with using headers within
      > >>>>>>>> Streams. As long as we document it, we can have some "reserved"
      > >>>>>>>> header keys that users are not allowed to use when processing
      > >>>>>>>> data with Kafka Streams. IMHO, this should be ok.
      > >>>>>>>>
      > >>>>>>>>> I think there is a safe way to avoid conflicts, since these
      > >>>>>>>>> headers are only needed in internal topics (I think):
      > >>>>>>>>> For internal and changelog topics, we can namespace all
      > >>>>>>>>> headers:
      > >>>>>>>>> * user-defined headers are namespaced as "external." +
      > >>>>>>>>>   headerKey
      > >>>>>>>>> * internal headers are namespaced as "internal." + headerKey
      > >>>>>>>>
      > >>>>>>>> While name spacing would be possible, it would require
      > >>>>>>>> deserializing user headers, which implies a runtime overhead.
      > >>>>>>>> I would suggest not to namespace for now to avoid the
      > >>>>>>>> overhead. If this becomes a problem in the future, we can
      > >>>>>>>> still add name spacing later on.
      > >>>>>>>>
      > >>>>>>>>
      > >>>>>>>>
      > >>>>>>>> My main concern about the design is the type of the result
      > >>>>>>>> KTable: If I understood the proposal correctly,
      > >>>>>>>>
      > >>>>>>>> KTable<K1,V1> table1 = ...
      > >>>>>>>> KTable<K2,V2> table2 = ...
      > >>>>>>>>
      > >>>>>>>> KTable<K1,V3> joinedTable = table1.join(table2,...);
      > >>>>>>>>
      > >>>>>>>> implies that the `joinedTable` has the same key as the left
      > >>>>>>>> input table. IMHO, this does not work because if table2
      > >>>>>>>> contains multiple rows that join with a record in table1 (which
      > >>>>>>>> is the main purpose of a foreign key join), the result table
      > >>>>>>>> would only contain a single join result, but not multiple.
      > >>>>>>>>
      > >>>>>>>> Example:
      > >>>>>>>>
      > >>>>>>>> table1 input stream: <A,X>
      > >>>>>>>> table2 input stream: <a,(A,1)>, <b,(A,2)>
      > >>>>>>>>
      > >>>>>>>> We use the table2 value as a foreign key to the table1 key
      > >>>>>>>> (i.e., "A" joins). If the result key is the same as the key of
      > >>>>>>>> table1, this implies that the result can either be
      > >>>>>>>> <A, join(X,1)> or <A, join(X,2)> but not both. Because they
      > >>>>>>>> share the same key, whatever result record we emit later
      > >>>>>>>> overwrites the previous result.
      > >>>>>>>>
      > >>>>>>>> This is the reason why Jan originally proposed to use a
      > >>>>>>>> combination of both primary keys of the input tables as the key
      > >>>>>>>> of the output table. This makes the keys of the output table
      > >>>>>>>> unique and we can store both in the output table:
      > >>>>>>>>
      > >>>>>>>> Result would be <A-a, join(X,1)>, <A-b, join(X,2)>
      > >>>>>>>>
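
The overwrite problem in the example above can be reproduced with a small plain-Java sketch. `ResultKeySketch` and its `join` method are hypothetical; table2 rows are modelled as `{foreign key, payload}` string arrays and the join result is rendered as a string.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ResultKeySketch {
    /**
     * table1: key -> value; table2: primary key -> {foreign key, payload}.
     * Results are keyed by the foreign key alone, or by "<fk>-<pk>" when combinedKey is true.
     */
    static Map<String, String> join(Map<String, String> table1,
                                    Map<String, String[]> table2,
                                    boolean combinedKey) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String[]> row : table2.entrySet()) {
            String fk = row.getValue()[0];
            String left = table1.get(fk);
            if (left == null) {
                continue; // no matching row in table1
            }
            String key = combinedKey ? fk + "-" + row.getKey() : fk;
            // A later put with the same key overwrites the earlier join result.
            result.put(key, "join(" + left + "," + row.getValue()[1] + ")");
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> table1 = new LinkedHashMap<>();
        table1.put("A", "X");
        Map<String, String[]> table2 = new LinkedHashMap<>();
        table2.put("a", new String[] {"A", "1"});
        table2.put("b", new String[] {"A", "2"});

        System.out.println(join(table1, table2, false)); // {A=join(X,2)}
        System.out.println(join(table1, table2, true));  // {A-a=join(X,1), A-b=join(X,2)}
    }
}
```

Keyed by "A" alone, only the last emitted result survives; with the combined key both results are retained.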
      > >>>>>>>>
      > >>>>>>>> Thoughts?
      > >>>>>>>>
      > >>>>>>>>
      > >>>>>>>> -Matthias
      > >>>>>>>>
      > >>>>>>>>
      > >>>>>>>>
      > >>>>>>>>
      > >>>>>>>> On 9/4/18 1:36 PM, Jan Filipiak wrote:
      > >>>>>>>>
      > >>>>>>>>> Just one remark here.
      > >>>>>>>>> The high-watermark could be disregarded. The decision about
      > >>>>>>>>> the forward depends on the size of the aggregated map.
      > >>>>>>>>> Only 1-element maps would be unpacked and forwarded. 0-element
      > >>>>>>>>> maps would be published as a delete. Any other count of map
      > >>>>>>>>> entries is in the "waiting for correct deletes to arrive"
      > >>>>>>>>> state.
      > >>>>>>>>>
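
The forwarding rule in the remark above can be written as a small decision function. This is a sketch: `ForwardDecision` and the `Action` names are hypothetical, and treating a `null` aggregate like an empty map is an assumption.

```java
import java.util.Map;

final class ForwardDecision {
    enum Action { FORWARD, DELETE, WAIT }

    /** Decides what to emit based only on the size of the aggregated map. */
    static Action decide(Map<?, ?> aggregated) {
        if (aggregated == null || aggregated.isEmpty()) {
            return Action.DELETE;  // 0 elements: publish a delete
        }
        if (aggregated.size() == 1) {
            return Action.FORWARD; // exactly 1 element: unpack and forward it
        }
        return Action.WAIT;        // more than 1: waiting for the correct deletes to arrive
    }
}
```

For example, a map still holding entries for both the old and the new foreign key yields `WAIT` until the delete for the old one arrives.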
      > >>>>>>>>> On 04.09.2018 21:29, Adam Bellemare wrote:
      > >>>>>>>>>
      > >>>>>>>>>> It does look like I could replace the second repartition
      > >>>>>>>>>> store and highwater store with a groupBy and reduce. However,
      > >>>>>>>>>> it looks like I would still need to store the highwater value
      > >>>>>>>>>> within the materialized store, to compare the arrival of
      > >>>>>>>>>> out-of-order records (assuming my understanding of THIS is
      > >>>>>>>>>> correct...). This in effect is the same as the design I have
      > >>>>>>>>>> now, just with the two tables merged together.
      > >>>>>>>>>
      >
      >






--
-- Guozhang




--
-- Guozhang




