Thanks for the comments, here are some clarifications:

I did look at interactive queries. If I understood them correctly, they mean that my state store must hold all the results in order for them to be queried, either in memory or on disk (RocksDB).

1. The retention policy on my aggregate operations is, in my case, 90 days, which is way too much data to hold in memory.
2. My stream instances do not have access to disk. Even if they did, wouldn't that mean I'd need almost twice the disk space to hold the same data, i.e. Kafka brokers holding the topics + RocksDB holding the state?
3. Because a crash may happen between the moment an entry is added to the changelog and the moment the data store is flushed, I need to replay all the changes every time if I want to guarantee that all data is eventually persisted (rough sketch below). This is why checkpoint files may not work for me.
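To make point 3 concrete, here is a rough, untested sketch of the recovery behaviour I have in mind (plain Java; in the real store the restore() calls would be driven by Kafka Streams' StateRestoreCallback during initialization). AggregateHolder is the class from my earlier message quoted below; the isPersisted()/markPersisted() accessors and the two write helpers are placeholders, not real APIs.

import java.util.HashMap;
import java.util.Map;

public class RecoverySketch {

    // In-memory view of the store, rebuilt from the changelog on startup.
    private final Map<String, AggregateHolder> state = new HashMap<>();

    // Called once per changelog record. Without a checkpoint of what already
    // reached the database, the whole changelog has to be replayed.
    public void restore(String key, AggregateHolder value) {
        state.put(key, value);
    }

    // First flush after recovery: anything the previous instance logged but
    // never confirmed as written to the database is written again, then
    // marked and logged as persisted.
    public void flush() {
        state.forEach((key, agg) -> {
            if (!agg.isPersisted()) {         // assumed accessor on AggregateHolder
                writeToDatabase(key, agg);    // placeholder
                agg.markPersisted();          // assumed accessor on AggregateHolder
                writeToChangelog(key, agg);   // placeholder: logs the updated flag
            }
        });
    }

    private void writeToDatabase(String key, AggregateHolder agg) { /* ... */ }
    private void writeToChangelog(String key, AggregateHolder agg) { /* ... */ }
}

Since the writes are idempotent upserts keyed by the aggregation key, re-persisting an entry that had in fact already reached the database should be harmless.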
Standby tasks look great, I had forgotten about those. I'm at the design phase, so all of this is tentative.

Answering Matthias' questions:

My state stores are local. As mentioned above I do not have access to disk, so I need to recover all data from somewhere; in this case I'm thinking about the changelog.

I have read about Kafka Connect but never used it. Maybe it would simplify things, but I need to do some studying there.

The reason why, even though my stores are local, I still want to keep them in a database instead of using straight-up RocksDB (or global stores) is that it would allow me to migrate my current processing pipeline to Kafka Streams without needing to change the frontend part of the application, which fetches data from MongoDB. Kafka Streams would update the very same collections, making the change transparent to the consumer (at this stage).

Additionally, from my experience MongoDB does not like heavy write loads. A sink step that performs a "write" to MongoDB would, given the sequential nature of Kafka Streams, translate to at least one write to MongoDB per record. If my stream processes 100k records/second for one record type and there are 10 different record types (each with its own Kafka Streams instance), I'd be issuing 1M write queries/second to MongoDB, which I can foresee being a problem. The state stores approach would allow me to *batch writes* (rough sketch below, after the PS).

I think I addressed all questions? Looking forward to hearing more feedback.

PS When you mention global state stores I'm thinking of
http://docs.confluent.io/3.2.0/streams/developer-guide.html#querying-remote-state-stores-for-the-entire-application,
is this correct?
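To make the batching idea concrete, here is a rough, untested sketch of the kind of flush I have in mind. It is not my actual store implementation: the connection/database/collection names and the document mapping are placeholders, AggregateHolder is the class from my earlier message quoted below, and I'm assuming the MongoDB Java driver 3.7+ bulkWrite API.

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.WriteModel;
import org.bson.Document;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class BatchingFlushSketch {

    private final MongoCollection<Document> collection;
    // Dirty aggregates accumulated between commits, keyed by the aggregation key.
    private final Map<String, AggregateHolder> dirty = new ConcurrentHashMap<>();

    public BatchingFlushSketch(String uri, String db, String coll) {
        MongoClient client = MongoClients.create(uri);
        this.collection = client.getDatabase(db).getCollection(coll);
    }

    // Called for every record the aggregation touches; no database I/O here.
    public void put(String key, AggregateHolder agg) {
        dirty.put(key, agg);
    }

    // Called on commit (e.g. from the custom store's flush()): one bulk upsert
    // per store instead of one write per record.
    public void flush() {
        if (dirty.isEmpty()) {
            return;
        }
        List<WriteModel<Document>> batch = new ArrayList<>();
        dirty.forEach((key, agg) -> batch.add(new ReplaceOneModel<Document>(
                Filters.eq("_id", key),
                new Document("_id", key)
                        .append("average", agg.getAverage()),   // placeholder mapping
                new ReplaceOptions().upsert(true))));
        collection.bulkWrite(batch);
        dirty.clear();
    }
}

A single bulk upsert per store per commit interval, instead of one write per record, is what I expect would keep MongoDB happy.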
On Fri, May 12, 2017 at 10:02 AM Matthias J. Sax <matth...@confluent.io> wrote:

> Hi,
>
> I am not sure about your overall setup. Are your stores local (similar
> to RocksDB) or are you using one global remote store? If you use a
> global remote store, you would not need to back your changes in a
> changelog topic, as the store would not be lost in case of failure.
>
> Also (in case that your stores are remote), did you consider using
> Kafka Connect to export your data into an external store like MySQL or
> MongoDB instead of writing your own custom stores for Streams?
>
> If your stores are local, why do you write custom stores? I am curious
> to understand why RocksDB does not serve your needs.
>
>
> About your two comments:
>
> (1) Streams uses RocksDB by default and the default implementation
> uses "checkpoint files" in the next release. Those checkpoint files
> track the changelog offsets of the data that got flushed to disk. This
> allows reducing the startup time, as only the tail of the changelog
> needs to be read to bring the store up to date. For this, you would
> always (1) write to the changelog, (2) write to your store. Each time
> you need to flush, you know that all data is in the changelog already.
> After each flush, you can update the "local offset checkpoint file".
>
> I guess, if you use local stores, you can apply a similar pattern in
> your custom store implementation. (And as mentioned above, for a
> global remote store you would not need the changelog anyway. -- This
> also applies to your recovery question from below.)
>
> (2) You can configure standby tasks (via StreamsConfig
> "num.standby.replicas"). This will set up standby tasks that passively
> replicate your stores to another instance. In error case, state will
> be migrated to those "hot standbys", reducing recovery time
> significantly.
>
>
> About your questions:
>
> (1) Yes.
> (2) Partly parallel (i.e., if you run with multiple threads -- cf.
> StreamsConfig "num.stream.threads"). Each thread flushes all its
> stores sequentially.
> (3) Yes. There will be a store for each partition. (If the store is
> local.)
> (4) Yes. The overall processing loop is sequential (cf.
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Architecture).
> Also, the next commit point is computed after a successful commit --
> thus, if one commit is delayed, all consecutive commit points are
> "shifted" by this delay.
>
>
> -Matthias
>
>
> On 5/12/17 9:00 AM, João Peixoto wrote:
> > On a stream definition I perform an "aggregate" which is configured
> > with a state store.
> >
> > *Goal*: Persist the aggregation results into a database, e.g. MySQL
> > or MongoDB
> >
> > *Working approach*:
> > I have created a custom StateStore backed by a changelog topic, like
> > the built-in state stores. Whenever the store gets flushed I save to
> > the database, mark the record as being persisted and log the change
> > in the changelog.
> >
> > If something goes wrong during processing, the changelog guarantees
> > that I do not lose data, restores the state, and if some data point
> > was not persisted, the next stream instance will persist it on its
> > flush operation.
> >
> > 1. I cannot store too much data in the changelog: even with
> > compaction, too much data means bootstrapping a stream instance
> > would take a long time
> > 2. On the other hand, if I take too long to recover from a failure,
> > I may lose data. So there seems to be a delicate tradeoff here
> >
> > *Questions*:
> >
> > 1. Is this a reasonable use case?
> > 2. In a scenario where my stream would have a fanout (multiple
> > sub-streams based on the same stream), each branch would perform a
> > different "aggregate" operation, each with its own state store. Are
> > state stores flushed in parallel or sequentially?
> > 3. The above also applies per partition. As a stream definition is
> > parallelized by partition, will one instance hold different store
> > instances for each partition?
> > 4. Through synthetic sleeps I simulated slow flushes, slower than
> > the commit interval. The stream seems to be OK with it and didn't
> > throw. I assume the Kafka consumer does not poll more records until
> > all of the previous poll's records are committed, but I couldn't
> > find documentation to back this statement. Is there a timeout for
> > "commit" operations?
> >
> >
> > Sample code
> >
> > public class AggregateHolder {
> >
> >     private Long commonKey;
> >     private List<Double> rawValues = new ArrayList<>();
> >     private boolean persisted;
> >
> >     // ...
> > }
> >
> > And stream definition
> >
> > source.groupByKey(Serdes.String(), recordSerdes)
> >       .aggregate(
> >               AggregateHolder::new,
> >               (aggKey, value, aggregate) -> aggregate.addValue(value.getValue()),
> >               new DemoStoreSupplier<>(/* ... */)
> >       )
> >       .foreach((key, agg) -> log.debug("Aggregate: {}={} ({})",
> >               key, agg.getAverage(), agg));