Hi Péter,

Thanks! I summarized our discussion so far and also added some thoughts in https://docs.google.com/document/d/1srfQWyomX5HanwJkAr5qEAe7lLPzE9b-qv46L9vIXA4/edit?tab=t.0#heading=h.313stzg3g3hr. Could you help review it when you have a chance? Comments and suggestions are very welcome.
Best,
Lu

On Wed, Jan 28, 2026 at 3:05 AM Péter Váry <[email protected]> wrote:

> > The current Flink committer does not call validateDataFilesExist(), which means that if the compaction commit succeeds first, the Flink commit can still succeed with dangling position deletes.
>
> The current Flink committer does not generate position delete files for existing data files, so no check is required today.
>
> If we follow a *“commit → compact equality deletes to position deletes”* flow, the check is still not needed during commit, but it becomes necessary during compaction.
>
> If we instead choose a *“compact equality deletes to position deletes → commit”* flow, then the check must be performed as part of the commit.
>
> With exactly-once sources and sinks, this can largely be mitigated through Flink job restarts. If that is not sufficient, we could also introduce retry logic into the equality-to-position delete compaction.
>
> That said, I believe the larger issue is that external compaction will not be able to commit changes unless the Flink job is stopped, or the scope of concurrent updates is constrained such that they do not interfere with the compaction process.
>
> Lu Niu <[email protected]> wrote (on Wed, Jan 28, 2026, 1:22):
>
>> Hi Péter,
>>
>> > This scenario could be addressed with retries, since compactions are expected to be relatively infrequent.
>>
>> Sorry to reiterate this point. Essentially the question is:
>>
>> Assume the Flink job will maintain PK → (filename, position); then we can have 2 concurrent writers:
>>
>> 1. a Flink job writes a position delete pointing to an old data file F1
>>
>> 2. a Spark compaction job which deletes F1
>>
>> The current Flink committer does not call validateDataFilesExist(), which means that if the compaction commit succeeds first, the Flink commit can still succeed with dangling position deletes.
>>
>> Is my understanding correct? Or is the problem supposed to be solved by the TableMaintenance PR you mentioned?
>>
>> Best,
>> Lu
>>
>> On Tue, Jan 27, 2026 at 2:42 AM Péter Váry <[email protected]> wrote:
>>
>>> > It seems we can still allow external compaction to some extent?
>>>
>>> The challenge is that streaming writes are significantly more frequent than regular commits. If a streaming write produces a delete file, it is very likely to conflict with any ongoing compaction.
>>>
>>> Consider a Flink job with a 2-minute checkpoint interval, where every commit contains some deletes. A compaction typically takes longer than 2 minutes, so during its execution at least one Flink commit will update the table. If that commit includes a delete for a file currently being compacted, the compaction will be reverted. In the worst case, this could lead to a situation where compaction never successfully completes.
>>>
>>> > My biggest concern is that external compaction can succeed and the map in the Flink job silently becomes stale, leading to data correctness issues. What's your take on this? Given what you mentioned, it seems this is not a concern because one of the two must fail?
>>>
>>> This scenario could be addressed with retries, since compactions are expected to be relatively infrequent.
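As a concrete reference for the validateDataFilesExist() discussion above, here is a minimal Java sketch of a position-delete commit that performs the check with Iceberg's core RowDelta API. The method and its parameters (table, startingSnapshotId, positionDeleteFile, referencedDataFiles) are illustrative placeholders, not the current Flink committer's code path.

```java
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Table;

class PositionDeleteCommitSketch {
  // Commits a position-delete file while checking that the data files it points
  // to still exist, so a commit racing with a compaction cannot leave dangling deletes.
  static void commitPositionDeletes(Table table,
                                    long startingSnapshotId,
                                    DeleteFile positionDeleteFile,
                                    Iterable<String> referencedDataFiles) {
    table.newRowDelta()
        .addDeletes(positionDeleteFile)
        // Check conflicts against everything committed after the snapshot the
        // PK -> (filename, position) mapping was built from.
        .validateFromSnapshot(startingSnapshotId)
        // Fail the commit if any referenced data file was removed or rewritten,
        // e.g. by an external compaction that finished first.
        .validateDataFilesExist(referencedDataFiles)
        .validateDeletedFiles()
        // Throws ValidationException on conflict; the caller would refresh its
        // mapping and retry, or fail the checkpoint.
        .commit();
  }
}
```

With this validation in place, the race described above (compaction removes F1, then the Flink commit lands) fails the Flink commit instead of silently committing a dangling position delete.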
>>> Lu Niu <[email protected]> wrote (on Tue, Jan 27, 2026, 2:00):
>>>
>>>> Hi Péter,
>>>>
>>>> > A concurrently running external compaction could cause an equality-delete compaction to fail and retry, or an equality-delete compaction could cause an external compaction to fail if it attempts to compact files affected by newly introduced deletes.
>>>>
>>>> It seems we can still allow external compaction to some extent? Following this approach, the Flink job will maintain PK → (filename, position). My biggest concern is that external compaction can succeed and the map in the Flink job silently becomes stale, leading to data correctness issues. What's your take on this? Given what you mentioned, it seems this is not a concern because one of the two must fail?
>>>>
>>>> Best,
>>>> Lu
>>>>
>>>> On Sat, Jan 24, 2026 at 7:50 AM Péter Váry <[email protected]> wrote:
>>>>
>>>>> Hi Lu,
>>>>>
>>>>> > Regarding the TableMaintenance you mentioned—does this effectively run compaction as part of the Flink job? If so, does that mean the execution flow becomes serialized like: checkpoint (commit new snapshot) -> compaction -> checkpoint (commit new snapshot) -> compaction -> ...?
>>>>>
>>>>> Yes. This can be partially mitigated by avoiding running equality delete compaction on every commit. Note that this is not a full compaction, but only an equality-delete rewrite.
>>>>> Longer term, we could implement the approach proposed by Steven, where equality deletes are not immediately committed to the table but are instead kept in Flink state, and only committed once the equality-delete rewrite has completed.
>>>>>
>>>>> > Under this model, is compaction within the Flink job sufficient on its own? Today, external compaction helps rewrite historical files to be more reader-optimized while the Flink job continues to commit the newest data. I’m wondering whether we can achieve the same effect with compaction running only inside the Flink job.
>>>>>
>>>>> Currently, Flink only supports BinPack compaction. The preferred solution would be to implement shuffle compaction directly in Flink.
>>>>>
>>>>> As an alternative, we could tolerate occasional compaction failures. A concurrently running external compaction could cause an equality-delete compaction to fail and retry, or an equality-delete compaction could cause an external compaction to fail if it attempts to compact files affected by newly introduced deletes. If equality-delete compactions run frequently, this may lead to starvation on the external compaction side. For this reason, integrating compaction into the Flink jobs themselves appears to be the more robust overall solution.
>>>>>
>>>>> > Also, what's the main challenge to simplify the index design? I’m not an expert here, but similar open table formats like Paimon or Hudi both support primary key indexes. Is it possible to borrow some ideas or patterns from those systems?
>>>>>
>>>>> The challenge is not with the index design itself, but with the underlying table layout. To the best of my knowledge, both Paimon and Hudi impose layout constraints by bucketing rows based on the primary key. As a result, if a reader-optimized layout is required and queries need to efficiently access data by columns other than the primary key, Iceberg tables are generally a better fit, as they do not impose such layout restrictions.
>>>>> I hope this helps,
>>>>> Thanks,
>>>>> Peter
>>>>>
>>>>> Lu Niu <[email protected]> wrote (on Fri, Jan 23, 2026, 18:09):
>>>>>
>>>>>> Hi Péter,
>>>>>>
>>>>>> Thanks for the reply!
>>>>>>
>>>>>> 1.
>>>>>> > TableMaintenance
>>>>>> Regarding the TableMaintenance you mentioned—does this effectively run compaction as part of the Flink job? If so, does that mean the execution flow becomes serialized like:
>>>>>> checkpoint (commit new snapshot) -> compaction -> checkpoint (commit new snapshot) -> compaction -> ...?
>>>>>>
>>>>>> 2.
>>>>>> > Our conclusion was that, as a general rule, users should avoid running compaction outside of the Flink job
>>>>>> Under this model, is compaction within the Flink job sufficient on its own? Today, external compaction helps rewrite historical files to be more reader-optimized while the Flink job continues to commit the newest data. I’m wondering whether we can achieve the same effect with compaction running only inside the Flink job.
>>>>>>
>>>>>> 3.
>>>>>> Also, what's the main challenge to simplify the index design? I’m not an expert here, but similar open table formats like Paimon or Hudi both support primary key indexes. Is it possible to borrow some ideas or patterns from those systems?
>>>>>>
>>>>>> Best,
>>>>>> Lu
>>>>>>
>>>>>> On Fri, Jan 23, 2026 at 3:37 AM Péter Váry <[email protected]> wrote:
>>>>>>
>>>>>>> Excellent questions, Lu.
>>>>>>>
>>>>>>> > 1. PK → (filename, position) map in memory => are you referring to Flink state? If so, are we concerned that its size will grow without bound as the primary key cardinality increases?
>>>>>>>
>>>>>>> We could store this mapping in Flink state, but restoring it may introduce more complexity than recomputing it by re-reading `SELECT PK, filename, position` on each restart, especially since the table may change while the job is stopped. My initial thought would be to persist the data in RocksDB during execution but recompute it on restart rather than restoring state.
>>>>>>>
>>>>>>> > 2. How do we handle the fact that this map can become stale when external compaction occurs?
>>>>>>>
>>>>>>> We discussed this with Steven when working on the proposal. Our conclusion was that, as a general rule, users should avoid running compaction outside of the Flink job. In practice, we could also introduce validators that prevent committing equality-delete to position-delete conversions when a concurrent compaction is detected.
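To make the PK → (filename, position) bookkeeping above more concrete, here is a small, self-contained Java sketch of the mapping and of how an incoming upsert could be turned into a position delete against the previously written location. All type and method names here are made up for illustration; none of this is an existing Iceberg or Flink API, and it leaves out the RocksDB persistence and the rebuild-on-restart scan that Péter describes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Illustrative only: the kind of PK -> (filename, position) bookkeeping the
// proposal describes, with no Flink, RocksDB, or Iceberg wiring.
class PkToPositionIndexSketch {

  record KeyLocation(String dataFile, long position) {}

  record PositionDeleteEntry(String dataFile, long position) {}

  private final Map<String, KeyLocation> index = new HashMap<>();

  // Called for every row appended to a data file: remember where the key lives.
  void recordInsert(String pk, String dataFile, long position) {
    index.put(pk, new KeyLocation(dataFile, position));
  }

  // Called when an upsert or delete arrives for a key: if the old location is
  // known, a position delete can be emitted instead of an equality delete.
  Optional<PositionDeleteEntry> toPositionDelete(String pk) {
    KeyLocation old = index.get(pk);
    return old == null
        ? Optional.empty()
        : Optional.of(new PositionDeleteEntry(old.dataFile(), old.position()));
  }

  // An external compaction rewrites files and invalidates their entries, which
  // is why the map must be rebuilt (or validated) when the table changes underneath.
  void invalidateFile(String rewrittenDataFile) {
    index.values().removeIf(loc -> loc.dataFile().equals(rewrittenDataFile));
  }
}
```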
>>>>>>> Lu Niu <[email protected]> wrote (on Thu, Jan 22, 2026, 19:27):
>>>>>>>
>>>>>>>> Hi Péter,
>>>>>>>>
>>>>>>>> Thanks for the reply! I have some high-level questions about the idea:
>>>>>>>>
>>>>>>>> 1. PK → (filename, position) map in memory => are you referring to Flink state? If so, are we concerned that its size will grow without bound as the primary key cardinality increases?
>>>>>>>> 2. How do we handle the fact that this map can become stale when external compaction occurs?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Lu
>>>>>>>>
>>>>>>>> On Thu, Jan 22, 2026 at 12:03 AM Péter Váry <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Lu,
>>>>>>>>>
>>>>>>>>> Steven and I have other priorities at the moment, so please feel free to pick up any loose threads here.
>>>>>>>>>
>>>>>>>>> We gained quite a lot by relaxing some of the requirements from the original proposal. If we accept that equality deletes remain but are compacted very soon, we could arrive at an even more limited change that still helps your use case and points in the right overall direction.
>>>>>>>>>
>>>>>>>>> My initial proposal would be:
>>>>>>>>>
>>>>>>>>> - First, ensure that Guo’s PR enabling Flink TableMaintenance without a LockManager gets merged: https://github.com/apache/iceberg/pull/15042
>>>>>>>>> - Introduce a TableMaintenance task that handles newly written equality deletes:
>>>>>>>>>   - Maintain a PK → (filename, position) map in memory, distributed across the TaskManagers.
>>>>>>>>>   - In the absence of indexes for now, this map can be reconstructed on every job start.
>>>>>>>>>   - Update the map whenever new table changes are committed.
>>>>>>>>>   - Convert PK-based equality deletes into position deletes.
>>>>>>>>> - Place this task at the end of the streaming ingest pipeline.
>>>>>>>>>
>>>>>>>>> This approach builds entirely on existing components and can later be enhanced with proper index support. If we decide to refactor the ingest stream, we could even avoid committing equality deletes to the table altogether and remove the need for them in the Flink jobs.
>>>>>>>>>
>>>>>>>>> I don’t have the bandwidth to push this forward myself, but I’d be very happy to help with proposal and code reviews.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Peter
>>>>>>>>>
>>>>>>>>> On Wed, Jan 21, 2026, 18:48 Lu Niu <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Thank you for all your replies!
>>>>>>>>>>
>>>>>>>>>> > Some people use the current Flink Iceberg sink for CDC ingestion. But it would produce equality deletes that would require aggressive compactions and add operational burden too
>>>>>>>>>>
>>>>>>>>>> Since the main concern is reader-side performance degradation due to the accumulation of equality deletes over time, is there a way to estimate the impact on the reader side based on the equality deletes in a snapshot summary?
>>>>>>>>>>
>>>>>>>>>> ```
>>>>>>>>>> while new_snapshot_summary_is_ready:
>>>>>>>>>>     should_compact = analyze_snapshot_summary(snapshot_summary)
>>>>>>>>>>     if should_compact:
>>>>>>>>>>         rewrite_data_files()
>>>>>>>>>> ```
>>>>>>>>>>
>>>>>>>>>> > The original design has high complexity. We were thinking about alternatives with narrower scope. But there isn't any progress or timeline.
>>>>>>>>>>
>>>>>>>>>> If this is the community-aligned long-term direction, is there any way I could contribute to speed this up? Thanks!
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Lu
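A rough Java sketch of the snapshot-summary check described in the pseudocode above, reading standard summary properties such as total-equality-deletes and total-data-files. The threshold and the ratio used for the decision are made-up placeholders; whether such a metric is a good proxy for reader-side impact is exactly the open question.

```java
import java.util.Map;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

// Illustrative only: decide whether to trigger a rewrite based on the equality
// deletes reported in the latest snapshot summary.
class SnapshotSummaryCheckSketch {

  // Hypothetical threshold: average number of live equality deletes per data file.
  private static final double MAX_EQ_DELETES_PER_DATA_FILE = 100.0;

  static boolean shouldCompact(Table table) {
    table.refresh();
    Snapshot current = table.currentSnapshot();
    if (current == null) {
      return false;
    }

    Map<String, String> summary = current.summary();
    long eqDeletes = parse(summary, "total-equality-deletes");
    long dataFiles = parse(summary, "total-data-files");
    if (dataFiles == 0) {
      return false;
    }

    return ((double) eqDeletes / dataFiles) > MAX_EQ_DELETES_PER_DATA_FILE;
  }

  private static long parse(Map<String, String> summary, String key) {
    return Long.parseLong(summary.getOrDefault(key, "0"));
  }
}
```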
>>>>>>>>>> On Wed, Jan 21, 2026 at 2:18 AM Maximilian Michels <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Lu,
>>>>>>>>>>>
>>>>>>>>>>> Just to reiterate the status quo: Flink supports upserts, but only via equality delete + append. So technically, "streaming writes" aren't an issue. It's the read path which causes the issue, because unlike positional deletes, which can be resolved on the fly during streaming reads, equality deletes potentially require a full table scan to be materialized. Constant snapshot compaction is required to keep the read path efficient.
>>>>>>>>>>>
>>>>>>>>>>> > 1. A Flink job that continuously appends CDC events into an append-only raw table
>>>>>>>>>>> > 2. A periodically scheduled Spark job that performs upserts into the `current` table using the `raw` table
>>>>>>>>>>>
>>>>>>>>>>> This makes sense. Conceptually, you are pre-compacting upserts before writing into the final "current" table. This avoids equality deletes entirely and keeps the read path on the "current" table efficient at all times. The drawback is that your lower-bound latency will be the interval at which the Spark job runs, but this is an acceptable price to pay until we have a way to write positional deletes right away, avoiding equality deletes entirely.
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Max
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jan 21, 2026 at 8:48 AM melin li <[email protected]> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > Flink CDC supports reading binlog data from databases such as MySQL and PostgreSQL, and writing it to Iceberg, Hudi, and Paimon.
>>>>>>>>>>> > https://github.com/apache/flink-cdc/pulls?q=iceberg
>>>>>>>>>>> >
>>>>>>>>>>> > Steven Wu <[email protected]> wrote (on Wed, Jan 21, 2026, 15:27):
>>>>>>>>>>> >>
>>>>>>>>>>> >> Lu,
>>>>>>>>>>> >>
>>>>>>>>>>> >> You are correct about the design doc for Flink writing position deletes only. The original design has high complexity. We were thinking about alternatives with narrower scope. But there isn't any progress or timeline.
>>>>>>>>>>> >>
>>>>>>>>>>> >> IMHO, your setup is a good practice today. Ryan wrote a series of blogs for the pattern: https://tabular.medium.com/hello-world-of-cdc-e6f06ddbfcc0.
>>>>>>>>>>> >>
>>>>>>>>>>> >> Some people use the current Flink Iceberg sink for CDC ingestion. But it would produce equality deletes that would require aggressive compactions and add operational burden too. Also not all engines can read equality deletes.
>>>>>>>>>>> >>
>>>>>>>>>>> >> Thanks,
>>>>>>>>>>> >> Steven
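For reference, the "current Flink Iceberg sink for CDC ingestion" mentioned above is enabled roughly as in the sketch below, following the builder from the Iceberg Flink docs; the stream, the table loader, and the "id" equality column are placeholders. Every upsert written this way becomes an equality delete plus an append, which is the source of the read-path and compaction pressure discussed in this thread.

```java
import java.util.Collections;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

class UpsertSinkSketch {
  // Wires a RowData stream into an Iceberg table in upsert mode. Each upsert is
  // written as an equality delete on "id" plus an append of the new row.
  static void addUpsertSink(DataStream<RowData> cdcRows, TableLoader tableLoader) {
    FlinkSink.forRowData(cdcRows)
        .tableLoader(tableLoader)
        .equalityFieldColumns(Collections.singletonList("id"))
        .upsert(true)
        .append();
  }
}
```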
>>>>>>>>>>> >>> >>>>>>>>>>> >>> [1] >>>>>>>>>>> https://docs.google.com/document/d/1Jz4Fjt-6jRmwqbgHX_u0ohuyTB9ytDzfslS7lYraIjk >>>>>>>>>>> >>> [2] https://www.mooncake.dev/whitepaper >>>>>>>>>>> >>> >>>>>>>>>>> >>> Best, >>>>>>>>>>> >>> Gang >>>>>>>>>>> >>> >>>>>>>>>>> >>> >>>>>>>>>>> >>> >>>>>>>>>>> >>> >>>>>>>>>>> >>> On Wed, Jan 21, 2026 at 11:05 AM Lu Niu <[email protected]> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>> >>>>>>>>>>> >>>> Hi Iceberg community, >>>>>>>>>>> >>>> >>>>>>>>>>> >>>> What are the current best practices for streaming upserts >>>>>>>>>>> into an Iceberg table? >>>>>>>>>>> >>>> >>>>>>>>>>> >>>> Today, we have the following setup in production to support >>>>>>>>>>> CDC: >>>>>>>>>>> >>>> >>>>>>>>>>> >>>> 1. A Flink job that continuously appends CDC events into an >>>>>>>>>>> append-only raw table >>>>>>>>>>> >>>> 2, A periodically scheduled Spark job that performs upsert >>>>>>>>>>> the `current` table using `raw` table >>>>>>>>>>> >>>> >>>>>>>>>>> >>>> We are exploring whether it’s feasible to stream upserts >>>>>>>>>>> directly into an Iceberg table from Flink. This could simplify our >>>>>>>>>>> architecture and potentially further reduce our data SLA. We’ve >>>>>>>>>>> experimented with this approach before, but ran into reader-side >>>>>>>>>>> performance issues due to the accumulation of equality deletes over >>>>>>>>>>> time. >>>>>>>>>>> >>>> >>>>>>>>>>> >>>> From what I can gather, streaming upserts still seems to be >>>>>>>>>>> an open design area: >>>>>>>>>>> >>>> >>>>>>>>>>> >>>> 1. (Please correct me if I’m wrong—this summary is partly >>>>>>>>>>> based on ChatGPT 5.1.) The book “Apache Iceberg: The Definitive >>>>>>>>>>> Guide” >>>>>>>>>>> suggests the two-table pattern we’re currently using in production. >>>>>>>>>>> >>>> 2. These threads: >>>>>>>>>>> https://lists.apache.org/thread/gjjr30txq318qp6pff3x5fx1jmdnr6fv >>>>>>>>>>> , >>>>>>>>>>> https://lists.apache.org/thread/xdkzllzt4p3tvcd3ft4t7jsvyvztr41j >>>>>>>>>>> discuss the idea of outputting only positional deletes (no equality >>>>>>>>>>> deletes) by introducing an index. However, this appears to still be >>>>>>>>>>> under >>>>>>>>>>> discussion and may be targeted for v4, with no concrete timeline >>>>>>>>>>> yet. >>>>>>>>>>> >>>> 3. this thread >>>>>>>>>>> https://lists.apache.org/thread/6fhpjszsfxd8p0vfzc3k5vw7zmcyv2mq >>>>>>>>>>> talks about deprecating equality deletes, but I haven’t seen a >>>>>>>>>>> clearly >>>>>>>>>>> defined alternative come out of that discussion yet. >>>>>>>>>>> >>>> >>>>>>>>>>> >>>> Given all of the above, I’d really appreciate guidance from >>>>>>>>>>> the community on: >>>>>>>>>>> >>>> >>>>>>>>>>> >>>> 1. Recommended patterns for streaming upserts with Flink >>>>>>>>>>> into Iceberg today (it's good to know the long term possible as >>>>>>>>>>> well, but >>>>>>>>>>> my focus is what's possible in near term). >>>>>>>>>>> >>>> 2. Practical experiences or lessons learned from teams >>>>>>>>>>> running streaming upserts in production >>>>>>>>>>> >>>> >>>>>>>>>>> >>>> Thanks in advance for any insights and corrections. >>>>>>>>>>> >>>> >>>>>>>>>>> >>>> Best >>>>>>>>>>> >>>> Lu >>>>>>>>>>> >>>>>>>>>>
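As an illustration of step 2 of the two-table setup described above, the periodic Spark job can be a MERGE INTO from the newest change per key in the raw table into the current table, assuming Iceberg's Spark SQL extensions are enabled. Table names, columns (id, name, ts), and the op change-type column are hypothetical; a real job would use the actual schema and a watermark for the lower bound.

```java
import org.apache.spark.sql.SparkSession;

class PeriodicUpsertJobSketch {
  // Illustrative only: apply the newest change per key from db.raw onto db.current.
  // `op` is the CDC operation type ('I'/'U'/'D') captured by the ingesting Flink job.
  static void mergeLatestChanges(SparkSession spark, String lowerBoundTs) {
    spark.sql(
        "MERGE INTO db.current t USING ( " +
        "  SELECT id, name, ts, op FROM ( " +
        "    SELECT *, row_number() OVER (PARTITION BY id ORDER BY ts DESC) AS rn " +
        "    FROM db.raw WHERE ts > '" + lowerBoundTs + "' ) ranked WHERE rn = 1 " +
        ") s " +
        "ON t.id = s.id " +
        "WHEN MATCHED AND s.op = 'D' THEN DELETE " +
        "WHEN MATCHED THEN UPDATE SET t.name = s.name, t.ts = s.ts " +
        "WHEN NOT MATCHED AND s.op != 'D' THEN INSERT (id, name, ts) VALUES (s.id, s.name, s.ts)");
  }
}
```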
