Hi Dharin,

thanks so much for getting back and for your suggestions.

At the moment I'm not quite sure if aggregation in our database is a viable
option. Creating aggregate views seemed like an obvious solution at first,
yet Debezium does not support subscribing to publications based on views.
We've discussed various scenarios internally and came to the conclusion
that we wanted table-driven change management (as opposed to e.g. sending
changes directly from our O/R layer [Hibernate], which would circumvent
direct database changes via psql), so we settled on using Debezium as
change capture system.

Exchanging KTables for KStreams does not seem to be an option. I've
experimented with co-grouped KStreams which seemed nice at first, but at a
closer look the results did not resemble expected output. In fact,
ingesting records with <null> values in KStream has a different semantic
compared to that in KTable, which renders KStream unusable, as Debezium
sends DELETE events as key/<null> pairs. Those do not reflect in KStreams
(in fact, those records are ignored).

We are generally speaking of roughly 20 tables involved in CDC,
constructing two different kinds of aggregate objects. The largest
'leading' table features around 80M records. I'm not yet familiar with size
and performance requirements in Kafka as we are still somewhere at the
beginning of implementing our indexing solution. Initial Debezium snapshots
were quite fast from my point of view, resulting in overall broker disk
usage of 35Gi on 3 replicas each. The intended Kafka Streams application is
based on Quarkus and its Streams extension. To my knowledge, it uses
RocksDB internally. Concerning State Store management - I haven't applied
any special configuration yet so I guess there are some sane defaults. To
my knowledge, they are (partly?) managed in Kafka itself. But that's one
thing to tweak later I guess.

Best wishes,
Karsten


Am Mi., 24. Jan. 2024 um 16:51 Uhr schrieb Dharin Shah <
dharinsha...@gmail.com>:

> Hi Karsten,
>
> Before delving deeper into Kafka Streams, it's worth considering if direct
> aggregation in the database might be a more straightforward solution,
> unless there's a compelling reason to avoid it. Aggregating data at the
> database level often leads to more efficient and maintainable systems. This
> approach could involve creating intermediate tables for aggregation,
> simplifying data management and reducing the complexity of downstream
> processing.
>
> If the data model allows, consider pre-aggregating or restructuring data
> during writes. This step can significantly streamline the data before it's
> ingested into Kafka, which can then be used primarily for CDC and
> notifications.
>
> However, if Kafka Streams aligns better with your infrastructure and
> requirements, a few more details would be helpful for a tailored
> suggestion. For instance, are you using RocksDB for state stores, and how
> are these states managed - are they backed up on local disks or in Kafka
> itself?
>
> In Kafka Streams, consider converting KTables to KStreams for
> communications or labels to minimize state management needs. Keep in mind,
> though, that this approach might not reflect customer updates immediately.
>
> Also, exploring the Processor API could offer more control over data flow
> and state management. It's a bit more complex but can be very effective, as
> discussed in this
> <
> https://medium.com/@dharinshah1234/the-hidden-complexity-of-kafka-streams-dsl-solving-scaling-issues-8620fd344a6d
> >
> article.
>
> Thanks,
> Dharin
>
> On Wed, Jan 24, 2024 at 12:55 PM Karsten Stöckmann <
> karsten.stoeckm...@gmail.com> wrote:
> >
> > Hi,
> >
> > we're currently in the process of evaluating Debezium and Kafka as a CDC
> > system for our Postgres database in order to build an indexing solution
> > (i.e. Solr or OpenSearch).
> >
> > Debezium captures changes per table and propagates them into dedicated
> > Kafka topics each. The ingested tables originally feature multiple
> > relationships of all kinds (1:1, 1:n, n:m) - aggregated data from those
> > tables should eventually reflect in composite documents.
> > To illustrate this, assume the following heavily simplified table layout:
> >
> >    - customer(id, name, firstname),
> >    - communication(id, customer_id, value),
> >    - label(id, customer_id, value),
> >    - (maybe more dependent tables related to customers 1:n or 1:1 or even
> >    n:m - not considered here for brevity).
> >
> > All tables are streamed into Kafka topics as pointed out above. Now in
> > order to aggregate an output customer with all associated communication
> and
> > label values (i.e. aggregated as lists), what would be the most elegant
> > solution leveraging Kafka Streams? Note that customers do not
> necessisarily
> > have any communication or label at all, thus non-key joins are out of the
> > game as far as I understand.
> >
> > Our initial (naive) solution was to re-key the dependent KTables and then
> > left joining, but that involves a lot of steps and intermediate State
> > Stores, especially when considering the stated example is heavily
> > simplified:
> >
> > KTable<Long, Customer> customers = streamBuilder.table(customerTopic,
> > Consumed.with(<Serdes>));
> > KTable<Long, Comm> comm = streamBuilder.table(commTopic,
> > Consumed.with(<Serdes>));
> > KTable<Long, Label> labels = streamBuilder.table(labelTopic,
> > Consumed.with(<Serdes>));
> > // Re-Keying
> > KTable<Long, GroupedComm> groupedComm =
> > communications.groupBy(...).aggregate(...);
> > KTable<Long, GroupedLabel> groupedLabels =
> > labels.groupBy(...).aggregate(...);
> > // Join
> > KTable<Long, AggregateCustomer> aggregated = customers
> > .leftJoin(groupedComm, ...)
> > .leftJoin(groupedLabel, ...)
> > .groupBy(...)
> > .aggregate(...);
> >
> > Are there more efficient / less naive / more elegant / simpler solutions
> to
> > this? Co-grouping input streams did not yield expected results...
> >
> > Best wishes,
> >
> > Karsten
>

Reply via email to