Hi,

In general terms: if a keyed operator writes its state to a sink, and the state of that operator after a restart can be derived from the sink's contents, can we restore the state from the sink instead of relying on the checkpointing mechanics? Would that reduce latency (even if only theoretically)?
An example: a word counting app with a Kafka source and a Kafka sink (compacted topic). The input is an unbounded stream of single words and the output is a stream of <word, word_count> tuples that goes into the compacted topic. AFAIK Flink can guarantee exactly-once semantics by updating the keyed state of its counting operator on _every_ event; after a restart it can then resume from the last checkpoint. But in this case the sink contains exactly the same relevant information as a checkpoint (namely the <word, word_count> tuples), so could we load the state of the operator from our sink and avoid the latency added by our state backend? If so, how can this be achieved?

Likewise, if we replaced the Kafka sink with a database sink, could each Flink task, on startup, work out which keys it has been allocated and load their counts from the database with a _single_ query (a batched query would also be fine, as long as we're not querying the database once per key)? How can this be achieved? A rough sketch of what I have in mind is at the end of this mail.

Thanks,
Eduardo
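P.S. Here is that sketch, assuming Flink's Java DataStream API. fetchAllCounts() is a hypothetical stand-in for the single batched SELECT (or a scan of the compacted topic), and KeyGroupRangeAssignment is Flink's internal key-to-subtask mapping, so take this as an illustration of the idea rather than a supported recipe:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class SinkSeededCounter extends KeyedProcessFunction<String, String, Tuple2<String, Long>> {

    private transient ValueState<Long> count;
    // Counts recovered from the sink at startup. They are held outside keyed
    // state because keyed state cannot be written in open() - there is no
    // "current key" at that point.
    private transient Map<String, Long> recovered;

    @Override
    public void open(Configuration parameters) throws Exception {
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));

        int subtask        = getRuntimeContext().getIndexOfThisSubtask();
        int parallelism    = getRuntimeContext().getNumberOfParallelSubtasks();
        int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();

        // One batched read of the whole <word, count> snapshot, keeping only
        // the keys this subtask owns. KeyGroupRangeAssignment is an internal
        // Flink class, so this mapping may change between versions.
        recovered = new HashMap<>();
        for (Map.Entry<String, Long> row : fetchAllCounts().entrySet()) {
            int owner = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                    row.getKey(), maxParallelism, parallelism);
            if (owner == subtask) {
                recovered.put(row.getKey(), row.getValue());
            }
        }
    }

    @Override
    public void processElement(String word, Context ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {
        Long current = count.value();
        if (current == null) {
            // First time this key is seen after a restart: seed from the snapshot.
            current = recovered.getOrDefault(word, 0L);
        }
        current += 1;
        count.update(current);
        out.collect(Tuple2.of(word, current));
    }

    // Hypothetical helper: stands in for a single batched
    // "SELECT word, word_count FROM counts" (page it for large tables).
    private Map<String, Long> fetchAllCounts() {
        return new HashMap<>();
    }
}

Since a subtask can't know its allocated keys up front, the sketch fetches the full snapshot once and filters it client-side by key group; the recovered counts then sit in an ordinary map and are folded into ValueState lazily, the first time each key arrives.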