Hi Sophie,

I will have to give this a try. Thanks very much!

Gareth

On Fri, Mar 19, 2021 at 9:00 PM Sophie Blee-Goldman
<sop...@confluent.io.invalid> wrote:

> Ah ok, I think I was envisioning a different use case from your initial
> description of the problem.
> If everything that you want to group together is already correctly
> partitioned, then you won't need
> a repartitioning step. If I understand correctly, you have something like
> this in mind:
>
>  builder
>     .stream("input-topic")
>     .selectKey((key, value) -> key + extractKeyData(value)) // eg A --> A:B
>     .groupByKey()
>     .aggregate(...)
>     .toStream()
>     .to("output-topic");  // keys are of the form A:B, but should be partitioned by A
>
> If that looks about right, then there are two things to watch out for:
> 1) To keep partitioning on A instead of A:B, you'll need to provide a
>     custom StreamPartitioner when writing to the output topic.
>     See Produced#streamPartitioner.
> 2) Since you have a key-changing operation (selectKey) upstream of a
>     stateful operation (aggregate), Streams will automatically infer that a
>     repartitioning step is required and insert one for you.
>     Unfortunately there's currently no way to force Streams not to do that,
>     even when you know the data is going to be partitioned correctly -- there
>     is a ticket for this but it has yet to be picked up by anyone.
>     See https://issues.apache.org/jira/browse/KAFKA-4835
>     I think the only workaround at this point would be to implement the
>     aggregation step yourself using the low-level Processor API. Streams will
>     only handle the repartitioning for DSL operators.
>
>     You can still use the DSL for everything else, and mix in the PAPI by
>     using a transformer. Streams does not insert repartitions before a
>     transformer since it can't infer whether or not the operation is
>     stateful. I know re-implementing the aggregator is a hassle but you
>     should be able to just copy and paste much of the existing aggregation
>     code. Check out the KStreamAggregateProcessor class.
>
>     This would look something like this:
>
>  builder
>     .stream("input-topic")
>     .selectKey((key, value) -> key + extractKeyData(value)) // eg A --> A:B
>     .transform(myAggregateProcessorSupplier) // supplier returns a new processor which implements the aggregation
>     .to("output-topic", Produced.streamPartitioner(myStreamPartitioner)); // myStreamPartitioner extracts and partitions based on A
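>
> A minimal sketch of what a partitioner like myStreamPartitioner might look
> like, assuming String keys of the form "A:B" that should be partitioned on
> the "A" prefix (the class name is just illustrative):
>
>  import java.nio.charset.StandardCharsets;
>  import org.apache.kafka.common.utils.Utils;
>  import org.apache.kafka.streams.processor.StreamPartitioner;
>
>  public class PrefixStreamPartitioner<V> implements StreamPartitioner<String, V> {
>      @Override
>      public Integer partition(final String topic, final String key, final V value, final int numPartitions) {
>          // extract the "A" prefix from an "A:B" key
>          final String prefix = key.split(":", 2)[0];
>          // hash the serialized prefix the same way the default producer partitioner hashes
>          // a plain "A" key, so the output stays aligned with topics keyed on "A" alone
>          final byte[] prefixBytes = prefix.getBytes(StandardCharsets.UTF_8);
>          return Utils.toPositive(Utils.murmur2(prefixBytes)) % numPartitions;
>      }
>  }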
>
> Obviously this situation is not ideal -- if you're interested in improving
> things, feel free to pick up KAFKA-4835
> <https://issues.apache.org/jira/browse/KAFKA-4835>
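>
> For reference, a rough sketch of what myAggregateProcessorSupplier might
> look like. It assumes, purely for illustration, String keys and Long values
> that are summed per key; the class name and store name are made up:
>
>  import java.util.Collections;
>  import java.util.Set;
>  import org.apache.kafka.common.serialization.Serdes;
>  import org.apache.kafka.streams.KeyValue;
>  import org.apache.kafka.streams.kstream.Transformer;
>  import org.apache.kafka.streams.kstream.TransformerSupplier;
>  import org.apache.kafka.streams.processor.ProcessorContext;
>  import org.apache.kafka.streams.state.KeyValueStore;
>  import org.apache.kafka.streams.state.StoreBuilder;
>  import org.apache.kafka.streams.state.Stores;
>
>  public class MyAggregateTransformerSupplier
>          implements TransformerSupplier<String, Long, KeyValue<String, Long>> {
>
>      static final String STORE_NAME = "my-aggregate-store";
>
>      @Override
>      public Transformer<String, Long, KeyValue<String, Long>> get() {
>          return new Transformer<String, Long, KeyValue<String, Long>>() {
>              private KeyValueStore<String, Long> store;
>
>              @SuppressWarnings("unchecked")
>              @Override
>              public void init(final ProcessorContext context) {
>                  store = (KeyValueStore<String, Long>) context.getStateStore(STORE_NAME);
>              }
>
>              @Override
>              public KeyValue<String, Long> transform(final String key, final Long value) {
>                  // read-modify-write the running aggregate for this key
>                  final Long oldAgg = store.get(key);
>                  final Long newAgg = (oldAgg == null ? 0L : oldAgg) + value;
>                  store.put(key, newAgg);
>                  // forward the updated aggregate downstream, like the DSL aggregate would
>                  return KeyValue.pair(key, newAgg);
>              }
>
>              @Override
>              public void close() { }
>          };
>      }
>
>      // co-locating the store with the transformer lets Streams connect it automatically,
>      // so no store name needs to be passed to transform()
>      @Override
>      public Set<StoreBuilder<?>> stores() {
>          final StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
>              Stores.keyValueStoreBuilder(
>                  Stores.persistentKeyValueStore(STORE_NAME),
>                  Serdes.String(),
>                  Serdes.Long());
>          return Collections.<StoreBuilder<?>>singleton(storeBuilder);
>      }
>  }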
>
>
> On Wed, Mar 17, 2021 at 8:19 PM Gareth Collins <gareth.o.coll...@gmail.com>
> wrote:
>
> > Hi Sophie,
> >
> > Thanks very much for the response!
> >
> > So if I understand correctly it will be impossible to avoid the
> > repartition topic?
> >
> > e.g. my original message may have key = A...and will be partitioned on A.
> >
> > But in my Kafka Streams app, I will want to aggregate on A:B or A:C or A:D
> > (B, C or D come from extra key values in the data)...but continue to
> > partition on A. Then later read via REST all values for A. So to make this
> > work I have to have a repartition topic even though I am not really
> > repartitioning (i.e. all records for A should still be processed together).
> > Is my understanding correct?
> >
> > So WindowedStreamPartitioner is a special case for avoiding the
> > repartition topic?
> >
> > thanks in advance,
> > Gareth
> >
> > On Wed, Mar 17, 2021 at 7:59 PM Sophie Blee-Goldman
> > <sop...@confluent.io.invalid> wrote:
> >
> > > Hey Gareth,
> > >
> > > Kafka Streams state store partitioning is based on the partitioning of the
> > > upstream input topics. If you want your RocksDB stores to be partitioned
> > > based on the prefix of a key, then you should make sure the input topic
> > > feeding into it uses whatever partitioning strategy you had in mind.
> > >
> > > If the source topics are user input topics and you have control over the
> > > production to these topics, then just use a custom partitioner to produce
> > > to them. If you don't have control over them, you can insert an
> > > intermediate/repartition topic between the input topics and the
> > > subtopology with the RocksDB. Check out the KStream#repartition operator;
> > > it accepts a Repartitioned, which itself accepts a StreamPartitioner that
> > > you can use to control the partitioning.
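> > >
> > > A minimal sketch of that repartition step, assuming String keys of the
> > > form "A:B" that should be partitioned on the "A" prefix (topic and
> > > variable names are illustrative):
> > >
> > >  import org.apache.kafka.common.serialization.Serdes;
> > >  import org.apache.kafka.streams.StreamsBuilder;
> > >  import org.apache.kafka.streams.kstream.Consumed;
> > >  import org.apache.kafka.streams.kstream.KStream;
> > >  import org.apache.kafka.streams.kstream.Repartitioned;
> > >  import org.apache.kafka.streams.processor.StreamPartitioner;
> > >
> > >  final StreamsBuilder builder = new StreamsBuilder();
> > >
> > >  // the only requirement here is that all keys with the same "A" prefix land on the same partition
> > >  final StreamPartitioner<String, String> prefixPartitioner =
> > >      (topic, key, value, numPartitions) ->
> > >          (key.split(":", 2)[0].hashCode() & 0x7fffffff) % numPartitions;
> > >
> > >  final KStream<String, String> repartitioned =
> > >      builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
> > >             .repartition(Repartitioned.<String, String>as("by-prefix")
> > >                                       .withStreamPartitioner(prefixPartitioner));
> > >
> > >  // any stateful processing (and its RocksDB store) downstream of "repartitioned"
> > >  // now sees all "A:*" records together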
> > >
> > > You can check out the class WindowedStreamPartitioner for an example:
> > > this is how we handle the WindowStore case that you pointed out.
> > >
> > >
> > >
> > > On Mon, Mar 15, 2021 at 11:45 AM Gareth Collins <gareth.o.coll...@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > This may be a newbie question but is it possible to control the
> > > > partitioning of a RocksDB KeyValueStore in Kafka Streams?
> > > >
> > > > For example, I perhaps only want to partition based on a prefix of a key
> > > > rather than the full key. I assume something similar must be done for the
> > > > WindowStore to partition without the window start time and sequence number
> > > > (otherwise window entries could be spread across partitions)?
> > > >
> > > > Sort of like the window store, I am wanting to be able to retrieve all
> > > > values with a certain key prefix from the KeyValueStore with one read
> > > > operation.
> > > >
> > > > Is this possible?
> > > >
> > > > thanks in advance,
> > > > Gareth Collins
> > > >
> > >
> >
>
