Hi Sophie, I will have to give this a try. Thanks very much!
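In case it helps anyone searching the archives later, below is roughly what I am planning to try, based on your transformer suggestion further down. It is only a sketch: the running count is a stand-in for my real aggregation, and the store name, extractKeyData() helper, and PrefixPartitioner are placeholders I have not actually run yet.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class PrefixAggregationTopology {

    static final String STORE_NAME = "prefix-agg-store"; // placeholder name

    // Simplified stand-in for what KStreamAggregateProcessor does: read the
    // old aggregate for the A:B key, update it, write it back, forward it.
    static class CountingTransformer implements Transformer<String, String, KeyValue<String, Long>> {
        private KeyValueStore<String, Long> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            store = (KeyValueStore<String, Long>) context.getStateStore(STORE_NAME);
        }

        @Override
        public KeyValue<String, Long> transform(String key, String value) {
            Long oldAgg = store.get(key);
            Long newAgg = (oldAgg == null ? 0L : oldAgg) + 1L; // swap in the real aggregator here
            store.put(key, newAgg);
            return KeyValue.pair(key, newAgg);
        }

        @Override
        public void close() { }
    }

    // Partitions "A:B" keys on the "A" prefix only, so the output topic stays
    // partitioned by A. Plain hashCode is used here for brevity; if the output
    // has to line up exactly with how the producer partitioned the original A
    // keys, hash the serialized prefix the same way the default producer
    // partitioner does.
    static class PrefixPartitioner implements StreamPartitioner<String, Long> {
        @Override
        public Integer partition(String topic, String key, Long value, int numPartitions) {
            String prefix = key.split(":", 2)[0];
            return (prefix.hashCode() & 0x7fffffff) % numPartitions;
        }
    }

    public static StreamsBuilder buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // The store has to be registered manually and named in transform(),
        // since the DSL is not managing the aggregation for us.
        builder.addStateStore(Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(STORE_NAME),
                Serdes.String(), Serdes.Long()));

        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
                .selectKey((key, value) -> key + ":" + extractKeyData(value)) // e.g. A --> A:B
                .transform(CountingTransformer::new, STORE_NAME)              // no repartition inserted here
                .to("output-topic",
                    // or Produced.streamPartitioner(...) if the default serdes already match
                    Produced.with(Serdes.String(), Serdes.Long(), new PrefixPartitioner()));

        return builder;
    }

    // Placeholder for however the extra key data (B, C, D, ...) is pulled out
    // of the record value.
    private static String extractKeyData(String value) {
        return value;
    }
}
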
Gareth

On Fri, Mar 19, 2021 at 9:00 PM Sophie Blee-Goldman <sop...@confluent.io.invalid> wrote:

> Ah ok, I think I was envisioning a different use case from your initial
> description of the problem. If everything that you want to group together
> is already correctly partitioned, then you won't need a repartitioning
> step. If I understand correctly, you have something like this in mind:
>
>     builder
>         .stream("input-topic")
>         .selectKey((key, value) -> key + extractKeyData(value)) // e.g. A --> A:B
>         .groupByKey()
>         .aggregate(...)
>         .toStream()
>         .to("output-topic"); // keys are of the form A:B, but should be partitioned by A
>
> If that looks about right, then there are two things to watch out for:
>
> 1) To keep partitioning on A instead of A:B, you'll need to provide a
> custom partitioner when writing to the output topic. See
> Produced#streamPartitioner.
>
> 2) Since you have a key-changing operation (selectKey) upstream of a
> stateful operation (aggregate), Streams will automatically infer that a
> repartitioning step is required and insert one for you. Unfortunately
> there's currently no way to force Streams not to do that, even when you
> know the data is going to be partitioned correctly -- there is a ticket
> for this but it has yet to be picked up by anyone. See
> https://issues.apache.org/jira/browse/KAFKA-4835
>
> I think the only workaround at this point would be to implement the
> aggregation step yourself using the low-level Processor API. Streams will
> only handle the repartitioning for DSL operators.
>
> You can still use the DSL for everything else, and mix in the PAPI by
> using a transformer. Streams does not insert repartitions before a
> transformer since it can't infer whether or not the operation is
> stateful. I know re-implementing the aggregator is a hassle, but you
> should be able to just copy and paste much of the existing aggregation
> code. Check out the KStreamAggregateProcessor class.
>
> This would look something like this:
>
>     builder
>         .stream("input-topic")
>         .selectKey((key, value) -> key + extractKeyData(value)) // e.g. A --> A:B
>         .transform(myAggregateProcessorSupplier) // supplier returns a new processor which implements the aggregation
>         .to("output-topic", Produced.streamPartitioner(myStreamPartitioner)); // myStreamPartitioner extracts and partitions based on A
>
> Obviously this situation is not ideal -- if you're interested in improving
> things, feel free to pick up KAFKA-4835
> <https://issues.apache.org/jira/browse/KAFKA-4835>
>
> On Wed, Mar 17, 2021 at 8:19 PM Gareth Collins <gareth.o.coll...@gmail.com>
> wrote:
>
> > Hi Sophie,
> >
> > Thanks very much for the response!
> >
> > So if I understand correctly it will be impossible to avoid the
> > repartition topic?
> >
> > e.g. my original message may have key = A... and will be partitioned on A.
> >
> > But in my Kafka Streams app, I will want to aggregate on A:B or A:C or
> > A:D (B, C or D come from extra key values in the data)... but continue
> > to partition on A. Then later read via REST all values for A. So to make
> > this work I have to have a repartition topic even though I am not really
> > repartitioning (i.e. all records for A should still be processed
> > together). Is my understanding correct?
> >
> > So WindowedStreamPartitioner is a special case for avoiding the
> > repartition topic?
> >
> > thanks in advance,
> > Gareth
> >
> > On Wed, Mar 17, 2021 at 7:59 PM Sophie Blee-Goldman
> > <sop...@confluent.io.invalid> wrote:
> >
> > > Hey Gareth,
> > >
> > > Kafka Streams state store partitioning is based on the partitioning of
> > > the upstream input topics. If you want your RocksDB stores to be
> > > partitioned based on the prefix of a key, then you should make sure
> > > the input topic feeding into it uses whatever partitioning strategy
> > > you had in mind.
> > >
> > > If the source topics are user input topics and you have control over
> > > the production to these topics, then just use a custom partitioner to
> > > produce to them. If you don't have control over them, you can insert
> > > an intermediate/repartition topic between the input topics and the
> > > subtopology with the RocksDB. Check out the KStream#repartition
> > > operator: it accepts a Repartitioned, which itself accepts a
> > > StreamPartitioner that you can use to control the partitioning.
> > >
> > > You can check out the class WindowedStreamPartitioner for an example:
> > > this is how we handle the WindowStore case that you pointed out.
> > >
> > > On Mon, Mar 15, 2021 at 11:45 AM Gareth Collins
> > > <gareth.o.coll...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > This may be a newbie question but is it possible to control the
> > > > partitioning of a RocksDB KeyValueStore in Kafka Streams?
> > > >
> > > > For example, I perhaps only want to partition based on a prefix of a
> > > > key rather than the full key. I assume something similar must be
> > > > done for the WindowStore to partition without the window start time
> > > > and sequence number (otherwise window entries could be spread across
> > > > partitions)?
> > > >
> > > > Sort of like the window store, I am wanting to be able to retrieve
> > > > all values with a certain key prefix from the KeyValueStore with one
> > > > read operation.
> > > >
> > > > Is this possible?
> > > >
> > > > thanks in advance,
> > > > Gareth Collins
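
PS: for the earlier suggestion of inserting an explicit repartition step, I think the wiring would look roughly like the sketch below. The topic names, the "A:B" key layout, the count(), and the hashing are my own placeholders and I have not run this -- it is just how I read the Repartitioned/StreamPartitioner API.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.processor.StreamPartitioner;

public class PrefixRepartitionExample {

    public static StreamsBuilder buildTopology() {
        // Partition "A:B" keys on the "A" prefix only, so every record for a
        // given A lands in the same partition (and the same RocksDB shard).
        StreamPartitioner<String, String> byPrefix =
                (topic, key, value, numPartitions) ->
                        (key.split(":", 2)[0].hashCode() & 0x7fffffff) % numPartitions;

        StreamsBuilder builder = new StreamsBuilder();

        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
                // Explicit repartition: the internal topic it creates is
                // partitioned by our StreamPartitioner instead of the full key.
                .repartition(Repartitioned.<String, String>as("by-a-prefix")
                        .withStreamPartitioner(byPrefix))
                .groupByKey()
                // Stand-in aggregation; its backing store is now partitioned by A.
                .count(Materialized.as("prefix-partitioned-counts"));

        return builder;
    }
}

If the goal is to avoid the extra internal topic entirely, that still seems to come back to the transform() route above, or to KAFKA-4835 getting picked up.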