Hi Jan and Adam,

Wow, thanks for doing that test, Adam. Those results are encouraging.

Thanks for your performance experience as well, Jan. I agree that avoiding
unnecessary join outputs is especially important when the fan-out is so
high. I suppose this could also be built into the implementation we're
discussing, but it wouldn't have to be specified in the KIP (since it's an
API-transparent optimization).

As far as whether or not to re-repartition the data, I didn't bring it up
because it sounded like the two of you agreed to leave the KIP as-is,
despite the disagreement.

If you want my opinion, I feel like both approaches are reasonable.
It sounds like Jan places more value on the potential for developers to
optimize their topologies by re-using the intermediate nodes, whereas Adam
places more value on having a single operator that people can use without
extra steps at the end.

Personally, although I do find it exceptionally annoying when a framework
gets in my way when I'm trying to optimize something, it seems better to go
for a single operation.
* Encapsulating the internal transitions gives us significant latitude in
the implementation (for example, joining only at the end, not in the middle
to avoid extra data copying and out-of-order resolution; how we represent
the first repartition keys (combined keys vs. value vectors), etc.). If we
publish something like a KScatteredTable with the right-partitioned joined
data, then the API pretty much locks in the implementation as well.
* The API seems simpler to understand and use. I do mean "seems"; if anyone
wants to make the case that KScatteredTable is actually simpler, I think
hypothetical usage code would help. From a relational algebra perspective,
it seems like KTable.join(KTable) should produce a new KTable in all cases.
* That said, there might still be room in the API for a different operation
like what Jan has proposed to scatter a KTable, and then do things like
join, re-group, etc from there... I'm not sure; I haven't thought through
all the consequences yet.

This is all just my opinion after thinking over the discussion so far...
-John

On Mon, Dec 3, 2018 at 2:56 PM Adam Bellemare <adam.bellem...@gmail.com>
wrote:

> Updated the PR to take into account John's feedback.
>
> I did some preliminary testing for the performance of the prefixScan. I
> have attached the file, but I will also include the text in the body here
> for archival purposes (I am not sure what happens to attached files). I
> also updated the PR and the KIP accordingly.
>
> Summary: It scales exceptionally well for scanning large numbers of
> records. As Jan mentioned previously, the real issue would be more around
> processing the resulting records after obtaining them. For instance, it
> takes approximately 80-120 mS to flush the buffer and a further 35-85 mS
> to scan 27.5M records, obtaining matches for 2.5M of them. Iterating
> through the records just to generate a simple count takes ~40 times longer
> than the flush + scan combined.
>
> ============================================================================================
> Setup:
>
> ============================================================================================
> Java 9 with default settings aside from a 512 MB heap (Xmx512m, Xms512m)
> CPU: i7 2.2 GHz.
>
> Note: I am using a slightly-modified, directly-accessible Kafka Streams
> RocksDB implementation (RocksDB.java, basically just avoiding the
> ProcessorContext). There are no modifications to the default RocksDB
> values provided in the 2.1/trunk release.
>
>
> keysize = 128 bytes
> valsize = 512 bytes
>
> Step 1:
> Write X positive matching events: (key = prefix + left-padded
> auto-incrementing integer)
> Step 2:
> Write 10X negative matching events (key = left-padded auto-incrementing
> integer)
> Step 3:
> Perform flush
> Step 4:
> Perform prefixScan
> Step 5:
> Iterate through the returned Iterator and validate the count of expected events.
>
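The five steps above can be approximated at small scale without RocksDB.
The following is only a sketch under stated assumptions: a TreeMap stands
in for the sorted store, flush is not modeled at all, and the names
PrefixScanSketch, prefixScan, pad, and runScenario are hypothetical (they
are not taken from the PR or from Kafka Streams). It shows why a prefix
scan only touches the matching key range: seek to the first key at or
after the prefix, then stop at the first key that no longer starts with it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical stand-in for a sorted key-value store (a TreeMap keeps keys ordered). */
final class PrefixScanSketch {

    private PrefixScanSketch() {}

    /** Returns all entries whose key starts with the given prefix, in key order. */
    static List<Map.Entry<String, String>> prefixScan(NavigableMap<String, String> store, String prefix) {
        List<Map.Entry<String, String>> matches = new ArrayList<>();
        // Seek to the first key >= prefix, then stop at the first non-matching key.
        for (Map.Entry<String, String> entry : store.tailMap(prefix, true).entrySet()) {
            if (!entry.getKey().startsWith(prefix)) {
                break;
            }
            matches.add(entry);
        }
        return matches;
    }

    /** Left-pads an integer with zeros so that lexicographic order equals numeric order. */
    static String pad(int i) {
        return String.format("%010d", i);
    }

    /** Small-scale version of the test: X matching keys and 10X non-matching keys. */
    static int runScenario(int x) {
        NavigableMap<String, String> store = new TreeMap<>();
        for (int i = 0; i < x; i++) {
            store.put("prefix-" + pad(i), "value");   // step 1: positive matches
        }
        for (int i = 0; i < 10 * x; i++) {
            store.put(pad(i), "value");               // step 2: negative matches
        }
        // Steps 3 and 4 collapse into one in-memory scan; step 5 counts the result.
        return prefixScan(store, "prefix-").size();
    }
}
```

Calling PrefixScanSketch.runScenario(x) returns x, because only the
prefixed keys fall inside the scanned range. Timings from such an
in-memory stand-in say nothing about RocksDB itself.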
>
> ============================================================================================
> Results:
>
> ============================================================================================
> X = 1k (11k events total)
> Flush Time = 39 mS
> Scan Time = 7 mS
> 6.9 MB disk
>
> --------------------------------------------------------------------------------------------
> X = 10k (110k events total)
> Flush Time = 45 mS
> Scan Time = 8 mS
> 127 MB
>
> --------------------------------------------------------------------------------------------
> X = 100k (1.1M events total)
> Test1:
> Flush Time = 60 mS
> Scan Time = 12 mS
> 678 MB
>
> Test2:
> Flush Time = 45 mS
> Scan Time = 7 mS
> 576 MB
>
> --------------------------------------------------------------------------------------------
> X = 1M (11M events total)
> Test1:
> Flush Time = 52 mS
> Scan Time = 19 mS
> 7.2 GB
>
> Test2:
> Flush Time = 84 mS
> Scan Time = 34 mS
> 9.1 GB
>
> --------------------------------------------------------------------------------------------
> X = 2.5M (27.5M events total)
> Test1:
> Flush Time = 82 mS
> Scan Time = 63 mS
> 17GB - 276 sst files
>
> Test2:
> Flush Time = 116 mS
> Scan Time = 35 mS
> 23GB - 361 sst files
>
> Test3:
> Flush Time = 103 mS
> Scan Time = 82 mS
> 19 GB - 300 sst files
>
> --------------------------------------------------------------------------------------------
>
> I had to limit my testing on my laptop to X = 2.5M events. I tried to go
> to X = 10M (110M events) but RocksDB was going into the 100GB+ range and my
> laptop ran out of disk. More extensive testing could be done but I suspect
> that it would be in line with what we're seeing in the results above.
>
>
>
>
>
>
> At this point in time, I think the only major discussion point is really
> around what Jan and I have disagreed on: repartitioning back and resolving
> potential out-of-order issues, or leaving that up to the client to handle.
>
>
> Thanks folks,
>
> Adam
>
>
> On Mon, Dec 3, 2018 at 4:34 AM Jan Filipiak <jan.filip...@trivago.com>
> wrote:
>
>>
>>
>> On 29.11.2018 15:14, John Roesler wrote:
>> > Hi all,
>> >
>> > Sorry that this discussion petered out... I think the 2.1 release caused
>> > an extended distraction that pushed it off everyone's radar (which was
>> > precisely Adam's concern). Personally, I've also had some extended
>> > distractions of my own that kept (and continue to keep) me preoccupied.
>> >
>> > However, calling for a vote did wake me up, so I guess Jan was on the
>> > right track!
>> >
>> > I've gone back and reviewed the whole KIP document and the prior
>> > discussion, and I'd like to offer a few thoughts:
>> >
>> > API Thoughts:
>> >
>> > 1. If I read the KIP right, you are proposing a many-to-one join. Could
>> > we consider naming it manyToOneJoin? Or, if you prefer, flip the design
>> > around and make it a oneToManyJoin?
>> >
>> > The proposed name "joinOnForeignKey" disguises the join type, and it
>> > seems like it might trick some people into using it for a one-to-one
>> > join. This would work, of course, but it would be super inefficient
>> > compared to a simple rekey-and-join.
>> >
>> > 2. I might have missed it, but I don't think it's specified whether it's
>> > an inner, outer, or left join. I'm guessing an outer join, as (neglecting
>> > IQ), the rest can be achieved by filtering or by handling it in the
>> > ValueJoiner.
>> >
>> > 3. The arg list to joinOnForeignKey doesn't look quite right.
>> > 3a. Regarding Serialized: There are a few different paradigms in play in
>> > the Streams API, so it's confusing, but instead of three Serialized args,
>> > I think it would be better to have one that allows (optionally) setting
>> > the 4 incoming serdes. The result serde is defined by the Materialized.
>> > The incoming serdes can be optional because they might already be
>> > available on the source KTables, or the default serdes from the config
>> > might be applicable.
>> >
>> > 3b. Is the StreamPartitioner necessary? The other joins don't allow
>> > setting one, and it seems like it might actually be harmful, since the
>> > rekey operation needs to produce results that are co-partitioned with the
>> > "other" KTable.
>> >
>> > 4. I'm fine with the "reserved word" header, but I didn't actually follow
>> > what Matthias meant about namespacing requiring "deserializing" the
>> > record header. The headers are already Strings, so I don't think that
>> > deserialization is required. If we applied the namespace at source nodes
>> > and stripped it at sink nodes, this would be practically no overhead. The
>> > advantage of the namespace idea is that no public API change wrt headers
>> > needs to happen, and no restrictions need to be placed on users' headers.
>> >
>> > (Although I'm wondering if we can get away without the header at all...
>> > stay tuned)
>> >
>> > 5. I also didn't follow the discussion about the HWM table growing
>> > without bound. As I read it, the HWM table is effectively implementing
>> > OCC to resolve the problem you noted with disordering when the rekey is
>> > reversed... particularly notable when the FK changes. As such, it only
>> > needs to track the most recent "version" (the offset in the source
>> > partition) of each key. Therefore, it should have the same number of keys
>> > as the source table at all times.
>> >
>> > I see that you are aware of KIP-258, which I think might be relevant in a
>> > couple of ways. One: it's just about storing the timestamp in the state
>> > store, but the ultimate idea is to effectively use the timestamp as an
>> > OCC "version" to drop disordered updates. You wouldn't want to use the
>> > timestamp for this operation, but if you were to use a similar mechanism
>> > to store the source offset in the store alongside the re-keyed values,
>> > then you could avoid a separate table.
>> >
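The offset-as-version idea in point 5 can be sketched in a few lines. This
is a minimal illustration, not the KIP's implementation: the class
HighWaterMarkSketch and its methods are hypothetical, a HashMap stands in
for the state store, and treating an equal offset as acceptable (rather
than stale) is an assumption made here so that re-emitted results for the
same source record are not dropped.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: one "highest offset seen" entry per source-table key. */
final class HighWaterMarkSketch {

    // Stand-in for the high-water-mark state store.
    private final Map<String, Long> highestOffsetByKey = new HashMap<>();

    /**
     * Returns true if the update should be forwarded, false if it is stale.
     * An update is stale when its source offset is lower than the highest
     * offset already seen for the same key.
     */
    boolean accept(String key, long sourceOffset) {
        Long seen = highestOffsetByKey.get(key);
        if (seen != null && sourceOffset < seen) {
            return false; // arrived out of order; a newer version was already applied
        }
        highestOffsetByKey.put(key, sourceOffset);
        return true;
    }

    /** Number of tracked keys; bounded by the number of keys in the source table. */
    int trackedKeys() {
        return highestOffsetByKey.size();
    }
}
```

Because the map holds exactly one entry per key, its size follows the
source table's key count rather than the number of updates processed.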
>> > 6. You and Jan have been thinking about this for a long time, so I've
>> > probably missed something here, but I'm wondering if we can avoid the HWM
>> > tracking at all and resolve out-of-order during a final join instead...
>> >
>> > Let's say we're joining a left table (Integer K: Letter FK, (other data))
>> > to a right table (Letter K: (some data)).
>> >
>> > Left table:
>> > 1: (A, xyz)
>> > 2: (B, asd)
>> >
>> > Right table:
>> > A: EntityA
>> > B: EntityB
>> >
>> > We could do a rekey as you proposed with a combined key, but not
>> > propagating the value at all..
>> > Rekey table:
>> > A-1: (dummy value)
>> > B-2: (dummy value)
>> >
>> > Which we then join with the right table to produce:
>> > A-1: EntityA
>> > B-2: EntityB
>> >
>> > Which gets rekeyed back:
>> > 1: A, EntityA
>> > 2: B, EntityB
>> >
>> > And finally we do the actual join:
>> > Result table:
>> > 1: ((A, xyz), EntityA)
>> > 2: ((B, asd), EntityB)
>> >
>> > The thing is that in that last join, we have the opportunity to compare
>> > the current FK in the left table with the incoming PK of the right table.
>> > If they don't match, we just drop the event, since it must be outdated.
>> >
>>
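The final-join check described above (compare the left row's current FK
with the PK carried by the rekeyed-back event, and drop on mismatch) can be
sketched as follows. Everything here is illustrative: FinalJoinSketch,
LeftRow, putLeft, and onRekeyedBack are hypothetical names, a HashMap
stands in for the left table's store, and the joined value is rendered as
a plain string only to mirror the notation used in the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of resolving out-of-order results in the final join. */
final class FinalJoinSketch {

    /** Left-table row: the foreign key plus the rest of the payload. */
    static final class LeftRow {
        final String foreignKey;
        final String data;

        LeftRow(String foreignKey, String data) {
            this.foreignKey = foreignKey;
            this.data = data;
        }
    }

    // Current state of the left table, keyed by its primary key.
    private final Map<Integer, LeftRow> leftTable = new HashMap<>();

    /** Inserts or updates a left-table row. */
    void putLeft(int key, String foreignKey, String data) {
        leftTable.put(key, new LeftRow(foreignKey, data));
    }

    /**
     * Handles an event that was joined on the right side and rekeyed back.
     * Returns the joined result, or empty when the event refers to a foreign
     * key that the left row no longer has (so the event must be outdated).
     */
    Optional<String> onRekeyedBack(int leftKey, String rightKey, String rightValue) {
        LeftRow current = leftTable.get(leftKey);
        if (current == null || !current.foreignKey.equals(rightKey)) {
            return Optional.empty();
        }
        return Optional.of("((" + current.foreignKey + ", " + current.data + "), " + rightValue + ")");
    }
}
```

With row 1 updated from FK A to FK B, a late (1, A, EntityA) event is
dropped while (1, B, EntityB) yields ((B, xyz), EntityB), regardless of
the order in which the two events arrive.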
>> > In your KIP, you gave an example in which (1: A, xyz) gets updated to
>> > (1: B, xyz), ultimately yielding a conundrum about whether the final
>> > state should be (1: null) or (1: joined-on-B). With the algorithm above,
>> > you would be considering (1: (B, xyz), (A, null)) vs (1: (B, xyz), (B,
>> > EntityB)). It seems like this does give you enough information to make
>> > the right choice, regardless of disordering.
>>
>> Will check Adam's patch, but this should work. As I have mentioned often,
>> I am not convinced about partitioning back for the user automatically. I
>> think this is the real performance eater ;)
>>
>> >
>> >
>> > 7. Last thought... I'm a little concerned about the performance of the
>> > range scans when records change in the right table. You've said that
>> > you've been using the algorithm you presented in production for a while.
>> > Can you give us a sense of the performance characteristics you've
>> > observed?
>> >
>>
>> Make it work, make it fast, make it beautiful. The topmost thing here is
>> / was correctness. In practice I do not measure the performance of the
>> range scan. The usual cases I run this with emit 500k - 1M rows on a
>> left-hand-side change. The range scan is just the work you have to do.
>> Also, when you pack your data into different formats, the RocksDB
>> performance is usually tied very closely to the size of the data, and we
>> can't really change that. It is more important for users to prevent
>> useless updates to begin with. My left-hand side is guarded to drop
>> changes that are not going to change my join output.
>>
>> usually it's:
>>
>> drop unused fields and then don't forward if old.equals(new)
>>
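The guard described above (project away unused fields, then skip the
update when old and new are equal) can be sketched like this. It is an
assumption-laden illustration, not Jan's code: NoOpUpdateFilter and
shouldForward are hypothetical names, and the caller supplies the
projection that keeps only the fields the join output depends on.

```java
import java.util.Objects;
import java.util.function.Function;

/** Hypothetical sketch: suppress updates that cannot change the join output. */
final class NoOpUpdateFilter {

    private NoOpUpdateFilter() {}

    /**
     * Projects both values down to the fields that matter for the join and
     * reports whether the update still differs afterwards. A null value
     * (insert or delete) is compared as null, so inserts and deletes are
     * always forwarded.
     */
    static <V, P> boolean shouldForward(V oldValue, V newValue, Function<V, P> relevantFields) {
        P oldProjected = oldValue == null ? null : relevantFields.apply(oldValue);
        P newProjected = newValue == null ? null : relevantFields.apply(newValue);
        return !Objects.equals(oldProjected, newProjected);
    }
}
```

For example, if a row carries a last-modified timestamp that the join does
not use, a change to only that field projects to equal values and is not
forwarded, which avoids a full fan-out on the other side.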
>> Regarding the performance of creating an iterator for smaller fan-outs,
>> users can still just do a group-by first anyway.
>>
>>
>>
>> > I could only think of one alternative, but I'm not sure if it's better or
>> > worse... If the first re-key only needs to preserve the original key, as
>> > I proposed in #6, then we could store a vector of keys in the value:
>> >
>> > Left table:
>> > 1: A,...
>> > 2: B,...
>> > 3: A,...
>> >
>> > Gets re-keyed:
>> > A: [1, 3]
>> > B: [2]
>> >
>> > Then, the rhs part of the join would only need a regular single-key
>> > lookup. Of course we have to deal with the problem of large values, as
>> > there's no bound on the number of lhs records that can reference rhs
>> > records. Offhand, I'd say we could page the values, so when one row is
>> > past the threshold, we append the key for the next page. Then in most
>> > cases, it would be a single key lookup, but for large fan-out updates, it
>> > would be one per (max value size)/(avg lhs key size).
>> >
>> > This seems more complex, though... Plus, I think there's some extra
>> > tracking we'd need to do to know when to emit a retraction. For example,
>> > when record 1 is deleted, the re-key table would just have (A: [3]). Some
>> > kind of tombstone is needed so that the join result for 1 can also be
>> > retracted.
>> >
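The key-vector alternative can be sketched as a map from foreign key to
the list of left-table keys that reference it. This is only an
illustration: KeyVectorSketch and its methods are hypothetical names, a
HashMap stands in for the re-key store, and the paging of oversized values
mentioned above is deliberately not modeled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of a re-key table that stores a vector of left keys per foreign key. */
final class KeyVectorSketch {

    // Foreign key -> left-table keys referencing it, e.g. A: [1, 3], B: [2].
    private final Map<String, List<Integer>> leftKeysByForeignKey = new HashMap<>();

    /** Records that the left row with the given key references the foreign key. */
    void add(String foreignKey, int leftKey) {
        leftKeysByForeignKey.computeIfAbsent(foreignKey, k -> new ArrayList<>()).add(leftKey);
    }

    /**
     * Removes a left key from the vector. Returns true if it was present,
     * which is the point where a retraction for that key's join result
     * would have to be emitted separately.
     */
    boolean remove(String foreignKey, int leftKey) {
        List<Integer> keys = leftKeysByForeignKey.get(foreignKey);
        if (keys == null) {
            return false;
        }
        // Integer.valueOf selects remove(Object), not remove(int index).
        boolean removed = keys.remove(Integer.valueOf(leftKey));
        if (keys.isEmpty()) {
            leftKeysByForeignKey.remove(foreignKey);
        }
        return removed;
    }

    /** Single-key lookup used by the right-hand side of the join. */
    List<Integer> lookup(String foreignKey) {
        return Collections.unmodifiableList(
                leftKeysByForeignKey.getOrDefault(foreignKey, Collections.emptyList()));
    }
}
```

After removing left key 1, lookup("A") returns only [3]; the vector alone
no longer says that a result for 1 ever existed, which is why the removal
itself has to trigger the retraction.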
>> > That's all!
>> >
>> > Thanks so much to both Adam and Jan for the thoughtful KIP. Sorry the
>> > discussion has been slow.
>> > -John
>> >
>> >
>> > On Fri, Oct 12, 2018 at 2:20 AM Jan Filipiak <jan.filip...@trivago.com>
>> > wrote:
>> >
>> >> I'd say you can just call the vote.
>> >>
>> >> That happens all the time, and if something comes up, it just goes back
>> >> to discussion.
>> >>
>> >> I would not expect too much attention from yet another email in this
>> >> thread.
>> >>
>> >> best Jan
>> >>
>> >> On 09.10.2018 13:56, Adam Bellemare wrote:
>> >>> Hello Contributors
>> >>>
>> >>> I know that 2.1 is about to be released, but I do need to bump this to
>> >>> keep visibility up. I am still intending to push this through once
>> >>> contributor feedback is given.
>> >>>
>> >>> Main points that need addressing:
>> >>> 1) Any way (or benefit) in structuring the current singular graph node
>> >>> into multiple nodes? It has a whopping 25 parameters right now. I am a
>> >>> bit fuzzy on how the optimizations are supposed to work, so I would
>> >>> appreciate any help on this aspect.
>> >>>
>> >>> 2) Overall strategy for joining + resolving. This thread has much
>> >>> discourse between Jan and me about the current highwater mark proposal
>> >>> and a groupBy + reduce proposal. I am of the opinion that we need to
>> >>> strictly handle any chance of out-of-order data and leave none of it up
>> >>> to the consumer. Any comments or suggestions here would also help.
>> >>>
>> >>> 3) Anything else that you see that would prevent this from moving to a
>> >>> vote?
>> >>>
>> >>> Thanks
>> >>>
>> >>> Adam
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> On Sun, Sep 30, 2018 at 10:23 AM Adam Bellemare <
>> >> adam.bellem...@gmail.com>
>> >>> wrote:
>> >>>
>> >>>> Hi Jan
>> >>>>
>> >>>> With the Stores.windowStoreBuilder and Stores.persistentWindowStore,
>> >>>> you actually only need to specify the number of segments you want and
>> >>>> how large they are. To the best of my understanding, what happens is
>> >>>> that the segments are automatically rolled over as new data with new
>> >>>> timestamps are created. We use this exact functionality in some of the
>> >>>> work done internally at my company. For reference, this is the hopping
>> >>>> windowed store.
>> >>>>
>> >>>>
>> >>
>> https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#id21
>> >>>>
>> >>>> In the code that I have provided, there are going to be two 24h
>> >>>> segments. When a record is put into the windowStore, it will be
>> >>>> inserted at time T in both segments. The two segments will always
>> >>>> overlap by 12h. As time goes on and new records are added (say at time
>> >>>> T+12h+), the oldest segment will be automatically deleted and a new
>> >>>> segment created. The records are by default inserted with the
>> >>>> context.timestamp(), such that it is the record time, not the clock
>> >>>> time, which is used.
>> >>>>
>> >>>> To the best of my understanding, the timestamps are retained when
>> >>>> restoring from the changelog.
>> >>>>
>> >>>> Basically, this is a heavy-handed way to deal with TTL at a segment
>> >>>> level, instead of at an individual record level.
>> >>>>
>> >>>> On Tue, Sep 25, 2018 at 5:18 PM Jan Filipiak <
>> jan.filip...@trivago.com>
>> >>>> wrote:
>> >>>>
>> >>>>> Will that work? I expected it to blow up with ClassCastException or
>> >>>>> similar.
>> >>>>>
>> >>>>> You either would have to specify the window you fetch/put or iterate
>> >>>>> across all windows the key was found in, right?
>> >>>>>
>> >>>>> I just hope the window store doesn't check stream-time under the
>> >>>>> hood; that would be a questionable interface.
>> >>>>>
>> >>>>> If it does: did you see my comment on checking all the windows
>> >>>>> earlier? That would be needed to actually give reasonable time
>> >>>>> guarantees.
>> >>>>>
>> >>>>> Best
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>> On 25.09.2018 13:18, Adam Bellemare wrote:
>> >>>>>> Hi Jan
>> >>>>>>
>> >>>>>> Check for "highwaterMat" in the PR. I only changed the state
>> >>>>>> store, not the ProcessorSupplier.
>> >>>>>>
>> >>>>>> Thanks,
>> >>>>>> Adam
>> >>>>>>
>> >>>>>> On Mon, Sep 24, 2018 at 2:47 PM, Jan Filipiak <
>> >> jan.filip...@trivago.com
>> >>>>>>
>> >>>>>> wrote:
>> >>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On 24.09.2018 16:26, Adam Bellemare wrote:
>> >>>>>>>
>> >>>>>>>> @Guozhang
>> >>>>>>>>
>> >>>>>>>> Thanks for the information. This is indeed something that will be
>> >>>>>>>> extremely
>> >>>>>>>> useful for this KIP.
>> >>>>>>>>
>> >>>>>>>> @Jan
>> >>>>>>>> Thanks for your explanations. That being said, I will not be
>> >>>>>>>> moving ahead with an implementation using the reshuffle/groupBy
>> >>>>>>>> solution as you propose. That being said, if you wish to
>> >>>>>>>> implement it yourself off of my current PR and submit it as a
>> >>>>>>>> competitive alternative, I would be more than happy to help vet
>> >>>>>>>> that as an alternate solution. As it stands right now, I do not
>> >>>>>>>> really have more time to invest into alternatives without there
>> >>>>>>>> being a strong indication from the binding voters which they
>> >>>>>>>> would prefer.
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>> Hey, totally no worries. I think I personally gave up on the
>> >>>>>>> Streams DSL some time ago; otherwise I would have pulled this KIP
>> >>>>>>> through already. I am currently reimplementing my own DSL based on
>> >>>>>>> PAPI.
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>> I will look at finishing up my PR with the windowed state store in
>> >>>>>>>> the next week or so, exercising it via tests, and then I will come
>> >>>>>>>> back for final discussions. In the meantime, I hope that any of
>> >>>>>>>> the binding voters could take a look at the KIP in the wiki. I
>> >>>>>>>> have updated it according to the latest plan:
>> >>>>>>>>
>> >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
>> >>>>>>>>
>> >>>>>>>> I have also updated the KIP PR to use a windowed store. This could
>> >>>>>>>> be replaced by the results of KIP-258 whenever they are completed.
>> >>>>>>>> https://github.com/apache/kafka/pull/5527
>> >>>>>>>>
>> >>>>>>>> Thanks,
>> >>>>>>>>
>> >>>>>>>> Adam
>> >>>>>>>>
>> >>>>>>>
>> >>>>>>> Is the HighWatermarkResolverProccessorsupplier already updated in
>> >>>>>>> the PR? I expected it to change to Windowed<K>,Long. Am I missing
>> >>>>>>> something?
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On Fri, Sep 14, 2018 at 2:24 PM, Guozhang Wang <
>> wangg...@gmail.com>
>> >>>>>>>> wrote:
>> >>>>>>>>
>> >>>>>>>>> Correction on my previous email: KAFKA-5533 is the wrong link, as
>> >>>>>>>>> it is for corresponding changelog mechanisms. But as part of
>> >>>>>>>>> KIP-258 we do want to have "handling out-of-order data for source
>> >>>>>>>>> KTable" such that instead of blindly applying the updates to the
>> >>>>>>>>> materialized store, i.e. following offset ordering, we will
>> >>>>>>>>> reject updates that are older than the current key's timestamps,
>> >>>>>>>>> i.e. following timestamp ordering.
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> Guozhang
>> >>>>>>>>>
>> >>>>>>>>> On Fri, Sep 14, 2018 at 11:21 AM, Guozhang Wang <
>> >> wangg...@gmail.com>
>> >>>>>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> Hello Adam,
>> >>>>>>>>>>
>> >>>>>>>>>> Thanks for the explanation. Regarding the final step (i.e. the
>> >>>>>>>>>> high watermark store, now altered to be replaced with a window
>> >>>>>>>>>> store), I think another current on-going KIP may actually help:
>> >>>>>>>>>>
>> >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
>> >>>>>>>>>>
>> >>>>>>>>>> This is for adding the timestamp into a key-value store (i.e.
>> >>>>>>>>>> only for non-windowed KTable), and then one of its usages, as
>> >>>>>>>>>> described in https://issues.apache.org/jira/browse/KAFKA-5533, is
>> >>>>>>>>>> that we can then "reject" updates from the source topics if their
>> >>>>>>>>>> timestamp is smaller than the current key's latest update
>> >>>>>>>>>> timestamp. I think it is very similar to what you have in mind
>> >>>>>>>>>> for high watermark based filtering, while you only need to make
>> >>>>>>>>>> sure that the timestamps of the joining records are correctly
>> >>>>>>>>>> inherited through the whole topology to the final stage.
>> >>>>>>>>>>
>> >>>>>>>>>> Note that this KIP is for key-value stores and hence non-windowed
>> >>>>>>>>>> KTables only, but for windowed KTables we do not really have good
>> >>>>>>>>>> support for their joins anyway
>> >>>>>>>>>> (https://issues.apache.org/jira/browse/KAFKA-7107). I think we
>> >>>>>>>>>> can just consider non-windowed KTable-KTable non-key joins for
>> >>>>>>>>>> now. In which case, KIP-258 should help.
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> Guozhang
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> On Wed, Sep 12, 2018 at 9:20 PM, Jan Filipiak <
>> >>>>> jan.filip...@trivago.com
>> >>>>>>>>>>>
>> >>>>>>>>>> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>> On 11.09.2018 18:00, Adam Bellemare wrote:
>> >>>>>>>>>>>
>> >>>>>>>>>>>> Hi Guozhang
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Current highwater mark implementation would grow endlessly based
>> >>>>>>>>>>>> on primary key of original event. It is a pair of (<this table
>> >>>>>>>>>>>> primary key>, <highest offset seen for that key>). This is used
>> >>>>>>>>>>>> to differentiate between late arrivals and new updates. My
>> >>>>>>>>>>>> newest proposal would be to replace it with a Windowed state
>> >>>>>>>>>>>> store of Duration N. This would allow the same behaviour, but
>> >>>>>>>>>>>> cap the size based on time. This should allow for all
>> >>>>>>>>>>>> late-arriving events to be processed, and should be customizable
>> >>>>>>>>>>>> by the user to tailor to their own needs (ie: perhaps just 10
>> >>>>>>>>>>>> minutes of window, or perhaps 7 days...).
>> >>>>>>>>>>>>
>> >>>>>>>>>>> Hi Adam, using time-based retention can do the trick here. Even
>> >>>>>>>>>>> so, I would still like to see the automatic repartitioning made
>> >>>>>>>>>>> optional, since I would just reshuffle again. With a windowed
>> >>>>>>>>>>> store I am a little bit sceptical about how to determine the
>> >>>>>>>>>>> window. Essentially, one could run into problems when a rapid
>> >>>>>>>>>>> change happens near a window border. I will check your
>> >>>>>>>>>>> implementation in detail; if it's problematic, we could still
>> >>>>>>>>>>> check _all_ windows on read with not too bad a performance
>> >>>>>>>>>>> impact, I guess. Will let you know if the implementation would
>> >>>>>>>>>>> be correct as is. I would not like to assume that
>> >>>>>>>>>>> offset(A) < offset(B) => timestamp(A) < timestamp(B). I think we
>> >>>>>>>>>>> can't expect that.
>> >>>>>>>>>>>
>> >>>>>>>>>>>> @Jan
>> >>>>>>>>>>>> I believe I understand what you mean now - thanks for the
>> >>>>>>>>>>>> diagram, it did really help. You are correct that I do not have
>> >>>>>>>>>>>> the original primary key available, and I can see that if it was
>> >>>>>>>>>>>> available then you would be able to add and remove events from
>> >>>>>>>>>>>> the Map. That being said, I encourage you to finish your
>> >>>>>>>>>>>> diagrams / charts just for clarity for everyone else.
>> >>>>>>>>>>>>
>> >>>>>>>>>>> Yeah 100%, this giphy thing is just really hard work. But I
>> >>>>>>>>>>> understand the benefits for the rest. Sorry about the original
>> >>>>>>>>>>> primary key. We have join and group-by implemented on our own in
>> >>>>>>>>>>> PAPI and are basically not using any DSL (just the abstraction).
>> >>>>>>>>>>> I completely missed that it is not there in the original DSL and
>> >>>>>>>>>>> just assumed it. Total brain mess-up on my end. Will finish the
>> >>>>>>>>>>> chart as soon as I get a quiet evening this week.
>> >>>>>>>>>>>
>> >>>>>>>>>>>> My follow up question for you is, won't the Map stay inside the
>> >>>>>>>>>>>> State Store indefinitely after all of the changes have
>> >>>>>>>>>>>> propagated? Isn't this effectively the same as a highwater mark
>> >>>>>>>>>>>> state store?
>> >>>>>>>>>>>>
>> >>>>>>>>>>> Thing is that if the map is empty, the subtractor is going to
>> >>>>>>>>>>> return `null` and the key is removed from the keyspace. But
>> >>>>>>>>>>> there is going to be a store 100%; the good thing is that I can
>> >>>>>>>>>>> use this store directly for materialize() /
>> >>>>>>>>>>> enableSendingOldValues(). It is a regular store, satisfying all
>> >>>>>>>>>>> guarantees needed for further groupby / join. The windowed store
>> >>>>>>>>>>> is not keeping the values, so for the next stateful operation we
>> >>>>>>>>>>> would need to instantiate an extra store, or we have the window
>> >>>>>>>>>>> store also hold the values.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Long story short: if we can flip in a custom group-by before
>> >>>>>>>>>>> repartitioning to the original primary key, I think it would
>> >>>>>>>>>>> help the users big time in building efficient apps. Given the
>> >>>>>>>>>>> original primary key issue I understand that we do not have a
>> >>>>>>>>>>> solid foundation to build on. Leaving the primary key
>> >>>>>>>>>>> carry-along to the user is very unfortunate. I could understand
>> >>>>>>>>>>> if the decision goes like that. I do not think it's a good
>> >>>>>>>>>>> decision.
>> >>>>>>>>>>>
>> >>>>>>>>>>>> Thanks
>> >>>>>>>>>>>> Adam
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> On Tue, Sep 11, 2018 at 10:07 AM, Prajakta Dumbre <
>> >>>>>>>>>>>> dumbreprajakta...@gmail.com <mailto:
>> dumbreprajakta...@gmail.com
>> >>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>>>         please remove me from this group
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>         On Tue, Sep 11, 2018 at 1:29 PM Jan Filipiak
>> >>>>>>>>>>>>         <jan.filip...@trivago.com <mailto:
>> >> jan.filip...@trivago.com
>> >>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>         wrote:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>         > Hi Adam,
>> >>>>>>>>>>>>         >
>> >>>>>>>>>>>>         > give me some time, will make such a chart. Last time I didn't
>> >>>>>>>>>>>>         > get along well with giphy and ruined all your charts.
>> >>>>>>>>>>>>         > Hopefully I can get it done today.
>> >>>>>>>>>>>>         >
>> >>>>>>>>>>>>         > On 08.09.2018 16:00, Adam Bellemare wrote:
>> >>>>>>>>>>>>         > > Hi Jan
>> >>>>>>>>>>>>         > >
>> >>>>>>>>>>>>         > > I have included a diagram of what I attempted on the KIP.
>> >>>>>>>>>>>>         > >
>> >>>>>>>>>>>>         > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-GroupBy+Reduce/Aggregate
>> >>>>>>>>>>>>         > >
>> >>>>>>>>>>>>         > > I attempted this back at the start of my own implementation of
>> >>>>>>>>>>>>         > > this solution, and since I could not get it to work I have
>> >>>>>>>>>>>>         > > since discarded the code. At this point in time, if you wish
>> >>>>>>>>>>>>         > > to continue pursuing your groupBy solution, I ask that you
>> >>>>>>>>>>>>         > > please create a diagram on the KIP carefully explaining your
>> >>>>>>>>>>>>         > > solution. Please feel free to use the image I just posted as a
>> >>>>>>>>>>>>         > > starting point. I am having trouble understanding your
>> >>>>>>>>>>>>         > > explanations but I think that a carefully constructed diagram
>> >>>>>>>>>>>>         > > will clear up any misunderstandings. Alternately, please post
>> >>>>>>>>>>>>         > > a comprehensive PR with your solution. I can only guess at
>> >>>>>>>>>>>>         > > what you mean, and since I value my own time as much as you
>> >>>>>>>>>>>>         > > value yours, I believe it is your responsibility to provide an
>> >>>>>>>>>>>>         > > implementation instead of me trying to guess.
>> >>>>>>>>>>>>         > >
>> >>>>>>>>>>>>         > > Adam
>> >>>>>>>>>>>>         > >
>> >>>>>>>>>>>>         > >
>> >>>>>>>>>>>>         > >
>> >>>>>>>>>>>>         > >
>> >>>>>>>>>>>>         > >
>> >>>>>>>>>>>>         > >
>> >>>>>>>>>>>>         > >
>> >>>>>>>>>>>>         > >
>> >>>>>>>>>>>>         > >
>> >>>>>>>>>>>>         > > On Sat, Sep 8, 2018 at 8:00 AM, Jan Filipiak
>> >>>>>>>>>>>>         <jan.filip...@trivago.com <mailto:
>> >> jan.filip...@trivago.com
>> >>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>         > > wrote:
>> >>>>>>>>>>>>         > >
>> >>>>>>>>>>>>         > >> Hi James,
>> >>>>>>>>>>>>         > >>
>> >>>>>>>>>>>>         > >> nice to see you being interested. kafka
>> streams at
>> >>>>> this
>> >>>>>>>>>>>>         point supports
>> >>>>>>>>>>>>         > >> all sorts of joins as long as both streams have
>> the
>> >>>>> same
>> >>>>>>>>>>>> key.
>> >>>>>>>>>>>>         > >> Adam is currently implementing a join where a
>> KTable
>> >>>>> and a
>> >>>>>>>>>>>>         KTable can
>> >>>>>>>>>>>>         > have
>> >>>>>>>>>>>>         > >> a one-to-many relationship (1:n). We exploit
>> that
>> >>>>> rocksdb
>> >>>>>>>>>>>> is
>> >>>>>>>>>>>>
>> >>>>>>>>>>> a
>> >>>>>>>>>
>> >>>>>>>>>>         > >> datastore that keeps data sorted (At least
>> exposes an
>> >>>>> API to
>> >>>>>>>>>>>>         access the
>> >>>>>>>>>>>>         > >> stored data in a sorted fashion).
>> >>>>>>>>>>>>         > >>
>> >>>>>>>>>>>>         > >> I think the technical caveats are well
>> understood
>> >> now
>> >>>>> and we
>> >>>>>>>>>>>>
>> >>>>>>>>>>> are
>> >>>>>>>>>
>> >>>>>>>>>>         > basically
>> >>>>>>>>>>>>         > >> down to philosophy and API Design ( when Adam
>> sees
>> >> my
>> >>>>> newest
>> >>>>>>>>>>>>         message).
>> >>>>>>>>>>>>         > >> I have a lengthy track record of losing those
>> kinda
>> >>>>>>>>>>>>         arguments within
>> >>>>>>>>>>>>         > the
>> >>>>>>>>>>>>         > >> streams community and I have no clue why. So I
>> >>>>> literally
>> >>>>>>>>>>>>         can't wait for
>> >>>>>>>>>>>>         > you
>> >>>>>>>>>>>>         > >> to churn through this thread and give your
>> opinion on
>> >>>>> how we
>> >>>>>>>>>>>>         should
>> >>>>>>>>>>>>         > design
>> >>>>>>>>>>>>         > >> the return type of the oneToManyJoin and how
>> much
>> >>>>> power we
>> >>>>>>>>>>>>         want to give
>> >>>>>>>>>>>>         > to
>> >>>>>>>>>>>>         > >> the user vs "simplicity" (where simplicity isn't
>> >>>>> really that
>> >>>>>>>>>>>>         as users
>> >>>>>>>>>>>>         > still
>> >>>>>>>>>>>>         > >> need to understand it I argue)
>> >>>>>>>>>>>>         > >>
>> >>>>>>>>>>>>         > >> waiting for you to join in on the discussion
>> >>>>>>>>>>>>         > >>
>> >>>>>>>>>>>>         > >> Best Jan
>> >>>>>>>>>>>>         > >>
>> >>>>>>>>>>>>         > >>
>> >>>>>>>>>>>>         > >>
>> >>>>>>>>>>>>         > >> On 07.09.2018 15:49, James Kwan wrote:
>> >>>>>>>>>>>>         > >>
>> >>>>>>>>>>>>         > >>> I am new to this group and I found this subject
>> >>>>>>>>>>>>         interesting.  Sounds
>> >>>>>>>>>>>>         > like
>> >>>>>>>>>>>>         > >>> you guys want to implement a join table of two
>> >>>>> streams? Is
>> >>>>>>>>>>>> there
>> >>>>>>>>>>>>         > somewhere
>> >>>>>>>>>>>>         > >>> I can see the original requirement or proposal?
>> >>>>>>>>>>>>         > >>>
>> >>>>>>>>>>>>         > >>> On Sep 7, 2018, at 8:13 AM, Jan Filipiak
>> >>>>>>>>>>>>         <jan.filip...@trivago.com <mailto:
>> >> jan.filip...@trivago.com
>> >>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>         > >>>> wrote:
>> >>>>>>>>>>>>         > >>>>
>> >>>>>>>>>>>>         > >>>>
>> >>>>>>>>>>>>         > >>>> On 05.09.2018 22:17, Adam Bellemare wrote:
>> >>>>>>>>>>>>         > >>>>
>> >>>>>>>>>>>>         > >>>>> I'm currently testing using a Windowed Store
>> to
>> >>>>> store the
>> >>>>>>>>>>>>         highwater
>> >>>>>>>>>>>>         > >>>>> mark.
>> >>>>>>>>>>>>         > >>>>> By all indications this should work fine,
>> with
>> >> the
>> >>>>> caveat
>> >>>>>>>>>>>>         being that
>> >>>>>>>>>>>>         > it
>> >>>>>>>>>>>>         > >>>>> can
>> >>>>>>>>>>>>         > >>>>> only resolve out-of-order arrival for up to
>> the
>> >>>>> size of
>> >>>>>>>>>>>>         the window
>> >>>>>>>>>>>>         > (ie:
>> >>>>>>>>>>>>         > >>>>> 24h, 72h, etc). This would remove the
>> possibility
>> >>>>> of it
>> >>>>>>>>>>>>
>> >>>>>>>>>>> being
>> >>>>>>>>>
>> >>>>>>>>>>         > unbounded
>> >>>>>>>>>>>>         > >>>>> in
>> >>>>>>>>>>>>         > >>>>> size.
>> >>>>>>>>>>>>         > >>>>>
>> >>>>>>>>>>>>         > >>>>> With regards to Jan's suggestion, I believe
>> this
>> >> is
>> >>>>> where
>> >>>>>>>>>>>>         we will
>> >>>>>>>>>>>>         > have
>> >>>>>>>>>>>>         > >>>>> to
>> >>>>>>>>>>>>         > >>>>> remain in disagreement. While I do not
>> disagree
>> >>>>> with your
>> >>>>>>>>>>>>         statement
>> >>>>>>>>>>>>         > >>>>> about
>> >>>>>>>>>>>>         > >>>>> there likely to be additional joins done in a
>> >>>>> real-world
>> >>>>>>>>>>>>         workflow, I
>> >>>>>>>>>>>>         > do
>> >>>>>>>>>>>>         > >>>>> not
>> >>>>>>>>>>>>         > >>>>> see how you can conclusively deal with
>> >> out-of-order
>> >>>>>>>>>>>> arrival
>> >>>>>>>>>>>> of
>> >>>>>>>>>>>>         > >>>>> foreign-key
>> >>>>>>>>>>>>         > >>>>> changes and subsequent joins. I have
>> attempted
>> >> what
>> >>>>> I
>> >>>>>>>>>>>>         think you have
>> >>>>>>>>>>>>         > >>>>> proposed (without a high-water, using
>> groupBy and
>> >>>>> reduce)
>> >>>>>>>>>>>>         and found
>> >>>>>>>>>>>>         > >>>>> that if
>> >>>>>>>>>>>>         > >>>>> the foreign key changes too quickly, or the
>> load
>> >> on
>> >>>>> a
>> >>>>>>>>>>>>         stream thread
>> >>>>>>>>>>>>         > is
>> >>>>>>>>>>>>         > >>>>> too
>> >>>>>>>>>>>>         > >>>>> high, the joined messages will arrive
>> >> out-of-order
>> >>>>> and be
>> >>>>>>>>>>>>         incorrectly
>> >>>>>>>>>>>>         > >>>>> propagated, such that an intermediate event
>> is
>> >>>>>>>>>>>> represented
>> >>>>>>>>>>>>         as the
>> >>>>>>>>>>>>         > final
>> >>>>>>>>>>>>         > >>>>> event.
>> >>>>>>>>>>>>         > >>>>>
>> >>>>>>>>>>>>         > >>>> Can you shed some light on your groupBy
>> >>>>> implementation.
>> >>>>>>>>>>>>         There must be
>> >>>>>>>>>>>>         > >>>> some sort of flaw in it.
>> >>>>>>>>>>>>         > >>>> I have a suspicion where it is, I would just
>> like
>> >> to
>> >>>>>>>>>>>>         confirm. The idea
>> >>>>>>>>>>>>         > >>>> is bullet proof and it must be
>> >>>>>>>>>>>>         > >>>> an implementation mess up. I would like to
>> clarify
>> >>>>> before
>> >>>>>>>>>>>>         we draw a
>> >>>>>>>>>>>>         > >>>> conclusion.
>> >>>>>>>>>>>>         > >>>>
>> >>>>>>>>>>>>         > >>>>    Repartitioning the scattered events back to
>> >> their
>> >>>>>>>>>>>>
>> >>>>>>>>>>> original
>> >>>>>>>>>
>> >>>>>>>>>>         > >>>>> partitions is the only way I know how to
>> >> conclusively
>> >>>>> deal
>> >>>>>>>>>>>>         with
>> >>>>>>>>>>>>         > >>>>> out-of-order events in a given time frame,
>> and to
>> >>>>> ensure
>> >>>>>>>>>>>>         that the
>> >>>>>>>>>>>>         > data
>> >>>>>>>>>>>>         > >>>>> is
>> >>>>>>>>>>>>         > >>>>> eventually consistent with the input events.
>> >>>>>>>>>>>>         > >>>>>
>> >>>>>>>>>>>>         > >>>>> If you have some code to share that
>> illustrates
>> >> your
>> >>>>>>>>>>>>         approach, I
>> >>>>>>>>>>>>         > would
>> >>>>>>>>>>>>         > >>>>> be
>> >>>>>>>>>>>>         > >>>>> very grateful as it would remove any
>> >>>>> misunderstandings
>> >>>>>>>>>>>>         that I may
>> >>>>>>>>>>>>         > have.
>> >>>>>>>>>>>>         > >>>>>
>> >>>>>>>>>>>>         > >>>> ah okay you were looking for my code. I don't
>> have
>> >>>>>>>>>>>>         something easily
>> >>>>>>>>>>>>         > >>>> readable here as its bloated with OO-patterns.
>> >>>>>>>>>>>>         > >>>>
>> >>>>>>>>>>>>         > >>>> its anyhow trivial:
>> >>>>>>>>>>>>         > >>>>
>> >>>>>>>>>>>>         > >>>> @Override
>> >>>>>>>>>>>>         > >>>>      public T apply(K aggKey, V value, T
>> >> aggregate)
>> >>>>>>>>>>>>         > >>>>      {
>> >>>>>>>>>>>>         > >>>>          Map<U, V> currentStateAsMap =
>> >>>>> asMap(aggregate);
>> >>>>>>>>>>>> <<
>> >>>>>>>>>>>>         imaginary
>> >>>>>>>>>>>>         > >>>>          U toModifyKey = mapper.apply(value);
>> >>>>>>>>>>>>         > >>>>              << this is the place where people
>> >>>>> actually
>> >>>>>>>>>>>>         gonna have
>> >>>>>>>>>>>>         > issues
>> >>>>>>>>>>>>         > >>>> and why you probably couldn't do it. we would
>> need
>> >>>>> to find
>> >>>>>>>>>>>>         a solution
>> >>>>>>>>>>>>         > here.
>> >>>>>>>>>>>>         > >>>> I didn't realize that yet.
>> >>>>>>>>>>>>         > >>>>              << we propagate the field in the
>> >>>>> joiner, so
>> >>>>>>>>>>>>         that we can
>> >>>>>>>>>>>>         > pick
>> >>>>>>>>>>>>         > >>>> it up in an aggregate. Probably you have not
>> >> thought
>> >>>>> of
>> >>>>>>>>>>>>         this in your
>> >>>>>>>>>>>>         > >>>> approach right?
>> >>>>>>>>>>>>         > >>>>              << I am very open to find a
>> generic
>> >>>>> solution
>> >>>>>>>>>>>>         here. In my
>> >>>>>>>>>>>>         > >>>> honest opinion this is broken in
>> >> KTableImpl.GroupBy
>> >>>>> that
>> >>>>>>>>>>>> it
>> >>>>>>>>>>>>         loses
>> >>>>>>>>>>>>         > the keys
>> >>>>>>>>>>>>         > >>>> and only maintains the aggregate key.
>> >>>>>>>>>>>>         > >>>>              << I abstracted it away back
>> then way
>> >>>>> before
>> >>>>>>>>>>>> i
>> >>>>>>>>>>>> was
>> >>>>>>>>>>>>         > thinking
>> >>>>>>>>>>>>         > >>>> of oneToMany join. That is why I didn't
>> realize
>> >> its
>> >>>>>>>>>>>>         significance here.
>> >>>>>>>>>>>>         > >>>>              << Opinions?
>> >>>>>>>>>>>>         > >>>>
>> >>>>>>>>>>>>         > >>>>          for (V m : current)
>> >>>>>>>>>>>>         > >>>>          {
>> >>>>>>>>>>>>         > >>>> currentStateAsMap.put(mapper.apply(m), m);
>> >>>>>>>>>>>>         > >>>>          }
>> >>>>>>>>>>>>         > >>>>          if (isAdder)
>> >>>>>>>>>>>>         > >>>>          {
>> >>>>>>>>>>>>         > >>>> currentStateAsMap.put(toModifyKey, value);
>> >>>>>>>>>>>>         > >>>>          }
>> >>>>>>>>>>>>         > >>>>          else
>> >>>>>>>>>>>>         > >>>>          {
>> >>>>>>>>>>>>         > >>>> currentStateAsMap.remove(toModifyKey);
>> >>>>>>>>>>>>         > >>>> if(currentStateAsMap.isEmpty()){
>> >>>>>>>>>>>>         > >>>>                  return null;
>> >>>>>>>>>>>>         > >>>>              }
>> >>>>>>>>>>>>         > >>>>          }
>> >>>>>>>>>>>>         > >>>>          return asAggregateType(currentStateAsMap);
>> >>>>>>>>>>>>         > >>>>      }
>> >>>>>>>>>>>>         > >>>>
>> >>>>>>>>>>>>         > >>>>
>> >>>>>>>>>>>>         > >>>>
>> >>>>>>>>>>>>         > >>>>
>> >>>>>>>>>>>>         > >>>>
>> >>>>>>>>>>>>         > >>>> Thanks,
>> >>>>>>>>>>>>         > >>>>> Adam
>> >>>>>>>>>>>>         > >>>>>
>> >>>>>>>>>>>>         > >>>>>
>> >>>>>>>>>>>>         > >>>>>
>> >>>>>>>>>>>>         > >>>>>
>> >>>>>>>>>>>>         > >>>>>
>> >>>>>>>>>>>>         > >>>>> On Wed, Sep 5, 2018 at 3:35 PM, Jan Filipiak
>> <
>> >>>>>>>>>>>>         > jan.filip...@trivago.com <mailto:
>> >> jan.filip...@trivago.com
>> >>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>         > >>>>> wrote:
>> >>>>>>>>>>>>         > >>>>>
>> >>>>>>>>>>>>         > >>>>> Thanks Adam for bringing Matthias up to speed!
>> >>>>>>>>>>>>         > >>>>>> about the differences. I think re-keying
>> back
>> >>>>> should be
>> >>>>>>>>>>>>         optional at
>> >>>>>>>>>>>>         > >>>>>> best.
>> >>>>>>>>>>>>         > >>>>>> I would say we return a KScatteredTable with
>> >>>>> reshuffle()
>> >>>>>>>>>>>>         returning
>> >>>>>>>>>>>>         > >>>>>> KTable<originalKey,Joined> to make the
>> backwards
>> >>>>>>>>>>>>         repartitioning
>> >>>>>>>>>>>>         > >>>>>> optional.
>> >>>>>>>>>>>>         > >>>>>> I am also in a big favour of doing the out
>> of
>> >> order
>> >>>>>>>>>>>>         processing using
>> >>>>>>>>>>>>         > >>>>>> group
>> >>>>>>>>>>>>         > >>>>>> by instead of high water mark tracking.
>> >>>>>>>>>>>>         > >>>>>> Just because unbounded growth is just scary
>> + It
>> >>>>> saves
>> >>>>>>>>>>>> us
>> >>>>>>>>>>>>         the header
>> >>>>>>>>>>>>         > >>>>>> stuff.
>> >>>>>>>>>>>>         > >>>>>>
>> >>>>>>>>>>>>         > >>>>>> I think the abstraction of always
>> repartitioning
>> >>>>> back is
>> >>>>>>>>>>>>         just not so
>> >>>>>>>>>>>>         > >>>>>> strong. Like the work has been done before
>> we
>> >>>>> partition
>> >>>>>>>>>>>>         back and
>> >>>>>>>>>>>>         > >>>>>> grouping
>> >>>>>>>>>>>>         > >>>>>> by something else afterwards is really
>> common.
>> >>>>>>>>>>>>         > >>>>>>
>> >>>>>>>>>>>>         > >>>>>>
>> >>>>>>>>>>>>         > >>>>>>
>> >>>>>>>>>>>>         > >>>>>>
>> >>>>>>>>>>>>         > >>>>>>
>> >>>>>>>>>>>>         > >>>>>>
>> >>>>>>>>>>>>         > >>>>>> On 05.09.2018 13:49, Adam Bellemare wrote:
>> >>>>>>>>>>>>         > >>>>>>
>> >>>>>>>>>>>>         > >>>>>> Hi Matthias
>> >>>>>>>>>>>>         > >>>>>>> Thank you for your feedback, I do
>> appreciate
>> >> it!
>> >>>>>>>>>>>>         > >>>>>>>
>> >>>>>>>>>>>>         > >>>>>>> While name spacing would be possible, it
>> would
>> >>>>> require
>> >>>>>>>>>>>> to
>> >>>>>>>>>>>>         > deserialize
>> >>>>>>>>>>>>         > >>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>> user headers what implies a runtime
>> overhead.
>> >> I
>> >>>>> would
>> >>>>>>>>>>>>         suggest to
>> >>>>>>>>>>>>         > not
>> >>>>>>>>>>>>         > >>>>>>>> namespace for now to avoid the overhead.
>> If
>> >> this
>> >>>>>>>>>>>>
>> >>>>>>>>>>> becomes a
>> >>>>>>>>>
>> >>>>>>>>>>         > problem in
>> >>>>>>>>>>>>         > >>>>>>>> the future, we can still add name spacing
>> >> later
>> >>>>> on.
>> >>>>>>>>>>>>         > >>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>> Agreed. I will go with using a reserved
>> string
>> >>>>> and
>> >>>>>>>>>>>>         document it.
>> >>>>>>>>>>>>         > >>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>
>> >>>>>>>>>>>>         > >>>>>>> My main concern about the design is the
>> type of
>> >>>>> the
>> >>>>>>>>>>>>         result KTable:
>> >>>>>>>>>>>>         > If
>> >>>>>>>>>>>>         > >>>>>>> I
>> >>>>>>>>>>>>         > >>>>>>> understood the proposal correctly,
>> >>>>>>>>>>>>         > >>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>
>> >>>>>>>>>>>>         > >>>>>>> In your example, you have table1 and table2
>> >>>>> swapped.
>> >>>>>>>>>>>>         Here is how it
>> >>>>>>>>>>>>         > >>>>>>> works
>> >>>>>>>>>>>>         > >>>>>>> currently:
>> >>>>>>>>>>>>         > >>>>>>>
>> >>>>>>>>>>>>         > >>>>>>> 1) table1 has the records that contain the
>> >>>>> foreign key
>> >>>>>>>>>>>>         within their
>> >>>>>>>>>>>>         > >>>>>>> value.
>> >>>>>>>>>>>>         > >>>>>>> table1 input stream: <a,(fk=A,bar=1)>,
>> >>>>>>>>>>>> <b,(fk=A,bar=2)>,
>> >>>>>>>>>>>>         > >>>>>>> <c,(fk=B,bar=3)>
>> >>>>>>>>>>>>         > >>>>>>> table2 input stream: <A,X>, <B,Y>
>> >>>>>>>>>>>>         > >>>>>>>
>> >>>>>>>>>>>>         > >>>>>>> 2) A Value mapper is required to extract
>> the
>> >>>>> foreign
>> >>>>>>>>>>>> key.
>> >>>>>>>>>>>>         > >>>>>>> table1 foreign key mapper: ( value =>
>> value.fk
>> >>>>>>>>>>>>         <http://value.fk> )
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>
>> >>>>>>>>>>>>         > >>>>>>> The mapper is applied to each element in
>> >> table1,
>> >>>>> and a
>> >>>>>>>>>>>>         new combined
>> >>>>>>>>>>>>         > >>>>>>> key is
>> >>>>>>>>>>>>         > >>>>>>> made:
>> >>>>>>>>>>>>         > >>>>>>> table1 mapped: <A-a, (fk=A,bar=1)>, <A-b,
>> >>>>>>>>>>>> (fk=A,bar=2)>,
>> >>>>>>>>>>>>         <B-c,
>> >>>>>>>>>>>>         > >>>>>>> (fk=B,bar=3)>
>> >>>>>>>>>>>>         > >>>>>>>
>> >>>>>>>>>>>>         > >>>>>>> 3) The rekeyed events are copartitioned
>> with
>> >>>>> table2:
>> >>>>>>>>>>>>         > >>>>>>>
>> >>>>>>>>>>>>         > >>>>>>> a) Stream Thread with Partition 0:
>> >>>>>>>>>>>>         > >>>>>>> RepartitionedTable1: <A-a, (fk=A,bar=1)>,
>> <A-b,
>> >>>>>>>>>>>>         (fk=A,bar=2)>
>> >>>>>>>>>>>>         > >>>>>>> Table2: <A,X>
>> >>>>>>>>>>>>         > >>>>>>>
>> >>>>>>>>>>>>         > >>>>>>> b) Stream Thread with Partition 1:
>> >>>>>>>>>>>>         > >>>>>>> RepartitionedTable1: <B-c, (fk=B,bar=3)>
>> >>>>>>>>>>>>         > >>>>>>> Table2: <B,Y>
>> >>>>>>>>>>>>         > >>>>>>>
>> >>>>>>>>>>>>         > >>>>>>> 4) From here, they can be joined together
>> >> locally
>> >>>>> by
>> >>>>>>>>>>>>         applying the
>> >>>>>>>>>>>>         > >>>>>>> joiner
>> >>>>>>>>>>>>         > >>>>>>> function.
>> >>>>>>>>>>>>         > >>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>
>> >>>>>>>>>>>>         > >>>>>>> At this point, Jan's design and my design
>> >>>>> deviate. My
>> >>>>>>>>>>>>         design goes
>> >>>>>>>>>>>>         > on
>> >>>>>>>>>>>>         > >>>>>>> to
>> >>>>>>>>>>>>         > >>>>>>> repartition the data post-join and resolve
>> >>>>> out-of-order
>> >>>>>>>>>>>>         arrival of
>> >>>>>>>>>>>>         > >>>>>>> records,
>> >>>>>>>>>>>>         > >>>>>>> finally returning the data keyed by just the
>> >>>>> original key.
>> >>>>>>>>>>>>         I do not
>> >>>>>>>>>>>>         > >>>>>>> expose
>> >>>>>>>>>>>>         > >>>>>>> the
>> >>>>>>>>>>>>         > >>>>>>> CombinedKey or any of the internals
>> outside of
>> >> the
>> >>>>>>>>>>>>         joinOnForeignKey
>> >>>>>>>>>>>>         > >>>>>>> function. This does make for larger
>> footprint,
>> >>>>> but it
>> >>>>>>>>>>>>         removes all
>> >>>>>>>>>>>>         > >>>>>>> agency
>> >>>>>>>>>>>>         > >>>>>>> for resolving out-of-order arrivals and
>> >> handling
>> >>>>>>>>>>>>         CombinedKeys from
>> >>>>>>>>>>>>         > the
>> >>>>>>>>>>>>         > >>>>>>> user. I believe that this makes the
>> function
>> >> much
>> >>>>>>>>>>>> easier
>> >>>>>>>>>>>>         to use.
>> >>>>>>>>>>>>         > >>>>>>>
>> >>>>>>>>>>>>         > >>>>>>> Let me know if this helps resolve your
>> >> questions,
>> >>>>> and
>> >>>>>>>>>>>>         please feel
>> >>>>>>>>>>>>         > >>>>>>> free to
>> >>>>>>>>>>>>         > >>>>>>> add anything else on your mind.
>> >>>>>>>>>>>>         > >>>>>>>
>> >>>>>>>>>>>>         > >>>>>>> Thanks again,
>> >>>>>>>>>>>>         > >>>>>>> Adam
>> >>>>>>>>>>>>         > >>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>
>> >>>>>>>>>>>>         > >>>>>>> On Tue, Sep 4, 2018 at 8:36 PM, Matthias J.
>> >> Sax <
>> >>>>>>>>>>>>         > >>>>>>> matth...@confluent.io <mailto:
>> >>>>> matth...@confluent.io>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>> wrote:
>> >>>>>>>>>>>>         > >>>>>>>
>> >>>>>>>>>>>>         > >>>>>>> Hi,
>> >>>>>>>>>>>>         > >>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>> I am just catching up on this thread. I
>> did
>> >> not
>> >>>>> read
>> >>>>>>>>>>>>         everything so
>> >>>>>>>>>>>>         > >>>>>>>> far,
>> >>>>>>>>>>>>         > >>>>>>>> but want to share couple of initial
>> thoughts:
>> >>>>>>>>>>>>         > >>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>> Headers: I think there is a fundamental
>> >>>>> difference
>> >>>>>>>>>>>>         between header
>> >>>>>>>>>>>>         > >>>>>>>> usage
>> >>>>>>>>>>>>         > >>>>>>>> in this KIP and KIP-258. For 258, we add
>> >> headers
>> >>>>> to
>> >>>>>>>>>>>>         changelog topic
>> >>>>>>>>>>>>         > >>>>>>>> that
>> >>>>>>>>>>>>         > >>>>>>>> are owned by Kafka Streams and nobody
>> else is
>> >>>>> supposed
>> >>>>>>>>>>>>         to write
>> >>>>>>>>>>>>         > into
>> >>>>>>>>>>>>         > >>>>>>>> them. In fact, no user headers are written
>> into
>> >>>>> the
>> >>>>>>>>>>>>         changelog topic
>> >>>>>>>>>>>>         > >>>>>>>> and
>> >>>>>>>>>>>>         > >>>>>>>> thus, there are no conflicts.
>> >>>>>>>>>>>>         > >>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>> Nevertheless, I don't see a big issue with
>> >> using
>> >>>>>>>>>>>>         headers within
>> >>>>>>>>>>>>         > >>>>>>>> Streams.
>> >>>>>>>>>>>>         > >>>>>>>> As long as we document it, we can have
>> some
>> >>>>> "reserved"
>> >>>>>>>>>>>>         header keys
>> >>>>>>>>>>>>         > >>>>>>>> and
>> >>>>>>>>>>>>         > >>>>>>>> users are not allowed to use when
>> processing
>> >>>>> data with
>> >>>>>>>>>>>>         Kafka
>> >>>>>>>>>>>>         > Streams.
>> >>>>>>>>>>>>         > >>>>>>>> IMHO, this should be ok.
>> >>>>>>>>>>>>         > >>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>> I think there is a safe way to avoid
>> >> conflicts,
>> >>>>> since
>> >>>>>>>>>>>> these
>> >>>>>>>>>>>>         > headers
>> >>>>>>>>>>>>         > >>>>>>>> are
>> >>>>>>>>>>>>         > >>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>>> only needed in internal topics (I think):
>> >>>>>>>>>>>>         > >>>>>>>>> For internal and changelog topics, we can
>> >>>>> namespace
>> >>>>>>>>>>>>         all headers:
>> >>>>>>>>>>>>         > >>>>>>>>> * user-defined headers are namespaced as
>> >>>>> "external."
>> >>>>>>>>>>>> +
>> >>>>>>>>>>>>         headerKey
>> >>>>>>>>>>>>         > >>>>>>>>> * internal headers are namespaced as
>> >>>>> "internal." +
>> >>>>>>>>>>>>         headerKey
>> >>>>>>>>>>>>         > >>>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>>> While name spacing would be possible, it
>> >> would
>> >>>>>>>>>>>> require
>> >>>>>>>>>>>>
>> >>>>>>>>>>> to
>> >>>>>>>>>
>> >>>>>>>>>>         > >>>>>>>> deserialize
>> >>>>>>>>>>>>         > >>>>>>>> user headers what implies a runtime
>> overhead.
>> >> I
>> >>>>> would
>> >>>>>>>>>>>>         suggest to
>> >>>>>>>>>>>>         > not
>> >>>>>>>>>>>>         > >>>>>>>> namespace for now to avoid the overhead.
>> If
>> >> this
>> >>>>>>>>>>>>
>> >>>>>>>>>>> becomes a
>> >>>>>>>>>
>> >>>>>>>>>>         > problem in
>> >>>>>>>>>>>>         > >>>>>>>> the future, we can still add name spacing
>> >> later
>> >>>>> on.
>> >>>>>>>>>>>>         > >>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>> My main concern about the design is the
>> type
>> >> of
>> >>>>> the
>> >>>>>>>>>>>>         result KTable:
>> >>>>>>>>>>>>         > >>>>>>>> If I
>> >>>>>>>>>>>>         > >>>>>>>> understood the proposal correctly,
>> >>>>>>>>>>>>         > >>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>> KTable<K1,V1> table1 = ...
>> >>>>>>>>>>>>         > >>>>>>>> KTable<K2,V2> table2 = ...
>> >>>>>>>>>>>>         > >>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>> KTable<K1,V3> joinedTable =
>> >>>>> table1.join(table2,...);
>> >>>>>>>>>>>>         > >>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>> implies that the `joinedTable` has the
>> same
>> >> key
>> >>>>> as the
>> >>>>>>>>>>>>         left input
>> >>>>>>>>>>>>         > >>>>>>>> table.
>> >>>>>>>>>>>>         > >>>>>>>> IMHO, this does not work because if table2
>> >>>>> contains
>> >>>>>>>>>>>>         multiple rows
>> >>>>>>>>>>>>         > >>>>>>>> that
>> >>>>>>>>>>>>         > >>>>>>>> join with a record in table1 (what is the
>> main
>> >>>>> purpose
>> >>>>>>>>>>>>
>> >>>>>>>>>>> of
>> >>>>>>>>>
>> >>>>>>>>>> a
>> >>>>>>>>>>>>         > foreign
>> >>>>>>>>>>>>         > >>>>>>>> key
>> >>>>>>>>>>>>         > >>>>>>>> join), the result table would only
>> contain a
>> >>>>> single
>> >>>>>>>>>>>>         join result,
>> >>>>>>>>>>>>         > but
>> >>>>>>>>>>>>         > >>>>>>>> not
>> >>>>>>>>>>>>         > >>>>>>>> multiple.
>> >>>>>>>>>>>>         > >>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>> Example:
>> >>>>>>>>>>>>         > >>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>> table1 input stream: <A,X>
>> >>>>>>>>>>>>         > >>>>>>>> table2 input stream: <a,(A,1)>, <b,(A,2)>
>> >>>>>>>>>>>>         > >>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>> We use table2 value as a foreign key to
>> table1
>> >> key
>> >>>>> (ie,
>> >>>>>>>>>>>>         "A" joins).
>> >>>>>>>>>>>>         > If
>> >>>>>>>>>>>>         > >>>>>>>> the
>> >>>>>>>>>>>>         > >>>>>>>> result key is the same key as key of
>> table1,
>> >> this
>> >>>>>>>>>>>>         implies that the
>> >>>>>>>>>>>>         > >>>>>>>> result can either be <A, join(X,1)> or <A,
>> >>>>> join(X,2)>
>> >>>>>>>>>>>>         but not
>> >>>>>>>>>>>>         > both.
>> >>>>>>>>>>>>         > >>>>>>>> Because they share the same key, whatever
>> >> result
>> >>>>> record
>> >>>>>>>>>>>>         we emit
>> >>>>>>>>>>>>         > later,
>> >>>>>>>>>>>>         > >>>>>>>> overwrites the previous result.
>> >>>>>>>>>>>>         > >>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>> This is the reason why Jan originally
>> proposed
>> >>>>> to use
>> >>>>>>>>>>>> a
>> >>>>>>>>>>>>         > combination
>> >>>>>>>>>>>>         > >>>>>>>> of
>> >>>>>>>>>>>>         > >>>>>>>> both primary keys of the input tables as
>> key
>> >> of
>> >>>>> the
>> >>>>>>>>>>>>         output table.
>> >>>>>>>>>>>>         > >>>>>>>> This
>> >>>>>>>>>>>>         > >>>>>>>> makes the keys of the output table unique
>> and
>> >> we
>> >>>>> can
>> >>>>>>>>>>>>         store both in
>> >>>>>>>>>>>>         > >>>>>>>> the
>> >>>>>>>>>>>>         > >>>>>>>> output table:
>> >>>>>>>>>>>>         > >>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>> Result would be <A-a, join(X,1)>, <A-b,
>> >>>>> join(X,2)>
>> >>>>>>>>>>>>         > >>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>> Thoughts?
>> >>>>>>>>>>>>         > >>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>> -Matthias
>> >>>>>>>>>>>>         > >>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>> On 9/4/18 1:36 PM, Jan Filipiak wrote:
>> >>>>>>>>>>>>         > >>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>> Just one remark here.
>> >>>>>>>>>>>>         > >>>>>>>>> The high-watermark could be disregarded.
>> The
>> >>>>> decision
>> >>>>>>>>>>>>         about the
>> >>>>>>>>>>>>         > >>>>>>>>> forward
>> >>>>>>>>>>>>         > >>>>>>>>> depends on the size of the aggregated
>> map.
>> >>>>>>>>>>>>         > >>>>>>>>> Only 1 element long maps would be
>> unpacked
>> >> and
>> >>>>>>>>>>>>         forwarded. 0
>> >>>>>>>>>>>>         > element
>> >>>>>>>>>>>>         > >>>>>>>>> maps
>> >>>>>>>>>>>>         > >>>>>>>>> would be published as delete. Any other
>> count
>> >>>>>>>>>>>>         > >>>>>>>>> of map entries is in "waiting for correct
>> >>>>> deletes to
>> >>>>>>>>>>>>         > arrive"-state.
>> >>>>>>>>>>>>         > >>>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>>> On 04.09.2018 21:29, Adam Bellemare
>> wrote:
>> >>>>>>>>>>>>         > >>>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>>> It does look like I could replace the
>> second
>> >>>>>>>>>>>>         repartition store
>> >>>>>>>>>>>>         > and
>> >>>>>>>>>>>>         > >>>>>>>>>> highwater store with a groupBy and
>> reduce.
>> >>>>> However,
>> >>>>>>>>>>>>         it looks
>> >>>>>>>>>>>>         > like
>> >>>>>>>>>>>>         > >>>>>>>>>> I
>> >>>>>>>>>>>>         > >>>>>>>>>> would
>> >>>>>>>>>>>>         > >>>>>>>>>> still need to store the highwater value
>> >> within
>> >>>>> the
>> >>>>>>>>>>>>         materialized
>> >>>>>>>>>>>>         > >>>>>>>>>> store,
>> >>>>>>>>>>>>         > >>>>>>>>>>
>> >>>>>>>>>>>>         > >>>>>>>>>> to
>> >>>>>>>>>>>>         > >>>>>>>>> compare the arrival of out-of-order
>> records
>> >>>>> (assuming
>> >>>>>>>>>>>>
>> >>>>>>>>>>> my
>> >>>>>>>>>
>> >>>>>>>>>>         > >>>>>>>>> understanding
>> >>>>>>>>>>>>         > >>>>>>>>> of
>> >>>>>>>>>>>>         > >>>>>>>>> THIS is correct...). This in effect is
>> the
>> >> same
>> >>>>> as
>> >>>>>>>>>>>> the
>> >>>>>>>>>>>>         design I
>> >>>>>>>>>>>>         > have
>> >>>>>>>>>>>>         > >>>>>>>>> now,
>> >>>>>>>>>>>>         > >>>>>>>>> just with the two tables merged together.
>> >>>>>>>>>>>>         > >>>>>>>>>
>> >>>>>>>>>>>>         >
>> >>>>>>>>>>>>         >
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> --
>> >>>>>>>>>> -- Guozhang
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> --
>> >>>>>>>>> -- Guozhang
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>
>> >
>>
>
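
To make the re-keying steps quoted above (Adam's message of 05.09.2018) concrete, here is a minimal in-memory sketch in plain Java. It does not use the Kafka Streams API. The class name ForeignKeyJoinSketch, the Left value type, the "-" separator in the combined key, and the joiner are all assumptions made for illustration, and the sketch ignores partitioning and out-of-order arrival entirely.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

public class ForeignKeyJoinSketch {

    // Hypothetical value type for table1: (fk, bar), as in the quoted example.
    static final class Left {
        final String fk;
        final int bar;
        Left(String fk, int bar) { this.fk = fk; this.bar = bar; }
    }

    // Step 2: apply the foreign-key mapper to each table1 record and build a
    // combined key "<fk>-<pk>". Assumes (for this sketch only) that the
    // foreign key itself never contains '-'.
    static <V> Map<String, V> rekey(Map<String, V> table1, Function<V, String> fkMapper) {
        Map<String, V> mapped = new LinkedHashMap<>();
        table1.forEach((pk, value) -> mapped.put(fkMapper.apply(value) + "-" + pk, value));
        return mapped;
    }

    // Steps 3 and 4, collapsed into one process: look up the table2 record by
    // the fk part of the combined key, apply the joiner, and key the result by
    // the original table1 key again (inner-join semantics).
    static <V, R, J> Map<String, J> joinSketch(Map<String, V> table1,
                                               Map<String, R> table2,
                                               Function<V, String> fkMapper,
                                               BiFunction<V, R, J> joiner) {
        Map<String, J> joined = new LinkedHashMap<>();
        rekey(table1, fkMapper).forEach((combinedKey, left) -> {
            int sep = combinedKey.indexOf('-');
            String fk = combinedKey.substring(0, sep);
            String pk = combinedKey.substring(sep + 1);
            R right = table2.get(fk);
            if (right != null) {
                joined.put(pk, joiner.apply(left, right));
            }
        });
        return joined;
    }

    // The input data from the quoted example:
    // table1: <a,(fk=A,bar=1)>, <b,(fk=A,bar=2)>, <c,(fk=B,bar=3)>
    // table2: <A,X>, <B,Y>
    static Map<String, String> example() {
        Map<String, Left> table1 = new LinkedHashMap<>();
        table1.put("a", new Left("A", 1));
        table1.put("b", new Left("A", 2));
        table1.put("c", new Left("B", 3));
        Map<String, String> table2 = new LinkedHashMap<>();
        table2.put("A", "X");
        table2.put("B", "Y");
        return joinSketch(table1, table2, left -> left.fk,
                (left, right) -> right + "/" + left.bar);
    }

    public static void main(String[] args) {
        System.out.println(example()); // prints {a=X/1, b=X/2, c=Y/3}
    }
}
```

Because the result is keyed by the original table1 key, each left-hand record yields exactly one output row, which is the shape a KTable.join(KTable) result would have.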
