Michal Borowiecki created KAFKA-4835:
----------------------------------------

             Summary: Allow users control over repartitioning
                 Key: KAFKA-4835
                 URL: https://issues.apache.org/jira/browse/KAFKA-4835
             Project: Kafka
          Issue Type: Improvement
          Components: streams
    Affects Versions: 0.10.2.0
            Reporter: Michal Borowiecki


From https://issues.apache.org/jira/browse/KAFKA-4601?focusedCommentId=15881030&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15881030

...it would be good to provide users with more control over repartitioning.
My use case is as follows (unrelated bits omitted for brevity):
{code}
                KTable<String, Activity> loggedInCustomers = builder
                        .stream("customerLogins")
                        .groupBy((key, activity) -> 
                                activity.getCustomerRef())
                        .reduce((first,second) -> second, loginStore());
                
                builder
                        .stream("balanceUpdates")
                        .map((key, activity) -> new KeyValue<>(
                                activity.getCustomerRef(),
                                activity))
                        .join(loggedInCustomers, (activity, session) -> ...
                        .to("sessions");
{code}
Both "groupBy" and "map" set the repartitionRequired flag in the underlying 
implementation (since the key may change), so the aggregation/join that follows 
creates an internal repartition topic.
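A deliberately simplified model may help make this mechanism concrete. The sketch below assumes only what is described above: a key-changing operation marks the stream, and the next key-based operation inserts a repartition topic if the mark is set. `ToyStream`, `keyBased` and the generated topic names are hypothetical; this is not the Kafka Streams API, and the real 0.10.2 internals differ in detail.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the repartitionRequired mechanism.
// ToyStream is illustrative only; it is not the Kafka Streams API.
class ToyStream {
    private final List<String> internalTopics; // topics the toy topology would create
    private final String name;
    private final boolean repartitionRequired;

    private ToyStream(List<String> internalTopics, String name, boolean repartitionRequired) {
        this.internalTopics = internalTopics;
        this.name = name;
        this.repartitionRequired = repartitionRequired;
    }

    // A source stream is assumed to be partitioned by its current key.
    static ToyStream source(List<String> internalTopics, String topic) {
        return new ToyStream(internalTopics, topic, false);
    }

    // Operations that may change the key set the flag; the library cannot tell
    // whether the new key happens to match the existing partitioning.
    ToyStream map() {
        return new ToyStream(internalTopics, name + "-map", true);
    }

    ToyStream groupBy() {
        return new ToyStream(internalTopics, name + "-groupBy", true);
    }

    // A key-based operation (aggregation or join) inserts a repartition topic
    // whenever the flag is set, then continues from a correctly partitioned stream.
    ToyStream keyBased(String op) {
        if (repartitionRequired) {
            internalTopics.add(name + "-repartition");
        }
        return new ToyStream(internalTopics, name + "-" + op, false);
    }

    // Mirrors the use case: groupBy + reduce on one branch, map + join on the other.
    static List<String> useCaseTopics() {
        List<String> topics = new ArrayList<>();
        source(topics, "customerLogins").groupBy().keyBased("reduce");
        source(topics, "balanceUpdates").map().keyBased("join");
        return topics;
    }

    public static void main(String[] args) {
        // prints [customerLogins-groupBy-repartition, balanceUpdates-map-repartition]
        System.out.println(useCaseTopics());
    }
}
```

Running `main` yields one internal topic per key-changing branch, i.e. the two intermediate topics this issue is about.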
However, in our case I know that both input streams are already partitioned by 
the customerRef value, which I'm mapping into the key (because the join 
operation requires it).
So two unnecessary intermediate topics are created, with their associated 
overhead, even though the ultimate goal is simply to join on a value by which 
the original streams are already partitioned.
(Note, we don't have the option to re-implement the original input streams to 
make customerRef the message key.)
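The claim that the repartition is redundant can be checked with a small, hypothetical simulation. It assumes that both input topics have the same number of partitions and that their producers used the same partitioner on customerRef; the hash-modulo partitioner here is an illustrative stand-in, not Kafka's default (which applies murmur2 to the serialized key). Under those assumptions, re-keying by customerRef sends every record back to the partition it already occupies.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical check of the co-partitioning argument. The partitioner below is an
// assumption for illustration; Kafka's default partitioner instead hashes the
// serialized key bytes with murmur2.
class CoPartitioningCheck {

    static final class Activity {
        final String originalKey; // key the record was written with
        final String customerRef; // value field the producer partitioned on
        Activity(String originalKey, String customerRef) {
            this.originalKey = originalKey;
            this.customerRef = customerRef;
        }
    }

    static int partitionFor(String partitioningValue, int numPartitions) {
        return Math.floorMod(partitioningValue.hashCode(), numPartitions);
    }

    // Counts records that a repartition on customerRef would send to a different
    // partition than the one they were originally written to.
    static int recordsMoved(List<Activity> activities, int numPartitions,
                            Function<Activity, String> writtenBy) {
        int moved = 0;
        for (Activity a : activities) {
            int current = partitionFor(writtenBy.apply(a), numPartitions);
            int afterRekey = partitionFor(a.customerRef, numPartitions);
            if (current != afterRekey) {
                moved++;
            }
        }
        return moved;
    }

    static List<Activity> sample() {
        return List.of(
                new Activity("evt-1", "cust-A"),
                new Activity("evt-2", "cust-B"),
                new Activity("evt-3", "cust-A"),
                new Activity("evt-4", "cust-C"));
    }

    public static void main(String[] args) {
        // Input written partitioned by customerRef: the repartition moves nothing.
        System.out.println(recordsMoved(sample(), 4, a -> a.customerRef)); // 0
        // Input written partitioned by its own key: some records may move.
        System.out.println(recordsMoved(sample(), 4, a -> a.originalKey));
    }
}
```

If either assumption fails (different partition counts, or a different partitioner), the repartition is genuinely needed, which is why the library cannot safely infer this on its own.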

I think it would be better to let users decide (from their knowledge of the 
incoming streams) whether a repartition is mandatory on aggregation and join 
operations, perhaps via overloaded versions of these methods that expose the 
repartitionRequired flag.
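One possible shape for such overloads is sketched below. `SketchStream` and its two `join` methods are hypothetical and exist only to illustrate the proposal; they are not existing Kafka Streams methods. The no-flag method keeps today's behaviour by delegating to the inferred flag, while the overload lets the caller override it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposed overloads. SketchStream and its methods are
// illustrative only and are not part of the Kafka Streams API.
class ProposedOverloadSketch {

    static final class SketchStream {
        // Inferred by the library: true after any key-changing operation such as map().
        private final boolean inferredRepartitionRequired;
        // Internal topics this toy "topology" would create.
        final List<String> internalTopics = new ArrayList<>();

        SketchStream(boolean inferredRepartitionRequired) {
            this.inferredRepartitionRequired = inferredRepartitionRequired;
        }

        // Existing style: the library decides from the inferred flag.
        SketchStream join(String tableName) {
            return join(tableName, inferredRepartitionRequired);
        }

        // Proposed style: the user overrides the flag, based on knowledge of how the
        // input topics were partitioned. A wrong "false" would silently produce
        // incorrect join results, so the default overload stays the safe choice.
        SketchStream join(String tableName, boolean repartitionRequired) {
            if (repartitionRequired) {
                internalTopics.add(tableName + "-join-repartition");
            }
            return this;
        }
    }

    static int topicsWithDefault() {
        return new SketchStream(true).join("loggedInCustomers").internalTopics.size();
    }

    static int topicsWithOverride() {
        return new SketchStream(true).join("loggedInCustomers", false).internalTopics.size();
    }

    public static void main(String[] args) {
        System.out.println(topicsWithDefault());  // 1
        System.out.println(topicsWithOverride()); // 0
    }
}
```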
An alternative would be to allow users to perform a join on a value other than 
the key (a KeyValueMapper parameter to join, like the one used for joins with 
global tables), but I expect that to be more involved to implement and more 
error-prone for people who don't understand the partitioning requirements well 
(whereas it's safe for global tables, which are fully replicated to every 
instance).

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
