Hi,

In general terms: if a keyed operator writes its state to a sink, and that
operator's state after a restart can be derived from the sink's contents,
can we restore the state from the sink instead of relying on the
checkpointing mechanism? Would that reduce latency (even if only
theoretically)?

An example: a word-counting app with a Kafka source and a Kafka sink backed
by a compacted topic. The input is an unbounded stream of single words and
the output is a stream of <word, word_count> tuples written to that
compacted topic.
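
For concreteness, this is a minimal sketch of the kind of job I mean (topic
names, bootstrap servers and the CountWords function are placeholders I made
up for illustration):

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.base.DeliveryGuarantee;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class WordCountJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            KafkaSource<String> words = KafkaSource.<String>builder()
                    .setBootstrapServers("kafka:9092")
                    .setTopics("words")
                    .setGroupId("word-count")
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();

            KafkaSink<String> counts = KafkaSink.<String>builder()
                    .setBootstrapServers("kafka:9092")
                    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                            // compacted topic; for compaction the word would also be
                            // set as the record key, omitted here to keep it short
                            .setTopic("word-counts")
                            .setValueSerializationSchema(new SimpleStringSchema())
                            .build())
                    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                    .setTransactionalIdPrefix("word-count")
                    .build();

            env.fromSource(words, WatermarkStrategy.noWatermarks(), "words")
                    .keyBy(word -> word)
                    .process(new CountWords())         // keyed counting operator, sketched below
                    .map(t -> t.f0 + "," + t.f1)       // "<word,count>" as the record value
                    .sinkTo(counts);

            env.execute("word-count");
        }
    }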

AFAIK Flink can guarantee exactly-once semantics while updating the keyed
state of its counting operator on _every_ event - after a restart it can
then resume from the last checkpoint. But in this case, given that the sink
contains exactly the same relevant information as a checkpoint (namely the
<key, key_count> tuples), could we load the operator's state from our sink
and avoid the latency added by our state backend? If so, how could this be
achieved?
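
The counting operator I have in mind is just the usual ValueState pattern,
roughly as below; the question is whether the count could be seeded from the
compacted topic after a restart instead of from a checkpoint (the
lookupFromSink call in the comment is hypothetical, only marking where that
would happen):

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class CountWords extends KeyedProcessFunction<String, String, Tuple2<String, Long>> {

        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Types.LONG));
        }

        @Override
        public void processElement(String word, Context ctx, Collector<Tuple2<String, Long>> out)
                throws Exception {
            Long current = count.value();
            if (current == null) {
                // After a restart without a checkpoint, this is where the previous
                // count would have to come from the compacted topic instead.
                // Hypothetical: current = lookupFromSink(word);
                current = 0L;
            }
            current += 1;
            count.update(current);        // state backend update on every event
            out.collect(Tuple2.of(word, current));
        }
    }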

If we replaced the Kafka sink with a database sink, could we, on startup,
determine which keys a Flink task has been allocated, perform a _single_
query (or a batched one, as long as we are not querying the database once
per key) to load the key_counts, and load those into the operator? How
could this be achieved?
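
The shape I'm imagining is something like the sketch below: one batched
query in open() into a plain (non-checkpointed) map, then seeding the keyed
state lazily from it. The loadCountsForThisSubtask method is a placeholder,
not a real Flink or JDBC API; restricting that query to the keys this
subtask will actually own is exactly the part I don't know how to do:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class CountWordsSeededFromDb extends KeyedProcessFunction<String, String, Tuple2<String, Long>> {

        private transient ValueState<Long> count;
        // Local cache filled once at startup with a single batched query; not checkpointed.
        private transient Map<String, Long> seed;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Types.LONG));
            // Ideally restricted to the keys assigned to this subtask - the open question.
            seed = loadCountsForThisSubtask();
        }

        @Override
        public void processElement(String word, Context ctx, Collector<Tuple2<String, Long>> out)
                throws Exception {
            Long current = count.value();
            if (current == null) {
                // First time we see this key after a restart: fall back to the seeded value.
                current = seed.getOrDefault(word, 0L);
            }
            current += 1;
            count.update(current);
            out.collect(Tuple2.of(word, current));
        }

        // Placeholder for the batched query against the sink database.
        private Map<String, Long> loadCountsForThisSubtask() {
            return new HashMap<>();
        }
    }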

Thanks,
Eduardo
