[ 
https://issues.apache.org/jira/browse/KAFKA-13024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17427868#comment-17427868
 ] 

Matthias J. Sax commented on KAFKA-13024:
-----------------------------------------

While I think `repartition()` does indeed have a regression compared to 
`through()`, the example of `stream().repartition().to()` does not make much 
sense: why would you call `repartition()` before `to()`? Writing into the 
output topic would write the data partitioned by key anyway.

In general, in the DSL, we would need to be smart about applying the "drop 
null-key" and "drop null-value" optimization depending on the downstream 
operators. For aggregations and joins it might make sense (but not in all 
cases, cf. https://issues.apache.org/jira/browse/KAFKA-13024) to drop upstream 
as an optimization.

I guess for a custom `transform()` or similar it might be a different story. In 
the end, we should enhance the logic that compiles the DSL down to the PAPI 
(Processor API) so that it determines in which cases we want to add the "drop 
null" optimization before repartitioning. We also need to consider the 
"fan-out" case, in which there are multiple downstream operators after 
`repartition()`, and only apply the optimization if it applies to _all_ 
downstream operators.
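
To make the fan-out rule concrete, here is a minimal sketch in plain Java. It 
is not Kafka Streams code: `DropNullPlanner`, `Downstream`, and 
`canFilterBeforeRepartition` are hypothetical names, and whether a given 
operator really discards null-key records is passed in as an assumption rather 
than looked up from the DSL.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the fan-out rule; NOT Kafka Streams internals.
final class DropNullPlanner {

    /** One operator attached after repartition(). */
    static final class Downstream {
        final String name;
        // Assumption supplied by the caller: would this operator
        // discard null-key records on its own anyway?
        final boolean dropsNullKeys;

        Downstream(String name, boolean dropsNullKeys) {
            this.name = name;
            this.dropsNullKeys = dropsNullKeys;
        }
    }

    /**
     * The "drop null" filter may be placed before the repartition topic only
     * if every downstream operator would drop those records anyway.
     */
    static boolean canFilterBeforeRepartition(List<Downstream> downstream) {
        if (downstream.isEmpty()) {
            // Nothing downstream justifies dropping, so keep all records.
            return false;
        }
        for (Downstream op : downstream) {
            if (!op.dropsNullKeys) {
                return false; // at least one consumer still needs null keys
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Downstream> onlyAggregation = Arrays.asList(
                new Downstream("aggregate", true));
        List<Downstream> fanOut = Arrays.asList(
                new Downstream("aggregate", true),
                new Downstream("to", false));

        System.out.println(canFilterBeforeRepartition(onlyAggregation));
        System.out.println(canFilterBeforeRepartition(fanOut));
        System.out.println(canFilterBeforeRepartition(
                Collections.<Downstream>emptyList()));
    }
}
```

In this sketch a single downstream `to()` is enough to disable the filter, 
which matches the reported `stream().repartition().to()` case: null-key 
records would then reach the output topic.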

> Kafka Streams is dropping messages with null key during repartition
> -------------------------------------------------------------------
>
>                 Key: KAFKA-13024
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13024
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0, 2.7.1
>            Reporter: Damien Gasparina
>            Priority: Major
>
> {{KStream.repartition}} is silently filtering messages with null keys. A 
> single topology like {{.stream().repartition().to()}} would filter all 
> messages with null key.
> The cause: we are adding a filter before the source & sink nodes 
> ([https://github.com/apache/kafka/blob/2.8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L1060-L1064]).
>  It looks like we are doing that because this method is also used for 
> building KTable.
> Null key messages are valid for a KStream, so this looks like a regression: 
> the previous {{.through()}} was not filtering null key messages.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
