Both fk1 and fk2 point to the PK of another entity (not shown for brevity, of no relevance to the question).
Is this two independent FKs, or one two-column FK?
Ingesting the topic into a Kafka Streams application, how can I re-key the resulting KTable<Long, A> by both fk1 and fk2?
If you read the topic as a KTable, you cannot repartition it, because that would violate the contract: a KTable must be partitioned by its primary key, i.e., the ID field, and thus the DSL does not offer a repartition option for KTables.
You could read the topic as a KStream, though, and provide a custom `StreamPartitioner` for a `repartition()` operation. However, this is also "dangerous", because a KStream is likewise assumed to be partitioned by its key, and you might break downstream DSL operators with such a violation of the "contract".
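Just to illustrate the idea (a rough sketch, not from this thread; the "by-fk1" topic name and the hash-based partitioner are made up, and the caveat above about breaking the key/partition contract still applies):

StreamPartitioner<Long, A> byFk1 = (topicName, key, value, numPartitions) ->
    // illustrative only: route by fk1 instead of the record key;
    // this is not the same hashing Kafka's default partitioner uses
    Math.floorMod(value.fk1().hashCode(), numPartitions);

KStream<Long, A> in = streamsBuilder.stream(topic, Consumed.with(...));
KStream<Long, A> partitionedByFk1 = in.repartition(
    Repartitioned.<Long, A>as("by-fk1").withStreamPartitioner(byFk1));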
Looking into your solution:
.toTable()
.groupBy(
    (key, value) -> KeyValue.pair(value.fk1(), value),
    Grouped.with(...))
This will set fk1 as the key, which does not seem to align with your previous comment that the key should stay the ID? (Same for fk2.)
Your last step seems to join fk1-fk2 -- is this on purpose? I guess it's unclear what you are actually trying to do to begin with? It sounds like it's overall a self-join of the input topic on fk1 and fk2?
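If what you actually need is to join each A against the entity that fk1 (and fk2) reference by primary key, the DSL's foreign-key table-table join might also be worth a look instead of manual re-keying. Just a rough sketch on my side, not something from your mail; `Other`, `otherTopic`, and `Enriched` are placeholder names:

KTable<Long, A> a = streamsBuilder.table(topic, Consumed.with(...));
KTable<Long, Other> other = streamsBuilder.table(otherTopic, Consumed.with(...));

// for each A row, look up the Other row whose primary key equals fk1
KTable<Long, Enriched> joinedOnFk1 = a.join(
    other,
    A::fk1,                                      // foreign-key extractor
    (aValue, otherValue) -> new Enriched(aValue, otherValue));

// the same pattern could be repeated with A::fk2 for the second FK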
-Matthias

On 1/28/24 2:24 AM, Karsten Stöckmann wrote:
Hi all,

just stumbled upon another Kafka Streams issue that keeps me busy these days.

Assume a (simplified) class A like this:

class A {
    private Long id;
    private String someContent;
    private Long fk1;
    private Long fk2;
    // Getters and setters accordingly
}

Both fk1 and fk2 point to the PK of another entity (not shown for brevity, of no relevance to the question).

Now assume a Kafka topic built from instances of class A, keyed by its id (see above). Ingesting the topic into a Kafka Streams application, how can I re-key the resulting KTable<Long, A> by both fk1 and fk2? Note that the resulting key should not be changed or turned into some kind of composite key, as it is used in later join operations.

My (naive) solution involves creating two KTables from the input stream, re-keying them by fk1 and fk2 accordingly, and then outer joining both resulting (re-keyed) KTables.

KStream<Long, A> in = streamsBuilder.stream(topic, Consumed.with(...));

KTable<Long, A> rekeyedByFk1 = in
    .toTable()
    .groupBy(
        (key, value) -> KeyValue.pair(value.fk1(), value),
        Grouped.with(...))
    .aggregate(
        Aggregate::new,
        (key, value, aggregate) -> aggregate.add(value),
        (key, value, aggregate) -> aggregate.remove(value),
        Materialized.with(...));

KTable<Long, A> rekeyedByFk2 = in
    .toTable()
    .groupBy(
        (key, value) -> KeyValue.pair(value.fk2(), value),
        Grouped.with(...))
    .aggregate(
        ... same as above
    );

KTable<Long, A> joined = rekeyedByFk1
    .outerJoin(
        rekeyedByFk2,
        <value joiner>)
    .groupBy(KeyValue::pair, Grouped.with(...))
    .aggregate(...);

<value joiner> would integrate the (already pre-joined) Aggregates so as to avoid duplicates.

Does this seem like a viable solution, or are there better / simpler / more efficient implementations?

Best wishes,
Karsten