Hi Jason,

Your design sounds right to me (I presume you are using the PAPI, not the Streams DSL): you read in one stream and call transform() on it, with a state store associated with that operator. Since your update logic seems to be key-dependent only, it is embarrassingly parallelizable by the Streams library based on key partitioning.
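For reference, a minimal sketch of that transform-with-state-store pattern, applied to the per-key CAS tally from Jason's message below. A HashMap stands in for the Streams KeyValueStore, and names like `Tally` and `transform` are illustrative, not the actual code from the thread:

```java
import java.util.HashMap;
import java.util.Map;

public class TallyTransformSketch {
    // Illustrative value type matching the JSON tally in the quoted message.
    static class Tally {
        final int numberActive;
        final int numberInactive;
        final String lastLogin;
        Tally(int active, int inactive, String lastLogin) {
            this.numberActive = active;
            this.numberInactive = inactive;
            this.lastLogin = lastLogin;
        }
    }

    // Stand-in for the state store a real Transformer would obtain via
    // context.getStateStore(storeName).
    private final Map<String, Tally> store = new HashMap<>();

    // The CAS-style update: read the current tally for the key, apply the
    // incoming event, and write the result back.
    Tally transform(String key, boolean nowActive, String loginTime) {
        Tally current = store.getOrDefault(key, new Tally(0, 0, "01-01-1970T00:00:00Z"));
        Tally updated = nowActive
            ? new Tally(current.numberActive + 1, current.numberInactive, loginTime)
            : new Tally(current.numberActive, current.numberInactive + 1, current.lastLogin);
        store.put(key, updated);   // store.put(...) in the real Transformer
        return updated;            // forwarded downstream to the output topic
    }

    public static void main(String[] args) {
        TallyTransformSketch t = new TallyTransformSketch();
        t.transform("program-1", true, "2018-03-26T06:34:00Z");
        Tally out = t.transform("program-1", false, null);
        System.out.println(out.numberActive + " " + out.numberInactive + " " + out.lastLogin);
        // prints "1 1 2018-03-26T06:34:00Z"
    }
}
```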
One thing to note: it seems you are calling context.commit() after every single put() call, which is not a recommended pattern. commit() is a fairly heavy operation that involves flushing the state store, sending a commit-offset request to the Kafka broker, and so on, so calling it on every record would result in very low throughput. It is recommended to call commit() only after a batch of records has been processed; alternatively, you can rely on the Streams config "commit.interval.ms" and let the Streams library call commit() internally at a fixed time interval. Note that this will not affect your processing semantics upon graceful shutdown, since on shutdown Streams always enforces a commit() call.

Guozhang

On Mon, Mar 26, 2018 at 6:34 AM, Jason Turim <ja...@signalvine.com> wrote:

> We've started experimenting with Kafka to see if it can be used to
> aggregate our application data. I think our use case is a match for Kafka
> Streams, but we aren't sure if we are using the tool correctly. The proof
> of concept we've built seems to be working as designed, but I'm not sure
> that we are using the APIs appropriately.
>
> Our proof of concept uses Kafka Streams to keep a running tally of
> information about a program and writes that data to an output topic, e.g.
>
> {
>   "numberActive": 0,
>   "numberInactive": 0,
>   "lastLogin": "01-01-1970T00:00:00Z"
> }
>
> Computing the tally is easy; it is essentially executing a compare-and-swap
> (CAS) operation based on the input topic & output field.
>
> The local state store is used to store the most recent program for a
> given key. We join an input stream against the state store and run the CAS
> operation using a TransformSupplier, which explicitly writes the data to
> the state store using
>
> context.put(...)
> context.commit();
>
> Is this an appropriate use of the local state store? Is there another
> approach to keeping a stateful running tally in a topic?
>
> thanks,
> Jason

--
-- Guozhang
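The two commit strategies Guozhang describes (commit per batch, or let the library commit on a timer) can be sketched as follows. The batch size and the commit() stub are illustrative; in a real Transformer the commit call would be context.commit(), and "commit.interval.ms" is the actual Streams config name:

```java
import java.util.Properties;

public class CommitBatchingSketch {
    static final int COMMIT_BATCH_SIZE = 1000;  // illustrative batch size
    private int processedSinceCommit = 0;

    // Stub; stands in for ProcessorContext#commit() in a real processor.
    void commit() { }

    // Option 1: instead of committing on every put(), count records and
    // request a commit only once per batch.
    void onRecordProcessed() {
        processedSinceCommit++;
        if (processedSinceCommit >= COMMIT_BATCH_SIZE) {
            commit();
            processedSinceCommit = 0;
        }
    }

    public static void main(String[] args) {
        // Option 2: skip manual commits entirely and let Streams commit
        // periodically via the commit.interval.ms config.
        Properties props = new Properties();
        props.put("commit.interval.ms", "30000"); // e.g. commit every 30 s
        System.out.println(props.getProperty("commit.interval.ms"));
        // prints "30000"
    }
}
```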