Hi there,

A couple of general comments, plus some answers:

- general comment: have you thought of using Interactive Queries to query the 
aggregate data directly, without needing to store it in an external database? 
(See this blog: 
https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/.) 
That has the potential to simplify the overall architecture.
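
As a rough sketch of what that looks like (the store name "aggregate-store" and 
the `streams` instance are placeholders; the aggregate's store must be queryable, 
and in a multi-instance deployment you would first locate the owning host via 
KafkaStreams#metadataForKey):

```java
// Sketch only: assumes a running KafkaStreams instance named "streams"
// and a queryable state store named "aggregate-store" (both placeholders).
ReadOnlyKeyValueStore<String, AggregateHolder> store =
        streams.store("aggregate-store", QueryableStoreTypes.keyValueStore());

// Read the current aggregate for a key straight from local state,
// without any external database in the path.
AggregateHolder agg = store.get("some-key");
```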



> 
> 1. I cannot store too much data in the changelog, even with compaction, if
> I have too much data, bootstrapping a stream instance would take a long time

> 2. On the other hand, if I take too long to recover from a failure, I may
> lose data. So there seems to be a delicate tradeoff here

There is the option of using standby tasks to reduce the recovery time 
(http://docs.confluent.io/current/streams/architecture.html#fault-tolerance).
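
For example, a minimal sketch of enabling one standby replica per task (the 
application id and bootstrap servers below are placeholders), so a second 
instance keeps a warm copy of each state store ready for fast failover:

```java
import java.util.Properties;

public class StandbyConfigExample {
    // Builds a Streams config with one standby replica per task; on failover,
    // the standby's near-up-to-date store copy avoids a full changelog replay.
    public static Properties streamsConfig() {
        Properties props = new Properties();
        props.put("application.id", "aggregate-demo");    // placeholder
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("num.standby.replicas", "1");           // enable standby tasks
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsConfig().getProperty("num.standby.replicas"));
    }
}
```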

> 
> 2. In a scenario where my stream would have a fanout (multiple sub-streams
> based on the same stream), each branch would perform different "aggregate"
> operations, each with its own state store. Are state stores flushed in
> parallel or sequentially?

That depends on how many stream threads you configure. If you have N threads, 
each thread processes its assigned tasks and flushes its state stores 
independently, so flushes happen in parallel across threads (stores owned by a 
single thread are flushed one after another).
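
Concretely, the thread count is just a config knob (the value 4 below is an 
arbitrary example):

```java
import java.util.Properties;

public class ThreadingConfigExample {
    // With num.stream.threads = N, the tasks (and their state stores) are
    // split across N threads in this instance; each thread commits and
    // flushes its own stores independently of the others.
    public static Properties streamsConfig() {
        Properties props = new Properties();
        props.put("num.stream.threads", "4"); // example value
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsConfig().getProperty("num.stream.threads"));
    }
}
```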

> 3. The above also applies per-partition. As a stream definition is
> parallelized by partition, will one instance hold different store instances
> for each one?

Yes.

> 4. Through synthetic sleeps I simulated slow flushes, slower than the
> commit interval. The stream seems to be ok with it and didn't throw, I
> assume the Kafka consumer does not poll more records until all of the
> previous poll's are committed, but I couldn't find documentation to back
> this statement. Is there a timeout for "commit" operations?
> 
> 

Yes, a single thread poll()s and then commits before polling again. The timeout 
for commit operations is controlled through the REQUEST_TIMEOUT_MS_CONFIG 
option on the streams producer (30 seconds by default; you shouldn't normally 
need to change it).
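
If you did want to set it explicitly, it's the standard producer 
request.timeout.ms setting (the value below is just the default, shown for 
illustration):

```java
import java.util.Properties;

public class TimeoutConfigExample {
    // request.timeout.ms (ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG) bounds how
    // long the producer side waits for broker responses, which also bounds
    // commit-related sends; 30000 ms is the default.
    public static Properties streamsConfig() {
        Properties props = new Properties();
        props.put("request.timeout.ms", "30000"); // the default, made explicit
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsConfig().getProperty("request.timeout.ms"));
    }
}
```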

Thanks
Eno


> Sample code
> 
> public class AggregateHolder {
> 
>    private Long commonKey;
>    private List<Double> rawValues = new ArrayList<>();
>    private boolean persisted;
> 
> // ...
> }
> 
> And stream definition
> 
> source.groupByKey(Serdes.String(), recordSerdes)
>       .aggregate(
>               AggregateHolder::new,
>               (aggKey, value, aggregate) -> aggregate.addValue(value.getValue()),
>               new DemoStoreSupplier<>(/* ... */)
>       )
>       .foreach((key, agg) -> log.debug("Aggregate: {}={} ({})",
>               key, agg.getAverage(), agg));
