userClicksJoinRegion is never serialized... It the result of the join and the join only (de)serializes its input in the internal stores.
The output it forwarded in-memory to a consecutive map and return `clicksByRegion` that is [String,Long]. -Matthias On 2/10/18 1:17 PM, Ted Yu wrote: > Please read the javadoc: > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Consumed.java > > and correlate with the sample code. > > Thanks > > On Sat, Feb 10, 2018 at 1:10 PM, Debasish Ghosh <ghosh.debas...@gmail.com> > wrote: > >> Looking at >> https://github.com/confluentinc/kafka-streams- >> examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/ >> StreamToTableJoinScalaIntegrationTest.scala#L148, >> it seems that the leftJoin generates a KStream[String, (String, Long)], >> which means the value is a tuple of (String, Long) .. I am not able to get >> how this will serialize/de-serialize with the default serializers which are >> both stringSerde for keys and values. >> >> or am I missing something ? >> >> regards. >> >> On Sun, Feb 11, 2018 at 2:30 AM, Ted Yu <yuzhih...@gmail.com> wrote: >> >>> If I read the code correctly, the operation on this line prepares the >> input >>> for the (stringSerde, stringSerde) specified on line 142: >>> >>> .leftJoin(userRegionsTable, (clicks: Long, region: String) => (if >>> (region == null) "UNKNOWN" else region, clicks)) >>> >>> FYI >>> >>> On Sat, Feb 10, 2018 at 11:00 AM, Debasish Ghosh < >> ghosh.debas...@gmail.com >>>> >>> wrote: >>> >>>> Hi - >>>> >>>> I was going through this example at >>>> https://github.com/confluentinc/kafka-streams- >>>> examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/ >>>> StreamToTableJoinScalaIntegrationTest.scala, >>>> especially the leftJoin part >>>> https://github.com/confluentinc/kafka-streams- >>>> examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/ >>>> StreamToTableJoinScalaIntegrationTest.scala#L156. >>>> This leftJoin returns KStream[String, (String, Long)], while default >>>> serializers are String for both key and value as in >>>> https://github.com/confluentinc/kafka-streams- >>>> examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/ >>>> StreamToTableJoinScalaIntegrationTest.scala#L112-L113. >>>> My question is how does this serialization work here ? I mean how does >>> the >>>> tuple get serialized with the default serializers ? And leftJoin only >>> works >>>> with default serializers .. >>>> >>>> regards. >>>> >>>> -- >>>> Debasish Ghosh >>>> http://manning.com/ghosh2 >>>> http://manning.com/ghosh >>>> >>>> Twttr: @debasishg >>>> Blog: http://debasishg.blogspot.com >>>> Code: http://github.com/debasishg >>>> >>> >> >> >> >> -- >> Debasish Ghosh >> http://manning.com/ghosh2 >> http://manning.com/ghosh >> >> Twttr: @debasishg >> Blog: http://debasishg.blogspot.com >> Code: http://github.com/debasishg >> >
signature.asc
Description: OpenPGP digital signature