I have a question about state tracking in Structured Streaming.

First, let me briefly explain my use case. Given a mutable data source
(e.g. an RDBMS) from which we can retrieve the set of newly created row
versions (rows that were created or updated between two given `Offset`s,
whatever those are), we can create a Structured Streaming `Source` which
retrieves those new row versions. Further assuming that every logical row
has some primary key, then as long as we can track the current offset for
each primary key, we can differentiate between new and updated rows.
Then, when a row is updated, we can record that the previous version of
that row expired at some particular time. That's essentially what I'm
trying to do. This would effectively give you an "event-sourcing" type of
historical/immutable log of changes out of a mutable data source.
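
To make the tracking concrete, here is a minimal sketch in plain Python
(not Spark code, and not what I actually implemented). The names
`RowVersion` and `apply_batch` are made up for illustration, offsets are
assumed to be plain integers, and the offset of the newer version is
assumed to stand in for the expiry time of the older one:

```python
from dataclasses import dataclass, replace
from typing import Dict, Iterable, List, Optional, Tuple


@dataclass
class RowVersion:
    key: str                          # primary key of the logical row
    offset: int                       # offset at which this version appeared
    payload: dict                     # column values of this version
    expired_at: Optional[int] = None  # offset at which a newer version replaced it


def apply_batch(
    state: Dict[str, RowVersion],
    batch: Iterable[Tuple[str, int, dict]],
) -> List[RowVersion]:
    """Fold one batch of (key, offset, payload) row versions into the state.

    `state` maps each primary key to its current version and is updated in
    place. The return value is what would be appended to the immutable log:
    every new version, preceded by an expiry record for the version it
    supersedes (if any).
    """
    log: List[RowVersion] = []
    # Process in offset order so that several updates to one key within a
    # single batch expire each other in sequence.
    for key, offset, payload in sorted(batch, key=lambda r: r[1]):
        previous = state.get(key)
        if previous is not None:
            # Key already tracked: this is an update, so close the old version.
            log.append(replace(previous, expired_at=offset))
        current = RowVersion(key, offset, payload)
        state[key] = current
        log.append(current)
    return log


# Hypothetical usage: two inserts, then an update to key "a".
state: Dict[str, RowVersion] = {}
first = apply_batch(state, [("a", 1, {"name": "x"}), ("b", 2, {"name": "y"})])
second = apply_batch(state, [("a", 3, {"name": "z"})])
```

The `state` dictionary is the piece I would like Structured Streaming to
manage for me: one small entry per primary key, carried from one batch to
the next.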

I noticed that Spark 2.0.1 has a concept of a `StateStore`, which seemed
like it would allow me to do exactly the tracking that I needed, so I
decided to try to use that built-in functionality rather than some
external key/value store for storing the current "version number" of each
primary key. There were a lot of hard-coded hoops I had to jump through,
but I eventually made it work by implementing some custom `LogicalPlan`s
and `SparkPlan`s around `StateStoreSaveExec` and `StateStoreRestoreExec`.

Now, in Spark 2.1.0, the `StateStore` functionality seems to have moved
even further away from what I was using it for: the `keyExpressions` of
`StateStoreSaveExec` must include a timestamp column, which means that
those expressions are not really keys (at least not for a logical row).
So it appears I can't use it that way anymore (I can't blame Spark for
this, as I knew what I was getting into when leveraging developer APIs).
There are also several hard-coded checks which now make it clear that
`StateStore` functionality is only to be used for streaming aggregates,
which is not really what I'm doing.

My question is - is there a good way to accomplish the above use case
within Structured Streaming? Or is this the wrong use case for the state
tracking functionality (which increasingly seems to be targeted toward
aggregates only)? Is there a plan for any kind of generalized
`mapWithState`-type functionality for Structured Streaming, or should I
just give up on that and use an external key/value store for my state
tracking?

Thanks,
Jeremy
