ah .. ok .. thanks for the clarification .. for reduce I guess the overload with Materialized does the same thing ..
regards.

On Tue, Feb 13, 2018 at 2:24 PM, Damian Guy <damian....@gmail.com> wrote:

> There is an overload `leftJoin(KTable, ValueJoiner, Joined)`
>
> Joined is where you specify the Serde for the KTable and for the resulting
> type. We don't need the Serde for the stream at this point as the value has
> already been deserialized.
>
> HTH,
> Damian
>
> On Tue, 13 Feb 2018 at 05:39 Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
> > Regarding “has an according overload” I agree. But some operators like
> > reduce and leftJoin use the serdes implicitly and from the config. So if
> > the developer is not careful enough to have the default serdes correct then
> > it results in runtime error.
> >
> > Also one more confusion on my part is that in config we can give one serde
> > for key and value. What happens if I have 2 leftJoin in my transformation
> > that need different serdes from config. There is no overload for leftJoin
> > that allows me to provide a serde. Or am I missing something ?
> >
> > regards.
> >
> > On Tue, 13 Feb 2018 at 12:14 AM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > Each operator that needs to use a Serde has an according overload
> > > method that allows you to overwrite the Serde. If you don't overwrite
> > > it, the operator uses the Serde from the config.
> > >
> > > > If one gets the default
> > > > serializer wrong then she gets run time errors in serialization /
> > > > de-serialization (ClassCastException etc.)
> > >
> > > Default Serdes are helpful if you use a generic format like Avro
> > > throughout the whole topology. If you have many different types, it might
> > > be better to set default Serdes to `null` and set the Serde for each
> > > operator individually.
> > >
> > >
> > > -Matthias
> > >
> > > On 2/12/18 2:16 AM, Debasish Ghosh wrote:
> > > > Thanks a lot for the clear answer.
> > > >
> > > > One of the concerns that I have is that it's not always obvious when the
> > > > default serializers are used. e.g. it looks like KGroupedStream#reduce also
> > > > uses the default serializer under the hood. If one gets the default
> > > > serializer wrong then she gets run time errors in serialization /
> > > > de-serialization (ClassCastException etc.), which are quite hard to track
> > > > down.
> > > >
> > > > On Mon, Feb 12, 2018 at 4:52 AM, Matthias J. Sax <matth...@confluent.io>
> > > > wrote:
> > > >
> > > >> For stream-table-join, only the table is (de)serialized; the stream-side
> > > >> is only piped through and does lookups into the table.
> > > >>
> > > >> And when reading the stream
> > > >> (https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L129)
> > > >> the Serdes from the config are overwritten by parameters passed into
> > > >> `#stream()`
> > > >>
> > > >> The default Serdes are used when reading/writing from/to a topic/store
> > > >> (including repartition or changelog) and if the operator does not
> > > >> overwrite the default Serdes via passed-in parameters.
> > > >>
> > > >>
> > > >> -Matthias
> > > >>
> > > >> On 2/10/18 10:34 PM, Debasish Ghosh wrote:
> > > >>> The inputs to the leftJoin are the stream with [String, Long] and the table
> > > >>> with [String, String]. Is the default serializer (I mean from the config)
> > > >>> used for [String, String] ? Then how does the [String, Long] serialization
> > > >>> work ?
> > > >>>
> > > >>> I guess the basic issue that I am trying to understand is how the default
> > > >>> serialisers (stringSerde, stringSerde) registered in config are used for
> > > >>> serialising the inputs of leftJoin ..
> > > >>>
> > > >>> regards.
> > > >>>
> > > >>> On Sun, 11 Feb 2018 at 8:53 AM, Matthias J.
> > > >>> Sax <matth...@confluent.io> wrote:
> > > >>>
> > > >>>> userClicksJoinRegion is never serialized...
> > > >>>>
> > > >>>> It is the result of the join, and the join only (de)serializes its input in
> > > >>>> the internal stores.
> > > >>>>
> > > >>>> The output is forwarded in-memory to a consecutive map, which returns
> > > >>>> `clicksByRegion` that is [String, Long].
> > > >>>>
> > > >>>>
> > > >>>> -Matthias
> > > >>>>
> > > >>>> On 2/10/18 1:17 PM, Ted Yu wrote:
> > > >>>>> Please read the javadoc:
> > > >>>>>
> > > >>>>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Consumed.java
> > > >>>>>
> > > >>>>> and correlate with the sample code.
> > > >>>>>
> > > >>>>> Thanks
> > > >>>>>
> > > >>>>> On Sat, Feb 10, 2018 at 1:10 PM, Debasish Ghosh <ghosh.debas...@gmail.com>
> > > >>>>> wrote:
> > > >>>>>
> > > >>>>>> Looking at
> > > >>>>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L148,
> > > >>>>>> it seems that the leftJoin generates a KStream[String, (String, Long)],
> > > >>>>>> which means the value is a tuple of (String, Long) .. I am not able to get
> > > >>>>>> how this will serialize/de-serialize with the default serializers which are
> > > >>>>>> both stringSerde for keys and values.
> > > >>>>>>
> > > >>>>>> or am I missing something ?
> > > >>>>>>
> > > >>>>>> regards.
> > > >>>>>>
> > > >>>>>> On Sun, Feb 11, 2018 at 2:30 AM, Ted Yu <yuzhih...@gmail.com> wrote:
> > > >>>>>>
> > > >>>>>>> If I read the code correctly, the operation on this line prepares the input
> > > >>>>>>> for the (stringSerde, stringSerde) specified on line 142:
> > > >>>>>>>
> > > >>>>>>>       .leftJoin(userRegionsTable, (clicks: Long, region: String) =>
> > > >>>>>>>         (if (region == null) "UNKNOWN" else region, clicks))
> > > >>>>>>>
> > > >>>>>>> FYI
> > > >>>>>>>
> > > >>>>>>> On Sat, Feb 10, 2018 at 11:00 AM, Debasish Ghosh <ghosh.debas...@gmail.com>
> > > >>>>>>> wrote:
> > > >>>>>>>
> > > >>>>>>>> Hi -
> > > >>>>>>>>
> > > >>>>>>>> I was going through this example at
> > > >>>>>>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala,
> > > >>>>>>>> especially the leftJoin part
> > > >>>>>>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L156.
> > > >>>>>>>> This leftJoin returns KStream[String, (String, Long)], while default
> > > >>>>>>>> serializers are String for both key and value as in
> > > >>>>>>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L112-L113.
> > > >>>>>>>> My question is how does this serialization work here ? I mean how does the
> > > >>>>>>>> tuple get serialized with the default serializers ? And leftJoin only works
> > > >>>>>>>> with default serializers ..
> > > >>>>>>>>
> > > >>>>>>>> regards.
> > > >>>>>>>>
> > > >>>>>>>> --
> > > >>>>>>>> Debasish Ghosh
> > > >>>>>>>> http://manning.com/ghosh2
> > > >>>>>>>> http://manning.com/ghosh
> > > >>>>>>>>
> > > >>>>>>>> Twttr: @debasishg
> > > >>>>>>>> Blog: http://debasishg.blogspot.com
> > > >>>>>>>> Code: http://github.com/debasishg
> > > >>>>>>
> > > >>>>>> --
> > > >>>>>> Debasish Ghosh
> > > >>>>>> http://manning.com/ghosh2
> > > >>>>>> http://manning.com/ghosh
> > > >>>>>>
> > > >>>>>> Twttr: @debasishg
> > > >>>>>> Blog: http://debasishg.blogspot.com
> > > >>>>>> Code: http://github.com/debasishg
> > > >>>
> > > >>> --
> > > >>> Sent from my iPhone
> >
> > --
> > Sent from my iPhone

--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
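
For readers following the thread, the rule Matthias describes (an operator uses the Serde passed to it, otherwise the one from the config) and the ClassCastException Debasish mentions can be illustrated with a small self-contained sketch. This is only a toy model under stated assumptions: `SerdeFallbackSketch`, `ToySerializer` and `write` are hypothetical names invented for illustration and are not part of the Kafka Streams API. In a real topology the per-operator override would be supplied through the classes named in the thread (`Consumed`, `Joined`, `Materialized`).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Toy model only: none of these types are Kafka Streams classes.
final class SerdeFallbackSketch {

    /** Hypothetical stand-in for a serializer. It casts at runtime because
     *  a default taken from an untyped config cannot be checked at compile time. */
    interface ToySerializer {
        byte[] serialize(Object value);
    }

    static final ToySerializer STRING_SERIALIZER =
            value -> ((String) value).getBytes(StandardCharsets.UTF_8);

    static final ToySerializer LONG_SERIALIZER =
            value -> ByteBuffer.allocate(Long.BYTES).putLong((Long) value).array();

    /** Hypothetical operator that writes a value to a store or topic:
     *  the operator-level override wins, otherwise the config default is used. */
    static byte[] write(Object value, ToySerializer configDefault, ToySerializer operatorOverride) {
        ToySerializer chosen = (operatorOverride != null) ? operatorOverride : configDefault;
        return chosen.serialize(value);
    }

    public static void main(String[] args) {
        // Override supplied: the Long value is written with the Long serializer.
        System.out.println(write(42L, STRING_SERIALIZER, LONG_SERIALIZER).length); // prints 8

        // No override, and the String default matches the value type.
        System.out.println(write("europe", STRING_SERIALIZER, null).length); // prints 6

        // No override, and the String default does not match a Long value:
        // the mistake only surfaces at runtime.
        try {
            write(42L, STRING_SERIALIZER, null);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException at runtime");
        }
    }
}
```

The third call is the situation discussed above: nothing at compile time ties the config default to the value type, so a wrong default is only detected when a record is actually serialized.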