For a stream-table join, only the table is (de)serialized; the stream side
is only piped through and does lookups into the table.

And when reading the stream
(https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L129)
the Serdes from the config are overridden by the parameters passed into
`#stream()`.

The default Serdes are used whenever data is read from or written to a
topic or store (including repartition and changelog topics), provided the
operator does not override them via passed-in parameters.
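
To make that concrete, here is a minimal sketch rather than the code from the
linked test. It assumes Kafka Streams 1.0+ (the `Consumed` class; the 3.3.x
example passes the Serdes directly to `stream()` instead), Scala 2.12+ for the
lambda-to-`ValueJoiner` conversion, default key/value Serdes set to String in
the config, and hypothetical topic names "user-clicks" and "user-regions".

```scala
import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.{Consumed, KStream, KTable, ValueJoiner}

object SerdeOverrideSketch {
  val stringSerde: Serde[String] = Serdes.String()
  val longSerde: Serde[java.lang.Long] = Serdes.Long()

  def build(builder: StreamsBuilder): KStream[String, (String, java.lang.Long)] = {
    // Stream side: Serdes are passed explicitly, so the String/String
    // defaults from the config are not used for this topic.
    val userClicks: KStream[String, java.lang.Long] =
      builder.stream("user-clicks", Consumed.`with`(stringSerde, longSerde))

    // Table side: no Serdes are passed, so the defaults from the config
    // (assumed to be String/String here) apply to the topic and the store.
    val userRegions: KTable[String, String] =
      builder.table[String, String]("user-regions")

    // The joiner produces a tuple, but this result is only forwarded
    // in-memory to the next operator; no Serde for the tuple is needed
    // unless it is later written to a topic or store.
    val joiner: ValueJoiner[java.lang.Long, String, (String, java.lang.Long)] =
      (clicks: java.lang.Long, region: String) =>
        (if (region == null) "UNKNOWN" else region, clicks)

    userClicks.leftJoin[String, (String, java.lang.Long)](userRegions, joiner)
  }
}
```

If the joined stream were written out directly with `to()`, or grouped in a
way that forces a repartition, a Serde for the tuple value would have to be
supplied at that point, because the String default could not handle it.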


-Matthias

On 2/10/18 10:34 PM, Debasish Ghosh wrote:
> The inputs to the leftJoin are the stream with [String, Long] and the table
> with [String, String]. Is the default serializer (I mean from the config)
> used for [String, String] ? Then how does the [String, Long] serialization
> work ?
> 
> I guess the basic issue that I am trying to understand is how the default
> serialisers (stringSerde, stringSerde) registered in config used for
> serialising the inputs of leftJoin ..
> 
> regards.
> 
> On Sun, 11 Feb 2018 at 8:53 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> userClicksJoinRegion is never serialized...
>>
>> It is the result of the join, and the join only (de)serializes its input in
>> the internal stores.
>>
>> The output is forwarded in-memory to a subsequent map, which returns
>> `clicksByRegion`, which is [String,Long].
>>
>>
>> -Matthias
>>
>> On 2/10/18 1:17 PM, Ted Yu wrote:
>>> Please read the javadoc:
>>>
>>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Consumed.java
>>>
>>> and correlate with the sample code.
>>>
>>> Thanks
>>>
>>> On Sat, Feb 10, 2018 at 1:10 PM, Debasish Ghosh <ghosh.debas...@gmail.com>
>>> wrote:
>>>
>>>> Looking at
>>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L148,
>>>> it seems that the leftJoin generates a KStream[String, (String, Long)],
>>>> which means the value is a tuple of (String, Long) .. I am not able to get
>>>> how this will serialize/de-serialize with the default serializers which are
>>>> both stringSerde for keys and values.
>>>>
>>>> or am I missing something ?
>>>>
>>>> regards.
>>>>
>>>> On Sun, Feb 11, 2018 at 2:30 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> If I read the code correctly, the operation on this line prepares the input
>>>>> for the (stringSerde, stringSerde) specified on line 142:
>>>>>
>>>>>       .leftJoin(userRegionsTable, (clicks: Long, region: String) =>
>>>>>         (if (region == null) "UNKNOWN" else region, clicks))
>>>>>
>>>>> FYI
>>>>>
>>>>> On Sat, Feb 10, 2018 at 11:00 AM, Debasish Ghosh <ghosh.debas...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi -
>>>>>>
>>>>>> I was going through this example at
>>>>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala,
>>>>>> especially the leftJoin part
>>>>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L156.
>>>>>> This leftJoin returns KStream[String, (String, Long)], while default
>>>>>> serializers are String for both key and value as in
>>>>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L112-L113.
>>>>>> My question is how does this serialization work here ? I mean how does the
>>>>>> tuple get serialized with the default serializers ? And leftJoin only works
>>>>>> with default serializers ..
>>>>>>
>>>>>> regards.
>>>>>>
>>>>>> --
>>>>>> Debasish Ghosh
>>>>>> http://manning.com/ghosh2
>>>>>> http://manning.com/ghosh
>>>>>>
>>>>>> Twttr: @debasishg
>>>>>> Blog: http://debasishg.blogspot.com
>>>>>> Code: http://github.com/debasishg
>>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Debasish Ghosh
>>>> http://manning.com/ghosh2
>>>> http://manning.com/ghosh
>>>>
>>>> Twttr: @debasishg
>>>> Blog: http://debasishg.blogspot.com
>>>> Code: http://github.com/debasishg
>>>>
>>>
>>
>> --
> Sent from my iPhone
> 
