Hey Xiaoxuan,

I want to bring up a couple of use cases that might be related to your
proposal; you can tell me whether they are relevant from your perspective.

Use Case 1: Ranking

A user has a table with N+1 columns: the first N columns are the base data
(events she gathered), and the last column is a ranking number for each row.
The ranking is defined within the partition, so the ranking column is
initially NaN for all rows and is updated once the entire (or nearly the
entire) partition has arrived.

UPDATE tbl SET my_rank = rank_func(col_i, col_j, col_k …) WHERE pk_col_1 =
pk1, pk_col_2 = pk2 …

The statement above simply says: now that my entire partition has been
ingested, calculate the rankings for all rows in this partition and write
them back to the table.
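
To make the shape of this update concrete, here is a minimal Python sketch
(plain Python, not Iceberg or engine code). It assumes, purely for
illustration, that rank_func is a dense rank over a single score column; the
row layout, the column names (pk, score, my_rank) and the helper
rank_partition are all hypothetical.

```python
import math
from typing import Dict, List


def rank_partition(rows: List[Dict], score_col: str = "score") -> List[Dict]:
    """Return copies of the rows with my_rank filled in (1 = highest score).

    Assumption: rank_func is a dense rank on one column. The real function
    in the use case may combine several columns.
    """
    distinct_scores = sorted({r[score_col] for r in rows}, reverse=True)
    rank_of = {score: i + 1 for i, score in enumerate(distinct_scores)}
    return [{**r, "my_rank": rank_of[r[score_col]]} for r in rows]


# One partition whose ranking column is still NaN after ingestion.
partition = [
    {"pk": "2025-06-09", "event": "a", "score": 10, "my_rank": math.nan},
    {"pk": "2025-06-09", "event": "b", "score": 30, "my_rank": math.nan},
    {"pk": "2025-06-09", "event": "c", "score": 10, "my_rank": math.nan},
]

ranked = rank_partition(partition)
```

Every row in the partition gets a new value in one column, which is why the
write path chosen for this update matters so much.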

I think the best way to visualize this use case is to think of a typical
social media company that constantly needs to generate the top N feed/news
items from the previous day, the past hour, etc. Let’s just say my customer
has a use case similar in nature (more or less a typical fixed-window
streaming use case).

In this case, there are downstream consumers of my table. For example, once
ranking info is populated, downstream consumers want to analyze whether
events with high ranks across partitions have something in common. So query
speed on this table matters, and let’s assume we need to maintain query
performance on this table no matter how frequently we write ranking info
into it.

Now, the restriction here is that I cannot use “copy on write”: the base
event data (the first N columns) is too big, so “copy on write” would simply
be unaffordable. I also cannot update ranking info using “positional
deletes”, because writing “positional deletes” is slower than writing
“equality deletes”, which makes my downstream consumers wait longer for the
ranking info of the latest partition. Let’s just say my downstream consumers
want ranking info presented to them in a timely manner.
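
A toy sketch of why the two delete styles differ in write cost when no index
is available. This is plain Python with in-memory lists standing in for data
files, not the Iceberg API; the file names and both helper functions are
hypothetical.

```python
from typing import Dict, List, Tuple

# Toy stand-in for data files: file name -> primary keys in row order.
DataFiles = Dict[str, List[str]]


def equality_delete(key: str) -> Dict[str, str]:
    """The writer records only the key; no data file is read at write time."""
    return {"pk": key}


def positional_delete(
    data_files: DataFiles, key: str
) -> Tuple[List[Tuple[str, int]], int]:
    """The writer must locate every (file, position) that holds the key.

    Returns the delete entries and the number of rows scanned to find them.
    Without an index this is a scan over all candidate files.
    """
    entries: List[Tuple[str, int]] = []
    scanned = 0
    for name, keys in data_files.items():
        for pos, k in enumerate(keys):
            scanned += 1
            if k == key:
                entries.append((name, pos))
    return entries, scanned


data_files = {
    "f1.parquet": ["a", "b", "c"],
    "f2.parquet": ["d", "e", "b"],
}

eq_entry = equality_delete("b")
pos_entries, rows_scanned = positional_delete(data_files, "b")
```

The equality delete is written without touching any data, while the
positional delete has to scan all six rows to find the two matching
positions. Readers pay the opposite price later, which is the tension
behind this use case.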

Additional notes: I guess many folks here might suggest not putting ranking
info and the base event data into the same table. Instead, keep the base
event data in a base table, and then create a view (or better, a
materialized view) to store base event data + ranking info together. This is
more or less my customer’s decision, not mine: they do not want to keep a
base table and a view (or materialized view) at the same time because, from
their perspective, the ops load of maintaining a base table + a view is
significantly higher than that of maintaining a single table. Also, the
ranking here is not "write-once": there is a chance that the ranking will be
recalculated for an old partition whose ranking was already calculated. That
might be because the rank_func/rule changed, or because there was an error
in the previously calculated rank (e.g. due to extremely long latency for
some data on the ingestion side, the partition was not complete enough to
derive the correct ranking info at the time).

Use Case 2: Retention

Now, if every Iceberg table in the world used time-based partitioning, then
running a retention workflow would be as easy as dropping partitions as they
age out. Unfortunately, that assumption does not hold. When removing outdated
rows from an Iceberg table that is not partitioned by time, a retention
workflow will generate delete files spread across all partitions. And again,
sometimes you cannot choose “positional deletes” for such a retention
workflow because that would extend its runtime. A long-running maintenance
workflow (such as retention or compaction) can be a bad idea, if not
disastrous, especially in a streaming, low-latency world.

With that being said, you can easily imagine what happens next: once a
retention workflow finishes, query speed on this non-time-partitioned table
suffers until a compaction job kicks in later to rewrite the table for
better read performance.
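
A small sketch of how the same retention cutoff plays out under two
partitioning choices. This is plain Python with made-up rows; the region and
event_date columns and the partitions_touched helper are hypothetical.

```python
from collections import defaultdict
from datetime import date
from typing import Dict, List


def partitions_touched(rows: List[Dict], partition_col: str, cutoff: date) -> Dict:
    """Count expired rows (event_date < cutoff) per partition value."""
    touched = defaultdict(int)
    for r in rows:
        if r["event_date"] < cutoff:
            touched[r[partition_col]] += 1
    return dict(touched)


rows = [
    {"region": "us", "event_date": date(2025, 1, 1)},
    {"region": "eu", "event_date": date(2025, 1, 1)},
    {"region": "us", "event_date": date(2025, 6, 1)},
    {"region": "ap", "event_date": date(2025, 1, 2)},
    {"region": "eu", "event_date": date(2025, 6, 2)},
]
cutoff = date(2025, 3, 1)

# Partitioned by region: every partition holds some expired rows, and two of
# them also hold live rows, so retention needs row-level deletes everywhere.
by_region = partitions_touched(rows, "region", cutoff)

# Partitioned by date: expired rows sit in whole partitions that can be dropped.
by_date = partitions_touched(rows, "event_date", cutoff)
```

With the region layout, all three partitions receive delete files; with the
date layout, the two oldest partitions are simply dropped and no delete
files are written.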


On Tue, Jun 10, 2025 at 4:54 PM Xiaoxuan Li <xiaoxuan.li....@gmail.com>
wrote:

> Thank you for the thoughtful feedback, Yan, and for bringing up these
> important questions.
>
> > How realistic is the scenario I've described, and what's the likelihood
> of encountering it in production environments?
>
> I don’t have direct visibility into that either, but I’ve seen some
> vendors claim they can achieve sub-minute latency and write CDC streams to
> Iceberg with thousands of changes per minute. I’ll defer to others who may
> have more hands-on experience.
>
> > and in such situation users should be expected to firstly perform some
> other techniques such as compaction or other workload optimizations before
> considering adopting this index feature; but meanwhile I think we do want
> to make sure for well-maintained tables and valid/common use cases, the new
> proposal will not inadvertently create limitations or bottlenecks.
>
> Yes, I totally agree. We definitely don’t want to introduce a new
> bottleneck while trying to solve an existing limitation. That’s one of the
> key reasons I’m proposing a file-level index approach. But as I’ve been
> thinking more about it, beyond caching index files on disk and storing
> footers in executor memory, we could also consider consolidated indexing at
> the manifest level. This would involve aggregating file-level indexes into
> the manifest, which could significantly improve efficiency, especially for
> handling many small files. We could apply a similar consolidation strategy
> to older partitions as well, potentially tying them to a separate
> manifest-level index file. Depending on the index size, we can decide
> whether to embed the index directly in the manifest or store it as a
> standalone file.
>
>
> Thanks Haizhou for your interest!
>
> > 1. Is this an Iceberg issue, or a Parquet (table format provider) issue?
>
> Like Steven mentioned, Parquet is a file format. But your point is valid:
> Parquet is optimized for sequential scans and lacks efficient random access
> capabilities. That’s why we need an index to support fast lookups. Each
> file format is designed with different workload patterns, and that’s how
> I’m thinking about this problem, by considering the strengths and
> limitations of each format for the use case. Hope that makes sense.
>
>
> > I think there is mention of the newly added index files requiring their
> own compaction workflow. A severe pain point today for many of our Flink
> based ingestion/DA/ETL users is that the compaction workflow takes longer
> than the commit interval - the highly frequent data change commit basically
> blocks the compaction commit from going into the table for an extremely
> long time (theoretically, could be forever).
>
> Thanks for pointing this out. If we're using file level indexing, the
> index files would be compacted alongside data files as part of the regular
> compaction workflow, so it shouldn't introduce additional compaction
> cycles. I’d love to learn more about your use cases, particularly how your
> table is partitioned, your commit interval, and the average scale of your
> table, if you’re open to sharing. That context would help us understand how
> the indexing strategy might fit.
>
>
> Thanks,
>
> Xiaoxuan
>
> On Fri, Jun 6, 2025 at 7:34 PM Yan Yan <yyany...@gmail.com> wrote:
>
>> Thanks Xiaoxuan for the detailed proposal and everyone for the great
>> discussion!
>>
>> It seems to me that it would be more valuable to first clearly define the
>> specific use case we're trying to address, as this would help us make more
>> informed decisions about the trade-offs between file-level and
>> partition-level indexing.
>>
>> Even if we have already set our goal as leveraging indexing to replace
>> most equality deletes with positional deletes in CDC scenarios, I think we
>> need to consider the specific characteristics of both the table and the
>> streaming workload. For instance, in a CDC environment where data streams
>> in quick, small batches, one extreme case I could imagine is a table
>> accumulating hundreds of thousands of small files, with records
>> distributed randomly and CDC dumping data in rapid succession, leading to
>> a fast increase in the number of files.
>>
>> In such scenarios, meeting the requirement that "the table should be
>> partitioned and sorted by the PK" for efficient file-level indexing might
>> not be easily achievable, and the randomized data distribution and the
>> sheer volume of files (each with its own index) would require loading a
>> large number of index files into the driver's memory to determine the
>> correct delete positions per data file for upserts. While thread pooling
>> could help optimize concurrent file reads and reduce S3 connection
>> overhead, opening thousands of files or more would make it very challenging
>> for the driver to operate well, even with concurrent reads, preloading, and
>> caching mechanisms in use.
>>
>> On the other hand, I could also see a complicated partitioned/global
>> index approach, if not designed or configured well, introducing
>> complications on the write path even for straightforward pure-insert
>> operations, potentially resulting in much more overhead than the file-level
>> index proposal aimed at optimizing CDC writes. Additional considerations
>> like multi-writer scenarios could add further complexity, as Xiaoxuan
>> previously mentioned (Peter/Steven, if you wouldn't mind sharing more
>> details about the proposal you mentioned earlier, that would be great;
>> maybe we could have some offline discussion on this).
>>
>> My general questions would be:
>> 1. How realistic is the scenario I've described, and what's the
>> likelihood of encountering it in production environments? I personally
>> do not have much visibility into this.
>> 2. Should this scenario be within the scope of our proposal? I could
>> totally understand the argument that poorly maintained tables would
>> naturally incur performance penalties, and that in such situations users
>> should be expected to first apply other techniques such as compaction or
>> other workload optimizations before considering adopting this index
>> feature; but meanwhile I think we do want to make sure that, for
>> well-maintained tables and valid/common use cases, the new proposal will
>> not inadvertently create limitations or bottlenecks.
>>
>> Thanks,
>> Yan
>>
>> On Wed, Jun 4, 2025 at 8:59 PM Steven Wu <stevenz...@gmail.com> wrote:
>>
>>> Haizhou,
>>>
>>> 1. It is probably inaccurate to call Parquet a table format provider.
>>> Parquet is just a file format. Delete vectors (position deletes) are
>>> outside the scope of Parquet files. The nature of equality deletes just
>>> makes it impossible to read in constant time O(1).
>>>
>>> 2. The inverted index idea is still in early discussions and not ready
>>> to be shared with the broader community. But to your question, it won't
>>> make compaction worse (slower). Totally understand the pain point you
>>> raised. It should be discussed and covered in whatever proposal.
>>>
>>> Thanks,
>>> Steven
>>>
>>> On Wed, Jun 4, 2025 at 5:52 PM Haizhou Zhao <zhaohaizhou940...@gmail.com>
>>> wrote:
>>>
>>>> Hey folks,
>>>>
>>>> Thanks for discussing this interesting topic. I have a couple of relevant
>>>> thoughts while reading through this thread:
>>>>
>>>> 1. Is this an Iceberg issue, or a Parquet (table format provider)
>>>> issue? For example, if Parquet (or other table format provider) provides a
>>>> mechanism where both query by position and query by equality in a data file
>>>> take constant time O(1), then would equality deletes still cause the same
>>>> kind of pain for Iceberg tables? For the Iceberg community, how do we
>>>> define the boundary of responsibility between the Iceberg project and other
>>>> table format providers like Parquet?
>>>>
>>>> 2. I think there is mention of the newly added index files requiring
>>>> their own compaction workflow. A severe pain point today for many of our
>>>> Flink-based ingestion/DA/ETL users is that the compaction workflow takes
>>>> longer than the commit interval: the highly frequent data change commits
>>>> basically block the compaction commit from going into the table for an
>>>> extremely long time (theoretically, could be forever). I understand our
>>>> problem here is entirely different from the proposal in this thread, but
>>>> will the newly added index files push up the compaction workflow runtime
>>>> significantly, and thus make an existing issue worse?
>>>>
>>>> Thanks,
>>>> -Haizhou
>>>>
>>>> On Wed, Jun 4, 2025 at 3:21 PM Xiaoxuan Li <xiaoxuan.li....@gmail.com>
>>>> wrote:
>>>>
>>>>> Totally agree, supporting mutability on top of immutable storage at
>>>>> scale is a non-trivial problem.
>>>>> I think the number of index files is OK; we can preload them in
>>>>> parallel or cache them on disk. I'm not sure yet about caching
>>>>> deserialized data; that might need some more thought.
>>>>>
>>>>> Xiaoxuan
>>>>>
>>>>> On Wed, Jun 4, 2025 at 5:21 AM Péter Váry <peter.vary.apa...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> > Our primary strategy for accelerating lookups focuses on optimizing
>>>>>> the index file itself, leveraging sorted keys, smaller row group sizes,
>>>>>> and Bloom filters for Parquet files. We’re also exploring custom formats
>>>>>> that support more fine-grained skipping.
>>>>>>
>>>>>> The techniques you mentioned are important, but in blob stores, the
>>>>>> number of files that need to be accessed often has a greater impact than
>>>>>> how each file is read.
>>>>>>
>>>>>> > > but if there are updates around your PK range, then you need to
>>>>>> read more index files.
>>>>>>
>>>>>> > Yes, as the table grows, the index files will scale accordingly.
>>>>>>
>>>>>> I wanted to point out the following issue: Even if the table size
>>>>>> remains constant and only updates occur, the number of files that need to
>>>>>> be accessed continues to grow.
>>>>>>
>>>>>>
>>>>>> > when updating many random keys, it's likely to touch nearly all
>>>>>> buckets, which increases the number of index files that must be scanned.
>>>>>> This is exactly why the lookup performance of the index file itself
>>>>>> becomes so critical.
>>>>>>
>>>>>> Once again, in blob storage systems, file access does not scale
>>>>>> efficiently. To address this, we need a strategy that enables result
>>>>>> caching. However, caching based on data files becomes ineffective when
>>>>>> there are frequent updates. In such cases, we must access every index
>>>>>> file added after the original file was created. While it's possible to
>>>>>> filter out some index files using statistics, this is largely unreliable
>>>>>> due to the inherently random nature of data ingestion.
>>>>>>
>>>>>> On Wed, Jun 4, 2025 at 12:45 AM Xiaoxuan Li <xiaoxuan.li....@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Peter,
>>>>>>>
>>>>>>> > If the table is partitioned and sorted by the PK, we don't really
>>>>>>> need to have any index. We can find the data file containing the record
>>>>>>> based on the Content File statistics, and the RowGroup containing the
>>>>>>> record based on the Parquet metadata.
>>>>>>>
>>>>>>> Our primary strategy for accelerating lookups focuses on optimizing
>>>>>>> the index file itself, leveraging sorted keys, smaller row group sizes,
>>>>>>> and Bloom filters for Parquet files. We’re also exploring custom formats
>>>>>>> that support more fine-grained skipping.
>>>>>>>
>>>>>>> But I agree, managing indexes is like managing a table. If we
>>>>>>> tightly couple partitioning and file skipping to the base table, it
>>>>>>> could limit flexibility and broader use cases. By having global indexes,
>>>>>>> we can decouple those constraints and enable manifest-level skipping by
>>>>>>> using a different partition strategy for indexes. But that also leads us
>>>>>>> into a kind of circular dependency: do we want to treat the index as a
>>>>>>> table? I'm happy to continue iterating on this idea.
>>>>>>>
>>>>>>> > but if there are updates around your PK range, then you need to
>>>>>>> read more index files.
>>>>>>>
>>>>>>> Yes, as the table grows, the index files will scale accordingly.
>>>>>>>
>>>>>>> > Do I understand correctly that your use case is updating a single
>>>>>>> record
>>>>>>>
>>>>>>> That was just an example; I'm planning to use a workload that
>>>>>>> accesses 0.01% of records from a 2-billion-row dataset as the
>>>>>>> experimental baseline. But yes, when updating many random keys, it's
>>>>>>> likely to touch nearly all buckets, which increases the number of index
>>>>>>> files that must be scanned. This is exactly why the lookup performance of
>>>>>>> the index file itself becomes so critical.
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Xiaoxuan
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jun 3, 2025 at 6:37 AM Péter Váry <
>>>>>>> peter.vary.apa...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Xiaoxuan,
>>>>>>>>
>>>>>>>> > 2. File-Level Indexing
>>>>>>>> > [..]
>>>>>>>> > To make this efficient, the table should be partitioned and
>>>>>>>> sorted by the PK.
>>>>>>>>
>>>>>>>> If the table is partitioned and sorted by the PK, we don't really
>>>>>>>> need to have any index. We can find the data file containing the record
>>>>>>>> based on the Content File statistics, and the RowGroup containing the
>>>>>>>> record based on the Parquet metadata.
>>>>>>>> If the data is not fully sorted, then we will have multiple
>>>>>>>> index files to read. For example, if data is inserted and then 20
>>>>>>>> updates occur where the primary key falls within the range of the
>>>>>>>> updated records (but our specific key is not among those updated), we
>>>>>>>> still need to scan all files to find the current values for the record.
>>>>>>>> There are ways to filter out these files, but the index file doesn't
>>>>>>>> help much here.
>>>>>>>>
>>>>>>>> > Index access is naturally distributed as index files are
>>>>>>>> co-located with data files during scan.
>>>>>>>>
>>>>>>>> I don't get this. Sure, you can co-locate the index file with the
>>>>>>>> file you are reading, but if there are updates around your PK range,
>>>>>>>> then you need to read more index files.
>>>>>>>>
>>>>>>>> About the use-cases:
>>>>>>>>
>>>>>>>> Do I understand correctly that your use case is updating a single
>>>>>>>> record (like "UPDATE users SET discount='GOLD' WHERE user_id='GUID'"),
>>>>>>>> and not updating multiple records at once (like "UPDATE users SET
>>>>>>>> discount='GOLD' WHERE purchase_value > 1000")?
>>>>>>>>
>>>>>>>> IMHO, if we have updates for many records, caching is a must.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Peter
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jun 3, 2025, 04:43 Xiaoxuan Li <xiaoxuan.li....@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks, Peter, for bringing these ideas forward! You also
>>>>>>>>> raised a great point about clarifying the goal of indexing. I’ve been
>>>>>>>>> considering it with the intention of eventually enabling fast upserts
>>>>>>>>> through DVs. To support that, we need an index that maps primary keys
>>>>>>>>> to both the data file and the key’s location within it, so key lookup
>>>>>>>>> speed is crucial here. This will allow us to quickly determine whether a
>>>>>>>>> key already exists and, if so, identify where it resides. If no data
>>>>>>>>> file contains the key, we can simply append new records.
>>>>>>>>>
>>>>>>>>> So there are two primary strategies for implementing indexing, and
>>>>>>>>> based on your description, your and Steven’s approach seems to fit more
>>>>>>>>> closely into the first one. Let me know if that’s accurate.
>>>>>>>>> 1. *Partitioned (Global) Indexing*
>>>>>>>>>
>>>>>>>>> This approach builds a *table-level global index* that can be
>>>>>>>>> partitioned independently of the base table’s partition scheme.
>>>>>>>>> Instead of aligning with table partitions, the index can be bucketed
>>>>>>>>> using a hash function on primary keys, ensuring that each PK
>>>>>>>>> deterministically maps to a specific index file.
>>>>>>>>>
>>>>>>>>> This model is well-suited for initial bulk loading of the index,
>>>>>>>>> providing consistent and efficient point lookups through deterministic
>>>>>>>>> key-to-file mapping. The primary challenge is maintaining index
>>>>>>>>> freshness as new records arrive, which requires mechanisms to keep the
>>>>>>>>> index files synchronized with the underlying data. This approach is
>>>>>>>>> similar to how Hudi manages its record index, in that the number of
>>>>>>>>> partitions matches the number of index file groups (this is
>>>>>>>>> configurable by the user).
>>>>>>>>>
>>>>>>>>> *Pros:*
>>>>>>>>>
>>>>>>>>>    - *Flexible partitioning*: Index partitions are decoupled from
>>>>>>>>>    table partitions, allowing more control over lookup performance.
>>>>>>>>>    - Index files do not need to be rewritten during compaction
>>>>>>>>>    jobs.
>>>>>>>>>
>>>>>>>>> *Cons:*
>>>>>>>>>
>>>>>>>>>    - *Synchronous maintenance*: The index must be kept up to date
>>>>>>>>>    during each write, adding complexity. And with index files
>>>>>>>>>    accumulating in each partition, a compaction job for the index files
>>>>>>>>>    in each partition might be needed.
>>>>>>>>>    - *Distributed access*: I don’t yet see a clean way to read
>>>>>>>>>    and maintain the index in a distributed fashion across engines.
>>>>>>>>>    This aspect needs some further design and brainstorming.
>>>>>>>>>
>>>>>>>>> ------------------------------
>>>>>>>>> 2. *File-Level Indexing*
>>>>>>>>>
>>>>>>>>> In this approach, *each data file has a corresponding index file*.
>>>>>>>>> When an upsert arrives, we rely on *table partitioning* and *file
>>>>>>>>> pruning* to reduce the number of index files we need to scan. To
>>>>>>>>> make this efficient, the table should be partitioned and sorted by
>>>>>>>>> the PK.
>>>>>>>>>
>>>>>>>>> We can also further accelerate index file pruning using *Bloom
>>>>>>>>> filters*:
>>>>>>>>>
>>>>>>>>>    - Either as a *centralized Bloom index*, storing filters for
>>>>>>>>>    all files in one place. (table level bloom filter index?)
>>>>>>>>>    - Or embedded within *Parquet File* (if the index itself is
>>>>>>>>>    Parquet).
>>>>>>>>>
>>>>>>>>> *Pros:*
>>>>>>>>>
>>>>>>>>>    - No need to track index-data consistency. If an index file is
>>>>>>>>>    missing, we just scan the data file directly.
>>>>>>>>>    - Index access is naturally *distributed* as index files are
>>>>>>>>>    co-located with data files during scan.
>>>>>>>>>
>>>>>>>>> *Cons:*
>>>>>>>>>
>>>>>>>>>    - *Less flexible*: The index needs to use the same
>>>>>>>>>    partitioning strategy as the table.
>>>>>>>>>    - *Compaction overhead*: Whenever data files are compacted or
>>>>>>>>>    rewritten, the corresponding index files must also be updated.
>>>>>>>>>
>>>>>>>>> ------------------------------
>>>>>>>>>
>>>>>>>>> I am still leaning toward Option 2 for its simplicity. And, if
>>>>>>>>> combined with Bloom filters, its performance is promising. While I
>>>>>>>>> agree that Option 1 is also worth considering because of the
>>>>>>>>> flexibility it offers for partition skipping, I feel its uncertain
>>>>>>>>> performance gain might not justify the implementation complexity it
>>>>>>>>> adds. Let me know what you think.
>>>>>>>>> Additional Note:
>>>>>>>>>
>>>>>>>>> I think periodically rewriting indexes to match query patterns
>>>>>>>>> might not be a good idea, as it can be very costly for large datasets.
>>>>>>>>> Also, the index file format will play a crucial role in key lookup
>>>>>>>>> performance. For example, Hudi uses HFile for indexing, which
>>>>>>>>> supports fast point lookups thanks to its sorted key-value layout,
>>>>>>>>> built-in caching, and Bloom filters.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Xiaoxuan
>>>>>>>>>
>>>>>>>>> On Fri, May 30, 2025 at 4:25 AM Péter Váry <
>>>>>>>>> peter.vary.apa...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Xiaoxuan,
>>>>>>>>>> I hope you had a good time on your time off!
>>>>>>>>>>
>>>>>>>>>> Thanks for your detailed response. I think it would help if we
>>>>>>>>>> focused on the specific use cases we want to support and what we
>>>>>>>>>> ultimately aim to achieve. By my understanding, there are a few
>>>>>>>>>> distinct scenarios we’ve been circling around:
>>>>>>>>>>
>>>>>>>>>>    1. Do we want writers to be able to easily write positional
>>>>>>>>>>    deletes instead of equality deletes?
>>>>>>>>>>    2. Do we want readers to easily convert equality deletes to
>>>>>>>>>>    positional deletes?
>>>>>>>>>>    3. Do we want readers to easily find records based on a
>>>>>>>>>>    primary key?
>>>>>>>>>>
>>>>>>>>>> Let me know if I’ve missed any important ones.
>>>>>>>>>>
>>>>>>>>>> Personally, I see the first use case as the most critical, and
>>>>>>>>>> solving it makes the 2nd one obsolete. Please let me know your
>>>>>>>>>> thoughts/preferences. It would help me understand your point of view
>>>>>>>>>> better.
>>>>>>>>>>
>>>>>>>>>> I also want to reiterate a point from my earlier comments:
>>>>>>>>>> > Notice that I used "index-partition". During the design we can
>>>>>>>>>> decide to use the table partitioning for index partitioning as well.
>>>>>>>>>> > That is why I was suggesting "index-partition"s. I don't have a
>>>>>>>>>> ready answer for this, but making sure that the index split fits
>>>>>>>>>> into the memory is important for Flink as well.
>>>>>>>>>>
>>>>>>>>>> So I agree with your point: for large partitions, directly
>>>>>>>>>> mapping them to "index-partition"s could be prohibitive.
>>>>>>>>>>
>>>>>>>>>> In discussions with Steven, we came up with a generic idea that
>>>>>>>>>> might help:
>>>>>>>>>>
>>>>>>>>>>    - Store index files as a new content file type in the
>>>>>>>>>>    manifest list, with column stats to support filtering during
>>>>>>>>>>    planning.
>>>>>>>>>>    - Periodically rewrite indexes to align with query patterns
>>>>>>>>>>    (e.g., bucketing by hashed PK).
>>>>>>>>>>    - We're only interested in unique indexes for now, so we can
>>>>>>>>>>    rely on snapshot sequence numbers instead of delete vectors.
>>>>>>>>>>    - The merging logic helps when writers produce index files
>>>>>>>>>>    with different granularities - we just read all relevant index
>>>>>>>>>>    files.
>>>>>>>>>>
>>>>>>>>>> That said, organizing both the index files and the records within
>>>>>>>>>> them to best support our query patterns remains key. That’s why
>>>>>>>>>> understanding the actual use cases is so important.
>>>>>>>>>>
>>>>>>>>>> For the Flink ingestion use case specifically, our best idea for
>>>>>>>>>> organizing the index is as follows:
>>>>>>>>>>
>>>>>>>>>>    - Generate ranges based on the primary key (PK) hash values.
>>>>>>>>>>    - Order records within each index file by PK hash.
>>>>>>>>>>
>>>>>>>>>> This structure allows us to locate a record with a given PK by
>>>>>>>>>> reading just one index file and accessing a single row group within
>>>>>>>>>> the corresponding index file. It’s a lightweight and performant
>>>>>>>>>> approach that aligns well with Flink’s streaming characteristics.
>>>>>>>>>>
>>>>>>>>>> Looking forward to your thoughts!
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Peter
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, May 29, 2025 at 6:16 PM Xiaoxuan Li <xiaoxuan.li....@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Peter, thanks for sharing the context around the Flink
>>>>>>>>>>> streaming use case and the side note on concurrent writes. Apologies
>>>>>>>>>>> for the delay, as I just got back from a vacation. Yeah, I agree,
>>>>>>>>>>> having the index at the partition level is a better approach if we
>>>>>>>>>>> plan to use caching, as a distributed cache would introduce additional
>>>>>>>>>>> system overhead, and maintaining separate caches on each node could
>>>>>>>>>>> lead to redundant storage footprints.
>>>>>>>>>>>
>>>>>>>>>>> But having one index file for the entire partition is not
>>>>>>>>>>> feasible, because rewriting the index file for an entire partition on
>>>>>>>>>>> every write operation is expensive and could introduce significant
>>>>>>>>>>> write latency. If we want to use indexes at planning time, we would
>>>>>>>>>>> need a mapping from primary keys to data files and row positions for
>>>>>>>>>>> the whole partition. This can be constructed by aggregating
>>>>>>>>>>> file-level indexes at the planner. Even though each index file
>>>>>>>>>>> corresponds to a single data file, we can use a thread pool to load
>>>>>>>>>>> them in parallel. Compared to loading one large index file, the
>>>>>>>>>>> performance could be similar. Since the index files are already cached
>>>>>>>>>>> in memory or on disk, index file loading time becomes negligible. In
>>>>>>>>>>> fact, having one index file per data file might be advantageous, as it
>>>>>>>>>>> allows for incremental loading.
>>>>>>>>>>>
>>>>>>>>>>> One thing about file-level indexing is that the planner or task node
>>>>>>>>>>> is not required to scan all index files upfront. Partition pruning
>>>>>>>>>>> and file filtering should occur prior to index access, allowing us to
>>>>>>>>>>> load only the relevant index files associated with the pruned data
>>>>>>>>>>> set.
>>>>>>>>>>>
>>>>>>>>>>> Another concern I have with the caching approach is determining
>>>>>>>>>>> which partitions to cache, since caching indexes for the entire
>>>>>>>>>>> table isn't practical. For time-based partitions, this might be more
>>>>>>>>>>> straightforward, for example, caching the most recent partitions.
>>>>>>>>>>> However, that's not applicable to all use cases.
>>>>>>>>>>>
>>>>>>>>>>> If, after prototyping, we find the solution isn’t performant
>>>>>>>>>>> enough for the streaming use case and we still want it to be
>>>>>>>>>>> handled, we could explore a hybrid approach of Option 2 and Option 3
>>>>>>>>>>> that Anton mentioned.
>>>>>>>>>>>
>>>>>>>>>>> Also, thanks to Ismail for highlighting the BigQuery approach,
>>>>>>>>>>> that's helpful context!
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Xiaoxuan
>>>>>>>>>>>
>>>>>>>>>>> On Wed, May 14, 2025 at 3:39 AM ismail simsek <
>>>>>>>>>>> ismailxsim...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi All, Thank you for working on this.
>>>>>>>>>>>>
>>>>>>>>>>>> I wanted to share a reference to the BigQuery implementation
>>>>>>>>>>>> <https://cloud.google.com/bigquery/docs/change-data-capture#query-max-staleness>
>>>>>>>>>>>> (Option 3) as another potential approach, and for inspiration.
>>>>>>>>>>>> In this setup, the engine runs periodic merge jobs and applies
>>>>>>>>>>>> equality deletes to the actual table, based on PK, and in some
>>>>>>>>>>>> cases applies them at runtime.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Best regards
>>>>>>>>>>>> ismail
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, May 14, 2025 at 7:37 AM Péter Váry <
>>>>>>>>>>>> peter.vary.apa...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Xiaoxuan,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Let me describe, how the Flink streaming writer uses equality
>>>>>>>>>>>>> deletes, and how it could use indexes.
>>>>>>>>>>>>>
>>>>>>>>>>>>> When the Flink streaming writer receives a new insert, then it
>>>>>>>>>>>>> appends the data to a data file. When it receives a delete, it 
>>>>>>>>>>>>> appends the
>>>>>>>>>>>>> primary key to an equality delete file. When it receives an 
>>>>>>>>>>>>> update, it
>>>>>>>>>>>>> creates both a data and a delete record.
>>>>>>>>>>>>>
>>>>>>>>>>>>> If we have an index available, the writer could rely on the
>>>>>>>>>>>>> index to get the position of the deleted record, and write a 
>>>>>>>>>>>>> position
>>>>>>>>>>>>> delete instead of an equality delete.
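
A minimal sketch of the writer behaviour described above, written in Python
rather than against the Flink API. The class `StreamingWriter`, the
`key_index` mapping, and the in-memory lists are hypothetical stand-ins for
data files, delete files, and a primary-key index; nothing here is actual
Flink or Iceberg code, and index maintenance for newly written rows is left
out.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class StreamingWriter:
    """Toy model of a streaming upsert writer (hypothetical, not the Flink API)."""

    # Optional primary-key index: key -> (data_file, row_position).
    key_index: Optional[dict] = None
    data_rows: list = field(default_factory=list)          # rows appended to the open data file
    equality_deletes: list = field(default_factory=list)   # primary keys written as equality deletes
    position_deletes: list = field(default_factory=list)   # (data_file, row_position) pairs

    def insert(self, key, row):
        # An insert only appends to the data file.
        self.data_rows.append((key, row))

    def delete(self, key):
        if self.key_index is not None and key in self.key_index:
            # With an index, the writer can resolve the position up front.
            self.position_deletes.append(self.key_index[key])
        else:
            # Without one, it records the key and leaves the lookup to readers.
            self.equality_deletes.append(key)

    def update(self, key, row):
        # An update is a delete of the old record plus an insert of the new one.
        self.delete(key)
        self.insert(key, row)
```

With `key_index=None` an update yields one equality delete and one data row;
with an index entry for the key it yields a position delete instead.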
>>>>>>>>>>>>>
>>>>>>>>>>>>> This index lookup is only effective, if we can keep the index
>>>>>>>>>>>>> in memory, and effectively distribute the incoming records 
>>>>>>>>>>>>> between the
>>>>>>>>>>>>> writers so the index cache is used. In this case, the lookup cost 
>>>>>>>>>>>>> becomes
>>>>>>>>>>>>> O(1) if we consider only the file access cost. If we need to read 
>>>>>>>>>>>>> an index
>>>>>>>>>>>>> file for every data file we are adding O(n) delay on record 
>>>>>>>>>>>>> level, where
>>>>>>>>>>>>> the n is the number of the data files in the table.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I agree with you that the cost of rewriting the index file is
>>>>>>>>>>>>> not trivial, but that happens on commit level. This is much 
>>>>>>>>>>>>> better than
>>>>>>>>>>>>> having an overhead on record level.
>>>>>>>>>>>>>
>>>>>>>>>>>>> > As a result, we’d likely need to split and read the index
>>>>>>>>>>>>> file in distributed planning mode
>>>>>>>>>>>>>
>>>>>>>>>>>>> That is why I was suggesting "index-partition"s. I don't have
>>>>>>>>>>>>> a ready answer for this, but making sure that the index split 
>>>>>>>>>>>>> fits into the
>>>>>>>>>>>>> memory is important for Flink as well.
>>>>>>>>>>>>>
>>>>>>>>>>>>> > However the drawback of partition-level index is that the
>>>>>>>>>>>>> index must always be kept up to date to remain useful.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I don't see how we can avoid that in case of Flink writers.
>>>>>>>>>>>>> The alternative is to read the non-indexed files in every writer, 
>>>>>>>>>>>>> which
>>>>>>>>>>>>> seems like a no-go for me.
>>>>>>>>>>>>>
>>>>>>>>>>>>> > the value of an inverted index is not column pruning. It’s
>>>>>>>>>>>>> how it enables fast point lookups
>>>>>>>>>>>>>
>>>>>>>>>>>>> In my experience, the biggest gains are realized during
>>>>>>>>>>>>> planning, when the planner prunes whole files. While there is a 
>>>>>>>>>>>>> possibility
>>>>>>>>>>>>> for a distributed planner to read fewer index files, I 
>>>>>>>>>>>>> don't
>>>>>>>>>>>>> think it is possible for a planner to read the index files if 
>>>>>>>>>>>>> they are
>>>>>>>>>>>>> stored for every data file. (Unless we are talking about a 
>>>>>>>>>>>>> catalog which
>>>>>>>>>>>>> merges/caches the relevant part of the indexes. Sadly this is 
>>>>>>>>>>>>> something
>>>>>>>>>>>>> which is not available for the Flink writers)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Side note: The index solution still prevents (or at least
>>>>>>>>>>>>> complicates) handling concurrent writes to the table. Currently, 
>>>>>>>>>>>>> if a
>>>>>>>>>>>>> concurrent writer has updated the record, Flink just updates it 
>>>>>>>>>>>>> again, and only
>>>>>>>>>>>>> a single record remains with the doubly updated primary key. With 
>>>>>>>>>>>>> the index
>>>>>>>>>>>>> based solution, we might end up with duplicated keys. This might 
>>>>>>>>>>>>> be an
>>>>>>>>>>>>> acceptable tradeoff, but we should be aware of it.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for working on this Xiaoxuan!
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, May 14, 2025, 05:25 Xiaoxuan Li <
>>>>>>>>>>>>> xiaoxuan.li....@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Peter,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the detailed illustration. I understand your
>>>>>>>>>>>>>> concern.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I believe the core question here is whether the index is used
>>>>>>>>>>>>>> during job planning or at the scan task. This depends on how 
>>>>>>>>>>>>>> index files
>>>>>>>>>>>>>> are referenced, at the file level or partition level.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In my view, both approaches ultimately serve the same
>>>>>>>>>>>>>> purpose. The main difference lies in how the index files are 
>>>>>>>>>>>>>> generated and
>>>>>>>>>>>>>> split.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For a partition-level index, would we generate a single index
>>>>>>>>>>>>>> file per partition? If so, each update to the partition would 
>>>>>>>>>>>>>> require
>>>>>>>>>>>>>> either rewriting a whole new index file, which could be costly 
>>>>>>>>>>>>>> at write
>>>>>>>>>>>>>> time given that the index size grows along with data size, or 
>>>>>>>>>>>>>> appending a
>>>>>>>>>>>>>> new index file per write operation, which would functionally be 
>>>>>>>>>>>>>> similar to
>>>>>>>>>>>>>> a file-level index.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> And due to the potentially large size of index files, even
>>>>>>>>>>>>>> one index file per partition may not support efficient planning 
>>>>>>>>>>>>>> in local
>>>>>>>>>>>>>> planning mode. As a result, we’d likely need to split and read 
>>>>>>>>>>>>>> the index
>>>>>>>>>>>>>> file in distributed planning mode, making it functionally 
>>>>>>>>>>>>>> equivalent to
>>>>>>>>>>>>>> reading multiple index files at the task level.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> However the drawback of partition-level index is that the
>>>>>>>>>>>>>> index must always be kept up to date to remain useful.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Also, Steven, in my opinion, the value of an inverted index
>>>>>>>>>>>>>> is not column pruning. It’s how it enables fast point lookups. 
>>>>>>>>>>>>>> As mentioned
>>>>>>>>>>>>>> earlier, we just need to ensure that index lookups are faster 
>>>>>>>>>>>>>> than file
>>>>>>>>>>>>>> scan with delete predicate evaluation.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Let me know whether this addresses any of the concerns.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Xiaoxuan
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, May 12, 2025 at 4:34 PM Steven Wu <
>>>>>>>>>>>>>> stevenz...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Agree with Peter that a 1:1 mapping of data files to inverted
>>>>>>>>>>>>>>> indexes is not as useful. With a columnar format like Parquet, 
>>>>>>>>>>>>>>> this can also
>>>>>>>>>>>>>>> be achieved equivalently by reading the data file with 
>>>>>>>>>>>>>>> projection on the
>>>>>>>>>>>>>>> identifier columns.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, May 12, 2025 at 4:20 AM Péter Váry <
>>>>>>>>>>>>>>> peter.vary.apa...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Xiaoxuan,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Do we plan to store the indexes in a separate file
>>>>>>>>>>>>>>>> alongside the data files? If so, then I have the following 
>>>>>>>>>>>>>>>> thoughts:
>>>>>>>>>>>>>>>> - I agree that the 1-on-1 mapping of data files and index
>>>>>>>>>>>>>>>> files is easy to maintain OTOH it is less useful as an index.
>>>>>>>>>>>>>>>> - The writer (which is looking for a row with a specific
>>>>>>>>>>>>>>>> primary key) needs to open all of the index files until it 
>>>>>>>>>>>>>>>> finds the given
>>>>>>>>>>>>>>>> key. Since the index files are typically small, the cost here 
>>>>>>>>>>>>>>>> is O(n) where
>>>>>>>>>>>>>>>> n is the number of the index files (equal to the number of the 
>>>>>>>>>>>>>>>> data files).
>>>>>>>>>>>>>>>> - If we add a Bloom filter on the primary key to the data
>>>>>>>>>>>>>>>> files, then the writer reads the footer of every data file and 
>>>>>>>>>>>>>>>> reads only
>>>>>>>>>>>>>>>> the file which contains the primary key. This is also O(n) 
>>>>>>>>>>>>>>>> where the n is
>>>>>>>>>>>>>>>> the number of the data files.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> So, IMHO having a 1-on-1 mapping between the data files and
>>>>>>>>>>>>>>>> the index files is not too beneficial.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> OTOH if we maintain the index files on "index-partition"
>>>>>>>>>>>>>>>> level, then the cost becomes O(p) where the p is the number of
>>>>>>>>>>>>>>>> "index-partitions" which could be significantly lower. Notice 
>>>>>>>>>>>>>>>> that I used
>>>>>>>>>>>>>>>> "index-partition". During the design we can decide to use the 
>>>>>>>>>>>>>>>> table
>>>>>>>>>>>>>>>> partitioning for index partitioning as well. This requires us 
>>>>>>>>>>>>>>>> to use easily
>>>>>>>>>>>>>>>> maintainable indexes.
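
To make the O(n) versus O(p) argument concrete, here is a small sketch that
counts how many index files a writer opens in the worst case. The helper
names `lookup` and `merge_into_partitions`, the dict-based "index files", and
the contiguous grouping of files into index-partitions are all assumptions
for illustration, not a proposed layout.

```python
def lookup(indexes, key):
    """Scan index files in order; return (location, number_of_index_files_opened)."""
    for opened, index in enumerate(indexes, start=1):
        if key in index:
            return index[key], opened
    return None, len(indexes)


def merge_into_partitions(file_indexes, p):
    """Merge per-file indexes into p 'index-partitions' (contiguous groups)."""
    size = -(-len(file_indexes) // p)  # ceiling division
    merged = []
    for start in range(0, len(file_indexes), size):
        part = {}
        for index in file_indexes[start:start + size]:
            part.update(index)
        merged.append(part)
    return merged


# 1000 data files with one key each; key i lives at row 0 of file i.
file_indexes = [{i: (f"file-{i}", 0)} for i in range(1000)]
partition_indexes = merge_into_partitions(file_indexes, 10)

print(lookup(file_indexes, 999))       # worst case: opens all 1000 per-file indexes
print(lookup(partition_indexes, 999))  # worst case: opens all 10 index-partitions
```

The lookup logic is the same in both cases; only the number of files to open
changes, at the price of each index-partition covering many data files.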
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> So while maintaining the index is more costly if it
>>>>>>>>>>>>>>>> contains data from multiple data files, it also becomes much 
>>>>>>>>>>>>>>>> more useful.
>>>>>>>>>>>>>>>> Maintenance procedures could build upon the data file 
>>>>>>>>>>>>>>>> immutability and
>>>>>>>>>>>>>>>> simplify the index maintenance.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks Xiaoxuan for the good conversation!
>>>>>>>>>>>>>>>> Peter
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sat, May 10, 2025 at 9:13, Xiaoxuan Li <
>>>>>>>>>>>>>>>> xiaoxuan.li....@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks Anton for the context and summary of the options,
>>>>>>>>>>>>>>>>> great to hear that this direction aligns with earlier 
>>>>>>>>>>>>>>>>> community
>>>>>>>>>>>>>>>>> discussions. And thanks Gyula and Peter for the clear 
>>>>>>>>>>>>>>>>> analysis. I agree
>>>>>>>>>>>>>>>>> with both of you, the index needs to be designed and 
>>>>>>>>>>>>>>>>> implemented
>>>>>>>>>>>>>>>>> efficiently in order to scale for large data sets and 
>>>>>>>>>>>>>>>>> streaming use cases.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I wanted to share some context around why I proposed a
>>>>>>>>>>>>>>>>> file-level index.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The benefit of a file-level index is that it naturally
>>>>>>>>>>>>>>>>> supports partition pruning, file skipping and time travel. 
>>>>>>>>>>>>>>>>> Issues like
>>>>>>>>>>>>>>>>> index invalidation are inherently handled because the index 
>>>>>>>>>>>>>>>>> is tied to the
>>>>>>>>>>>>>>>>> data file’s lifecycle. Even if the index isn't always 
>>>>>>>>>>>>>>>>> perfectly in sync,
>>>>>>>>>>>>>>>>> it's still usable. And since its lifecycle is bound to the 
>>>>>>>>>>>>>>>>> file,
>>>>>>>>>>>>>>>>> lifecycle management only needs to track the file 
>>>>>>>>>>>>>>>>> itself, which reduces
>>>>>>>>>>>>>>>>> complexity in index management and maintenance.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On the other hand, partition/table level indexes must
>>>>>>>>>>>>>>>>> explicitly manage synchronization with the table, handling
>>>>>>>>>>>>>>>>> updates/snapshots on the writer side, and partition pruning, 
>>>>>>>>>>>>>>>>> file skipping,
>>>>>>>>>>>>>>>>> time travel, and index invalidation on the reader side. Since 
>>>>>>>>>>>>>>>>> index files
>>>>>>>>>>>>>>>>> are immutable and grow alongside data files, maintaining them 
>>>>>>>>>>>>>>>>> can become as
>>>>>>>>>>>>>>>>> complex as managing the entire table. It’s not surprising 
>>>>>>>>>>>>>>>>> that Hudi uses a
>>>>>>>>>>>>>>>>> metadata table (which is essentially a Hudi MOR table) to 
>>>>>>>>>>>>>>>>> manage its index.
>>>>>>>>>>>>>>>>> Personally, I find this approach less appealing, as it 
>>>>>>>>>>>>>>>>> introduces a
>>>>>>>>>>>>>>>>> circular dependency that adds architectural complexity.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> For the positional delete case, even if index lookups
>>>>>>>>>>>>>>>>> could be faster than scanning files with predicates, it's 
>>>>>>>>>>>>>>>>> still unclear
>>>>>>>>>>>>>>>>> whether that alone is sufficient for streaming workloads, 
>>>>>>>>>>>>>>>>> especially when
>>>>>>>>>>>>>>>>> many files are involved. I think Peter’s idea of maintaining 
>>>>>>>>>>>>>>>>> a hot cache of
>>>>>>>>>>>>>>>>> the index within the streaming engine is promising. 
>>>>>>>>>>>>>>>>> Alternatively, using an
>>>>>>>>>>>>>>>>> external key-value store for fast lookups could also be 
>>>>>>>>>>>>>>>>> explored. Would be
>>>>>>>>>>>>>>>>> great to hear others’ thoughts on this.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Xiaoxuan
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Fri, May 9, 2025 at 8:12 AM Péter Váry <
>>>>>>>>>>>>>>>>> peter.vary.apa...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> When going through the options mentioned by Anton, I feel
>>>>>>>>>>>>>>>>>> that Option 1 and 4 are just pushing the responsibility of 
>>>>>>>>>>>>>>>>>> converting the
>>>>>>>>>>>>>>>>>> equality deletes to positional deletes to the engine side. 
>>>>>>>>>>>>>>>>>> The only
>>>>>>>>>>>>>>>>>> difference is whether the conversion happens on the write 
>>>>>>>>>>>>>>>>>> side or on the
>>>>>>>>>>>>>>>>>> read side. This is a step back, and doesn't help solving the 
>>>>>>>>>>>>>>>>>> problem for
>>>>>>>>>>>>>>>>>> streaming engines.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Option 2 needs a bit of thought. We need to find a good
>>>>>>>>>>>>>>>>>> indexing strategy which allows big tables and plays nice 
>>>>>>>>>>>>>>>>>> with streaming
>>>>>>>>>>>>>>>>>> writes too. It would be good to allow the engines to cache a 
>>>>>>>>>>>>>>>>>> limited part
>>>>>>>>>>>>>>>>>> of the index, and distribute records to the writers in a way 
>>>>>>>>>>>>>>>>>> that the cache
>>>>>>>>>>>>>>>>>> locality is considered. Also creating an index file for 
>>>>>>>>>>>>>>>>>> every data file
>>>>>>>>>>>>>>>>>> might be suboptimal, as reading that many index files could 
>>>>>>>>>>>>>>>>>> tank
>>>>>>>>>>>>>>>>>> performance.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I have yet to see a good idea for Option 3.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Another option could be to provide a way to convert the
>>>>>>>>>>>>>>>>>> equality deletes to positional deletes as soon as possible. 
>>>>>>>>>>>>>>>>>> Maybe in
>>>>>>>>>>>>>>>>>> frequent compaction tasks, or at first read? When we first 
>>>>>>>>>>>>>>>>>> apply an
>>>>>>>>>>>>>>>>>> equality delete for a file, it is very easy to calculate the 
>>>>>>>>>>>>>>>>>> equivalent DV
>>>>>>>>>>>>>>>>>> for the equality delete. Subsequent readers could depend on 
>>>>>>>>>>>>>>>>>> the DV instead
>>>>>>>>>>>>>>>>>> of the equality delete. This could be the least disruptive 
>>>>>>>>>>>>>>>>>> change, but I
>>>>>>>>>>>>>>>>>> see a few issues with this solution as well:
>>>>>>>>>>>>>>>>>> - First read still depends on the equality delete which
>>>>>>>>>>>>>>>>>> in edge cases could be very costly
>>>>>>>>>>>>>>>>>> - Filters don't play well with this method
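
The "convert at first read" idea can be sketched as follows. This is only an
illustration under assumptions: `dv_store` is a hypothetical dict standing in
for wherever a deletion vector would be persisted, rows are dicts with an
`id` primary key, and the sketch ignores equality deletes that arrive after
the DV was computed.

```python
def read_with_dv_materialization(rows, equality_delete_keys, dv_store, file_name):
    """Return the live rows of one data file, materializing a DV on first read."""
    dv = dv_store.get(file_name)
    if dv is None:
        # First read: evaluate the equality deletes once and remember the positions.
        dv = {pos for pos, row in enumerate(rows) if row["id"] in equality_delete_keys}
        dv_store[file_name] = dv
    # Every read (first or later) filters by position only.
    return [row for pos, row in enumerate(rows) if pos not in dv]
```

The first caller pays the cost of the key comparison; later callers only do a
position check, which matches the first issue listed above.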
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> All-in-all, I like the index solution (Option 2) for
>>>>>>>>>>>>>>>>>> several reasons:
>>>>>>>>>>>>>>>>>> - Cuts out the need for equality deletes which would
>>>>>>>>>>>>>>>>>> reduce the code quite a bit
>>>>>>>>>>>>>>>>>> - While it slows down the write path a bit, the overall
>>>>>>>>>>>>>>>>>> workflow (write+read) benefits from it
>>>>>>>>>>>>>>>>>> - The index could be reused for other use-cases as well
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> Peter
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, May 9, 2025 at 12:16, Gyula Fóra <
>>>>>>>>>>>>>>>>>> gyula.f...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Anton,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thank you for summarizing the options we see at this
>>>>>>>>>>>>>>>>>>> stage in a structured and concise way.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Based on the use-cases I see in the industry, I feel
>>>>>>>>>>>>>>>>>>> that not all of the highlighted options are feasible (or 
>>>>>>>>>>>>>>>>>>> desirable).
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Option 4 would basically remove any possibilities for
>>>>>>>>>>>>>>>>>>> native streaming CDC reads on tables, severely limiting how 
>>>>>>>>>>>>>>>>>>> Iceberg can be
>>>>>>>>>>>>>>>>>>> used in the future for real-time use-cases from Flink and 
>>>>>>>>>>>>>>>>>>> other similar
>>>>>>>>>>>>>>>>>>> engines that may want to connect. I understand that the 
>>>>>>>>>>>>>>>>>>> view reconciliation
>>>>>>>>>>>>>>>>>>> approach is implemented in Spark SQL already but 
>>>>>>>>>>>>>>>>>>> implementing it in a
>>>>>>>>>>>>>>>>>>> proper streaming way would likely lead to similar problems 
>>>>>>>>>>>>>>>>>>> that we are
>>>>>>>>>>>>>>>>>>> trying to solve here in the first place.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Regarding Option 1, introducing engine specific/custom
>>>>>>>>>>>>>>>>>>> indexing solutions would go against the core design 
>>>>>>>>>>>>>>>>>>> principles as it would
>>>>>>>>>>>>>>>>>>> be hard to mix different engines when writing / reading 
>>>>>>>>>>>>>>>>>>> tables for CDC
>>>>>>>>>>>>>>>>>>> use-cases. (A streaming job would have a hard time writing 
>>>>>>>>>>>>>>>>>>> upserts/equality
>>>>>>>>>>>>>>>>>>> deletes to tables that were written by a different engine). 
>>>>>>>>>>>>>>>>>>> To
>>>>>>>>>>>>>>>>>>> me this sounds very similar to Option 4 in a way that it 
>>>>>>>>>>>>>>>>>>> pushes too much
>>>>>>>>>>>>>>>>>>> logic to the engines in a way that would hurt the 
>>>>>>>>>>>>>>>>>>> compatibility across the
>>>>>>>>>>>>>>>>>>> engines.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Based on this and the context of the discussion, Option
>>>>>>>>>>>>>>>>>>> 2 or a combination of Option 2 & 3 sounds the most 
>>>>>>>>>>>>>>>>>>> reasonable to me. There
>>>>>>>>>>>>>>>>>>> are still a lot of questions on the practical 
>>>>>>>>>>>>>>>>>>> implementation of the indexes
>>>>>>>>>>>>>>>>>>> and how we can do this efficiently so this is only a very 
>>>>>>>>>>>>>>>>>>> early feedback
>>>>>>>>>>>>>>>>>>> from my end.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Gyula
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Fri, May 9, 2025 at 12:14 AM Anton Okolnychyi <
>>>>>>>>>>>>>>>>>>> aokolnyc...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I am glad to see that folks are thinking about this
>>>>>>>>>>>>>>>>>>>> problem. I am looking forward to a formal proposal/design 
>>>>>>>>>>>>>>>>>>>> doc to discuss
>>>>>>>>>>>>>>>>>>>> details!
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Overall, this aligns with what we discussed in the
>>>>>>>>>>>>>>>>>>>> community earlier w.r.t. the future of equality deletes 
>>>>>>>>>>>>>>>>>>>> and streaming
>>>>>>>>>>>>>>>>>>>> upserts. If I were to summarize, we have the following 
>>>>>>>>>>>>>>>>>>>> options:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Option 1: Add an inverted index (potentially
>>>>>>>>>>>>>>>>>>>> distributed) maintained by an engine that does streaming 
>>>>>>>>>>>>>>>>>>>> writes to always
>>>>>>>>>>>>>>>>>>>> produce DVs, even in streaming use cases. Deprecate/remove 
>>>>>>>>>>>>>>>>>>>> equality deletes
>>>>>>>>>>>>>>>>>>>> from Iceberg.
>>>>>>>>>>>>>>>>>>>> Option 2: Add native indexing to Iceberg so that the
>>>>>>>>>>>>>>>>>>>> lookup of positions is quick and efficient enough to be 
>>>>>>>>>>>>>>>>>>>> used in streaming
>>>>>>>>>>>>>>>>>>>> upserts. Deprecate/remove equality deletes from Iceberg.
>>>>>>>>>>>>>>>>>>>> Option 3: Rethink equality deletes, potentially by
>>>>>>>>>>>>>>>>>>>> introducing more restrictions and trying to scope them to 
>>>>>>>>>>>>>>>>>>>> particular data
>>>>>>>>>>>>>>>>>>>> files, similar to DVs.
>>>>>>>>>>>>>>>>>>>> Option 4: Standardize on a view reconciliation approach
>>>>>>>>>>>>>>>>>>>> that Tabular implemented for CDC.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I would like to highlight that what Spark does today
>>>>>>>>>>>>>>>>>>>> during MERGE is similar to a lookup in an inverted index 
>>>>>>>>>>>>>>>>>>>> represented by
>>>>>>>>>>>>>>>>>>>> another Iceberg table. That is OK for batch jobs but not 
>>>>>>>>>>>>>>>>>>>> enough for
>>>>>>>>>>>>>>>>>>>> streaming.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> - Anton
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Thu, May 8, 2025 at 10:08, Xiaoxuan Li <
>>>>>>>>>>>>>>>>>>>> xiaoxuan.li....@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi Zheng, Steven, Amogh and Gyula. Thank you all for
>>>>>>>>>>>>>>>>>>>>> the feedback!
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I agree with everyone, we need to narrow down the
>>>>>>>>>>>>>>>>>>>>> scope of this optimization. The primary issue I'm trying 
>>>>>>>>>>>>>>>>>>>>> to address is the
>>>>>>>>>>>>>>>>>>>>> slow read performance caused by the growing number of 
>>>>>>>>>>>>>>>>>>>>> equality delete
>>>>>>>>>>>>>>>>>>>>> files (streaming CDC scenarios). The other potential use 
>>>>>>>>>>>>>>>>>>>>> cases are only
>>>>>>>>>>>>>>>>>>>>> mentioned to show the extensibility of this approach.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> And both equality and positional deletes suffer from
>>>>>>>>>>>>>>>>>>>>> the same core problem, records are evaluated repeatedly 
>>>>>>>>>>>>>>>>>>>>> against multiple
>>>>>>>>>>>>>>>>>>>>> delete predicates, at read time for equality deletes, and 
>>>>>>>>>>>>>>>>>>>>> at write time for
>>>>>>>>>>>>>>>>>>>>> positional deletes. This repeated evaluation is where the 
>>>>>>>>>>>>>>>>>>>>> real bottleneck
>>>>>>>>>>>>>>>>>>>>> lies.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> It’s particularly bad for equality deletes, since we
>>>>>>>>>>>>>>>>>>>>> constantly recompute row positions during reads without 
>>>>>>>>>>>>>>>>>>>>> ever materializing
>>>>>>>>>>>>>>>>>>>>> them. Eventually, a maintenance job is required just to 
>>>>>>>>>>>>>>>>>>>>> rewrite them into
>>>>>>>>>>>>>>>>>>>>> positional deletes.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> The idea is to eliminate this overhead by introducing
>>>>>>>>>>>>>>>>>>>>> an inverted index that maps values (or value 
>>>>>>>>>>>>>>>>>>>>> combinations) directly to row
>>>>>>>>>>>>>>>>>>>>> positions. This lets us skip full predicate evaluation 
>>>>>>>>>>>>>>>>>>>>> and jump straight to
>>>>>>>>>>>>>>>>>>>>> the affected rows, similar to how Hudi uses a 
>>>>>>>>>>>>>>>>>>>>> record-level index for fast
>>>>>>>>>>>>>>>>>>>>> upserts. If we can fetch row positions from the index, we 
>>>>>>>>>>>>>>>>>>>>> can greatly
>>>>>>>>>>>>>>>>>>>>> reduce overhead during reads (for equality deletes) and 
>>>>>>>>>>>>>>>>>>>>> writes (for
>>>>>>>>>>>>>>>>>>>>> positional deletes).
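
A small sketch of the inverted-index idea in the paragraph above: map each
primary-key value to its row positions, then resolve deleted keys by lookup
instead of testing every row. The function names, the `id` key column, and
the dict-of-lists representation of data files are assumptions made for the
example.

```python
from collections import defaultdict


def build_inverted_index(data_files):
    """Map each primary-key value to the (file, row position) pairs holding it."""
    index = defaultdict(list)
    for file_name, rows in data_files.items():
        for pos, row in enumerate(rows):
            index[row["id"]].append((file_name, pos))
    return index


def positions_by_scan(data_files, deleted_keys):
    """Equality-delete style: test every row against the delete predicate."""
    return sorted(
        (file_name, pos)
        for file_name, rows in data_files.items()
        for pos, row in enumerate(rows)
        if row["id"] in deleted_keys
    )


def positions_by_index(index, deleted_keys):
    """Index style: jump straight to the affected positions."""
    return sorted(loc for key in deleted_keys for loc in index.get(key, []))
```

Both functions return the same positions; the scan touches every row of every
file, while the lookup touches only the index entries for the deleted keys.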
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> In fact, if we can make positional deletes perform as
>>>>>>>>>>>>>>>>>>>>> well as equality deletes using this index, we might be 
>>>>>>>>>>>>>>>>>>>>> able to get rid of
>>>>>>>>>>>>>>>>>>>>> equality deletes, but that needs to be evaluated for the 
>>>>>>>>>>>>>>>>>>>>> upsert case.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> That’s part of the reason why other types of indexes
>>>>>>>>>>>>>>>>>>>>> came up, they’re applicable beyond just primary key 
>>>>>>>>>>>>>>>>>>>>> columns, but CDC is the
>>>>>>>>>>>>>>>>>>>>> only scenario with stricter SLA requirements. So it makes 
>>>>>>>>>>>>>>>>>>>>> sense to align on
>>>>>>>>>>>>>>>>>>>>> the exact problem and use cases. For now, I think we can 
>>>>>>>>>>>>>>>>>>>>> define our main
>>>>>>>>>>>>>>>>>>>>> goal as supporting inverted indexing over primary key 
>>>>>>>>>>>>>>>>>>>>> columns to address
>>>>>>>>>>>>>>>>>>>>> the slowness of reading caused by the growing number of 
>>>>>>>>>>>>>>>>>>>>> equality delete
>>>>>>>>>>>>>>>>>>>>> files.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks, Amogh, for bringing up the comparison with
>>>>>>>>>>>>>>>>>>>>> known alternatives. We should include benchmarks for 
>>>>>>>>>>>>>>>>>>>>> those as well, to
>>>>>>>>>>>>>>>>>>>>> illustrate the trade-offs in read/write performance, 
>>>>>>>>>>>>>>>>>>>>> storage usage, and
>>>>>>>>>>>>>>>>>>>>> overall cost.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Really appreciate your feedback! I’ll incorporate
>>>>>>>>>>>>>>>>>>>>> these into the next revision of the proposal.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Xiaoxuan
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Thu, May 8, 2025 at 1:25 AM Gyula Fóra <
>>>>>>>>>>>>>>>>>>>>> gyula.f...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thank you for the proposal!
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I agree with what has been said above: we need to
>>>>>>>>>>>>>>>>>>>>>> narrow down the scope here and decide on the primary 
>>>>>>>>>>>>>>>>>>>>>> target for the
>>>>>>>>>>>>>>>>>>>>>> optimization.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> As Amogh has also pointed out, CDC (streaming) read
>>>>>>>>>>>>>>>>>>>>>> performance (with equality deletes) would be one of the 
>>>>>>>>>>>>>>>>>>>>>> biggest
>>>>>>>>>>>>>>>>>>>>>> beneficiaries of this at a first glance.
>>>>>>>>>>>>>>>>>>>>>> This is especially important for Flink users where
>>>>>>>>>>>>>>>>>>>>>> this feature is currently completely missing and there 
>>>>>>>>>>>>>>>>>>>>>> is a big demand for
>>>>>>>>>>>>>>>>>>>>>> it as we rely on equality deletes on the write path. [1]
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I am not aware of alternative proposals that would
>>>>>>>>>>>>>>>>>>>>>> solve the equality delete CDC read performance question. 
>>>>>>>>>>>>>>>>>>>>>> Overall, I think
>>>>>>>>>>>>>>>>>>>>>> using indices is reasonable and a very promising 
>>>>>>>>>>>>>>>>>>>>>> approach.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Looking forward to more details and discussion!
>>>>>>>>>>>>>>>>>>>>>> Gyula
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>>>>>> https://lists.apache.org/thread/njmxjmjjm341fp4mgynn483v15mhk7qd
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Thu, May 8, 2025 at 9:24 AM Amogh Jahagirdar <
>>>>>>>>>>>>>>>>>>>>>> 2am...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Thank you for the proposal Xiaoxuan! I think I agree
>>>>>>>>>>>>>>>>>>>>>>> with Zheng and Steven's point that it'll probably be 
>>>>>>>>>>>>>>>>>>>>>>> more helpful to start
>>>>>>>>>>>>>>>>>>>>>>> out with more specific "what" and "why" (known areas of 
>>>>>>>>>>>>>>>>>>>>>>> improvement for
>>>>>>>>>>>>>>>>>>>>>>> Iceberg and driven by any use cases) before we get too 
>>>>>>>>>>>>>>>>>>>>>>> deep into the "how".
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> In my mind, the specific known area of improvement
>>>>>>>>>>>>>>>>>>>>>>> for Iceberg related to this proposal is improving 
>>>>>>>>>>>>>>>>>>>>>>> streaming upsert
>>>>>>>>>>>>>>>>>>>>>>> behavior. One area this improvement is beneficial for 
>>>>>>>>>>>>>>>>>>>>>>> is being able to
>>>>>>>>>>>>>>>>>>>>>>> provide better data freshness for Iceberg CDC mirror 
>>>>>>>>>>>>>>>>>>>>>>> tables without the
>>>>>>>>>>>>>>>>>>>>>>> heavy read + maintenance costs that currently exist with 
>>>>>>>>>>>>>>>>>>>>>>> Flink upserts.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> As you mentioned, equality deletes have the benefit
>>>>>>>>>>>>>>>>>>>>>>> of being very cheap to write but can come at a high and 
>>>>>>>>>>>>>>>>>>>>>>> unpredictable cost
>>>>>>>>>>>>>>>>>>>>>>> at read time. Challenges with equality deletes have 
>>>>>>>>>>>>>>>>>>>>>>> been discussed in the
>>>>>>>>>>>>>>>>>>>>>>> past [1].
>>>>>>>>>>>>>>>>>>>>>>> I'll also add that if one of the goals is to
>>>>>>>>>>>>>>>>>>>>>>> improve streaming upserts (e.g. for applying CDC 
>>>>>>>>>>>>>>>>>>>>>>> change streams into
>>>>>>>>>>>>>>>>>>>>>>> Iceberg mirror tables), then there are alternatives 
>>>>>>>>>>>>>>>>>>>>>>> that I think we should
>>>>>>>>>>>>>>>>>>>>>>> compare against to make
>>>>>>>>>>>>>>>>>>>>>>> the tradeoffs clear. These alternatives include
>>>>>>>>>>>>>>>>>>>>>>> leveraging the known changelog view or merge patterns 
>>>>>>>>>>>>>>>>>>>>>>> [2] or improving the
>>>>>>>>>>>>>>>>>>>>>>> existing maintenance procedures.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I think the potential for being able to use an
>>>>>>>>>>>>>>>>>>>>>>> inverted index for upsert cases to more directly 
>>>>>>>>>>>>>>>>>>>>>>> identify positions in a
>>>>>>>>>>>>>>>>>>>>>>> file to directly write DVs is very exciting, but 
>>>>>>>>>>>>>>>>>>>>>>> before getting too far
>>>>>>>>>>>>>>>>>>>>>>> into the weeds, I think it'd first be helpful
>>>>>>>>>>>>>>>>>>>>>>> to make sure we agree on the specific problem we're
>>>>>>>>>>>>>>>>>>>>>>> trying to solve when we talk about performance 
>>>>>>>>>>>>>>>>>>>>>>> improvements along with any
>>>>>>>>>>>>>>>>>>>>>>> use cases, followed by comparison with known 
>>>>>>>>>>>>>>>>>>>>>>> alternatives (ideally we can
>>>>>>>>>>>>>>>>>>>>>>> get numbers that demonstrate the 
>>>>>>>>>>>>>>>>>>>>>>> read/write/storage/cost tradeoffs for the
>>>>>>>>>>>>>>>>>>>>>>> proposed inverted index).
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>>>>>>> https://lists.apache.org/thread/z0gvco6hn2bpgngvk4h6xqrnw8b32sw6
>>>>>>>>>>>>>>>>>>>>>>> [2] https://www.tabular.io/blog/hello-world-of-cdc/
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>> Amogh Jahagirdar
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
