Hi there,

A couple of general comments, plus some answers:
- General comment: have you thought of using Interactive Queries to query the aggregated data directly, without needing to store it in an external database? See this blog: https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/. That has the potential to simplify the overall architecture.

> 1. I cannot store too much data in the changelog, even with compaction. If
> I have too much data, bootstrapping a stream instance will take a long time.
> 2. On the other hand, if I take too long to recover from a failure, I may
> lose data. So there seems to be a delicate tradeoff here.

There is the option of using standby tasks to reduce the recovery time:
http://docs.confluent.io/current/streams/architecture.html#fault-tolerance

> 2. In a scenario where my stream has a fanout (multiple sub-streams based
> on the same stream), each branch would perform different "aggregate"
> operations, each with its own state store. Are state stores flushed in
> parallel or sequentially?

That depends on how many threads you have. If you have N threads, each works and flushes in parallel.

> 3. The above also applies per-partition. As a stream definition is
> parallelized by partition, will one instance hold different store instances
> for each partition?

Yes.

> 4. Through synthetic sleeps I simulated slow flushes, slower than the
> commit interval. The stream seems to be OK with it and didn't throw. I
> assume the Kafka consumer does not poll more records until all of the
> previous poll's records are committed, but I couldn't find documentation to
> back this statement. Is there a timeout for "commit" operations?

Yes, a single thread polls(), then commits().
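To make that cycle concrete, here is a toy model in plain Java. This is only a schematic of the behavior, not the actual Streams implementation, and all names in it are my own invention: records from one "poll" are fully processed and "committed" before the next poll happens.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Schematic of the single-threaded poll-then-commit cycle: the next
// "poll" only happens after the previous batch is processed and committed.
public class PollCommitLoop {

    public static List<String> run(Deque<String> topic, int pollSize) {
        List<String> committed = new ArrayList<>();
        while (!topic.isEmpty()) {
            // "poll": take up to pollSize records from the (simulated) topic
            List<String> batch = new ArrayList<>();
            for (int i = 0; i < pollSize && !topic.isEmpty(); i++) {
                batch.add(topic.poll());
            }
            // process the whole batch (flush state stores, etc.),
            // then "commit"; only afterwards does the loop poll again
            committed.addAll(batch);
        }
        return committed;
    }
}
```

A slow flush therefore delays the next poll rather than causing records to pile up uncommitted.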
The timeout for commit operations is controlled through the REQUEST_TIMEOUT_MS_CONFIG option on the streams producer (30 seconds by default; you shouldn't normally need to change it).

Thanks,
Eno

> Sample code:
>
> public class AggregateHolder {
>
>     private Long commonKey;
>     private List<Double> rawValues = new ArrayList<>();
>     private boolean persisted;
>
>     // ...
> }
>
> And stream definition:
>
> source.groupByKey(Serdes.String(), recordSerdes)
>       .aggregate(
>           AggregateHolder::new,
>           (aggKey, value, aggregate) -> aggregate.addValue(value.getValue()),
>           new DemoStoreSupplier<>(/* ... */)
>       )
>       .foreach((key, agg) -> log.debug("Aggregate: {}={} ({})",
>           key, agg.getAverage(), agg));
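For reference, here is one self-contained way the quoted AggregateHolder could be completed. The bodies of addValue and getAverage are my assumptions, inferred from how the stream definition uses them: the aggregator returns the result of addValue, so addValue must return the holder itself.

```java
import java.util.ArrayList;
import java.util.List;

// A sketch completing the AggregateHolder from the sample code above;
// method bodies are assumptions based on how the stream definition uses them.
public class AggregateHolder {

    private Long commonKey;
    private final List<Double> rawValues = new ArrayList<>();
    private boolean persisted;

    // Returns this, so it can be used directly as the aggregator's result
    public AggregateHolder addValue(Double value) {
        rawValues.add(value);
        return this;
    }

    public double getAverage() {
        return rawValues.stream()
                .mapToDouble(Double::doubleValue)
                .average()
                .orElse(0.0);
    }
}
```

Note that since this object is the aggregate value, it must also round-trip through whatever serde the state store uses.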