Ah ok, I think I was envisioning a different use case from your initial
description of the problem.
If everything that you want to group together is already correctly
partitioned, then you won't need a repartitioning step. If I understand
correctly, you have something like this in mind:

 builder
    .stream("input-topic")
    .selectKey((key, value) -> key + extractKeyData(value)) // eg A --> A:B
    .groupByKey()
    .aggregate(...)
    .toStream() // aggregate returns a KTable, so convert before writing out
    // keys are of the form A:B, but should be partitioned by A
    .to("output-topic");

If that looks about right, then there are two things to watch out for:
1) To keep partitioning on A instead of A:B, you'll need to provide a custom
    Partitioner when writing to the output topic. See
    Produced#streamPartitioner
2) Since you have a key-changing operation (selectKey) upstream of a stateful
    operation (aggregate), Streams will automatically infer that a
    repartitioning step is required and insert one for you.
    Unfortunately there's currently no way to force Streams not to do that,
    even when you know the data is going to be partitioned correctly -- there
    is a ticket for this but it has yet to be picked up by anyone.
    See https://issues.apache.org/jira/browse/KAFKA-4835
    I think the only workaround at this point would be to implement the
    aggregation step yourself using the low-level Processor API. Streams will
    only handle the repartitioning for DSL operators.

    You can still use the DSL for everything else, and mix in the PAPI by
    using a transformer. Streams does not insert repartitions before a
    transformer since it can't infer whether or not the operation is
    stateful. I know re-implementing the aggregator is a hassle, but you
    should be able to just copy and paste much of the existing aggregation
    code. Check out the KStreamAggregateProcessor class.
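
The per-record logic such a hand-written aggregation needs is small: look up
the current aggregate for the key, apply the aggregator, write the result
back, and emit it. Below is a rough sketch of just that logic in plain Java.
The class name ManualAggregator is made up for illustration, the HashMap is
only a stand-in for a real Streams state store, and the aggregator is
simplified to take (aggregate, value) without the key. In a real application
this would live inside a Transformer and read/write a KeyValueStore instead.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;

/** Hypothetical sketch: per-record aggregation with a HashMap as the "store". */
final class ManualAggregator<K, V, A> {
    // Stand-in for a KeyValueStore; an assumption made to keep this self-contained.
    private final Map<K, A> store = new HashMap<>();
    private final Supplier<A> initializer;
    private final BiFunction<A, V, A> aggregator;

    ManualAggregator(Supplier<A> initializer, BiFunction<A, V, A> aggregator) {
        this.initializer = initializer;
        this.aggregator = aggregator;
    }

    /** Read the old aggregate, fold in the new value, write back, return the result. */
    A process(K key, V value) {
        A current = store.get(key);
        if (current == null) {
            current = initializer.get(); // first record seen for this key
        }
        A updated = aggregator.apply(current, value);
        store.put(key, updated);
        return updated; // a transformer would emit this as the new (key, aggregate)
    }

    /** Current aggregate for a key, or null if the key has not been seen. */
    A get(K key) {
        return store.get(key);
    }
}
```

For example, new ManualAggregator<String, Integer, Integer>(() -> 0,
Integer::sum) keeps a running sum per A:B key.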

    The topology would then look something like this:

 builder
    .stream("input-topic")
    .selectKey((key, value) -> key + extractKeyData(value)) // eg A --> A:B
    // supplier returns a new processor which implements the aggregation
    .transform(myAggregateProcessorSupplier)
    // myStreamPartitioner extracts and partitions based on A
    .to("output-topic", Produced.streamPartitioner(myStreamPartitioner));
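
For the partitioner itself, the core idea is to strip the key back down to its
A prefix and pick the partition from that alone. Here is a minimal sketch of
that logic in plain Java. PrefixPartitioning and its method names are invented
for illustration, and the ':' delimiter is assumed from the A:B example above.
The hash used here is only a placeholder: to land on the same partitions as an
input topic written with the default producer partitioner, you would need to
apply the same hash Kafka uses there (murmur2 over the serialized key bytes)
to the serialized form of A.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical helper: choose a partition from the "A" prefix of an "A:B" key. */
final class PrefixPartitioning {
    private PrefixPartitioning() {}

    /** Returns everything before the first ':' (the whole key if there is none). */
    static String prefixOf(String compositeKey) {
        int idx = compositeKey.indexOf(':');
        return idx < 0 ? compositeKey : compositeKey.substring(0, idx);
    }

    /** Maps the key's prefix to a partition in [0, numPartitions). */
    static int partitionFor(String compositeKey, int numPartitions) {
        byte[] prefixBytes = prefixOf(compositeKey).getBytes(StandardCharsets.UTF_8);
        // Placeholder hash (an assumption); swap in the producer's hash to stay
        // aligned with how the input topic was partitioned.
        int hash = Arrays.hashCode(prefixBytes);
        return (hash & 0x7fffffff) % numPartitions; // mask keeps the result non-negative
    }
}
```

A StreamPartitioner implementation could then delegate to partitionFor(key,
numPartitions) from its partition method, so that A:B, A:C and A:D all end up
on the same partition.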

Obviously this situation is not ideal -- if you're interested in improving
things, feel free to pick up KAFKA-4835
<https://issues.apache.org/jira/browse/KAFKA-4835>.


On Wed, Mar 17, 2021 at 8:19 PM Gareth Collins <gareth.o.coll...@gmail.com>
wrote:

> Hi Sophie,
>
> Thanks very much for the response!
>
> So if I understand correctly it will be impossible to avoid the repartition
> topic?
>
> e.g. my original message may have key = A...and will be partitioned on A.
>
> But in my Kafka Streams app, I will want to aggregate on A:B or A:C or A:D
> (B, C or D come from extra key values in the data)...but continue to
> partition on A. Then later read via REST all values for A. So to make this
> work I have to have a repartition topic even though I am not really
> repartitioning (i.e. all records for A should still be processed together).
> Is my understanding correct?
>
> So WindowedStreamPartitioner is a special case for avoiding the repartition
> topic?
>
> thanks in advance,
> Gareth
>
> On Wed, Mar 17, 2021 at 7:59 PM Sophie Blee-Goldman
> <sop...@confluent.io.invalid> wrote:
>
> > Hey Gareth,
> >
> > Kafka Streams state store partitioning is based on the partitioning of
> > the upstream input topics.
> > If you want your RocksDB stores to be partitioned based on the prefix of
> > a key, then you should make sure the input topic feeding into it uses
> > whatever partitioning strategy you had in mind.
> >
> > If the source topics are user input topics and you have control over the
> > production to these topics, then just use a custom partitioner to produce
> > to them. If you don't have control over them, you can insert an
> > intermediate/repartition topic between the input topics and the
> > subtopology with the RocksDB.
> > Check out the KStream#repartition operator, it accepts a Repartitioned
> > which itself accepts a StreamPartitioner that you can use to control the
> > partitioning.
> >
> > You can check out the class WindowedStreamPartitioner for an example:
> > this is how we handle the WindowStore case that you pointed out.
> >
> >
> >
> > On Mon, Mar 15, 2021 at 11:45 AM Gareth Collins <
> > gareth.o.coll...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > This may be a newbie question but is it possible to control the
> > > partitioning of a RocksDB KeyValueStore in Kafka Streams?
> > >
> > > For example, I perhaps only want to partition based on a prefix of a
> > > key rather than the full key. I assume something similar must be done
> > > for the WindowStore to partition without the window start time and
> > > sequence number (otherwise window entries could be spread across
> > > partitions)?
> > >
> > > Sort of like the window store, I am wanting to be able to retrieve all
> > > values with a certain key prefix from the KeyValueStore with one read
> > > operation.
> > >
> > > Is this possible?
> > >
> > > thanks in advance,
> > > Gareth Collins
> > >
> >
>
