Hi, Péter

Thanks! I summarized our discussion so far and also added some thoughts in
https://docs.google.com/document/d/1srfQWyomX5HanwJkAr5qEAe7lLPzE9b-qv46L9vIXA4/edit?tab=t.0#heading=h.313stzg3g3hr.
Could you help review it when you have a chance? Comments and suggestions
are very welcome.
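
To make the race from my earlier question concrete, here is a minimal toy sketch. It does not use the Iceberg API: `ToyTable`, `CommitConflict`, and `run` are hypothetical names, and the `validate_files_exist` flag only mimics the kind of check that validateDataFilesExist() is meant to perform. It assumes the compaction commits first and removes F1 before the Flink commit lands.

```python
from dataclasses import dataclass, field


class CommitConflict(Exception):
    """Raised when a commit references a data file that is no longer live."""


@dataclass
class ToyTable:
    # Toy stand-in for table metadata (hypothetical, not an Iceberg class).
    live_files: set = field(default_factory=set)
    position_deletes: list = field(default_factory=list)

    def compact(self, old_files, new_file):
        # Compaction rewrites old_files into new_file and drops the old ones.
        self.live_files -= set(old_files)
        self.live_files.add(new_file)

    def commit_position_deletes(self, deletes, validate_files_exist):
        # deletes is a list of (data_file, row_position) pairs.
        if validate_files_exist:
            missing = sorted({f for f, _ in deletes} - self.live_files)
            if missing:
                raise CommitConflict(f"data files no longer exist: {missing}")
        self.position_deletes.extend(deletes)

    def dangling_deletes(self):
        # Position deletes that point at files which are no longer live.
        return [(f, p) for f, p in self.position_deletes
                if f not in self.live_files]


def run(validate_files_exist):
    table = ToyTable(live_files={"F1", "F2"})
    pending = [("F1", 7)]        # the Flink job prepared a delete against F1
    table.compact(["F1"], "F3")  # compaction commits first and removes F1
    try:
        table.commit_position_deletes(pending, validate_files_exist)
        return "committed", table.dangling_deletes()
    except CommitConflict:
        return "rejected", table.dangling_deletes()
```

Without the check, the late commit goes through and leaves a delete pointing at F1; with the check, the commit is rejected and the writer has to retry against the new files.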

Best
Lu

On Wed, Jan 28, 2026 at 3:05 AM Péter Váry <[email protected]>
wrote:

> > The current Flink committer does not call validateDataFilesExist(), which
> means that if the compaction commits first, the Flink commit can still
> succeed with dangling position deletes.
>
> The current Flink committer does not generate position delete files for
> existing data files, so no check is required today.
>
> If we follow a *“commit → compact equality deletes to position deletes”*
> flow, the check is still not needed during commit, but it becomes necessary
> during compaction.
>
> If we instead choose a *“compact equality deletes to position deletes →
> commit”* flow, then the check must be performed as part of the commit.
>
> With exactly-once sources and sinks, this can largely be mitigated through
> Flink job restarts. If that is not sufficient, we could also introduce
> retry logic into the equality-to-position delete compaction.
>
> That said, I believe the larger issue is that external compaction will not
> be able to commit changes unless the Flink job is stopped, or the scope of
> concurrent updates is constrained such that they do not interfere with the
> compaction process.
>
> On Wed, Jan 28, 2026 at 1:22, Lu Niu <[email protected]> wrote:
>
>> Hi,  Péter
>>
>> > This scenario could be addressed with retries, since compactions are
>> expected to be relatively infrequent.
>>
>> Sorry to reiterate this point. Essentially the question is:
>>
>> Assume the Flink job maintains a PK → (filename, position) map. Then we
>> can have two concurrent writers:
>>
>> 1. A Flink job that writes a position delete pointing to an old data file F1
>>
>> 2. A Spark compaction job that deletes F1
>>
>> The current Flink committer does not call validateDataFilesExist(), which
>> means that if the compaction commits first, the Flink commit can still
>> succeed with dangling position deletes.
>>
>>
>> Is my understanding correct? Or is the problem supposed to be solved by
>> the TableMaintenance PR you mentioned?
>>
>> Best
>>
>> Lu
>>
>>
>>
>>
>> On Tue, Jan 27, 2026 at 2:42 AM Péter Váry <[email protected]>
>> wrote:
>>
>>> > It seems we can still allow external compaction to some extent?
>>>
>>> The challenge is that streaming writes are significantly more frequent
>>> than regular commits. If a streaming write produces a delete file, it is
>>> very likely to conflict with any ongoing compaction.
>>>
>>> Consider a Flink job with a 2‑minute checkpoint interval, where every
>>> commit contains some deletes. A compaction typically takes longer than 2
>>> minutes, so during its execution at least one Flink commit will update the
>>> table. If that commit includes a delete for a file currently being
>>> compacted, the compaction will be reverted. In the worst case, this could
>>> lead to a situation where compaction never successfully completes.
>>>
>>> > My biggest concern is that external compaction can succeed and the map
>>> in the Flink job silently becomes stale, leading to data correctness
>>> issues. What's your take on this? Given what you mentioned, it seems this
>>> is not a concern because one of the two must fail?
>>>
>>> This scenario could be addressed with retries, since compactions are
>>> expected to be relatively infrequent.
>>>
>>>
>>>
>>> On Tue, Jan 27, 2026 at 2:00, Lu Niu <[email protected]> wrote:
>>>
>>>> Hi, Péter
>>>>
>>>> > A concurrently running external compaction could cause an
>>>> equality-delete compaction to fail and retry, or an equality-delete
>>>> compaction could cause an external compaction to fail if it attempts to
>>>> compact files affected by newly introduced deletes.
>>>>
>>>> It seems we can still allow external compaction to some extent?
>>>> Following this approach, the Flink job will maintain a PK → (filename,
>>>> position) map. My biggest concern is that external compaction can succeed
>>>> and the map in the Flink job silently becomes stale, leading to data
>>>> correctness issues. What's your take on this? Given what you mentioned, it
>>>> seems this is not a concern because one of the two must fail?
>>>>
>>>> Best
>>>> Lu
>>>>
>>>> On Sat, Jan 24, 2026 at 7:50 AM Péter Váry <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Lu,
>>>>>
>>>>> > Regarding the TableMaintenance you mentioned—does this effectively
>>>>> run compaction as part of the Flink job? If so, does that mean the
>>>>> execution flow becomes serialized like:
>>>>> checkpoint (commit new snapshot) -> compaction -> checkpoint (commit
>>>>> new snapshot) -> compaction -> ...?
>>>>>
>>>>> Yes. This can be partially mitigated by avoiding running equality
>>>>> delete compaction on every commit. Note that this is not a full
>>>>> compaction, but only an equality-delete rewrite.
>>>>> Longer term, we could implement the approach proposed by Steven, where
>>>>> equality deletes are not immediately committed to the table but are
>>>>> instead kept in Flink state, and only committed once the
>>>>> equality-delete rewrite has completed.
>>>>>
>>>>> > Under this model, is compaction within the Flink job sufficient on
>>>>> its own? Today, external compaction helps rewrite historical files to be
>>>>> more reader-optimized while the Flink job continues to commit the newest
>>>>> data. I’m wondering whether we can achieve the same effect with compaction
>>>>> running only inside the Flink job.
>>>>>
>>>>> Currently, Flink only supports BinPack compaction. The preferred
>>>>> solution would be to implement shuffle compaction directly in Flink.
>>>>>
>>>>> As an alternative, we could tolerate occasional compaction failures. A
>>>>> concurrently running external compaction could cause an equality-delete
>>>>> compaction to fail and retry, or an equality-delete compaction could cause
>>>>> an external compaction to fail if it attempts to compact files affected by
>>>>> newly introduced deletes. If equality-delete compactions run frequently,
>>>>> this may lead to starvation on the external compaction side. For this
>>>>> reason, integrating compaction into the Flink jobs themselves appears to
>>>>> be the more robust overall solution.
>>>>>
>>>>> > Also, what's the main challenge in simplifying the index design? I’m
>>>>> not an expert here, but similar open table formats like Paimon and Hudi
>>>>> both support primary key indexes. Is it possible to borrow some ideas or
>>>>> patterns from those systems?
>>>>>
>>>>> The challenge is not with the index design itself, but with the
>>>>> underlying table layout. To the best of my knowledge, both Paimon and Hudi
>>>>> impose layout constraints by bucketing rows based on the primary key. As a
>>>>> result, if a reader‑optimized layout is required and queries need to
>>>>> efficiently access data by columns other than the primary key, Iceberg
>>>>> tables are generally a better fit, as they do not impose such layout
>>>>> restrictions.
>>>>>
>>>>> I hope this helps,
>>>>> Thanks,
>>>>> Peter
>>>>>
>>>>>
>>>>> On Fri, Jan 23, 2026 at 18:09, Lu Niu <[email protected]> wrote:
>>>>>
>>>>>> Hi, Péter
>>>>>>
>>>>>> Thanks for the reply!
>>>>>>
>>>>>> 1.
>>>>>> > TableMaintenance
>>>>>> Regarding the TableMaintenance you mentioned—does this effectively
>>>>>> run compaction as part of the Flink job? If so, does that mean the
>>>>>> execution flow becomes serialized like:
>>>>>> checkpoint (commit new snapshot) -> compaction -> checkpoint (commit
>>>>>> new snapshot) -> compaction -> ...?
>>>>>>
>>>>>> 2.
>>>>>> >  Our conclusion was that, as a general rule, users should avoid
>>>>>> running compaction outside of the Flink job
>>>>>> Under this model, is compaction within the Flink job sufficient on
>>>>>> its own? Today, external compaction helps rewrite historical files to be
>>>>>> more reader-optimized while the Flink job continues to commit the newest
>>>>>> data. I’m wondering whether we can achieve the same effect with
>>>>>> compaction running only inside the Flink job.
>>>>>>
>>>>>> 3.
>>>>>> Also, what's the main challenge in simplifying the index design? I’m
>>>>>> not an expert here, but similar open table formats like Paimon and Hudi
>>>>>> both support primary key indexes. Is it possible to borrow some ideas or
>>>>>> patterns from those systems?
>>>>>>
>>>>>> Best
>>>>>> Lu
>>>>>>
>>>>>>
>>>>>> On Fri, Jan 23, 2026 at 3:37 AM Péter Váry <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Excellent questions Lu,
>>>>>>>
>>>>>>> > 1. PK → (filename, position) map in memory => are you referring to
>>>>>>> Flink state? If so, are we concerned that its size will grow without
>>>>>>> bound as the primary key cardinality increases?
>>>>>>>
>>>>>>> We could store this mapping in Flink state, but restoring it may
>>>>>>> introduce more complexity than recomputing it by re‑reading `SELECT PK,
>>>>>>> filename, position` on each restart, especially since the table may
>>>>>>> change while the job is stopped. My initial thought would be to persist
>>>>>>> the data in RocksDB during execution but recompute it on restart rather
>>>>>>> than restoring state.
>>>>>>>
>>>>>>> > 2. How do we handle the fact that this map can become stale when
>>>>>>> external compaction occurs?
>>>>>>>
>>>>>>> We discussed this with Steven when working on the proposal. Our
>>>>>>> conclusion was that, as a general rule, users should avoid running
>>>>>>> compaction outside of the Flink job. In practice, we could also
>>>>>>> introduce validators that prevent committing equality‑delete to
>>>>>>> position‑delete conversions when a concurrent compaction is detected.
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jan 22, 2026 at 19:27, Lu Niu <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi, Péter
>>>>>>>>
>>>>>>>> Thanks for the reply!  I have some high level questions about the
>>>>>>>> idea:
>>>>>>>>
>>>>>>>> 1. PK → (filename, position) map in memory => are you referring to
>>>>>>>> Flink state? If so, are we concerned that its size will grow without
>>>>>>>> bound as the primary key cardinality increases?
>>>>>>>> 2. How do we handle the fact that this map can become stale when
>>>>>>>> external compaction occurs?
>>>>>>>>
>>>>>>>> Best
>>>>>>>> Lu
>>>>>>>>
>>>>>>>> On Thu, Jan 22, 2026 at 12:03 AM Péter Váry <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Lu,
>>>>>>>>>
>>>>>>>>> Steven and I have other priorities at the moment, so please feel
>>>>>>>>> free to pick up any loose threads here.
>>>>>>>>>
>>>>>>>>> We gained quite a lot by relaxing some of the requirements from
>>>>>>>>> the original proposal. If we accept that equality deletes remain but
>>>>>>>>> are compacted very soon, we could arrive at an even more limited change
>>>>>>>>> that still helps your use case and points in the right overall
>>>>>>>>> direction.
>>>>>>>>>
>>>>>>>>> My initial proposal would be:
>>>>>>>>>
>>>>>>>>>    - First, ensure that Guo’s PR enabling Flink TableMaintenance
>>>>>>>>>    without a LockManager gets merged:
>>>>>>>>>    https://github.com/apache/iceberg/pull/15042
>>>>>>>>>    - Introduce a TableMaintenance task that handles newly written
>>>>>>>>>    equality deletes:
>>>>>>>>>       - Maintain a PK → (filename, position) map in memory,
>>>>>>>>>       distributed across the TaskManagers.
>>>>>>>>>       - In the absence of indexes for now, this map can be
>>>>>>>>>       reconstructed on every job start.
>>>>>>>>>       - Update the map whenever new table changes are committed.
>>>>>>>>>       - Convert PK-based equality deletes into position deletes.
>>>>>>>>>    - Place this task at the end of the streaming ingest pipeline.
>>>>>>>>>
>>>>>>>>> This approach builds entirely on existing components and can later
>>>>>>>>> be enhanced with proper index support. If we decide to refactor the
>>>>>>>>> ingest stream, we could even avoid committing equality deletes to the
>>>>>>>>> table altogether and remove the need for them in the Flink jobs.
>>>>>>>>>
>>>>>>>>> I don’t have the bandwidth to push this forward myself, but I’d be
>>>>>>>>> very happy to help with proposal and code reviews.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Peter
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Jan 21, 2026, 18:48 Lu Niu <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Thank you for all your replies!
>>>>>>>>>>
>>>>>>>>>> > Some people use the current Flink Iceberg sink for CDC
>>>>>>>>>> ingestion. But it would produce equality deletes that would require
>>>>>>>>>> aggressive compactions and add operational burden too
>>>>>>>>>>
>>>>>>>>>> Since the main concern is reader-side performance degradation due
>>>>>>>>>> to the accumulation of equality deletes over time, is there a way to
>>>>>>>>>> estimate the impact on the reader side based on the equality deletes
>>>>>>>>>> reported in a snapshot summary?
>>>>>>>>>>
>>>>>>>>>> ```
>>>>>>>>>> while new_snapshot_summary_is_ready:
>>>>>>>>>>     should_compact = analyze_snapshot_summary(snapshot_summary)
>>>>>>>>>>     if should_compact:
>>>>>>>>>>         rewrite_data_files()
>>>>>>>>>> ```
>>>>>>>>>>
>>>>>>>>>> > The original design has high complexity. We were thinking about
>>>>>>>>>> alternatives with narrower scope. But there isn't any progress or
>>>>>>>>>> timeline.
>>>>>>>>>>
>>>>>>>>>> If this is the long-term direction the community is aligned on, is
>>>>>>>>>> there any way I could contribute to speed this up? Thanks!
>>>>>>>>>>
>>>>>>>>>> Best
>>>>>>>>>> Lu
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Jan 21, 2026 at 2:18 AM Maximilian Michels <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Lu,
>>>>>>>>>>>
>>>>>>>>>>> Just to reiterate the status quo: Flink supports upserts, but only
>>>>>>>>>>> via equality delete + append. So technically, "streaming writes"
>>>>>>>>>>> aren't an issue. It's the read path which causes the issue, because
>>>>>>>>>>> unlike positional deletes, which can be resolved on the fly during
>>>>>>>>>>> streaming reads, equality deletes potentially require a full table
>>>>>>>>>>> scan to be materialized. Constant snapshot compaction is required to
>>>>>>>>>>> keep the read path efficient.
>>>>>>>>>>>
>>>>>>>>>>> >1. A Flink job that continuously appends CDC events into an
>>>>>>>>>>> append-only raw table
>>>>>>>>>>> >2. A periodically scheduled Spark job that upserts into the
>>>>>>>>>>> `current` table from the `raw` table
>>>>>>>>>>> This makes sense. Conceptually, you are pre-compacting upserts
>>>>>>>>>>> before writing into the final "current" table. This avoids equality
>>>>>>>>>>> deletes entirely and keeps the read path on the "current" table
>>>>>>>>>>> efficient at all times. The drawback is that your lower bound latency
>>>>>>>>>>> will be the interval at which the Spark job runs, but this is an
>>>>>>>>>>> acceptable price to pay, until we have a way to write positional
>>>>>>>>>>> deletes right away, avoiding equality deletes entirely.
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Max
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jan 21, 2026 at 8:48 AM melin li <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > Flink CDC supports reading binlog data from databases such as
>>>>>>>>>>> MySQL and PostgreSQL, and writing it to Iceberg, Hudi, and Paimon.
>>>>>>>>>>> > https://github.com/apache/flink-cdc/pulls?q=iceberg
>>>>>>>>>>> >
>>>>>>>>>>> > On Wed, Jan 21, 2026 at 15:27, Steven Wu <[email protected]> wrote:
>>>>>>>>>>> >>
>>>>>>>>>>> >> Lu,
>>>>>>>>>>> >>
>>>>>>>>>>> >> you are correct about the design doc for Flink writing
>>>>>>>>>>> position deletes only. The original design has high complexity. We
>>>>>>>>>>> were thinking about alternatives with narrower scope. But there isn't
>>>>>>>>>>> any progress or timeline.
>>>>>>>>>>> >>
>>>>>>>>>>> >> IMHO, your setup is a good practice today. Ryan wrote a
>>>>>>>>>>> series of blogs for the pattern:
>>>>>>>>>>> https://tabular.medium.com/hello-world-of-cdc-e6f06ddbfcc0.
>>>>>>>>>>> >>
>>>>>>>>>>> >> Some people use the current Flink Iceberg sink for CDC
>>>>>>>>>>> ingestion. But it would produce equality deletes that would require
>>>>>>>>>>> aggressive compactions and add operational burden too. Also, not all
>>>>>>>>>>> engines can read equality deletes.
>>>>>>>>>>> >>
>>>>>>>>>>> >> Thanks,
>>>>>>>>>>> >> Steven
>>>>>>>>>>> >>
>>>>>>>>>>> >> On Tue, Jan 20, 2026 at 8:44 PM Gang Wu <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> Hi Lu,
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> Nice to hear from you here in the Iceberg community :)
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> We have built an internal service to stream upserts into
>>>>>>>>>>> position deletes, which happens to have a lot in common with [1] and
>>>>>>>>>>> [2]. I believe this is a viable approach to achieve second-level
>>>>>>>>>>> freshness.
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> [1]
>>>>>>>>>>> https://docs.google.com/document/d/1Jz4Fjt-6jRmwqbgHX_u0ohuyTB9ytDzfslS7lYraIjk
>>>>>>>>>>> >>> [2] https://www.mooncake.dev/whitepaper
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> Best,
>>>>>>>>>>> >>> Gang
>>>>>>>>>>> >>>
>>>>>>>>>>> >>>
>>>>>>>>>>> >>>
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> On Wed, Jan 21, 2026 at 11:05 AM Lu Niu <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> Hi Iceberg community,
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> What are the current best practices for streaming upserts
>>>>>>>>>>> into an Iceberg table?
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> Today, we have the following setup in production to support
>>>>>>>>>>> CDC:
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> 1. A Flink job that continuously appends CDC events into an
>>>>>>>>>>> append-only raw table
>>>>>>>>>>> >>>> 2. A periodically scheduled Spark job that upserts into the
>>>>>>>>>>> `current` table from the `raw` table
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> We are exploring whether it’s feasible to stream upserts
>>>>>>>>>>> directly into an Iceberg table from Flink. This could simplify our
>>>>>>>>>>> architecture and potentially further reduce our data SLA. We’ve
>>>>>>>>>>> experimented with this approach before, but ran into reader-side
>>>>>>>>>>> performance issues due to the accumulation of equality deletes over
>>>>>>>>>>> time.
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> From what I can gather, streaming upserts still seems to be
>>>>>>>>>>> an open design area:
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> 1. (Please correct me if I’m wrong; this summary is partly
>>>>>>>>>>> based on ChatGPT 5.1.) The book “Apache Iceberg: The Definitive
>>>>>>>>>>> Guide” suggests the two-table pattern we’re currently using in
>>>>>>>>>>> production.
>>>>>>>>>>> >>>> 2. These threads:
>>>>>>>>>>> https://lists.apache.org/thread/gjjr30txq318qp6pff3x5fx1jmdnr6fv
>>>>>>>>>>> and
>>>>>>>>>>> https://lists.apache.org/thread/xdkzllzt4p3tvcd3ft4t7jsvyvztr41j
>>>>>>>>>>> discuss the idea of outputting only positional deletes (no equality
>>>>>>>>>>> deletes) by introducing an index. However, this appears to still be
>>>>>>>>>>> under discussion and may be targeted for v4, with no concrete
>>>>>>>>>>> timeline yet.
>>>>>>>>>>> >>>> 3. This thread
>>>>>>>>>>> https://lists.apache.org/thread/6fhpjszsfxd8p0vfzc3k5vw7zmcyv2mq
>>>>>>>>>>> talks about deprecating equality deletes, but I haven’t seen a
>>>>>>>>>>> clearly defined alternative come out of that discussion yet.
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> Given all of the above, I’d really appreciate guidance from
>>>>>>>>>>> the community on:
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> 1. Recommended patterns for streaming upserts with Flink
>>>>>>>>>>> into Iceberg today (it would be good to know the long-term options as
>>>>>>>>>>> well, but my focus is on what's possible in the near term).
>>>>>>>>>>> >>>> 2. Practical experiences or lessons learned from teams
>>>>>>>>>>> running streaming upserts in production
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> Thanks in advance for any insights and corrections.
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> Best
>>>>>>>>>>> >>>> Lu
>>>>>>>>>>>
>>>>>>>>>>
