Hi Guozhang

The current highwater mark implementation would grow endlessly, since it
keeps one entry per primary key of the original event. Each entry is a pair
of (<this table primary key>, <highest offset seen for that key>). This is
used to differentiate between late arrivals and new updates. My newest
proposal would be to replace it with a Windowed state store of Duration N.
This would allow the same behaviour, but cap the size based on time. This
should allow late-arriving events to be processed as long as they arrive
within the window, and should be customizable by the user to tailor to
their own needs (e.g. perhaps just 10 minutes of window, or perhaps 7
days...).
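
To make the time-capped highwater mark idea concrete, here is a minimal
sketch in plain Java. It does not use the Kafka Streams windowed store API;
the class name WindowedHighwaterMark and the method shouldForward are
hypothetical, and expiry is done with a simple scan rather than by dropping
whole window segments.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: highwater marks that expire after a retention window. */
final class WindowedHighwaterMark<K> {

    /** Highest offset seen for a key, plus the time at which it was recorded. */
    private static final class Entry {
        final long offset;
        final long timestampMs;

        Entry(long offset, long timestampMs) {
            this.offset = offset;
            this.timestampMs = timestampMs;
        }
    }

    private final long retentionMs;
    private final Map<K, Entry> marks = new HashMap<>();

    WindowedHighwaterMark(Duration retention) {
        this.retentionMs = retention.toMillis();
    }

    /**
     * Returns true if the event is a new update and should be forwarded,
     * false if it is a late arrival (its offset is below the recorded mark).
     */
    boolean shouldForward(K key, long offset, long nowMs) {
        expire(nowMs);
        Entry seen = marks.get(key);
        if (seen != null && offset < seen.offset) {
            return false; // late arrival within the window: drop it
        }
        marks.put(key, new Entry(offset, nowMs));
        return true;
    }

    /** Removes marks older than the retention window, which bounds the size by time. */
    private void expire(long nowMs) {
        marks.values().removeIf(e -> nowMs - e.timestampMs > retentionMs);
    }

    int size() {
        return marks.size();
    }
}
```

The trade-off is visible in the sketch: once a mark has expired, an older
offset for that key can no longer be recognised as late, so the window has
to be at least as long as the worst out-of-order delay you expect.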


@Jan
I believe I understand what you mean now - thanks for the diagram, it did
really help. You are correct that I do not have the original primary key
available, and I can see that if it was available then you would be able to
add and remove events from the Map. That being said, I encourage you to
finish your diagrams / charts just for clarity for everyone else.

My follow-up question for you is: won't the Map stay inside the State Store
indefinitely after all of the changes have propagated? Isn't this
effectively the same as a highwater mark state store?




Thanks
Adam






On Tue, Sep 11, 2018 at 10:07 AM, Prajakta Dumbre <
dumbreprajakta...@gmail.com> wrote:

> please remove me from this group
>
> On Tue, Sep 11, 2018 at 1:29 PM Jan Filipiak <jan.filip...@trivago.com>
> wrote:
>
> > Hi Adam,
> >
> > give me some time, I will make such a chart. Last time I didn't get
> > along well with giphy and ruined all your charts.
> > Hopefully I can get it done today.
> >
> > On 08.09.2018 16:00, Adam Bellemare wrote:
> > > Hi Jan
> > >
> > > I have included a diagram of what I attempted on the KIP.
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-GroupBy+Reduce/Aggregate
> > >
> > > I attempted this back at the start of my own implementation of this
> > > solution, and since I could not get it to work I have since discarded
> > > the code. At this point in time, if you wish to continue pursuing your
> > > groupBy solution, I ask that you please create a diagram on the KIP
> > > carefully explaining your solution. Please feel free to use the image I
> > > just posted as a starting point. I am having trouble understanding your
> > > explanations but I think that a carefully constructed diagram will
> > > clear up any misunderstandings. Alternately, please post a
> > > comprehensive PR with your solution. I can only guess at what you mean,
> > > and since I value my own time as much as you value yours, I believe it
> > > is your responsibility to provide an implementation instead of me
> > > trying to guess.
> > >
> > > Adam
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Sat, Sep 8, 2018 at 8:00 AM, Jan Filipiak <jan.filip...@trivago.com>
> > > wrote:
> > >
> > >> Hi James,
> > >>
> > >> nice to see you being interested. Kafka Streams at this point supports
> > >> all sorts of joins as long as both streams have the same key.
> > >> Adam is currently implementing a join where a KTable and a KTable can
> > >> have a one-to-many relationship (1:n). We exploit that RocksDB is a
> > >> datastore that keeps data sorted (at least it exposes an API to access
> > >> the stored data in a sorted fashion).
> > >>
> > >> I think the technical caveats are well understood now and we are
> > >> basically down to philosophy and API design (when Adam sees my newest
> > >> message).
> > >> I have a lengthy track record of losing those kinds of arguments within
> > >> the streams community and I have no clue why. So I literally can't wait
> > >> for you to churn through this thread and give your opinion on how we
> > >> should design the return type of the oneToManyJoin and how much power we
> > >> want to give to the user vs "simplicity" (where simplicity isn't really
> > >> that, as users still need to understand it, I argue).
> > >>
> > >> waiting for you to join in on the discussion
> > >>
> > >> Best Jan
> > >>
> > >>
> > >>
> > >> On 07.09.2018 15:49, James Kwan wrote:
> > >>
> > >>> I am new to this group and I found this subject interesting.  Sounds
> > >>> like you guys want to implement a join table of two streams? Is there
> > >>> somewhere I can see the original requirement or proposal?
> > >>>
> > >>> On Sep 7, 2018, at 8:13 AM, Jan Filipiak <jan.filip...@trivago.com>
> > >>>> wrote:
> > >>>>
> > >>>>
> > >>>> On 05.09.2018 22:17, Adam Bellemare wrote:
> > >>>>
> > >>>>> I'm currently testing using a Windowed Store to store the highwater
> > >>>>> mark. By all indications this should work fine, with the caveat being
> > >>>>> that it can only resolve out-of-order arrival for up to the size of
> > >>>>> the window (e.g. 24h, 72h, etc). This would remove the possibility of
> > >>>>> it being unbounded in size.
> > >>>>>
> > >>>>> With regards to Jan's suggestion, I believe this is where we will
> > >>>>> have to remain in disagreement. While I do not disagree with your
> > >>>>> statement about there likely being additional joins done in a
> > >>>>> real-world workflow, I do not see how you can conclusively deal with
> > >>>>> out-of-order arrival of foreign-key changes and subsequent joins. I
> > >>>>> have attempted what I think you have proposed (without a highwater
> > >>>>> mark, using groupBy and reduce) and found that if the foreign key
> > >>>>> changes too quickly, or the load on a stream thread is too high, the
> > >>>>> joined messages will arrive out-of-order and be incorrectly
> > >>>>> propagated, such that an intermediate event is represented as the
> > >>>>> final event.
> > >>>>>
> > >>>> Can you shed some light on your groupBy implementation? There must
> > >>>> be some sort of flaw in it.
> > >>>> I have a suspicion where it is, I would just like to confirm. The
> > >>>> idea is bulletproof, so it must be an implementation mess-up. I would
> > >>>> like to clarify before we draw a conclusion.
> > >>>>
> > >>>>> Repartitioning the scattered events back to their original
> > >>>>> partitions is the only way I know how to conclusively deal with
> > >>>>> out-of-order events in a given time frame, and to ensure that the
> > >>>>> data is eventually consistent with the input events.
> > >>>>>
> > >>>>> If you have some code to share that illustrates your approach, I
> > >>>>> would be very grateful as it would remove any misunderstandings that
> > >>>>> I may have.
> > >>>>>
> > >>>> ah okay, you were looking for my code. I don't have something easily
> > >>>> readable here as it's bloated with OO patterns.
> > >>>>
> > >>>> It's trivial anyhow:
> > >>>>
> > >>>> @Override
> > >>>>      public T apply(K aggKey, V value, T aggregate)
> > >>>>      {
> > >>>>          Map<U, V> currentStateAsMap = asMap(aggregate); // imaginary
> > >>>>          U toModifyKey = mapper.apply(value);
> > >>>>          // This is the place where people are actually going to have
> > >>>>          // issues, and why you probably couldn't do it. We would need
> > >>>>          // to find a solution here. I didn't realize that yet.
> > >>>>          // We propagate the field in the joiner, so that we can pick
> > >>>>          // it up in an aggregate. Probably you have not thought of
> > >>>>          // this in your approach, right?
> > >>>>          // I am very open to finding a generic solution here. In my
> > >>>>          // honest opinion it is broken that KTableImpl.GroupBy loses
> > >>>>          // the keys and only maintains the aggregate key.
> > >>>>          // I abstracted it away back then, way before I was thinking
> > >>>>          // of the oneToMany join. That is why I didn't realize its
> > >>>>          // significance here.
> > >>>>          // Opinions?
> > >>>>
> > >>>>          for (V m : current)
> > >>>>          {
> > >>>>              currentStateAsMap.put(mapper.apply(m), m);
> > >>>>          }
> > >>>>          if (isAdder)
> > >>>>          {
> > >>>>              currentStateAsMap.put(toModifyKey, value);
> > >>>>          }
> > >>>>          else
> > >>>>          {
> > >>>>              currentStateAsMap.remove(toModifyKey);
> > >>>>              if(currentStateAsMap.isEmpty()){
> > >>>>                  return null;
> > >>>>              }
> > >>>>          }
> > >>>>          return asAggregateType(currentStateAsMap);
> > >>>>      }
> > >>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>> Thanks,
> > >>>>> Adam
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> On Wed, Sep 5, 2018 at 3:35 PM, Jan Filipiak <jan.filip...@trivago.com>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> Thanks Adam for bringing Matthias up to speed!
> > >>>>>>
> > >>>>>> About the differences: I think re-keying back should be optional at
> > >>>>>> best. I would say we return a KScatteredTable with reshuffle()
> > >>>>>> returning KTable<originalKey,Joined> to make the backwards
> > >>>>>> repartitioning optional.
> > >>>>>> I am also very much in favour of doing the out-of-order processing
> > >>>>>> using group by instead of highwater mark tracking, because unbounded
> > >>>>>> growth is just scary, and it saves us the header stuff.
> > >>>>>>
> > >>>>>> I think the abstraction of always repartitioning back is just not so
> > >>>>>> strong. The work has been done before we partition back, and
> > >>>>>> grouping by something else afterwards is really common.
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> On 05.09.2018 13:49, Adam Bellemare wrote:
> > >>>>>>
> > >>>>>>> Hi Matthias
> > >>>>>>>
> > >>>>>>> Thank you for your feedback, I do appreciate it!
> > >>>>>>>
> > >>>>>>>> While name spacing would be possible, it would require
> > >>>>>>>> deserializing user headers, which implies a runtime overhead. I
> > >>>>>>>> would suggest not namespacing for now to avoid the overhead. If
> > >>>>>>>> this becomes a problem in the future, we can still add name
> > >>>>>>>> spacing later on.
> > >>>>>>>
> > >>>>>>> Agreed. I will go with using a reserved string and document it.
> > >>>>>>>
> > >>>>>>>> My main concern about the design is the type of the result
> > >>>>>>>> KTable: If I understood the proposal correctly,
> > >>>>>>>
> > >>>>>>> In your example, you have table1 and table2 swapped. Here is how it
> > >>>>>>> works currently:
> > >>>>>>>
> > >>>>>>> 1) table1 has the records that contain the foreign key within their
> > >>>>>>> value.
> > >>>>>>> table1 input stream: <a,(fk=A,bar=1)>, <b,(fk=A,bar=2)>,
> > >>>>>>> <c,(fk=B,bar=3)>
> > >>>>>>> table2 input stream: <A,X>, <B,Y>
> > >>>>>>>
> > >>>>>>> 2) A value mapper is required to extract the foreign key.
> > >>>>>>> table1 foreign key mapper: ( value => value.fk )
> > >>>>>>>
> > >>>>>>> The mapper is applied to each element in table1, and a new combined
> > >>>>>>> key is made:
> > >>>>>>> table1 mapped: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>, <B-c,
> > >>>>>>> (fk=B,bar=3)>
> > >>>>>>>
> > >>>>>>> 3) The rekeyed events are copartitioned with table2:
> > >>>>>>>
> > >>>>>>> a) Stream Thread with Partition 0:
> > >>>>>>> RepartitionedTable1: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>
> > >>>>>>> Table2: <A,X>
> > >>>>>>>
> > >>>>>>> b) Stream Thread with Partition 1:
> > >>>>>>> RepartitionedTable1: <B-c, (fk=B,bar=3)>
> > >>>>>>> Table2: <B,Y>
> > >>>>>>>
> > >>>>>>> 4) From here, they can be joined together locally by applying the
> > >>>>>>> joiner function.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> At this point, Jan's design and my design deviate. My design goes
> > >>>>>>> on to repartition the data post-join and resolve out-of-order
> > >>>>>>> arrival of records, finally returning the data keyed by just the
> > >>>>>>> original key. I do not expose the CombinedKey or any of the
> > >>>>>>> internals outside of the joinOnForeignKey function. This does make
> > >>>>>>> for a larger footprint, but it removes all agency for resolving
> > >>>>>>> out-of-order arrivals and handling CombinedKeys from the user. I
> > >>>>>>> believe that this makes the function much easier to use.
> > >>>>>>>
> > >>>>>>> Let me know if this helps resolve your questions, and please feel
> > >>>>>>> free to add anything else on your mind.
> > >>>>>>>
> > >>>>>>> Thanks again,
> > >>>>>>> Adam
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Tue, Sep 4, 2018 at 8:36 PM, Matthias J. Sax <
> > >>>>>>> matth...@confluent.io>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Hi,
> > >>>>>>>>
> > >>>>>>>> I am just catching up on this thread. I did not read everything so
> > >>>>>>>> far, but want to share a couple of initial thoughts:
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> Headers: I think there is a fundamental difference between header
> > >>>>>>>> usage in this KIP and KIP-258. For 258, we add headers to changelog
> > >>>>>>>> topics that are owned by Kafka Streams and nobody else is supposed
> > >>>>>>>> to write into them. In fact, no user headers are written into the
> > >>>>>>>> changelog topics and thus, there are no conflicts.
> > >>>>>>>>
> > >>>>>>>> Nevertheless, I don't see a big issue with using headers within
> > >>>>>>>> Streams. As long as we document it, we can have some "reserved"
> > >>>>>>>> header keys that users are not allowed to use when processing data
> > >>>>>>>> with Kafka Streams. IMHO, this should be ok.
> > >>>>>>>>
> > >>>>>>>>> I think there is a safe way to avoid conflicts, since these
> > >>>>>>>>> headers are only needed in internal topics (I think):
> > >>>>>>>>> For internal and changelog topics, we can namespace all headers:
> > >>>>>>>>> * user-defined headers are namespaced as "external." + headerKey
> > >>>>>>>>> * internal headers are namespaced as "internal." + headerKey
> > >>>>>>>>
> > >>>>>>>> While name spacing would be possible, it would require
> > >>>>>>>> deserializing user headers, which implies a runtime overhead. I
> > >>>>>>>> would suggest not namespacing for now to avoid the overhead. If
> > >>>>>>>> this becomes a problem in the future, we can still add name
> > >>>>>>>> spacing later on.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> My main concern about the design is the type of the result KTable:
> > >>>>>>>> If I understood the proposal correctly,
> > >>>>>>>>
> > >>>>>>>> KTable<K1,V1> table1 = ...
> > >>>>>>>> KTable<K2,V2> table2 = ...
> > >>>>>>>>
> > >>>>>>>> KTable<K1,V3> joinedTable = table1.join(table2,...);
> > >>>>>>>>
> > >>>>>>>> implies that the `joinedTable` has the same key as the left input
> > >>>>>>>> table. IMHO, this does not work because if table2 contains multiple
> > >>>>>>>> rows that join with a record in table1 (which is the main purpose
> > >>>>>>>> of a foreign key join), the result table would only contain a
> > >>>>>>>> single join result, but not multiple.
> > >>>>>>>>
> > >>>>>>>> Example:
> > >>>>>>>>
> > >>>>>>>> table1 input stream: <A,X>
> > >>>>>>>> table2 input stream: <a,(A,1)>, <b,(A,2)>
> > >>>>>>>>
> > >>>>>>>> We use the table2 value as a foreign key to the table1 key (i.e.,
> > >>>>>>>> "A" joins). If the result key is the same key as the key of table1,
> > >>>>>>>> this implies that the result can either be <A, join(X,1)> or
> > >>>>>>>> <A, join(X,2)> but not both. Because they share the same key,
> > >>>>>>>> whatever result record we emit later overwrites the previous
> > >>>>>>>> result.
> > >>>>>>>>
> > >>>>>>>> This is the reason why Jan originally proposed to use a combination
> > >>>>>>>> of both primary keys of the input tables as the key of the output
> > >>>>>>>> table. This makes the keys of the output table unique and we can
> > >>>>>>>> store both in the output table:
> > >>>>>>>>
> > >>>>>>>> Result would be <A-a, join(X,1)>, <A-b, join(X,2)>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> Thoughts?
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> -Matthias
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> On 9/4/18 1:36 PM, Jan Filipiak wrote:
> > >>>>>>>>
> > >>>>>>>>> Just one remark here.
> > >>>>>>>>> The high-watermark could be disregarded. The decision about the
> > >>>>>>>>> forward depends on the size of the aggregated map.
> > >>>>>>>>> Only maps with exactly 1 element would be unpacked and forwarded.
> > >>>>>>>>> 0-element maps would be published as a delete. Any other count of
> > >>>>>>>>> map entries is in the "waiting for correct deletes to arrive" state.
> > >>>>>>>>> On 04.09.2018 21:29, Adam Bellemare wrote:
> > >>>>>>>>>
> > >>>>>>>>>> It does look like I could replace the second repartition store
> > >>>>>>>>>> and highwater store with a groupBy and reduce.  However, it looks
> > >>>>>>>>>> like I would still need to store the highwater value within the
> > >>>>>>>>>> materialized store, to compare the arrival of out-of-order
> > >>>>>>>>>> records (assuming my understanding of THIS is correct...). This
> > >>>>>>>>>> in effect is the same as the design I have now, just with the two
> > >>>>>>>>>> tables merged together.
> > >>>>>>>>>
> >
> >
>
