Yep, I definitely misunderstood some of the groupBy and groupByKey
functionality. I would say disregard what I said in my previous email
w.r.t. my assumptions about record size. I was looking into the code more
today and I did not understand it correctly the first time I read it.

It does look like I could replace the second repartition store and
highwater store with a groupBy and reduce.  However, it looks like I would
still need to store the highwater value within the materialized store, to
compare the arrival of out-of-order records (assuming my understanding of
THIS is correct...). This in effect is the same as the design I have now,
just with the two tables merged together.
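
As a rough sketch of that merged design (plain Java with no Kafka
dependency; `HighwaterSketch`, `Versioned` and `keepNewest` are made-up
names for illustration, not anything from the PR), the reduce step would keep
the offset next to the value so that a late, out-of-order record can be
discarded:

```java
import java.util.HashMap;
import java.util.Map;

public class HighwaterSketch {
    /** A value paired with the offset it was produced at (hypothetical type). */
    static final class Versioned {
        final long offset;
        final String value;

        Versioned(long offset, String value) {
            this.offset = offset;
            this.value = value;
        }
    }

    /** Reduce step: keep whichever record carries the higher offset. */
    static Versioned keepNewest(Versioned stored, Versioned incoming) {
        if (stored == null || incoming.offset >= stored.offset) {
            return incoming;
        }
        return stored; // incoming is stale (arrived out of order), so drop it
    }

    public static void main(String[] args) {
        // A HashMap stands in for the materialized store (assumption).
        Map<String, Versioned> store = new HashMap<>();
        // The record with offset 2 arrives before the record with offset 1.
        store.merge("key", new Versioned(2, "joined on FK=B"), HighwaterSketch::keepNewest);
        store.merge("key", new Versioned(1, "null (delete for FK=A)"), HighwaterSketch::keepNewest);
        System.out.println(store.get("key").value); // prints "joined on FK=B"
    }
}
```

In a real topology the same comparison would presumably live inside the
function passed to the reduce, with the offset carried in the value; that
wiring is not shown here.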

I will keep looking at this but I am not seeing a great simplification.
Advice and comments are welcomed as always.

On Tue, Sep 4, 2018 at 9:38 AM, Adam Bellemare <adam.bellem...@gmail.com>
wrote:

>
> As I was looking more into RocksDB TTL, I see that we currently do not
> support it in Kafka Streams due to a number of technical reasons. As I
> don't think that I will be tackling that JIRA at the moment, the current
> implementation is indeed unbounded in the highwater table growth.
>
> An alternate option may be to replace the highwater mark table with a
> groupBy and then perform a reduce/aggregate. My main concern here is that
> technically we could have an unbounded amount of data to group together by
> key, and the grouped size could exceed the Kafka maximum record size. When
> I built the highwater mark table my intent was to work around this
> possibility, as each record is evaluated independently and record sizing
> issues do not come into play. If I am incorrect in this assumption, please
> correct me, because I am a bit fuzzy on exactly how the groupBy currently
> works.
>
> Any thoughts on this are appreciated. I will revisit it again when I have
> a bit more time.
>
> Thanks
>
>
>
> On Mon, Sep 3, 2018 at 4:55 PM, Adam Bellemare <adam.bellem...@gmail.com>
> wrote:
>
>> Hi Jan
>>
>> Thank you for taking the time to look into my PR. I have updated it
>> accordingly along with the suggestions from John. Please note that I am by
>> no means an expert on Java, so I really do appreciate any Java-specific
>> feedback you may have. Do not worry about being overly verbose on it.
>>
>> You are correct with regards to the highwater mark growing unbounded. One
>> option would be to implement the rocksDB TTL to expire records. I am open
>> to other options as well.
>>
>> I have tried to detail the reasoning behind it in the KIP - I have added
>> additional comments and I hope that it is clearer now.
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-MultipleRapidForeign-KeyValueChanges-Whyahighwatertableisrequired.
>>
>> Please keep in mind that there may be something about ordering guarantees
>> that I am not aware of. As far as I know, once you begin to operate on
>> events in parallel across different nodes within the processor API, there
>> are no ordering guarantees and everything is simply first-come,
>> first-served (processed). If this is not the case then I am unaware of that
>> fact.
>>
>>
>>
>> Thanks
>>
>> Adam
>>
>>
>>
>>
>>
>> On Mon, Sep 3, 2018 at 8:38 AM, Jan Filipiak <jan.filip...@trivago.com>
>> wrote:
>>
>>> Finished my deeper scan on your approach.
>>> Most of the comments I put at the PR are minor code style things.
>>> One forward call seems to be a bug though, would be great if you could
>>> double check.
>>>
>>> The one problem I see is that the high watermark store grows unbounded.
>>> A key being deleted from the source table does not lead to deletion in
>>> the watermark store.
>>>
>>> I also don't quite grasp the concept why it's needed.  I think the whole
>>> offset part can go away?
>>> It seems to deal with node failures of some kind but everything should
>>> turn out okay without it?
>>>
>>> Best Jan
>>>
>>>
>>> On 01.09.2018 20:44, Guozhang Wang wrote:
>>>
>>>> Yes Adam, that makes sense.
>>>>
>>>> I think it may be better to have a working PR to review before we complete
>>>> the VOTE thread. In my experience, a large feature like this is most
>>>> definitely going to miss some devils in the details during the design and
>>>> wiki discussion phases.
>>>>
>>>> That would unfortunately mean that your implementation may need to be
>>>> modified / updated along with the review and further KIP discussion. I can
>>>> understand this can be painful, but that may be the best option we have to
>>>> avoid wasting as much work as possible.
>>>>
>>>>
>>>> Guozhang
>>>>
>>>>
>>>> On Wed, Aug 29, 2018 at 10:06 AM, Adam Bellemare <
>>>> adam.bellem...@gmail.com>
>>>> wrote:
>>>>
>>>> Hi Guozhang
>>>>>
>>>>> By workflow I mean just the overall process of how the KIP is
>>>>> implemented. Any ideas on the ways to reduce the topic count,
>>>>> materializations, if there is a better way to resolve out-of-order than
>>>>> a highwater mark table, if the design philosophy of “keep everything
>>>>> encapsulated within the join function” is appropriate, etc. I can
>>>>> implement the changes that John suggested, but if my overall workflow is
>>>>> not acceptable I would rather address that before making minor changes.
>>>>>
>>>>> If this requires a full candidate PR ready to go to prod then I can make
>>>>> those changes. Hope that clears things up.
>>>>>
>>>>> Thanks
>>>>>
>>>>> Adam
>>>>>
>>>>> On Aug 29, 2018, at 12:42 PM, Guozhang Wang <wangg...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> Hi Adam,
>>>>>>
>>>>>> What do you mean by "additional comments on the workflow"? Do you mean
>>>>>> to let others review your PR https://github.com/apache/kafka/pull/5527 ?
>>>>>> Is it ready for reviews?
>>>>>>
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>> On Tue, Aug 28, 2018 at 5:00 AM, Adam Bellemare <
>>>>>>
>>>>> adam.bellem...@gmail.com>
>>>>>
>>>>>> wrote:
>>>>>>
>>>>>> Okay, I will implement John's suggestion of namespacing the external
>>>>>>> headers prior to processing, and then removing the namespacing prior
>>>>>>> to
>>>>>>> emitting. A potential future KIP could be to provide this namespacing
>>>>>>> automatically.
>>>>>>>
>>>>>>> I would also appreciate any other additional comments on the
>>>>>>> workflow. My goal is to suss out agreement prior to moving to a vote.
>>>>>>>
>>>>>>> On Mon, Aug 27, 2018 at 3:19 PM, Guozhang Wang <wangg...@gmail.com>
>>>>>>>>
>>>>>>> wrote:
>>>>>
>>>>>>>> I like John's idea as well: for this KIP specifically as we do not
>>>>>>>> expect any other consumers to read the repartition topics externally,
>>>>>>>> we can slightly prefix the header to be safe, while keeping the
>>>>>>>> additional cost (note the header field is per-record, so any
>>>>>>>> additional byte is per-record as well) low.
>>>>>>>>
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>> On Tue, Aug 21, 2018 at 11:58 AM, Adam Bellemare <
>>>>>>>>
>>>>>>> adam.bellem...@gmail.com
>>>>>>>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Hi John
>>>>>>>>>
>>>>>>>>> That is an excellent idea. The header usage I propose would be
>>>>>>>>> limited entirely to internal topics, and this could very well be the
>>>>>>>>> solution to potential conflicts. If we do not officially reserve a
>>>>>>>>> prefix "__" then I think this would be the safest idea, as it would
>>>>>>>>> entirely avoid any accidents (perhaps if a company is using its own
>>>>>>>>> "__" prefix for other reasons).
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>> Adam
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Aug 21, 2018 at 2:24 PM, John Roesler <j...@confluent.io>
>>>>>>>>>
>>>>>>>> wrote:
>>>>>>>
>>>>>>>>>> Just a quick thought regarding headers:
>>>>>>>>>>
>>>>>>>>>>> I think there is no absolute-safe ways to avoid conflicts, but we
>>>>>>>>>>> can still consider using some name patterns to reduce the
>>>>>>>>>>> likelihood as much as possible.. e.g. consider sth. like the
>>>>>>>>>>> internal topics naming: e.g. "__internal_[name]"?
>>>>>>>>>>
>>>>>>>>>> I think there is a safe way to avoid conflicts, since these headers
>>>>>>>>>> are only needed in internal topics (I think):
>>>>>>>>>> For internal and changelog topics, we can namespace all headers:
>>>>>>>>>> * user-defined headers are namespaced as "external." + headerKey
>>>>>>>>>> * internal headers are namespaced as "internal." + headerKey
>>>>>>>>>>
>>>>>>>>>> This is a lot of characters, so we could use a sigil instead (e.g.,
>>>>>>>>>> "_" for internal, "~" for external)
>>>>>>>>>>
>>>>>>>>>> We simply apply the namespacing when we read user headers from
>>>>>>>>>> external topics into the topology and then de-namespace them before
>>>>>>>>>> we emit them to an external topic (via "to" or "through").
>>>>>>>>>> Now, it is not possible to collide with user-defined headers.
>>>>>>>>>>
>>>>>>>>>> That said, I'd also be fine with just reserving "__" as a header
>>>>>>>>>> prefix and not worrying about collisions.
>>>>>>>>>>
>>>>>>>>>> Thanks for the KIP,
>>>>>>>>>> -John
>>>>>>>>>>
>>>>>>>>>> On Tue, Aug 21, 2018 at 9:48 AM Jan Filipiak <
>>>>>>>>>>
>>>>>>>>> jan.filip...@trivago.com
>>>>>>>
>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Still haven't completely grasped it.
>>>>>>>>>>> Sorry, will read more.
>>>>>>>>>>>
>>>>>>>>>>> On 17.08.2018 21:23, Jan Filipiak wrote:
>>>>>>>>>>>> Cool stuff.
>>>>>>>>>>>>
>>>>>>>>>>>> I made some random remarks. Did not touch the core of the
>>>>>>>>>>>> algorithm yet. Will do Monday 100%
>>>>>>>>>>>>
>>>>>>>>>>>> I don't see Interactive Queries :) like that!
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 17.08.2018 20:28, Adam Bellemare wrote:
>>>>>>>>>>>>> I have submitted a PR with my code against trunk:
>>>>>>>>>>>>> https://github.com/apache/kafka/pull/5527
>>>>>>>>>>>>>
>>>>>>>>>>>>> Do I continue on this thread or do we begin a new one for
>>>>>>>>>>>>> discussion?
>>>>>>>>>
>>>>>>>>>> On Thu, Aug 16, 2018 at 1:40 AM, Jan Filipiak <
>>>>>>>>>>>>>
>>>>>>>>>>>> jan.filip...@trivago.com
>>>>>>>>>>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Even before message headers, the option always existed for me
>>>>>>>>>>>>>> to just wrap the messages into my own custom envelope.
>>>>>>>>>>>>>> So I of course thought this through. One sentence in your last
>>>>>>>>>>>>>> email triggered again all the thought process I put in back
>>>>>>>>>>>>>> then to design it in what I think is the "kafka-way". It ended
>>>>>>>>>>>>>> up ranting a little about what happened in the past.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I see plenty of colleagues of mine falling into traps in the
>>>>>>>>>>>>>> API that I did warn about in the 1.0 DSL rewrite. I have the
>>>>>>>>>>>>>> same feeling again. So I hope it gives you some insights into
>>>>>>>>>>>>>> my thought process. I am aware that since I never ported 213 to
>>>>>>>>>>>>>> a higher streams version, I don't really have a stake here, and
>>>>>>>>>>>>>> initially I didn't feel like actually sending it. But maybe you
>>>>>>>>>>>>>> can pull something good from it.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>   Best jan
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 15.08.2018 04:44, Adam Bellemare wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> @Jan
>>>>>>>>>>>>>>> Thanks Jan. I take it you mean "key-widening" somehow
>>>>>>>>>>>>>>> includes information about which record is processed first?
>>>>>>>>>>>>>>> I understand about a CombinedKey with both the Foreign and
>>>>>>>>>>>>>>> Primary key, but I don't see how you track ordering metadata
>>>>>>>>>>>>>>> in there unless you actually included a metadata field in
>>>>>>>>>>>>>>> the key type as well.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> @Guozhang
>>>>>>>>>>>>>>> As Jan mentioned earlier, are Record Headers meant to be
>>>>>>>>>>>>>>> used strictly in user-space? It seems possible that a
>>>>>>>>>>>>>>> collision on the (key,value) tuple I wish to add to it could
>>>>>>>>>>>>>>> occur. For instance, if I wanted to add a
>>>>>>>>>>>>>>> ("foreignKeyOffset",10) to the Headers but the user already
>>>>>>>>>>>>>>> specified their own header with the same key name, then it
>>>>>>>>>>>>>>> appears there would be a collision. (This is one of the
>>>>>>>>>>>>>>> issues I brought up in the KIP).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --------------------------------
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I will be posting a prototype PR against trunk within the
>>>>>>>>>>>>>>> next day or two. One thing I need to point out is that my
>>>>>>>>>>>>>>> design very strictly wraps the entire foreignKeyJoin process
>>>>>>>>>>>>>>> entirely within the DSL function. There is no exposure of
>>>>>>>>>>>>>>> CombinedKeys or widened keys, nothing to resolve with
>>>>>>>>>>>>>>> regards to out-of-order processing and no need for the DSL
>>>>>>>>>>>>>>> user to even know what's going on inside of the function.
>>>>>>>>>>>>>>> The code simply returns the results of the join, keyed by
>>>>>>>>>>>>>>> the original key. Currently my API mirrors identically the
>>>>>>>>>>>>>>> format of the data returned by the regular join function,
>>>>>>>>>>>>>>> and I believe that this is very useful to many users of the
>>>>>>>>>>>>>>> DSL. It is my understanding that one of the main design
>>>>>>>>>>>>>>> goals of the DSL is to provide higher level functionality
>>>>>>>>>>>>>>> without requiring the users to know exactly what's going on
>>>>>>>>>>>>>>> under the hood. With this in mind, I thought it best to
>>>>>>>>>>>>>>> solve ordering and partitioning problems within the function
>>>>>>>>>>>>>>> and eliminate the requirement for users to do additional
>>>>>>>>>>>>>>> work after the fact to resolve the results of their join.
>>>>>>>>>>>>>>> Basically, I am assuming that most users of the DSL just
>>>>>>>>>>>>>>> "want it to work" and want it to be easy. I did this
>>>>>>>>>>>>>>> operating under the assumption that if a user truly wants to
>>>>>>>>>>>>>>> optimize their own workflow down to the finest details then
>>>>>>>>>>>>>>> they will break from strictly using the DSL and move down to
>>>>>>>>>>>>>>> the processors API.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think the abstraction is not powerful enough to keep Kafka
>>>>>>>>>>>>>> specifics from leaking up. The leak I currently think this has
>>>>>>>>>>>>>> is that you cannot reliably prevent the delete coming out
>>>>>>>>>>>>>> first, before you emit the correct new record, as it is an
>>>>>>>>>>>>>> abstraction entirely around Kafka.
>>>>>>>>>>>>>> I can only recommend not doing so. Honesty and simplicity
>>>>>>>>>>>>>> should always be first priority; trying to hide this just makes
>>>>>>>>>>>>>> it more complex, less understandable, and will lead to mistakes
>>>>>>>>>>>>>> in usage.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is exactly why I am also in big disfavour of GraphNodes
>>>>>>>>>>>>>> and later optimization stages.
>>>>>>>>>>>>>> Can someone give me an example of an optimisation that really
>>>>>>>>>>>>>> can't be handled by the user constructing his topology
>>>>>>>>>>>>>> differently?
>>>>>>>>>>>>>> Having reusable Processor API components accessible by the DSL
>>>>>>>>>>>>>> and composable as one likes is exactly where the DSL should max
>>>>>>>>>>>>>> out, and KSQL should do the next step.
>>>>>>>>>>>>>> I find it very unprofessional from a software engineering
>>>>>>>>>>>>>> standpoint to run software where you cannot at least sensibly
>>>>>>>>>>>>>> reason about the inner workings of the libraries used.
>>>>>>>>>>>>>> Given this, people have to read and understand it anyway, so
>>>>>>>>>>>>>> why try to hide it?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I really miss the beauty of the 0.10 version DSL.
>>>>>>>>>>>>>> Apparently not a thing I can influence, but just warn about.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> @Guozhang
>>>>>>>>>>>>>> You can't imagine how many extra IQ-Statestores I constantly
>>>>>>>>>>>>>> prune from stream apps because people just keep passing
>>>>>>>>>>>>>> Materialized's into all the operations. :D :'-(
>>>>>>>>>>>>>> I regret that I couldn't convince you guys back then. Plus this
>>>>>>>>>>>>>> whole entire topology as a floating interface chain, never seen
>>>>>>>>>>>>>> it anywhere :-/ :'(
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I don't know. I guess this is just me regretting to only have
>>>>>>>>>>>>>> 24h/day.
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I updated the KIP today with some points worth talking
>>>>>>>>>>>>>>> about, should anyone be so inclined to check it out.
>>>>>>>>>>>>>>> Currently we are running this code in production to handle
>>>>>>>>>>>>>>> relational joins from our Kafka Connect topics, as per the
>>>>>>>>>>>>>>> original motivation of the KIP.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I believe the foreignKeyJoin should be responsible for. In my
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Aug 14, 2018 at 5:22 PM, Guozhang Wang<
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> wangg...@gmail.com
>>>>>>>>
>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hello Adam,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> As for your question regarding GraphNodes, it is for
>>>>>>>>>>>>>>>> extending Streams optimization framework. You can find more
>>>>>>>>>>>>>>>> details on
>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6761.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The main idea is that instead of directly building up the
>>>>>>>>>>>>>>>> "physical topology" (represented as Topology in the public
>>>>>>>>>>>>>>>> package, and internally built as the ProcessorTopology
>>>>>>>>>>>>>>>> class) while users are specifying the transformation
>>>>>>>>>>>>>>>> operators, we first keep it as a "logical topology"
>>>>>>>>>>>>>>>> (represented as GraphNode inside InternalStreamsBuilder).
>>>>>>>>>>>>>>>> And then only execute the optimization and the construction
>>>>>>>>>>>>>>>> of the "physical" Topology when StreamsBuilder.build() is
>>>>>>>>>>>>>>>> called.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Back to your question, I think it makes more sense to add a
>>>>>>>>>>>>>>>> new type of StreamsGraphNode (maybe you can consider
>>>>>>>>>>>>>>>> inheriting from the BaseJoinProcessorNode). Note that
>>>>>>>>>>>>>>>> although in the Topology we will have multiple connected
>>>>>>>>>>>>>>>> ProcessorNodes to represent a (foreign-key) join, we still
>>>>>>>>>>>>>>>> want to keep it as a single StreamsGraphNode, or just a
>>>>>>>>>>>>>>>> couple of them in the logical representation so that in the
>>>>>>>>>>>>>>>> future we can construct the physical topology differently
>>>>>>>>>>>>>>>> (e.g. having another way than the current distributed
>>>>>>>>>>>>>>>> hash-join).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> -------------------------------------------------------
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Back to your questions on KIP-213, I think Jan has
>>>>>>>>>>>>>>>> summarized it pretty well. Note that back then we did not
>>>>>>>>>>>>>>>> have headers support, so we had to use such a
>>>>>>>>>>>>>>>> "key-widening" approach to ensure ordering.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mon, Aug 13, 2018 at 11:39 PM, Jan
>>>>>>>>>>>>>>>> Filipiak<jan.filip...@trivago.com>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Adam,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I love how you are on to this already! I resolve this by
>>>>>>>>>>>>>>>>> "key-widening": I treat the results of FKA and FKB
>>>>>>>>>>>>>>>>> differently.
>>>>>>>>>>>>>>>>> As you can see, the output of my join has a Combined Key
>>>>>>>>>>>>>>>>> and therefore I can resolve the "race condition" in a
>>>>>>>>>>>>>>>>> group by if I so desire.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I think this reflects more what happens under the hood and
>>>>>>>>>>>>>>>>> makes it more clear to the user what is going on. The idea
>>>>>>>>>>>>>>>>> of hiding this behind metadata and handling it in the DSL
>>>>>>>>>>>>>>>>> is, from my POV, not ideal.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> To write into your example:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> (key + A, null)
>>>>>>>>>>>>>>>>> (key + B, <joined On FK =B>)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> is what my output would look like.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hope that makes sense :D
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 13.08.2018 18:16, Adam Bellemare wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Jan
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> If you do not use headers or other metadata, how do you
>>>>>>>>>>>>>>>>>> ensure that changes to the foreign-key value are not
>>>>>>>>>>>>>>>>>> resolved out-of-order?
>>>>>>>>>>>>>>>>>> ie: If an event has FK = A, but you change it to FK = B,
>>>>>>>>>>>>>>>>>> you need to propagate both a delete (FK=A -> null) and an
>>>>>>>>>>>>>>>>>> addition (FK=B). In my solution, without maintaining any
>>>>>>>>>>>>>>>>>> metadata, it is possible for the final output to be in
>>>>>>>>>>>>>>>>>> either order - the correctly updated joined value, or the
>>>>>>>>>>>>>>>>>> null for the delete.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> (key, null)
>>>>>>>>>>>>>>>>>> (key, <joined On FK =B>)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> (key, <joined On FK =B>)
>>>>>>>>>>>>>>>>>> (key, null)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I looked back through your code and through the
>>>>>>>>>>>>>>>>>> discussion threads, and didn't see any information on how
>>>>>>>>>>>>>>>>>> you resolved this. I have a version of my code working
>>>>>>>>>>>>>>>>>> for 2.0, I am just adding more integration tests and will
>>>>>>>>>>>>>>>>>> update the KIP accordingly. Any insight you could provide
>>>>>>>>>>>>>>>>>> on resolving out-of-order semantics without metadata
>>>>>>>>>>>>>>>>>> would be helpful.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>> Adam
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Mon, Aug 13, 2018 at 3:34 AM, Jan Filipiak <
>>>>>>>>>>>>>>>>>> jan.filip...@trivago.com
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Happy to see that you want to make an effort here.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Regarding the ProcessSuppliers I couldn't find a way to
>>>>>>>>>>>>>>>>>>> not rewrite the joiners + the merger.
>>>>>>>>>>>>>>>>>>> The re-partitioners can be reused in theory. I don't
>>>>>>>>>>>>>>>>>>> know if repartition is optimized in 2.0 now.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I made this
>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-241+KTable+repartition+with+compacted+Topics
>>>>>>>>>>>>>>>>>>> back then and we are running KIP-213 with KIP-241 in
>>>>>>>>>>>>>>>>>>> combination. For us it is vital as it minimized the size
>>>>>>>>>>>>>>>>>>> we had in our repartition topics plus it removed the
>>>>>>>>>>>>>>>>>>> factor of 2 in events on every message. I know about
>>>>>>>>>>>>>>>>>>> this new "delete once consumer has read it". I don't
>>>>>>>>>>>>>>>>>>> think 241 is vital for all use cases; for ours it is. I
>>>>>>>>>>>>>>>>>>> wanted to use 213 to sneak in the foundations for 241 as
>>>>>>>>>>>>>>>>>>> well.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I don't quite understand what a PropagationWrapper is,
>>>>>>>>>>>>>>>>>>> but I am certain that you do not need RecordHeaders for
>>>>>>>>>>>>>>>>>>> 213 and I would try to leave them out. They either
>>>>>>>>>>>>>>>>>>> belong to the DSL or to the user; having a mixed use is
>>>>>>>>>>>>>>>>>>> to be avoided. We run the join with 0.8 logformat and I
>>>>>>>>>>>>>>>>>>> don't think one needs more.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> This KIP will be very valuable for the streams project!
>>>>>>>>>>>>>>>>>>> I could never convince myself to invest into the 1.0+
>>>>>>>>>>>>>>>>>>> DSL as I used almost all my energy to fight against it.
>>>>>>>>>>>>>>>>>>> Maybe this can also help me see the good sides a little
>>>>>>>>>>>>>>>>>>> bit more.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> If there is anything unclear with all the text that has
>>>>>>>>>>>>>>>>>>> been written, feel free to just directly cc me so I
>>>>>>>>>>>>>>>>>>> don't miss it on the mailing list.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On 08.08.2018 15:26, Adam Bellemare wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> More followup, and +dev as Guozhang replied to me
>>>>>>>>>>>>>>>>>>>> directly previously.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I am currently porting the code over to trunk. One of
>>>>>>>>>>>>>>>>>>>> the major changes since 1.0 is the usage of GraphNodes.
>>>>>>>>>>>>>>>>>>>> I have a question about this:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> For a foreignKey joiner, should it have its own
>>>>>>>>>>>>>>>>>>>> dedicated node type? Or would it be advisable to
>>>>>>>>>>>>>>>>>>>> construct it from existing GraphNode components?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>
>
