Hi all! Thank you for the writeup of this feature. I like the general direction a lot.
There are still some open questions and confusing details, which I think we need to clarify to make this feature really good. Below are questions/suggestions on the FLIP.

Best,
Stephan

===============

*(1) Log Implementation*

I agree with Eron that we should not design this hardwired to Kafka. Let's have the internal interfaces in place to make this open to other streaming storage systems as well. The config options seem to be designed in a way that is Kafka-exclusive. Can we change this, for example to something like:
- storage.log.system=kafka
- storage.log.kafka.properties.bootstrap.servers
- storage.log.kafka.retention

*(2) Change Tracking*

I am not sure I understand this fully. When a batch query inserts without change tracking, what happens then?
- Does it skip writing to the change log?
- Does it simply overwrite the managed table with the new result?
- Something different?

*(3) "table-storage.log.scan.startup.mode"*

Somehow the presence of this flag seems to break the abstraction of managed tables. Let's say someone creates a managed table that is computed via a query over another managed table. It would need all the data from the previous table, or it would be inconsistent.

What is the reason to have this setting? To support cases where one doesn't need all past data (say, only the data from the previous month)? Exposing this again somewhat destroys the nice "transparent out of the box" behavior, because now users need to think again about the incremental building of the tables. I think that case shows that we are missing better handling of data retention (see the next point).

Also, this seems to be a per-query setting rather than a global setting, so should it be part of the config with which the query that reads from the table storage is submitted? (A small example of what I mean is at the end of this mail.)

The names could also be improved a bit, I think; for example, we could call it just "table-storage.log.scan" with the values "full", "latest", "from-timestamp".

*(4) Data Retention*

I am wondering how and when data is ever cleaned up. For example, when the table definition has a time attribute and predicate so that the managed table should only contain the data from the previous month: how does old data get cleaned up? Only through deletes coming from timers in the Flink SQL layer?

I think if we want this to be really good and efficient, we need to look at dropping data during compaction. The compaction should know it needs to retain only data from "watermark - 1 month" or so. That is somewhat similar to the optimization I proposed for SQL in general: getting rid of timers and only using TTL (and compaction filters) for data expiration. I think for managed tables this is even more crucial for performance. But it would mean that we need a better model for inferring the required data retention from predicates over the time columns, rather than simply having a fixed retention based on the watermark.
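To make the idea concrete, here is a very rough sketch of the kind of compaction-time filter I have in mind. All class and method names are made up for illustration; none of this exists in the FLIP or in Flink today. The point is only that the compactor, not SQL-layer timers, decides what to drop, based on the current watermark and the inferred retention:

    import java.time.Duration;

    // Purely illustrative sketch, nothing like this exists yet.
    public class WatermarkRetentionFilter {

        private final Duration retention; // e.g. inferred "1 month" from the time predicate

        public WatermarkRetentionFilter(Duration retention) {
            this.retention = retention;
        }

        // Called for every row being rewritten during compaction. Rows whose event time
        // falls behind (currentWatermark - retention) are simply not copied into the
        // compacted files, so no timers or explicit DELETEs are needed.
        public boolean keep(long rowEventTimeMillis, long currentWatermarkMillis) {
            return rowEventTimeMillis >= currentWatermarkMillis - retention.toMillis();
        }
    }

Ideally the retention would be inferred from the predicates over the time columns in the table definition, as described above, rather than configured by hand.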
*(5) Different formats for cases with PK and without PK*

The FLIP proposes Debezium-Avro for cases without a PK and just Avro for cases with a PK. Do we expect that some users will directly subscribe to the table changelog, meaning they read directly from the topic via a Kafka consumer?
- I would expect that this will happen, because users want to avoid writing the log twice (once for Flink managed table queries, once for external subscribers).
- If this is publicly exposed, then the fact that it uses different formats in different cases (PK or no PK) seems really confusing and not intuitive for users.
- Can the format be just Debezium-JSON in all cases?

*(6) Different consistency guarantees with PK and without PK*

Is this purely an internal implementation detail, or will users see a difference? My understanding is that users see a difference. Having that difference implicitly happen when users add a PK reference seems very confusing to me. What about cases where the table has a PK (because users want the data in Kafka that way) but transactional consistency is wanted?

If we need the "low-latency eventual consistency" mode with PKs, I would suggest making this a separate mode that users can choose to activate if they want. We can restrict it to cases that have a PK, but not automatically change the behavior when a PK is declared.

*(7) Eventual Consistency Mode vs. Faster Checkpoints*

The eventual consistency mode with PK seems to be there mainly to get lower latency for the changelog. What latencies are we looking for here? There is also the work on generalized incremental checkpoints, which should get the latency down to a few seconds; would that be good enough?

The current Upsert Kafka source (which would be used with the PK eventual consistency mode) has a big inefficiency in that it needs to retain all state to convert the records to changelog records. That is also a high price to pay for that mode.

*(8) Concurrent Write / Locking*

I don't fully understand what the plan is.
- What is the locking philosophy in general? Which operations are mutually exclusive (probably multiple insert statements), and what can happen at the same time (only query and compaction)?
- What exactly is the optimistic locking behavior here? When can something that failed succeed on the next retry? Can a compaction fail while there is an insert statement, and how can it then succeed when retried? Or can the compaction only interfere with the insert statement in special situations?
- What is the lock scoped to? A single query (via JobID)?
- What happens if a job that holds a lock (for example, as an entry in the Hive Metastore database) crashes and is manually restarted as a new job (from the latest checkpoint)? Does that need manual admin intervention to release the lock (manually updating the HMS database)?

*(9) Miscellaneous*

Language/typos:
- "A single bucket can only be written by a single parallelism" => "A single bucket can only be written by a single parallel task"

Maybe call the "GenericCatalog" "FlinkInternalCatalog" instead, or just "FlinkCatalog"?
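PS: Regarding (3), a small example of what I mean by a per-query setting. This is only a sketch: "orders_managed" is a made-up table, the option keys just follow the renaming suggested above (I dropped the "table-storage." prefix, assuming that inside a table hint it would be a plain table option), and "log.scan.timestamp-millis" is a hypothetical key. Only the dynamic-table-options hint mechanism itself already exists in Flink.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class PerQueryScanModeSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());

            // Dynamic table options (SQL hints) need to be enabled explicitly.
            tEnv.getConfig().getConfiguration()
                    .setString("table.dynamic-table-options.enabled", "true");

            // The hint overrides the scan behavior only for this one query,
            // instead of being a global table-storage setting.
            tEnv.executeSql(
                    "SELECT * FROM orders_managed "
                            + "/*+ OPTIONS('log.scan' = 'from-timestamp', "
                            + "'log.scan.timestamp-millis' = '1640995200000') */"
            ).print();
        }
    }

That way the full/latest/from-timestamp choice stays with the query that reads from the table storage, and the managed table itself keeps its "transparent out of the box" behavior.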