I have submitted a PR with my code against trunk:
https://github.com/apache/kafka/pull/5527

Do I continue on this thread or do we begin a new one for discussion?

On Thu, Aug 16, 2018 at 1:40 AM, Jan Filipiak <jan.filip...@trivago.com>
wrote:

> Even before message headers, the option always existed for me to just wrap
> the messages in my own custom envelope, so of course I thought this through.
> One sentence in your last email triggered the whole thought process I went
> through back then to design it in what I think is the "Kafka way". I ended
> up ranting a little about what happened in the past.
>
> I see plenty of my colleagues falling into traps in the API that I warned
> about during the 1.0 DSL rewrite, and I have the same feeling again. So I
> hope this gives you some insight into my thought process. I am aware that
> since I never ported 213 to a higher Streams version, I don't really have a
> stake here, and initially I didn't feel like actually sending it. But maybe
> you can pull something good from it.
>
> Best,
> Jan
>
>
>
> On 15.08.2018 04:44, Adam Bellemare wrote:
>
>> @Jan
>> Thanks Jan. I take it you mean "key-widening" somehow includes information
>> about which record is processed first? I understand about a CombinedKey
>> with both the Foreign and Primary key, but I don't see how you track
>> ordering metadata in there unless you actually included a metadata field
>> in
>> the key type as well.
>>
>> @Guozhang
>> As Jan mentioned earlier, are Record Headers meant to be used strictly in
>> user space? It seems possible that a collision could occur on the
>> (key, value) tuple I wish to add. For instance, if I wanted to add a
>> ("foreignKeyOffset", 10) to the Headers but the user had already specified
>> their own header with the same key name, then it appears there would be a
>> collision. (This is one of the issues I brought up in the KIP.)
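To make the collision concern concrete, here is a toy model of the Headers semantics (the real `org.apache.kafka.common.header.Headers` allows duplicate keys and `lastHeader` returns the most recently added match; this sketch mimics that with a plain list and String values instead of byte arrays, and all class names here are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for Kafka's Headers: duplicate keys are allowed,
// and lastHeader(key) returns the most recently added match.
class SimpleHeaders {
    private final List<String[]> entries = new ArrayList<>();

    void add(String key, String value) {
        entries.add(new String[] { key, value });
    }

    String lastHeader(String key) {
        for (int i = entries.size() - 1; i >= 0; i--) {
            if (entries.get(i)[0].equals(key)) {
                return entries.get(i)[1];
            }
        }
        return null;
    }
}

class HeaderCollisionDemo {
    static String collide() {
        SimpleHeaders headers = new SimpleHeaders();
        // User-space header set by the application...
        headers.add("foreignKeyOffset", "userValue");
        // ...then a DSL-internal header with the same key name.
        headers.add("foreignKeyOffset", "10");
        // The user's value is now shadowed: a collision.
        return headers.lastHeader("foreignKeyOffset");
    }
}
```

Whichever side reads the header last wins, which is exactly why mixing DSL-internal and user-space header keys is risky.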
>>
>> --------------------------------
>>
>> I will be posting a prototype PR against trunk within the next day or two.
>> One thing I need to point out is that my design wraps the entire
>> foreignKeyJoin process strictly within the DSL function. There is no
>> exposure of CombinedKeys or widened keys, nothing to resolve with regards
>> to out-of-order processing and no need for the DSL user to even know
>> what's
>> going on inside of the function. The code simply returns the results of
>> the
>> join, keyed by the original key. Currently my API mirrors identically the
>> format of the data returned by the regular join function, and I believe
>> that this is very useful to many users of the DSL. It is my understanding
>> that one of the main design goals of the DSL is to provide higher level
>> functionality without requiring the users to know exactly what's going on
>> under the hood. With this in mind, I thought it best to solve ordering and
>> partitioning problems within the function and eliminate the requirement
>> for
>> users to do additional work after the fact to resolve the results of their
>> join. Basically, I am assuming that most users of the DSL just "want it to
>> work" and want it to be easy. I did this operating under the assumption
>> that if a user truly wants to optimize their own workflow down to the
>> finest details then they will break from strictly using the DSL and move
>> down to the processors API.
>>
> I think the abstraction is not powerful enough to keep Kafka specifics from
> leaking up. The leak I currently see is that you cannot reliably prevent
> the delete from coming out first, before you emit the correct new record,
> since it is an abstraction built entirely around Kafka.
> I can only recommend against it. Honesty and simplicity should always be
> the first priority; trying to hide this just makes it more complex, less
> understandable, and will lead to mistakes in usage.
>
> This is exactly why I am also strongly against GraphNodes and the later
> optimization stages. Can someone give me an example of an optimization that
> really can't be handled by the user constructing his topology differently?
> Having reusable Processor API components accessible from the DSL and
> composable as one likes is exactly where the DSL should max out; KSQL
> should do the next step. From a software-engineering standpoint, I find it
> very unprofessional to run software whose inner workings you cannot at
> least sensibly reason about. Given that people have to read and understand
> it anyway, why try to hide it?
>
> I really miss the beauty of the 0.10 version DSL.
> Apparently not something I can influence, but I can warn about it.
>
> @guozhang
> You can't imagine how many extra IQ state stores I constantly prune from
> Streams apps because people just keep passing Materializeds into all the
> operations. :D :'-(
> I regret that I couldn't convince you guys back then. Plus this whole
> topology as a fluent interface chain; never seen it anywhere :-/ :'(
>
> I don't know. I guess this is just me regretting having only 24h/day.
>
>
>
>> I updated the KIP today with some points worth talking about, should
>> anyone be so inclined to check it out. Currently we are running this code
>> in production to handle relational joins from our Kafka Connect topics, as
>> per the original motivation of the KIP.
>>
>> On Tue, Aug 14, 2018 at 5:22 PM, Guozhang Wang<wangg...@gmail.com>
>> wrote:
>>
>>> Hello Adam,
>>>
>>> As for your question regarding GraphNodes, it is for extending Streams
>>> optimization framework. You can find more details on
>>> https://issues.apache.org/jira/browse/KAFKA-6761.
>>>
>>> The main idea is that instead of directly building up the "physical
>>> topology" (represented as Topology in the public package, and internally
>>> built as the ProcessorTopology class) while users are specifying the
>>> transformation operators, we first keep it as a "logical topology"
>>> (represented as GraphNode inside InternalStreamsBuilder). And then only
>>> execute the optimization and the construction of the "physical" Topology
>>> when StreamsBuilder.build() is called.
>>>
>>> Back to your question, I think it makes more sense to add a new type of
>>> StreamsGraphNode (maybe you can consider inheriting from the
>>> BaseJoinProcessorNode). Note that although in the Topology we will have
>>> multiple connected ProcessorNodes to represent a (foreign-key) join, we
>>> still want to keep it as a single StreamsGraphNode, or just a couple of
>>> them in the logical representation so that in the future we can construct
>>> the physical topology differently (e.g. having another way than the
>>> current
>>> distributed hash-join).
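The logical-then-physical split Guozhang describes can be sketched roughly like this (a toy model with entirely hypothetical names; see KAFKA-6761 for the real design): operators only record logical nodes, and the optimization plus physical construction happen in one place when build() is called.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the deferred-build idea: transformations append logical
// nodes; optimization and physical construction happen only at build().
class LogicalBuilder {
    private final List<String> logicalNodes = new ArrayList<>();

    LogicalBuilder addNode(String name) {
        logicalNodes.add(name);
        return this;
    }

    // build() is the single point where the logical graph could be
    // rewritten (merged, reordered) before the physical plan is emitted.
    List<String> build() {
        List<String> physical = new ArrayList<>();
        for (String node : logicalNodes) {
            // (a real optimizer would rewrite the graph here)
            physical.add("physical:" + node);
        }
        return physical;
    }
}
```

This is why a foreign-key join can stay a single logical node even though it expands into several physical ProcessorNodes: the expansion is deferred to build time.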
>>>
>>> -------------------------------------------------------
>>>
>>> Back to your questions on KIP-213, I think Jan has summarized it pretty
>>> well. Note that back then we did not have headers support, so we had to
>>> take the "key-widening" approach to ensure ordering.
>>>
>>>
>>> Guozhang
>>>
>>>
>>>
>>> On Mon, Aug 13, 2018 at 11:39 PM, Jan Filipiak<jan.filip...@trivago.com>
>>> wrote:
>>>
>>>> Hi Adam,
>>>>
>>>> I love how you are onto this already! I resolve this by "key-widening":
>>>> I treat the results for FK=A and FK=B differently. As you can see, the
>>>> output of my join has a CombinedKey, and therefore I can resolve the
>>>> "race condition" in a groupBy if I so desire.
>>>>
>>>> I think this reflects more closely what happens under the hood and makes
>>>> it clearer to the user what is going on. The idea of hiding this behind
>>>> metadata and handling it in the DSL is, from my POV, not ideal.
>>>>
>>>> To write into your example:
>>>>
>>>> (key + A, null)
>>>> (key + B, <joined on FK=B>)
>>>>
>>>> is what my output would look like.
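A rough sketch of the CombinedKey idea described above (a hypothetical class, not the actual implementation): because the output key carries both the primary and the foreign key, the delete for FK=A and the new record for FK=B have distinct keys and cannot clobber each other; they can then be resolved downstream, e.g. in a groupBy.

```java
// Hypothetical "widened" key: (primaryKey, foreignKey). A record whose FK
// changes from A to B produces output under two *different* keys,
// ("key","A") and ("key","B"), so ordering between them is resolvable.
class CombinedKey {
    final String primaryKey;
    final String foreignKey;

    CombinedKey(String primaryKey, String foreignKey) {
        this.primaryKey = primaryKey;
        this.foreignKey = foreignKey;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof CombinedKey)) return false;
        CombinedKey other = (CombinedKey) o;
        return primaryKey.equals(other.primaryKey)
                && foreignKey.equals(other.foreignKey);
    }

    @Override
    public int hashCode() {
        return 31 * primaryKey.hashCode() + foreignKey.hashCode();
    }

    @Override
    public String toString() {
        return primaryKey + "+" + foreignKey;
    }
}
```

The trade-off, as discussed in this thread, is that the widened key is visible to the user, who must then resolve the race themselves.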
>>>>
>>>>
>>>> Hope that makes sense :D
>>>>
>>>> Best Jan
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On 13.08.2018 18:16, Adam Bellemare wrote:
>>>>
>>>> Hi Jan
>>>>>
>>>>> If you do not use headers or other metadata, how do you ensure that
>>>>> changes
>>>>> to the foreign-key value are not resolved out-of-order?
>>>>> i.e.: If an event has FK = A, but you change it to FK = B, you need to
>>>>> propagate both a delete (FK=A -> null) and an addition (FK=B). In my
>>>>> solution, without maintaining any metadata, it is possible for the final
>>>>> output to be in either order: the correctly updated joined value, or the
>>>>> null for the delete.
>>>>>
>>>>> (key, null)
>>>>> (key, <joined On FK =B>)
>>>>>
>>>>> or
>>>>>
>>>>> (key, <joined On FK =B>)
>>>>> (key, null)
>>>>>
>>>>> I looked back through your code and through the discussion threads, and
>>>>> didn't see any information on how you resolved this. I have a version
>>>>> of
>>>>> my
>>>>> code working for 2.0, I am just adding more integration tests and will
>>>>> update the KIP accordingly. Any insight you could provide on resolving
>>>>> out-of-order semantics without metadata would be helpful.
>>>>>
>>>>> Thanks
>>>>> Adam
>>>>>
>>>>>
>>>>> On Mon, Aug 13, 2018 at 3:34 AM, Jan Filipiak <jan.filip...@trivago.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Happy to see that you want to make an effort here.
>>>>>>
>>>>>> Regarding the ProcessorSuppliers, I couldn't find a way to avoid
>>>>>> rewriting the joiners + the merger.
>>>>>> The re-partitioners can be reused in theory. I don't know if
>>>>>> repartition is optimized in 2.0 now.
>>>>>>
>>>>>> I made this:
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-241+KTable+repartition+with+compacted+Topics
>>>>>> back then, and we are running KIP-213 with KIP-241 in combination.
>>>>>>
>>>>>> For us it is vital, as it minimized the size of our repartition topics
>>>>>> and removed the factor of 2 in events on every message. I know about
>>>>>> the new "delete once the consumer has read it". I don't think 241 is
>>>>>> vital for all use cases, but for ours it is. I wanted to use 213 to
>>>>>> sneak in the foundations for 241 as well.
>>>>>>
>>>>>> I don't quite understand what a PropagationWrapper is, but I am certain
>>>>>> that you do not need RecordHeaders for 213, and I would try to leave
>>>>>> them out. They belong either to the DSL or to the user; mixed use is to
>>>>>> be avoided. We run the join with the 0.8 log format, and I don't think
>>>>>> one needs more.
>>>>>>
>>>>>> This KIP will be very valuable for the Streams project! I could never
>>>>>> convince myself to invest in the 1.0+ DSL, as I used almost all my
>>>>>> energy fighting against it. Maybe this can also help me see the good
>>>>>> sides a little bit more.
>>>>>>
>>>>>> If there is anything unclear with all the text that has been written,
>>>>>> feel
>>>>>> free to just directly cc me so I don't miss it on
>>>>>> the mailing list.
>>>>>>
>>>>>> Best Jan
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 08.08.2018 15:26, Adam Bellemare wrote:
>>>>>>
>>>>>> More followup, and +dev as Guozhang replied to me directly previously.
>>>>>>
>>>>>>> I am currently porting the code over to trunk. One of the major
>>>>>>> changes since 1.0 is the usage of GraphNodes. I have a question about
>>>>>>> this:
>>>>>>>
>>>>>>> For a foreignKey joiner, should it have its own dedicated node type?
>>>>>>> Or would it be advisable to construct it from existing GraphNode
>>>>>>> components? For instance, I believe I could construct it from several
>>>>>>> OptimizableRepartitionNodes, some SinkNodes, some SourceNodes, and
>>>>>>> several StatefulProcessorNodes. That being said, there is some
>>>>>>> underlying complexity to each approach.
>>>>>>>
>>>>>>> I will be switching KIP-213 to use the RecordHeaders in Kafka Streams
>>>>>>> instead of the PropagationWrapper, but conceptually it should be the
>>>>>>> same.
>>>>>>>
>>>>>>> Again, any feedback is welcomed...
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Jul 30, 2018 at 9:38 AM, Adam Bellemare <adam.bellem...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Guozhang et al
>>>>>>>>
>>>>>>>> I was just reading the 2.0 release notes and noticed a section on
>>>>>>>> Record Headers:
>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams+Processor+API
>>>>>>>>
>>>>>>>> I am not yet sure if the contents of a RecordHeader are propagated
>>>>>>>> all the way through the Sinks and Sources, but if they are, and if
>>>>>>>> they remain attached to the record (including null records), I may be
>>>>>>>> able to ditch the propagationWrapper for an implementation using
>>>>>>>> RecordHeader. I am not yet sure if this is doable, so if anyone
>>>>>>>> understands the RecordHeader impl better than I do, I would be happy
>>>>>>>> to hear from you.
>>>>>>>>
>>>>>>>> In the meantime, let me know of any questions. I believe this PR has
>>>>>>>> a lot of potential to solve problems for other people, as I have
>>>>>>>> encountered a number of other companies in the wild all home-brewing
>>>>>>>> their own solutions for handling relational data in streams.
>>>>>>>> Adam
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Jul 27, 2018 at 1:45 AM, Guozhang Wang<wangg...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hello Adam,
>>>>>>>>>
>>>>>>>>> Thanks for rebooting the discussion of this KIP! Let me finish my
>>>>>>>>> pass on the wiki and get back to you soon. Sorry for the delays.
>>>>>>>>>
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>> On Tue, Jul 24, 2018 at 6:08 AM, Adam Bellemare <adam.bellem...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Let me kick this off with a few starting points that I would like
>>>>>>>>>> to generate some discussion on.
>>>>>>>>>>
>>>>>>>>>> 1) It seems to me that I will need to repartition the data twice:
>>>>>>>>>> once on the foreign key, and once back to the primary key. Is there
>>>>>>>>>> anything I am missing here?
>>>>>>>>>>
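The double repartition in point 1 follows from plain partition math. With a hash-based partitioner (sketched below; Kafka's default partitioner actually uses murmur2 over the serialized key bytes, so this is only a stand-in), a record re-keyed by its foreign key generally lands on a different partition than it did under its primary key, so joining on the FK and then re-keying back to the PK requires a second shuffle.

```java
class PartitionDemo {
    // Roughly what a hash-based partitioner does; Kafka's default uses
    // murmur2 over serialized key bytes, so treat this as illustrative.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```

Since partitionFor(primaryKey, n) and partitionFor(foreignKey, n) are unrelated values, records must physically move for each re-keying.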
>>>>>>>>>> 2) I believe I will also need to materialize 3 state stores: the
>>>>>>>>>> prefixScan SS, the highwater mark SS (for out-of-order resolution),
>>>>>>>>>> and the final state store, due to the workflow I have laid out. I
>>>>>>>>>> have not thought of a better way yet, but would appreciate any
>>>>>>>>>> input on this matter. I have gone back through the mailing list for
>>>>>>>>>> the previous discussions on this KIP, and I did not see anything
>>>>>>>>>> relating to resolving out-of-order compute. I cannot see a way
>>>>>>>>>> around the current three-SS structure that I have.
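One possible shape for the highwater mark SS in point 2 (hypothetical names; the actual mechanism is up to the implementation): each joined result carries the offset of the source event that produced it, and any result older than the highest offset already seen for that key is dropped, so a late-arriving delete cannot overwrite the correct new value.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical highwater-mark resolver: keeps the highest offset seen per
// key and drops any result produced from an older (out-of-order) event.
class HighwaterResolver {
    private final Map<String, Long> highwater = new HashMap<>();

    boolean shouldEmit(String key, long offset) {
        Long seen = highwater.get(key);
        if (seen != null && offset < seen) {
            return false; // stale result from an out-of-order event
        }
        highwater.put(key, offset);
        return true;
    }
}
```

In a real topology this map would itself be a materialized state store so the resolution survives restarts.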
>>>>>>>>>>
>>>>>>>>>> 3) Caching is disabled on the prefixScan SS, as I do not know how
>>>>>>>>>> to resolve the iterator obtained from RocksDB with that of the
>>>>>>>>>> cache. In addition, I must ensure everything is flushed before
>>>>>>>>>> scanning. Since the materialized prefixScan SS is under "control"
>>>>>>>>>> of the function, I do not anticipate this to be a problem.
>>>>>>>>>> Performance throughput will need to be tested, but as Jan observed
>>>>>>>>>> in his initial overview of this issue, it is generally a surge of
>>>>>>>>>> output events that affects performance more so than the flush or
>>>>>>>>>> prefixScan itself.
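For context on the prefixScan SS in point 3: if the combined key is serialized foreign-key-first, all records for one foreign key are contiguous in a sorted store, so a single range scan over the prefix retrieves them. A toy illustration with a TreeMap standing in for RocksDB (the "|" separator and the "}" upper bound are assumptions of this sketch, chosen because '}' sorts just after '|' in ASCII):

```java
import java.util.SortedMap;
import java.util.TreeMap;

// TreeMap standing in for a sorted key-value store (e.g. RocksDB):
// keys are "foreignKey|primaryKey", so one foreign key's records are
// contiguous and a range scan over the prefix retrieves them all.
class PrefixScanDemo {
    static SortedMap<String, String> scan(TreeMap<String, String> store, String fkPrefix) {
        // '|' < '}' in ASCII, so this half-open range covers exactly the
        // keys that start with fkPrefix + "|".
        return store.subMap(fkPrefix + "|", fkPrefix + "}");
    }
}
```

This contiguity is also why a single FK change can trigger the "surge of output events" Jan mentions: one scan may touch every record sharing that foreign key.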
>>>>>>>>>>
>>>>>>>>>> Thoughts on any of these are greatly appreciated, since these
>>>>>>>>>> elements are really the cornerstone of the whole design. I can put
>>>>>>>>>> up the code I have written against 1.0.2 if we so desire, but first
>>>>>>>>>> I was hoping to just tackle some of the fundamental design
>>>>>>>>>> proposals.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Adam
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Jul 23, 2018 at 10:05 AM, Adam Bellemare <
>>>>>>>>>> adam.bellem...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Here is the new discussion thread for KIP-213. I picked back up on
>>>>>>>>>>> the KIP as this is something that we too at Flipp are now running
>>>>>>>>>>> in production. Jan started this last year, and I know that Trivago
>>>>>>>>>>> is also using something similar in production, at least in terms
>>>>>>>>>>> of APIs and functionality.
>>>>>>>>>>>
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
>>>>>>>>>>>
>>>>>>>>>>> I do have an implementation of the code for Kafka 1.0.2 (our local
>>>>>>>>>>> production version) but I won't post it yet, as I would like to
>>>>>>>>>>> focus on the workflow and design first. That being said, I also
>>>>>>>>>>> need to add some clearer integration tests (I did a lot of testing
>>>>>>>>>>> using a non-Kafka-Streams framework) and clean up the code a bit
>>>>>>>>>>> more before putting it in a PR against trunk (I can likely do so
>>>>>>>>>>> later this week).
>>>>>>>>>>>
>>>>>>>>>>> Please take a look,
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>>
>>>>>>>>>>> Adam Bellemare
>>>>>>>>> -- Guozhang
>>>
>>> -- Guozhang
>>>
>>>
>
