Hi, Péter

Thanks for the comments!

1. Is there anything in the doc that I’ve misunderstood? I want to make sure my understanding is correct.

2. One more question about writer conflicts. My assumption is that external compaction is common practice in the industry. Is the sequence “shut down the Flink job → wait for the external compaction to complete → resume the Flink job” the only way to resolve write conflicts when the Flink job compacts equality deletes into position deletes?
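For context, my understanding of the conflict check discussed earlier in this thread is roughly the sketch below, using the Iceberg core RowDelta API (the surrounding class, method, and variable names are only illustrative): when the equality-to-position-delete rewrite commits, it validates that the data files its position deletes reference still exist, so a concurrent external compaction causes the commit to fail rather than leave dangling position deletes.

```
import java.util.List;

import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Table;

// Illustrative sketch only: commit position deletes produced by an
// equality-delete -> position-delete rewrite, and fail if any referenced
// data file was concurrently rewritten or removed by external compaction.
class PositionDeleteCommitSketch {
  void commitPositionDeletes(Table table,
                             long startingSnapshotId,
                             List<DeleteFile> positionDeletes,
                             List<CharSequence> referencedDataFiles) {
    RowDelta rowDelta = table.newRowDelta()
        // Validate against the snapshot the rewrite was planned from.
        .validateFromSnapshot(startingSnapshotId)
        // Fail the commit if a concurrent compaction rewrote any of the
        // data files our position deletes point into.
        .validateDataFilesExist(referencedDataFiles)
        // Also fail if those data files were deleted outright.
        .validateDeletedFiles();
    positionDeletes.forEach(rowDelta::addDeletes);
    // commit() throws ValidationException on conflict; the caller would
    // retry the rewrite from the latest snapshot rather than commit
    // position deletes that point at files which no longer exist.
    rowDelta.commit();
  }
}
```

If that is roughly where validateDataFilesExist() would fit in the “compact equality deletes to position deletes → commit” flow, then my question above is really whether retrying on such a validation failure is sufficient in practice, or whether pausing the Flink job is still required.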
Best Lu On Wed, Jan 28, 2026 at 5:03 PM Lu Niu <[email protected]> wrote: > Hi, Péter > > Thanks! I summarized our discussion so far and also adds some thoughts in > https://docs.google.com/document/d/1srfQWyomX5HanwJkAr5qEAe7lLPzE9b-qv46L9vIXA4/edit?tab=t.0#heading=h.313stzg3g3hr. > Could you help review it when you have a chance? Comments and suggestions > are very welcome. > > Best > Lu > > On Wed, Jan 28, 2026 at 3:05 AM Péter Váry <[email protected]> > wrote: > >> > Current Flink committer does not call validateDataFilesExist(), which >> means if the commit order is compaction succeeds first then Flink commit >> can succeed with dangling position deletes. >> >> The current Flink committer does not generate position delete files for >> existing data files, so no check is required today. >> >> If we follow a *“commit → compact equality deletes to position deletes”* >> flow, the check is still not needed during commit, but it becomes necessary >> during compaction. >> >> If we instead choose a *“compact equality deletes to position deletes → >> commit”* flow, then the check must be performed as part of the commit. >> >> With exactly-once sources and sinks, this can largely be mitigated >> through Flink job restarts. If that is not sufficient, we could also >> introduce retry logic into the equality-to-position delete compaction. >> >> That said, I believe the larger issue is that external compaction will >> not be able to commit changes unless the Flink job is stopped, or the scope >> of concurrent updates is constrained such that they do not interfere with >> the compaction process. >> >> Lu Niu <[email protected]> ezt írta (időpont: 2026. jan. 28., Sze, 1:22): >> >>> Hi, Péter >>> >>> > This scenario could be addressed with retries, since compactions are >>> expected to be relatively infrequent. >>> >>> Sorry to reiterate on this point. Essentially the question is: >>> >>> Assume flink job will maintain PK → (filename, position) then we can >>> have 2 concurrent writers: >>> >>> 1. a flink job writes a position delete pointing to an old data fie F1 >>> >>> 2. a spark compaction job which deletes F1 >>> >>> Current Flink committer does not call validateDataFilesExist(), which >>> means if the commit order is compaction succeeds first then Flink commit >>> can succeed with dangling position deletes. >>> >>> >>> Is my understanding correct? Or is the problem supposed to be solved by >>> the TableMaintenance PR you mentioned? >>> >>> Best >>> >>> Lu >>> >>> >>> >>> >>> On Tue, Jan 27, 2026 at 2:42 AM Péter Váry <[email protected]> >>> wrote: >>> >>>> > It seems we can still allow external compaction to some extent? >>>> >>>> The challenge is that streaming writes are significantly more frequent >>>> than regular commits. If a streaming write produces a delete file, it is >>>> very likely to conflict with any ongoing compaction. >>>> >>>> Consider a Flink job with a 2‑minute checkpoint interval, where every >>>> commit contains some deletes. A compaction typically takes longer than 2 >>>> minutes, so during its execution at least one Flink commit will update the >>>> table. If that commit includes a delete for a file currently being >>>> compacted, the compaction will be reverted. In the worst case, this could >>>> lead to a situation where compaction never successfully completes. >>>> >>>> > My biggest concern is external compaction can succeed and the map in >>>> flink job silently becomes stale, leading to data correctness issues. >>>> What's your take on this? 
Given what you mentioned, it seems this is not a >>>> concern because one of both must fail? >>>> >>>> This scenario could be addressed with retries, since compactions are >>>> expected to be relatively infrequent. >>>> >>>> >>>> >>>> Lu Niu <[email protected]> ezt írta (időpont: 2026. jan. 27., K, 2:00): >>>> >>>>> Hi, Péter >>>>> >>>>> > A concurrently running external compaction could cause an >>>>> equality-delete compaction to fail and retry, or an equality-delete >>>>> compaction could cause an external compaction to fail if it attempts to >>>>> compact files affected by newly introduced deletes. >>>>> >>>>> It seems we can still allow external compaction to some extent? >>>>> Following this approach, the flink job will maintain PK → (filename, >>>>> position). My biggest concern is external compaction can succeed and the >>>>> map in flink job silently becomes stale, leading to data correctness >>>>> issues. What's your take on this? Given what you mentioned, it seems this >>>>> is not a concern because one of both must fail? >>>>> >>>>> Best >>>>> Lu >>>>> >>>>> On Sat, Jan 24, 2026 at 7:50 AM Péter Váry < >>>>> [email protected]> wrote: >>>>> >>>>>> Hi Lu, >>>>>> >>>>>> > Regarding the TableMaintenance you mentioned—does this effectively >>>>>> run compaction as part of the Flink job? If so, does that mean the >>>>>> execution flow becomes serialized like: >>>>>> checkpoint (commit new snapshot) -> compaction -> checkpoint (commit >>>>>> new snapshot) -> compaction -> ...? >>>>>> >>>>>> Yes. This can be partially mitigated by avoiding running equality >>>>>> delete compaction on every commit. Note that this is not a full >>>>>> compaction, >>>>>> but only an equality-delete rewrite. >>>>>> Longer term, we could implement the approach proposed by Steven, >>>>>> where equality deletes are not immediately committed to the table but are >>>>>> instead kept in Flink state, and only committed once the equality-delete >>>>>> rewrite has completed. >>>>>> >>>>>> > Under this model, is compaction within the Flink job sufficient on >>>>>> its own? Today, external compaction helps rewrite historical files to be >>>>>> more reader-optimized while the Flink job continues to commit the newest >>>>>> data. I’m wondering whether we can achieve the same effect with >>>>>> compaction >>>>>> running only inside the Flink job. >>>>>> >>>>>> Currently, Flink only supports BinPack compaction. The preferred >>>>>> solution would be to implement shuffle compaction directly in Flink. >>>>>> >>>>>> As an alternative, we could tolerate occasional compaction failures. >>>>>> A concurrently running external compaction could cause an equality-delete >>>>>> compaction to fail and retry, or an equality-delete compaction could >>>>>> cause >>>>>> an external compaction to fail if it attempts to compact files affected >>>>>> by >>>>>> newly introduced deletes. If equality-delete compactions run frequently, >>>>>> this may lead to starvation on the external compaction side. For this >>>>>> reason, integrating compaction into the Flink jobs themselves appears to >>>>>> be >>>>>> the more robust overall solution. >>>>>> >>>>>> > Also, what's the main challenge to simplify the index design? I’m >>>>>> not an expert here, but similar open table formats like Paimon or Hudi >>>>>> both >>>>>> support primary key indexes. Is it possible to borrow some ideas or >>>>>> patterns from those systems? >>>>>> >>>>>> The challenge is not with the index design itself, but with the >>>>>> underlying table layout. 
To the best of my knowledge, both Paimon and >>>>>> Hudi >>>>>> impose layout constraints by bucketing rows based on the primary key. As >>>>>> a >>>>>> result, if a reader‑optimized layout is required and queries need to >>>>>> efficiently access data by columns other than the primary key, Iceberg >>>>>> tables are generally a better fit, as they do not impose such layout >>>>>> restrictions. >>>>>> >>>>>> I hope this helps, >>>>>> Thanks, >>>>>> Peter >>>>>> >>>>>> >>>>>> Lu Niu <[email protected]> ezt írta (időpont: 2026. jan. 23., P, >>>>>> 18:09): >>>>>> >>>>>>> Hi, Péter >>>>>>> >>>>>>> Thanks for the reply! >>>>>>> >>>>>>> 1. >>>>>>> > TableMaintenance >>>>>>> Regarding the TableMaintenance you mentioned—does this effectively >>>>>>> run compaction as part of the Flink job? If so, does that mean the >>>>>>> execution flow becomes serialized like: >>>>>>> checkpoint (commit new snapshot) -> compaction -> checkpoint (commit >>>>>>> new snapshot) -> compaction -> ...? >>>>>>> >>>>>>> 2. >>>>>>> > Our conclusion was that, as a general rule, users should avoid >>>>>>> running compaction outside of the Flink job >>>>>>> Under this model, is compaction within the Flink job sufficient on >>>>>>> its own? Today, external compaction helps rewrite historical files to be >>>>>>> more reader-optimized while the Flink job continues to commit the newest >>>>>>> data. I’m wondering whether we can achieve the same effect with >>>>>>> compaction >>>>>>> running only inside the Flink job. >>>>>>> >>>>>>> 3. >>>>>>> Also, what's the main challenge to simplify the index design? I’m >>>>>>> not an expert here, but similar open table formats like Paimon or Hudi >>>>>>> both >>>>>>> support primary key indexes. Is it possible to borrow some ideas or >>>>>>> patterns from those systems? >>>>>>> >>>>>>> Best >>>>>>> Lu >>>>>>> >>>>>>> >>>>>>> On Fri, Jan 23, 2026 at 3:37 AM Péter Váry < >>>>>>> [email protected]> wrote: >>>>>>> >>>>>>>> Excellent questions Lu, >>>>>>>> >>>>>>>> > 1. PK → (filename, position) map in memory => are you referring >>>>>>>> to Flink state? If so, are we concerned that its size will grow without >>>>>>>> bound as the primary key cardinality increases? >>>>>>>> >>>>>>>> We could store this mapping in Flink state, but restoring it may >>>>>>>> introduce more complexity than recomputing it by re‑reading `SELECT PK, >>>>>>>> filename, position` on each restart, especially since the table may >>>>>>>> change >>>>>>>> while the job is stopped. My initial thought would be to persist the >>>>>>>> data >>>>>>>> in RocksDB during execution but recompute it on restart rather than >>>>>>>> restoring state. >>>>>>>> >>>>>>>> > 2. How do we handle the fact that this map can become stale when >>>>>>>> external compaction occurs? >>>>>>>> >>>>>>>> We discussed this with Steven when working on the proposal. Our >>>>>>>> conclusion was that, as a general rule, users should avoid running >>>>>>>> compaction outside of the Flink job. In practice, we could also >>>>>>>> introduce >>>>>>>> validators that prevent committing equality‑delete to position‑delete >>>>>>>> conversions when a concurrent compaction is detected. >>>>>>>> >>>>>>>> >>>>>>>> Lu Niu <[email protected]> ezt írta (időpont: 2026. jan. 22., Cs, >>>>>>>> 19:27): >>>>>>>> >>>>>>>>> Hi, Péter >>>>>>>>> >>>>>>>>> Thanks for the reply! I have some high level questions about the >>>>>>>>> idea: >>>>>>>>> >>>>>>>>> 1. PK → (filename, position) map in memory => are you referring to >>>>>>>>> Flink state? 
If so, are we concerned that its size will grow without >>>>>>>>> bound >>>>>>>>> as the primary key cardinality increases? >>>>>>>>> 2. How do we handle the fact that this map can become stale when >>>>>>>>> external compaction occurs? >>>>>>>>> >>>>>>>>> Best >>>>>>>>> Lu >>>>>>>>> >>>>>>>>> On Thu, Jan 22, 2026 at 12:03 AM Péter Váry < >>>>>>>>> [email protected]> wrote: >>>>>>>>> >>>>>>>>>> Hi Lu, >>>>>>>>>> >>>>>>>>>> Steven and I have other priorities at the moment, so please feel >>>>>>>>>> free to pick up any loose threads here. >>>>>>>>>> >>>>>>>>>> We gained quite a lot by relaxing some of the requirements from >>>>>>>>>> the original proposal. If we accept that equality deletes remain, but >>>>>>>>>> compacted very soon, we could get at an even more limited change >>>>>>>>>> that still >>>>>>>>>> helps your use case and points in the right overall direction. >>>>>>>>>> >>>>>>>>>> My initial proposal would be: >>>>>>>>>> >>>>>>>>>> - First, ensure that Guo’s PR enabling Flink TableMaintenance >>>>>>>>>> without a LockManager gets merged: >>>>>>>>>> https://github.com/apache/iceberg/pull/15042 >>>>>>>>>> - Introduce a TableMaintenance task that handles newly >>>>>>>>>> written equality deletes: >>>>>>>>>> - Maintain a PK → (filename, position) map in memory, >>>>>>>>>> distributed across the TaskManagers. >>>>>>>>>> - In the absence of indexes for now, this map can be >>>>>>>>>> reconstructed on every job start. >>>>>>>>>> - Update the map whenever new table changes are committed. >>>>>>>>>> - Convert PK-based equality deletes into position deletes. >>>>>>>>>> - Place this task at the end of the streaming ingest pipeline. >>>>>>>>>> >>>>>>>>>> This approach builds entirely on existing components and can >>>>>>>>>> later be enhanced with proper index support. If we decide to >>>>>>>>>> refactor the >>>>>>>>>> ingest stream, we could even avoid committing equality deletes to >>>>>>>>>> the table >>>>>>>>>> altogether and remove the need for them in the Flink jobs. >>>>>>>>>> >>>>>>>>>> I don’t have the bandwidth to push this forward myself, but I’d >>>>>>>>>> be very happy to help with proposal and code reviews. >>>>>>>>>> >>>>>>>>>> Thanks, >>>>>>>>>> Peter >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On Wed, Jan 21, 2026, 18:48 Lu Niu <[email protected]> wrote: >>>>>>>>>> >>>>>>>>>>> Thank you for all your replies! >>>>>>>>>>> >>>>>>>>>>> > Some people use the current Flink Iceberg sink for CDC >>>>>>>>>>> ingestion. But it would produce equality deletes that would require >>>>>>>>>>> aggressive compactions and add operational burden too >>>>>>>>>>> >>>>>>>>>>> Since the main concern is reader-side performance degradation >>>>>>>>>>> due to the accumulation of equality deletes over time. Is there a >>>>>>>>>>> way to >>>>>>>>>>> estimate impact on the reader side based on equality deletes in a >>>>>>>>>>> snapshot >>>>>>>>>>> summary? >>>>>>>>>>> >>>>>>>>>>> ``` >>>>>>>>>>> while new_snapshot_summary_is_ready: >>>>>>>>>>> should_compact = analyze_snapshot_summary(snapshot_summary) >>>>>>>>>>> if should_compact: >>>>>>>>>>> rewrite_data_files() >>>>>>>>>>> ``` >>>>>>>>>>> >>>>>>>>>>> > The original design has high complexity. We were thinking >>>>>>>>>>> about alternatives with narrower scope. But there isn't any >>>>>>>>>>> progress and >>>>>>>>>>> timeline . >>>>>>>>>>> >>>>>>>>>>> If this is the community aligned long term, Is there any way I >>>>>>>>>>> could contribute to speed this up? Thanks! 
>>>>>>>>>>> >>>>>>>>>>> Best >>>>>>>>>>> Lu >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On Wed, Jan 21, 2026 at 2:18 AM Maximilian Michels < >>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hi Lu, >>>>>>>>>>>> >>>>>>>>>>>> Just to iterate the status quo: Flink supports upserts, but >>>>>>>>>>>> only via >>>>>>>>>>>> equality delete + append. So technically, "streaming writes" >>>>>>>>>>>> aren't an >>>>>>>>>>>> issue. It's the read path which causes the issue, because unlike >>>>>>>>>>>> positional deletes, which can be resolved on the fly during >>>>>>>>>>>> streaming >>>>>>>>>>>> reads, equality deletes potentially require a full table scan >>>>>>>>>>>> to be >>>>>>>>>>>> materialized. Constant snapshot compaction is required to keep >>>>>>>>>>>> the >>>>>>>>>>>> read path efficient. >>>>>>>>>>>> >>>>>>>>>>>> >1. A Flink job that continuously appends CDC events into an >>>>>>>>>>>> append-only raw table >>>>>>>>>>>> >2. A periodically scheduled Spark job that performs upsert the >>>>>>>>>>>> `current` table using `raw` table >>>>>>>>>>>> >>>>>>>>>>>> This makes sense. Conceptually, you are pre-compacting upserts >>>>>>>>>>>> before >>>>>>>>>>>> writing into the final "current" table. This avoids equality >>>>>>>>>>>> deletes >>>>>>>>>>>> entirely and keeps the read path on the "current" table >>>>>>>>>>>> efficient at >>>>>>>>>>>> all times. The drawback is that your lower bound latency will >>>>>>>>>>>> be the >>>>>>>>>>>> interval at which the Spark job runs, but this is an acceptable >>>>>>>>>>>> price >>>>>>>>>>>> to pay, until we have a way to write positional deletes right >>>>>>>>>>>> away, >>>>>>>>>>>> avoiding equality deletes entirely. >>>>>>>>>>>> >>>>>>>>>>>> Cheers, >>>>>>>>>>>> Max >>>>>>>>>>>> >>>>>>>>>>>> On Wed, Jan 21, 2026 at 8:48 AM melin li < >>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>> > >>>>>>>>>>>> > Flink CDC support reading binlog data from databases such as >>>>>>>>>>>> MySQL and PostgreSQL, and writing it to Iceberg, Hudi, and Paimon. >>>>>>>>>>>> > https://github.com/apache/flink-cdc/pulls?q=iceberg >>>>>>>>>>>> > >>>>>>>>>>>> > Steven Wu <[email protected]> 于2026年1月21日周三 15:27写道: >>>>>>>>>>>> >> >>>>>>>>>>>> >> Lu, >>>>>>>>>>>> >> >>>>>>>>>>>> >> you are correct about the design doc for Flink writing >>>>>>>>>>>> position deletes only. The original design has high complexity. We >>>>>>>>>>>> were >>>>>>>>>>>> thinking about alternatives with narrower scope. But there isn't >>>>>>>>>>>> any >>>>>>>>>>>> progress and timeline . >>>>>>>>>>>> >> >>>>>>>>>>>> >> IMHO, your setup is a good practice today. Ryan wrote a >>>>>>>>>>>> series of blogs for the pattern: >>>>>>>>>>>> https://tabular.medium.com/hello-world-of-cdc-e6f06ddbfcc0. >>>>>>>>>>>> >> >>>>>>>>>>>> >> Some people use the current Flink Iceberg sink for CDC >>>>>>>>>>>> ingestion. But it would produce equality deletes that would require >>>>>>>>>>>> aggressive compactions and add operational burden too. Also not >>>>>>>>>>>> all engines >>>>>>>>>>>> can read equality deletes. >>>>>>>>>>>> >> >>>>>>>>>>>> >> Thanks, >>>>>>>>>>>> >> Steven >>>>>>>>>>>> >> >>>>>>>>>>>> >> On Tue, Jan 20, 2026 at 8:44 PM Gang Wu <[email protected]> >>>>>>>>>>>> wrote: >>>>>>>>>>>> >>> >>>>>>>>>>>> >>> Hi Lu, >>>>>>>>>>>> >>> >>>>>>>>>>>> >>> Nice to hear from you here in the Iceberg community :) >>>>>>>>>>>> >>> >>>>>>>>>>>> >>> We have built an internal service to stream upserts into >>>>>>>>>>>> position deletes which happens to have a lot in common with [1] >>>>>>>>>>>> and [2]. 
I >>>>>>>>>>>> believe this is a viable approach to achieve second freshness. >>>>>>>>>>>> >>> >>>>>>>>>>>> >>> [1] >>>>>>>>>>>> https://docs.google.com/document/d/1Jz4Fjt-6jRmwqbgHX_u0ohuyTB9ytDzfslS7lYraIjk >>>>>>>>>>>> >>> [2] https://www.mooncake.dev/whitepaper >>>>>>>>>>>> >>> >>>>>>>>>>>> >>> Best, >>>>>>>>>>>> >>> Gang >>>>>>>>>>>> >>> >>>>>>>>>>>> >>> >>>>>>>>>>>> >>> >>>>>>>>>>>> >>> >>>>>>>>>>>> >>> On Wed, Jan 21, 2026 at 11:05 AM Lu Niu <[email protected]> >>>>>>>>>>>> wrote: >>>>>>>>>>>> >>>> >>>>>>>>>>>> >>>> Hi Iceberg community, >>>>>>>>>>>> >>>> >>>>>>>>>>>> >>>> What are the current best practices for streaming upserts >>>>>>>>>>>> into an Iceberg table? >>>>>>>>>>>> >>>> >>>>>>>>>>>> >>>> Today, we have the following setup in production to >>>>>>>>>>>> support CDC: >>>>>>>>>>>> >>>> >>>>>>>>>>>> >>>> 1. A Flink job that continuously appends CDC events into >>>>>>>>>>>> an append-only raw table >>>>>>>>>>>> >>>> 2, A periodically scheduled Spark job that performs upsert >>>>>>>>>>>> the `current` table using `raw` table >>>>>>>>>>>> >>>> >>>>>>>>>>>> >>>> We are exploring whether it’s feasible to stream upserts >>>>>>>>>>>> directly into an Iceberg table from Flink. This could simplify our >>>>>>>>>>>> architecture and potentially further reduce our data SLA. We’ve >>>>>>>>>>>> experimented with this approach before, but ran into reader-side >>>>>>>>>>>> performance issues due to the accumulation of equality deletes >>>>>>>>>>>> over time. >>>>>>>>>>>> >>>> >>>>>>>>>>>> >>>> From what I can gather, streaming upserts still seems to >>>>>>>>>>>> be an open design area: >>>>>>>>>>>> >>>> >>>>>>>>>>>> >>>> 1. (Please correct me if I’m wrong—this summary is partly >>>>>>>>>>>> based on ChatGPT 5.1.) The book “Apache Iceberg: The Definitive >>>>>>>>>>>> Guide” >>>>>>>>>>>> suggests the two-table pattern we’re currently using in production. >>>>>>>>>>>> >>>> 2. These threads: >>>>>>>>>>>> https://lists.apache.org/thread/gjjr30txq318qp6pff3x5fx1jmdnr6fv >>>>>>>>>>>> , >>>>>>>>>>>> https://lists.apache.org/thread/xdkzllzt4p3tvcd3ft4t7jsvyvztr41j >>>>>>>>>>>> discuss the idea of outputting only positional deletes (no equality >>>>>>>>>>>> deletes) by introducing an index. However, this appears to still >>>>>>>>>>>> be under >>>>>>>>>>>> discussion and may be targeted for v4, with no concrete timeline >>>>>>>>>>>> yet. >>>>>>>>>>>> >>>> 3. this thread >>>>>>>>>>>> https://lists.apache.org/thread/6fhpjszsfxd8p0vfzc3k5vw7zmcyv2mq >>>>>>>>>>>> talks about deprecating equality deletes, but I haven’t seen a >>>>>>>>>>>> clearly >>>>>>>>>>>> defined alternative come out of that discussion yet. >>>>>>>>>>>> >>>> >>>>>>>>>>>> >>>> Given all of the above, I’d really appreciate guidance >>>>>>>>>>>> from the community on: >>>>>>>>>>>> >>>> >>>>>>>>>>>> >>>> 1. Recommended patterns for streaming upserts with Flink >>>>>>>>>>>> into Iceberg today (it's good to know the long term possible as >>>>>>>>>>>> well, but >>>>>>>>>>>> my focus is what's possible in near term). >>>>>>>>>>>> >>>> 2. Practical experiences or lessons learned from teams >>>>>>>>>>>> running streaming upserts in production >>>>>>>>>>>> >>>> >>>>>>>>>>>> >>>> Thanks in advance for any insights and corrections. >>>>>>>>>>>> >>>> >>>>>>>>>>>> >>>> Best >>>>>>>>>>>> >>>> Lu >>>>>>>>>>>> >>>>>>>>>>>
