Hi Matthias,

Two things that popped into my mind on Sunday morning. Can we provide a KTableValueGetter when the key in the store is different from the key forwarded?
1. We would need a backwards mapper.
2. I am not sure if we can pull it off w/o said fourth generic type in KTable (which I am in favour of, btw).

+ It won't solve people's problem of having CombinedKey on the wire and not being able to inspect the topic with, say, their default tools. - I'd rather introduce KTable::mapKeys() or something (4th generic in KTable?) than overloading. It is better separation-of-concerns (SoC) wise.

I am thinking more of an overload where we replace the CombinedKey Serde. So people can use a default CombinedKey Serde but could provide their own implementation that would internally use K0 for serialisation and deserialisation. One could implement a ##prefix() into this call to make explicit that we only want the prefix rendered. This would take CombinedKey logic out of publicly visible data. A stock CombinedKey Serde that would be used by default could also handle the JSON users correctly.
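A minimal sketch of what such a stock CombinedKey Serde with an explicit prefix call could look like. Everything here is an assumption for illustration: the class names, the `prefix()` method and the length-prefixed byte layout are hypothetical and not part of Kafka Streams or the KIP, and deserialisation is left out for brevity.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical "stock" serde sketch; not an existing Kafka Streams class.
final class CombinedKeySerdeSketch<K0, K1> {
    private final Function<K0, byte[]> prefixSerializer;
    private final Function<K1, byte[]> suffixSerializer;

    CombinedKeySerdeSketch(Function<K0, byte[]> prefixSerializer,
                           Function<K1, byte[]> suffixSerializer) {
        this.prefixSerializer = prefixSerializer;
        this.suffixSerializer = suffixSerializer;
    }

    // Assumed layout: [4-byte length of K0 bytes][K0 bytes][K1 bytes].
    byte[] serialize(CombinedKey<K0, K1> key) {
        byte[] head = prefix(key.prefix);
        byte[] tail = suffixSerializer.apply(key.suffix);
        return ByteBuffer.allocate(head.length + tail.length).put(head).put(tail).array();
    }

    // Renders only the K0 part, in the same layout, so it can drive a prefix scan.
    byte[] prefix(K0 prefixKey) {
        byte[] raw = prefixSerializer.apply(prefixKey);
        return ByteBuffer.allocate(Integer.BYTES + raw.length).putInt(raw.length).put(raw).array();
    }

    // Helper: true if 'bytes' begins with 'prefix'.
    static boolean startsWith(byte[] bytes, byte[] prefix) {
        if (prefix.length > bytes.length) return false;
        for (int i = 0; i < prefix.length; i++) {
            if (bytes[i] != prefix[i]) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        Function<String, byte[]> utf8 = s -> s.getBytes(StandardCharsets.UTF_8);
        CombinedKeySerdeSketch<String, String> serde = new CombinedKeySerdeSketch<>(utf8, utf8);
        byte[] full = serde.serialize(new CombinedKey<>("a", "b1"));
        System.out.println(startsWith(full, serde.prefix("a")));
    }
}

// Hypothetical stand-in for the CombinedKey type discussed in this thread.
final class CombinedKey<K0, K1> {
    final K0 prefix;
    final K1 suffix;

    CombinedKey(K0 prefix, K1 suffix) {
        this.prefix = prefix;
        this.suffix = suffix;
    }
}
```

The length field is one possible way to keep the key "ab" from matching as a prefix of the key "a" followed by "b..."; a user-supplied implementation could pick a different layout as long as the rendered prefix stays a byte prefix of the full key.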

Users would still get CombinedKey back. The downside of getting these nested deeply is probably mitigated by users doing a group by
in the very next step to get rid of A's key again.

That is what I was able to come up with so far.
Let me know what you think.




On 22.11.2017 00:14, Matthias J. Sax wrote:
Jan,

Thanks for explaining the Serde issue! This makes a lot of sense.

I discussed this issue with Guozhang and we came up with the
following idea that bridges both APIs:

We still introduce CombinedKey as a public interface and exploit it to
manage the key in the store and the changelog topic. For this case we
can construct a suitable Serde internally based on the Serdes of both
keys that are combined.

However, the type of the result table is user defined and can be
anything. To bridge between the CombinedKey and the user defined result
type, users need to hand in a `ValueMapper<CombinedKey, KO>` that
converts the CombinedKey into the desired result type.

Thus, the method signature would be something like

    <KO, VO, K1, V1> KTable<KO, VO> oneToManyJoin(
        KTable<K1, V1> other,
        ValueMapper<V1, K> keyExtractor,
        ValueJoiner<V, V1, VO> joiner,
        ValueMapper<CombinedKey<K, K1>, KO> resultKeyMapper);

The interface parameters are still easy to understand and don't leak
implementation details IMHO.
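
To make the role of the resultKeyMapper concrete, here is a small sketch of what a caller might pass in. It is only an illustration under assumptions: `oneToManyJoin` and `CombinedKey` are proposals from this thread, the `ValueMapper` below is a local stand-in with the same single-method shape as the Kafka Streams interface so the snippet compiles alone, and the field names and the "|" delimiter are hypothetical.

```java
// Hypothetical caller-side sketch for the proposed resultKeyMapper parameter.
final class ResultKeyMapperSketch {
    // Maps CombinedKey<K, K1> to a user-defined result key KO (here: a String).
    static final ValueMapper<CombinedKey<String, Long>, String> RESULT_KEY_MAPPER =
        combined -> combined.left + "|" + combined.right;

    public static void main(String[] args) {
        CombinedKey<String, Long> key = new CombinedKey<>("user-1", 42L);
        System.out.println(RESULT_KEY_MAPPER.apply(key));
    }
}

// Local stand-in mirroring the shape of a one-argument value mapper.
interface ValueMapper<V, VR> {
    VR apply(V value);
}

// Hypothetical stand-in for the CombinedKey type discussed in this thread.
final class CombinedKey<K, K1> {
    final K left;   // key of the table the join is called on (assumed name)
    final K1 right; // key of the "other" table (assumed name)

    CombinedKey(K left, K1 right) {
        this.left = left;
        this.right = right;
    }
}
```

With a mapper like this the result table would be keyed by a plain String, so the CombinedKey never shows up in the user-facing result type.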

WDYT about this idea?


-Matthias


On 11/19/17 11:28 AM, Guozhang Wang wrote:
Hello Jan,

I think I get your point about the burden that CombinedKey would
introduce for serialization and for tooling based on serdes. What I'm still
wondering about is the role of the joinPrefixFaker mapper: from your latest
comment it seems this mapper will be a one-time mapper: we use it to map
the original resulting KTable<combined<K1, K2>, V0> to KTable<K0, V0>, and
then that mapper can be thrown away and forgotten. Is that true? My
original thought was that you proposed to carry this mapper all the way along
the rest of the topology to "abstract" the underlying combined keys.

If it is the other way (i.e. the former approach), then the diagram of
these two approaches would be different: for the less intrusive approach we
would add one more step in this diagram to always do a mapping after the
"task perform join" block.

Also, another minor comment on the internal topic: I think many readers may
not get the schema of this topic, so it would be better to indicate what
the key of this internal topic used for compaction would be, and what would
be used as the partition key.

Guozhang


On Sat, Nov 18, 2017 at 2:30 PM, Jan Filipiak <jan.filip...@trivago.com>
wrote:

-> I think the relationships between the different types used, K0, K1, KO,
should be explained explicitly (all information is there implicitly, but
one needs to think hard to figure it out)


I'm probably blind to this. Can you help me here? How would you formulate
this?

Thanks,

Jan


On 16.11.2017 23:18, Matthias J. Sax wrote:

Hi,

I am just catching up on this discussion and did re-read the KIP and
discussion thread.

In contrast to you, I prefer the second approach with CombinedKey as
return type for the following reasons:

   1) the oneToManyJoin() method has fewer parameters
   2) those parameters are easy to understand
   3) we hide implementation details (joinPrefixFaker, leftKeyExtractor,
and the return type KO leak internal implementation details from my
point of view)
   4) users can get their own KO type by extending the CombinedKey interface
(this would also address the nesting issue Trevor pointed out)

What's unclear to me is why you care about JSON serdes. What is the
problem with regard to prefix? It seems I am missing something here.

I also don't understand the argument about "the user can stick with his
default serde or his standard way of serializing". If we have
`CombinedKey` as output, the user just provides the serdes for both input
combined-key types individually, and we can reuse both internally to do
the rest. This seems to be a much simpler API. With the KO output type
approach, users need to write an entirely new serde for KO in contrast.

Finally, @Jan, there are still some open comments you did not address
and the KIP wiki page needs some updates. Would be great if you could do
this.

Can you also explicitly describe the data layout of the store that is
used to do the range scans?

Additionally:

-> some arrows in the algorithm diagram are missing
-> what are those XXX in the diagram?
-> can you finish the "Step by Step" example
-> I think the relationships between the different types used, K0, K1, KO,
should be explained explicitly (all information is there implicitly, but
one needs to think hard to figure it out)


Last but not least:

But no one is really interested.
Don't understand this statement...



-Matthias


On 11/16/17 9:05 AM, Jan Filipiak wrote:

We are running this perfectly fine. For us the smaller table changes
rather infrequently, say only a few times per day. The performance cost of
the flush is way lower than the computing power you need to bring to the
table to account for all the records being emitted after that one single
update.

On 16.11.2017 18:02, Trevor Huey wrote:

Ah, I think I see the problem now. Thanks for the explanation. That is
tricky. As you said, it seems the easiest solution would just be to
flush the cache. I wonder how big of a performance hit that'd be...

On Thu, Nov 16, 2017 at 9:07 AM Jan Filipiak <jan.filip...@trivago.com> wrote:

      Hi Trevor,

      I am leaning towards the less intrusive approach myself. In fact,
      that is how we implemented our internal API for this and how we
      run it in production.
      Getting more voices towards this solution makes me really happy.
      The reason it's a problem for Prefix and not for Range is the
      following. Imagine the intrusive approach. The key of the RocksDB
      store would be CombinedKey<A,B>, and the prefix scan would take an A
      while the range scan would still take a CombinedKey<A,B>. As you can
      see, with the intrusive approach the keys are actually different
      types for different queries. With the less intrusive approach we
      use the same type and rely on Serde invariances. For us this works
      nicely (protobuf), but it might bite some JSON users.
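
One plausible reading of "Serde invariances" is that the serialized form of the prefix must be a byte prefix of the serialized full key. The sketch below illustrates that reading only; the "|" concatenation encoding, the JSON field names "a" and "b", and the class name are all assumptions made for the example, not something taken from the PR.

```java
// Illustration of why a prefix scan can work with a concatenating encoding
// but not with a naive JSON encoding. All names here are hypothetical.
final class SerdeInvarianceSketch {
    // Concatenation-style encoding (assumed): encoded A is a prefix of encoded (A, B).
    static String concatPrefix(String a) {
        return a + "|";
    }

    static String concatCombined(String a, String b) {
        return a + "|" + b;
    }

    // Hand-rolled JSON objects with assumed field names "a" and "b".
    static String jsonPrefix(String a) {
        return "{\"a\":\"" + a + "\"}";
    }

    static String jsonCombined(String a, String b) {
        return "{\"a\":\"" + a + "\",\"b\":\"" + b + "\"}";
    }

    public static void main(String[] args) {
        // "x|y" starts with "x|".
        System.out.println(concatCombined("x", "y").startsWith(concatPrefix("x"))); // true
        // {"a":"x","b":"y"} does not start with {"a":"x"} because of the closing brace.
        System.out.println(jsonCombined("x", "y").startsWith(jsonPrefix("x")));     // false
    }
}
```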

      Hope that makes it clear.

      Best Jan


      On 16.11.2017 16:39, Trevor Huey wrote:

      1. Going over KIP-213, I am leaning toward the "less intrusive"
      approach. In my use case, I am planning on performing a sequence
      of several oneToMany joins. From my understanding, the more
      intrusive approach would result in several nested levels of
      CombinedKeys. For example, consider Tables A, B, C, D with
      corresponding keys KA, KB, KC. Joining A and B would produce
      CombinedKey<KA, KB>. Then joining that result on C would produce
      CombinedKey<KC, CombinedKey<KA, KB>>. My "keyOtherSerde" in this
      case would need to be capable of deserializing CombinedKey<KA,
      KB>. This would just get worse the more tables I join. I realize
      that it's easier to shoot yourself in the foot with the less
      intrusive approach, but as you said, "the user can stick with
      his default serde or his standard way of serializing". In the
      simplest case where the keys are just strings, they can do simple
      string concatenation and use Serdes.String(). It also allows the
      user to create and use their own version of CombinedKey if they
      feel so inclined.
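
The nesting described in point 1 can be sketched as follows. This is only an illustration: the `CombinedKey` class is a hypothetical stand-in for the type proposed in the KIP, and the "|" delimiter used for the flat string key is an assumption.

```java
// Compares the key types after two chained joins under both approaches.
final class NestedKeySketch {
    // More intrusive approach: every join wraps the previous key type once more.
    static CombinedKey<String, CombinedKey<String, String>> nested(String kc, String ka, String kb) {
        return new CombinedKey<>(kc, new CombinedKey<>(ka, kb));
    }

    // Less intrusive approach with string keys: plain concatenation,
    // so the key type stays String no matter how many joins are chained.
    static String flat(String kc, String ka, String kb) {
        return kc + "|" + ka + "|" + kb;
    }

    public static void main(String[] args) {
        System.out.println(nested("c1", "a1", "b1").suffix.prefix);
        System.out.println(flat("c1", "a1", "b1"));
    }
}

// Hypothetical stand-in for the CombinedKey type discussed in this thread.
final class CombinedKey<P, S> {
    final P prefix;
    final S suffix;

    CombinedKey(P prefix, S suffix) {
        this.prefix = prefix;
        this.suffix = suffix;
    }
}
```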

      2. Why is there a problem for prefix, but not for range?
      https://github.com/apache/kafka/pull/3720/files#diff-8f863b74c3c5a0b989e89d00c149aef1L162



      On Thu, Nov 16, 2017 at 2:57 AM Jan Filipiak
      <jan.filip...@trivago.com> wrote:

          Hi Trevor,

          Thank you very much for your interest. To keep the discussion
          focused on the mailing list and not Jira or Confluence, I decided
          to reply here.

          1. It's tricky; activity is indeed very low. In KIP-213
          there are 2 proposals about the return type of the join. I
          would like to settle on one.
          Unfortunately it's controversial, and I don't want to have the
          discussion after I have settled on one way and implemented it. But
          no one is really interested.
          So discussing with YOU what your preferred return type would
          look like would be very helpful already.

          2.
          The most difficult part is implementing
          this
          https://github.com/apache/kafka/pull/3720/files#diff-ac41b4dfb9fc6bb707d966477317783cR68

          here
          https://github.com/apache/kafka/pull/3720/files#diff-8f863b74c3c5a0b989e89d00c149aef1R244

          and here
          https://github.com/apache/kafka/pull/3720/files#diff-b1a1281dce5219fd0cb5afad380d9438R207

          One can get an easy shot by just flushing the underlying
          Rocks and using Rocks for the range scan.
          But as you can see, the implementation depends on the API.
          Depending on which way the API discussion goes,
          I would implement this differently.

          3.
          I only have so much time to work on this. I filed the
          KIP because I want to pull it through and I am pretty
          confident that I can do it.
          But I am still waiting for the full discussion to happen on
          this. To move the discussion forward, it seems that I
          need to fill out the table in
          the KIP entirely (the one describing the events, change
          modifications and output). Feel free to continue the
          discussion w/o the table. I want
          to finish the table during next week.

          Best, Jan. Thank you for your interest!

          _____ Jira Quote ______

          Jan Filipiak
          <https://issues.apache.org/jira/secure/ViewProfile.jspa?name=jfilipiak>
          Please bear with me while I try to get caught up. I'm not yet
          familiar with the Kafka code base. I have a few questions to
          try to figure out how I can get involved:
          1. It seems like we need to get buy-in on your KIP-213? It
          doesn't seem like there's been much activity on it besides
          yourself in a while. What's your current plan of attack for
          getting that approved?
          2. I know you said that the most difficult part is yet to be
          done. Is there some code you can point me toward so I can
          start digging in and better understand why this is so
          difficult?
          3. This issue has been open since May '16. How far out do you
          think we are from getting this implemented?



