Each operator that needs to use a Serde has a corresponding overload that allows you to overwrite the Serde. If you don't overwrite it, the operator uses the Serdes from the config.
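For example, overwriting the Serdes for a single operator looks roughly like this (a minimal Scala sketch against the Kafka Streams 1.0 `Consumed`/`Produced` overloads; topic names are made up):

```scala
import java.util.Properties

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{Consumed, StreamsBuilder, StreamsConfig}
import org.apache.kafka.streams.kstream.Produced

object SerdeOverrideSketch extends App {
  // Default Serdes come from the config; they are used whenever an operator
  // is not given explicit Serdes (these props would go into
  // `new KafkaStreams(builder.build(), props)`).
  val props = new Properties()
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)

  val builder = new StreamsBuilder()

  // Per-operator overwrite: read this topic with a Long value Serde,
  // even though the configured default value Serde is String.
  val clicks = builder.stream[String, java.lang.Long](
    "user-clicks", Consumed.`with`(Serdes.String(), Serdes.Long()))

  // Same idea on the write path; without Produced.with(...) the config
  // defaults would be used here and a wrong default shows up at runtime
  // as a ClassCastException.
  clicks.to("user-clicks-copy", Produced.`with`(Serdes.String(), Serdes.Long()))
}
```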
> If one gets the default serializer wrong then she gets run time errors
> in serialization / de-serialization (ClassCastException etc.)

Default Serdes are helpful if you use a generic format like Avro
throughout the whole topology. If you have many different types, it
might be better to set the default Serdes to `null` and set the Serde
for each operator individually.

-Matthias

On 2/12/18 2:16 AM, Debasish Ghosh wrote:
> Thanks a lot for the clear answer.
>
> One of the concerns that I have is that it's not always obvious when the
> default serializers are used. e.g. it looks like KGroupedStream#reduce also
> uses the default serializer under the hood. If one gets the default
> serializer wrong then she gets run time errors in serialization /
> de-serialization (ClassCastException etc.), which are quite hard to track
> down.
>
> On Mon, Feb 12, 2018 at 4:52 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> For stream-table-join, only the table is (de)serialized; the stream side
>> is only piped through and does lookups into the table.
>>
>> And when reading the stream
>> (https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L129)
>> the Serdes from the config are overwritten by the parameters passed into
>> `#stream()`.
>>
>> The default Serdes are used when reading/writing from/to a topic/store
>> (including repartition or changelog topics) and if the operator does not
>> overwrite the default Serdes via passed-in parameters.
>>
>> -Matthias
>>
>> On 2/10/18 10:34 PM, Debasish Ghosh wrote:
>>> The inputs to the leftJoin are the stream with [String, Long] and the
>>> table with [String, String]. Is the default serializer (I mean from the
>>> config) used for [String, String]? Then how does the [String, Long]
>>> serialization work?
>>>
>>> I guess the basic issue that I am trying to understand is how the default
>>> serialisers (stringSerde, stringSerde) registered in config are used for
>>> serialising the inputs of leftJoin ..
>>>
>>> regards.
>>>
>>> On Sun, 11 Feb 2018 at 8:53 AM, Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>
>>>> userClicksJoinRegion is never serialized...
>>>>
>>>> It is the result of the join, and the join only (de)serializes its input
>>>> in the internal stores.
>>>>
>>>> The output is forwarded in-memory to a consecutive map that returns
>>>> `clicksByRegion`, which is [String, Long].
>>>>
>>>> -Matthias
>>>>
>>>> On 2/10/18 1:17 PM, Ted Yu wrote:
>>>>> Please read the javadoc:
>>>>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Consumed.java
>>>>> and correlate with the sample code.
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Sat, Feb 10, 2018 at 1:10 PM, Debasish Ghosh <ghosh.debas...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Looking at
>>>>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L148,
>>>>>> it seems that the leftJoin generates a KStream[String, (String, Long)],
>>>>>> which means the value is a tuple of (String, Long) .. I am not able to
>>>>>> get how this will serialize/de-serialize with the default serializers
>>>>>> which are both stringSerde for keys and values.
>>>>>>
>>>>>> or am I missing something ?
>>>>>>
>>>>>> regards.
>>>>>> On Sun, Feb 11, 2018 at 2:30 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>
>>>>>>> If I read the code correctly, the operation on this line prepares the
>>>>>>> input for the (stringSerde, stringSerde) specified on line 142:
>>>>>>>
>>>>>>>     .leftJoin(userRegionsTable, (clicks: Long, region: String) =>
>>>>>>>       (if (region == null) "UNKNOWN" else region, clicks))
>>>>>>>
>>>>>>> FYI
>>>>>>>
>>>>>>> On Sat, Feb 10, 2018 at 11:00 AM, Debasish Ghosh <ghosh.debas...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi -
>>>>>>>>
>>>>>>>> I was going through this example at
>>>>>>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala,
>>>>>>>> especially the leftJoin part
>>>>>>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L156.
>>>>>>>> This leftJoin returns KStream[String, (String, Long)], while default
>>>>>>>> serializers are String for both key and value as in
>>>>>>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L112-L113.
>>>>>>>> My question is how does this serialization work here ? I mean how does
>>>>>>>> the tuple get serialized with the default serializers ? And leftJoin
>>>>>>>> only works with default serializers ..
>>>>>>>>
>>>>>>>> regards.
>>>>>>>>
>>>>>>>> --
>>>>>>>> Debasish Ghosh
>>>>>>>> http://manning.com/ghosh2
>>>>>>>> http://manning.com/ghosh
>>>>>>>>
>>>>>>>> Twttr: @debasishg
>>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>>> Code: http://github.com/debasishg
>>>>>>
>>>>>> --
>>>>>> Debasish Ghosh
>>>>>> http://manning.com/ghosh2
>>>>>> http://manning.com/ghosh
>>>>>>
>>>>>> Twttr: @debasishg
>>>>>> Blog: http://debasishg.blogspot.com
>>>>>> Code: http://github.com/debasishg
>>>
>>> --
>>> Sent from my iPhone
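To make the flow discussed above concrete, here is a condensed sketch of the stream-table join (Scala, against the Kafka Streams 1.0 API; topic and variable names loosely follow the linked StreamToTableJoinScalaIntegrationTest and are illustrative). The point is that only the table side and the final output topic ever touch a Serde; the joined (String, Long) tuple only travels in-memory between operators:

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{Consumed, KeyValue, StreamsBuilder}
import org.apache.kafka.streams.kstream.{KeyValueMapper, Produced, ValueJoiner}

object StreamTableJoinSerdeSketch extends App {
  val stringSerde = Serdes.String()
  val longSerde   = Serdes.Long()

  val builder = new StreamsBuilder()

  // Stream side [String, Long]: the Serdes passed into stream() overwrite
  // the config defaults when this topic is read.
  val userClicksStream = builder.stream[String, java.lang.Long](
    "user-clicks", Consumed.`with`(stringSerde, longSerde))

  // Table side [String, String]: only this side is materialized in a state
  // store, i.e. actually (de)serialized by the join.
  val userRegionsTable = builder.table[String, String](
    "user-regions", Consumed.`with`(stringSerde, stringSerde))

  // Joiner producing the (region, clicks) tuple value.
  val joiner = new ValueJoiner[java.lang.Long, String, (String, java.lang.Long)] {
    override def apply(clicks: java.lang.Long, region: String): (String, java.lang.Long) =
      (if (region == null) "UNKNOWN" else region, clicks)
  }

  // The join result KStream[String, (String, Long)] is only forwarded
  // in-memory to the next operator, so the tuple value never needs a Serde.
  val userClicksJoinRegion =
    userClicksStream.leftJoin[String, (String, java.lang.Long)](userRegionsTable, joiner)

  // Re-key to [String, Long] ...
  val clicksByRegion = userClicksJoinRegion.map[String, java.lang.Long](
    new KeyValueMapper[String, (String, java.lang.Long), KeyValue[String, java.lang.Long]] {
      override def apply(user: String, regionWithClicks: (String, java.lang.Long)): KeyValue[String, java.lang.Long] =
        new KeyValue(regionWithClicks._1, regionWithClicks._2)
    })

  // ... and only here are bytes written to a topic again, with explicit
  // [String, Long] Serdes; without Produced.with(...) the config defaults
  // would apply.
  clicksByRegion.to("clicks-per-region", Produced.`with`(stringSerde, longSerde))
}
```

So the places where Serdes actually matter are the two input topics, the table's state store/changelog, and the output topic; the (String, Long) tuple exists only between operators inside the same instance and never hits the default serializers.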