Michal Borowiecki created KAFKA-4835:
----------------------------------------
Summary: Allow users control over repartitioning
Key: KAFKA-4835
URL: https://issues.apache.org/jira/browse/KAFKA-4835
Project: Kafka
Issue Type: Improvement
Components: streams
Affects Versions: 0.10.2.0
Reporter: Michal Borowiecki

From https://issues.apache.org/jira/browse/KAFKA-4601?focusedCommentId=15881030&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15881030

...it would be good to provide users more control over the repartitioning.

My use case is as follows (unrelated bits omitted for brevity):

{code}
KTable<String, Activity> loggedInCustomers = builder
    .stream("customerLogins")
    .groupBy((key, activity) -> activity.getCustomerRef())
    .reduce((first, second) -> second, loginStore());

builder
    .stream("balanceUpdates")
    .map((key, activity) -> new KeyValue<>(
        activity.getCustomerRef(),
        activity))
    .join(loggedInCustomers, (activity, session) -> ...
    .to("sessions");
{code}

Both "groupBy" and "map" set the repartitionRequired flag in the underlying implementation (because they change the key), so the aggregation and the join that follow each create a repartition topic. In our case, however, I know that both input streams are already partitioned by the customerRef value, which I am mapping into the key only because the join operation requires it. The result is two unnecessary intermediate topics, with their associated overhead, when the ultimate goal is simply to join on a value that already determines the partitioning of the original streams. (Note that we don't have the option of re-implementing the original input streams to make customerRef the message key.)

I think it would be better to let users decide, from their knowledge of the incoming streams, whether a repartition is mandatory for aggregation and join operations, perhaps through overloaded versions of those methods that expose the repartitionRequired flag.
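To illustrate why the two repartition topics are redundant in this case, here is a toy model in plain Java (no Kafka dependency, not Kafka Streams internals) that runs the join independently in each partition, the way stream tasks process their own partitions. It assumes both producers chose partitions by customerRef and that both topics have the same partition count, as Kafka Streams joins already require. The `Event` class, `partitionFor`, and the other names are hypothetical, and `String.hashCode` is only a stand-in for Kafka's default partitioner, which actually applies murmur2 to the serialized key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Toy in-memory model (not Kafka Streams code): does a join that runs
// independently in each partition, with no repartition topic, find every match?
class CoPartitionDemo {
    static final int PARTITIONS = 4;

    // Hypothetical record: the topic's original message key, the customerRef
    // carried in the value (the join attribute), and an opaque payload.
    static final class Event {
        final String originalKey;
        final String customerRef;
        final String payload;

        Event(String originalKey, String customerRef, String payload) {
            this.originalKey = originalKey;
            this.customerRef = customerRef;
            this.payload = payload;
        }
    }

    // Stand-in partitioner (assumption): Kafka's default partitioner uses murmur2
    // on the serialized key, but any deterministic hash shows the same effect.
    static int partitionFor(String routingKey) {
        return Math.floorMod(routingKey.hashCode(), PARTITIONS);
    }

    // Route events by customerRef (co-partitioned inputs) or by their original key.
    static List<List<Event>> route(List<Event> events, boolean byCustomerRef) {
        List<List<Event>> partitions = new ArrayList<>();
        for (int p = 0; p < PARTITIONS; p++) {
            partitions.add(new ArrayList<>());
        }
        for (Event e : events) {
            String routingKey = byCustomerRef ? e.customerRef : e.originalKey;
            partitions.get(partitionFor(routingKey)).add(e);
        }
        return partitions;
    }

    // Join updates against the latest login per customerRef, within one scope.
    static void joinInto(List<Event> logins, List<Event> updates, Set<String> out) {
        Map<String, String> sessions = new HashMap<>();
        for (Event login : logins) {
            sessions.put(login.customerRef, login.payload); // later login wins
        }
        for (Event update : updates) {
            String session = sessions.get(update.customerRef);
            if (session != null) {
                out.add(update.customerRef + ":" + update.payload + "/" + session);
            }
        }
    }

    // Each partition joins only the records routed to it.
    static Set<String> joinPerPartition(List<Event> logins, List<Event> updates,
                                        boolean byCustomerRef) {
        List<List<Event>> loginParts = route(logins, byCustomerRef);
        List<List<Event>> updateParts = route(updates, byCustomerRef);
        Set<String> out = new TreeSet<>();
        for (int p = 0; p < PARTITIONS; p++) {
            joinInto(loginParts.get(p), updateParts.get(p), out);
        }
        return out;
    }

    // Reference result: all records visible to a single join.
    static Set<String> joinGlobally(List<Event> logins, List<Event> updates) {
        Set<String> out = new TreeSet<>();
        joinInto(logins, updates, out);
        return out;
    }

    public static void main(String[] args) {
        List<Event> logins = Arrays.asList(new Event("a", "c1", "session1"));
        List<Event> updates = Arrays.asList(new Event("b", "c1", "update1"));
        System.out.println(joinGlobally(logins, updates));            // [c1:update1/session1]
        System.out.println(joinPerPartition(logins, updates, true));  // [c1:update1/session1]
        System.out.println(joinPerPartition(logins, updates, false)); // []
    }
}
```

When both inputs are routed by customerRef, every login and balance update for a customer lands in the same partition, so the per-partition join matches the global one. When they are routed by their original keys ("a" and "b" fall into partitions 1 and 2 under this stand-in hash), the match is lost; that is the case a repartition topic exists to handle, and the case the reporter knows does not apply to these inputs.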
An alternative would be to let users join on a value other than the key (via a KeyValueMapper parameter to join, like the one used for joins with global tables). However, I expect that to be more involved, and more error-prone for users who don't understand the partitioning requirements well (whereas it is safe for global tables, because every instance holds a full copy of a global table).

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)