For stream-table-join, only the table is (de)serialized, the stream-side in only piped through and does lookups into the table.
And when reading the stream (https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L129) the Serdes from the config are overwritten by parameters passed into `#stream()` The default Serdes are used when reading/writing from/to a topic/store (including repartition or changelog) and if the operator does not overwrite the default Serdes via passed-in parameters. -Matthias On 2/10/18 10:34 PM, Debasish Ghosh wrote: > The inputs to the leftJoin are the stream with [String, Long] and the table > with [String, String]. Is the default serializer (I mean from the config) > used for [String, String] ? Then how does the [String, Long] serialization > work ? > > I guess the basic issue that I am trying to understand is how the default > serialisers (stringSerde, stringSerde) registered in config used for > serialising the inputs of leftJoin .. > > regards. > > On Sun, 11 Feb 2018 at 8:53 AM, Matthias J. Sax <matth...@confluent.io> > wrote: > >> userClicksJoinRegion is never serialized... >> >> It the result of the join and the join only (de)serializes its input in >> the internal stores. >> >> The output it forwarded in-memory to a consecutive map and return >> `clicksByRegion` that is [String,Long]. >> >> >> -Matthias >> >> On 2/10/18 1:17 PM, Ted Yu wrote: >>> Please read the javadoc: >>> >> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Consumed.java >>> >>> and correlate with the sample code. >>> >>> Thanks >>> >>> On Sat, Feb 10, 2018 at 1:10 PM, Debasish Ghosh < >> ghosh.debas...@gmail.com> >>> wrote: >>> >>>> Looking at >>>> https://github.com/confluentinc/kafka-streams- >>>> examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/ >>>> StreamToTableJoinScalaIntegrationTest.scala#L148, >>>> it seems that the leftJoin generates a KStream[String, (String, Long)], >>>> which means the value is a tuple of (String, Long) .. I am not able to >> get >>>> how this will serialize/de-serialize with the default serializers which >> are >>>> both stringSerde for keys and values. >>>> >>>> or am I missing something ? >>>> >>>> regards. >>>> >>>> On Sun, Feb 11, 2018 at 2:30 AM, Ted Yu <yuzhih...@gmail.com> wrote: >>>> >>>>> If I read the code correctly, the operation on this line prepares the >>>> input >>>>> for the (stringSerde, stringSerde) specified on line 142: >>>>> >>>>> .leftJoin(userRegionsTable, (clicks: Long, region: String) => (if >>>>> (region == null) "UNKNOWN" else region, clicks)) >>>>> >>>>> FYI >>>>> >>>>> On Sat, Feb 10, 2018 at 11:00 AM, Debasish Ghosh < >>>> ghosh.debas...@gmail.com >>>>>> >>>>> wrote: >>>>> >>>>>> Hi - >>>>>> >>>>>> I was going through this example at >>>>>> https://github.com/confluentinc/kafka-streams- >>>>>> examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/ >>>>>> StreamToTableJoinScalaIntegrationTest.scala, >>>>>> especially the leftJoin part >>>>>> https://github.com/confluentinc/kafka-streams- >>>>>> examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/ >>>>>> StreamToTableJoinScalaIntegrationTest.scala#L156. >>>>>> This leftJoin returns KStream[String, (String, Long)], while default >>>>>> serializers are String for both key and value as in >>>>>> https://github.com/confluentinc/kafka-streams- >>>>>> examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/ >>>>>> StreamToTableJoinScalaIntegrationTest.scala#L112-L113. >>>>>> My question is how does this serialization work here ? I mean how does >>>>> the >>>>>> tuple get serialized with the default serializers ? And leftJoin only >>>>> works >>>>>> with default serializers .. >>>>>> >>>>>> regards. >>>>>> >>>>>> -- >>>>>> Debasish Ghosh >>>>>> http://manning.com/ghosh2 >>>>>> http://manning.com/ghosh >>>>>> >>>>>> Twttr: @debasishg >>>>>> Blog: http://debasishg.blogspot.com >>>>>> Code: http://github.com/debasishg >>>>>> >>>>> >>>> >>>> >>>> >>>> -- >>>> Debasish Ghosh >>>> http://manning.com/ghosh2 >>>> http://manning.com/ghosh >>>> >>>> Twttr: @debasishg >>>> Blog: http://debasishg.blogspot.com >>>> Code: http://github.com/debasishg >>>> >>> >> >> -- > Sent from my iPhone >
signature.asc
Description: OpenPGP digital signature