This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/asf-site by this push:
new ecdcf4ee61 [HUDI-4535] Add document for flink data skipping (#6299)
ecdcf4ee61 is described below
commit ecdcf4ee61ff267d0739be3d66fc3c4bdb1a0d69
Author: Danny Chan <[email protected]>
AuthorDate: Thu Aug 4 17:44:38 2022 +0800
[HUDI-4535] Add document for flink data skipping (#6299)
---
website/docs/flink-quick-start-guide.md | 2 +-
website/docs/hoodie_deltastreamer.md | 80 ++++++++++++++++++++-------------
website/docs/querying_data.md | 61 +++++++++++++++++++++++++
3 files changed, 112 insertions(+), 31 deletions(-)
diff --git a/website/docs/flink-quick-start-guide.md
b/website/docs/flink-quick-start-guide.md
index e9ca0c3df5..4cf2b1042b 100644
--- a/website/docs/flink-quick-start-guide.md
+++ b/website/docs/flink-quick-start-guide.md
@@ -10,7 +10,7 @@ This guide helps you quickly start using Flink on Hudi, and
learn different mode
- **Quick Start** : Read [Quick Start](#quick-start) to get started quickly
with the Flink SQL client to write to (and read from) Hudi.
- **Configuration** : For [Global
Configuration](flink_configuration#global-configurations), set up through
`$FLINK_HOME/conf/flink-conf.yaml`. For per-job configuration, set up through
[Table Option](flink_configuration#table-options).
- **Writing Data** : Flink supports different modes for writing, such as [CDC
Ingestion](hoodie_deltastreamer#cdc-ingestion), [Bulk
Insert](hoodie_deltastreamer#bulk-insert), [Index
Bootstrap](hoodie_deltastreamer#index-bootstrap), [Changelog
Mode](hoodie_deltastreamer#changelog-mode) and [Append
Mode](hoodie_deltastreamer#append-mode).
-- **Querying Data** : Flink supports different modes for reading, such as
[Streaming Query](hoodie_deltastreamer#streaming-query) and [Incremental
Query](hoodie_deltastreamer#incremental-query).
+- **Querying Data** : Flink supports different modes for reading, such as
[Streaming Query](querying_data#streaming-query) and [Incremental
Query](querying_data#incremental-query).
- **Tuning** : For write/read tasks, this guide gives some tuning suggestions,
such as [Memory Optimization](flink_configuration#memory-optimization) and
[Write Rate Limit](flink_configuration#write-rate-limit).
- **Optimization**: Offline compaction is supported; see [Offline
Compaction](compaction#flink-offline-compaction).
- **Query Engines**: Besides Flink, many other engines are integrated: [Hive
Query](syncing_metastore#flink-setup), [Presto
Query](query_engine_setup#prestodb).
diff --git a/website/docs/hoodie_deltastreamer.md
b/website/docs/hoodie_deltastreamer.md
index 607b6a989b..00300a426f 100644
--- a/website/docs/hoodie_deltastreamer.md
+++ b/website/docs/hoodie_deltastreamer.md
@@ -472,56 +472,76 @@ the compaction options:
[`compaction.delta_commits`](#compaction) and [`compacti
### Append Mode
-If INSERT operation is used for ingestion, for COW table, there is no merging
of small files by default; for MOR table, the small file strategy is applied
always: MOR appends delta records to log files.
+For the `INSERT` write operation, the current workflow is:
-The small file strategy lead to performance degradation. If you want to apply
the behavior of file merge for COW table, turns on option
`write.insert.cluster`, there is no record key combining by the way.
+- For Merge_On_Read tables, the small file strategy is applied by default:
Hudi tries to append to the small avro log files first
+- For Copy_On_Write tables, new parquet files are written directly; no small
file strategy is applied
+
+Hudi supports rich clustering strategies to optimize the file layout for
`INSERT` mode:
+
+#### Inline Clustering
+
+:::note
+Only Copy_On_Write table is supported.
+:::
-#### Options
| Option Name | Required | Default | Remarks |
| ----------- | ------- | ------- | ------- |
| `write.insert.cluster` | `false` | `false` | Whether to merge small files
while ingesting. For COW tables, enable this option to turn on the small file
merging strategy (keys are not deduplicated, and throughput is affected) |
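+
+As an illustrative sketch (table name, schema, and path are hypothetical; only
`write.insert.cluster` comes from the table above), the option can be set in
the `WITH` clause of a Flink SQL DDL:
+
+```sql
+-- hypothetical COW table; enables small file merging for INSERT mode
+CREATE TABLE hudi_cow (
+  id INT PRIMARY KEY NOT ENFORCED,
+  name STRING
+) WITH (
+  'connector' = 'hudi',
+  'path' = 'file:///tmp/hudi_cow',
+  'table.type' = 'COPY_ON_WRITE',
+  'write.operation' = 'insert',
+  'write.insert.cluster' = 'true'  -- merge small files while ingesting
+);
+```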
-### Rate Limit
-There are many use cases that user put the full history data set onto the
message queue together with the realtime incremental data. Then they consume
the data from the queue into the hudi from the earliest offset using flink.
Consuming history data set has these characteristics:
-1). The instant throughput is huge 2). It has serious disorder (with random
writing partitions). It will lead to degradation of writing performance and
throughput glitches. For this case, the speed limit parameter can be turned on
to ensure smooth writing of the flow.
+#### Async Clustering
-#### Options
| Option Name | Required | Default | Remarks |
| ----------- | ------- | ------- | ------- |
-| `write.rate.limit` | `false` | `0` | Default disable the rate limit |
+| `clustering.schedule.enabled` | `false` | `false` | Whether to schedule the
clustering plan during the write process, by default false |
+| `clustering.delta_commits` | `false` | `4` | Delta commits to schedule the
clustering plan, only valid when `clustering.schedule.enabled` is true |
+| `clustering.async.enabled` | `false` | `false` | Whether to execute the
clustering plan asynchronously, by default false |
+| `clustering.tasks` | `false` | `4` | Parallelism of the clustering tasks |
+| `clustering.plan.strategy.target.file.max.bytes` | `false` |
`1024*1024*1024` | The target file size for a clustering group, by default 1GB |
+| `clustering.plan.strategy.small.file.limit` | `false` | `600` | Files
smaller than this threshold (in MB) are candidates for clustering |
+| `clustering.plan.strategy.sort.columns` | `false` | `N/A` | The columns to
sort by when clustering |
+
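+The async clustering options above can be combined in a table definition; a
sketch (table name, schema, and path are hypothetical):
+
+```sql
+-- hypothetical table; schedules a clustering plan every 4 delta commits
+-- and executes it asynchronously
+CREATE TABLE hudi_table (
+  uuid STRING PRIMARY KEY NOT ENFORCED,
+  ts TIMESTAMP(3)
+) WITH (
+  'connector' = 'hudi',
+  'path' = 'file:///tmp/hudi_table',
+  'write.operation' = 'insert',
+  'clustering.schedule.enabled' = 'true',  -- schedule a plan while writing
+  'clustering.delta_commits' = '4',        -- every 4 delta commits
+  'clustering.async.enabled' = 'true',     -- execute the plan asynchronously
+  'clustering.plan.strategy.sort.columns' = 'uuid'
+);
+```
+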
+#### Clustering Plan Strategy
-### Streaming Query
-By default, the hoodie table is read as batch, that is to read the latest
snapshot data set and returns. Turns on the streaming read
-mode by setting option `read.streaming.enabled` as `true`. Sets up option
`read.start-commit` to specify the read start offset, specifies the
-value as `earliest` if you want to consume all the history data set.
+Custom clustering plan strategies are supported.
+
+| Option Name | Required | Default | Remarks |
+| ----------- | ------- | ------- | ------- |
+| `clustering.plan.partition.filter.mode` | `false` | `NONE` | Valid options:
1) `NONE`: no limit; 2) `RECENT_DAYS`: choose partitions that represent recent
days; 3) `SELECTED_PARTITIONS`: specific partitions |
+| `clustering.plan.strategy.daybased.lookback.partitions` | `false` | `2` |
Valid for `RECENT_DAYS` mode |
+| `clustering.plan.strategy.cluster.begin.partition` | `false` | `N/A` | Valid
for `SELECTED_PARTITIONS` mode, specifies the partition to begin with (inclusive) |
+| `clustering.plan.strategy.cluster.end.partition` | `false` | `N/A` | Valid
for `SELECTED_PARTITIONS` mode, specifies the partition to end with (inclusive) |
+| `clustering.plan.strategy.partition.regex.pattern` | `false` | `N/A` | The
regex used to filter the partitions |
+| `clustering.plan.strategy.partition.selected` | `false` | `N/A` | Specific
partitions separated by commas `,` |
+
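+As a hedged sketch, the partition filter options could be combined like this
(table definition and values are illustrative, not prescriptive):
+
+```sql
+-- illustrative options only; assumes a day-partitioned hudi table
+CREATE TABLE hudi_table (
+  uuid STRING PRIMARY KEY NOT ENFORCED,
+  `partition` STRING
+) PARTITIONED BY (`partition`) WITH (
+  'connector' = 'hudi',
+  'path' = 'file:///tmp/hudi_table',
+  'clustering.schedule.enabled' = 'true',
+  'clustering.plan.partition.filter.mode' = 'RECENT_DAYS',
+  'clustering.plan.strategy.daybased.lookback.partitions' = '2'  -- last 2 day partitions
+);
+```
+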
+### Bucket Index
+
+By default, Flink uses the state backend to keep the file index, that is, the
mapping from primary key to fileId. When the input data set is large,
+the cost of the state may become a bottleneck. The bucket index instead uses a
deterministic hash algorithm to shuffle records into
+buckets, which avoids the storage and query overhead of the index.
#### Options
+
| Option Name | Required | Default | Remarks |
| ----------- | ------- | ------- | ------- |
-| `read.streaming.enabled` | false | `false` | Specify `true` to read as
streaming |
-| `read.start-commit` | false | the latest commit | Start commit time in
format 'yyyyMMddHHmmss', use `earliest` to consume from the start commit |
-| `read.streaming.skip_compaction` | false | `false` | Whether to skip
compaction commits while reading, generally for two purposes: 1) Avoid
consuming duplications from the compaction instants 2) When change log mode is
enabled, to only consume change logs for right semantics. |
-| `clean.retain_commits` | false | `10` | The max number of commits to retain
before cleaning, when change log mode is enabled, tweaks this option to adjust
the change log live time. For example, the default strategy keeps 50 minutes of
change logs if the checkpoint interval is set up as 5 minutes. |
+| `index.type` | `false` | `FLINK_STATE` | Set to `BUCKET` to use the bucket
index |
+| `hoodie.bucket.index.hash.field` | `false` | Primary key | Can be a subset
of the primary key |
+| `hoodie.bucket.index.num.buckets` | `false` | `4` | The number of buckets
per partition, immutable once set up |
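+
+A sketch of enabling the bucket index (table name, schema, and bucket count
are illustrative):
+
+```sql
+-- hypothetical table; switches the index from FLINK_STATE to BUCKET
+CREATE TABLE hudi_bucket (
+  id INT PRIMARY KEY NOT ENFORCED,
+  name STRING
+) WITH (
+  'connector' = 'hudi',
+  'path' = 'file:///tmp/hudi_bucket',
+  'index.type' = 'BUCKET',
+  'hoodie.bucket.index.num.buckets' = '8'  -- immutable once set
+);
+```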
-:::note
-When option `read.streaming.skip_compaction` turns on and the streaming reader
lags behind by commits of number
-`clean.retain_commits`, the data loss may occur. The compaction keeps the
original instant time as the per-record metadata,
-the streaming reader would read and skip the whole base files if the log has
been consumed. For efficiency, option `read.streaming.skip_compaction`
-is till suggested being `true`.
-:::
+Compared with the state index:
+
+- The bucket index has no computation or storage cost for a state-backend
index, and thus performs better
+- The bucket index cannot expand the buckets dynamically, while the
state-backend index can, based on the current file layout
+- The bucket index cannot handle record changes across partitions (no
limitation if the input itself is a CDC stream), while the state-backend index
has no such limit
-### Incremental Query
-There are 3 use cases for incremental query:
-1. Streaming query: specify the start commit with option `read.start-commit`;
-2. Batch query: specify the start commit with option `read.start-commit` and
end commit with option `read.end-commit`,
- the interval is a closed one: both start commit and end commit are
inclusive;
-3. TimeTravel: consume as batch for an instant time, specify the
`read.end-commit` is enough because the start commit is latest by default.
+### Rate Limit
+In many use cases, users put the full history data set onto a message queue
together with the real-time incremental data, then use Flink to consume the
queue into hudi starting from the earliest offset. Consuming the history data
set has these characteristics:
+1) the instant throughput is huge; 2) it is seriously out of order (with
random write partitions). This leads to degraded write performance and
throughput glitches. In such cases, the rate limit option can be turned on
to ensure smooth writing.
#### Options
| Option Name | Required | Default | Remarks |
| ----------- | ------- | ------- | ------- |
-| `read.start-commit` | `false` | the latest commit | Specify `earliest` to
consume from the start commit |
-| `read.end-commit` | `false` | the latest commit | -- |
+| `write.rate.limit` | `false` | `0` | The rate limit is disabled by default
(`0`) |
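+
+As an illustrative sketch (sink table definition is hypothetical; only
`write.rate.limit` comes from the table above), a non-zero value turns the
limit on:
+
+```sql
+-- hypothetical sink table with the rate limit enabled
+CREATE TABLE hudi_sink (
+  uuid STRING PRIMARY KEY NOT ENFORCED,
+  ts TIMESTAMP(3)
+) WITH (
+  'connector' = 'hudi',
+  'path' = 'file:///tmp/hudi_sink',
+  'write.rate.limit' = '2000'  -- non-zero enables the limit; 0 (default) disables it
+);
+```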
## Kafka Connect Sink
If you want to perform streaming ingestion into Hudi format similar to
`HoodieDeltaStreamer`, but you don't want to depend on Spark,
diff --git a/website/docs/querying_data.md b/website/docs/querying_data.md
index 98d7e5d8d2..024c84f5df 100644
--- a/website/docs/querying_data.md
+++ b/website/docs/querying_data.md
@@ -128,6 +128,67 @@ when querying the table, a Flink streaming pipeline starts
and never ends until
You can specify the start commit with option `read.streaming.start-commit` and
source monitoring interval with option
`read.streaming.check-interval`.
+### Streaming Query
+By default, the hoodie table is read as a batch, that is, the latest snapshot
data set is read and returned. Turn on the streaming read
+mode by setting option `read.streaming.enabled` to `true`. Set option
`read.start-commit` to specify the read start offset; specify the
+value as `earliest` if you want to consume the whole history data set.
+
+#### Options
+| Option Name | Required | Default | Remarks |
+| ----------- | ------- | ------- | ------- |
+| `read.streaming.enabled` | false | `false` | Specify `true` to read in
streaming mode |
+| `read.start-commit` | false | the latest commit | Start commit time in
format 'yyyyMMddHHmmss', use `earliest` to consume from the start commit |
+| `read.streaming.skip_compaction` | false | `false` | Whether to skip
compaction commits while reading, generally for two purposes: 1) avoid
consuming duplicates from the compaction instants; 2) when change log mode is
enabled, consume only change logs for correct semantics |
+| `clean.retain_commits` | false | `10` | The max number of commits to retain
before cleaning; when change log mode is enabled, tweak this option to adjust
the change log retention time. For example, the default strategy keeps 50
minutes of change logs if the checkpoint interval is set to 5 minutes |
+
+:::note
+When option `read.streaming.skip_compaction` is turned on and the streaming
reader lags behind by more than
+`clean.retain_commits` commits, data loss may occur. Compaction keeps the
original instant time in the per-record metadata,
+so the streaming reader reads and skips the whole base file if its logs have
already been consumed. For efficiency, option `read.streaming.skip_compaction`
+is still suggested to be `true`.
+:::
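+
+A sketch of a streaming read (table definition is illustrative):
+
+```sql
+-- hypothetical source table read in streaming mode from the earliest commit
+CREATE TABLE hudi_source (
+  uuid STRING,
+  ts TIMESTAMP(3)
+) WITH (
+  'connector' = 'hudi',
+  'path' = 'file:///tmp/hudi_source',
+  'read.streaming.enabled' = 'true',  -- streaming instead of a batch snapshot
+  'read.start-commit' = 'earliest'    -- consume the whole history data set
+);
+
+SELECT * FROM hudi_source;
+```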
+
+### Incremental Query
+There are 3 use cases for incremental query:
+1. Streaming query: specify the start commit with option `read.start-commit`;
+2. Batch query: specify the start commit with option `read.start-commit` and
the end commit with option `read.end-commit`;
+   the interval is closed: both the start commit and end commit are
inclusive;
+3. Time travel: consume as a batch at a given instant time; specifying
`read.end-commit` is enough because the start commit is latest by default.
+
+#### Options
+| Option Name | Required | Default | Remarks |
+| ----------- | ------- | ------- | ------- |
+| `read.start-commit` | `false` | the latest commit | Specify `earliest` to
consume from the start commit |
+| `read.end-commit` | `false` | the latest commit | -- |
+
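+A bounded incremental (batch) query can be sketched with Flink SQL dynamic
table options (commit times and the table name are placeholders; this assumes
dynamic table options are enabled in the session):
+
+```sql
+-- hypothetical bounded incremental read; both commits are inclusive
+SELECT * FROM hudi_table
+/*+ OPTIONS(
+  'read.start-commit' = '20220801000000',
+  'read.end-commit'   = '20220802000000'
+) */;
+```
+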
+### Metadata Table
+The metadata table holds the metadata indexes of a hudi table: the file
list and various kinds of indexes, collectively called the multi-modal index.
+Currently these indexes are supported:
+
+1. partition -> files mapping
+2. column max/min statistics for each file
+3. bloom filter for each file
+
+The partition -> files mapping can be used to fetch the file list on the
write/read path. It is cost friendly for object storage that charges per
request;
+for HDFS, it eases the access burden on the NameNode.
+
+The per-file column max/min statistics are used for query acceleration. On
the write path, when this feature is enabled, hudi
+bookkeeps the max/min values of each column in real time, which decreases the
write throughput. On the read path, hudi uses
+these statistics to filter out useless files before scanning.
+
+The bloom filter index is currently only used by the Spark bloom-filter-based
index, not for query acceleration yet.
+
+In general, enabling the metadata table increases the commit time, so it is
not very friendly for use cases with a very short checkpoint interval (say
30s). For such use cases, you should test the stability first.
+
+#### Options
+| Option Name | Required | Default | Remarks |
+| ----------- | ------- | ------- | ------- |
+| `metadata.enabled` | `false` | false | Set to `true` to enable |
+| `read.data.skipping.enabled` | `false` | false | Whether to enable data
skipping for batch snapshot read, by default disabled |
+| `hoodie.metadata.index.column.stats.enable` | `false` | false | Whether to
enable column statistics (max/min) |
+| `hoodie.metadata.index.column.stats.column.list` | `false` | N/A |
Columns (separated by commas) for which to collect column statistics |
+
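+A sketch combining these options (table name, schema, and column list are
illustrative): column stats are collected on the write path, and data skipping
is enabled on the read path:
+
+```sql
+-- hypothetical table with the metadata table and data skipping enabled
+CREATE TABLE hudi_table (
+  id INT PRIMARY KEY NOT ENFORCED,
+  age INT,
+  city STRING
+) WITH (
+  'connector' = 'hudi',
+  'path' = 'file:///tmp/hudi_table',
+  'metadata.enabled' = 'true',
+  'hoodie.metadata.index.column.stats.enable' = 'true',
+  'hoodie.metadata.index.column.stats.column.list' = 'age,city',
+  'read.data.skipping.enabled' = 'true'  -- prune files via column stats
+);
+```
+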
## Hive
To setup Hive for querying Hudi, see the [Query Engine
Setup](/docs/query_engine_setup#hive) page.