This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/asf-site by this push:
new 2223b2bce6f [HUDI-7502][DOCS] Re-organize the content in "how to"
section (#10862)
2223b2bce6f is described below
commit 2223b2bce6fd2fb39a41b795fa5729c3406ef65d
Author: Shiyan Xu <[email protected]>
AuthorDate: Wed Mar 13 21:47:16 2024 -0500
[HUDI-7502][DOCS] Re-organize the content in "how to" section (#10862)
---
website/docs/concurrency_control.md | 2 +-
website/docs/deployment.md | 2 +-
website/docs/faq.md | 2 +-
website/docs/faq_general.md | 4 +-
...aq_querying_tables.md => faq_reading_tables.md} | 4 +-
website/docs/faq_table_services.md | 2 +-
website/docs/faq_writing_tables.md | 4 +-
website/docs/flink-quick-start-guide.md | 4 +-
website/docs/hoodie_streaming_ingestion.md | 369 +--------------------
website/docs/hudi_stack.md | 2 +-
website/docs/ingestion_flink.md | 179 ++++++++++
website/docs/ingestion_kafka_connect.md | 7 +
website/docs/overview.md | 2 +-
.../docs/platform_services_post_commit_callback.md | 57 ++++
website/docs/precommit_validator.md | 2 +-
website/docs/quick-start-guide.md | 8 +-
website/docs/reading_tables_batch_reads.md | 20 ++
website/docs/reading_tables_streaming_reads.md | 99 ++++++
website/docs/sql_ddl.md | 4 +-
website/docs/sql_dml.md | 116 ++++++-
website/docs/write_operations.md | 5 +-
website/docs/writing_data.md | 201 +----------
website/docs/writing_tables_streaming_writes.md | 92 +++++
website/releases/release-1.0.0-beta1.md | 2 +-
website/sidebars.js | 64 ++--
website/versioned_docs/version-0.14.1/faq.md | 2 +-
26 files changed, 645 insertions(+), 610 deletions(-)
diff --git a/website/docs/concurrency_control.md
b/website/docs/concurrency_control.md
index dd4e217829e..461f2d1dd3c 100644
--- a/website/docs/concurrency_control.md
+++ b/website/docs/concurrency_control.md
@@ -77,7 +77,7 @@ Multiple writers can operate on the table with non-blocking
conflict resolution.
file group with the conflicts resolved automatically by the query reader and
the compactor. The new concurrency mode is
currently available for preview in version 1.0.0-beta only with the caveat
that conflict resolution is not supported yet
between clustering and ingestion. It works for compaction and ingestion, and
we can see an example of that with Flink
-writers
[here](/docs/next/writing_data#non-blocking-concurrency-control-experimental).
+writers
[here](/docs/next/sql_dml#non-blocking-concurrency-control-experimental).
## Enabling Multi Writing
diff --git a/website/docs/deployment.md b/website/docs/deployment.md
index f5ad89c7f81..b6d56b9937e 100644
--- a/website/docs/deployment.md
+++ b/website/docs/deployment.md
@@ -136,7 +136,7 @@ Here is an example invocation for reading from kafka topic
in a continuous mode
### Spark Datasource Writer Jobs
-As described in [Writing Data](/docs/writing_data#spark-datasource-writer),
you can use spark datasource to ingest to hudi table. This mechanism allows you
to ingest any spark dataframe in Hudi format. Hudi Spark DataSource also
supports spark streaming to ingest a streaming source to Hudi table. For Merge
On Read table types, inline compaction is turned on by default which runs after
every ingestion run. The compaction frequency can be changed by setting the
property "hoodie.compact.inl [...]
+As described in [Batch Writes](/docs/next/writing_data#spark-datasource-api), you can use the Spark datasource to ingest into a Hudi table. This mechanism allows you to ingest any Spark dataframe in Hudi format. The Hudi Spark datasource also supports Spark streaming to ingest a streaming source into a Hudi table. For Merge On Read table types, inline compaction is turned on by default and runs after every ingestion run. The compaction frequency can be changed by setting the property "hoodie.compact.i [...]
Here is an example invocation using spark datasource
diff --git a/website/docs/faq.md b/website/docs/faq.md
index e212008f37c..1378839b81c 100644
--- a/website/docs/faq.md
+++ b/website/docs/faq.md
@@ -9,7 +9,7 @@ The FAQs are split into following pages. Please refer to the
specific pages for
- [General](/docs/next/faq_general)
- [Design & Concepts](/docs/next/faq_design_and_concepts)
- [Writing Tables](/docs/next/faq_writing_tables)
-- [Querying Tables](/docs/next/faq_querying_tables)
+- [Reading Tables](/docs/next/faq_reading_tables)
- [Table Services](/docs/next/faq_table_services)
- [Storage](/docs/next/faq_storage)
- [Integrations](/docs/next/faq_integrations)
diff --git a/website/docs/faq_general.md b/website/docs/faq_general.md
index 2621b84e3a2..2682d17e950 100644
--- a/website/docs/faq_general.md
+++ b/website/docs/faq_general.md
@@ -6,7 +6,7 @@ keywords: [hudi, writing, reading]
### When is Hudi useful for me or my organization?
-If you are looking to quickly ingest data onto HDFS or cloud storage, Hudi can
provide you tools to [help](/docs/writing_data/). Also, if you have
ETL/hive/spark jobs which are slow/taking up a lot of resources, Hudi can
potentially help by providing an incremental approach to reading and writing
data.
+If you are looking to quickly ingest data onto HDFS or cloud storage, Hudi
provides you [tools](/docs/hoodie_streaming_ingestion). Also, if you have
ETL/hive/spark jobs which are slow/taking up a lot of resources, Hudi can
potentially help by providing an incremental approach to reading and writing
data.
As an organization, Hudi can help you build an [efficient data
lake](https://docs.google.com/presentation/d/1FHhsvh70ZP6xXlHdVsAI0g__B_6Mpto5KQFlZ0b8-mM/edit#slide=id.p),
solving some of the most complex, low-level storage management problems, while
putting data into hands of your data analysts, engineers and scientists much
quicker.
@@ -61,7 +61,7 @@ Nonetheless, Hudi is designed very much like a database and
provides similar fun
### How do I model the data stored in Hudi?
-When writing data into Hudi, you model the records like how you would on a
key-value store - specify a key field (unique for a single partition/across
table), a partition field (denotes partition to place key into) and
preCombine/combine logic that specifies how to handle duplicates in a batch of
records written. This model enables Hudi to enforce primary key constraints
like you would get on a database table. See [here](/docs/writing_data/) for an
example.
+When writing data into Hudi, you model the records the way you would on a key-value store: specify a key field (unique within a partition or across the table), a partition field (denotes the partition to place the key into) and preCombine/combine logic that specifies how to handle duplicates in a batch of records written. This model enables Hudi to enforce primary key constraints like you would get on a database table. See [here](/docs/next/writing_data) for an example.
When querying/reading data, Hudi just presents itself as a json-like
hierarchical table, everyone is used to querying using Hive/Spark/Presto over
Parquet/Json/Avro.
diff --git a/website/docs/faq_querying_tables.md
b/website/docs/faq_reading_tables.md
similarity index 98%
rename from website/docs/faq_querying_tables.md
rename to website/docs/faq_reading_tables.md
index 9b2f8484c09..207d90c487b 100644
--- a/website/docs/faq_querying_tables.md
+++ b/website/docs/faq_reading_tables.md
@@ -1,8 +1,8 @@
---
-title: Querying Tables
+title: Reading Tables
keywords: [hudi, writing, reading]
---
-# Querying Tables
+# Reading Tables FAQ
### Do deleted records appear in Hudi's incremental query results?
diff --git a/website/docs/faq_table_services.md
b/website/docs/faq_table_services.md
index 59dc4e71ddd..0ca730094e4 100644
--- a/website/docs/faq_table_services.md
+++ b/website/docs/faq_table_services.md
@@ -50,6 +50,6 @@ Hudi runs cleaner to remove old file versions as part of
writing data either in
Yes. Hudi provides the ability to post a callback notification about a write
commit. You can use a http hook or choose to
-be notified via a Kafka/pulsar topic or plug in your own implementation to get
notified. Please refer [here](writing_data/#commit-notifications)
+be notified via a Kafka/Pulsar topic, or plug in your own implementation to get notified. Please refer [here](/docs/next/platform_services_post_commit_callback) for details.
diff --git a/website/docs/faq_writing_tables.md
b/website/docs/faq_writing_tables.md
index 5eb7b2cbbc4..bb1c1a01f74 100644
--- a/website/docs/faq_writing_tables.md
+++ b/website/docs/faq_writing_tables.md
@@ -6,7 +6,7 @@ keywords: [hudi, writing, reading]
### What are some ways to write a Hudi table?
-Typically, you obtain a set of partial updates/inserts from your source and
issue [write operations](/docs/write_operations/) against a Hudi table. If you
ingesting data from any of the standard sources like Kafka, or tailing DFS, the
[delta streamer](/docs/hoodie_streaming_ingestion#deltastreamer) tool is
invaluable and provides an easy, self-managed solution to getting data written
into Hudi. You can also write your own code to capture data from a custom
source using the Spark datasour [...]
+Typically, you obtain a set of partial updates/inserts from your source and issue [write operations](/docs/write_operations/) against a Hudi table. If you are ingesting data from any of the standard sources like Kafka, or tailing DFS, the [delta streamer](/docs/hoodie_streaming_ingestion#hudi-streamer) tool is invaluable and provides an easy, self-managed solution to getting data written into Hudi. You can also write your own code to capture data from a custom source using the Spark datasour [...]
### How is a Hudi writer job deployed?
@@ -68,7 +68,7 @@ As you could see, ([combineAndGetUpdateValue(),
getInsertValue()](https://github
### How do I delete records in the dataset using Hudi?
-GDPR has made deletes a must-have tool in everyone's data management toolbox.
Hudi supports both soft and hard deletes. For details on how to actually
perform them, see [here](/docs/writing_data/#deletes).
+GDPR has made deletes a must-have tool in everyone's data management toolbox.
Hudi supports both soft and hard deletes. For details on how to actually
perform them, see [here](/docs/next/writing_data#deletes).
### Should I need to worry about deleting all copies of the records in case of
duplicates?
diff --git a/website/docs/flink-quick-start-guide.md
b/website/docs/flink-quick-start-guide.md
index 89e60737859..bdca36c6853 100644
--- a/website/docs/flink-quick-start-guide.md
+++ b/website/docs/flink-quick-start-guide.md
@@ -448,8 +448,8 @@ feature is that it now lets you author streaming pipelines
on streaming or batch
## Where To Go From Here?
- **Quick Start** : Read [Quick Start](#quick-start) to get started quickly
Flink sql client to write to(read from) Hudi.
- **Configuration** : For [Global
Configuration](/docs/next/flink_tuning#global-configurations), sets up through
`$FLINK_HOME/conf/flink-conf.yaml`. For per job configuration, sets up through
[Table Option](/docs/next/flink_tuning#table-options).
-- **Writing Data** : Flink supports different modes for writing, such as [CDC
Ingestion](/docs/hoodie_streaming_ingestion#cdc-ingestion), [Bulk
Insert](/docs/hoodie_streaming_ingestion#bulk-insert), [Index
Bootstrap](/docs/hoodie_streaming_ingestion#index-bootstrap), [Changelog
Mode](/docs/hoodie_streaming_ingestion#changelog-mode) and [Append
Mode](/docs/hoodie_streaming_ingestion#append-mode). Flink also supports
multiple streaming writers with [non-blocking concurrency control](/docs/ [...]
-- **Querying Data** : Flink supports different modes for reading, such as
[Streaming Query](/docs/querying_data#streaming-query) and [Incremental
Query](/docs/querying_data#incremental-query).
+- **Writing Data** : Flink supports different modes for writing, such as [CDC
Ingestion](/docs/next/ingestion_flink#cdc-ingestion), [Bulk
Insert](/docs/next/ingestion_flink#bulk-insert), [Index
Bootstrap](/docs/next/ingestion_flink#index-bootstrap), [Changelog
Mode](/docs/next/ingestion_flink#changelog-mode) and [Append
Mode](/docs/next/ingestion_flink#append-mode). Flink also supports multiple
streaming writers with [non-blocking concurrency
control](/docs/next/sql_dml#non-blocking-conc [...]
+- **Reading Data** : Flink supports different modes for reading, such as
[Streaming Query](/docs/sql_queries#streaming-query) and [Incremental
Query](/docs/sql_queries#incremental-query).
- **Tuning** : For write/read tasks, this guide gives some tuning suggestions,
such as [Memory Optimization](/docs/next/flink_tuning#memory-optimization) and
[Write Rate Limit](/docs/next/flink_tuning#write-rate-limit).
- **Optimization**: Offline compaction is supported [Offline
Compaction](/docs/compaction#flink-offline-compaction).
- **Query Engines**: Besides Flink, many other engines are integrated: [Hive
Query](/docs/syncing_metastore#flink-setup), [Presto
Query](/docs/querying_data#prestodb).
diff --git a/website/docs/hoodie_streaming_ingestion.md
b/website/docs/hoodie_streaming_ingestion.md
index 6bc2f288e4b..8a8323c3bf0 100644
--- a/website/docs/hoodie_streaming_ingestion.md
+++ b/website/docs/hoodie_streaming_ingestion.md
@@ -1,5 +1,5 @@
---
-title: Streaming Ingestion
+title: Using Spark
keywords: [hudi, streamer, hoodiestreamer, spark_streaming]
---
import Tabs from '@theme/Tabs';
@@ -571,370 +571,3 @@ to how you run `HoodieStreamer`.
```
For detailed information on how to configure and use
`HoodieMultiTableStreamer`, please refer [blog
section](/blog/2020/08/22/ingest-multiple-tables-using-hudi).
-
-## Structured Streaming
-
-Hudi supports Spark Structured Streaming reads and writes.
-
-### Streaming Write
-You can write Hudi tables using spark's structured streaming.
-
-<Tabs
-groupId="programming-language"
-defaultValue="python"
-values={[
-{ label: 'Scala', value: 'scala', },
-{ label: 'Python', value: 'python', },
-]}
->
-
-<TabItem value="scala">
-
-```scala
-// spark-shell
-// prepare to stream write to new table
-import org.apache.spark.sql.streaming.Trigger
-
-val streamingTableName = "hudi_trips_cow_streaming"
-val baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
-val checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"
-
-// create streaming df
-val df = spark.readStream.
- format("hudi").
- load(basePath)
-
-// write stream to new hudi table
-df.writeStream.format("hudi").
- options(getQuickstartWriteConfigs).
- option(PRECOMBINE_FIELD_OPT_KEY, "ts").
- option(RECORDKEY_FIELD_OPT_KEY, "uuid").
- option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
- option(TABLE_NAME, streamingTableName).
- outputMode("append").
- option("path", baseStreamingPath).
- option("checkpointLocation", checkpointLocation).
- trigger(Trigger.Once()).
- start()
-
-```
-
-</TabItem>
-<TabItem value="python">
-
-```python
-# pyspark
-# prepare to stream write to new table
-streamingTableName = "hudi_trips_cow_streaming"
-baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
-checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"
-
-hudi_streaming_options = {
- 'hoodie.table.name': streamingTableName,
- 'hoodie.datasource.write.recordkey.field': 'uuid',
- 'hoodie.datasource.write.partitionpath.field': 'partitionpath',
- 'hoodie.datasource.write.table.name': streamingTableName,
- 'hoodie.datasource.write.operation': 'upsert',
- 'hoodie.datasource.write.precombine.field': 'ts',
- 'hoodie.upsert.shuffle.parallelism': 2,
- 'hoodie.insert.shuffle.parallelism': 2
-}
-
-# create streaming df
-df = spark.readStream \
- .format("hudi") \
- .load(basePath)
-
-# write stream to new hudi table
-df.writeStream.format("hudi") \
- .options(**hudi_streaming_options) \
- .outputMode("append") \
- .option("path", baseStreamingPath) \
- .option("checkpointLocation", checkpointLocation) \
- .trigger(once=True) \
- .start()
-
-```
-
-</TabItem>
-
-</Tabs
->
-
-### Streaming Read
-
-Structured Streaming reads are based on Hudi's Incremental Query feature,
therefore streaming read can return data for which
-commits and base files were not yet removed by the cleaner. You can control
commits retention time.
-
-<Tabs
-groupId="programming-language"
-defaultValue="python"
-values={[
-{ label: 'Scala', value: 'scala', },
-{ label: 'Python', value: 'python', },
-]}
->
-
-<TabItem value="scala">
-
-```scala
-// spark-shell
-// reload data
-df.write.format("hudi").
- options(getQuickstartWriteConfigs).
- option(PRECOMBINE_FIELD_OPT_KEY, "ts").
- option(RECORDKEY_FIELD_OPT_KEY, "uuid").
- option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
- option(TABLE_NAME, tableName).
- mode(Overwrite).
- save(basePath)
-
-// read stream and output results to console
-spark.readStream.
- format("hudi").
- load(basePath).
- writeStream.
- format("console").
- start()
-
-// read stream to streaming df
-val df = spark.readStream.
- format("hudi").
- load(basePath)
-
-```
-</TabItem>
-
-<TabItem value="python">
-
-```python
-# pyspark
-# reload data
-inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(
- dataGen.generateInserts(10))
-df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
-
-hudi_options = {
- 'hoodie.table.name': tableName,
- 'hoodie.datasource.write.recordkey.field': 'uuid',
- 'hoodie.datasource.write.partitionpath.field': 'partitionpath',
- 'hoodie.datasource.write.table.name': tableName,
- 'hoodie.datasource.write.operation': 'upsert',
- 'hoodie.datasource.write.precombine.field': 'ts',
- 'hoodie.upsert.shuffle.parallelism': 2,
- 'hoodie.insert.shuffle.parallelism': 2
-}
-
-df.write.format("hudi"). \
- options(**hudi_options). \
- mode("overwrite"). \
- save(basePath)
-
-# read stream to streaming df
-df = spark.readStream \
- .format("hudi") \
- .load(basePath)
-
-# ead stream and output results to console
-spark.readStream \
- .format("hudi") \
- .load(basePath) \
- .writeStream \
- .format("console") \
- .start()
-
-```
-</TabItem>
-
-</Tabs
->
-
-
-:::info
-Spark SQL can be used within ForeachBatch sink to do INSERT, UPDATE, DELETE
and MERGE INTO.
-Target table must exist before write.
-:::
-
-
-## Flink Ingestion
-
-### CDC Ingestion
-CDC(change data capture) keep track of the data changes evolving in a source
system so a downstream process or system can action that change.
-We recommend two ways for syncing CDC data into Hudi:
-
-
-
-1. Using the Ververica
[flink-cdc-connectors](https://github.com/ververica/flink-cdc-connectors)
directly connect to DB Server to sync the binlog data into Hudi.
- The advantage is that it does not rely on message queues, but the
disadvantage is that it puts pressure on the db server;
-2. Consume data from a message queue (for e.g, the Kafka) using the flink cdc
format, the advantage is that it is highly scalable,
- but the disadvantage is that it relies on message queues.
-
-:::note
-- If the upstream data cannot guarantee the order, you need to specify option
`write.precombine.field` explicitly;
-:::
-
-### Bulk Insert
-
-For the demand of snapshot data import. If the snapshot data comes from other
data sources, use the `bulk_insert` mode to quickly
-import the snapshot data into Hudi.
-
-
-:::note
-`bulk_insert` eliminates the serialization and data merging. The data
deduplication is skipped, so the user need to guarantee the uniqueness of the
data.
-:::
-
-:::note
-`bulk_insert` is more efficient in the `batch execution mode`. By default, the
`batch execution mode` sorts the input records
-by the partition path and writes these records to Hudi, which can avoid write
performance degradation caused by
-frequent `file handle` switching.
-:::
-
-:::note
-The parallelism of `bulk_insert` is specified by `write.tasks`. The
parallelism will affect the number of small files.
-In theory, the parallelism of `bulk_insert` is the number of `bucket`s (In
particular, when each bucket writes to maximum file size, it
-will rollover to the new file handle. Finally, `the number of files` >=
[`write.bucket_assign.tasks`](/docs/configurations#writebucket_assigntasks).
-:::
-
-#### Options
-
-| Option Name | Required | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| `write.operation` | `true` | `upsert` | Setting as `bulk_insert` to open
this function |
-| `write.tasks` | `false` | `4` | The parallelism of `bulk_insert`, `the
number of files` >=
[`write.bucket_assign.tasks`](/docs/configurations#writebucket_assigntasks) |
-| `write.bulk_insert.shuffle_input` | `false` | `true` | Whether to shuffle
data according to the input field before writing. Enabling this option will
reduce the number of small files, but there may be a risk of data skew |
-| `write.bulk_insert.sort_input` | `false` | `true` | Whether to sort data
according to the input field before writing. Enabling this option will reduce
the number of small files when a write task writes multiple partitions |
-| `write.sort.memory` | `false` | `128` | Available managed memory of sort
operator. default `128` MB |
-
-### Index Bootstrap
-
-For the demand of `snapshot data` + `incremental data` import. If the
`snapshot data` already insert into Hudi by [bulk insert](#bulk-insert).
-User can insert `incremental data` in real time and ensure the data is not
repeated by using the index bootstrap function.
-
-:::note
-If you think this process is very time-consuming, you can add resources to
write in streaming mode while writing `snapshot data`,
-and then reduce the resources to write `incremental data` (or open the rate
limit function).
-:::
-
-#### Options
-
-| Option Name | Required | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| `index.bootstrap.enabled` | `true` | `false` | When index bootstrap is
enabled, the remain records in Hudi table will be loaded into the Flink state
at one time |
-| `index.partition.regex` | `false` | `*` | Optimize option. Setting
regular expressions to filter partitions. By default, all partitions are loaded
into flink state |
-
-#### How To Use
-
-1. `CREATE TABLE` creates a statement corresponding to the Hudi table. Note
that the `table.type` must be correct.
-2. Setting `index.bootstrap.enabled` = `true` to enable the index bootstrap
function.
-3. Setting Flink checkpoint failure tolerance in `flink-conf.yaml` :
`execution.checkpointing.tolerable-failed-checkpoints = n` (depending on Flink
checkpoint scheduling times).
-4. Waiting until the first checkpoint succeeds, indicating that the index
bootstrap completed.
-5. After the index bootstrap completed, user can exit and save the savepoint
(or directly use the externalized checkpoint).
-6. Restart the job, setting `index.bootstrap.enable` as `false`.
-
-:::note
-1. Index bootstrap is blocking, so checkpoint cannot be completed during index
bootstrap.
-2. Index bootstrap triggers by the input data. User need to ensure that there
is at least one record in each partition.
-3. Index bootstrap executes concurrently. User can search in log by `finish
loading the index under partition` and `Load record form file` to observe the
progress of index bootstrap.
-4. The first successful checkpoint indicates that the index bootstrap
completed. There is no need to load the index again when recovering from the
checkpoint.
-:::
-
-### Changelog Mode
-Hudi can keep all the intermediate changes (I / -U / U / D) of messages, then
consumes through stateful computing of flink to have a near-real-time
-data warehouse ETL pipeline (Incremental computing). Hudi MOR table stores
messages in the forms of rows, which supports the retention of all change logs
(Integration at the format level).
-All changelog records can be consumed with Flink streaming reader.
-
-#### Options
-
-| Option Name | Required | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| `changelog.enabled` | `false` | `false` | It is turned off by default, to
have the `upsert` semantics, only the merged messages are ensured to be kept,
intermediate changes may be merged. Setting to true to support consumption of
all changes |
-
-:::note
-Batch (Snapshot) read still merge all the intermediate changes, regardless of
whether the format has stored the intermediate changelog messages.
-:::
-
-:::note
-After setting `changelog.enable` as `true`, the retention of changelog records
are only best effort: the asynchronous compaction task will merge the changelog
records into one record, so if the
-stream source does not consume timely, only the merged record for each key can
be read after compaction. The solution is to reserve some buffer time for the
reader by adjusting the compaction strategy, such as
-the compaction options: [`compaction.delta_commits`](#compaction) and
[`compaction.delta_seconds`](#compaction).
-:::
-
-
-### Append Mode
-
-For `INSERT` mode write operation, the current work flow is:
-
-- For Merge_On_Read table, the small file strategies are by default applied:
tries to append to the small avro log files first
-- For Copy_On_Write table, write new parquet files directly, no small file
strategies are applied
-
-Hudi supports rich clustering strategies to optimize the files layout for
`INSERT` mode:
-
-#### Inline Clustering
-
-:::note
-Only Copy_On_Write table is supported.
-:::
-
-| Option Name | Required | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| `write.insert.cluster` | `false` | `false` | Whether to merge small files
while ingesting, for COW table, open the option to enable the small file
merging strategy(no deduplication for keys but the throughput will be affected)
|
-
-#### Async Clustering
-
-| Option Name | Required | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| `clustering.schedule.enabled` | `false` | `false` | Whether to schedule
clustering plan during write process, by default false |
-| `clustering.delta_commits` | `false` | `4` | Delta commits to schedule the
clustering plan, only valid when `clustering.schedule.enabled` is true |
-| `clustering.async.enabled` | `false` | `false` | Whether to execute
clustering plan asynchronously, by default false |
-| `clustering.tasks` | `false` | `4` | Parallelism of the clustering tasks |
-| `clustering.plan.strategy.target.file.max.bytes` | `false` |
`1024*1024*1024` | The target file size for clustering group, by default 1GB |
-| `clustering.plan.strategy.small.file.limit` | `false` | `600` | The file
that has less size than the threshold (unit MB) are candidates for clustering |
-| `clustering.plan.strategy.sort.columns` | `false` | `N/A` | The columns to
sort by when clustering |
-
-#### Clustering Plan Strategy
-
-Custom clustering strategy is supported.
-
-| Option Name | Required | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| `clustering.plan.partition.filter.mode` | `false` | `NONE` | Valid options
1) `NONE`: no limit; 2) `RECENT_DAYS`: choose partitions that represent recent
days; 3) `SELECTED_PARTITIONS`: specific partitions |
-| `clustering.plan.strategy.daybased.lookback.partitions` | `false` | `2` |
Valid for `RECENT_DAYS` mode |
-| `clustering.plan.strategy.cluster.begin.partition` | `false` | `N/A` | Valid
for `SELECTED_PARTITIONS` mode, specify the partition to begin with(inclusive) |
-| `clustering.plan.strategy.cluster.end.partition` | `false` | `N/A` | Valid
for `SELECTED_PARTITIONS` mode, specify the partition to end with(inclusive) |
-| `clustering.plan.strategy.partition.regex.pattern` | `false` | `N/A` | The
regex to filter the partitions |
-| `clustering.plan.strategy.partition.selected` | `false` | `N/A` | Specific
partitions separated by comma `,` |
-
-### Bucket Index
-
-By default, flink uses the state-backend to keep the file index: the mapping
from primary key to fileId. When the input data set is large,
-there is possibility the cost of the state be a bottleneck, the bucket index
use deterministic hash algorithm for shuffling the records into
-buckets, thus can avoid the storage and query overhead of indexes.
-
-#### Options
-
-| Option Name | Required | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| `index.type` | `false` | `FLINK_STATE` | Set up as `BUCKET` to use bucket
index |
-| `hoodie.bucket.index.hash.field` | `false` | Primary key | Can be a subset
of the primary key |
-| `hoodie.bucket.index.num.buckets` | `false` | `4` | The number of buckets
per-partition, it is immutable once set up |
-
-Comparing to state index:
-
-- Bucket index has no computing and storage cost of state-backend index, thus
has better performance
-- Bucket index can not expand the buckets dynamically, the state-backend index
can expand the buckets dynamically based on current file layout
-- Bucket index can not handle changes among partitions(no limit if the input
itself is CDC stream), state-backend index has no limit
-
-### Rate Limit
-There are many use cases that user put the full history data set onto the
message queue together with the realtime incremental data. Then they consume
the data from the queue into the hudi from the earliest offset using flink.
Consuming history data set has these characteristics:
-1). The instant throughput is huge 2). It has serious disorder (with random
writing partitions). It will lead to degradation of writing performance and
throughput glitches. For this case, the speed limit parameter can be turned on
to ensure smooth writing of the flow.
-
-#### Options
-| Option Name | Required | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| `write.rate.limit` | `false` | `0` | Default disable the rate limit |
-
-## Kafka Connect Sink
-If you want to perform streaming ingestion into Hudi format similar to
`HoodieStreamer`, but you don't want to depend on Spark,
-try out the new experimental release of Hudi Kafka Connect Sink. Read the
[ReadMe](https://github.com/apache/hudi/tree/master/hudi-kafka-connect)
-for full documentation.
-
diff --git a/website/docs/hudi_stack.md b/website/docs/hudi_stack.md
index 36a1b7a4a4c..a3a7896c92e 100644
--- a/website/docs/hudi_stack.md
+++ b/website/docs/hudi_stack.md
@@ -93,7 +93,7 @@ Hudi provides snapshot isolation for writers and readers,
enabling consistent ta

<p align = "center">Figure: Various platform services in Hudi</p>
-Platform services offer functionality that is specific to data and workloads,
and they sit directly on top of the table services, interfacing with writers
and readers. Services, like [Hudi
Streamer](./hoodie_streaming_ingestion#hudi-streamer), are specialized in
handling data and workloads, seamlessly integrating with Kafka streams and
various formats to build data lakes. They support functionalities like
automatic checkpoint management, integration with major schema registries
(includin [...]
+Platform services offer functionality that is specific to data and workloads,
and they sit directly on top of the table services, interfacing with writers
and readers. Services, like [Hudi
Streamer](./hoodie_streaming_ingestion#hudi-streamer), are specialized in
handling data and workloads, seamlessly integrating with Kafka streams and
various formats to build data lakes. They support functionalities like
automatic checkpoint management, integration with major schema registries
(includin [...]
### Query Engines
Apache Hudi is compatible with a wide array of query engines, catering to
various analytical needs. For distributed ETL batch processing, Apache Spark is
frequently utilized, leveraging its efficient handling of large-scale data. In
the realm of streaming use cases, compute engines such as Apache Flink and
Apache Spark's Structured Streaming provide robust support when paired with
Hudi. Moreover, Hudi supports modern data lake query engines such as Trino and
Presto, as well as modern ana [...]
\ No newline at end of file
diff --git a/website/docs/ingestion_flink.md b/website/docs/ingestion_flink.md
new file mode 100644
index 00000000000..e9410021374
--- /dev/null
+++ b/website/docs/ingestion_flink.md
@@ -0,0 +1,179 @@
+---
+title: Using Flink
+keywords: [hudi, flink, streamer, ingestion]
+---
+
+### CDC Ingestion
+CDC (change data capture) keeps track of data changes in a source system so that a downstream process or system can act on those changes.
+There are two recommended ways to sync CDC data into Hudi:
+
+1. Use the Ververica [flink-cdc-connectors](https://github.com/ververica/flink-cdc-connectors) to connect directly to the database server and sync the binlog data into Hudi.
+   The advantage is that it does not rely on a message queue; the disadvantage is that it puts pressure on the database server.
+2. Consume data from a message queue (e.g., Kafka) using the Flink CDC format. The advantage is that this is highly scalable;
+   the disadvantage is that it relies on a message queue.
+
+:::note
+- If the upstream data cannot guarantee ordering, you need to specify the option `write.precombine.field` explicitly.
+:::
+
+### Bulk Insert
+
+If the snapshot data comes from other data sources, use the `bulk_insert` mode to quickly import it into Hudi.
+
+
+:::note
+`bulk_insert` eliminates the serialization and data merging. Data deduplication is skipped, so the user needs to guarantee the uniqueness of the data.
+:::
+
+:::note
+`bulk_insert` is more efficient in the `batch execution mode`. By default, the
`batch execution mode` sorts the input records
+by the partition path and writes these records to Hudi, which can avoid write
performance degradation caused by
+frequent `file handle` switching.
+:::
+
+:::note
+The parallelism of `bulk_insert` is specified by `write.tasks`; the parallelism affects the number of small files.
+In theory, the parallelism of `bulk_insert` equals the number of `bucket`s. In particular, when each bucket writes up to the maximum
+file size, it rolls over to a new file handle, so finally `the number of files` >=
+[`write.bucket_assign.tasks`](/docs/configurations#writebucket_assigntasks).
+:::
+
+#### Options
+
+| Option Name | Required | Default | Remarks |
+| ----------- | ------- | ------- | ------- |
+| `write.operation` | `true` | `upsert` | Set to `bulk_insert` to enable this function |
+| `write.tasks` | `false` | `4` | The parallelism of `bulk_insert`, `the
number of files` >=
[`write.bucket_assign.tasks`](/docs/configurations#writebucket_assigntasks) |
+| `write.bulk_insert.shuffle_input` | `false` | `true` | Whether to shuffle
data according to the input field before writing. Enabling this option will
reduce the number of small files, but there may be a risk of data skew |
+| `write.bulk_insert.sort_input` | `false` | `true` | Whether to sort data
according to the input field before writing. Enabling this option will reduce
the number of small files when a write task writes multiple partitions |
+| `write.sort.memory` | `false` | `128` | Available managed memory of sort
operator. default `128` MB |
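+
+The options above can be combined in a table definition. A minimal Flink SQL sketch of a bulk insert (the table name, schema, and path are illustrative):
+
+```sql
+CREATE TABLE hudi_snapshot_import (
+  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
+  name VARCHAR(10),
+  ts TIMESTAMP(3),
+  `partition` VARCHAR(20)
+) PARTITIONED BY (`partition`) WITH (
+  'connector' = 'hudi',
+  'path' = 'file:///tmp/hudi_snapshot_import',
+  'write.operation' = 'bulk_insert',  -- enable bulk_insert
+  'write.tasks' = '8'                 -- parallelism of bulk_insert
+);
+
+INSERT INTO hudi_snapshot_import SELECT * FROM source_table;
+```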
+
+### Index Bootstrap
+
+For importing `snapshot data` + `incremental data`: if the `snapshot data` was already inserted into Hudi by [bulk insert](#bulk-insert),
+users can insert `incremental data` in real time and ensure the data is not duplicated by using the index bootstrap function.
+
+:::note
+If this process is very time-consuming, you can add resources while writing the `snapshot data` in streaming mode,
+and then reduce the resources when writing the `incremental data` (or enable the rate limit function).
+:::
+
+#### Options
+
+| Option Name | Required | Default | Remarks |
+| ----------- | ------- | ------- | ------- |
+| `index.bootstrap.enabled` | `true` | `false` | When index bootstrap is enabled, the existing records in the Hudi table will be loaded into the Flink state at once |
+| `index.partition.regex` | `false` | `*` | Optimization option. Set a regular expression to filter partitions. By default, all partitions are loaded into the Flink state |
+
+#### How To Use
+
+1. Use a `CREATE TABLE` statement corresponding to the Hudi table. Note that the `table.type` must be correct.
+2. Set `index.bootstrap.enabled` = `true` to enable the index bootstrap function.
+3. Set the Flink checkpoint failure tolerance in `flink-conf.yaml`: `execution.checkpointing.tolerable-failed-checkpoints = n` (depending on Flink checkpoint scheduling times).
+4. Wait until the first checkpoint succeeds, which indicates that the index bootstrap has completed.
+5. After the index bootstrap completes, you can exit and save a savepoint (or directly use the externalized checkpoint).
+6. Restart the job, setting `index.bootstrap.enabled` to `false`.
+
+:::note
+1. Index bootstrap is blocking, so a checkpoint cannot be completed during index bootstrap.
+2. Index bootstrap is triggered by the input data. Users need to ensure that there is at least one record in each partition.
+3. Index bootstrap executes concurrently. Users can search the logs for `finish loading the index under partition` and `Load record form file` to observe the progress of the index bootstrap.
+4. The first successful checkpoint indicates that the index bootstrap has completed. There is no need to load the index again when recovering from the checkpoint.
+:::
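+
+A sketch of the bootstrap table definition in Flink SQL (the schema and path are illustrative):
+
+```sql
+CREATE TABLE hudi_table (
+  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
+  name VARCHAR(10),
+  ts TIMESTAMP(3)
+) WITH (
+  'connector' = 'hudi',
+  'path' = 'file:///tmp/hudi_table',
+  'table.type' = 'MERGE_ON_READ',     -- must match the existing table
+  'index.bootstrap.enabled' = 'true'  -- set back to 'false' after restarting from the savepoint
+);
+```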
+
+### Changelog Mode
+Hudi can keep all the intermediate changes (I / -U / U / D) of messages, which can then be consumed through Flink's stateful computing to build a near-real-time
+data warehouse ETL pipeline (incremental computing). A Hudi MOR table stores messages in the form of rows, which supports the retention of all change logs (integration at the format level).
+All changelog records can be consumed with the Flink streaming reader.
+
+#### Options
+
+| Option Name | Required | Default | Remarks |
+| ----------- | ------- | ------- | ------- |
+| `changelog.enabled` | `false` | `false` | Turned off by default for `upsert` semantics: only the merged messages are guaranteed to be kept, and intermediate changes may be merged. Set to true to support consumption of all changes |
+
+:::note
+A batch (snapshot) read still merges all the intermediate changes, regardless of whether the format has stored the intermediate changelog messages.
+
+:::note
+After setting `changelog.enabled` to `true`, the retention of changelog records is only best-effort: the asynchronous compaction task will merge the changelog
+records into one record, so if the stream source does not consume in time, only the merged record for each key can be read after compaction. The solution is to reserve some buffer time for the reader by adjusting the compaction strategy, such as
+the compaction options: [`compaction.delta_commits`](#compaction) and [`compaction.delta_seconds`](#compaction).
+:::
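+
+A minimal sketch of a changelog-enabled sink table (schema and path are illustrative):
+
+```sql
+CREATE TABLE hudi_changelog (
+  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
+  name VARCHAR(10),
+  ts TIMESTAMP(3)
+) WITH (
+  'connector' = 'hudi',
+  'path' = 'file:///tmp/hudi_changelog',
+  'table.type' = 'MERGE_ON_READ',
+  'changelog.enabled' = 'true',       -- retain intermediate changes (I / -U / +U / D)
+  'compaction.delta_commits' = '10'   -- buffer time for the streaming reader before compaction merges changes
+);
+```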
+
+
+### Append Mode
+
+For the `INSERT` mode write operation, the current workflow is:
+
+- For a Merge_On_Read table, the small file strategies are applied by default: the writer tries to append to small avro log files first.
+- For a Copy_On_Write table, new parquet files are written directly; no small file strategies are applied.
+
+Hudi supports rich clustering strategies to optimize the files layout for
`INSERT` mode:
+
+#### Inline Clustering
+
+:::note
+Only Copy_On_Write table is supported.
+:::
+
+| Option Name | Required | Default | Remarks |
+| ----------- | ------- | ------- | ------- |
+| `write.insert.cluster` | `false` | `false` | Whether to merge small files while ingesting. For a COW table, enable this option to use the small file merging strategy (no deduplication for keys, but throughput will be affected) |
+
+#### Async Clustering
+
+| Option Name | Required | Default | Remarks |
+| ----------- | ------- | ------- | ------- |
+| `clustering.schedule.enabled` | `false` | `false` | Whether to schedule
clustering plan during write process, by default false |
+| `clustering.delta_commits` | `false` | `4` | Delta commits to schedule the
clustering plan, only valid when `clustering.schedule.enabled` is true |
+| `clustering.async.enabled` | `false` | `false` | Whether to execute
clustering plan asynchronously, by default false |
+| `clustering.tasks` | `false` | `4` | Parallelism of the clustering tasks |
+| `clustering.plan.strategy.target.file.max.bytes` | `false` |
`1024*1024*1024` | The target file size for clustering group, by default 1GB |
+| `clustering.plan.strategy.small.file.limit` | `false` | `600` | The file
that has less size than the threshold (unit MB) are candidates for clustering |
+| `clustering.plan.strategy.sort.columns` | `false` | `N/A` | The columns to
sort by when clustering |
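+
+A sketch of enabling async clustering via the options above (table name, schema, and path are illustrative):
+
+```sql
+CREATE TABLE hudi_append (
+  ...
+) WITH (
+  'connector' = 'hudi',
+  'path' = 'file:///tmp/hudi_append',
+  'write.operation' = 'insert',
+  'clustering.schedule.enabled' = 'true',  -- schedule clustering plans during writes
+  'clustering.async.enabled' = 'true',     -- execute the plans asynchronously
+  'clustering.plan.strategy.small.file.limit' = '600'
+);
+```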
+
+#### Clustering Plan Strategy
+
+Custom clustering strategy is supported.
+
+| Option Name | Required | Default | Remarks |
+| ----------- | ------- | ------- | ------- |
+| `clustering.plan.partition.filter.mode` | `false` | `NONE` | Valid options
1) `NONE`: no limit; 2) `RECENT_DAYS`: choose partitions that represent recent
days; 3) `SELECTED_PARTITIONS`: specific partitions |
+| `clustering.plan.strategy.daybased.lookback.partitions` | `false` | `2` |
Valid for `RECENT_DAYS` mode |
+| `clustering.plan.strategy.cluster.begin.partition` | `false` | `N/A` | Valid
for `SELECTED_PARTITIONS` mode, specify the partition to begin with(inclusive) |
+| `clustering.plan.strategy.cluster.end.partition` | `false` | `N/A` | Valid
for `SELECTED_PARTITIONS` mode, specify the partition to end with(inclusive) |
+| `clustering.plan.strategy.partition.regex.pattern` | `false` | `N/A` | The
regex to filter the partitions |
+| `clustering.plan.strategy.partition.selected` | `false` | `N/A` | Specific
partitions separated by comma `,` |
+
+### Bucket Index
+
+By default, Flink uses the state backend to keep the file index: the mapping from primary key to fileId. When the input data set is large,
+the cost of the state can become a bottleneck. The bucket index uses a deterministic hash algorithm to shuffle records into
+buckets, thus avoiding the storage and query overhead of indexes.
+
+#### Options
+
+| Option Name | Required | Default | Remarks |
+| ----------- | ------- | ------- | ------- |
+| `index.type` | `false` | `FLINK_STATE` | Set up as `BUCKET` to use bucket
index |
+| `hoodie.bucket.index.hash.field` | `false` | Primary key | Can be a subset
of the primary key |
+| `hoodie.bucket.index.num.buckets` | `false` | `4` | The number of buckets
per-partition, it is immutable once set up |
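+
+A sketch of a bucket-indexed table (schema and path are illustrative):
+
+```sql
+CREATE TABLE hudi_bucketed (
+  ...
+) WITH (
+  'connector' = 'hudi',
+  'path' = 'file:///tmp/hudi_bucketed',
+  'index.type' = 'BUCKET',
+  'hoodie.bucket.index.num.buckets' = '8'  -- immutable once the table is created
+);
+```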
+
+Compared to the state index:
+
+- The bucket index has no computing or storage cost of a state-backend index, and thus has better performance.
+- The bucket index cannot expand the buckets dynamically, while the state-backend index can expand them dynamically based on the current file layout.
+- The bucket index cannot handle changes across partitions (no limit if the input itself is a CDC stream), while the state-backend index has no such limit.
+
+### Rate Limit
+In many use cases, users put the full history data set onto the message queue together with the real-time incremental data, then consume
+the data from the queue into Hudi from the earliest offset using Flink. Consuming the history data set has these characteristics:
+1) the instant throughput is huge; 2) it has serious disorder (with randomly written partitions). This leads to degraded write performance and
+throughput glitches. For this case, the rate limit option can be turned on to ensure smooth writing.
+
+#### Options
+| Option Name | Required | Default | Remarks |
+| ----------- | ------- | ------- | ------- |
+| `write.rate.limit` | `false` | `0` | The rate limit is disabled by default (`0` means no limit) |
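+
+For example, to cap write throughput while backfilling history data (the limit value and path are illustrative):
+
+```sql
+CREATE TABLE hudi_table (
+  ...
+) WITH (
+  'connector' = 'hudi',
+  'path' = 'file:///tmp/hudi_table',
+  'write.rate.limit' = '30000'  -- throttle writes during the history backfill
+);
+```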
diff --git a/website/docs/ingestion_kafka_connect.md
b/website/docs/ingestion_kafka_connect.md
new file mode 100644
index 00000000000..8f020666ae9
--- /dev/null
+++ b/website/docs/ingestion_kafka_connect.md
@@ -0,0 +1,7 @@
+---
+title: Using Kafka Connect
+keywords: [hudi, kafka, connector, ingestion]
+---
+
+Try out the new experimental release of Hudi Kafka Connect Sink. Read
+the [ReadMe](https://github.com/apache/hudi/tree/master/hudi-kafka-connect)
for full documentation.
diff --git a/website/docs/overview.md b/website/docs/overview.md
index 88195f565b4..eb92cc34970 100644
--- a/website/docs/overview.md
+++ b/website/docs/overview.md
@@ -13,7 +13,7 @@ how to learn more to get started.
Apache Hudi (pronounced “hoodie”) is the next generation [streaming data lake
platform](/blog/2021/07/21/streaming-data-lake-platform).
Apache Hudi brings core warehouse and database functionality directly to a
data lake. Hudi provides [tables](/docs/next/sql_ddl),
[transactions](/docs/next/timeline), [efficient
upserts/deletes](/docs/next/write_operations), [advanced
indexes](/docs/next/indexing),
-[streaming ingestion services](/docs/next/hoodie_streaming_ingestion), data
[clustering](/docs/next/clustering)/[compaction](/docs/next/compaction)
optimizations,
+[ingestion services](/docs/hoodie_streaming_ingestion), data
[clustering](/docs/next/clustering)/[compaction](/docs/next/compaction)
optimizations,
and [concurrency](/docs/next/concurrency_control) all while keeping your data
in open source file formats.
Not only is Apache Hudi great for streaming workloads, but it also allows you
to create efficient incremental batch pipelines.
diff --git a/website/docs/platform_services_post_commit_callback.md
b/website/docs/platform_services_post_commit_callback.md
new file mode 100644
index 00000000000..0df43a2a017
--- /dev/null
+++ b/website/docs/platform_services_post_commit_callback.md
@@ -0,0 +1,57 @@
+---
+title: Post-commit Callback
+keywords: [hudi, platform, commit, callback]
+---
+
+Apache Hudi provides the ability to post a callback notification about a write
commit. This may be valuable if you need
+an event notification stream to take actions with other services after a Hudi
write commit.
+You can push a write commit callback notification into HTTP endpoints or to a
Kafka server.
+
+## HTTP Endpoints
+You can push a commit notification to an HTTP URL and can specify custom
values by extending a callback class defined below.
+
+| Config | Description | Required | Default |
+| ----------- | ------- | ------- | ------ |
+| TURN_CALLBACK_ON | Turn commit callback on/off | optional | false
(*callbacks off*) |
+| CALLBACK_HTTP_URL | Callback host to be sent along with callback messages |
required | N/A |
+| CALLBACK_HTTP_TIMEOUT_IN_SECONDS | Callback timeout in seconds | optional |
3 |
+| CALLBACK_CLASS_NAME | Full path of the callback class; must be a subclass of the HoodieWriteCommitCallback class | optional | org.apache.hudi.callback.impl.HoodieWriteCommitHttpCallback |
+| CALLBACK_HTTP_API_KEY_VALUE | Http callback API key | optional |
hudi_write_commit_http_callback |
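+
+The configs above map to `hoodie.write.commit.callback.*` properties. A sketch of a writer configuration (key names are assumptions; verify them against the configuration reference for your Hudi version):
+
+```properties
+hoodie.write.commit.callback.on=true
+hoodie.write.commit.callback.http.url=http://callback-host:8000/hudi-commits
+hoodie.write.commit.callback.http.timeout.seconds=3
+hoodie.write.commit.callback.http.api.key=hudi_write_commit_http_callback
+```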
+
+## Kafka Endpoints
+You can push a commit notification to a Kafka topic so it can be used by other
real time systems.
+
+| Config | Description | Required | Default |
+| ----------- | ------- | ------- | ------ |
+| TOPIC | Kafka topic name to publish timeline activity into. | required | N/A
|
+| PARTITION | It may be desirable to serialize all changes into a single Kafka
partition for providing strict ordering. By default, Kafka messages are keyed
by table name, which guarantees ordering at the table level, but not globally
(or when new partitions are added) | required | N/A |
+| RETRIES | Times to retry the produce | optional | 3 |
+| ACKS | kafka acks level, all by default to ensure strong durability |
optional | all |
+| BOOTSTRAP_SERVERS | Bootstrap servers of kafka cluster, to be used for
publishing commit metadata | required | N/A |
+
+## Pulsar Endpoints
+You can push a commit notification to a Pulsar topic so it can be used by
other real time systems.
+
+| Config | Description
| Required | Default |
+| -----------
|-----------------------------------------------------------------------------|
------- |--------|
+| hoodie.write.commit.callback.pulsar.broker.service.url | Server's Url of
pulsar cluster to use to publish commit metadata. | required | N/A |
+| hoodie.write.commit.callback.pulsar.topic | Pulsar topic name to publish
timeline activity into | required | N/A |
+| hoodie.write.commit.callback.pulsar.producer.route-mode | Message routing
logic for producers on partitioned topics. | optional | RoundRobinPartition
|
+| hoodie.write.commit.callback.pulsar.producer.pending-queue-size | The
maximum size of a queue holding pending messages. | optional |
1000 |
+| hoodie.write.commit.callback.pulsar.producer.pending-total-size | The
maximum number of pending messages across partitions. | required | 50000 |
+| hoodie.write.commit.callback.pulsar.producer.block-if-queue-full | When the queue is full, block the send call instead of throwing an exception. | optional | true |
+| hoodie.write.commit.callback.pulsar.producer.send-timeout | The timeout for each send to Pulsar. | optional | 30s |
+| hoodie.write.commit.callback.pulsar.operation-timeout | Duration of waiting
for completing an operation. | optional | 30s |
+| hoodie.write.commit.callback.pulsar.connection-timeout | Duration of waiting
for a connection to a broker to be established. | optional | 10s |
+| hoodie.write.commit.callback.pulsar.request-timeout | Duration of waiting
for completing a request. | optional | 60s |
+| hoodie.write.commit.callback.pulsar.keepalive-interval | Duration of keeping
alive interval for each client broker connection. | optional | 30s |
+
+## Bring your own implementation
+You can extend the HoodieWriteCommitCallback class to implement your own way
to asynchronously handle the callback
+of a successful write. Use this public API:
+
+https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/HoodieWriteCommitCallback.java
+
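+A minimal sketch of a custom callback (the message type, its package, and the getter name are assumptions inferred from the linked interface; verify them against your Hudi version):
+
+```java
+import org.apache.hudi.callback.HoodieWriteCommitCallback;
+import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
+
+public class MyCommitCallback implements HoodieWriteCommitCallback {
+  @Override
+  public void call(HoodieWriteCommitCallbackMessage message) {
+    // e.g. forward the commit metadata to an internal notification service
+    System.out.println("Committed instant: " + message.getCommitInstantTime());
+  }
+}
+```
+
+Set CALLBACK_CLASS_NAME to the fully qualified class name to use it.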
diff --git a/website/docs/precommit_validator.md
b/website/docs/precommit_validator.md
index 741b9deb683..6f6806a3fa9 100644
--- a/website/docs/precommit_validator.md
+++ b/website/docs/precommit_validator.md
@@ -91,7 +91,7 @@ void validateRecordsBeforeAndAfter(Dataset<Row> before,
```
## Additional Monitoring with Notifications
-Hudi offers a [commit notification
service](https://hudi.apache.org/docs/next/writing_data/#commit-notifications)
that can be configured to trigger notifications about write commits.
+Hudi offers a [commit notification
service](/docs/next/platform_services_post_commit_callback) that can be
configured to trigger notifications about write commits.
The commit notification service can be combined with pre-commit validators to
send a notification when a commit fails a validation. This is possible by
passing details about the validation as a custom value to the HTTP endpoint.
diff --git a/website/docs/quick-start-guide.md
b/website/docs/quick-start-guide.md
index 255524853c3..9104edee8a7 100644
--- a/website/docs/quick-start-guide.md
+++ b/website/docs/quick-start-guide.md
@@ -539,7 +539,7 @@ MERGE statement either using `SET *` or using `SET column1
= expression1 [, colu
## Delete data {#deletes}
Delete operation removes the records specified from the table. For example,
this code snippet deletes records
-for the HoodieKeys passed in. Check out the [deletion
section](/docs/next/writing_data#deletes) for more details.
+for the HoodieKeys passed in. Check out the [deletion
section](/docs/writing_data#deletes) for more details.
<Tabs
groupId="programming-language"
@@ -1119,13 +1119,13 @@ For alter table commands, check out
[this](/docs/next/sql_ddl#spark-alter-table)
Hudi provides industry-leading performance and functionality for streaming
data.
-**Hudi Streamer** - Hudi provides an incremental ingestion/ETL tool -
[HoodieStreamer](/docs/next/hoodie_streaming_ingestion#hudi-streamer), to
assist with ingesting data into Hudi
+**Hudi Streamer** - Hudi provides an incremental ingestion/ETL tool -
[HoodieStreamer](/docs/hoodie_streaming_ingestion#hudi-streamer), to assist
with ingesting data into Hudi
from various different sources in a streaming manner, with powerful built-in
capabilities like auto checkpointing, schema enforcement via schema provider,
transformation support, automatic table services and so on.
-**Structured Streaming** - Hudi supports Spark Structured Streaming reads and
writes as well. Please see
[here](/docs/next/hoodie_streaming_ingestion#structured-streaming) for more.
+**Structured Streaming** - Hudi supports Spark Structured Streaming reads and
writes as well. Please see
[here](/docs/next/writing_tables_streaming_writes#spark-streaming) for more.
-Check out more information on [modeling data in
Hudi](/docs/next/faq_general#how-do-i-model-the-data-stored-in-hudi) and
different ways to [writing Hudi Tables](/docs/next/writing_data).
+Check out more information on [modeling data in
Hudi](/docs/next/faq_general#how-do-i-model-the-data-stored-in-hudi) and
different ways to perform [batch writes](/docs/writing_data) and [streaming
writes](/docs/next/writing_tables_streaming_writes).
### Dockerized Demo
Even as we showcased the core capabilities, Hudi supports a lot more advanced
functionality that can make it easy
diff --git a/website/docs/reading_tables_batch_reads.md
b/website/docs/reading_tables_batch_reads.md
new file mode 100644
index 00000000000..2055a9685df
--- /dev/null
+++ b/website/docs/reading_tables_batch_reads.md
@@ -0,0 +1,20 @@
+---
+title: Batch Reads
+keywords: [hudi, spark, flink, batch, processing]
+---
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+## Spark DataSource API
+
+The `hudi-spark` module offers the DataSource API to read a Hudi table into a
Spark DataFrame.
+
+A time-travel query example:
+
+```Scala
+val tripsDF = spark.read.
+ option("as.of.instant", "2021-07-28 14:11:08.000").
+ format("hudi").
+ load(basePath)
+tripsDF.where(tripsDF.fare > 20.0).show()
+```
diff --git a/website/docs/reading_tables_streaming_reads.md
b/website/docs/reading_tables_streaming_reads.md
new file mode 100644
index 00000000000..57c3e2c4702
--- /dev/null
+++ b/website/docs/reading_tables_streaming_reads.md
@@ -0,0 +1,99 @@
+---
+title: Streaming Reads
+keywords: [hudi, spark, flink, streaming, processing]
+---
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+## Spark Streaming
+
+Structured Streaming reads are based on Hudi's incremental query feature; therefore, a streaming read can only return data whose
+commits and base files have not yet been removed by the cleaner. You can control the commit retention time.
+
+<Tabs
+groupId="programming-language"
+defaultValue="python"
+values={[
+{ label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', },
+]}
+>
+
+<TabItem value="scala">
+
+```scala
+// spark-shell
+// reload data
+df.write.format("hudi").
+ options(getQuickstartWriteConfigs).
+ option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+ option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+ option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+ option(TABLE_NAME, tableName).
+ mode(Overwrite).
+ save(basePath)
+
+// read stream and output results to console
+spark.readStream.
+ format("hudi").
+ load(basePath).
+ writeStream.
+ format("console").
+ start()
+
+// read stream to streaming df
+val df = spark.readStream.
+ format("hudi").
+ load(basePath)
+
+```
+</TabItem>
+
+<TabItem value="python">
+
+```python
+# pyspark
+# reload data
+inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(
+ dataGen.generateInserts(10))
+df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
+
+hudi_options = {
+ 'hoodie.table.name': tableName,
+ 'hoodie.datasource.write.recordkey.field': 'uuid',
+ 'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+ 'hoodie.datasource.write.table.name': tableName,
+ 'hoodie.datasource.write.operation': 'upsert',
+ 'hoodie.datasource.write.precombine.field': 'ts',
+ 'hoodie.upsert.shuffle.parallelism': 2,
+ 'hoodie.insert.shuffle.parallelism': 2
+}
+
+df.write.format("hudi"). \
+ options(**hudi_options). \
+ mode("overwrite"). \
+ save(basePath)
+
+# read stream to streaming df
+df = spark.readStream \
+ .format("hudi") \
+ .load(basePath)
+
+# read stream and output results to console
+spark.readStream \
+ .format("hudi") \
+ .load(basePath) \
+ .writeStream \
+ .format("console") \
+ .start()
+
+```
+</TabItem>
+
+</Tabs
+>
+
+:::info
+Spark SQL can be used within ForeachBatch sink to do INSERT, UPDATE, DELETE
and MERGE INTO.
+Target table must exist before write.
+:::
diff --git a/website/docs/sql_ddl.md b/website/docs/sql_ddl.md
index e1f659f4c88..61e7d33cd7f 100644
--- a/website/docs/sql_ddl.md
+++ b/website/docs/sql_ddl.md
@@ -101,7 +101,7 @@ TBLPROPERTIES (
```
### Create table from an external location
-Often, Hudi tables are created from streaming writers like the [streamer
tool](/docs/next/hoodie_streaming_ingestion#hudi-streamer), which
+Often, Hudi tables are created from streaming writers like the [streamer
tool](/docs/hoodie_streaming_ingestion#hudi-streamer), which
may later need some SQL statements to run on them. You can create an External
table using the `location` statement.
```sql
@@ -503,7 +503,7 @@ Hudi currently has the following limitations when using
Spark SQL, to create/alt
- A new Hudi table created by Spark SQL will by default set
`hoodie.datasource.write.hive_style_partitioning=true`, for ease
of use. This can be overridden using table properties.
-## Flink
+## Flink SQL
### Create Catalog
The catalog helps to manage the SQL tables, the table can be shared among
sessions if the catalog persists the table definitions.
diff --git a/website/docs/sql_dml.md b/website/docs/sql_dml.md
index 10219843564..e6765af0bc5 100644
--- a/website/docs/sql_dml.md
+++ b/website/docs/sql_dml.md
@@ -197,7 +197,7 @@ DELETE from hudi_table where uuid =
'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa';
These DML operations give you powerful tools for managing your tables using
Spark SQL.
You can control the behavior of these operations using various configuration
options, as explained in the documentation.
-## Flink
+## Flink SQL
Flink SQL provides several Data Manipulation Language (DML) actions for
interacting with Hudi tables. These operations allow you to insert, update and
delete data from your Hudi tables. Let's explore them one by one.
@@ -269,3 +269,117 @@ INSERT INTO hudi_table/*+
OPTIONS('${hoodie.config.key1}'='${hoodie.config.value
```sql
INSERT INTO hudi_table/*+ OPTIONS('hoodie.keep.max.commits'='true')*/
```
+
+## Flink SQL In Action
+
+The `hudi-flink` module defines the Flink SQL connector for both the Hudi source and sink.
+There are a number of options available for the sink table:
+
+| Option Name | Required | Default | Remarks |
+| ----------- | ------- | ------- | ------- |
+| path | Y | N/A | Base path for the target Hudi table. The path is created if it does not exist; otherwise a Hudi table is expected to have been initialized successfully |
+| table.type | N | COPY_ON_WRITE | Type of table to write. COPY_ON_WRITE (or)
MERGE_ON_READ |
+| write.operation | N | upsert | The write operation that this write should do (insert or upsert is supported) |
+| write.precombine.field | N | ts | Field used in preCombining before actual
write. When two records have the same key value, we will pick the one with the
largest value for the precombine field, determined by Object.compareTo(..) |
+| write.payload.class | N | OverwriteWithLatestAvroPayload.class | Payload class used. Override this if you want to roll your own merge logic when upserting/inserting. This will render any value set for the precombine field ineffective |
+| write.insert.drop.duplicates | N | false | Flag to indicate whether to drop
duplicates upon insert. By default insert will accept duplicates, to gain extra
performance |
+| write.ignore.failed | N | true | Flag to indicate whether to ignore any non-exception error (e.g. a writestatus error) within a checkpoint batch. By default true (in favor of streaming progress over data integrity) |
+| hoodie.datasource.write.recordkey.field | N | uuid | Record key field. Value
to be used as the `recordKey` component of `HoodieKey`. Actual value will be
obtained by invoking .toString() on the field value. Nested fields can be
specified using the dot notation eg: `a.b.c` |
+| hoodie.datasource.write.keygenerator.class | N | SimpleAvroKeyGenerator.class | Key generator class that extracts the key from the incoming record |
+| write.tasks | N | 4 | Parallelism of tasks that do actual write, default is
4 |
+| write.batch.size.MB | N | 128 | Batch buffer size in MB to flush data into the underlying filesystem |
+
+If the table type is MERGE_ON_READ, you can also specify the asynchronous
compaction strategy through options:
+
+| Option Name | Required | Default | Remarks |
+| ----------- | ------- | ------- | ------- |
+| compaction.async.enabled | N | true | Async Compaction, enabled by default
for MOR |
+| compaction.trigger.strategy | N | num_commits | Strategy to trigger
compaction, options are 'num_commits': trigger compaction when reach N delta
commits; 'time_elapsed': trigger compaction when time elapsed > N seconds since
last compaction; 'num_and_time': trigger compaction when both NUM_COMMITS and
TIME_ELAPSED are satisfied; 'num_or_time': trigger compaction when NUM_COMMITS
or TIME_ELAPSED is satisfied. Default is 'num_commits' |
+| compaction.delta_commits | N | 5 | Max delta commits needed to trigger
compaction, default 5 commits |
+| compaction.delta_seconds | N | 3600 | Max delta seconds time needed to
trigger compaction, default 1 hour |
+
+You can write the data using the SQL `INSERT INTO` statements:
+```sql
+INSERT INTO hudi_table select ... from ...;
+```
+
+**Note**: INSERT OVERWRITE is not supported yet but already on the roadmap.
+
+
+### Non-Blocking Concurrency Control (Experimental)
+
+Hudi Flink supports a new non-blocking concurrency control mode, where
multiple writer tasks can be executed
+concurrently without blocking each other. One can read more about this mode in
+the [concurrency control](/docs/next/concurrency_control#model-c-multi-writer)
docs. Let us see it in action here.
+
+In the example below, we have two streaming ingestion pipelines that concurrently update the same table. One of the
+pipelines is responsible for the compaction and cleaning table services, while the other pipeline is just for data
+ingestion.
+
+```sql
+-- set the interval as 30 seconds
+execution.checkpointing.interval: 30000
+state.backend: rocksdb
+
+-- This is a datagen source that generates records continuously
+CREATE TABLE sourceT (
+ uuid varchar(20),
+ name varchar(10),
+ age int,
+ ts timestamp(3),
+ `partition` as 'par1'
+) WITH (
+ 'connector' = 'datagen',
+ 'rows-per-second' = '200'
+);
+
+-- pipeline1: by default enable the compaction and cleaning services
+CREATE TABLE t1(
+ uuid varchar(20),
+ name varchar(10),
+ age int,
+ ts timestamp(3),
+ `partition` varchar(20)
+) WITH (
+ 'connector' = 'hudi',
+ 'path' = '/Users/chenyuzhao/workspace/hudi-demo/t1',
+ 'table.type' = 'MERGE_ON_READ',
+ 'index.type' = 'BUCKET',
+ 'hoodie.write.concurrency.mode' = 'NON_BLOCKING_CONCURRENCY_CONTROL',
+ 'write.tasks' = '2'
+);
+
+-- pipeline2: disable the compaction and cleaning services manually
+CREATE TABLE t1_2(
+ uuid varchar(20),
+ name varchar(10),
+ age int,
+ ts timestamp(3),
+ `partition` varchar(20)
+) WITH (
+ 'connector' = 'hudi',
+ 'path' = '/Users/chenyuzhao/workspace/hudi-demo/t1',
+ 'table.type' = 'MERGE_ON_READ',
+ 'index.type' = 'BUCKET',
+ 'hoodie.write.concurrency.mode' = 'NON_BLOCKING_CONCURRENCY_CONTROL',
+ 'write.tasks' = '2',
+ 'compaction.schedule.enabled' = 'false',
+ 'compaction.async.enabled' = 'false',
+ 'clean.async.enabled' = 'false'
+);
+
+-- submit the pipelines
+insert into t1 select * from sourceT;
+insert into t1_2 select * from sourceT;
+
+select * from t1 limit 20;
+```
+
+As the example above shows, two pipelines with multiple tasks concurrently
write to the same table.
+To use the new concurrency mode, simply set `hoodie.write.concurrency.mode`
+to `NON_BLOCKING_CONCURRENCY_CONTROL`. The `write.tasks` option sets the
parallelism of the write tasks.
+The `compaction.schedule.enabled`, `compaction.async.enabled`
+and `clean.async.enabled` options disable the compaction and cleaning services
for the second pipeline,
+ensuring these table services are not executed twice against the same table.
+
+
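The division of responsibilities in the example above can be sanity-checked outside Flink. The sketch below (plain Python; `validate_nbcc_pipelines` is a hypothetical helper, not a Hudi API) encodes the rule the prose describes: every writer must use `NON_BLOCKING_CONCURRENCY_CONTROL`, and exactly one pipeline should keep the table services enabled.

```python
def validate_nbcc_pipelines(pipelines):
    """Sanity-check option maps for writers sharing one Hudi table.

    Hypothetical helper: enforces that all writers use non-blocking
    concurrency control and that exactly one pipeline owns the
    compaction/cleaning table services (Hudi's default is enabled,
    i.e. 'true', when the option is absent).
    """
    for opts in pipelines:
        if opts.get("hoodie.write.concurrency.mode") != "NON_BLOCKING_CONCURRENCY_CONTROL":
            raise ValueError("all writers must use NON_BLOCKING_CONCURRENCY_CONTROL")
    # Pipelines that still run async compaction are the "service owners".
    service_owners = [
        opts for opts in pipelines
        if opts.get("compaction.async.enabled", "true") == "true"
    ]
    if len(service_owners) != 1:
        raise ValueError("exactly one pipeline should run table services")
    return True


# Mirrors the t1 / t1_2 tables in the SQL example above:
t1 = {"hoodie.write.concurrency.mode": "NON_BLOCKING_CONCURRENCY_CONTROL"}
t1_2 = {
    "hoodie.write.concurrency.mode": "NON_BLOCKING_CONCURRENCY_CONTROL",
    "compaction.schedule.enabled": "false",
    "compaction.async.enabled": "false",
    "clean.async.enabled": "false",
}
validate_nbcc_pipelines([t1, t1_2])  # passes: t1 alone owns the services
```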
diff --git a/website/docs/write_operations.md b/website/docs/write_operations.md
index 3146db05802..056d99c5f2b 100644
--- a/website/docs/write_operations.md
+++ b/website/docs/write_operations.md
@@ -6,8 +6,7 @@ last_modified_at:
---
It may be helpful to understand the different write operations of Hudi and how
best to leverage them. These operations
-can be chosen/changed across each commit/deltacommit issued against the table.
See the [How To docs on Writing Data](/docs/writing_data)
-to see more examples.
+can be chosen/changed across each commit/deltacommit issued against the table.
## Operation Types
### UPSERT
@@ -100,7 +99,7 @@ The following is an inside look on the Hudi write path and
the sequence of event
6. Update [Index](/docs/next/indexing)
1. Now that the write is performed, we will go back and update the index.
7. Commit
- 1. Finally we commit all of these changes atomically. (A [callback
notification](/docs/next/writing_data#commit-notifications) is exposed)
+ 1. Finally, we commit all of these changes atomically. (A [post-commit
callback](/docs/next/platform_services_post_commit_callback) can be configured.)
8. [Clean](/docs/next/hoodie_cleaner) (if needed)
1. Following the commit, cleaning is invoked if needed.
9. [Compaction](/docs/next/compaction)
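The per-commit operation choice described in this hunk can be sketched as an options map in plain Python (`hudi_write_options` is an illustrative helper, not a Hudi API; the option keys are the commonly documented Hudi DataSource keys, so verify them against your Hudi version):

```python
def hudi_write_options(operation, table_name="trips", precombine="ts"):
    """Assemble Hudi DataSource write options for a single commit.

    `operation` may differ on each commit/deltacommit against the same
    table: e.g. 'bulk_insert' for the initial load, then 'upsert' for
    incremental changes.
    """
    allowed = {"insert", "upsert", "bulk_insert", "delete"}
    if operation not in allowed:
        raise ValueError(f"unsupported write operation: {operation}")
    return {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.operation": operation,
        "hoodie.datasource.write.precombine.field": precombine,
    }


# Initial load as bulk_insert, later commits as upsert:
initial = hudi_write_options("bulk_insert")
incremental = hudi_write_options("upsert")
# df.write.format("hudi").options(**initial).mode("append").save(base_path)
```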
diff --git a/website/docs/writing_data.md b/website/docs/writing_data.md
index 10226cf9747..582d13bbd06 100644
--- a/website/docs/writing_data.md
+++ b/website/docs/writing_data.md
@@ -1,27 +1,16 @@
---
-title: Writing Data
-keywords: [hudi, incremental, batch, stream, processing, Hive, ETL, Spark SQL]
-summary: In this page, we will discuss some available tools for incrementally
ingesting & storing data.
-toc: true
-last_modified_at: 2019-12-30T15:59:57-04:00
+title: Batch Writes
+keywords: [hudi, incremental, batch, processing]
+last_modified_at: 2024-03-13T15:59:57-04:00
---
import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
-In this section, we will cover ways to ingest new changes from external
sources or even other Hudi tables.
-Currently Hudi supports following ways to write the data.
-- [Hudi Streamer](/docs/hoodie_streaming_ingestion#hudi-streamer)
-- [Spark Hudi Datasource](#spark-datasource-writer)
-- [Spark Structured
Streaming](/docs/hoodie_streaming_ingestion#structured-streaming)
-- [Spark SQL](/docs/next/sql_ddl#spark-sql)
-- [Flink Writer](/docs/next/hoodie_streaming_ingestion#flink-ingestion)
-- [Flink SQL](/docs/next/sql_ddl#flink)
-- [Java Writer](#java-writer)
-- [Kafka Connect](/docs/next/hoodie_streaming_ingestion#kafka-connect-sink)
+## Spark DataSource API
-## Spark Datasource Writer
+The `hudi-spark` module offers the DataSource API to write a Spark DataFrame
into a Hudi table.
-The `hudi-spark` module offers the DataSource API to write (and read) a Spark
DataFrame into a Hudi table. There are a number of options available:
+There are a number of options available:
**`HoodieWriteConfig`**:
@@ -59,7 +48,7 @@ Upsert a DataFrame, specifying the necessary field names for
`recordKey => _row_
```java
inputDF.write()
- .format("org.apache.hudi")
+ .format("hudi")
.options(clientOpts) //Where clientOpts is of type Map[String, String].
clientOpts can include any other options necessary.
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(),
"partition")
@@ -99,7 +88,7 @@ You can check the data generated under
`/tmp/hudi_trips_cow/<region>/<country>/<
(`uuid` in
[schema](https://github.com/apache/hudi/blob/6f9b02decb5bb2b83709b1b6ec04a97e4d102c11/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L60)),
partition field (`region/country/city`) and combine logic (`ts` in
[schema](https://github.com/apache/hudi/blob/6f9b02decb5bb2b83709b1b6ec04a97e4d102c11/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L60))
to ensure trip records are unique within each partition. For more info, refer
to
[Modeling data stored in
Hudi](/docs/next/faq_general/#how-do-i-model-the-data-stored-in-hudi)
-and for info on ways to ingest data into Hudi, refer to [Writing Hudi
Tables](/docs/writing_data).
+and for info on ways to ingest data into Hudi, refer to [Writing Hudi
Tables](/docs/hoodie_streaming_ingestion).
Here we are using the default write operation : `upsert`. If you have a
workload without updates, you can also issue
`insert` or `bulk_insert` operations which could be faster. To know more,
refer to [Write operations](/docs/write_operations)
:::
@@ -135,7 +124,7 @@ You can check the data generated under
`/tmp/hudi_trips_cow/<region>/<country>/<
(`uuid` in
[schema](https://github.com/apache/hudi/blob/2e6e302efec2fa848ded4f88a95540ad2adb7798/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L60)),
partition field (`region/country/city`) and combine logic (`ts` in
[schema](https://github.com/apache/hudi/blob/2e6e302efec2fa848ded4f88a95540ad2adb7798/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L60))
to ensure trip records are unique within each partition. For more info, refer
to
[Modeling data stored in
Hudi](/docs/next/faq_general/#how-do-i-model-the-data-stored-in-hudi)
-and for info on ways to ingest data into Hudi, refer to [Writing Hudi
Tables](/docs/writing_data).
+and for info on ways to ingest data into Hudi, refer to [Writing Hudi
Tables](/docs/hoodie_streaming_ingestion).
Here we are using the default write operation : `upsert`. If you have a
workload without updates, you can also issue
`insert` or `bulk_insert` operations which could be faster. To know more,
refer to [Write operations](/docs/write_operations)
:::
@@ -434,9 +423,9 @@ Then any record you want to delete you can mark
`_hoodie_is_deleted` as true:
### Concurrency Control
-The `hudi-spark` module offers the DataSource API to write (and read) a Spark
DataFrame into a Hudi table.
+Following is an example of how to use `optimistic_concurrency_control` via the
Spark DataSource API.
-Following is an example of how to use optimistic_concurrency_control via spark
datasource. Read more in depth about concurrency control in the [concurrency
control concepts](https://hudi.apache.org/docs/concurrency_control) section
+Read more in-depth details about concurrency control in the [concurrency
control concepts](/docs/concurrency_control) section.
```java
inputDF.write.format("hudi")
@@ -455,171 +444,5 @@ inputDF.write.format("hudi")
.save(basePath)
```
-### Commit Notifications
-Apache Hudi provides the ability to post a callback notification about a write
commit. This may be valuable if you need
-an event notification stream to take actions with other services after a Hudi
write commit.
-You can push a write commit callback notification into HTTP endpoints or to a
Kafka server.
-
-#### HTTP Endpoints
-You can push a commit notification to an HTTP URL and can specify custom
values by extending a callback class defined below.
-
-| Config | Description | Required | Default |
-| ----------- | ------- | ------- | ------ |
-| TURN_CALLBACK_ON | Turn commit callback on/off | optional | false
(*callbacks off*) |
-| CALLBACK_HTTP_URL | Callback host to be sent along with callback messages |
required | N/A |
-| CALLBACK_HTTP_TIMEOUT_IN_SECONDS | Callback timeout in seconds | optional |
3 |
-| CALLBACK_CLASS_NAME | Full path of callback class and must be a subclass of
HoodieWriteCommitCallback class,
org.apache.hudi.callback.impl.HoodieWriteCommitHttpCallback by default |
optional | org.apache.hudi.callback.impl.HoodieWriteCommitHttpCallback |
-| CALLBACK_HTTP_API_KEY_VALUE | Http callback API key | optional |
hudi_write_commit_http_callback |
-| | | | |
-
-#### Kafka Endpoints
-You can push a commit notification to a Kafka topic so it can be used by other
real time systems.
-
-| Config | Description | Required | Default |
-| ----------- | ------- | ------- | ------ |
-| TOPIC | Kafka topic name to publish timeline activity into. | required | N/A
|
-| PARTITION | It may be desirable to serialize all changes into a single Kafka
partition for providing strict ordering. By default, Kafka messages are keyed
by table name, which guarantees ordering at the table level, but not globally
(or when new partitions are added) | required | N/A |
-| RETRIES | Times to retry the produce | optional | 3 |
-| ACKS | kafka acks level, all by default to ensure strong durability |
optional | all |
-| BOOTSTRAP_SERVERS | Bootstrap servers of kafka cluster, to be used for
publishing commit metadata | required | N/A |
-| | | | |
-
-#### Pulsar Endpoints
-You can push a commit notification to a Pulsar topic so it can be used by
other real time systems.
-
-| Config | Description
| Required | Default |
-| -----------
|-----------------------------------------------------------------------------|
------- |--------|
-| hoodie.write.commit.callback.pulsar.broker.service.url | Server's Url of
pulsar cluster to use to publish commit metadata. | required | N/A |
-| hoodie.write.commit.callback.pulsar.topic | Pulsar topic name to publish
timeline activity into | required | N/A |
-| hoodie.write.commit.callback.pulsar.producer.route-mode | Message routing
logic for producers on partitioned topics. | optional | RoundRobinPartition
|
-| hoodie.write.commit.callback.pulsar.producer.pending-queue-size | The
maximum size of a queue holding pending messages. | optional |
1000 |
-| hoodie.write.commit.callback.pulsar.producer.pending-total-size | The
maximum number of pending messages across partitions. | required | 50000 |
-| hoodie.write.commit.callback.pulsar.producer.block-if-queue-full | When the
queue is full, the method is blocked instead of an exception is thrown. |
optional | true |
-| hoodie.write.commit.callback.pulsar.producer.send-timeout | The timeout in
each sending to pulsar. | optional | 30s |
-| hoodie.write.commit.callback.pulsar.operation-timeout | Duration of waiting
for completing an operation. | optional | 30s |
-| hoodie.write.commit.callback.pulsar.connection-timeout | Duration of waiting
for a connection to a broker to be established. | optional | 10s |
-| hoodie.write.commit.callback.pulsar.request-timeout | Duration of waiting
for completing a request. | optional | 60s |
-| hoodie.write.commit.callback.pulsar.keepalive-interval | Duration of keeping
alive interval for each client broker connection. | optional | 30s |
-| |
| | |
-
-#### Bring your own implementation
-You can extend the HoodieWriteCommitCallback class to implement your own way
to asynchronously handle the callback
-of a successful write. Use this public API:
-
-https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/HoodieWriteCommitCallback.java
-
-## Flink SQL Writer
-The hudi-flink module defines the Flink SQL connector for both hudi source and
sink.
-There are a number of options available for the sink table:
-
-| Option Name | Required | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| path | Y | N/A | Base path for the target hoodie table. The path would be
created if it does not exist, otherwise a hudi table expects to be initialized
successfully |
-| table.type | N | COPY_ON_WRITE | Type of table to write. COPY_ON_WRITE (or)
MERGE_ON_READ |
-| write.operation | N | upsert | The write operation, that this write should
do (insert or upsert is supported) |
-| write.precombine.field | N | ts | Field used in preCombining before actual
write. When two records have the same key value, we will pick the one with the
largest value for the precombine field, determined by Object.compareTo(..) |
-| write.payload.class | N | OverwriteWithLatestAvroPayload.class | Payload
class used. Override this, if you like to roll your own merge logic, when
upserting/inserting. This will render any value set for the option in-effective
|
-| write.insert.drop.duplicates | N | false | Flag to indicate whether to drop
duplicates upon insert. By default insert will accept duplicates, to gain extra
performance |
-| write.ignore.failed | N | true | Flag to indicate whether to ignore any non
exception error (e.g. writestatus error). within a checkpoint batch. By default
true (in favor of streaming progressing over data integrity) |
-| hoodie.datasource.write.recordkey.field | N | uuid | Record key field. Value
to be used as the `recordKey` component of `HoodieKey`. Actual value will be
obtained by invoking .toString() on the field value. Nested fields can be
specified using the dot notation eg: `a.b.c` |
-| hoodie.datasource.write.keygenerator.class | N |
SimpleAvroKeyGenerator.class | Key generator class, that implements will
extract the key out of incoming record |
-| write.tasks | N | 4 | Parallelism of tasks that do actual write, default is
4 |
-| write.batch.size.MB | N | 128 | Batch buffer size in MB to flush data into
the underneath filesystem |
-
-If the table type is MERGE_ON_READ, you can also specify the asynchronous
compaction strategy through options:
-
-| Option Name | Required | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| compaction.async.enabled | N | true | Async Compaction, enabled by default
for MOR |
-| compaction.trigger.strategy | N | num_commits | Strategy to trigger
compaction, options are 'num_commits': trigger compaction when reach N delta
commits; 'time_elapsed': trigger compaction when time elapsed > N seconds since
last compaction; 'num_and_time': trigger compaction when both NUM_COMMITS and
TIME_ELAPSED are satisfied; 'num_or_time': trigger compaction when NUM_COMMITS
or TIME_ELAPSED is satisfied. Default is 'num_commits' |
-| compaction.delta_commits | N | 5 | Max delta commits needed to trigger
compaction, default 5 commits |
-| compaction.delta_seconds | N | 3600 | Max delta seconds time needed to
trigger compaction, default 1 hour |
-
-You can write the data using the SQL `INSERT INTO` statements:
-```sql
-INSERT INTO hudi_table select ... from ...;
-```
-
-**Note**: INSERT OVERWRITE is not supported yet but already on the roadmap.
-
-
-### Non-Blocking Concurrency Control (Experimental)
-
-Hudi Flink supports a new non-blocking concurrency control mode, where
multiple writer tasks can be executed
-concurrently without blocking each other. One can read more about this mode in
-the [concurrency control](/docs/next/concurrency_control#model-c-multi-writer)
docs. Let us see it in action here.
-
-In the below example, we have two streaming ingestion pipelines that
concurrently update the same table. One of the
-pipeline is responsible for the compaction and cleaning table services, while
the other pipeline is just for data
-ingestion.
-
-```sql
--- set the interval as 30 seconds
-execution.checkpointing.interval: 30000
-state.backend: rocksdb
-
--- This is a datagen source that can generates records continuously
-CREATE TABLE sourceT (
- uuid varchar(20),
- name varchar(10),
- age int,
- ts timestamp(3),
- `partition` as 'par1'
-) WITH (
- 'connector' = 'datagen',
- 'rows-per-second' = '200'
-);
-
--- pipeline1: by default enable the compaction and cleaning services
-CREATE TABLE t1(
- uuid varchar(20),
- name varchar(10),
- age int,
- ts timestamp(3),
- `partition` varchar(20)
-) WITH (
- 'connector' = 'hudi',
- 'path' = '/Users/chenyuzhao/workspace/hudi-demo/t1',
- 'table.type' = 'MERGE_ON_READ',
- 'index.type' = 'BUCKET',
- 'hoodie.write.concurrency.mode' = 'NON_BLOCKING_CONCURRENCY_CONTROL',
- 'write.tasks' = '2'
-);
-
--- pipeline2: disable the compaction and cleaning services manually
-CREATE TABLE t1_2(
- uuid varchar(20),
- name varchar(10),
- age int,
- ts timestamp(3),
- `partition` varchar(20)
-) WITH (
- 'connector' = 'hudi',
- 'path' = '/Users/chenyuzhao/workspace/hudi-demo/t1',
- 'table.type' = 'MERGE_ON_READ',
- 'index.type' = 'BUCKET',
- 'hoodie.write.concurrency.mode' = 'NON_BLOCKING_CONCURRENCY_CONTROL',
- 'write.tasks' = '2',
- 'compaction.schedule.enabled' = 'false',
- 'compaction.async.enabled' = 'false',
- 'clean.async.enabled' = 'false'
-);
-
--- submit the pipelines
-insert into t1 select * from sourceT;
-insert into t1_2 select * from sourceT;
-
-select * from t1 limit 20;
-```
-
-As you can see from the above example, we have two pipelines with multiple
tasks that concurrently write to the
-same table. To use the new concurrency mode, all you need to do is set the
`hoodie.write.concurrency.mode`
-to `NON_BLOCKING_CONCURRENCY_CONTROL`. The `write.tasks` option is used to
specify the number of write tasks that will
-be used for writing to the table. The `compaction.schedule.enabled`,
`compaction.async.enabled`
-and `clean.async.enabled` options are used to disable the compaction and
cleaning services for the second pipeline.
-This is done to ensure that the compaction and cleaning services are not
executed twice for the same table.
-
-
-## Java Writer
+## Java Client
We can use plain Java to write to Hudi tables. To use the Java client, refer
[here](https://github.com/apache/hudi/blob/master/hudi-examples/hudi-examples-java/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java)
-
diff --git a/website/docs/writing_tables_streaming_writes.md
b/website/docs/writing_tables_streaming_writes.md
new file mode 100644
index 00000000000..4bb43bb696b
--- /dev/null
+++ b/website/docs/writing_tables_streaming_writes.md
@@ -0,0 +1,92 @@
+---
+title: Streaming Writes
+keywords: [hudi, spark, flink, streaming, processing]
+last_modified_at: 2024-03-13T15:59:57-04:00
+---
+
+## Spark Streaming
+
+You can write to Hudi tables using Spark's Structured Streaming.
+
+<Tabs
+groupId="programming-language"
+defaultValue="python"
+values={[
+{ label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', },
+]}
+>
+
+<TabItem value="scala">
+
+```scala
+// spark-shell
+// prepare to stream write to new table
+import org.apache.spark.sql.streaming.Trigger
+
+val streamingTableName = "hudi_trips_cow_streaming"
+val baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
+val checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"
+
+// create streaming df
+val df = spark.readStream.
+ format("hudi").
+ load(basePath)
+
+// write stream to new hudi table
+df.writeStream.format("hudi").
+ options(getQuickstartWriteConfigs).
+ option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+ option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+ option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+ option(TABLE_NAME, streamingTableName).
+ outputMode("append").
+ option("path", baseStreamingPath).
+ option("checkpointLocation", checkpointLocation).
+ trigger(Trigger.Once()).
+ start()
+
+```
+
+</TabItem>
+<TabItem value="python">
+
+```python
+# pyspark
+# prepare to stream write to new table
+streamingTableName = "hudi_trips_cow_streaming"
+baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
+checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"
+
+hudi_streaming_options = {
+ 'hoodie.table.name': streamingTableName,
+ 'hoodie.datasource.write.recordkey.field': 'uuid',
+ 'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+ 'hoodie.datasource.write.table.name': streamingTableName,
+ 'hoodie.datasource.write.operation': 'upsert',
+ 'hoodie.datasource.write.precombine.field': 'ts',
+ 'hoodie.upsert.shuffle.parallelism': 2,
+ 'hoodie.insert.shuffle.parallelism': 2
+}
+
+# create streaming df
+df = spark.readStream \
+ .format("hudi") \
+ .load(basePath)
+
+# write stream to new hudi table
+df.writeStream.format("hudi") \
+ .options(**hudi_streaming_options) \
+ .outputMode("append") \
+ .option("path", baseStreamingPath) \
+ .option("checkpointLocation", checkpointLocation) \
+ .trigger(once=True) \
+ .start()
+
+```
+
+</TabItem>
+
+</Tabs
+>
+
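Outside of a live Spark session, the streaming-write settings from the new page above can be bundled and validated in plain Python (a sketch; `streaming_write_config` is a hypothetical helper that simply mirrors the PySpark option keys shown in the patch):

```python
def streaming_write_config(table_name, base_path, checkpoint_location):
    """Bundle the Hudi streaming-write settings used in the example.

    A checkpoint location is mandatory for Spark Structured Streaming
    sinks so the query can resume after a failure or restart.
    """
    if not checkpoint_location:
        raise ValueError("checkpointLocation is required for streaming writes")
    return {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.recordkey.field": "uuid",
        "hoodie.datasource.write.partitionpath.field": "partitionpath",
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.datasource.write.precombine.field": "ts",
        "path": base_path,
        "checkpointLocation": checkpoint_location,
    }


cfg = streaming_write_config(
    "hudi_trips_cow_streaming",
    "file:///tmp/hudi_trips_cow_streaming",
    "file:///tmp/checkpoints/hudi_trips_cow_streaming",
)
# df.writeStream.format("hudi").options(**cfg).outputMode("append").start()
```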
diff --git a/website/releases/release-1.0.0-beta1.md
b/website/releases/release-1.0.0-beta1.md
index f970543feda..e091def2af2 100644
--- a/website/releases/release-1.0.0-beta1.md
+++ b/website/releases/release-1.0.0-beta1.md
@@ -66,7 +66,7 @@ OCC, multiple writers can operate on the table with
non-blocking conflict resolu
same file group with the conflicts resolved automatically by the query reader
and the compactor. The new concurrency
mode is currently available for preview in version 1.0.0-beta only. You can
read more about it under
section [Model C:
Multi-writer](/docs/next/concurrency_control#non-blocking-concurrency-control-mode-experimental).
A complete example with multiple
-Flink streaming writers is available
[here](/docs/next/writing_data#non-blocking-concurrency-control-experimental).
You
+Flink streaming writers is available
[here](/docs/next/sql_dml#non-blocking-concurrency-control-experimental). You
can follow the
[RFC](https://github.com/apache/hudi/blob/master/rfc/rfc-66/rfc-66.md) and
the [JIRA](https://issues.apache.org/jira/browse/HUDI-6640) for more details.
diff --git a/website/sidebars.js b/website/sidebars.js
index d99e833edb3..6ef2b9c77a9 100644
--- a/website/sidebars.js
+++ b/website/sidebars.js
@@ -38,19 +38,30 @@ module.exports = {
},
{
type: 'category',
- label: 'How To',
+ label: 'Ingestion',
items: [
- {
- type: 'category',
- label: 'SQL',
- items: [
- 'sql_ddl',
- 'sql_dml',
- 'sql_queries',
- ],
- },
- 'writing_data',
'hoodie_streaming_ingestion',
+ 'ingestion_flink',
+ 'ingestion_kafka_connect',
+ ],
+ },
+ {
+ type: 'category',
+ label: 'Writing Tables',
+ items: [
+ 'sql_ddl',
+ 'sql_dml',
+ 'writing_data',
+ 'writing_tables_streaming_writes',
+ ],
+ },
+ {
+ type: 'category',
+ label: 'Reading Tables',
+ items: [
+ 'sql_queries',
+ 'reading_tables_batch_reads',
+ 'reading_tables_streaming_reads',
],
},
{
@@ -76,6 +87,7 @@ module.exports = {
items: [
'snapshot_exporter',
'precommit_validator',
+ 'platform_services_post_commit_callback',
{
type: 'category',
label: 'Syncing to Catalogs',
@@ -89,6 +101,20 @@ module.exports = {
}
],
},
+ {
+ type: 'category',
+ label: 'Operations',
+ items: [
+ 'performance',
+ 'deployment',
+ 'cli',
+ 'metrics',
+ 'encryption',
+ 'troubleshooting',
+ 'tuning-guide',
+ 'flink_tuning',
+ ],
+ },
{
type: 'category',
label: 'Configurations',
@@ -113,20 +139,6 @@ module.exports = {
},
],
},
- {
- type: 'category',
- label: 'Operations',
- items: [
- 'performance',
- 'deployment',
- 'cli',
- 'metrics',
- 'encryption',
- 'troubleshooting',
- 'tuning-guide',
- 'flink_tuning',
- ],
- },
{
type: 'category',
label: 'Frequently Asked Questions(FAQs)',
@@ -135,7 +147,7 @@ module.exports = {
'faq_general',
'faq_design_and_concepts',
'faq_writing_tables',
- 'faq_querying_tables',
+ 'faq_reading_tables',
'faq_table_services',
'faq_storage',
'faq_integrations',
diff --git a/website/versioned_docs/version-0.14.1/faq.md
b/website/versioned_docs/version-0.14.1/faq.md
index e212008f37c..b3629b4377a 100644
--- a/website/versioned_docs/version-0.14.1/faq.md
+++ b/website/versioned_docs/version-0.14.1/faq.md
@@ -9,7 +9,7 @@ The FAQs are split into following pages. Please refer to the
specific pages for
- [General](/docs/next/faq_general)
- [Design & Concepts](/docs/next/faq_design_and_concepts)
- [Writing Tables](/docs/next/faq_writing_tables)
-- [Querying Tables](/docs/next/faq_querying_tables)
+- [Querying Tables](/docs/next/faq_reading_tables)
- [Table Services](/docs/next/faq_table_services)
- [Storage](/docs/next/faq_storage)
- [Integrations](/docs/next/faq_integrations)