userClicksJoinRegion is never serialized...

It the result of the join and the join only (de)serializes its input in
the internal stores.

The output it forwarded in-memory to a consecutive map and return
`clicksByRegion` that is [String,Long].


-Matthias

On 2/10/18 1:17 PM, Ted Yu wrote:
> Please read the javadoc:
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Consumed.java
> 
> and correlate with the sample code.
> 
> Thanks
> 
> On Sat, Feb 10, 2018 at 1:10 PM, Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
> 
>> Looking at
>> https://github.com/confluentinc/kafka-streams-
>> examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/
>> StreamToTableJoinScalaIntegrationTest.scala#L148,
>> it seems that the leftJoin generates a KStream[String, (String, Long)],
>> which means the value is a tuple of (String, Long) .. I am not able to get
>> how this will serialize/de-serialize with the default serializers which are
>> both stringSerde for keys and values.
>>
>> or am I missing something ?
>>
>> regards.
>>
>> On Sun, Feb 11, 2018 at 2:30 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> If I read the code correctly, the operation on this line prepares the
>> input
>>> for the (stringSerde, stringSerde) specified on line 142:
>>>
>>>       .leftJoin(userRegionsTable, (clicks: Long, region: String) => (if
>>> (region == null) "UNKNOWN" else region, clicks))
>>>
>>> FYI
>>>
>>> On Sat, Feb 10, 2018 at 11:00 AM, Debasish Ghosh <
>> ghosh.debas...@gmail.com
>>>>
>>> wrote:
>>>
>>>> Hi -
>>>>
>>>> I was going through this example at
>>>> https://github.com/confluentinc/kafka-streams-
>>>> examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/
>>>> StreamToTableJoinScalaIntegrationTest.scala,
>>>> especially the leftJoin part
>>>> https://github.com/confluentinc/kafka-streams-
>>>> examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/
>>>> StreamToTableJoinScalaIntegrationTest.scala#L156.
>>>> This leftJoin returns KStream[String, (String, Long)], while default
>>>> serializers are String for both key and value as in
>>>> https://github.com/confluentinc/kafka-streams-
>>>> examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/
>>>> StreamToTableJoinScalaIntegrationTest.scala#L112-L113.
>>>> My question is how does this serialization work here ? I mean how does
>>> the
>>>> tuple get serialized with the default serializers ? And leftJoin only
>>> works
>>>> with default serializers ..
>>>>
>>>> regards.
>>>>
>>>> --
>>>> Debasish Ghosh
>>>> http://manning.com/ghosh2
>>>> http://manning.com/ghosh
>>>>
>>>> Twttr: @debasishg
>>>> Blog: http://debasishg.blogspot.com
>>>> Code: http://github.com/debasishg
>>>>
>>>
>>
>>
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to