codope commented on a change in pull request #4640: URL: https://github.com/apache/hudi/pull/4640#discussion_r824623608
########## File path: rfc/rfc-45/rfc-45.md ##########
@@ -0,0 +1,264 @@

<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->

# RFC-45: Asynchronous Metadata Indexing

## Proposers

- @codope
- @manojpec

## Approvers

- @nsivabalan
- @vinothchandar

## Status

JIRA: [HUDI-2488](https://issues.apache.org/jira/browse/HUDI-2488)

## Abstract

Metadata indexing (aka metadata bootstrapping) is the process of creating one or
more metadata-based indexes, e.g. the data-partitions-to-files index, that are
stored in the Hudi metadata table. Currently, the metadata table (referred to as
MDT hereafter) supports a single partition, which is updated synchronously with
the corresponding data table, i.e. commits are first applied to the metadata
table and then to the data table. Our goal for the MDT is to support multiple
partitions to boost the performance of existing index and record lookups.
However, synchronous metadata indexing is not very scalable as we add more
partitions to the MDT, because the regular writers (writing to the data table)
have to wait until the MDT commit completes. In this RFC, we propose a design to
support asynchronous metadata indexing.

## Background

We can read more about the MDT design in
[RFC-15](https://cwiki.apache.org/confluence/display/HUDI/RFC+-+15%3A+HUDI+File+Listing+Improvements).
Here is a quick summary of the current state (Hudi v0.10.1). The MDT is an
internal Merge-on-Read (MOR) table with a single partition called `files`, which
stores the data-partitions-to-files index used for file listing. The MDT is
co-located with the data table (inside the `.hoodie/metadata` directory under
the basepath). To handle multi-writer scenarios, users configure a lock provider
and only one writer can access the MDT in read-write mode. Hence, any write to
the MDT is guarded by the data table lock. This ensures that only one write is
committed to the MDT at any point in time and thus guarantees serializability.
However, the locking overhead adversely affects write throughput and will reach
its scalability limits as we add more partitions to the MDT.
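For context, a minimal multi-writer setup along these lines might look as
follows. This is only an illustrative sketch: the property names follow Hudi's
multi-writer / lock-provider configuration, and the ZooKeeper endpoint, lock
key, and base path values are placeholders.

```java
import java.util.Properties;

public class HudiLockConfigExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Enable optimistic concurrency control for multi-writer access.
    props.setProperty("hoodie.write.concurrency.mode", "optimistic_concurrency_control");
    // Lock provider that guards writes to the metadata table (ZooKeeper-based here).
    props.setProperty("hoodie.write.lock.provider",
        "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider");
    props.setProperty("hoodie.write.lock.zookeeper.url", "zk1");               // placeholder host
    props.setProperty("hoodie.write.lock.zookeeper.port", "2181");             // placeholder port
    props.setProperty("hoodie.write.lock.zookeeper.lock_key", "my_table");     // placeholder key
    props.setProperty("hoodie.write.lock.zookeeper.base_path", "/hudi/locks"); // placeholder path
    // Failed writes are rolled back lazily under multi-writer.
    props.setProperty("hoodie.cleaner.policy.failed.writes", "LAZY");
    props.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}
```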
## Goals

- Support indexing one or more partitions in the MDT while regular writers and
  table services (such as cleaning or compaction) are in progress.
- Keep locking as lightweight as possible.
- Keep required config changes to a minimum to simplify deployment/upgrade in
  production.
- Do not require a specific order in which writers and table service pipelines
  need to be upgraded/restarted.
- If an external long-running process is used to initialize the index, make the
  process idempotent so it can handle errors from previous runs.
- To re-initialize the index, make it as simple as running the external
  initialization process again, without having to change configs.

## Implementation

### A new Hudi action: INDEX

We introduce a new action, `index`, which denotes the index building process.
Its mechanics are as follows:

1. From an external process, users can issue a CREATE INDEX or similar statement
   to trigger indexing for an existing table.
   1. This schedules the INDEX action and adds a `<instant_time>.index.requested`
      file to the timeline, which contains the indexing plan. Index scheduling
      also initializes the filegroups for the partitions for which indexing is
      planned.
   2. From here on, the index building process will continue to build the index
      up to instant time `t`, where `t` is the latest completed instant time on
      the timeline without any "holes", i.e. with no pending async operations
      prior to it.
   3. The indexing process writes these out as base files within the
      corresponding metadata partition. A metadata partition cannot be used if
      there is any pending indexing action against it. As and when indexing
      completes for a partition, the table config (`hoodie.properties`) is
      updated to indicate that the partition is available for reads or
      synchronous updates. The Hudi table config is the source of truth for the
      current state of the metadata index.

2. Any inflight writer (i.e. with instant time `t'` > `t`) will check for any
   new indexing request on the timeline prior to preparing to commit.
   1. Such writers will additionally add log entries corresponding to each such
      indexing request into the metadata partition.
   2. There is always a TOCTOU issue here, where an inflight writer may not see
      an indexing request that was just added and proceed to commit without it.
      We will correct this during indexing action completion. In the average
      case this does not happen, and the design has liveness.

3. When the indexing process is about to complete (i.e. indexing up to
   instant `t` is done, but before completing the indexing commit), it checks
   that all commit instants completed after `t` have added entries per its
   indexing plan, and otherwise simply aborts after a configurable timeout.
   Let's call this the **indexing check** (a rough sketch follows at the end of
   this section). So, the indexer only writes base files, but it ensures that
   log entries due to instants after `t` land in the same filegroup, i.e. no new
   filegroup is initialized by writers while indexing is in progress.
   1. The corner case here is that the indexing check does not factor in an
      inflight writer that is just about to commit. But given that indexing
      takes some finite amount of time to go from requested to completion (or we
      can add a configurable artificial delay here, say 60 seconds), an inflight
      writer that is just about to commit concurrently has a very high chance of
      seeing the indexing plan and aborting itself.

We can just introduce a lock for adding events to the timeline, and these races
would vanish completely, while still providing great scalability and asynchrony
for these processes. The indexer will error out if there is no lock provider
configured.
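To make the indexing check concrete, here is a small, self-contained sketch of
the condition the indexer could verify before completing its commit. The class
and method names are hypothetical and do not reflect the actual Hudi
implementation; instants are modeled as plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Illustrative sketch of the "indexing check": the indexer builds base files up to
// instant t, then verifies that every commit completed after t has added its log
// entries to the planned metadata partitions before finishing the index commit.
public class IndexingCheckSketch {

  // Instants completed on the data timeline after t (e.g. "t1", "t2").
  static List<String> commitsAfter(Set<String> completedInstants, String t) {
    List<String> after = new ArrayList<>();
    for (String instant : completedInstants) {
      if (instant.compareTo(t) > 0) {
        after.add(instant);
      }
    }
    return after;
  }

  // True when every commit after t has logged its metadata index updates.
  static boolean indexingCheckPasses(Set<String> completedInstants,
                                     Set<String> instantsWithIndexLogEntries,
                                     String t) {
    return instantsWithIndexLogEntries.containsAll(commitsAfter(completedInstants, t));
  }

  public static void main(String[] args) {
    Set<String> completed = new TreeSet<>(List.of("t0", "t1", "t2"));
    Set<String> logged = new TreeSet<>(List.of("t1"));               // t2 has not logged yet
    System.out.println(indexingCheckPasses(completed, logged, "t0")); // false -> keep waiting
    logged.add("t2");
    System.out.println(indexingCheckPasses(completed, logged, "t0")); // true -> finish index commit
  }
}
```

If the check does not pass before the configured timeout, the indexer aborts; a
later run can resume using the pending plan and the table config.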
### Multi-writer scenario

Let us walk through a concrete multi-writer scenario to understand the above
indexing mechanism. In this scenario, let instant `t0` be the last completed
instant on the timeline. Suppose the user triggered index building from an
external process at `t3`. This will create a `t3.index.requested` file with the
indexing plan. The plan contains the metadata partitions that need to be created
and the last completed instant, e.g.

```
[
  {MetadataPartitionType.FILES.partitionPath(), t0},
  {MetadataPartitionType.BLOOM_FILTER.partitionPath(), t0},
  {MetadataPartitionType.COLUMN_STATS.partitionPath(), t0}
]
```

Further, suppose there were two inflight writers, Writer1 and Writer2 (with
inflight instants `t1` and `t2` respectively), while the indexing was requested
or inflight. In this case, the writers will check for a pending index action and
find the pending instant `t3`. Now, if the metadata index creation is pending,
which means the indexer has already initialized a filegroup, then each writer
will create log files in the same filegroup for the metadata index update. This
happens within the existing data table lock.

The indexer runs in a loop until the metadata for data up to `t0`, plus the data
written due to `t1` and `t2`, has been indexed, or the indexing times out.
Whether indexing timed out or not, the table config is updated with any MDT
partition(s) for which indexing was complete up to `t2`. In case of a timeout,
the indexer aborts. At this point, the user can trigger the indexing process
again; this time the indexer will check for available partitions in the table
config and skip those partitions. This design ensures that regular writers do
not fail due to indexing.

### Error Handling

**Case 1: Writer fails while indexer is inflight**

This means the index update due to the writer did not complete. The indexer
continues to build the index, ignoring the failed instant due to the writer. The
next update by the writer will trigger a rollback of the failed instant, which
will also roll back the incomplete updates in the metadata table.

**Case 2: Indexer fails while writer is inflight**

The writer will commit, adding log entries to the metadata partition. However,
the table config will indicate that the partition is not ready to use. When the
indexer is re-triggered, it will check the plan and the table config to figure
out which MDT partitions to index and start indexing for those partitions.

Review comment:
   Yes, that's the plan. But it will start from scratch only for the partitions
   that were partially indexed, i.e. partitions for which the table config was
   not updated by the last indexing run. The table config update always happens
   at the end of indexing for a partition. We don't want to start all over again
   for all the partitions.
   So, let's say the indexer was scheduled at some `t` and it wrote
   `t.index.requested` with a plan to index the `files` and `column_stats`
   partitions. It completed `files` but failed midway for `column_stats`. Then
   the table config will show that only the `files` partition is available for
   reads/updates. When the indexer starts the next time, it will see a pending
   index action, read the plan as well as the table config, and figure out that
   only the `column_stats` index is pending. It will clean up the older
   filegroups for `column_stats`, choose the latest completed instant (without
   holes) on the data timeline, create a new filegroup, and so on. If this
   sounds right, I can update this example in the RFC.
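If it helps, here is a rough sketch of that resume logic (hypothetical names,
not the actual Hudi code): the re-triggered indexer derives the still-pending
partitions by subtracting the partitions already marked available in the table
config from those listed in the pending plan.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch of how a re-triggered indexer could resume from a partial run.
public class IndexerResumeSketch {

  // Partitions that still need indexing = planned partitions minus those already
  // marked available in table config (hoodie.properties) by an earlier run.
  static Set<String> pendingPartitions(List<String> plannedPartitions,
                                       Set<String> availableInTableConfig) {
    Set<String> pending = new LinkedHashSet<>(plannedPartitions);
    pending.removeAll(availableInTableConfig);
    return pending;
  }

  public static void main(String[] args) {
    List<String> plan = List.of("files", "column_stats");
    Set<String> available = Set.of("files"); // previous run finished only "files"
    // The re-triggered indexer cleans the old filegroups for the pending partitions,
    // picks the latest completed instant without holes, and indexes only these:
    System.out.println(pendingPartitions(plan, available)); // [column_stats]
  }
}
```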