Thank you for the thoughtful feedback, Yan, and for bringing up these important questions.
> How realistic is the scenario I've described, and what's the likelihood of encountering it in production environments? I don’t have direct visibility into that either, but I’ve seen some vendors claim they can achieve sub-minute latency and write CDC streams to Iceberg with thousands of changes per minute. I’ll defer to others who may have more hands-on experience. > and in such situation users should be expected to firstly perform some other techniques such as compaction or other workload optimizations before considering adopting this index feature; but meanwhile I think we do want to make sure for well-maintained tables and valid/common use cases, the new proposal will not inadvertently creating limitations or bottlenecks. Yes, I totally agree. We definitely don’t want to introduce a new bottleneck while trying to solve an existing limitation. That’s one of the key reasons I’m proposing a file-level index approach. But as I’ve been thinking more about it, beyond caching index files on disk and storing footers in executor memory, we could also consider consolidated indexing at the manifest level. This would involve aggregating file-level indexes into the manifest, which could significantly improve efficiency, especially for handling many small files. We could apply a similar consolidation strategy to older partitions as well, potentially tying them to a separate manifest-level index file. Depending on the index size, we can decide whether to embed the index directly in the manifest or store it as a standalone file. Thanks Haizhou for your interest! > 1. Is this an Iceberg issue, or a Parquet (table format provider) issue? Like Steven mentioned, parquet is a file format. But your point is valid, Parquet is optimized for sequential scan and lacks efficient random access capabilities. That’s why we need an index to support fast lookups. Each file format is designed with different workload patterns, and that’s how I’m thinking about this problem, by considering the strengths and limitations of each format for the use case. Hope that makes sense. > I think there is mention of the newly added index files requiring its own compaction workflow. A severe pain point today for many of our Flink based ingestion/DA/ETL users is that the compaction workflow takes longer than the commit interval - the highly frequent data change commit basically blocks the compaction commit from going into the table for an extremely long time (theoretically, could be forever). Thanks for pointing this out. If we're using file level indexing, the index files would be compacted alongside data files as part of the regular compaction workflow, so it shouldn't introduce additional compaction cycles. I’d love to learn more about your use cases, particularly how your table is partitioned, commit Interval, avg scale of your table, if you’re open to sharing. That context would help us understand how the indexing strategy might fit. Thanks, Xiaoxuan On Fri, Jun 6, 2025 at 7:34 PM Yan Yan <yyany...@gmail.com> wrote: > Thanks Xiaoxuan for the detailed proposal and everyone for the great > discussion! > > It seems to me that it feels more valuable if we can firstly clearly > define the specific use case we're trying to address, as this would help us > make more informed decisions about trade-offs between file vs partitioned > level indexing. > > Even if we already set up our goal to be leveraging indexing to replace > most equality-based deletes to positional deletes for CDC scenarios, I > think we need to consider the specific characteristics of both the table > and streaming workload. For instance, in a CDC environment where data > streams in quick and small batches, one extreme case I could imagine is a > table could accumulate hundreds of thousands of small files, with records > distributed randomly, and CDC dumps data in rapid succession leading to > fast increase in number of files. > > In such scenarios, meeting the requirement of "the table should be > partitioned and sorted by the PK" for efficient file-level indexing might > not easily be achievable; and the randomized data distribution and the > sheer volume of files (each with its own index) would require loading a big > number of index files into driver's memory to determine the correct delete > positions per data file for upserts. While thread pooling could help > optimize concurrent file reads and reduce S3 connection overhead, when > dealing with opening thousands of files or more, it would pose a great > challenge for the driver to operate well even when the concurrent reading > capabilities, preloading and caching mechanisms are in use. > > On the other hand, I could also see a complicated partitioned/global index > approach, if not designed or even configured well, could introduce > complications at write scenario even for straightforward pure insert > operations, potentially resulting in much more overhead than the file-level > index proposal aimed at optimizing CDC writes. Additional considerations > like multi-writer scenarios could further add complexity, as Xiaoxuan > previously mentioned (Peter/Steven, I think if you wouldn't mind sharing > more details about the proposal you mentioned earlier that would be great; > maybe we could have some offline discussion on this). > > My general questions would be: > 1. How realistic is the scenario I've described, and what's the likelihood > of encountering it in production environments? I personally indeed do not > have much visibility into this. > 2. Should this scenario be within the scope of our proposal? I could > totally understand the argument that a poorly maintained tables would > naturally incur performance penalties, and in such situation users should > be expected to firstly perform some other techniques such as compaction or > other workload optimizations before considering adopting this index > feature; but meanwhile I think we do want to make sure for well-maintained > tables and valid/common use cases, the new proposal will not inadvertently > creating limitations or bottlenecks. > > Thanks, > Yan > > On Wed, Jun 4, 2025 at 8:59 PM Steven Wu <stevenz...@gmail.com> wrote: > >> Haizhou, >> >> 1. it is probably inaccurate to call Parquet a table format provider. >> Parquet is a just file format. Delete vectors (position deletes) are >> outside the scope of Parquet files. The nature of equality deletes just >> make it impossible to read in constant time O(1) >> >> 2. The inverted index idea is still in early discussions and not ready to >> be shared with the broader community. But to your question, it won't make >> compaction worse (slower). Totally understand the pain point you raised. It >> should be discussed and covered in whatever proposal. >> >> Thanks, >> Steven >> >> On Wed, Jun 4, 2025 at 5:52 PM Haizhou Zhao <zhaohaizhou940...@gmail.com> >> wrote: >> >>> Hey folks, >>> >>> Thanks for discussing this interesting topic. I have couple relevant >>> thoughts while reading through this thread: >>> >>> 1. Is this an Iceberg issue, or a Parquet (table format provider) issue? >>> For example, if Parquet (or other table format provider) provides a >>> mechanism where both query by position and query by equality in a data file >>> take constant time O(1), then would equality deletes still cause the same >>> kind of pain for Iceberg tables? For the Iceberg community, how do we >>> define the boundary of responsibility between the Iceberg project and other >>> table format providers like Parquet? >>> >>> 2. I think there is mention of the newly added index files requiring its >>> own compaction workflow. A severe pain point today for many of our Flink >>> based ingestion/DA/ETL users is that the compaction workflow takes longer >>> than the commit interval - the highly frequent data change commit basically >>> blocks the compaction commit from going into the table for an extremely >>> long time (theoretically, could be forever). I understand our problem here >>> is entirely different from the proposal in this thread, but will the newly >>> added index files push up the compaction workflow runtime significantly, >>> and thus making an existing issue worse? >>> >>> Thanks, >>> -Haizhou >>> >>> On Wed, Jun 4, 2025 at 3:21 PM Xiaoxuan Li <xiaoxuan.li....@gmail.com> >>> wrote: >>> >>>> Totally agree, supporting mutability on top of immutable storage at >>>> scale is a non-trivial problem. >>>> I think the number of index files is ok, we can preload them in >>>> parallel or cache them on disk. Not sure yet about caching deserialized >>>> data, that might need some more thought. >>>> >>>> Xiaoxuan >>>> >>>> On Wed, Jun 4, 2025 at 5:21 AM Péter Váry <peter.vary.apa...@gmail.com> >>>> wrote: >>>> >>>>> > Our primary strategy for accelerating lookups focuses on optimizing >>>>> the index file itself, leveraging sorted keys, smaller row group sizes, >>>>> and >>>>> Bloom filters for Parquet files. We’re also exploring custom formats that >>>>> support more fine-grained skipping. >>>>> >>>>> The techniques you mentioned are important, but in blob stores, the >>>>> number of files that need to be accessed often has a greater impact than >>>>> how each file is read. >>>>> >>>>> > > but if there are updates around your PK range, then you need to >>>>> read more index files. >>>>> >>>>> > Yes, as the table grows, the index files will scale accordingly. >>>>> >>>>> I wanted to point out the following issue: Even if the table size >>>>> remains constant and only updates occur, the number of files that need to >>>>> be accessed continues to grow. >>>>> >>>>> >>>>> > when updating many random keys, it's likely to touch nearly all >>>>> buckets, which increases the number of index files that must be scanned. >>>>> This is exactly why the lookup performance of the index file itself >>>>> becomes >>>>> so critical. >>>>> >>>>> Once again, in blob storage systems, file access does not scale >>>>> efficiently. To address this, we need a strategy that enables result >>>>> caching. However, caching based on data files becomes ineffective when >>>>> there are frequent updates. In such cases, we must access every index file >>>>> added after the original file was created. While it's possible to filter >>>>> out some index files using statistics, this is largely unreliable due to >>>>> the inherently random nature of data ingestion. >>>>> >>>>> Xiaoxuan Li <xiaoxuan.li....@gmail.com> ezt írta (időpont: 2025. jún. >>>>> 4., Sze, 0:45): >>>>> >>>>>> Hi Peter, >>>>>> >>>>>> > If the table is partitioned and sorted by the PK, we don't really >>>>>> need to have any index. We can find the data file containing the record >>>>>> based on the Content File statistics, and the RowGroup containing the >>>>>> record based on the Parquet metadata. >>>>>> >>>>>> Our primary strategy for accelerating lookups focuses on optimizing >>>>>> the index file itself, leveraging sorted keys, smaller row group sizes, >>>>>> and >>>>>> Bloom filters for Parquet files. We’re also exploring custom formats that >>>>>> support more fine-grained skipping. >>>>>> >>>>>> But I agree, managing indexes is like managing a table. If we tightly >>>>>> coupled partitioning and file skipping to the base table, it could limit >>>>>> flexibility and broader use cases. By having global indexes, we can >>>>>> decouple those constraints and enable manifests level skipping by using a >>>>>> different partition strategy for indexes. But that also leads us into a >>>>>> kind of circular dependency, do we want to treat the index as a table? >>>>>> I'm >>>>>> happy to continue iterating on this idea. >>>>>> >>>>>> > but if there are updates around your PK range, then you need to >>>>>> read more index files. >>>>>> >>>>>> Yes, as the table grows, the index files will scale accordingly. >>>>>> >>>>>> > Do I understand correctly that your use case is updating a single >>>>>> record >>>>>> >>>>>> That was just an example, I'm planning to use a workload that >>>>>> accesses 0.01% of records from a 2-billion-row dataset as the >>>>>> experimental >>>>>> baseline. But yes, when updating many random keys, it's likely to touch >>>>>> nearly all buckets, which increases the number of index files that must >>>>>> be >>>>>> scanned. This is exactly why the lookup performance of the index file >>>>>> itself becomes so critical. >>>>>> >>>>>> >>>>>> Thanks, >>>>>> >>>>>> Xiaoxuan >>>>>> >>>>>> >>>>>> >>>>>> On Tue, Jun 3, 2025 at 6:37 AM Péter Váry < >>>>>> peter.vary.apa...@gmail.com> wrote: >>>>>> >>>>>>> Hi Xiaoxuan, >>>>>>> >>>>>>> > 2. File-Level Indexing >>>>>>> > [..] >>>>>>> > To make this efficient, the table should be partitioned and sorted >>>>>>> by the PK. >>>>>>> >>>>>>> If the table is partitioned and sorted by the PK, we don't really >>>>>>> need to have any index. We can find the data file containing the record >>>>>>> based on the Content File statistics, and the RowGroup containing the >>>>>>> record based on the Parquet metadata. >>>>>>> If the data is not fully sorted, then we will have multiple >>>>>>> index files to read. For example, if data is inserted and then 20 >>>>>>> updates >>>>>>> occur where the primary key falls within the range of the updated >>>>>>> records—but our specific key is not among those updated—we still need to >>>>>>> scan all files to find the current values for the record. There ways >>>>>>> around to filter out these files, but index file doesn't help here much. >>>>>>> >>>>>>> > Index access is naturally distributed as index files are >>>>>>> co-located with data files during scan. >>>>>>> >>>>>>> I don't get this. Sure, you can co-locate the index file with the >>>>>>> file you are reading, but if there are updates around your PK range, >>>>>>> then >>>>>>> you need to read more index files. >>>>>>> >>>>>>> About the use-cases: >>>>>>> >>>>>>> Do I understand correctly that your use case is updating a single >>>>>>> record (like "UPDATE users SET discount='GOLD' WHERE user_id='GUID'"), >>>>>>> and >>>>>>> not like a updating multiple records at once (like "UPDATE users SET >>>>>>> discount='GOLD' WHERE purchase_value > 1000")? >>>>>>> >>>>>>> IMHO, if we have updates for many records, caching is a must. >>>>>>> >>>>>>> Thanks, >>>>>>> Peter >>>>>>> >>>>>>> >>>>>>> On Tue, Jun 3, 2025, 04:43 Xiaoxuan Li <xiaoxuan.li....@gmail.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Thanks, Peter, for bringing these ideas forward! and you also >>>>>>>> raised a great point about clarifying the goal of indexing. I’ve been >>>>>>>> considering it with the intention of eventually enabling fast upserts >>>>>>>> through DVs. To support that, we need an index that maps primary keys >>>>>>>> to >>>>>>>> both the data file and the key’s location within it.So the key lookup >>>>>>>> speed >>>>>>>> is crucial here. This will allow us to quickly determine whether a key >>>>>>>> already exists and, if so, identify the location it resides in. In the >>>>>>>> event no data file contains the key, we can simply append new records. >>>>>>>> >>>>>>>> So there are two primary strategies for implementing indexing, and >>>>>>>> based on your description, you and Steven’s approach seems to fit more >>>>>>>> closely into the first one. Let me know if that’s accurate. >>>>>>>> 1. *Partitioned (Global) Indexing* >>>>>>>> >>>>>>>> This approach builds a *table-level global index* that can be >>>>>>>> partitioned independently of the base table’s partition scheme. >>>>>>>> Instead of >>>>>>>> aligning with table partitions, the index can be bucketed using a hash >>>>>>>> function on primary keys, ensuring that each PK deterministically maps >>>>>>>> to a >>>>>>>> specific index file. >>>>>>>> >>>>>>>> This model is well-suited for bulk index initial loading, providing >>>>>>>> consistent and efficient point lookups through deterministic >>>>>>>> key-to-file >>>>>>>> mapping. The primary challenge is maintaining index freshness as new >>>>>>>> records arrive, requiring mechanisms to keep the index files >>>>>>>> synchronized >>>>>>>> with the underlying data. This approach is similar to how Hudi manages >>>>>>>> its >>>>>>>> record index in that the number of partitions matches the number of >>>>>>>> index >>>>>>>> file groups(this is configurable to the user). >>>>>>>> >>>>>>>> *Pros:* >>>>>>>> >>>>>>>> - *Flexible partitioning*: Index partitions are decoupled from >>>>>>>> table partitions, allowing more control over lookup performance. >>>>>>>> - Index files do not need to be rewritten during compaction >>>>>>>> jobs. >>>>>>>> >>>>>>>> *Cons:* >>>>>>>> >>>>>>>> - *Synchronous maintenance*: Index must be kept up to date >>>>>>>> during each write, adding complexity. And with index files in each >>>>>>>> partition accumulating, a compaction job for index files in each >>>>>>>> partition >>>>>>>> might be needed. >>>>>>>> - *Distributed access*: I don’t yet see a clean way to read and >>>>>>>> maintain the index in a distributed fashion across engines. This >>>>>>>> aspect >>>>>>>> needs some further design and brainstorming. >>>>>>>> >>>>>>>> ------------------------------ >>>>>>>> 2. *File-Level Indexing* >>>>>>>> >>>>>>>> In this approach, *each data file has a corresponding index file*. >>>>>>>> When an upsert arrives, we rely on *table partitioning* and *file >>>>>>>> pruning* to reduce the number of index files we need to scan. To >>>>>>>> make this efficient, the table should be partitioned and sorted by the >>>>>>>> PK. >>>>>>>> >>>>>>>> We can also further accelerate index file pruning using *Bloom >>>>>>>> filters*: >>>>>>>> >>>>>>>> - Either as a *centralized Bloom index*, storing filters for >>>>>>>> all files in one place. (table level bloom filter index?) >>>>>>>> - Or embedded within *Parquet File* (if the index itself is >>>>>>>> Parquet). >>>>>>>> >>>>>>>> *Pros:* >>>>>>>> >>>>>>>> - No need to track index-data consistency. If an index file is >>>>>>>> missing, we just scan the data file directly. >>>>>>>> - Index access is naturally *distributed* as index files are >>>>>>>> co-located with data files during scan. >>>>>>>> >>>>>>>> *Cons:* >>>>>>>> >>>>>>>> - *Less flexible*: The index needs to use the same partitioning >>>>>>>> strategy as the table. >>>>>>>> - *Compaction overhead*: Whenever data files are compacted or >>>>>>>> rewritten, the corresponding index files must also be updated. >>>>>>>> >>>>>>>> ------------------------------ >>>>>>>> >>>>>>>> I am still leaning toward Option 2 for its simplicity. And, if >>>>>>>> combined with the Bloom filters, its performance is promising. While I >>>>>>>> agree that Option 1 is also worth considering because of the >>>>>>>> flexibility it >>>>>>>> offers to partition skipping, I feel its uncertain performance gain >>>>>>>> might >>>>>>>> not justify the implementation complexity added by it. Let me know >>>>>>>> what you >>>>>>>> think about it. >>>>>>>> Additional Note: >>>>>>>> >>>>>>>> I think periodically rewriting indexes to match query patterns >>>>>>>> might not be a good idea, as it can be very costly for large datasets. >>>>>>>> Also, the index file format will play a crucial role in key lookup >>>>>>>> performance. For example, Hudi uses HFile for indexing, which supports >>>>>>>> fast >>>>>>>> point lookups thanks to its sorted key-value layout, built-in caching, >>>>>>>> and >>>>>>>> Bloom filters. >>>>>>>> >>>>>>>> >>>>>>>> Thanks, >>>>>>>> >>>>>>>> Xiaoxuan >>>>>>>> >>>>>>>> On Fri, May 30, 2025 at 4:25 AM Péter Váry < >>>>>>>> peter.vary.apa...@gmail.com> wrote: >>>>>>>> >>>>>>>>> Hi Xiaoxuan, >>>>>>>>> I hope you had a good time on your time off! >>>>>>>>> >>>>>>>>> Thanks for your detailed response. I think it would help if we >>>>>>>>> focused on the specific use cases we want to support and what we >>>>>>>>> ultimately >>>>>>>>> aim to achieve. By my understanding, there are a few distinct >>>>>>>>> scenarios >>>>>>>>> we’ve been circling around: >>>>>>>>> >>>>>>>>> 1. Do we want writers to be able to easily write positional >>>>>>>>> deletes instead of equality deletes? >>>>>>>>> 2. Do we want readers to easily convert equality deletes to >>>>>>>>> positional deletes? >>>>>>>>> 3. Do we want readers to easily find records based on a >>>>>>>>> primary key? >>>>>>>>> >>>>>>>>> Let me know if I’ve missed any important ones. >>>>>>>>> >>>>>>>>> Personally, I see the first use case as the most critical, and >>>>>>>>> solving it makes the 2nd one obsolete. Please let me know about your >>>>>>>>> thoughts/preferences. It would help understand your point of view >>>>>>>>> better. >>>>>>>>> >>>>>>>>> I also want to reiterate a point from my earlier comments: >>>>>>>>> > Notice that I used "index-partition". During the design we can >>>>>>>>> decide to use the table partitioning for index partitioning as well. >>>>>>>>> > That is why I was suggesting "index-partition"s. I don't have a >>>>>>>>> ready answer for this, but making sure that the index split fits into >>>>>>>>> the >>>>>>>>> memory is important for Flink as well. >>>>>>>>> >>>>>>>>> So I agree with your point: for large partitions, directly mapping >>>>>>>>> them to "index-partition"s could be prohibitive. >>>>>>>>> >>>>>>>>> In discussions with Steven, we came up with a generic idea that >>>>>>>>> might help: >>>>>>>>> >>>>>>>>> - Store index files as a new content file type in the manifest >>>>>>>>> list, with column stats to support filtering during planning. >>>>>>>>> - Periodically rewrite indexes to align with query patterns >>>>>>>>> (e.g., bucketing by hashed PK). >>>>>>>>> - We're only interested in unique indexes for now, so we can >>>>>>>>> rely on snapshot sequence numbers instead of delete vectors. >>>>>>>>> - The merging logic helps when writers produce index files >>>>>>>>> with different granularities - we just read all relevant index >>>>>>>>> files. >>>>>>>>> >>>>>>>>> That said, organizing both the index files and the records within >>>>>>>>> them to best support our query patterns remains key. That’s why >>>>>>>>> understanding the actual use cases is so important. >>>>>>>>> >>>>>>>>> For the Flink ingestion use case specifically, our best idea for >>>>>>>>> organizing the index is as follows: >>>>>>>>> >>>>>>>>> - Generate ranges based on the primary key (PK) hash values. >>>>>>>>> - Order records within each index file by PK hash. >>>>>>>>> >>>>>>>>> This structure allows us to locate a record with a given PK by >>>>>>>>> reading just one index file and accessing a single row group within >>>>>>>>> the >>>>>>>>> corresponding index file. It’s a lightweight and performant approach >>>>>>>>> that >>>>>>>>> aligns well with Flink’s streaming characteristics. >>>>>>>>> >>>>>>>>> Looking forward to your thoughts! >>>>>>>>> >>>>>>>>> Thanks, >>>>>>>>> Peter >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> Xiaoxuan Li <xiaoxuan.li....@gmail.com> ezt írta (időpont: 2025. >>>>>>>>> máj. 29., Cs, 18:16): >>>>>>>>> >>>>>>>>>> Hi Peter, thanks for sharing the context around the Flink >>>>>>>>>> streaming use case and side note for concurrent write. Apologies for >>>>>>>>>> the >>>>>>>>>> delay as I just got back from a vacation. Yeah, I agree, having the >>>>>>>>>> index >>>>>>>>>> at the partition level is a better approach if we plan to use >>>>>>>>>> caching. As a >>>>>>>>>> distributed cache would introduce additional system overhead, and >>>>>>>>>> maintaining separate caches on each node could lead to redundant >>>>>>>>>> storage >>>>>>>>>> footprints. >>>>>>>>>> >>>>>>>>>> But having one index file for the entire partition is not >>>>>>>>>> feasible because paying the cost of rewriting the index file for an >>>>>>>>>> entire >>>>>>>>>> partition on every write operation is expensive and could introduce >>>>>>>>>> significant write latency. If we want to use indexes at planning >>>>>>>>>> time, we >>>>>>>>>> would need a mapping from primary keys to data files and row >>>>>>>>>> positions for >>>>>>>>>> the whole partition. This can be constructed by aggregating >>>>>>>>>> file-level >>>>>>>>>> indexes at the planner. Even though each index file corresponds to a >>>>>>>>>> single >>>>>>>>>> data file, we can use a thread pool to load them in parallel. >>>>>>>>>> Compared to >>>>>>>>>> loading one large index file, the performance could be similar. >>>>>>>>>> Since the >>>>>>>>>> index files are already cached in memory or on disk, index files >>>>>>>>>> loading >>>>>>>>>> time becomes negligible. In fact, having one index file per data >>>>>>>>>> file might >>>>>>>>>> be advantageous, as it allows for incremental loading. >>>>>>>>>> >>>>>>>>>> One thing about file-level indexing is that planner or task node >>>>>>>>>> is not required to scan all index files upfront. Partition pruning >>>>>>>>>> and file >>>>>>>>>> filtering should occur prior to index access, allowing us to load >>>>>>>>>> only the >>>>>>>>>> relevant index files associated with the pruned data set. >>>>>>>>>> >>>>>>>>>> Another concern I have with the caching approach is determining >>>>>>>>>> which partitions to cache, since caching indexes for the entire >>>>>>>>>> table isn't >>>>>>>>>> practical. For time-based partitions, this might be more >>>>>>>>>> straightforward, >>>>>>>>>> for example, caching the most recent partitions. However, that's not >>>>>>>>>> always >>>>>>>>>> applicable for all use cases. >>>>>>>>>> >>>>>>>>>> If after prototyping, we find the solution isn’t performant >>>>>>>>>> enough for the streaming use case and we still want it to be >>>>>>>>>> handled, we >>>>>>>>>> could explore a hybrid approach of Option 2 and Option 3 that Anton >>>>>>>>>> mentioned. >>>>>>>>>> >>>>>>>>>> Also, thanks to Ismail for highlighting the BigQuery approach, >>>>>>>>>> that's helpful context! >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Thanks, >>>>>>>>>> Xiaoxuan >>>>>>>>>> >>>>>>>>>> On Wed, May 14, 2025 at 3:39 AM ismail simsek < >>>>>>>>>> ismailxsim...@gmail.com> wrote: >>>>>>>>>> >>>>>>>>>>> Hi All, Thank you for working on this. >>>>>>>>>>> >>>>>>>>>>> I wanted to share a reference to the BigQuery implementation >>>>>>>>>>> <https://cloud.google.com/bigquery/docs/change-data-capture#query-max-staleness> >>>>>>>>>>> (Option 3) as another potential approach, and for inspiration. >>>>>>>>>>> In this setup, The engine is running periodic merge jobs and >>>>>>>>>>> applying equality deletes to the actual table, based on PK. and for >>>>>>>>>>> some cases applying it during runtime. >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> https://cloud.google.com/bigquery/docs/change-data-capture#query-max-staleness >>>>>>>>>>> >>>>>>>>>>> Best regards >>>>>>>>>>> ismail >>>>>>>>>>> >>>>>>>>>>> On Wed, May 14, 2025 at 7:37 AM Péter Váry < >>>>>>>>>>> peter.vary.apa...@gmail.com> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hi Xiaoxuan, >>>>>>>>>>>> >>>>>>>>>>>> Let me describe, how the Flink streaming writer uses equality >>>>>>>>>>>> deletes, and how it could use indexes. >>>>>>>>>>>> >>>>>>>>>>>> When the Flink streaming writer receives a new insert, then it >>>>>>>>>>>> appends the data to a data file. When it receives a delete, it >>>>>>>>>>>> appends the >>>>>>>>>>>> primary key to an equality delete file. When it receives an update >>>>>>>>>>>> the it >>>>>>>>>>>> creates both a data and a delete record. >>>>>>>>>>>> >>>>>>>>>>>> If we have an index available, the writer could rely on the >>>>>>>>>>>> index to get the position of the deleted record, and write a >>>>>>>>>>>> position >>>>>>>>>>>> delete instead of an equality delete. >>>>>>>>>>>> >>>>>>>>>>>> This index lookup is only effective, if we can keep the index >>>>>>>>>>>> in memory, and effectively distribute the incoming records between >>>>>>>>>>>> the >>>>>>>>>>>> writers so the index cache is used. In this case, the lookup cost >>>>>>>>>>>> becomes >>>>>>>>>>>> O(1) if we consider only the file access cost. If we need to read >>>>>>>>>>>> an index >>>>>>>>>>>> file for every data file we are adding O(n) delay on record level, >>>>>>>>>>>> where >>>>>>>>>>>> the n is the number of the data files in the table. >>>>>>>>>>>> >>>>>>>>>>>> I agree with you that the cost of rewriting the index file is >>>>>>>>>>>> not trivial, but that happens on commit level. This is much better >>>>>>>>>>>> than >>>>>>>>>>>> having an overhead on record level. >>>>>>>>>>>> >>>>>>>>>>>> > As a result, we’d likely need to split and read the index >>>>>>>>>>>> file in distributed planning mode >>>>>>>>>>>> >>>>>>>>>>>> That is why I was suggesting "index-partition"s. I don't have a >>>>>>>>>>>> ready answer for this, but making sure that the index split fits >>>>>>>>>>>> into the >>>>>>>>>>>> memory is important for Flink as well. >>>>>>>>>>>> >>>>>>>>>>>> > However the drawback of partition-level index is that the >>>>>>>>>>>> index must always be kept up to date to remain useful. >>>>>>>>>>>> >>>>>>>>>>>> I don't see how we can avoid that in case of Flink writers. The >>>>>>>>>>>> alternative is to read the non-indexed files in every writer, >>>>>>>>>>>> which seems >>>>>>>>>>>> like a no-go for me. >>>>>>>>>>>> >>>>>>>>>>>> > the value of an inverted index is not column pruning. It’s >>>>>>>>>>>> how it enables fast point lookups >>>>>>>>>>>> >>>>>>>>>>>> In my experience, the biggest gains are realized during >>>>>>>>>>>> planning, when the planner prunes whole files. While there is a >>>>>>>>>>>> possibility >>>>>>>>>>>> for a distributed planner to read fewer number of index files, I >>>>>>>>>>>> don't >>>>>>>>>>>> think it is possible for a planner to read the index files if they >>>>>>>>>>>> are >>>>>>>>>>>> stored for every data file. (Unless we are talking about a catalog >>>>>>>>>>>> which >>>>>>>>>>>> merges/caches the relevant part of the indexes. Sadly this is >>>>>>>>>>>> something >>>>>>>>>>>> which is available for the Flink writers) >>>>>>>>>>>> >>>>>>>>>>>> Side note: The index solution still prevents (or makes >>>>>>>>>>>> complicated) to handle concurrent writes to the table. Currently, >>>>>>>>>>>> if a >>>>>>>>>>>> concurrent writer updated the record, Flink just updated it again, >>>>>>>>>>>> and only >>>>>>>>>>>> a single record remains with the doubly updated primary key. With >>>>>>>>>>>> the index >>>>>>>>>>>> based solution, we might end up with duplicated keys. This might >>>>>>>>>>>> be an >>>>>>>>>>>> acceptable tradeoff, but we should be aware of it. >>>>>>>>>>>> >>>>>>>>>>>> Thanks for working on this Xiaoxuan! >>>>>>>>>>>> >>>>>>>>>>>> On Wed, May 14, 2025, 05:25 Xiaoxuan Li < >>>>>>>>>>>> xiaoxuan.li....@gmail.com> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Hi Peter, >>>>>>>>>>>>> >>>>>>>>>>>>> Thanks for the detailed illustration. I understand your >>>>>>>>>>>>> concern. >>>>>>>>>>>>> >>>>>>>>>>>>> I believe the core question here is whether the index is used >>>>>>>>>>>>> during job planning or at the scan task. This depends on how >>>>>>>>>>>>> index files >>>>>>>>>>>>> are referenced, at the file level or partition level. >>>>>>>>>>>>> >>>>>>>>>>>>> In my view, both approaches ultimately serve the same purpose. >>>>>>>>>>>>> The main difference lies in how the index files are generated and >>>>>>>>>>>>> split. >>>>>>>>>>>>> >>>>>>>>>>>>> For a partition-level index, would we generate a single index >>>>>>>>>>>>> file per partition? If so, each update to the partition would >>>>>>>>>>>>> require >>>>>>>>>>>>> either rewriting a whole new index file, which could be costly at >>>>>>>>>>>>> write >>>>>>>>>>>>> time given that the index size grows along with data size, or >>>>>>>>>>>>> appending a >>>>>>>>>>>>> new index file per write operation, which would functionally be >>>>>>>>>>>>> similar to >>>>>>>>>>>>> a file-level index. >>>>>>>>>>>>> >>>>>>>>>>>>> And due to the potentially large size of index files, even one >>>>>>>>>>>>> index file per partition may not support efficient planning in >>>>>>>>>>>>> local >>>>>>>>>>>>> planning mode. As a result, we’d likely need to split and read >>>>>>>>>>>>> the index >>>>>>>>>>>>> file in distributed planning mode, making it functionally >>>>>>>>>>>>> equivalent to >>>>>>>>>>>>> reading multiple index files at the task level. >>>>>>>>>>>>> >>>>>>>>>>>>> However the drawback of partition-level index is that the >>>>>>>>>>>>> index must always be kept up to date to remain useful. >>>>>>>>>>>>> >>>>>>>>>>>>> Also, Steven, in my opinion, the value of an inverted index is >>>>>>>>>>>>> not column pruning. It’s how it enables fast point lookups. As >>>>>>>>>>>>> mentioned >>>>>>>>>>>>> earlier, we just need to ensure that index lookups are faster >>>>>>>>>>>>> than file >>>>>>>>>>>>> scan with delete predicate evaluation. >>>>>>>>>>>>> >>>>>>>>>>>>> Let me know whether this addresses any of the concerns. >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> Thanks, >>>>>>>>>>>>> >>>>>>>>>>>>> Xiaoxuan >>>>>>>>>>>>> >>>>>>>>>>>>> On Mon, May 12, 2025 at 4:34 PM Steven Wu < >>>>>>>>>>>>> stevenz...@gmail.com> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> agree with Peter that 1:1 mapping of data files and inverted >>>>>>>>>>>>>> indexes are not as useful. With columnar format like Parquet, >>>>>>>>>>>>>> this can also >>>>>>>>>>>>>> be achieved equivalently by reading the data file with >>>>>>>>>>>>>> projection on the >>>>>>>>>>>>>> identifier columns. >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Mon, May 12, 2025 at 4:20 AM Péter Váry < >>>>>>>>>>>>>> peter.vary.apa...@gmail.com> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hi Xiaoxuan, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Do we plan to store the indexes in a separate file alongside >>>>>>>>>>>>>>> the data files? If so, then I have the following thoughts: >>>>>>>>>>>>>>> - I agree that the 1-on-1 mapping of data files and index >>>>>>>>>>>>>>> files is easy to maintain OTOH it is less useful as an index. >>>>>>>>>>>>>>> - The writer (which is looking for a column with a specific >>>>>>>>>>>>>>> primary key) needs to open all of the index files until it >>>>>>>>>>>>>>> finds the given >>>>>>>>>>>>>>> key. Since the index files are typically small, the cost here >>>>>>>>>>>>>>> is O(n) where >>>>>>>>>>>>>>> n is the number of the index files (equal to the number of the >>>>>>>>>>>>>>> data files). >>>>>>>>>>>>>>> - If we add a Bloom filter on the primary key to the data >>>>>>>>>>>>>>> files, then the writer reads the footer of every data file and >>>>>>>>>>>>>>> reads only >>>>>>>>>>>>>>> the file which contains the primary key. This is also O(n) >>>>>>>>>>>>>>> where the n is >>>>>>>>>>>>>>> the number of the data files. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> So, IMHO having a 1-on-1 mapping between the data files and >>>>>>>>>>>>>>> the index files is not too beneficial. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> OTOH if we maintain the index files on "index-partition" >>>>>>>>>>>>>>> level, then the cost becomes O(p) where the p is the number of >>>>>>>>>>>>>>> "index-partitions" which could be significantly lower. Notice >>>>>>>>>>>>>>> that I used >>>>>>>>>>>>>>> "index-partition". During the design we can decide to use the >>>>>>>>>>>>>>> table >>>>>>>>>>>>>>> partitioning for index partitioning as well. This requires us >>>>>>>>>>>>>>> to use easily >>>>>>>>>>>>>>> maintainable indexes. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> So while maintaining the index is more costly if it contains >>>>>>>>>>>>>>> data from multiple data files, it also becomes much more useful. >>>>>>>>>>>>>>> Maintenance procedures could build upon the data file >>>>>>>>>>>>>>> immutability and >>>>>>>>>>>>>>> simplify the index maintenance. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks Xiaoxuan for the good conversation! >>>>>>>>>>>>>>> Peter >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Xiaoxuan Li <xiaoxuan.li....@gmail.com> ezt írta (időpont: >>>>>>>>>>>>>>> 2025. máj. 10., Szo, 9:13): >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Thanks Anton for the context and summary of the options, >>>>>>>>>>>>>>>> great to hear that this direction aligns with earlier community >>>>>>>>>>>>>>>> discussions. And thanks Gyula and Peter for the clear >>>>>>>>>>>>>>>> analysis. I agree >>>>>>>>>>>>>>>> with both of you, the index needs to be designed and >>>>>>>>>>>>>>>> implemented >>>>>>>>>>>>>>>> efficiently in order to scale for large data sets and >>>>>>>>>>>>>>>> streaming use cases. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I wanted to share some context around why I proposed a >>>>>>>>>>>>>>>> file-level index. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> The benefit of a file-level index is that it naturally >>>>>>>>>>>>>>>> supports partition pruning, file skipping and time travel. >>>>>>>>>>>>>>>> Issues like >>>>>>>>>>>>>>>> index invalidation are inherently handled because the index is >>>>>>>>>>>>>>>> tied to the >>>>>>>>>>>>>>>> data file’s lifecycle. Even if the index isn't always >>>>>>>>>>>>>>>> perfectly in sync, >>>>>>>>>>>>>>>> it's still usable, and since its lifecycle is bound to the >>>>>>>>>>>>>>>> file, meaning >>>>>>>>>>>>>>>> lifecycle management only needs to track with the file itself, >>>>>>>>>>>>>>>> reducing >>>>>>>>>>>>>>>> complexity in index management and maintenance. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On the other hand, partition/table level indexes must >>>>>>>>>>>>>>>> explicitly manage synchronization with the table, handling >>>>>>>>>>>>>>>> updates/snapshots on the writer side, and partition pruning, >>>>>>>>>>>>>>>> file skipping, >>>>>>>>>>>>>>>> time travel, and index invalidation on the reader side. Since >>>>>>>>>>>>>>>> index files >>>>>>>>>>>>>>>> are immutable and grow alongside data files, maintaining them >>>>>>>>>>>>>>>> can become as >>>>>>>>>>>>>>>> complex as managing the entire table. It’s not surprising that >>>>>>>>>>>>>>>> Hudi uses a >>>>>>>>>>>>>>>> metadata table(which is essentially a Hudi MOR table) to >>>>>>>>>>>>>>>> manage its index. >>>>>>>>>>>>>>>> Personally, I find this approach less appealing, as it >>>>>>>>>>>>>>>> introduces a >>>>>>>>>>>>>>>> circular dependency that adds architectural complexity. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> For the positional delete case, even if index lookups could >>>>>>>>>>>>>>>> be faster than scanning files with predicates, it's still >>>>>>>>>>>>>>>> unclear whether >>>>>>>>>>>>>>>> that alone is sufficient for streaming workloads, especially >>>>>>>>>>>>>>>> when many >>>>>>>>>>>>>>>> files are involved. I think Peter’s idea of maintaining a hot >>>>>>>>>>>>>>>> cache of the >>>>>>>>>>>>>>>> index within the streaming engine is promising. Alternatively, >>>>>>>>>>>>>>>> using an >>>>>>>>>>>>>>>> external key-value store for fast lookups could also be >>>>>>>>>>>>>>>> explored. Would be >>>>>>>>>>>>>>>> great to hear others’ thoughts on this. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Xiaoxuan >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Fri, May 9, 2025 at 8:12 AM Péter Váry < >>>>>>>>>>>>>>>> peter.vary.apa...@gmail.com> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> When going through the options mentioned by Anton, I feel >>>>>>>>>>>>>>>>> that Option 1 and 4 are just pushing the responsibility of >>>>>>>>>>>>>>>>> converting the >>>>>>>>>>>>>>>>> equality deletes to positional deletes to the engine side. >>>>>>>>>>>>>>>>> The only >>>>>>>>>>>>>>>>> difference is whether the conversion happens on the write >>>>>>>>>>>>>>>>> side or on the >>>>>>>>>>>>>>>>> read side. This is a step back, and doesn't help solving the >>>>>>>>>>>>>>>>> problem for >>>>>>>>>>>>>>>>> streaming engines. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Option 2 needs a bit of thought. We need to find a good >>>>>>>>>>>>>>>>> indexing strategy which allows big tables and plays nice with >>>>>>>>>>>>>>>>> streaming >>>>>>>>>>>>>>>>> writes too. It would be good to allow the engines to cache a >>>>>>>>>>>>>>>>> limited part >>>>>>>>>>>>>>>>> of the index, and distribute records to the writers in a way >>>>>>>>>>>>>>>>> that the cache >>>>>>>>>>>>>>>>> locality is considered. Also creating an index file for every >>>>>>>>>>>>>>>>> data file >>>>>>>>>>>>>>>>> might be suboptimal, as reading that many index files could >>>>>>>>>>>>>>>>> tank >>>>>>>>>>>>>>>>> performance. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> I have yet to see a good idea for Option 3. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Another option could be to provide a way to convert the >>>>>>>>>>>>>>>>> equality deletes to positional deletes as soon as possible. >>>>>>>>>>>>>>>>> Maybe in >>>>>>>>>>>>>>>>> frequent compaction tasks, or at first read? When we first >>>>>>>>>>>>>>>>> apply an >>>>>>>>>>>>>>>>> equality delete for a file, it is very easy to calculate the >>>>>>>>>>>>>>>>> equivalent DV >>>>>>>>>>>>>>>>> for the equality delete. Subsequent readers could depend on >>>>>>>>>>>>>>>>> the DV instead >>>>>>>>>>>>>>>>> of the equality delete. This could be the least disruptive >>>>>>>>>>>>>>>>> change, but I >>>>>>>>>>>>>>>>> see a few issues with this solution as well: >>>>>>>>>>>>>>>>> - First read still depends on the equality delete which in >>>>>>>>>>>>>>>>> edge cases could be very costly >>>>>>>>>>>>>>>>> - Filters don't play well with this method >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> All-in-all, I like the index solution (Option 2) for >>>>>>>>>>>>>>>>> several reasons: >>>>>>>>>>>>>>>>> - Cuts out the need for equality deletes which would >>>>>>>>>>>>>>>>> reduce the code quite a bit >>>>>>>>>>>>>>>>> - While slows down the write path a bit, the overall >>>>>>>>>>>>>>>>> workflow (write+read) benefits from it >>>>>>>>>>>>>>>>> - The index could be reused for other use-cases as well >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>> Peter >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Gyula Fóra <gyula.f...@gmail.com> ezt írta (időpont: >>>>>>>>>>>>>>>>> 2025. máj. 9., P, 12:16): >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Hi Anton, >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Thank you for summarizing the options we see at this >>>>>>>>>>>>>>>>>> stage in a structured and concise way. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Based on the use-cases I see in the industry, I feel that >>>>>>>>>>>>>>>>>> not all of the highlighted options are feasible (or >>>>>>>>>>>>>>>>>> desirable). >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Option 4 would basically remove any possibilities for >>>>>>>>>>>>>>>>>> native streaming CDC reads on tables, severely limiting how >>>>>>>>>>>>>>>>>> Iceberg can be >>>>>>>>>>>>>>>>>> used in the future for real-time use-cases from Flink and >>>>>>>>>>>>>>>>>> other similar >>>>>>>>>>>>>>>>>> engines that may want to connect. I understand that the view >>>>>>>>>>>>>>>>>> reconciliation >>>>>>>>>>>>>>>>>> approach is implemented in Spark SQL already but >>>>>>>>>>>>>>>>>> implementing it in a >>>>>>>>>>>>>>>>>> proper streaming way would likely lead to similar problems >>>>>>>>>>>>>>>>>> that we are >>>>>>>>>>>>>>>>>> trying to solve here in the first place. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Regarding Option 1, introducing engine specific/custom >>>>>>>>>>>>>>>>>> indexing solutions would go against the core design >>>>>>>>>>>>>>>>>> principles as it would >>>>>>>>>>>>>>>>>> be hard to mix different engines when writing / reading >>>>>>>>>>>>>>>>>> tables for CDC >>>>>>>>>>>>>>>>>> use-cases. (A streaming job would have a hard time writing >>>>>>>>>>>>>>>>>> upserts/equality >>>>>>>>>>>>>>>>>> deletes to tables that were written by a different engine). >>>>>>>>>>>>>>>>>> To >>>>>>>>>>>>>>>>>> me this sounds very similar to Option 4 in a way that it >>>>>>>>>>>>>>>>>> pushes too much >>>>>>>>>>>>>>>>>> logic to the engines in a way that would hurt the >>>>>>>>>>>>>>>>>> compatibility across the >>>>>>>>>>>>>>>>>> engines. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Based on this and the context of the discussion, Option 2 >>>>>>>>>>>>>>>>>> or a combination of Option 2 & 3 sounds the most reasonable >>>>>>>>>>>>>>>>>> to me. There >>>>>>>>>>>>>>>>>> are still a lot of questions on the practical implementation >>>>>>>>>>>>>>>>>> of the indexes >>>>>>>>>>>>>>>>>> and how we can do this efficiently so this is only a very >>>>>>>>>>>>>>>>>> early feedback >>>>>>>>>>>>>>>>>> from my end. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Gyula >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Fri, May 9, 2025 at 12:14 AM Anton Okolnychyi < >>>>>>>>>>>>>>>>>> aokolnyc...@gmail.com> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> I am glad to see that folks are thinking about this >>>>>>>>>>>>>>>>>>> problem. I am looking forward to a formal proposal/design >>>>>>>>>>>>>>>>>>> doc to discuss >>>>>>>>>>>>>>>>>>> details! >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Overall, this aligns with what we discussed in the >>>>>>>>>>>>>>>>>>> community earlier w.r.t. the future of equality deletes and >>>>>>>>>>>>>>>>>>> streaming >>>>>>>>>>>>>>>>>>> upserts. If I were to summarize, we have the following >>>>>>>>>>>>>>>>>>> options: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Option 1: Add an inverted index (potentially >>>>>>>>>>>>>>>>>>> distributed) maintained by an engine that does streaming >>>>>>>>>>>>>>>>>>> writes to always >>>>>>>>>>>>>>>>>>> produce DVs, even in streaming use cases. Deprecate/remove >>>>>>>>>>>>>>>>>>> equality deletes >>>>>>>>>>>>>>>>>>> from Iceberg. >>>>>>>>>>>>>>>>>>> Option 2: Add native indexing to Iceberg so that the >>>>>>>>>>>>>>>>>>> lookup of positions is quick and efficient enough to be >>>>>>>>>>>>>>>>>>> used in streaming >>>>>>>>>>>>>>>>>>> upserts. Deprecate/remove equality deletes from Iceberg. >>>>>>>>>>>>>>>>>>> Option 3: Rethink equality deletes, potentially by >>>>>>>>>>>>>>>>>>> introducing more restrictions and trying to scope them to >>>>>>>>>>>>>>>>>>> particular data >>>>>>>>>>>>>>>>>>> files, similar to DVs. >>>>>>>>>>>>>>>>>>> Option 4: Standardize on a view reconciliation approach >>>>>>>>>>>>>>>>>>> that Tabular implemented for CDC. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> I would like to highlight that what Spark does today >>>>>>>>>>>>>>>>>>> during MERGE is similar to a lookup in an inverted index >>>>>>>>>>>>>>>>>>> represented by >>>>>>>>>>>>>>>>>>> another Iceberg table. That is OK for batch jobs but not >>>>>>>>>>>>>>>>>>> enough for >>>>>>>>>>>>>>>>>>> streaming. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> - Anton >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> чт, 8 трав. 2025 р. о 10:08 Xiaoxuan Li < >>>>>>>>>>>>>>>>>>> xiaoxuan.li....@gmail.com> пише: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Hi Zheng, Steven, Amogh and Gyula. Thank you all for >>>>>>>>>>>>>>>>>>>> the feedback! >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> I agree with everyone, we need to narrow down the scope >>>>>>>>>>>>>>>>>>>> of this optimization. The primary issue I'm trying to >>>>>>>>>>>>>>>>>>>> address is the slow >>>>>>>>>>>>>>>>>>>> read performance caused by the growing number of equality >>>>>>>>>>>>>>>>>>>> delete >>>>>>>>>>>>>>>>>>>> files(streaming CDC scenarios). The other potential use >>>>>>>>>>>>>>>>>>>> cases are only >>>>>>>>>>>>>>>>>>>> mentioned to show the extensibility of this approach. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> And both equality and positional deletes suffer from >>>>>>>>>>>>>>>>>>>> the same core problem, records are evaluated repeatedly >>>>>>>>>>>>>>>>>>>> against multiple >>>>>>>>>>>>>>>>>>>> delete predicates, at read time for equality deletes, and >>>>>>>>>>>>>>>>>>>> at write time for >>>>>>>>>>>>>>>>>>>> positional deletes. This repeated evaluation is where the >>>>>>>>>>>>>>>>>>>> real bottleneck >>>>>>>>>>>>>>>>>>>> lies. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> It’s particularly bad for equality deletes, since we >>>>>>>>>>>>>>>>>>>> constantly recompute row positions during reads without >>>>>>>>>>>>>>>>>>>> ever materializing >>>>>>>>>>>>>>>>>>>> them. Eventually, a maintenance job is required just to >>>>>>>>>>>>>>>>>>>> rewrite them into >>>>>>>>>>>>>>>>>>>> positional deletes. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> The idea is to eliminate this overhead by introducing >>>>>>>>>>>>>>>>>>>> an inverted index that maps values (or value combinations) >>>>>>>>>>>>>>>>>>>> directly to row >>>>>>>>>>>>>>>>>>>> positions. This lets us skip full predicate evaluation and >>>>>>>>>>>>>>>>>>>> jump straight to >>>>>>>>>>>>>>>>>>>> the affected rows, similar to how Hudi uses a record-level >>>>>>>>>>>>>>>>>>>> index for fast >>>>>>>>>>>>>>>>>>>> upserts. If we can fetch row positions from the index, we >>>>>>>>>>>>>>>>>>>> can greatly >>>>>>>>>>>>>>>>>>>> reduce overhead during reads (for equality deletes) and >>>>>>>>>>>>>>>>>>>> writes (for >>>>>>>>>>>>>>>>>>>> positional deletes). >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> In fact, if we can make positional deletes perform as >>>>>>>>>>>>>>>>>>>> well as equality deletes using this index, we might be >>>>>>>>>>>>>>>>>>>> able to get rid of >>>>>>>>>>>>>>>>>>>> equality deletes, but that needs to be evaluated for the >>>>>>>>>>>>>>>>>>>> upsert case. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> That’s part of the reason why other types of indexes >>>>>>>>>>>>>>>>>>>> came up, they’re applicable beyond just primary key >>>>>>>>>>>>>>>>>>>> columns, but CDC is the >>>>>>>>>>>>>>>>>>>> only scenario with stricter SLA requirements. So it makes >>>>>>>>>>>>>>>>>>>> sense to align on >>>>>>>>>>>>>>>>>>>> the exact problem and use cases. For now, I think we can >>>>>>>>>>>>>>>>>>>> define our main >>>>>>>>>>>>>>>>>>>> goal as supporting inverted indexing over primary key >>>>>>>>>>>>>>>>>>>> columns to address >>>>>>>>>>>>>>>>>>>> the slowness of reading caused by the growing number of >>>>>>>>>>>>>>>>>>>> equality delete >>>>>>>>>>>>>>>>>>>> files. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Thanks, Amogh, for bringing up the comparison with >>>>>>>>>>>>>>>>>>>> known alternatives. We should include benchmarks for those >>>>>>>>>>>>>>>>>>>> as well, to >>>>>>>>>>>>>>>>>>>> illustrate the trade-offs in read/write performance, >>>>>>>>>>>>>>>>>>>> storage usage, and >>>>>>>>>>>>>>>>>>>> overall cost. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Really appreciate your feedback! I’ll incorporate these >>>>>>>>>>>>>>>>>>>> into the next revision of the proposal. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Xiaoxuan >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> On Thu, May 8, 2025 at 1:25 AM Gyula Fóra < >>>>>>>>>>>>>>>>>>>> gyula.f...@gmail.com> wrote: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Thank you for the proposal! >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> I agree with what had been said above that we need to >>>>>>>>>>>>>>>>>>>>> narrow down the scope here and what is the primary target >>>>>>>>>>>>>>>>>>>>> for the >>>>>>>>>>>>>>>>>>>>> optimization. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> As Amogh has also pointed out, CDC (streaming) read >>>>>>>>>>>>>>>>>>>>> performance (with equality deletes) would be one of the >>>>>>>>>>>>>>>>>>>>> biggest >>>>>>>>>>>>>>>>>>>>> beneficiaries of this at a first glance. >>>>>>>>>>>>>>>>>>>>> This is especially important for Flink users where >>>>>>>>>>>>>>>>>>>>> this feature is currently completely missing and there is >>>>>>>>>>>>>>>>>>>>> a big demand for >>>>>>>>>>>>>>>>>>>>> it as we rely on equality deletes on the write path. [1] >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> I am not aware of alternative proposals that would >>>>>>>>>>>>>>>>>>>>> solve the equality delete cdc read performance question, >>>>>>>>>>>>>>>>>>>>> overall I think >>>>>>>>>>>>>>>>>>>>> using indices is reasonable and a very promising approach. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Looking forward to more details and discussion! >>>>>>>>>>>>>>>>>>>>> Gyula >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> [1] >>>>>>>>>>>>>>>>>>>>> https://lists.apache.org/thread/njmxjmjjm341fp4mgynn483v15mhk7qd >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> On Thu, May 8, 2025 at 9:24 AM Amogh Jahagirdar < >>>>>>>>>>>>>>>>>>>>> 2am...@gmail.com> wrote: >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Thank you for the proposal Xiaoxuan! I think I agree >>>>>>>>>>>>>>>>>>>>>> with Zheng and Steven's point that it'll probably be >>>>>>>>>>>>>>>>>>>>>> more helpful to start >>>>>>>>>>>>>>>>>>>>>> out with more specific "what" and "why" (known areas of >>>>>>>>>>>>>>>>>>>>>> improvement for >>>>>>>>>>>>>>>>>>>>>> Iceberg and driven by any use cases) before we get too >>>>>>>>>>>>>>>>>>>>>> deep into the "how". >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> In my mind, the specific known area of improvement >>>>>>>>>>>>>>>>>>>>>> for Iceberg related to this proposal is improving >>>>>>>>>>>>>>>>>>>>>> streaming upsert >>>>>>>>>>>>>>>>>>>>>> behavior. One area this improvement is beneficial for is >>>>>>>>>>>>>>>>>>>>>> being able to >>>>>>>>>>>>>>>>>>>>>> provide better data freshness for Iceberg CDC mirror >>>>>>>>>>>>>>>>>>>>>> tables without the >>>>>>>>>>>>>>>>>>>>>> heavy read + maintenance cost that currently exist with >>>>>>>>>>>>>>>>>>>>>> Flink upserts. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> As you mentioned, equality deletes have the benefit >>>>>>>>>>>>>>>>>>>>>> of being very cheap to write but can come at a high and >>>>>>>>>>>>>>>>>>>>>> unpredictable cost >>>>>>>>>>>>>>>>>>>>>> at read time. Challenges with equality deletes have been >>>>>>>>>>>>>>>>>>>>>> discussed in the >>>>>>>>>>>>>>>>>>>>>> past [1]. >>>>>>>>>>>>>>>>>>>>>> I'll also add that if one of the goals is to >>>>>>>>>>>>>>>>>>>>>> improving streaming upserts (e.g. for applying CDC >>>>>>>>>>>>>>>>>>>>>> change streams into >>>>>>>>>>>>>>>>>>>>>> Iceberg mirror tables), then there are alternatives that >>>>>>>>>>>>>>>>>>>>>> I think we should >>>>>>>>>>>>>>>>>>>>>> compare against to make >>>>>>>>>>>>>>>>>>>>>> the tradeoffs clear. These alternatives include >>>>>>>>>>>>>>>>>>>>>> leveraging the known changelog view or merge patterns >>>>>>>>>>>>>>>>>>>>>> [2] or improving the >>>>>>>>>>>>>>>>>>>>>> existing maintenance procedures. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> I think the potential for being able to use a >>>>>>>>>>>>>>>>>>>>>> inverted index for upsert cases to more directly >>>>>>>>>>>>>>>>>>>>>> identify positions in a >>>>>>>>>>>>>>>>>>>>>> file to directly write DVs is very exciting, but before >>>>>>>>>>>>>>>>>>>>>> getting too far >>>>>>>>>>>>>>>>>>>>>> into the weeds, I think it'd first be helpful >>>>>>>>>>>>>>>>>>>>>> to make sure we agree on the specific problem we're >>>>>>>>>>>>>>>>>>>>>> trying to solve when we talk about performance >>>>>>>>>>>>>>>>>>>>>> improvements along with any >>>>>>>>>>>>>>>>>>>>>> use cases, followed by comparison with known >>>>>>>>>>>>>>>>>>>>>> alternatives (ideally we can >>>>>>>>>>>>>>>>>>>>>> get numbers that demonstrate the read/write/storage/cost >>>>>>>>>>>>>>>>>>>>>> tradeoffs for the >>>>>>>>>>>>>>>>>>>>>> proposed inverted index). >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> [1] >>>>>>>>>>>>>>>>>>>>>> https://lists.apache.org/thread/z0gvco6hn2bpgngvk4h6xqrnw8b32sw6 >>>>>>>>>>>>>>>>>>>>>> [2]https://www.tabular.io/blog/hello-world-of-cdc/ >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>>>>>>> Amogh Jahagirdar >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>