Hi all,

I just stumbled upon another Kafka Streams issue that has been keeping me busy these days.

Assume a (simplified) class A like this:

class A {
    private Long id;
    private String someContent;
    private Long fk1;
    private Long fk2;
    // Getters and setters accordingly
}

Both fk1 and fk2 point to the PK of another entity (not shown for
brevity; it is of no relevance to the question).

Now assume a Kafka topic built from instances of class A, keyed by its
id (see above).

When ingesting the topic into a Kafka Streams application, how can I
re-key the resulting KTable<Long, A> by both fk1 and fk2? Note that
the key type should stay a plain Long and not be turned into some kind
of composite key, as the re-keyed table is used in later join
operations.

My (naive) solution involves creating two KTables from the input
stream, re-keying them by fk1 and fk2 respectively, and then outer
joining the two resulting (re-keyed) KTables.

KStream<Long, A> in = streamsBuilder.stream(topic, Consumed.with(...));

KTable<Long, A> rekeyedByFk1 = in
    .toTable()
    .groupBy(
        (key, value) -> KeyValue.pair(value.fk1(), value),
        Grouped.with(...))
    .aggregate(
        Aggregate::new,
        (key, value, aggregate) -> aggregate.add(value),
        (key, value, aggregate) -> aggregate.remove(value),
        Materialized.with(...));

KTable<Long, A> rekeyedByFk2 = in
    .toTable()
    .groupBy(
        (key, value) -> KeyValue.pair(value.fk2(), value),
        Grouped.with(...))
    .aggregate(
        ... same as above
    );
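For reference, here is a minimal sketch of the Aggregate type I have in mind (names are hypothetical and not from any library; a record stands in for class A so that equals/hashCode come for free and set-based deduplication works):

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Minimal stand-in for class A (a record provides equals/hashCode,
// which the Set below relies on for deduplication).
record A(Long id, String someContent, Long fk1, Long fk2) {}

// Hypothetical Aggregate used by the adder/subtractor above:
// it collects the current set of A rows sharing one foreign-key value.
class Aggregate {
    private final Set<A> members = new LinkedHashSet<>();

    Aggregate add(A value) {
        members.add(value);
        return this;
    }

    Aggregate remove(A value) {
        members.remove(value);
        return this;
    }

    Set<A> members() {
        return members;
    }
}
```

The adder and subtractor passed to aggregate() then simply delegate to add() and remove().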

KTable<Long, A> joined = rekeyedByFk1
    .outerJoin(
        rekeyedByFk2,
        <value joiner>)
    .groupBy(KeyValue::pair, Grouped.with(...))
    .aggregate(...);

<value joiner> would integrate the (already pre-joined) Aggregates so
as to avoid duplicates.
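A minimal sketch of such a value joiner (again with hypothetical names; the stand-in types are repeated here so the snippet is self-contained, and either side may be null in an outer join):

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Minimal stand-ins so the sketch compiles on its own.
record A(Long id, String someContent, Long fk1, Long fk2) {}

class Aggregate {
    private final Set<A> members = new LinkedHashSet<>();
    Aggregate add(A value) { members.add(value); return this; }
    Aggregate remove(A value) { members.remove(value); return this; }
    Set<A> members() { return members; }
}

class AggregateJoiner {
    // Hypothetical value joiner: merges the two pre-joined Aggregates.
    // The Set inside Aggregate deduplicates rows that appear on both
    // sides, i.e. rows whose fk1 and fk2 resolve to the same key.
    static Aggregate merge(Aggregate left, Aggregate right) {
        Aggregate result = new Aggregate();
        if (left != null) {
            left.members().forEach(result::add);
        }
        if (right != null) {
            right.members().forEach(result::add);
        }
        return result;
    }
}
```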

Does this seem like a viable solution, or are there better / simpler /
more efficient implementations?

Best wishes,
Karsten
