Hi Guozhang 

By workflow I mean the overall process by which the KIP is implemented. I am
looking for ideas on ways to reduce the topic count and the number of
materializations, on whether there is a better way to resolve out-of-order
updates than a highwater mark table, on whether the design philosophy of
“keep everything encapsulated within the join function” is appropriate, and
so on. I can implement the changes that John suggested, but if my overall
workflow is not acceptable I would rather address that before making minor
changes.
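
To make the highwater mark idea concrete, here is a minimal sketch under
stated assumptions; it is not the code in the PR. It assumes that each join
result carries the source offset of the event that produced it, and that a
result is dropped when a newer offset has already been seen for the same
primary key. The names HighwaterMarkResolver and shouldEmit are hypothetical,
and a plain HashMap stands in for the highwater mark state store.

```java
import java.util.HashMap;
import java.util.Map;

public class HighwaterMarkResolver {
    // Stand-in for the highwater mark state store (assumption):
    // primary key -> highest source offset emitted so far.
    private final Map<String, Long> highwaterMarks = new HashMap<>();

    /**
     * Returns true if the result should be emitted, i.e. its source offset
     * is not older than the highest offset already recorded for this key.
     */
    public boolean shouldEmit(String primaryKey, long sourceOffset) {
        Long current = highwaterMarks.get(primaryKey);
        if (current != null && sourceOffset < current) {
            return false; // stale result arrived late; drop it
        }
        highwaterMarks.put(primaryKey, sourceOffset);
        return true;
    }

    public static void main(String[] args) {
        HighwaterMarkResolver resolver = new HighwaterMarkResolver();
        // FK changed from A to B: the delete for A comes from offset 10,
        // the new join on B from offset 11, and they arrive out of order.
        System.out.println(resolver.shouldEmit("key", 11L)); // newer result first
        System.out.println(resolver.shouldEmit("key", 10L)); // late delete
    }
}
```

In this sketch the late delete is suppressed, so the final output for the key
is the joined value on FK = B. The cost is one extra store lookup and write
per result, which is the materialization the question above is about.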

If this requires a full candidate PR that is ready to go to production, then
I can make those changes. I hope that clears things up.
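
As a rough illustration of the namespacing John suggested, the sketch below
prefixes user headers when records enter the topology and strips the prefix
again before they are emitted. It is only a sketch under assumptions: a Map
stands in for the record headers (real Kafka headers may repeat a key),
internal headers are assumed to be dropped on the way out, and the class and
method names are hypothetical. The "external." and "internal." prefixes are
the ones proposed further down this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HeaderNamespacing {
    // Prefixes from the suggestion in this thread; a shorter sigil would also work.
    public static final String EXTERNAL = "external.";
    public static final String INTERNAL = "internal.";

    /** Applied on ingress: every user-defined header gets the external prefix. */
    public static Map<String, byte[]> namespace(Map<String, byte[]> userHeaders) {
        Map<String, byte[]> out = new LinkedHashMap<>();
        userHeaders.forEach((k, v) -> out.put(EXTERNAL + k, v));
        return out;
    }

    /**
     * Applied before emitting to an external topic: strips the external
     * prefix and (assumption) drops anything that is not a user header.
     */
    public static Map<String, byte[]> denamespace(Map<String, byte[]> headers) {
        Map<String, byte[]> out = new LinkedHashMap<>();
        headers.forEach((k, v) -> {
            if (k.startsWith(EXTERNAL)) {
                out.put(k.substring(EXTERNAL.length()), v);
            }
        });
        return out;
    }

    public static void main(String[] args) {
        Map<String, byte[]> user = new LinkedHashMap<>();
        user.put("foreignKeyOffset", new byte[] {1}); // user header that would otherwise collide
        Map<String, byte[]> inTopology = namespace(user);
        inTopology.put(INTERNAL + "foreignKeyOffset", new byte[] {10}); // header added by the join
        System.out.println(inTopology.keySet());
        System.out.println(denamespace(inTopology).keySet());
    }
}
```

Because the two prefixes differ, a user header and an internal header with
the same base name can coexist inside the topology without overwriting each
other.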

Thanks

Adam 

> On Aug 29, 2018, at 12:42 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> 
> Hi Adam,
> 
> What do you mean by "additional comments on the workflow"? Do you mean to
> let others review your PR https://github.com/apache/kafka/pull/5527 ? Is it
> ready for reviews?
> 
> 
> Guozhang
> 
> On Tue, Aug 28, 2018 at 5:00 AM, Adam Bellemare <adam.bellem...@gmail.com>
> wrote:
> 
>> Okay, I will implement John's suggestion of namespacing the external
>> headers prior to processing, and then removing the namespacing prior to
>> emitting. A potential future KIP could be to provide this namespacing
>> automatically.
>> 
>> I would also appreciate any additional comments on the workflow. My
>> goal is to suss out agreement prior to moving to a vote.
>> 
>>> On Mon, Aug 27, 2018 at 3:19 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>> 
>>> I like John's idea as well: for this KIP specifically, as we do not expect
>>> any other consumers to read the repartition topics externally, we can
>>> slightly prefix the header to be safe, while keeping the additional cost
>>> (note the header field is per-record, so any additional byte is per-record
>>> as well) low.
>>> 
>>> 
>>> Guozhang
>>> 
>>> On Tue, Aug 21, 2018 at 11:58 AM, Adam Bellemare <adam.bellem...@gmail.com>
>>> wrote:
>>> 
>>>> Hi John
>>>> 
>>>> That is an excellent idea. The header usage I propose would be limited
>>>> entirely to internal topics, and this could very well be the solution to
>>>> potential conflicts. If we do not officially reserve a prefix "__" then I
>>>> think this would be the safest idea, as it would entirely avoid any
>>>> accidents (perhaps if a company is using its own "__" prefix for other
>>>> reasons).
>>>> 
>>>> Thanks
>>>> 
>>>> Adam
>>>> 
>>>> 
>>>> On Tue, Aug 21, 2018 at 2:24 PM, John Roesler <j...@confluent.io> wrote:
>>>> 
>>>>> Just a quick thought regarding headers:
>>>>>> I think there is no absolutely safe way to avoid conflicts, but we can
>>>>>> still consider using some name patterns to reduce the likelihood as much
>>>>>> as possible, e.g. something like the internal topics naming:
>>>>>> "__internal_[name]"?
>>>>> 
>>>>> I think there is a safe way to avoid conflicts, since these headers are
>>>>> only needed in internal topics (I think):
>>>>> For internal and changelog topics, we can namespace all headers:
>>>>> * user-defined headers are namespaced as "external." + headerKey
>>>>> * internal headers are namespaced as "internal." + headerKey
>>>>> 
>>>>> This is a lot of characters, so we could use a sigil instead (e.g., "_"
>>>>> for internal, "~" for external)
>>>>> 
>>>>> We simply apply the namespacing when we read user headers from external
>>>>> topics into the topology and then de-namespace them before we emit them
>>>>> to an external topic (via "to" or "through").
>>>>> Now, it is not possible to collide with user-defined headers.
>>>>> 
>>>>> That said, I'd also be fine with just reserving "__" as a header prefix
>>>>> and not worrying about collisions.
>>>>> 
>>>>> Thanks for the KIP,
>>>>> -John
>>>>> 
>>>>> On Tue, Aug 21, 2018 at 9:48 AM Jan Filipiak <jan.filip...@trivago.com>
>>>>> wrote:
>>>>> 
>>>>>> I still haven't completely grasped it.
>>>>>> Sorry, I will read more.
>>>>>> 
>>>>>>> On 17.08.2018 21:23, Jan Filipiak wrote:
>>>>>>> Cool stuff.
>>>>>>> 
>>>>>>> I made some random remarks. Did not touch the core of the algorithm
>>>>>>> yet.
>>>>>>> 
>>>>>>> Will do Monday 100%
>>>>>>> 
>>>>>>> I don't see Interactive Queries :) like that!
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>>> On 17.08.2018 20:28, Adam Bellemare wrote:
>>>>>>>> I have submitted a PR with my code against trunk:
>>>>>>>> https://github.com/apache/kafka/pull/5527
>>>>>>>> 
>>>>>>>> Do I continue on this thread or do we begin a new one for discussion?
>>>>>>>> 
>>>>>>>> On Thu, Aug 16, 2018 at 1:40 AM, Jan Filipiak <jan.filip...@trivago.com>
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> Even before message headers, the option always existed for me to just
>>>>>>>>> wrap the messages into my own custom envelope.
>>>>>>>>> So I of course thought this through. One sentence in your last email
>>>>>>>>> triggered again all the thought process I put in back then to design
>>>>>>>>> it in what I think is the "Kafka way". It ended up ranting a little
>>>>>>>>> about what happened in the past.
>>>>>>>>> 
>>>>>>>>> I see plenty of colleagues of mine falling into traps in the API that
>>>>>>>>> I did warn about in the 1.0 DSL rewrite. I have the same feeling
>>>>>>>>> again. So I hope it gives you some insights into my thought process.
>>>>>>>>> I am aware that since I never ported 213 to a higher Streams version,
>>>>>>>>> I don't really have a stake here, and initially I didn't feel like
>>>>>>>>> actually sending it. But maybe you can pull something good from it.
>>>>>>>>> 
>>>>>>>>>  Best jan
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>>> On 15.08.2018 04:44, Adam Bellemare wrote:
>>>>>>>>>> 
>>>>>>>>>> @Jan
>>>>>>>>>> Thanks Jan. I take it you mean "key-widening" somehow includes
>>>>>>>>>> information about which record is processed first? I understand
>>>>>>>>>> about a CombinedKey with both the Foreign and Primary key, but I
>>>>>>>>>> don't see how you track ordering metadata in there unless you
>>>>>>>>>> actually included a metadata field in the key type as well.
>>>>>>>>>> 
>>>>>>>>>> @Guozhang
>>>>>>>>>> As Jan mentioned earlier, are Record Headers meant to be used
>>>>>>>>>> strictly in just the user-space? It seems possible that a collision
>>>>>>>>>> could occur on the (key,value) tuple I wish to add to them. For
>>>>>>>>>> instance, if I wanted to add a ("foreignKeyOffset",10) to the
>>>>>>>>>> Headers but the user already specified their own header with the
>>>>>>>>>> same key name, then it appears there would be a collision. (This is
>>>>>>>>>> one of the issues I brought up in the KIP).
>>>>>>>>>> 
>>>>>>>>>> --------------------------------
>>>>>>>>>> 
>>>>>>>>>> I will be posting a prototype PR against trunk within the next day
>>>>>>>>>> or two. One thing I need to point out is that my design strictly
>>>>>>>>>> wraps the entire foreignKeyJoin process within the DSL function.
>>>>>>>>>> There is no exposure of CombinedKeys or widened keys, nothing to
>>>>>>>>>> resolve with regards to out-of-order processing and no need for the
>>>>>>>>>> DSL user to even know what's going on inside of the function. The
>>>>>>>>>> code simply returns the results of the join, keyed by the original
>>>>>>>>>> key. Currently my API mirrors identically the format of the data
>>>>>>>>>> returned by the regular join function, and I believe that this is
>>>>>>>>>> very useful to many users of the DSL. It is my understanding that
>>>>>>>>>> one of the main design goals of the DSL is to provide higher level
>>>>>>>>>> functionality without requiring the users to know exactly what's
>>>>>>>>>> going on under the hood. With this in mind, I thought it best to
>>>>>>>>>> solve ordering and partitioning problems within the function and
>>>>>>>>>> eliminate the requirement for users to do additional work after the
>>>>>>>>>> fact to resolve the results of their join. Basically, I am assuming
>>>>>>>>>> that most users of the DSL just "want it to work" and want it to be
>>>>>>>>>> easy. I did this operating under the assumption that if a user
>>>>>>>>>> truly wants to optimize their own workflow down to the finest
>>>>>>>>>> details then they will break from strictly using the DSL and move
>>>>>>>>>> down to the Processor API.
>>>>>>>>>> 
>>>>>>>>> I think the abstraction is not powerful enough to keep Kafka
>>>>>>>>> specifics from leaking up. The leak I currently think this has is
>>>>>>>>> that you cannot reliably prevent the delete from coming out first,
>>>>>>>>> before you emit the correct new record, as it is an abstraction
>>>>>>>>> entirely around Kafka.
>>>>>>>>> I can only recommend not to. Honesty and simplicity should always be
>>>>>>>>> the first priority; trying to hide this just makes it more complex,
>>>>>>>>> less understandable, and will lead to mistakes in usage.
>>>>>>>>> 
>>>>>>>>> This is exactly why I am also in big disfavour of GraphNodes and
>>>>>>>>> later optimization stages.
>>>>>>>>> Can someone give me an example of an optimisation that really can't
>>>>>>>>> be handled by the user constructing his topology differently?
>>>>>>>>> Having reusable Processor API components accessible by the DSL and
>>>>>>>>> composable as one likes is exactly where the DSL should max out and
>>>>>>>>> KSQL should do the next step.
>>>>>>>>> I find it very unprofessional from a software engineering approach
>>>>>>>>> to run software where you cannot at least sensibly reason about the
>>>>>>>>> inner workings of the libraries used.
>>>>>>>>> Given this, people have to read and understand it anyway, so why try
>>>>>>>>> to hide it?
>>>>>>>>> 
>>>>>>>>> I really miss the beauty of the 0.10 version DSL.
>>>>>>>>> Apparently it is not a thing I can influence, only warn about.
>>>>>>>>> 
>>>>>>>>> @Guozhang
>>>>>>>>> You can't imagine how many extra IQ-Statestores I constantly prune
>>>>>>>>> from stream apps because people just keep passing Materialized's
>>>>>>>>> into all the operations.
>>>>>>>>> :D :'-(
>>>>>>>>> I regret that I couldn't convince you guys back then. Plus this
>>>>>>>>> whole entire topology as a floating interface chain, never seen it
>>>>>>>>> anywhere :-/ :'(
>>>>>>>>> 
>>>>>>>>> I don't know. I guess this is just me regretting having only
>>>>>>>>> 24h/day.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>>> I updated the KIP today with some points worth talking about,
>>>>>>>>>> should anyone be so inclined to check it out. Currently we are
>>>>>>>>>> running this code in production to handle relational joins from our
>>>>>>>>>> Kafka Connect topics, as per the original motivation of the KIP.
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> I believe the foreignKeyJoin should be responsible for. In my
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On Tue, Aug 14, 2018 at 5:22 PM, Guozhang Wang <wangg...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>> Hello Adam,
>>>>>>>>>>> As for your question regarding GraphNodes, it is for extending the
>>>>>>>>>>> Streams optimization framework. You can find more details on
>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6761.
>>>>>>>>>>> 
>>>>>>>>>>> The main idea is that instead of directly building up the
>>>>>>>>>>> "physical topology" (represented as Topology in the public
>>>>>>>>>>> package, and internally built as the ProcessorTopology class)
>>>>>>>>>>> while users are specifying the transformation operators, we first
>>>>>>>>>>> keep it as a "logical topology" (represented as GraphNode inside
>>>>>>>>>>> InternalStreamsBuilder). We then execute the optimization and the
>>>>>>>>>>> construction of the "physical" Topology only when
>>>>>>>>>>> StreamsBuilder.build() is called.
>>>>>>>>>>> 
>>>>>>>>>>> Back to your question, I think it makes more sense to add a new
>>>>>>>>>>> type of StreamsGraphNode (maybe you can consider inheriting from
>>>>>>>>>>> the BaseJoinProcessorNode). Note that although in the Topology we
>>>>>>>>>>> will have multiple connected ProcessorNodes to represent a
>>>>>>>>>>> (foreign-key) join, we still want to keep it as a single
>>>>>>>>>>> StreamsGraphNode, or just a couple of them, in the logical
>>>>>>>>>>> representation so that in the future we can construct the
>>>>>>>>>>> physical topology differently (e.g. having another way than the
>>>>>>>>>>> current distributed hash-join).
>>>>>>>>>>> 
>>>>>>>>>>> -------------------------------------------------------
>>>>>>>>>>> 
>>>>>>>>>>> Back to your questions on KIP-213, I think Jan has summarized it
>>>>>>>>>>> pretty well. Note that back then we did not have headers support,
>>>>>>>>>>> so we had to use such a "key-widening" approach to ensure
>>>>>>>>>>> ordering.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> Guozhang
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Mon, Aug 13, 2018 at 11:39 PM, Jan Filipiak
>>>>>>>>>>> <jan.filip...@trivago.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Hi Adam,
>>>>>>>>>>>> I love how you are on to this already! I resolve this by
>>>>>>>>>>>> "key-widening": I treat the results of FK A and FK B
>>>>>>>>>>>> differently.
>>>>>>>>>>>> As you can see, the output of my join has a Combined Key and
>>>>>>>>>>>> therefore I can resolve the "race condition" in a group by if I
>>>>>>>>>>>> so desire.
>>>>>>>>>>>> 
>>>>>>>>>>>> I think this reflects more what happens under the hood and makes
>>>>>>>>>>>> it more clear to the user what is going on. The idea of hiding
>>>>>>>>>>>> this behind metadata and handling it in the DSL is, from my POV,
>>>>>>>>>>>> not ideal.
>>>>>>>>>>>> 
>>>>>>>>>>>> To write into your example:
>>>>>>>>>>>> 
>>>>>>>>>>>> (key + A, null)
>>>>>>>>>>>> (key + B, <joined On FK = B>)
>>>>>>>>>>>> 
>>>>>>>>>>>> is what my output would look like.
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> Hope that makes sense :D
>>>>>>>>>>>> 
>>>>>>>>>>>> Best Jan
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> On 13.08.2018 18:16, Adam Bellemare wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>> Hi Jan
>>>>>>>>>>>>> If you do not use headers or other metadata, how do you ensure
>>>>>>>>>>>>> that changes to the foreign-key value are not resolved
>>>>>>>>>>>>> out-of-order?
>>>>>>>>>>>>> ie: If an event has FK = A, but you change it to FK = B, you
>>>>>>>>>>>>> need to propagate both a delete (FK=A -> null) and an addition
>>>>>>>>>>>>> (FK=B). In my solution, without maintaining any metadata, it is
>>>>>>>>>>>>> possible for the final output to be in either order - the
>>>>>>>>>>>>> correctly updated joined value, or the null for the delete.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> (key, null)
>>>>>>>>>>>>> (key, <joined On FK =B>)
>>>>>>>>>>>>> 
>>>>>>>>>>>>> or
>>>>>>>>>>>>> 
>>>>>>>>>>>>> (key, <joined On FK =B>)
>>>>>>>>>>>>> (key, null)
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I looked back through your code and through the discussion
>>>>>>>>>>>>> threads, and didn't see any information on how you resolved
>>>>>>>>>>>>> this. I have a version of my code working for 2.0, I am just
>>>>>>>>>>>>> adding more integration tests and will update the KIP
>>>>>>>>>>>>> accordingly. Any insight you could provide on resolving
>>>>>>>>>>>>> out-of-order semantics without metadata would be helpful.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Adam
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Mon, Aug 13, 2018 at 3:34 AM, Jan Filipiak
>>>>>>>>>>>>> <jan.filip...@trivago.com> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Happy to see that you want to make an effort here.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Regarding the ProcessSuppliers I couldn't find a way to not
>>>>>>>>>>>>>> rewrite the joiners + the merger.
>>>>>>>>>>>>>> The re-partitioners can be reused in theory. I don't know if
>>>>>>>>>>>>>> repartition is optimized in 2.0 now.
>>>>>>>>>>>>>> I made this
>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-241+KTable+repartition+with+compacted+Topics
>>>>>>>>>>>>>> back then and we are running KIP-213 with KIP-241 in
>>>>>>>>>>>>>> combination.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> For us it is vital as it minimized the size we had in our
>>>>>>>>>>>>>> repartition topics plus it removed the factor of 2 in events
>>>>>>>>>>>>>> on every message.
>>>>>>>>>>>>>> I know about this new "delete once consumer has read it". I
>>>>>>>>>>>>>> don't think 241 is vital for all use cases, for ours it is. I
>>>>>>>>>>>>>> wanted to use 213 to sneak in the foundations for 241 as well.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I don't quite understand what a PropagationWrapper is, but I
>>>>>>>>>>>>>> am certain that you do not need RecordHeaders for 213 and I
>>>>>>>>>>>>>> would try to leave them out. They either belong to the DSL or
>>>>>>>>>>>>>> to the user, having a mixed use is to be avoided. We run the
>>>>>>>>>>>>>> join with 0.8 logformat and I don't think one needs more.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> This KIP will be very valuable for the streams project! I
>>>>>>>>>>>>>> could never convince myself to invest into the 1.0+ DSL as I
>>>>>>>>>>>>>> used almost all my energy to fight against it. Maybe this can
>>>>>>>>>>>>>> also help me see the good sides a little bit more.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> If there is anything unclear with all the text that has been
>>>>>>>>>>>>>> written, feel free to just directly cc me so I don't miss it
>>>>>>>>>>>>>> on the mailing list.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On 08.08.2018 15:26, Adam Bellemare wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> More followup, and +dev as Guozhang replied to me directly
>>>>>>>>>>>>>> previously.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I am currently porting the code over to trunk. One of the
>>>>>>>>>>>>>>> major changes since 1.0 is the usage of GraphNodes. I have a
>>>>>>>>>>>>>>> question about this:
>>>>>>>>>>>>>>> For a foreignKey joiner, should it have its own dedicated
>>>>>>>>>>>>>>> node type? Or would it be advisable to construct it from
>>>>>>>>>>>>>>> existing GraphNode components? For instance, I believe I
>>>>>>>>>>>>>>> could construct it from several OptimizableRepartitionNode,
>>>>>>>>>>>>>>> some SinkNode, some SourceNode, and several
>>>>>>>>>>>>>>> StatefulProcessorNode. That being said, there is some
>>>>>>>>>>>>>>> underlying complexity to each approach.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I will be switching the KIP-213 to use the RecordHeaders in
>>>>>>>>>>>>>>> Kafka Streams instead of the PropagationWrapper, but
>>>>>>>>>>>>>>> conceptually it should be the same.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Again, any feedback is welcomed...
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Mon, Jul 30, 2018 at 9:38 AM, Adam Bellemare
>>>>>>>>>>>>>>> <adam.bellem...@gmail.com> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hi Guozhang et al
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> I was just reading the 2.0 release notes and noticed a
>>>>>>>>>>>>>>>> section on Record Headers.
>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams+Processor+API
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> I am not yet sure if the contents of a RecordHeader are
>>>>>>>>>>>>>>>> propagated all the way through the Sinks and Sources, but
>>>>>>>>>>>>>>>> if they are, and if they remain attached to the record
>>>>>>>>>>>>>>>> (including null records), I may be able to ditch the
>>>>>>>>>>>>>>>> propagationWrapper for an implementation using
>>>>>>>>>>>>>>>> RecordHeader. I am not yet sure if this is doable, so if
>>>>>>>>>>>>>>>> anyone understands the RecordHeader impl better than I, I
>>>>>>>>>>>>>>>> would be happy to hear from you.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> In the meantime, let me know of any questions. I believe
>>>>>>>>>>>>>>>> this PR has a lot of potential to solve problems for other
>>>>>>>>>>>>>>>> people, as I have encountered a number of other companies
>>>>>>>>>>>>>>>> in the wild all home-brewing their own solutions to come up
>>>>>>>>>>>>>>>> with a method of handling relational data in streams.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Adam
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On Fri, Jul 27, 2018 at 1:45 AM, Guozhang Wang
>>>>>>>>>>>>>>>> <wangg...@gmail.com> wrote:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Hello Adam,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Thanks for rebooting the discussion of this KIP! Let me
>>>>>>>>>>>>>>>>> finish my pass on the wiki and get back to you soon.
>>>>>>>>>>>>>>>>> Sorry for the delays.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On Tue, Jul 24, 2018 at 6:08 AM, Adam Bellemare
>>>>>>>>>>>>>>>>> <adam.bellem...@gmail.com> wrote:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Let me kick this off with a few starting points that I
>>>>>>>>>>>>>>>>>> would like to generate some discussion on.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 1) It seems to me that I will need to repartition the
>>>>>>>>>>>>>>>>>> data twice - once on the foreign key, and once back to
>>>>>>>>>>>>>>>>>> the primary key. Is there anything I am missing here?
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 2) I believe I will also need to materialize 3 state
>>>>>>>>>>>>>>>>>> stores: the prefixScan SS, the highwater mark SS (for
>>>>>>>>>>>>>>>>>> out-of-order resolution) and the final state store, due
>>>>>>>>>>>>>>>>>> to the workflow I have laid out. I have not thought of a
>>>>>>>>>>>>>>>>>> better way yet, but would appreciate any input on this
>>>>>>>>>>>>>>>>>> matter. I have gone back through the mailing list for
>>>>>>>>>>>>>>>>>> the previous discussions on this KIP, and I did not see
>>>>>>>>>>>>>>>>>> anything relating to resolving out-of-order compute. I
>>>>>>>>>>>>>>>>>> cannot see a way around the current three-SS structure
>>>>>>>>>>>>>>>>>> that I have.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 3) Caching is disabled on the prefixScan SS, as I do not
>>>>>>>>>>>>>>>>>> know how to resolve the iterator obtained from RocksDB
>>>>>>>>>>>>>>>>>> with that of the cache. In addition, I must ensure
>>>>>>>>>>>>>>>>>> everything is flushed before scanning. Since the
>>>>>>>>>>>>>>>>>> materialized prefixScan SS is under "control" of the
>>>>>>>>>>>>>>>>>> function, I do not anticipate this to be a problem.
>>>>>>>>>>>>>>>>>> Performance throughput will need to be tested, but as
>>>>>>>>>>>>>>>>>> Jan observed in his initial overview of this issue, it
>>>>>>>>>>>>>>>>>> is generally a surge of output events which affects
>>>>>>>>>>>>>>>>>> performance more so than the flush or prefixScan itself.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Thoughts on any of these are greatly appreciated, since
>>>>>>>>>>>>>>>>>> these elements are really the cornerstone of the whole
>>>>>>>>>>>>>>>>>> design. I can put up the code I have written against
>>>>>>>>>>>>>>>>>> 1.0.2 if we so desire, but first I was hoping to just
>>>>>>>>>>>>>>>>>> tackle some of the fundamental design proposals.
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> Adam
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> On Mon, Jul 23, 2018 at 10:05 AM, Adam Bellemare
>>>>>>>>>>>>>>>>>> <adam.bellem...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Here is the new discussion thread for KIP-213. I picked
>>>>>>>>>>>>>>>>>>> back up on the KIP as this is something that we too at
>>>>>>>>>>>>>>>>>>> Flipp are now running in production.
>>>>>>>>>>>>>>>>>>> Jan started this last year, and I know that Trivago is
>>>>>>>>>>>>>>>>>>> also using something similar in production, at least in
>>>>>>>>>>>>>>>>>>> terms of APIs and functionality.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> I do have an implementation of the code for Kafka 1.0.2
>>>>>>>>>>>>>>>>>>> (our local production version) but I won't post it yet
>>>>>>>>>>>>>>>>>>> as I would like to focus on the workflow and design
>>>>>>>>>>>>>>>>>>> first. That being said, I also need to add some clearer
>>>>>>>>>>>>>>>>>>> integration tests (I did a lot of testing using a
>>>>>>>>>>>>>>>>>>> non-Kafka Streams framework) and clean up the code a
>>>>>>>>>>>>>>>>>>> bit more before putting it in a PR against trunk (I can
>>>>>>>>>>>>>>>>>>> do so later this week likely).
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Please take a look,
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Adam Bellemare
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>> -- Guozhang
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>>> 
>>> 
>>> --
>>> -- Guozhang
>>> 
>> 
> 
> 
> 
> -- 
> -- Guozhang
