Hi, I am not sure about your overall setup. Are your stores local (similar to RocksDB) or are you using one global remote store? If you use a global remote store, you would not need to back your changes in a changelog topic, as the store would not be lost if case of failure.
Also (in case that your stores are remote), did you consider using Kafka Connect to export your data into an external store like MySQL or MongoDB instead of writing your own custom stores for Streams? If your stores are local, why do you write custom stores? I am curious to understand why RocksDB does not serve your needs. About your two comment: (1) Streams uses RocksDB by default and the default implementation is using "checkpoint files" in next release. Those checkpoint files track the changelog offsets of the data that got flushed to disc. This allows to reduce the startup time, as only the tail of the changelog needs to be read to bring the store up to date. For this, you would always (1) write to the changelog, (2) write to you store. Each time you need to flush, you know that all data is in the changelog already. After each flush, you can update the "local offset checkpoint file". I guess, if you use local stores you can apply a similar pattern in you custom store implementation. (And as mentioned above, for global remote store you would not need the changelog anyway. -- This also applies to your recovery question from below.) (2) You can configure standby task (via StreamConfig "num.standby.replicas"). This will set up standby tasks that passively replicate your stores to another instance. In error case, state will be migrated to those "hot standbys" reducing recovery time significantly. About your question: (1) Yes. (2) Partly parallel (ie, if you run with multiple threads -- cf. StreamsConfig "num.streams.thread"). Each thread, flushes all it's stores sequentially. (3) Yes. There will be a store for each partition. (If store is local.) (4) Yes. The overall processing loop is sequential (cf. https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Architecture) Also, the next commit point is computed after a successful commit -- thus, if one commit is delayed, all consecutive commit points are "shifted" by this delay. -Matthias On 5/12/17 9:00 AM, João Peixoto wrote: > On a stream definition I perform an "aggregate" which is configured with a > state store. > > *Goal*: Persist the aggregation results into a database, e.g. MySQL or > MongoDB > > *Working approach*: > I have created a custom StateStore backed by a changelog topic like the > builtin state stores. Whenever the store gets flushed I save to the > database, mark the record as being persisted and log the change in the > changelog. > > If something goes wrong during processing, the changelog guarantees that I > do not lose data, restores the state and if some data point was not > persisted, the next stream instance will persist it on its flush operation. > > 1. I cannot store too much data in the changelog, even with compaction, if > I have too much data, bootstrapping a stream instance would take a long time > 2. On the other hand, if I take too long to recover from a failure, I may > lose data. So there seems to be a delicate tradeoff here > > *Questions*: > > 1. Is this a reasonable use case? > 2. In a scenario where my stream would have a fanout (multiple sub-streams > based on the same stream), each branch would perform different "aggregate" > operations, each with its own state store. Are state stores flushed in > parallel or sequentially? > 3. The above also applies per-partition. As a stream definition is > parallelized by partition, will one instance hold different store instances > for each one? > 4. Through synthetic sleeps I simulated slow flushes, slower than the > commit interval. The stream seems to be ok with it and didn't throw, I > assume the Kafka consumer does not poll more records until all of the > previous poll's are committed, but I couldn't find documentation to back > this statement. Is there a timeout for "commit" operations? > > > Sample code > > public class AggregateHolder { > > private Long commonKey; > private List<Double> rawValues = new ArrayList<>(); > private boolean persisted; > > // ... > } > > And stream definition > > source.groupByKey(Serdes.String(), recordSerdes) > .aggregate( > AggregateHolder::new, > (aggKey, value, aggregate) -> > aggregate.addValue(value.getValue()), > new DemoStoreSupplier<>(/* ... */) > ) > .foreach((key, agg) -> log.debug("Aggregate: {}={} ({})", > key, agg.getAverage(), agg)); >
signature.asc
Description: OpenPGP digital signature