This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/asf-site by this push:
     new ecdcf4ee61 [HUDI-4535] Add document for flink data skipping (#6299)
ecdcf4ee61 is described below

commit ecdcf4ee61ff267d0739be3d66fc3c4bdb1a0d69
Author: Danny Chan <[email protected]>
AuthorDate: Thu Aug 4 17:44:38 2022 +0800

    [HUDI-4535] Add document for flink data skipping (#6299)
---
 website/docs/flink-quick-start-guide.md |  2 +-
 website/docs/hoodie_deltastreamer.md    | 80 ++++++++++++++++++++-------------
 website/docs/querying_data.md           | 61 +++++++++++++++++++++++++
 3 files changed, 112 insertions(+), 31 deletions(-)

diff --git a/website/docs/flink-quick-start-guide.md 
b/website/docs/flink-quick-start-guide.md
index e9ca0c3df5..4cf2b1042b 100644
--- a/website/docs/flink-quick-start-guide.md
+++ b/website/docs/flink-quick-start-guide.md
@@ -10,7 +10,7 @@ This guide helps you quickly start using Flink on Hudi, and 
learn different mode
 - **Quick Start** : Read [Quick Start](#quick-start) to get started quickly 
Flink sql client to write to(read from) Hudi.
 - **Configuration** : For [Global 
Configuration](flink_configuration#global-configurations), sets up through 
`$FLINK_HOME/conf/flink-conf.yaml`. For per job configuration, sets up through 
[Table Option](flink_configuration#table-options).
 - **Writing Data** : Flink supports different modes for writing, such as [CDC 
Ingestion](hoodie_deltastreamer#cdc-ingestion), [Bulk 
Insert](hoodie_deltastreamer#bulk-insert), [Index 
Bootstrap](hoodie_deltastreamer#index-bootstrap), [Changelog 
Mode](hoodie_deltastreamer#changelog-mode) and [Append 
Mode](hoodie_deltastreamer#append-mode).
-- **Querying Data** : Flink supports different modes for reading, such as 
[Streaming Query](hoodie_deltastreamer#streaming-query) and [Incremental 
Query](hoodie_deltastreamer#incremental-query).
+- **Querying Data** : Flink supports different modes for reading, such as 
[Streaming Query](querying_data#streaming-query) and [Incremental 
Query](querying_data#incremental-query).
 - **Tuning** : For write/read tasks, this guide gives some tuning suggestions, 
such as [Memory Optimization](flink_configuration#memory-optimization) and 
[Write Rate Limit](flink_configuration#write-rate-limit).
 - **Optimization**: Offline compaction is supported [Offline 
Compaction](compaction#flink-offline-compaction).
 - **Query Engines**: Besides Flink, many other engines are integrated: [Hive 
Query](syncing_metastore#flink-setup), [Presto 
Query](query_engine_setup#prestodb).
diff --git a/website/docs/hoodie_deltastreamer.md 
b/website/docs/hoodie_deltastreamer.md
index 607b6a989b..00300a426f 100644
--- a/website/docs/hoodie_deltastreamer.md
+++ b/website/docs/hoodie_deltastreamer.md
@@ -472,56 +472,76 @@ the compaction options: 
[`compaction.delta_commits`](#compaction) and [`compacti
 
 ### Append Mode
 
-If INSERT operation is used for ingestion, for COW table, there is no merging 
of small files by default; for MOR table, the small file strategy is applied 
always: MOR appends delta records to log files.
+For `INSERT` mode write operation, the current work flow is:
 
-The small file strategy lead to performance degradation. If you want to apply 
the behavior of file merge for COW table, turns on option 
`write.insert.cluster`, there is no record key combining by the way.
+- For Merge_On_Read table, the small file strategies are by default applied: 
tries to append to the small avro log files first
+- For Copy_On_Write table, write new parquet files directly, no small file 
strategies are applied
+
+Hudi supports rich clustering strategies to optimize the files layout for 
`INSERT` mode:
+
+#### Inline Clustering
+
+:::note
+Only Copy_On_Write table is supported. 
+:::
 
-#### Options
 |  Option Name  | Required | Default | Remarks |
 |  -----------  | -------  | ------- | ------- |
 | `write.insert.cluster` | `false` | `false` | Whether to merge small files 
while ingesting, for COW table, open the option to enable the small file 
merging strategy(no deduplication for keys but the throughput will be affected) 
|
 
-### Rate Limit
-There are many use cases that user put the full history data set onto the 
message queue together with the realtime incremental data. Then they consume 
the data from the queue into the hudi from the earliest offset using flink. 
Consuming history data set has these characteristics:
-1). The instant throughput is huge 2). It has serious disorder (with random 
writing partitions). It will lead to degradation of writing performance and 
throughput glitches. For this case, the speed limit parameter can be turned on 
to ensure smooth writing of the flow.
+#### Async Clustering
 
-#### Options
 |  Option Name  | Required | Default | Remarks |
 |  -----------  | -------  | ------- | ------- |
-| `write.rate.limit` | `false` | `0` | Default disable the rate limit |
+| `clustering.schedule.enabled` | `false` | `false` | Whether to schedule 
clustering plan during write process, by default false |
+| `clustering.delta_commits` | `false` | `4` | Delta commits to schedule the 
clustering plan, only valid when `clustering.schedule.enabled` is true |
+| `clustering.async.enabled` | `false` | `false` | Whether to execute 
clustering plan asynchronously, by default false |
+| `clustering.tasks` | `false` | `4` | Parallelism of the clustering tasks |
+| `clustering.plan.strategy.target.file.max.bytes` | `false` | 
`1024*1024*1024` | The target file size for clustering group, by default 1GB |
+| `clustering.plan.strategy.small.file.limit` | `false` | `600` | The file 
that has less size than the threshold (unit MB) are candidates for clustering |
+| `clustering.plan.strategy.sort.columns` | `false` | `N/A` | The columns to 
sort by when clustering |
+
+#### Clustering Plan Strategy
 
-### Streaming Query
-By default, the hoodie table is read as batch, that is to read the latest 
snapshot data set and returns. Turns on the streaming read
-mode by setting option `read.streaming.enabled` as `true`. Sets up option 
`read.start-commit` to specify the read start offset, specifies the
-value as `earliest` if you want to consume all the history data set.
+Custom clustering strategy is supported.
+
+|  Option Name  | Required | Default | Remarks |
+|  -----------  | -------  | ------- | ------- |
+| `clustering.plan.partition.filter.mode` | `false` | `NONE` | Valid options 
1) `NONE`: no limit; 2) `RECENT_DAYS`: choose partitions that represent recent 
days; 3) `SELECTED_PARTITIONS`: specific partitions |
+| `clustering.plan.strategy.daybased.lookback.partitions` | `false` | `2` | 
Valid for `RECENT_DAYS` mode |
+| `clustering.plan.strategy.cluster.begin.partition` | `false` | `N/A` | Valid 
for `SELECTED_PARTITIONS` mode, specify the partition to begin with(inclusive) |
+| `clustering.plan.strategy.cluster.end.partition` | `false` | `N/A` | Valid 
for `SELECTED_PARTITIONS` mode, specify the partition to end with(inclusive) |
+| `clustering.plan.strategy.partition.regex.pattern` | `false` | `N/A` | The 
regex to filter the partitions |
+| `clustering.plan.strategy.partition.selected` | `false` | `N/A` | Specific 
partitions separated by comma `,` |
+
+### Bucket Index
+
+By default, flink uses the state-backend to keep the file index: the mapping 
from primary key to fileId. When the input data set is large,
+there is possibility the cost of the state be a bottleneck, the bucket index 
use deterministic hash algorithm for shuffling the records into
+buckets, thus can avoid the storage and query overhead of indexes.
 
 #### Options
+
 |  Option Name  | Required | Default | Remarks |
 |  -----------  | -------  | ------- | ------- |
-| `read.streaming.enabled` | false | `false` | Specify `true` to read as 
streaming |
-| `read.start-commit` | false | the latest commit | Start commit time in 
format 'yyyyMMddHHmmss', use `earliest` to consume from the start commit |
-| `read.streaming.skip_compaction` | false | `false` | Whether to skip 
compaction commits while reading, generally for two purposes: 1) Avoid 
consuming duplications from the compaction instants 2) When change log mode is 
enabled, to only consume change logs for right semantics. |
-| `clean.retain_commits` | false | `10` | The max number of commits to retain 
before cleaning, when change log mode is enabled, tweaks this option to adjust 
the change log live time. For example, the default strategy keeps 50 minutes of 
change logs if the checkpoint interval is set up as 5 minutes. |
+| `index.type` | `false` | `FLINK_STATE` | Set up as `BUCKET` to use bucket 
index |
+| `hoodie.bucket.index.hash.field` | `false` | Primary key | Can be a subset 
of the primary key |
+| `hoodie.bucket.index.num.buckets` | `false` | `4` | The number of buckets 
per-partition, it is immutable once set up |
 
-:::note
-When option `read.streaming.skip_compaction` turns on and the streaming reader 
lags behind by commits of number
-`clean.retain_commits`, the data loss may occur. The compaction keeps the 
original instant time as the per-record metadata,
-the streaming reader would read and skip the whole base files if the log has 
been consumed. For efficiency, option `read.streaming.skip_compaction`
-is till suggested being `true`.
-:::
+Comparing to state index:
+
+- Bucket index has no computing and storage cost of state-backend index, thus 
has better performance
+- Bucket index can not expand the buckets dynamically, the state-backend index 
can expand the buckets dynamically based on current file layout
+- Bucket index can not handle changes among partitions(no limit if the input 
itself is CDC stream), state-backend index has no limit 
 
-### Incremental Query
-There are 3 use cases for incremental query:
-1. Streaming query: specify the start commit with option `read.start-commit`;
-2. Batch query: specify the start commit with option `read.start-commit` and 
end commit with option `read.end-commit`,
-   the interval is a closed one: both start commit and end commit are 
inclusive;
-3. TimeTravel: consume as batch for an instant time, specify the 
`read.end-commit` is enough because the start commit is latest by default.
+### Rate Limit
+There are many use cases that user put the full history data set onto the 
message queue together with the realtime incremental data. Then they consume 
the data from the queue into the hudi from the earliest offset using flink. 
Consuming history data set has these characteristics:
+1). The instant throughput is huge 2). It has serious disorder (with random 
writing partitions). It will lead to degradation of writing performance and 
throughput glitches. For this case, the speed limit parameter can be turned on 
to ensure smooth writing of the flow.
 
 #### Options
 |  Option Name  | Required | Default | Remarks |
 |  -----------  | -------  | ------- | ------- |
-| `read.start-commit` | `false` | the latest commit | Specify `earliest` to 
consume from the start commit |
-| `read.end-commit` | `false` | the latest commit | -- |
+| `write.rate.limit` | `false` | `0` | Default disable the rate limit |
 
 ## Kafka Connect Sink
 If you want to perform streaming ingestion into Hudi format similar to 
`HoodieDeltaStreamer`, but you don't want to depend on Spark,
diff --git a/website/docs/querying_data.md b/website/docs/querying_data.md
index 98d7e5d8d2..024c84f5df 100644
--- a/website/docs/querying_data.md
+++ b/website/docs/querying_data.md
@@ -128,6 +128,67 @@ when querying the table, a Flink streaming pipeline starts 
and never ends until
 You can specify the start commit with option `read.streaming.start-commit` and 
source monitoring interval with option
 `read.streaming.check-interval`.
 
+### Streaming Query
+By default, the hoodie table is read as batch, that is to read the latest 
snapshot data set and returns. Turns on the streaming read
+mode by setting option `read.streaming.enabled` as `true`. Sets up option 
`read.start-commit` to specify the read start offset, specifies the
+value as `earliest` if you want to consume all the history data set.
+
+#### Options
+|  Option Name  | Required | Default | Remarks |
+|  -----------  | -------  | ------- | ------- |
+| `read.streaming.enabled` | false | `false` | Specify `true` to read as 
streaming |
+| `read.start-commit` | false | the latest commit | Start commit time in 
format 'yyyyMMddHHmmss', use `earliest` to consume from the start commit |
+| `read.streaming.skip_compaction` | false | `false` | Whether to skip 
compaction commits while reading, generally for two purposes: 1) Avoid 
consuming duplications from the compaction instants 2) When change log mode is 
enabled, to only consume change logs for right semantics. |
+| `clean.retain_commits` | false | `10` | The max number of commits to retain 
before cleaning, when change log mode is enabled, tweaks this option to adjust 
the change log live time. For example, the default strategy keeps 50 minutes of 
change logs if the checkpoint interval is set up as 5 minutes. |
+
+:::note
+When option `read.streaming.skip_compaction` turns on and the streaming reader 
lags behind by commits of number
+`clean.retain_commits`, the data loss may occur. The compaction keeps the 
original instant time as the per-record metadata,
+the streaming reader would read and skip the whole base files if the log has 
been consumed. For efficiency, option `read.streaming.skip_compaction`
+is till suggested being `true`.
+:::
+
+### Incremental Query
+There are 3 use cases for incremental query:
+1. Streaming query: specify the start commit with option `read.start-commit`;
+2. Batch query: specify the start commit with option `read.start-commit` and 
end commit with option `read.end-commit`,
+   the interval is a closed one: both start commit and end commit are 
inclusive;
+3. TimeTravel: consume as batch for an instant time, specify the 
`read.end-commit` is enough because the start commit is latest by default.
+
+#### Options
+|  Option Name  | Required | Default | Remarks |
+|  -----------  | -------  | ------- | ------- |
+| `read.start-commit` | `false` | the latest commit | Specify `earliest` to 
consume from the start commit |
+| `read.end-commit` | `false` | the latest commit | -- |
+
+### Metadata Table
+The metadata table holds the metadata index per hudi table, it holds the file 
list and all kinds of indexes that we called multi-model index.
+Current these indexes are supported:
+
+1. partition -> files mapping
+2. column max/min statistics for each file
+3. bloom filter for each file
+
+The partition -> files mappings can be used for fetching the file list for 
writing/reading path, it is cost friendly for object storage that charges per 
visit,
+for HDFS, it can ease the access burden of the NameNode.
+
+The column max/min statistics per file is used for query acceleration, in the 
writing path, when enable this feature, hudi would
+book-keep the max/min values for each column in real-time, thus would decrease 
the writing throughput. In the reading path, hudi uses
+this statistics to filter out the useless files first before scanning.
+
+The bloom filter index is currently only used for spark bloom filter index, 
not for query acceleration yet.
+
+In general, enable the metadata table would increase the commit time, it is 
not very friendly for the use cases for very short checkpoint interval (say 
30s).
+And for these use cases you should test the stability first.
+
+#### Options
+|  Option Name  | Required | Default | Remarks |
+|  -----------  | -------  | ------- | ------- |
+| `metadata.enabled` | `false` | false | Set to `true` to enable |
+| `read.data.skipping.enabled` | `false` | false | Whether to enable data 
skipping for batch snapshot read, by default disabled |
+| `hoodie.metadata.index.column.stats.enable` | `false` | false | Whether to 
enable column statistics (max/min) |
+| `hoodie.metadata.index.column.stats.column.list` | `false` | N/A | 
Columns(separated by comma) to collect the column statistics  |
+
 ## Hive
 To setup Hive for querying Hudi, see the [Query Engine 
Setup](/docs/query_engine_setup#hive) page.
 

Reply via email to