[ 
https://issues.apache.org/jira/browse/KAFKA-10366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17186265#comment-17186265
 ] 

Matthias J. Sax commented on KAFKA-10366:
-----------------------------------------

Following [~ableegoldman] last comment on the discuss thread of the KIP, I was 
double checking `KStreamAggregationIntegrationTest#shouldReduceWindowed()` and 
it seem the issue is in the test setup, or in particular the use helper 
methods. The method `IntegrationTestUitls#createConsumer` does only take a 
`Properties` parameter and we should extend it to also take key/value 
deserializer objects (ie, add an overload for the method – we also need more 
overloads for other methods for this case). In 
`KStreamAggregationIntegrationTest#receiveMessages` we actually have a 
`Deserializer` object on hand, but we just convert it to a config for the 
consumer instead of forwarding the reference... This should resolve the issue.

> TimeWindowedDeserializer doesn't allow users to set a custom window size
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-10366
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10366
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Leah Thomas
>            Assignee: Leah Thomas
>            Priority: Major
>              Labels: streams
>
> Related to [KAFKA-4468|https://issues.apache.org/jira/browse/KAFKA-4468], in 
> timeWindowedDeserializer Long.MAX_VALUE is used as _windowSize_ for any 
> deserializer that uses the default constructor. While streams apps can pass 
> in a window size in serdes or while creating a timeWindowedDeserializer, the 
> deserializer that is actually used in processing the messages is created by 
> the Kafka consumer, without passing in the set windowSize. The deserializer 
> the consumer creates uses the configs, but as there is no config for 
> windowSize, the window size is always default.
> See _KStreamAggregationIntegrationTest #ShouldReduceWindowed()_ as an example 
> of this issue. Despite passing in the windowSize to both the serdes and the 
> timeWindowedDeserializer, the window size is set to Long.MAX_VALUE. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to