This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/asf-site by this push:
     new 2223b2bce6f [HUDI-7502][DOCS] Re-organize the content in "how to" section (#10862)
2223b2bce6f is described below

commit 2223b2bce6fd2fb39a41b795fa5729c3406ef65d
Author: Shiyan Xu <[email protected]>
AuthorDate: Wed Mar 13 21:47:16 2024 -0500

    [HUDI-7502][DOCS] Re-organize the content in "how to" section (#10862)
---
 website/docs/concurrency_control.md                |   2 +-
 website/docs/deployment.md                         |   2 +-
 website/docs/faq.md                                |   2 +-
 website/docs/faq_general.md                        |   4 +-
 ...aq_querying_tables.md => faq_reading_tables.md} |   4 +-
 website/docs/faq_table_services.md                 |   2 +-
 website/docs/faq_writing_tables.md                 |   4 +-
 website/docs/flink-quick-start-guide.md            |   4 +-
 website/docs/hoodie_streaming_ingestion.md         | 369 +--------------------
 website/docs/hudi_stack.md                         |   2 +-
 website/docs/ingestion_flink.md                    | 179 ++++++++++
 website/docs/ingestion_kafka_connect.md            |   7 +
 website/docs/overview.md                           |   2 +-
 .../docs/platform_services_post_commit_callback.md |  57 ++++
 website/docs/precommit_validator.md                |   2 +-
 website/docs/quick-start-guide.md                  |   8 +-
 website/docs/reading_tables_batch_reads.md         |  20 ++
 website/docs/reading_tables_streaming_reads.md     |  99 ++++++
 website/docs/sql_ddl.md                            |   4 +-
 website/docs/sql_dml.md                            | 116 ++++++-
 website/docs/write_operations.md                   |   5 +-
 website/docs/writing_data.md                       | 201 +----------
 website/docs/writing_tables_streaming_writes.md    |  92 +++++
 website/releases/release-1.0.0-beta1.md            |   2 +-
 website/sidebars.js                                |  64 ++--
 website/versioned_docs/version-0.14.1/faq.md       |   2 +-
 26 files changed, 645 insertions(+), 610 deletions(-)

diff --git a/website/docs/concurrency_control.md 
b/website/docs/concurrency_control.md
index dd4e217829e..461f2d1dd3c 100644
--- a/website/docs/concurrency_control.md
+++ b/website/docs/concurrency_control.md
@@ -77,7 +77,7 @@ Multiple writers can operate on the table with non-blocking 
conflict resolution.
 file group with the conflicts resolved automatically by the query reader and 
the compactor. The new concurrency mode is
 currently available for preview in version 1.0.0-beta only with the caveat 
that conflict resolution is not supported yet
 between clustering and ingestion. It works for compaction and ingestion, and 
we can see an example of that with Flink
-writers 
[here](/docs/next/writing_data#non-blocking-concurrency-control-experimental).
+writers 
[here](/docs/next/sql_dml#non-blocking-concurrency-control-experimental).
 
 ## Enabling Multi Writing
 
diff --git a/website/docs/deployment.md b/website/docs/deployment.md
index f5ad89c7f81..b6d56b9937e 100644
--- a/website/docs/deployment.md
+++ b/website/docs/deployment.md
@@ -136,7 +136,7 @@ Here is an example invocation for reading from kafka topic 
in a continuous mode
 
 ### Spark Datasource Writer Jobs
 
-As described in [Writing Data](/docs/writing_data#spark-datasource-writer), 
you can use spark datasource to ingest to hudi table. This mechanism allows you 
to ingest any spark dataframe in Hudi format. Hudi Spark DataSource also 
supports spark streaming to ingest a streaming source to Hudi table. For Merge 
On Read table types, inline compaction is turned on by default which runs after 
every ingestion run. The compaction frequency can be changed by setting the 
property "hoodie.compact.inl [...]
+As described in [Batch Writes](/docs/next/writing_data#spark-datasource-api), 
you can use the Spark datasource to ingest into a Hudi table. This mechanism 
allows you to ingest any Spark dataframe in Hudi format. The Hudi Spark 
datasource also supports Spark streaming to ingest a streaming source into a 
Hudi table. For Merge On Read table types, inline compaction is turned on by 
default and runs after every ingestion run. The compaction frequency can be 
changed by setting the property "hoodie.compact.i [...]
 
 Here is an example invocation using spark datasource
 
diff --git a/website/docs/faq.md b/website/docs/faq.md
index e212008f37c..1378839b81c 100644
--- a/website/docs/faq.md
+++ b/website/docs/faq.md
@@ -9,7 +9,7 @@ The FAQs are split into following pages. Please refer to the 
specific pages for
 - [General](/docs/next/faq_general)
 - [Design & Concepts](/docs/next/faq_design_and_concepts)
 - [Writing Tables](/docs/next/faq_writing_tables)
-- [Querying Tables](/docs/next/faq_querying_tables)
+- [Reading Tables](/docs/next/faq_reading_tables)
 - [Table Services](/docs/next/faq_table_services)
 - [Storage](/docs/next/faq_storage)
 - [Integrations](/docs/next/faq_integrations)
diff --git a/website/docs/faq_general.md b/website/docs/faq_general.md
index 2621b84e3a2..2682d17e950 100644
--- a/website/docs/faq_general.md
+++ b/website/docs/faq_general.md
@@ -6,7 +6,7 @@ keywords: [hudi, writing, reading]
 
 ### When is Hudi useful for me or my organization?
 
-If you are looking to quickly ingest data onto HDFS or cloud storage, Hudi can 
provide you tools to [help](/docs/writing_data/). Also, if you have 
ETL/hive/spark jobs which are slow/taking up a lot of resources, Hudi can 
potentially help by providing an incremental approach to reading and writing 
data.
+If you are looking to quickly ingest data onto HDFS or cloud storage, Hudi 
provides you with [tools](/docs/hoodie_streaming_ingestion). Also, if you have 
ETL/hive/spark jobs which are slow/taking up a lot of resources, Hudi can 
potentially help by providing an incremental approach to reading and writing 
data.
 
 As an organization, Hudi can help you build an [efficient data 
lake](https://docs.google.com/presentation/d/1FHhsvh70ZP6xXlHdVsAI0g__B_6Mpto5KQFlZ0b8-mM/edit#slide=id.p),
 solving some of the most complex, low-level storage management problems, while 
putting data into hands of your data analysts, engineers and scientists much 
quicker.
 
@@ -61,7 +61,7 @@ Nonetheless, Hudi is designed very much like a database and 
provides similar fun
 
 ### How do I model the data stored in Hudi?
 
-When writing data into Hudi, you model the records like how you would on a 
key-value store - specify a key field (unique for a single partition/across 
table), a partition field (denotes partition to place key into) and 
preCombine/combine logic that specifies how to handle duplicates in a batch of 
records written. This model enables Hudi to enforce primary key constraints 
like you would get on a database table. See [here](/docs/writing_data/) for an 
example.
+When writing data into Hudi, you model the records similar to how you would 
in a key-value store - specify a key field (unique for a single 
partition/across table), a partition field (denotes the partition to place the 
key into) and preCombine/combine logic that specifies how to handle duplicates 
in a batch of records written. This model enables Hudi to enforce primary key 
constraints like you would get on a database table. See 
[here](/docs/next/writing_data) for an example.
 
 When querying/reading data, Hudi just presents itself as a json-like 
hierarchical table, everyone is used to querying using Hive/Spark/Presto over 
Parquet/Json/Avro.
 
diff --git a/website/docs/faq_querying_tables.md 
b/website/docs/faq_reading_tables.md
similarity index 98%
rename from website/docs/faq_querying_tables.md
rename to website/docs/faq_reading_tables.md
index 9b2f8484c09..207d90c487b 100644
--- a/website/docs/faq_querying_tables.md
+++ b/website/docs/faq_reading_tables.md
@@ -1,8 +1,8 @@
 ---
-title: Querying Tables
+title: Reading Tables
 keywords: [hudi, writing, reading]
 ---
-# Querying Tables
+# Reading Tables FAQ
 
 ### Do deleted records appear in Hudi's incremental query results?
 
diff --git a/website/docs/faq_table_services.md 
b/website/docs/faq_table_services.md
index 59dc4e71ddd..0ca730094e4 100644
--- a/website/docs/faq_table_services.md
+++ b/website/docs/faq_table_services.md
@@ -50,6 +50,6 @@ Hudi runs cleaner to remove old file versions as part of 
writing data either in
 
 Yes. Hudi provides the ability to post a callback notification about a write 
commit. You can use a http hook or choose to
 
-be notified via a Kafka/pulsar topic or plug in your own implementation to get 
notified. Please refer [here](writing_data/#commit-notifications)
+be notified via a Kafka/Pulsar topic or plug in your own implementation to 
get notified. Please refer [here](/docs/next/platform_services_post_commit_callback)
 
 for details
diff --git a/website/docs/faq_writing_tables.md 
b/website/docs/faq_writing_tables.md
index 5eb7b2cbbc4..bb1c1a01f74 100644
--- a/website/docs/faq_writing_tables.md
+++ b/website/docs/faq_writing_tables.md
@@ -6,7 +6,7 @@ keywords: [hudi, writing, reading]
 
 ### What are some ways to write a Hudi table?
 
-Typically, you obtain a set of partial updates/inserts from your source and 
issue [write operations](/docs/write_operations/) against a Hudi table. If you 
ingesting data from any of the standard sources like Kafka, or tailing DFS, the 
[delta streamer](/docs/hoodie_streaming_ingestion#deltastreamer) tool is 
invaluable and provides an easy, self-managed solution to getting data written 
into Hudi. You can also write your own code to capture data from a custom 
source using the Spark datasour [...]
+Typically, you obtain a set of partial updates/inserts from your source and 
issue [write operations](/docs/write_operations/) against a Hudi table. If you 
are ingesting data from any of the standard sources like Kafka, or tailing 
DFS, the [delta streamer](/docs/hoodie_streaming_ingestion#hudi-streamer) tool 
is invaluable and provides an easy, self-managed solution to getting data 
written into Hudi. You can also write your own code to capture data from a 
custom source using the Spark datasour [...]
 
 ### How is a Hudi writer job deployed?
 
@@ -68,7 +68,7 @@ As you could see, ([combineAndGetUpdateValue(), 
getInsertValue()](https://github
 
 ### How do I delete records in the dataset using Hudi?
 
-GDPR has made deletes a must-have tool in everyone's data management toolbox. 
Hudi supports both soft and hard deletes. For details on how to actually 
perform them, see [here](/docs/writing_data/#deletes).
+GDPR has made deletes a must-have tool in everyone's data management toolbox. 
Hudi supports both soft and hard deletes. For details on how to actually 
perform them, see [here](/docs/next/writing_data#deletes).
 
 ### Do I need to worry about deleting all copies of the records in case of 
duplicates?
 
diff --git a/website/docs/flink-quick-start-guide.md 
b/website/docs/flink-quick-start-guide.md
index 89e60737859..bdca36c6853 100644
--- a/website/docs/flink-quick-start-guide.md
+++ b/website/docs/flink-quick-start-guide.md
@@ -448,8 +448,8 @@ feature is that it now lets you author streaming pipelines 
on streaming or batch
 ## Where To Go From Here?
 - **Quick Start** : Read [Quick Start](#quick-start) to get started quickly 
with the Flink SQL client to write to (read from) Hudi.
 - **Configuration** : For [Global 
Configuration](/docs/next/flink_tuning#global-configurations), set up through 
`$FLINK_HOME/conf/flink-conf.yaml`. For per-job configuration, set up through 
[Table Option](/docs/next/flink_tuning#table-options).
-- **Writing Data** : Flink supports different modes for writing, such as [CDC 
Ingestion](/docs/hoodie_streaming_ingestion#cdc-ingestion), [Bulk 
Insert](/docs/hoodie_streaming_ingestion#bulk-insert), [Index 
Bootstrap](/docs/hoodie_streaming_ingestion#index-bootstrap), [Changelog 
Mode](/docs/hoodie_streaming_ingestion#changelog-mode) and [Append 
Mode](/docs/hoodie_streaming_ingestion#append-mode). Flink also supports 
multiple streaming writers with [non-blocking concurrency control](/docs/ [...]
-- **Querying Data** : Flink supports different modes for reading, such as 
[Streaming Query](/docs/querying_data#streaming-query) and [Incremental 
Query](/docs/querying_data#incremental-query).
+- **Writing Data** : Flink supports different modes for writing, such as [CDC 
Ingestion](/docs/next/ingestion_flink#cdc-ingestion), [Bulk 
Insert](/docs/next/ingestion_flink#bulk-insert), [Index 
Bootstrap](/docs/next/ingestion_flink#index-bootstrap), [Changelog 
Mode](/docs/next/ingestion_flink#changelog-mode) and [Append 
Mode](/docs/next/ingestion_flink#append-mode). Flink also supports multiple 
streaming writers with [non-blocking concurrency 
control](/docs/next/sql_dml#non-blocking-conc [...]
+- **Reading Data** : Flink supports different modes for reading, such as 
[Streaming Query](/docs/sql_queries#streaming-query) and [Incremental 
Query](/docs/sql_queries#incremental-query).
 - **Tuning** : For write/read tasks, this guide gives some tuning suggestions, 
such as [Memory Optimization](/docs/next/flink_tuning#memory-optimization) and 
[Write Rate Limit](/docs/next/flink_tuning#write-rate-limit).
 - **Optimization**: Offline compaction is supported, see [Offline 
Compaction](/docs/compaction#flink-offline-compaction).
 - **Query Engines**: Besides Flink, many other engines are integrated: [Hive 
Query](/docs/syncing_metastore#flink-setup), [Presto 
Query](/docs/querying_data#prestodb).
diff --git a/website/docs/hoodie_streaming_ingestion.md 
b/website/docs/hoodie_streaming_ingestion.md
index 6bc2f288e4b..8a8323c3bf0 100644
--- a/website/docs/hoodie_streaming_ingestion.md
+++ b/website/docs/hoodie_streaming_ingestion.md
@@ -1,5 +1,5 @@
 ---
-title: Streaming Ingestion
+title: Using Spark
 keywords: [hudi, streamer, hoodiestreamer, spark_streaming]
 ---
 import Tabs from '@theme/Tabs';
@@ -571,370 +571,3 @@ to how you run `HoodieStreamer`.
 ```
 
 For detailed information on how to configure and use 
`HoodieMultiTableStreamer`, please refer [blog 
section](/blog/2020/08/22/ingest-multiple-tables-using-hudi).
-
-## Structured Streaming
-
-Hudi supports Spark Structured Streaming reads and writes.
-
-### Streaming Write
-You can write Hudi tables using spark's structured streaming. 
-
-<Tabs
-groupId="programming-language"
-defaultValue="python"
-values={[
-{ label: 'Scala', value: 'scala', },
-{ label: 'Python', value: 'python', },
-]}
->
-
-<TabItem value="scala">
-
-```scala
-// spark-shell
-// prepare to stream write to new table
-import org.apache.spark.sql.streaming.Trigger
-
-val streamingTableName = "hudi_trips_cow_streaming"
-val baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
-val checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"
-
-// create streaming df
-val df = spark.readStream.
-        format("hudi").
-        load(basePath)
-
-// write stream to new hudi table
-df.writeStream.format("hudi").
-  options(getQuickstartWriteConfigs).
-  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
-  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
-  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
-  option(TABLE_NAME, streamingTableName).
-  outputMode("append").
-  option("path", baseStreamingPath).
-  option("checkpointLocation", checkpointLocation).
-  trigger(Trigger.Once()).
-  start()
-
-```
-
-</TabItem>
-<TabItem value="python">
-
-```python
-# pyspark
-# prepare to stream write to new table
-streamingTableName = "hudi_trips_cow_streaming"
-baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
-checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"
-
-hudi_streaming_options = {
-    'hoodie.table.name': streamingTableName,
-    'hoodie.datasource.write.recordkey.field': 'uuid',
-    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
-    'hoodie.datasource.write.table.name': streamingTableName,
-    'hoodie.datasource.write.operation': 'upsert',
-    'hoodie.datasource.write.precombine.field': 'ts',
-    'hoodie.upsert.shuffle.parallelism': 2,
-    'hoodie.insert.shuffle.parallelism': 2
-}
-
-# create streaming df
-df = spark.readStream \
-    .format("hudi") \
-    .load(basePath)
-
-# write stream to new hudi table
-df.writeStream.format("hudi") \
-    .options(**hudi_streaming_options) \
-    .outputMode("append") \
-    .option("path", baseStreamingPath) \
-    .option("checkpointLocation", checkpointLocation) \
-    .trigger(once=True) \
-    .start()
-
-```
-
-</TabItem>
-
-</Tabs
->
-
-### Streaming Read
-
-Structured Streaming reads are based on Hudi's Incremental Query feature, 
therefore streaming read can return data for which
-commits and base files were not yet removed by the cleaner. You can control 
commits retention time.
-
-<Tabs
-groupId="programming-language"
-defaultValue="python"
-values={[
-{ label: 'Scala', value: 'scala', },
-{ label: 'Python', value: 'python', },
-]}
->
-
-<TabItem value="scala">
-
-```scala
-// spark-shell
-// reload data
-df.write.format("hudi").
-  options(getQuickstartWriteConfigs).
-  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
-  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
-  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
-  option(TABLE_NAME, tableName).
-  mode(Overwrite).
-  save(basePath)
-
-// read stream and output results to console
-spark.readStream.
-  format("hudi").
-  load(basePath).
-  writeStream.
-  format("console").
-  start()
-
-// read stream to streaming df
-val df = spark.readStream.
-        format("hudi").
-        load(basePath)
-
-```
-</TabItem>
-
-<TabItem value="python">
-
-```python
-# pyspark
-# reload data
-inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(
-    dataGen.generateInserts(10))
-df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
-
-hudi_options = {
-    'hoodie.table.name': tableName,
-    'hoodie.datasource.write.recordkey.field': 'uuid',
-    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
-    'hoodie.datasource.write.table.name': tableName,
-    'hoodie.datasource.write.operation': 'upsert',
-    'hoodie.datasource.write.precombine.field': 'ts',
-    'hoodie.upsert.shuffle.parallelism': 2,
-    'hoodie.insert.shuffle.parallelism': 2
-}
-
-df.write.format("hudi"). \
-    options(**hudi_options). \
-    mode("overwrite"). \
-    save(basePath)
-
-# read stream to streaming df
-df = spark.readStream \
-    .format("hudi") \
-    .load(basePath)
-
-# ead stream and output results to console
-spark.readStream \
-    .format("hudi") \
-    .load(basePath) \
-    .writeStream \
-    .format("console") \
-    .start()
-
-```
-</TabItem>
-
-</Tabs
->
-
-
-:::info
-Spark SQL can be used within ForeachBatch sink to do INSERT, UPDATE, DELETE 
and MERGE INTO.
-Target table must exist before write.
-:::
-
-
-## Flink Ingestion
-
-### CDC Ingestion
-CDC(change data capture) keep track of the data changes evolving in a source 
system so a downstream process or system can action that change.
-We recommend two ways for syncing CDC data into Hudi:
-
-![slide1 title](/assets/images/cdc-2-hudi.png)
-
-1. Using the Ververica 
[flink-cdc-connectors](https://github.com/ververica/flink-cdc-connectors) 
directly connect to DB Server to sync the binlog data into Hudi.
-   The advantage is that it does not rely on message queues, but the 
disadvantage is that it puts pressure on the db server;
-2. Consume data from a message queue (for e.g, the Kafka) using the flink cdc 
format, the advantage is that it is highly scalable,
-   but the disadvantage is that it relies on message queues.
-
-:::note
-- If the upstream data cannot guarantee the order, you need to specify option 
`write.precombine.field` explicitly;
-:::
-
-### Bulk Insert
-
-For the demand of snapshot data import. If the snapshot data comes from other 
data sources, use the `bulk_insert` mode to quickly
-import the snapshot data into Hudi.
-
-
-:::note
-`bulk_insert` eliminates the serialization and data merging. The data 
deduplication is skipped, so the user need to guarantee the uniqueness of the 
data.
-:::
-
-:::note
-`bulk_insert` is more efficient in the `batch execution mode`. By default, the 
`batch execution mode` sorts the input records
-by the partition path and writes these records to Hudi, which can avoid write 
performance degradation caused by
-frequent `file handle` switching.  
-:::
-
-:::note  
-The parallelism of `bulk_insert` is specified by `write.tasks`. The 
parallelism will affect the number of small files.
-In theory, the parallelism of `bulk_insert` is the number of `bucket`s (In 
particular, when each bucket writes to maximum file size, it
-will rollover to the new file handle. Finally, `the number of files` >= 
[`write.bucket_assign.tasks`](/docs/configurations#writebucket_assigntasks).
-:::
-
-#### Options
-
-|  Option Name  | Required | Default | Remarks |
-|  -----------  | -------  | ------- | ------- |
-| `write.operation` | `true` | `upsert` | Setting as `bulk_insert` to open 
this function  |
-| `write.tasks`  |  `false`  | `4` | The parallelism of `bulk_insert`, `the 
number of files` >= 
[`write.bucket_assign.tasks`](/docs/configurations#writebucket_assigntasks) |
-| `write.bulk_insert.shuffle_input` | `false` | `true` | Whether to shuffle 
data according to the input field before writing. Enabling this option will 
reduce the number of small files, but there may be a risk of data skew |
-| `write.bulk_insert.sort_input` | `false`  | `true` | Whether to sort data 
according to the input field before writing. Enabling this option will reduce 
the number of small files when a write task writes multiple partitions |
-| `write.sort.memory` | `false` | `128` | Available managed memory of sort 
operator. default  `128` MB |
-
-### Index Bootstrap
-
-For the demand of `snapshot data` + `incremental data` import. If the 
`snapshot data` already insert into Hudi by  [bulk insert](#bulk-insert).
-User can insert `incremental data` in real time and ensure the data is not 
repeated by using the index bootstrap function.
-
-:::note
-If you think this process is very time-consuming, you can add resources to 
write in streaming mode while writing `snapshot data`,
-and then reduce the resources to write `incremental data` (or open the rate 
limit function).
-:::
-
-#### Options
-
-|  Option Name  | Required | Default | Remarks |
-|  -----------  | -------  | ------- | ------- |
-| `index.bootstrap.enabled` | `true` | `false` | When index bootstrap is 
enabled, the remain records in Hudi table will be loaded into the Flink state 
at one time |
-| `index.partition.regex`  |  `false`  | `*` | Optimize option. Setting 
regular expressions to filter partitions. By default, all partitions are loaded 
into flink state |
-
-#### How To Use
-
-1. `CREATE TABLE` creates a statement corresponding to the Hudi table. Note 
that the `table.type` must be correct.
-2. Setting `index.bootstrap.enabled` = `true` to enable the index bootstrap 
function.
-3. Setting Flink checkpoint failure tolerance in `flink-conf.yaml` : 
`execution.checkpointing.tolerable-failed-checkpoints = n` (depending on Flink 
checkpoint scheduling times).
-4. Waiting until the first checkpoint succeeds, indicating that the index 
bootstrap completed.
-5. After the index bootstrap completed, user can exit and save the savepoint 
(or directly use the externalized checkpoint).
-6. Restart the job, setting `index.bootstrap.enable` as `false`.
-
-:::note
-1. Index bootstrap is blocking, so checkpoint cannot be completed during index 
bootstrap.
-2. Index bootstrap triggers by the input data. User need to ensure that there 
is at least one record in each partition.
-3. Index bootstrap executes concurrently. User can search in log by `finish 
loading the index under partition` and `Load record form file` to observe the 
progress of index bootstrap.
-4. The first successful checkpoint indicates that the index bootstrap 
completed. There is no need to load the index again when recovering from the 
checkpoint.
-:::
-
-### Changelog Mode
-Hudi can keep all the intermediate changes (I / -U / U / D) of messages, then 
consumes through stateful computing of flink to have a near-real-time
-data warehouse ETL pipeline (Incremental computing). Hudi MOR table stores 
messages in the forms of rows, which supports the retention of all change logs 
(Integration at the format level).
-All changelog records can be consumed with Flink streaming reader.
-
-#### Options
-
-|  Option Name  | Required | Default | Remarks |
-|  -----------  | -------  | ------- | ------- |
-| `changelog.enabled` | `false` | `false` | It is turned off by default, to 
have the `upsert` semantics, only the merged messages are ensured to be kept, 
intermediate changes may be merged. Setting to true to support consumption of 
all changes |
-
-:::note
-Batch (Snapshot) read still merge all the intermediate changes, regardless of 
whether the format has stored the intermediate changelog messages.
-:::
-
-:::note
-After setting `changelog.enable` as `true`, the retention of changelog records 
are only best effort: the asynchronous compaction task will merge the changelog 
records into one record, so if the
-stream source does not consume timely, only the merged record for each key can 
be read after compaction. The solution is to reserve some buffer time for the 
reader by adjusting the compaction strategy, such as
-the compaction options: [`compaction.delta_commits`](#compaction) and 
[`compaction.delta_seconds`](#compaction).
-:::
-
-
-### Append Mode
-
-For `INSERT` mode write operation, the current work flow is:
-
-- For Merge_On_Read table, the small file strategies are by default applied: 
tries to append to the small avro log files first
-- For Copy_On_Write table, write new parquet files directly, no small file 
strategies are applied
-
-Hudi supports rich clustering strategies to optimize the files layout for 
`INSERT` mode:
-
-#### Inline Clustering
-
-:::note
-Only Copy_On_Write table is supported. 
-:::
-
-|  Option Name  | Required | Default | Remarks |
-|  -----------  | -------  | ------- | ------- |
-| `write.insert.cluster` | `false` | `false` | Whether to merge small files 
while ingesting, for COW table, open the option to enable the small file 
merging strategy(no deduplication for keys but the throughput will be affected) 
|
-
-#### Async Clustering
-
-|  Option Name  | Required | Default | Remarks |
-|  -----------  | -------  | ------- | ------- |
-| `clustering.schedule.enabled` | `false` | `false` | Whether to schedule 
clustering plan during write process, by default false |
-| `clustering.delta_commits` | `false` | `4` | Delta commits to schedule the 
clustering plan, only valid when `clustering.schedule.enabled` is true |
-| `clustering.async.enabled` | `false` | `false` | Whether to execute 
clustering plan asynchronously, by default false |
-| `clustering.tasks` | `false` | `4` | Parallelism of the clustering tasks |
-| `clustering.plan.strategy.target.file.max.bytes` | `false` | 
`1024*1024*1024` | The target file size for clustering group, by default 1GB |
-| `clustering.plan.strategy.small.file.limit` | `false` | `600` | The file 
that has less size than the threshold (unit MB) are candidates for clustering |
-| `clustering.plan.strategy.sort.columns` | `false` | `N/A` | The columns to 
sort by when clustering |
-
-#### Clustering Plan Strategy
-
-Custom clustering strategy is supported.
-
-|  Option Name  | Required | Default | Remarks |
-|  -----------  | -------  | ------- | ------- |
-| `clustering.plan.partition.filter.mode` | `false` | `NONE` | Valid options 
1) `NONE`: no limit; 2) `RECENT_DAYS`: choose partitions that represent recent 
days; 3) `SELECTED_PARTITIONS`: specific partitions |
-| `clustering.plan.strategy.daybased.lookback.partitions` | `false` | `2` | 
Valid for `RECENT_DAYS` mode |
-| `clustering.plan.strategy.cluster.begin.partition` | `false` | `N/A` | Valid 
for `SELECTED_PARTITIONS` mode, specify the partition to begin with(inclusive) |
-| `clustering.plan.strategy.cluster.end.partition` | `false` | `N/A` | Valid 
for `SELECTED_PARTITIONS` mode, specify the partition to end with(inclusive) |
-| `clustering.plan.strategy.partition.regex.pattern` | `false` | `N/A` | The 
regex to filter the partitions |
-| `clustering.plan.strategy.partition.selected` | `false` | `N/A` | Specific 
partitions separated by comma `,` |
-
-### Bucket Index
-
-By default, flink uses the state-backend to keep the file index: the mapping 
from primary key to fileId. When the input data set is large,
-there is possibility the cost of the state be a bottleneck, the bucket index 
use deterministic hash algorithm for shuffling the records into
-buckets, thus can avoid the storage and query overhead of indexes.
-
-#### Options
-
-|  Option Name  | Required | Default | Remarks |
-|  -----------  | -------  | ------- | ------- |
-| `index.type` | `false` | `FLINK_STATE` | Set up as `BUCKET` to use bucket 
index |
-| `hoodie.bucket.index.hash.field` | `false` | Primary key | Can be a subset 
of the primary key |
-| `hoodie.bucket.index.num.buckets` | `false` | `4` | The number of buckets 
per-partition, it is immutable once set up |
-
-Comparing to state index:
-
-- Bucket index has no computing and storage cost of state-backend index, thus 
has better performance
-- Bucket index can not expand the buckets dynamically, the state-backend index 
can expand the buckets dynamically based on current file layout
-- Bucket index can not handle changes among partitions(no limit if the input 
itself is CDC stream), state-backend index has no limit 
-
-### Rate Limit
-There are many use cases that user put the full history data set onto the 
message queue together with the realtime incremental data. Then they consume 
the data from the queue into the hudi from the earliest offset using flink. 
Consuming history data set has these characteristics:
-1). The instant throughput is huge 2). It has serious disorder (with random 
writing partitions). It will lead to degradation of writing performance and 
throughput glitches. For this case, the speed limit parameter can be turned on 
to ensure smooth writing of the flow.
-
-#### Options
-|  Option Name  | Required | Default | Remarks |
-|  -----------  | -------  | ------- | ------- |
-| `write.rate.limit` | `false` | `0` | Default disable the rate limit |
-
-## Kafka Connect Sink
-If you want to perform streaming ingestion into Hudi format similar to 
`HoodieStreamer`, but you don't want to depend on Spark,
-try out the new experimental release of Hudi Kafka Connect Sink. Read the 
[ReadMe](https://github.com/apache/hudi/tree/master/hudi-kafka-connect) 
-for full documentation.
-
diff --git a/website/docs/hudi_stack.md b/website/docs/hudi_stack.md
index 36a1b7a4a4c..a3a7896c92e 100644
--- a/website/docs/hudi_stack.md
+++ b/website/docs/hudi_stack.md
@@ -93,7 +93,7 @@ Hudi provides snapshot isolation for writers and readers, 
enabling consistent ta
 ![Platform Services](/assets/images/blog/hudistack/platform_2.png)
 <p align = "center">Figure: Various platform services in Hudi</p>
 
-Platform services offer functionality that is specific to data and workloads, 
and they sit directly on top of the table services, interfacing with writers 
and readers. Services, like [Hudi 
Streamer](./hoodie_streaming_ingestion#hudi-streamer), are specialized in 
handling data and workloads, seamlessly integrating with Kafka streams and 
various formats to build data lakes. They support functionalities like 
automatic checkpoint management, integration with major schema registries 
(includin [...]
+Platform services offer functionality that is specific to data and workloads, 
and they sit directly on top of the table services, interfacing with writers 
and readers. Services, like [Hudi 
Streamer](./hoodie_streaming_ingestion#hudi-streamer), are specialized in 
handling data and workloads, seamlessly integrating with Kafka streams and 
various formats to build data lakes. They support functionalities like 
automatic checkpoint management, integration with major schema registries 
(includin [...]
 
 ### Query Engines
 Apache Hudi is compatible with a wide array of query engines, catering to 
various analytical needs. For distributed ETL batch processing, Apache Spark is 
frequently utilized, leveraging its efficient handling of large-scale data. In 
the realm of streaming use cases, compute engines such as Apache Flink and 
Apache Spark's Structured Streaming provide robust support when paired with 
Hudi. Moreover, Hudi supports modern data lake query engines such as Trino and 
Presto, as well as modern ana [...]
\ No newline at end of file
diff --git a/website/docs/ingestion_flink.md b/website/docs/ingestion_flink.md
new file mode 100644
index 00000000000..e9410021374
--- /dev/null
+++ b/website/docs/ingestion_flink.md
@@ -0,0 +1,179 @@
+---
+title: Using Flink
+keywords: [hudi, flink, streamer, ingestion]
+---
+
+### CDC Ingestion
+CDC (change data capture) keeps track of data changes in a source system so that a downstream process or system can act on those changes.
+We recommend two ways of syncing CDC data into Hudi:
+
+![slide1 title](/assets/images/cdc-2-hudi.png)
+
+1. Use the Ververica [flink-cdc-connectors](https://github.com/ververica/flink-cdc-connectors) to connect directly to the DB server and sync the binlog data into Hudi.
+   The advantage is that it does not rely on a message queue, but the disadvantage is that it puts pressure on the DB server;
+2. Consume data from a message queue (e.g., Kafka) using the Flink CDC format. The advantage is that it is highly scalable,
+   but the disadvantage is that it relies on a message queue.
+
+:::note
+- If the upstream data cannot guarantee ordering, you need to explicitly specify the option `write.precombine.field`.
+:::
+
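+As a hypothetical sketch of option 1 (the MySQL host, credentials, database, table, and paths below are placeholders), the CDC source and the Hudi sink can be wired together entirely in Flink SQL:
+
+```sql
+-- CDC source, using the flink-cdc-connectors 'mysql-cdc' connector
+CREATE TABLE users_cdc (
+    id BIGINT,
+    name STRING,
+    ts TIMESTAMP(3),
+    PRIMARY KEY (id) NOT ENFORCED
+) WITH (
+    'connector' = 'mysql-cdc',
+    'hostname' = 'localhost',
+    'port' = '3306',
+    'username' = 'flink',
+    'password' = 'flink_pw',
+    'database-name' = 'app_db',
+    'table-name' = 'users'
+);
+
+-- Hudi sink; the precombine field handles out-of-order upstream data
+CREATE TABLE users_hudi (
+    id BIGINT,
+    name STRING,
+    ts TIMESTAMP(3),
+    PRIMARY KEY (id) NOT ENFORCED
+) WITH (
+    'connector' = 'hudi',
+    'path' = 'file:///tmp/users_hudi',
+    'table.type' = 'MERGE_ON_READ',
+    'write.precombine.field' = 'ts'
+);
+
+INSERT INTO users_hudi SELECT * FROM users_cdc;
+```
+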
+### Bulk Insert
+
+To serve the snapshot data import case: if the snapshot data comes from another data source, use the `bulk_insert` mode to quickly
+import it into Hudi.
+
+:::note
+`bulk_insert` eliminates serialization and data merging. Data deduplication is skipped, so the user needs to guarantee the uniqueness of the data.
+:::
+
+:::note
+`bulk_insert` is more efficient in the `batch execution mode`. By default, the 
`batch execution mode` sorts the input records
+by the partition path and writes these records to Hudi, which can avoid write 
performance degradation caused by
+frequent `file handle` switching.  
+:::
+
+:::note  
+The parallelism of `bulk_insert` is specified by `write.tasks`. The 
parallelism will affect the number of small files.
+In theory, the parallelism of `bulk_insert` is the number of `bucket`s. In particular, when a bucket reaches the maximum file size, it
+rolls over to a new file handle. Finally, `the number of files` >= [`write.bucket_assign.tasks`](/docs/configurations#writebucket_assigntasks).
+:::
+
+#### Options
+
+|  Option Name  | Required | Default | Remarks |
+|  -----------  | -------  | ------- | ------- |
+| `write.operation` | `true` | `upsert` | Set to `bulk_insert` to enable this function |
+| `write.tasks`  |  `false`  | `4` | The parallelism of `bulk_insert`, `the 
number of files` >= 
[`write.bucket_assign.tasks`](/docs/configurations#writebucket_assigntasks) |
+| `write.bulk_insert.shuffle_input` | `false` | `true` | Whether to shuffle 
data according to the input field before writing. Enabling this option will 
reduce the number of small files, but there may be a risk of data skew |
+| `write.bulk_insert.sort_input` | `false`  | `true` | Whether to sort data 
according to the input field before writing. Enabling this option will reduce 
the number of small files when a write task writes multiple partitions |
+| `write.sort.memory` | `false` | `128` | Available managed memory of sort 
operator. default  `128` MB |
+
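+A minimal bulk-load sketch (the table name, schema, path, and source table are placeholders): run the job in batch execution mode and set `write.operation` to `bulk_insert`:
+
+```sql
+-- batch execution mode is more efficient for bulk_insert
+SET 'execution.runtime-mode' = 'batch';
+
+CREATE TABLE hudi_snapshot (
+    uuid VARCHAR(20),
+    name VARCHAR(10),
+    ts TIMESTAMP(3)
+) WITH (
+    'connector' = 'hudi',
+    'path' = 'file:///tmp/hudi_snapshot',
+    'write.operation' = 'bulk_insert',
+    'write.tasks' = '4'
+);
+
+INSERT INTO hudi_snapshot SELECT * FROM source_table;
+```
+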
+### Index Bootstrap
+
+Index bootstrap serves the `snapshot data` + `incremental data` import case: if the `snapshot data` has already been inserted into Hudi via [bulk insert](#bulk-insert),
+the user can then ingest `incremental data` in real time and use the index bootstrap function to ensure the data is not duplicated.
+
+:::note
+If this process is very time-consuming, you can add resources while writing the `snapshot data` so it is written in streaming mode,
+and then reduce the resources when writing the `incremental data` (or enable the rate limit function).
+:::
+
+#### Options
+
+|  Option Name  | Required | Default | Remarks |
+|  -----------  | -------  | ------- | ------- |
+| `index.bootstrap.enabled` | `true` | `false` | When index bootstrap is enabled, the existing records in the Hudi table are loaded into the Flink state at one time |
+| `index.partition.regex`  |  `false`  | `*` | Optimization option. Set a regular expression to filter partitions. By default, all partitions are loaded into the Flink state |
+
+#### How To Use
+
+1. Create a `CREATE TABLE` statement corresponding to the Hudi table. Note that the `table.type` must be correct.
+2. Set `index.bootstrap.enabled` = `true` to enable the index bootstrap function.
+3. Set the Flink checkpoint failure tolerance in `flink-conf.yaml`: `execution.checkpointing.tolerable-failed-checkpoints = n` (depending on Flink checkpoint scheduling times).
+4. Wait until the first checkpoint succeeds, indicating that the index bootstrap has completed.
+5. After the index bootstrap has completed, the user can stop the job with a savepoint (or directly use the externalized checkpoint).
+6. Restart the job, setting `index.bootstrap.enabled` to `false`.
+
+:::note
+1. Index bootstrap is blocking, so a checkpoint cannot be completed during index bootstrap.
+2. Index bootstrap is triggered by the input data. The user needs to ensure that there is at least one record in each partition.
+3. Index bootstrap executes concurrently. Users can search the logs for `finish loading the index under partition` and `Load record form file` to observe the progress of index bootstrap.
+4. The first successful checkpoint indicates that the index bootstrap has completed. There is no need to load the index again when recovering from that checkpoint.
+:::
+
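+A hypothetical DDL sketch for the bootstrap run (the path and schema are placeholders); the `table.type` must match the existing table:
+
+```sql
+CREATE TABLE hudi_table (
+    uuid VARCHAR(20),
+    name VARCHAR(10),
+    ts TIMESTAMP(3)
+) WITH (
+    'connector' = 'hudi',
+    'path' = 'file:///tmp/hudi_table',
+    'table.type' = 'MERGE_ON_READ',      -- must match the existing table
+    'index.bootstrap.enabled' = 'true'   -- set back to 'false' after the restart
+);
+```
+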
+### Changelog Mode
+Hudi can keep all the intermediate changes (I / -U / U / D) of messages, which can then be consumed by Flink's stateful computing to build a near-real-time
+data warehouse ETL pipeline (incremental computing). Hudi MOR tables store messages in row format, which supports retaining all change logs (integration at the format level).
+All changelog records can be consumed with the Flink streaming reader.
+
+#### Options
+
+|  Option Name  | Required | Default | Remarks |
+|  -----------  | -------  | ------- | ------- |
+| `changelog.enabled` | `false` | `false` | Turned off by default to provide `upsert` semantics: only the merged messages are guaranteed to be kept, and intermediate changes may be merged away. Set to `true` to support consumption of all changes |
+
+:::note
+Batch (snapshot) reads still merge all the intermediate changes, regardless of whether the format has stored the intermediate changelog messages.
+:::
+
+:::note
+After setting `changelog.enabled` to `true`, the retention of changelog records is only best-effort: the asynchronous compaction task will merge the changelog records into one record, so if the
+stream source does not consume in a timely manner, only the merged record for each key can be read after compaction. The solution is to reserve some buffer time for the reader by adjusting the compaction strategy, such as
+the compaction options: [`compaction.delta_commits`](#compaction) and [`compaction.delta_seconds`](#compaction).
+:::
+
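+A hypothetical sketch (the table name, schema, and path are placeholders) of a changelog-enabled table whose streaming reads see the intermediate changes:
+
+```sql
+CREATE TABLE hudi_changelog (
+    uuid VARCHAR(20),
+    name VARCHAR(10),
+    ts TIMESTAMP(3)
+) WITH (
+    'connector' = 'hudi',
+    'path' = 'file:///tmp/hudi_changelog',
+    'table.type' = 'MERGE_ON_READ',
+    'changelog.enabled' = 'true',
+    -- streaming reads of this table see the I / -U / +U / D records
+    'read.streaming.enabled' = 'true'
+);
+```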
+
+### Append Mode
+
+For the `INSERT` mode write operation, the current workflow is:
+
+- For a Merge_On_Read table, the small-file strategies are applied by default: the writer tries to append to small avro log files first
+- For a Copy_On_Write table, new parquet files are written directly; no small-file strategies are applied
+
+Hudi supports rich clustering strategies to optimize the file layout for `INSERT` mode:
+
+#### Inline Clustering
+
+:::note
+Only Copy_On_Write table is supported.
+:::
+
+|  Option Name  | Required | Default | Remarks |
+|  -----------  | -------  | ------- | ------- |
+| `write.insert.cluster` | `false` | `false` | Whether to merge small files while ingesting. For a COW table, enable this option to turn on the small-file merging strategy (no deduplication for keys, but throughput will be affected) |
+
+#### Async Clustering
+
+|  Option Name  | Required | Default | Remarks |
+|  -----------  | -------  | ------- | ------- |
+| `clustering.schedule.enabled` | `false` | `false` | Whether to schedule 
clustering plan during write process, by default false |
+| `clustering.delta_commits` | `false` | `4` | Delta commits to schedule the 
clustering plan, only valid when `clustering.schedule.enabled` is true |
+| `clustering.async.enabled` | `false` | `false` | Whether to execute 
clustering plan asynchronously, by default false |
+| `clustering.tasks` | `false` | `4` | Parallelism of the clustering tasks |
+| `clustering.plan.strategy.target.file.max.bytes` | `false` | 
`1024*1024*1024` | The target file size for clustering group, by default 1GB |
+| `clustering.plan.strategy.small.file.limit` | `false` | `600` | Files smaller than this threshold (in MB) are candidates for clustering |
+| `clustering.plan.strategy.sort.columns` | `false` | `N/A` | The columns to 
sort by when clustering |
+
+#### Clustering Plan Strategy
+
+Custom clustering strategy is supported.
+
+|  Option Name  | Required | Default | Remarks |
+|  -----------  | -------  | ------- | ------- |
+| `clustering.plan.partition.filter.mode` | `false` | `NONE` | Valid options 
1) `NONE`: no limit; 2) `RECENT_DAYS`: choose partitions that represent recent 
days; 3) `SELECTED_PARTITIONS`: specific partitions |
+| `clustering.plan.strategy.daybased.lookback.partitions` | `false` | `2` | 
Valid for `RECENT_DAYS` mode |
+| `clustering.plan.strategy.cluster.begin.partition` | `false` | `N/A` | Valid for `SELECTED_PARTITIONS` mode, specify the partition to begin with (inclusive) |
+| `clustering.plan.strategy.cluster.end.partition` | `false` | `N/A` | Valid for `SELECTED_PARTITIONS` mode, specify the partition to end with (inclusive) |
+| `clustering.plan.strategy.partition.regex.pattern` | `false` | `N/A` | The 
regex to filter the partitions |
+| `clustering.plan.strategy.partition.selected` | `false` | `N/A` | Specific 
partitions separated by comma `,` |
+
+### Bucket Index
+
+By default, Flink uses the state backend to keep the file index: the mapping from primary key to fileId. When the input data set is large,
+the cost of the state can become a bottleneck. The bucket index uses a deterministic hash algorithm to shuffle records into
+buckets, thus avoiding the storage and query overhead of indexes.
+
+#### Options
+
+|  Option Name  | Required | Default | Remarks |
+|  -----------  | -------  | ------- | ------- |
+| `index.type` | `false` | `FLINK_STATE` | Set up as `BUCKET` to use bucket 
index |
+| `hoodie.bucket.index.hash.field` | `false` | Primary key | Can be a subset 
of the primary key |
+| `hoodie.bucket.index.num.buckets` | `false` | `4` | The number of buckets 
per-partition, it is immutable once set up |
+
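+A hypothetical DDL sketch (the path and schema are placeholders) that enables the bucket index:
+
+```sql
+CREATE TABLE hudi_bucketed (
+    uuid VARCHAR(20),
+    name VARCHAR(10),
+    ts TIMESTAMP(3),
+    PRIMARY KEY (uuid) NOT ENFORCED
+) WITH (
+    'connector' = 'hudi',
+    'path' = 'file:///tmp/hudi_bucketed',
+    'index.type' = 'BUCKET',
+    -- immutable once set; choose based on expected per-partition data volume
+    'hoodie.bucket.index.num.buckets' = '8'
+);
+```
+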
+Compared to the state index:
+
+- The bucket index has no computing and storage cost of a state-backend index, and thus has better performance
+- The bucket index cannot expand the buckets dynamically, while the state-backend index can expand the buckets dynamically based on the current file layout
+- The bucket index cannot handle changes among partitions (no limit if the input itself is a CDC stream), while the state-backend index has no such limit
+
+### Rate Limit
+There are many use cases where users put the full history data set onto a message queue together with the real-time incremental data, and then use Flink to consume the data from the queue into Hudi starting from the earliest offset.
+Consuming the history data set has these characteristics: 1) the instant throughput is huge; 2) it is seriously out of order (with randomly written partitions). This leads to degraded write performance and
+throughput glitches. For this case, the rate limit parameter can be turned on to ensure smooth writing of the flow.
+
+#### Options
+|  Option Name  | Required | Default | Remarks |
+|  -----------  | -------  | ------- | ------- |
| `write.rate.limit` | `false` | `0` | Write record rate limit per second; the default `0` disables the rate limit |
diff --git a/website/docs/ingestion_kafka_connect.md 
b/website/docs/ingestion_kafka_connect.md
new file mode 100644
index 00000000000..8f020666ae9
--- /dev/null
+++ b/website/docs/ingestion_kafka_connect.md
@@ -0,0 +1,7 @@
+---
+title: Using Kafka Connect
+keywords: [hudi, kafka, connector, ingestion]
+---
+
+Try out the new experimental release of the Hudi Kafka Connect Sink. Read
+the [README](https://github.com/apache/hudi/tree/master/hudi-kafka-connect) for full documentation.
diff --git a/website/docs/overview.md b/website/docs/overview.md
index 88195f565b4..eb92cc34970 100644
--- a/website/docs/overview.md
+++ b/website/docs/overview.md
@@ -13,7 +13,7 @@ how to learn more to get started.
 Apache Hudi (pronounced “hoodie”) is the next generation [streaming data lake 
platform](/blog/2021/07/21/streaming-data-lake-platform). 
 Apache Hudi brings core warehouse and database functionality directly to a 
data lake. Hudi provides [tables](/docs/next/sql_ddl), 
 [transactions](/docs/next/timeline), [efficient 
upserts/deletes](/docs/next/write_operations), [advanced 
indexes](/docs/next/indexing), 
-[streaming ingestion services](/docs/next/hoodie_streaming_ingestion), data 
[clustering](/docs/next/clustering)/[compaction](/docs/next/compaction) 
optimizations, 
+[ingestion services](/docs/hoodie_streaming_ingestion), data 
[clustering](/docs/next/clustering)/[compaction](/docs/next/compaction) 
optimizations, 
 and [concurrency](/docs/next/concurrency_control) all while keeping your data 
in open source file formats.
 
 Not only is Apache Hudi great for streaming workloads, but it also allows you 
to create efficient incremental batch pipelines. 
diff --git a/website/docs/platform_services_post_commit_callback.md 
b/website/docs/platform_services_post_commit_callback.md
new file mode 100644
index 00000000000..0df43a2a017
--- /dev/null
+++ b/website/docs/platform_services_post_commit_callback.md
@@ -0,0 +1,57 @@
+---
+title: Post-commit Callback
+keywords: [hudi, platform, commit, callback]
+---
+
+Apache Hudi provides the ability to post a callback notification about a write 
commit. This may be valuable if you need
+an event notification stream to take actions with other services after a Hudi 
write commit.
+You can push a write commit callback notification into HTTP endpoints or to a 
Kafka server.
+
+## HTTP Endpoints
+You can push a commit notification to an HTTP URL and can specify custom 
values by extending a callback class defined below.
+
+|  Config  | Description | Required | Default |
+|  -----------  | -------  | ------- | ------ |
+| TURN_CALLBACK_ON | Turn commit callback on/off | optional | false 
(*callbacks off*) |
+| CALLBACK_HTTP_URL | Callback host to be sent along with callback messages | 
required | N/A |
+| CALLBACK_HTTP_TIMEOUT_IN_SECONDS | Callback timeout in seconds | optional | 
3 |
+| CALLBACK_CLASS_NAME | Full path of callback class and must be a subclass of 
HoodieWriteCommitCallback class, 
org.apache.hudi.callback.impl.HoodieWriteCommitHttpCallback by default | 
optional | org.apache.hudi.callback.impl.HoodieWriteCommitHttpCallback |
+| CALLBACK_HTTP_API_KEY_VALUE | Http callback API key | optional | 
hudi_write_commit_http_callback |
+
+## Kafka Endpoints
+You can push a commit notification to a Kafka topic so it can be used by other 
real time systems.
+
+|  Config  | Description | Required | Default |
+|  -----------  | -------  | ------- | ------ |
+| TOPIC | Kafka topic name to publish timeline activity into. | required | N/A 
|
+| PARTITION | It may be desirable to serialize all changes into a single Kafka 
partition for providing strict ordering. By default, Kafka messages are keyed 
by table name, which guarantees ordering at the table level, but not globally 
(or when new partitions are added) | required | N/A |
+| RETRIES | Times to retry the produce | optional | 3 |
+| ACKS | kafka acks level, all by default to ensure strong durability | 
optional | all |
+| BOOTSTRAP_SERVERS | Bootstrap servers of kafka cluster, to be used for 
publishing commit metadata | required | N/A |
+
+## Pulsar Endpoints
+You can push a commit notification to a Pulsar topic so it can be used by 
other real time systems.
+
+|  Config  | Description                                                       
          | Required | Default |
+|  -----------  
|-----------------------------------------------------------------------------| 
------- |--------|
+| hoodie.write.commit.callback.pulsar.broker.service.url | Server's Url of 
pulsar cluster to use to publish commit metadata.   | required | N/A    |
+| hoodie.write.commit.callback.pulsar.topic | Pulsar topic name to publish 
timeline activity into | required | N/A    |
+| hoodie.write.commit.callback.pulsar.producer.route-mode | Message routing 
logic for producers on partitioned topics.  | optional |   RoundRobinPartition  
   |
+| hoodie.write.commit.callback.pulsar.producer.pending-queue-size | The 
maximum size of a queue holding pending messages.               | optional | 
1000   |
+| hoodie.write.commit.callback.pulsar.producer.pending-total-size | The 
maximum number of pending messages across partitions. | required | 50000  |
+| hoodie.write.commit.callback.pulsar.producer.block-if-queue-full | When the queue is full, the send call blocks instead of throwing an exception. | optional | true   |
+| hoodie.write.commit.callback.pulsar.producer.send-timeout | The timeout in 
each sending to pulsar.     | optional | 30s    |
+| hoodie.write.commit.callback.pulsar.operation-timeout | Duration of waiting 
for completing an operation.     | optional | 30s    |
+| hoodie.write.commit.callback.pulsar.connection-timeout | Duration of waiting 
for a connection to a broker to be established.     | optional | 10s    |
+| hoodie.write.commit.callback.pulsar.request-timeout | Duration of waiting 
for completing a request.  | optional | 60s    |
+| hoodie.write.commit.callback.pulsar.keepalive-interval | Duration of keeping 
alive interval for each client broker connection. | optional | 30s    |
+
+## Bring your own implementation
+You can extend the HoodieWriteCommitCallback class to implement your own way 
to asynchronously handle the callback
+of a successful write. Use this public API:
+
+https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/HoodieWriteCommitCallback.java
+
diff --git a/website/docs/precommit_validator.md 
b/website/docs/precommit_validator.md
index 741b9deb683..6f6806a3fa9 100644
--- a/website/docs/precommit_validator.md
+++ b/website/docs/precommit_validator.md
@@ -91,7 +91,7 @@ void validateRecordsBeforeAndAfter(Dataset<Row> before,
 ```
 
 ## Additional Monitoring with Notifications
-Hudi offers a [commit notification 
service](https://hudi.apache.org/docs/next/writing_data/#commit-notifications) 
that can be configured to trigger notifications about write commits.
+Hudi offers a [commit notification 
service](/docs/next/platform_services_post_commit_callback) that can be 
configured to trigger notifications about write commits.
 
 The commit notification service can be combined with pre-commit validators to 
send a notification when a commit fails a validation. This is possible by 
passing details about the validation as a custom value to the HTTP endpoint.
 
diff --git a/website/docs/quick-start-guide.md 
b/website/docs/quick-start-guide.md
index 255524853c3..9104edee8a7 100644
--- a/website/docs/quick-start-guide.md
+++ b/website/docs/quick-start-guide.md
@@ -539,7 +539,7 @@ MERGE statement either using `SET *` or using `SET column1 
= expression1 [, colu
 ## Delete data {#deletes}
 
 Delete operation removes the records specified from the table. For example, 
this code snippet deletes records
-for the HoodieKeys passed in. Check out the [deletion 
section](/docs/next/writing_data#deletes) for more details.
+for the HoodieKeys passed in. Check out the [deletion 
section](/docs/writing_data#deletes) for more details.
 
 <Tabs
 groupId="programming-language"
@@ -1119,13 +1119,13 @@ For alter table commands, check out 
[this](/docs/next/sql_ddl#spark-alter-table)
 
 Hudi provides industry-leading performance and functionality for streaming 
data. 
 
-**Hudi Streamer** - Hudi provides an incremental ingestion/ETL tool - 
[HoodieStreamer](/docs/next/hoodie_streaming_ingestion#hudi-streamer), to 
assist with ingesting data into Hudi 
+**Hudi Streamer** - Hudi provides an incremental ingestion/ETL tool - 
[HoodieStreamer](/docs/hoodie_streaming_ingestion#hudi-streamer), to assist 
with ingesting data into Hudi 
 from various different sources in a streaming manner, with powerful built-in 
capabilities like auto checkpointing, schema enforcement via schema provider, 
 transformation support, automatic table services and so on.
 
-**Structured Streaming** - Hudi supports Spark Structured Streaming reads and 
writes as well. Please see 
[here](/docs/next/hoodie_streaming_ingestion#structured-streaming) for more.
+**Structured Streaming** - Hudi supports Spark Structured Streaming reads and 
writes as well. Please see 
[here](/docs/next/writing_tables_streaming_writes#spark-streaming) for more.
 
-Check out more information on [modeling data in 
Hudi](/docs/next/faq_general#how-do-i-model-the-data-stored-in-hudi) and 
different ways to [writing Hudi Tables](/docs/next/writing_data).
+Check out more information on [modeling data in 
Hudi](/docs/next/faq_general#how-do-i-model-the-data-stored-in-hudi) and 
different ways to perform [batch writes](/docs/writing_data) and [streaming 
writes](/docs/next/writing_tables_streaming_writes).
 
 ### Dockerized Demo
 Even as we showcased the core capabilities, Hudi supports a lot more advanced 
functionality that can make it easy
diff --git a/website/docs/reading_tables_batch_reads.md 
b/website/docs/reading_tables_batch_reads.md
new file mode 100644
index 00000000000..2055a9685df
--- /dev/null
+++ b/website/docs/reading_tables_batch_reads.md
@@ -0,0 +1,20 @@
+---
+title: Batch Reads
+keywords: [hudi, spark, flink, batch, processing]
+---
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+## Spark DataSource API
+
+The `hudi-spark` module offers the DataSource API to read a Hudi table into a 
Spark DataFrame.
+
+A time-travel query example:
+
+```Scala
+val tripsDF = spark.read.
+    option("as.of.instant", "2021-07-28 14:11:08.000").
+    format("hudi").
+    load(basePath)
+tripsDF.where(tripsDF.fare > 20.0).show()
+```
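+
+If no time-travel option is given, the same API performs a plain snapshot read of the latest committed table state:
+
+```Scala
+// snapshot query: reads the latest committed state of the table
+val snapshotDF = spark.read.format("hudi").load(basePath)
+snapshotDF.show()
+```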
diff --git a/website/docs/reading_tables_streaming_reads.md 
b/website/docs/reading_tables_streaming_reads.md
new file mode 100644
index 00000000000..57c3e2c4702
--- /dev/null
+++ b/website/docs/reading_tables_streaming_reads.md
@@ -0,0 +1,99 @@
+---
+title: Streaming Reads
+keywords: [hudi, spark, flink, streaming, processing]
+---
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+## Spark Streaming
+
+Structured Streaming reads are based on Hudi's incremental query feature; therefore, a streaming read can only return data whose
+commits and base files have not yet been removed by the cleaner. You can control the commit retention time.
+
+<Tabs
+groupId="programming-language"
+defaultValue="python"
+values={[
+{ label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', },
+]}
+>
+
+<TabItem value="scala">
+
+```scala
+// spark-shell
+// reload data
+df.write.format("hudi").
+  options(getQuickstartWriteConfigs).
+  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+  option(TABLE_NAME, tableName).
+  mode(Overwrite).
+  save(basePath)
+
+// read stream and output results to console
+spark.readStream.
+  format("hudi").
+  load(basePath).
+  writeStream.
+  format("console").
+  start()
+
+// read stream to streaming df
+val df = spark.readStream.
+        format("hudi").
+        load(basePath)
+
+```
+</TabItem>
+
+<TabItem value="python">
+
+```python
+# pyspark
+# reload data
+inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(
+    dataGen.generateInserts(10))
+df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
+
+hudi_options = {
+    'hoodie.table.name': tableName,
+    'hoodie.datasource.write.recordkey.field': 'uuid',
+    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+    'hoodie.datasource.write.table.name': tableName,
+    'hoodie.datasource.write.operation': 'upsert',
+    'hoodie.datasource.write.precombine.field': 'ts',
+    'hoodie.upsert.shuffle.parallelism': 2,
+    'hoodie.insert.shuffle.parallelism': 2
+}
+
+df.write.format("hudi"). \
+    options(**hudi_options). \
+    mode("overwrite"). \
+    save(basePath)
+
+# read stream to streaming df
+df = spark.readStream \
+    .format("hudi") \
+    .load(basePath)
+
+# read stream and output results to console
+spark.readStream \
+    .format("hudi") \
+    .load(basePath) \
+    .writeStream \
+    .format("console") \
+    .start()
+
+```
+</TabItem>
+
+</Tabs
+>
+
+:::info
+Spark SQL can be used within a ForeachBatch sink to perform INSERT, UPDATE, DELETE, and MERGE INTO.
+The target table must exist before writing.
+:::
diff --git a/website/docs/sql_ddl.md b/website/docs/sql_ddl.md
index e1f659f4c88..61e7d33cd7f 100644
--- a/website/docs/sql_ddl.md
+++ b/website/docs/sql_ddl.md
@@ -101,7 +101,7 @@ TBLPROPERTIES (
 ```
 
 ### Create table from an external location
-Often, Hudi tables are created from streaming writers like the [streamer 
tool](/docs/next/hoodie_streaming_ingestion#hudi-streamer), which
+Often, Hudi tables are created from streaming writers like the [streamer 
tool](/docs/hoodie_streaming_ingestion#hudi-streamer), which
 may later need some SQL statements to run on them. You can create an External 
table using the `location` statement.
 
 ```sql
@@ -503,7 +503,7 @@ Hudi currently has the following limitations when using 
Spark SQL, to create/alt
  - A new Hudi table created by Spark SQL will by default set 
`hoodie.datasource.write.hive_style_partitioning=true`, for ease
      of use. This can be overridden using table properties.
 
-## Flink
+## Flink SQL
 
 ### Create Catalog
 The catalog helps to manage the SQL tables, the table can be shared among 
sessions if the catalog persists the table definitions.
diff --git a/website/docs/sql_dml.md b/website/docs/sql_dml.md
index 10219843564..e6765af0bc5 100644
--- a/website/docs/sql_dml.md
+++ b/website/docs/sql_dml.md
@@ -197,7 +197,7 @@ DELETE from hudi_table where uuid = 
'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa';
 These DML operations give you powerful tools for managing your tables using 
Spark SQL. 
 You can control the behavior of these operations using various configuration 
options, as explained in the documentation.
 
-## Flink 
+## Flink SQL
 
 Flink SQL provides several Data Manipulation Language (DML) actions for 
interacting with Hudi tables. These operations allow you to insert, update and 
delete data from your Hudi tables. Let's explore them one by one.
 
@@ -269,3 +269,117 @@ INSERT INTO hudi_table/*+ 
OPTIONS('${hoodie.config.key1}'='${hoodie.config.value
 ```sql
 INSERT INTO hudi_table/*+ OPTIONS('hoodie.keep.max.commits'='true')*/
 ```
+
+## Flink SQL In Action
+
+The `hudi-flink` module defines the Flink SQL connector for both the Hudi source and sink.
+There are a number of options available for the sink table:
+
+|  Option Name  | Required | Default | Remarks |
+|  -----------  | -------  | ------- | ------- |
+| path | Y | N/A | Base path for the target hoodie table. The path is created if it does not exist; otherwise a Hudi table is expected to have been initialized successfully |
+| table.type  | N | COPY_ON_WRITE | Type of table to write. COPY_ON_WRITE (or) 
MERGE_ON_READ |
+| write.operation | N | upsert | The write operation, that this write should 
do (insert or upsert is supported) |
+| write.precombine.field | N | ts | Field used in preCombining before actual 
write. When two records have the same key value, we will pick the one with the 
largest value for the precombine field, determined by Object.compareTo(..) |
+| write.payload.class | N | OverwriteWithLatestAvroPayload.class | Payload class used. Override this if you want to roll your own merge logic when upserting/inserting. This will render any value set for the precombine option ineffective |
+| write.insert.drop.duplicates | N | false | Flag to indicate whether to drop 
duplicates upon insert. By default insert will accept duplicates, to gain extra 
performance |
+| write.ignore.failed | N | true | Flag to indicate whether to ignore any non-exception error (e.g. writestatus error) within a checkpoint batch. By default true (in favor of streaming progressing over data integrity) |
+| hoodie.datasource.write.recordkey.field | N | uuid | Record key field. Value 
to be used as the `recordKey` component of `HoodieKey`. Actual value will be 
obtained by invoking .toString() on the field value. Nested fields can be 
specified using the dot notation eg: `a.b.c` |
+| hoodie.datasource.write.keygenerator.class | N | SimpleAvroKeyGenerator.class | Key generator class that extracts the key out of the incoming record |
+| write.tasks | N | 4 | Parallelism of tasks that do actual write, default is 
4 |
+| write.batch.size.MB | N | 128 | Batch buffer size in MB before flushing data to the underlying filesystem |
+
+If the table type is MERGE_ON_READ, you can also specify the asynchronous 
compaction strategy through options:
+
+|  Option Name  | Required | Default | Remarks |
+|  -----------  | -------  | ------- | ------- |
+| compaction.async.enabled | N | true | Async Compaction, enabled by default 
for MOR |
+| compaction.trigger.strategy | N | num_commits | Strategy to trigger 
compaction, options are 'num_commits': trigger compaction when reach N delta 
commits; 'time_elapsed': trigger compaction when time elapsed > N seconds since 
last compaction; 'num_and_time': trigger compaction when both NUM_COMMITS and 
TIME_ELAPSED are satisfied; 'num_or_time': trigger compaction when NUM_COMMITS 
or TIME_ELAPSED is satisfied. Default is 'num_commits' |
+| compaction.delta_commits | N | 5 | Max delta commits needed to trigger 
compaction, default 5 commits |
+| compaction.delta_seconds | N | 3600 | Max delta seconds time needed to 
trigger compaction, default 1 hour |
+
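+As a minimal sketch (the table name, schema, and path are placeholders), a MERGE_ON_READ sink table combining the options above could be declared as:
+
+```sql
+CREATE TABLE hudi_sink (
+    uuid VARCHAR(20),
+    name VARCHAR(10),
+    ts TIMESTAMP(3)
+) WITH (
+    'connector' = 'hudi',
+    'path' = 'file:///tmp/hudi_sink',
+    'table.type' = 'MERGE_ON_READ',
+    'write.operation' = 'upsert',
+    'write.precombine.field' = 'ts',
+    'compaction.trigger.strategy' = 'num_commits',
+    'compaction.delta_commits' = '5'
+);
+```
+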
+You can write the data using the SQL `INSERT INTO` statements:
+```sql
+INSERT INTO hudi_table select ... from ...; 
+```
+
+**Note**: INSERT OVERWRITE is not supported yet but already on the roadmap.
+
+
+### Non-Blocking Concurrency Control (Experimental)
+
+Hudi Flink supports a new non-blocking concurrency control mode, where 
multiple writer tasks can be executed
+concurrently without blocking each other. One can read more about this mode in
+the [concurrency control](/docs/next/concurrency_control#model-c-multi-writer) 
docs. Let us see it in action here.
+
+In the example below, we have two streaming ingestion pipelines that concurrently update the same table. One of the
+pipelines is responsible for the compaction and cleaning table services, while the other pipeline is just for data
+ingestion.
+
+```sql
+-- set the interval as 30 seconds
+execution.checkpointing.interval: 30000
+state.backend: rocksdb
+
+-- This is a datagen source that generates records continuously
+CREATE TABLE sourceT (
+    uuid varchar(20),
+    name varchar(10),
+    age int,
+    ts timestamp(3),
+    `partition` as 'par1'
+) WITH (
+    'connector' = 'datagen',
+    'rows-per-second' = '200'
+);
+
+-- pipeline1: by default enable the compaction and cleaning services
+CREATE TABLE t1(
+    uuid varchar(20),
+    name varchar(10),
+    age int,
+    ts timestamp(3),
+    `partition` varchar(20)
+) WITH (
+    'connector' = 'hudi',
+    'path' = '/Users/chenyuzhao/workspace/hudi-demo/t1',
+    'table.type' = 'MERGE_ON_READ',
+    'index.type' = 'BUCKET',
+    'hoodie.write.concurrency.mode' = 'NON_BLOCKING_CONCURRENCY_CONTROL',
+    'write.tasks' = '2'
+);
+
+-- pipeline2: disable the compaction and cleaning services manually
+CREATE TABLE t1_2(
+    uuid varchar(20),
+    name varchar(10),
+    age int,
+    ts timestamp(3),
+    `partition` varchar(20)
+) WITH (
+    'connector' = 'hudi',
+    'path' = '/Users/chenyuzhao/workspace/hudi-demo/t1',
+    'table.type' = 'MERGE_ON_READ',
+    'index.type' = 'BUCKET',
+    'hoodie.write.concurrency.mode' = 'NON_BLOCKING_CONCURRENCY_CONTROL',
+    'write.tasks' = '2',
+    'compaction.schedule.enabled' = 'false',
+    'compaction.async.enabled' = 'false',
+    'clean.async.enabled' = 'false'
+);
+
+-- submit the pipelines
+insert into t1 select * from sourceT;
+insert into t1_2 select * from sourceT;
+
+select * from t1 limit 20;
+```
+
+As the example shows, two pipelines with multiple tasks concurrently write to the same table. To use the new concurrency mode, simply set `hoodie.write.concurrency.mode`
+to `NON_BLOCKING_CONCURRENCY_CONTROL`. The `write.tasks` option specifies the number of write tasks used for writing to the table. The `compaction.schedule.enabled`, `compaction.async.enabled`
+and `clean.async.enabled` options disable the compaction and cleaning services for the second pipeline, ensuring these table services are not executed twice against the same table.
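Note that both pipelines above use `'index.type' = 'BUCKET'`. The bucket index routes each record key deterministically to a file group, so independent writers send updates for the same key to the same bucket, where the compactor can later merge them. A minimal sketch of that routing idea (plain Python analogy; Hudi's bucket index uses its own hashing internally):

```python
import zlib

def bucket_for(record_key: str, num_buckets: int) -> int:
    # Deterministic hash of the record key -> bucket id.
    # Any writer computing this mapping gets the same answer,
    # which is what lets concurrent writers avoid coordination.
    return zlib.crc32(record_key.encode("utf-8")) % num_buckets

# Two independent writers route the same key to the same bucket,
# so concurrent updates to one key land in one file group's logs.
writer1_bucket = bucket_for("uuid-42", 4)
writer2_bucket = bucket_for("uuid-42", 4)
assert writer1_bucket == writer2_bucket
```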
+
+
diff --git a/website/docs/write_operations.md b/website/docs/write_operations.md
index 3146db05802..056d99c5f2b 100644
--- a/website/docs/write_operations.md
+++ b/website/docs/write_operations.md
@@ -6,8 +6,7 @@ last_modified_at:
 ---
 
 It may be helpful to understand the different write operations of Hudi and how 
best to leverage them. These operations
-can be chosen/changed across each commit/deltacommit issued against the table. 
See the [How To docs on Writing Data](/docs/writing_data) 
-to see more examples.
+can be chosen/changed across each commit/deltacommit issued against the table.
 
 ## Operation Types
 ### UPSERT 
@@ -100,7 +99,7 @@ The following is an inside look on the Hudi write path and 
the sequence of event
 6. Update [Index](/docs/next/indexing)
    1. Now that the write is performed, we will go back and update the index.
 7. Commit
-   1. Finally we commit all of these changes atomically. (A [callback 
notification](/docs/next/writing_data#commit-notifications) is exposed)
+   1. Finally we commit all of these changes atomically. ([Post-commit 
callback](/docs/next/platform_services_post_commit_callback) can be configured.)
 8. [Clean](/docs/next/hoodie_cleaner) (if needed)
    1. Following the commit, cleaning is invoked if needed.
 9. [Compaction](/docs/next/compaction)
diff --git a/website/docs/writing_data.md b/website/docs/writing_data.md
index 10226cf9747..582d13bbd06 100644
--- a/website/docs/writing_data.md
+++ b/website/docs/writing_data.md
@@ -1,27 +1,16 @@
 ---
-title: Writing Data
-keywords: [hudi, incremental, batch, stream, processing, Hive, ETL, Spark SQL]
-summary: In this page, we will discuss some available tools for incrementally 
ingesting & storing data.
-toc: true
-last_modified_at: 2019-12-30T15:59:57-04:00
+title: Batch Writes
+keywords: [hudi, incremental, batch, processing]
+last_modified_at: 2024-03-13T15:59:57-04:00
 ---
 import Tabs from '@theme/Tabs';
 import TabItem from '@theme/TabItem';
 
-In this section, we will cover ways to ingest new changes from external 
sources or even other Hudi tables.
-Currently Hudi supports following ways to write the data.
-- [Hudi Streamer](/docs/hoodie_streaming_ingestion#hudi-streamer)
-- [Spark Hudi Datasource](#spark-datasource-writer)
-- [Spark Structured 
Streaming](/docs/hoodie_streaming_ingestion#structured-streaming)
-- [Spark SQL](/docs/next/sql_ddl#spark-sql)
-- [Flink Writer](/docs/next/hoodie_streaming_ingestion#flink-ingestion)
-- [Flink SQL](/docs/next/sql_ddl#flink)
-- [Java Writer](#java-writer)
-- [Kafka Connect](/docs/next/hoodie_streaming_ingestion#kafka-connect-sink)
+## Spark DataSource API
 
-## Spark Datasource Writer
+The `hudi-spark` module offers the DataSource API to write a Spark DataFrame 
into a Hudi table.
 
-The `hudi-spark` module offers the DataSource API to write (and read) a Spark 
DataFrame into a Hudi table. There are a number of options available:
+There are a number of options available:
 
 **`HoodieWriteConfig`**:
 
@@ -59,7 +48,7 @@ Upsert a DataFrame, specifying the necessary field names for 
`recordKey => _row_
 
 ```java
 inputDF.write()
-       .format("org.apache.hudi")
+       .format("hudi")
        .options(clientOpts) //Where clientOpts is of type Map[String, String]. 
clientOpts can include any other options necessary.
        .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
        .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), 
"partition")
@@ -99,7 +88,7 @@ You can check the data generated under 
`/tmp/hudi_trips_cow/<region>/<country>/<
 (`uuid` in 
[schema](https://github.com/apache/hudi/blob/6f9b02decb5bb2b83709b1b6ec04a97e4d102c11/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L60)),
 partition field (`region/country/city`) and combine logic (`ts` in
 
[schema](https://github.com/apache/hudi/blob/6f9b02decb5bb2b83709b1b6ec04a97e4d102c11/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L60))
 to ensure trip records are unique within each partition. For more info, refer 
to
 [Modeling data stored in 
Hudi](/docs/next/faq_general/#how-do-i-model-the-data-stored-in-hudi)
-and for info on ways to ingest data into Hudi, refer to [Writing Hudi 
Tables](/docs/writing_data).
+and for info on ways to ingest data into Hudi, refer to [Writing Hudi 
Tables](/docs/hoodie_streaming_ingestion).
 Here we are using the default write operation : `upsert`. If you have a 
workload without updates, you can also issue
 `insert` or `bulk_insert` operations which could be faster. To know more, 
refer to [Write operations](/docs/write_operations)
 :::
@@ -135,7 +124,7 @@ You can check the data generated under 
`/tmp/hudi_trips_cow/<region>/<country>/<
 (`uuid` in 
[schema](https://github.com/apache/hudi/blob/2e6e302efec2fa848ded4f88a95540ad2adb7798/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L60)),
 partition field (`region/country/city`) and combine logic (`ts` in
 
[schema](https://github.com/apache/hudi/blob/2e6e302efec2fa848ded4f88a95540ad2adb7798/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L60))
 to ensure trip records are unique within each partition. For more info, refer 
to
 [Modeling data stored in 
Hudi](/docs/next/faq_general/#how-do-i-model-the-data-stored-in-hudi)
-and for info on ways to ingest data into Hudi, refer to [Writing Hudi 
Tables](/docs/writing_data).
+and for info on ways to ingest data into Hudi, refer to [Writing Hudi 
Tables](/docs/hoodie_streaming_ingestion).
 Here we are using the default write operation : `upsert`. If you have a 
workload without updates, you can also issue
 `insert` or `bulk_insert` operations which could be faster. To know more, 
refer to [Write operations](/docs/write_operations)
 :::
@@ -434,9 +423,9 @@ Then any record you want to delete you can mark 
`_hoodie_is_deleted` as true:
 
 ### Concurrency Control
 
-The `hudi-spark` module offers the DataSource API to write (and read) a Spark 
DataFrame into a Hudi table.
+Following is an example of how to use `optimistic_concurrency_control` via 
Spark DataSource API.
 
-Following is an example of how to use optimistic_concurrency_control via spark 
datasource. Read more in depth about concurrency control in the [concurrency 
control concepts](https://hudi.apache.org/docs/concurrency_control) section  
+Read more in-depth details about concurrency control in the [concurrency 
control concepts](/docs/concurrency_control) section.
 
 ```java
 inputDF.write.format("hudi")
@@ -455,171 +444,5 @@ inputDF.write.format("hudi")
        .save(basePath)
 ```
 
-### Commit Notifications
-Apache Hudi provides the ability to post a callback notification about a write 
commit. This may be valuable if you need
-an event notification stream to take actions with other services after a Hudi 
write commit.
-You can push a write commit callback notification into HTTP endpoints or to a 
Kafka server.
-
-#### HTTP Endpoints
-You can push a commit notification to an HTTP URL and can specify custom 
values by extending a callback class defined below.
-
-|  Config  | Description | Required | Default |
-|  -----------  | -------  | ------- | ------ |
-| TURN_CALLBACK_ON | Turn commit callback on/off | optional | false 
(*callbacks off*) |
-| CALLBACK_HTTP_URL | Callback host to be sent along with callback messages | 
required | N/A |
-| CALLBACK_HTTP_TIMEOUT_IN_SECONDS | Callback timeout in seconds | optional | 
3 |
-| CALLBACK_CLASS_NAME | Full path of callback class and must be a subclass of 
HoodieWriteCommitCallback class, 
org.apache.hudi.callback.impl.HoodieWriteCommitHttpCallback by default | 
optional | org.apache.hudi.callback.impl.HoodieWriteCommitHttpCallback |
-| CALLBACK_HTTP_API_KEY_VALUE | Http callback API key | optional | 
hudi_write_commit_http_callback |
-| | | | |
-
-#### Kafka Endpoints
-You can push a commit notification to a Kafka topic so it can be used by other 
real time systems.
-
-|  Config  | Description | Required | Default |
-|  -----------  | -------  | ------- | ------ |
-| TOPIC | Kafka topic name to publish timeline activity into. | required | N/A 
|
-| PARTITION | It may be desirable to serialize all changes into a single Kafka 
partition for providing strict ordering. By default, Kafka messages are keyed 
by table name, which guarantees ordering at the table level, but not globally 
(or when new partitions are added) | required | N/A |
-| RETRIES | Times to retry the produce | optional | 3 |
-| ACKS | kafka acks level, all by default to ensure strong durability | 
optional | all |
-| BOOTSTRAP_SERVERS | Bootstrap servers of kafka cluster, to be used for 
publishing commit metadata | required | N/A |
-| | | | |
-
-#### Pulsar Endpoints
-You can push a commit notification to a Pulsar topic so it can be used by 
other real time systems. 
-
-|  Config  | Description                                                       
          | Required | Default |
-|  -----------  
|-----------------------------------------------------------------------------| 
------- |--------|
-| hoodie.write.commit.callback.pulsar.broker.service.url | Server's Url of 
pulsar cluster to use to publish commit metadata.   | required | N/A    |
-| hoodie.write.commit.callback.pulsar.topic | Pulsar topic name to publish 
timeline activity into | required | N/A    |
-| hoodie.write.commit.callback.pulsar.producer.route-mode | Message routing 
logic for producers on partitioned topics.  | optional |   RoundRobinPartition  
   |
-| hoodie.write.commit.callback.pulsar.producer.pending-queue-size | The 
maximum size of a queue holding pending messages.               | optional | 
1000   |
-| hoodie.write.commit.callback.pulsar.producer.pending-total-size | The 
maximum number of pending messages across partitions. | required | 50000  |
-| hoodie.write.commit.callback.pulsar.producer.block-if-queue-full | When the 
queue is full, the method is blocked instead of an exception is thrown. | 
optional | true   |
-| hoodie.write.commit.callback.pulsar.producer.send-timeout | The timeout in 
each sending to pulsar.     | optional | 30s    |
-| hoodie.write.commit.callback.pulsar.operation-timeout | Duration of waiting 
for completing an operation.     | optional | 30s    |
-| hoodie.write.commit.callback.pulsar.connection-timeout | Duration of waiting 
for a connection to a broker to be established.     | optional | 10s    |
-| hoodie.write.commit.callback.pulsar.request-timeout | Duration of waiting 
for completing a request.  | optional | 60s    |
-| hoodie.write.commit.callback.pulsar.keepalive-interval | Duration of keeping 
alive interval for each client broker connection. | optional | 30s    |
-| |                                                                            
 | |        |
-
-#### Bring your own implementation
-You can extend the HoodieWriteCommitCallback class to implement your own way 
to asynchronously handle the callback
-of a successful write. Use this public API:
-
-https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/HoodieWriteCommitCallback.java
-
-## Flink SQL Writer
-The hudi-flink module defines the Flink SQL connector for both hudi source and 
sink.
-There are a number of options available for the sink table:
-
-|  Option Name  | Required | Default | Remarks |
-|  -----------  | -------  | ------- | ------- |
-| path | Y | N/A | Base path for the target hoodie table. The path would be 
created if it does not exist, otherwise a hudi table expects to be initialized 
successfully |
-| table.type  | N | COPY_ON_WRITE | Type of table to write. COPY_ON_WRITE (or) 
MERGE_ON_READ |
-| write.operation | N | upsert | The write operation, that this write should 
do (insert or upsert is supported) |
-| write.precombine.field | N | ts | Field used in preCombining before actual 
write. When two records have the same key value, we will pick the one with the 
largest value for the precombine field, determined by Object.compareTo(..) |
-| write.payload.class | N | OverwriteWithLatestAvroPayload.class | Payload 
class used. Override this, if you like to roll your own merge logic, when 
upserting/inserting. This will render any value set for the option in-effective 
|
-| write.insert.drop.duplicates | N | false | Flag to indicate whether to drop 
duplicates upon insert. By default insert will accept duplicates, to gain extra 
performance |
-| write.ignore.failed | N | true | Flag to indicate whether to ignore any non 
exception error (e.g. writestatus error). within a checkpoint batch. By default 
true (in favor of streaming progressing over data integrity) |
-| hoodie.datasource.write.recordkey.field | N | uuid | Record key field. Value 
to be used as the `recordKey` component of `HoodieKey`. Actual value will be 
obtained by invoking .toString() on the field value. Nested fields can be 
specified using the dot notation eg: `a.b.c` |
-| hoodie.datasource.write.keygenerator.class | N | 
SimpleAvroKeyGenerator.class | Key generator class, that implements will 
extract the key out of incoming record |
-| write.tasks | N | 4 | Parallelism of tasks that do actual write, default is 
4 |
-| write.batch.size.MB | N | 128 | Batch buffer size in MB to flush data into 
the underneath filesystem |
-
-If the table type is MERGE_ON_READ, you can also specify the asynchronous 
compaction strategy through options:
-
-|  Option Name  | Required | Default | Remarks |
-|  -----------  | -------  | ------- | ------- |
-| compaction.async.enabled | N | true | Async Compaction, enabled by default 
for MOR |
-| compaction.trigger.strategy | N | num_commits | Strategy to trigger 
compaction, options are 'num_commits': trigger compaction when reach N delta 
commits; 'time_elapsed': trigger compaction when time elapsed > N seconds since 
last compaction; 'num_and_time': trigger compaction when both NUM_COMMITS and 
TIME_ELAPSED are satisfied; 'num_or_time': trigger compaction when NUM_COMMITS 
or TIME_ELAPSED is satisfied. Default is 'num_commits' |
-| compaction.delta_commits | N | 5 | Max delta commits needed to trigger 
compaction, default 5 commits |
-| compaction.delta_seconds | N | 3600 | Max delta seconds time needed to 
trigger compaction, default 1 hour |
-
-You can write the data using the SQL `INSERT INTO` statements:
-```sql
-INSERT INTO hudi_table select ... from ...; 
-```
-
-**Note**: INSERT OVERWRITE is not supported yet but already on the roadmap.
-
-
-### Non-Blocking Concurrency Control (Experimental)
-
-Hudi Flink supports a new non-blocking concurrency control mode, where 
multiple writer tasks can be executed
-concurrently without blocking each other. One can read more about this mode in
-the [concurrency control](/docs/next/concurrency_control#model-c-multi-writer) 
docs. Let us see it in action here.
-
-In the below example, we have two streaming ingestion pipelines that 
concurrently update the same table. One of the
-pipeline is responsible for the compaction and cleaning table services, while 
the other pipeline is just for data
-ingestion.
-
-```sql
--- set the interval as 30 seconds
-execution.checkpointing.interval: 30000
-state.backend: rocksdb
-
--- This is a datagen source that can generates records continuously
-CREATE TABLE sourceT (
-    uuid varchar(20),
-    name varchar(10),
-    age int,
-    ts timestamp(3),
-    `partition` as 'par1'
-) WITH (
-    'connector' = 'datagen',
-    'rows-per-second' = '200'
-);
-
--- pipeline1: by default enable the compaction and cleaning services
-CREATE TABLE t1(
-    uuid varchar(20),
-    name varchar(10),
-    age int,
-    ts timestamp(3),
-    `partition` varchar(20)
-) WITH (
-    'connector' = 'hudi',
-    'path' = '/Users/chenyuzhao/workspace/hudi-demo/t1',
-    'table.type' = 'MERGE_ON_READ',
-    'index.type' = 'BUCKET',
-    'hoodie.write.concurrency.mode' = 'NON_BLOCKING_CONCURRENCY_CONTROL',
-    'write.tasks' = '2'
-);
-
--- pipeline2: disable the compaction and cleaning services manually
-CREATE TABLE t1_2(
-    uuid varchar(20),
-    name varchar(10),
-    age int,
-    ts timestamp(3),
-    `partition` varchar(20)
-) WITH (
-    'connector' = 'hudi',
-    'path' = '/Users/chenyuzhao/workspace/hudi-demo/t1',
-    'table.type' = 'MERGE_ON_READ',
-    'index.type' = 'BUCKET',
-    'hoodie.write.concurrency.mode' = 'NON_BLOCKING_CONCURRENCY_CONTROL',
-    'write.tasks' = '2',
-    'compaction.schedule.enabled' = 'false',
-    'compaction.async.enabled' = 'false',
-    'clean.async.enabled' = 'false'
-);
-
--- submit the pipelines
-insert into t1 select * from sourceT;
-insert into t1_2 select * from sourceT;
-
-select * from t1 limit 20;
-```
-
-As you can see from the above example, we have two pipelines with multiple 
tasks that concurrently write to the
-same table. To use the new concurrency mode, all you need to do is set the 
`hoodie.write.concurrency.mode`
-to `NON_BLOCKING_CONCURRENCY_CONTROL`. The `write.tasks` option is used to 
specify the number of write tasks that will
-be used for writing to the table. The `compaction.schedule.enabled`, 
`compaction.async.enabled`
-and `clean.async.enabled` options are used to disable the compaction and 
cleaning services for the second pipeline.
-This is done to ensure that the compaction and cleaning services are not 
executed twice for the same table.
-
-
-## Java Writer
+## Java Client
 We can use plain Java to write to Hudi tables. For usage of the Java client, refer to the [HoodieJavaWriteClientExample](https://github.com/apache/hudi/blob/master/hudi-examples/hudi-examples-java/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java).
-
diff --git a/website/docs/writing_tables_streaming_writes.md 
b/website/docs/writing_tables_streaming_writes.md
new file mode 100644
index 00000000000..4bb43bb696b
--- /dev/null
+++ b/website/docs/writing_tables_streaming_writes.md
@@ -0,0 +1,92 @@
+---
+title: Streaming Writes
+keywords: [hudi, spark, flink, streaming, processing]
+last_modified_at: 2024-03-13T15:59:57-04:00
+---
+
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+## Spark Streaming
+
+You can write Hudi tables using Spark Structured Streaming.
+
+<Tabs
+groupId="programming-language"
+defaultValue="python"
+values={[
+{ label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', },
+]}
+>
+
+<TabItem value="scala">
+
+```scala
+// spark-shell
+// prepare to stream write to new table
+import org.apache.spark.sql.streaming.Trigger
+
+val streamingTableName = "hudi_trips_cow_streaming"
+val baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
+val checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"
+
+// create streaming df
+val df = spark.readStream.
+        format("hudi").
+        load(basePath)
+
+// write stream to new hudi table
+df.writeStream.format("hudi").
+  options(getQuickstartWriteConfigs).
+  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+  option(TABLE_NAME, streamingTableName).
+  outputMode("append").
+  option("path", baseStreamingPath).
+  option("checkpointLocation", checkpointLocation).
+  trigger(Trigger.Once()).
+  start()
+
+```
+
+</TabItem>
+<TabItem value="python">
+
+```python
+# pyspark
+# prepare to stream write to new table
+streamingTableName = "hudi_trips_cow_streaming"
+baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
+checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"
+
+hudi_streaming_options = {
+    'hoodie.table.name': streamingTableName,
+    'hoodie.datasource.write.recordkey.field': 'uuid',
+    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+    'hoodie.datasource.write.table.name': streamingTableName,
+    'hoodie.datasource.write.operation': 'upsert',
+    'hoodie.datasource.write.precombine.field': 'ts',
+    'hoodie.upsert.shuffle.parallelism': 2,
+    'hoodie.insert.shuffle.parallelism': 2
+}
+
+# create streaming df
+df = spark.readStream \
+    .format("hudi") \
+    .load(basePath)
+
+# write stream to new hudi table
+df.writeStream.format("hudi") \
+    .options(**hudi_streaming_options) \
+    .outputMode("append") \
+    .option("path", baseStreamingPath) \
+    .option("checkpointLocation", checkpointLocation) \
+    .trigger(once=True) \
+    .start()
+
+```
+
+</TabItem>
+
+</Tabs
+>
+
diff --git a/website/releases/release-1.0.0-beta1.md 
b/website/releases/release-1.0.0-beta1.md
index f970543feda..e091def2af2 100644
--- a/website/releases/release-1.0.0-beta1.md
+++ b/website/releases/release-1.0.0-beta1.md
@@ -66,7 +66,7 @@ OCC, multiple writers can operate on the table with 
non-blocking conflict resolu
 same file group with the conflicts resolved automatically by the query reader 
and the compactor. The new concurrency
 mode is currently available for preview in version 1.0.0-beta only. You can 
read more about it under
 section [Model C: 
Multi-writer](/docs/next/concurrency_control#non-blocking-concurrency-control-mode-experimental).
 A complete example with multiple 
-Flink streaming writers is available 
[here](/docs/next/writing_data#non-blocking-concurrency-control-experimental). 
You
+Flink streaming writers is available 
[here](/docs/next/sql_dml#non-blocking-concurrency-control-experimental). You
 can follow the 
[RFC](https://github.com/apache/hudi/blob/master/rfc/rfc-66/rfc-66.md) and
 the [JIRA](https://issues.apache.org/jira/browse/HUDI-6640) for more details.
 
diff --git a/website/sidebars.js b/website/sidebars.js
index d99e833edb3..6ef2b9c77a9 100644
--- a/website/sidebars.js
+++ b/website/sidebars.js
@@ -38,19 +38,30 @@ module.exports = {
         },
         {
             type: 'category',
-            label: 'How To',
+            label: 'Ingestion',
             items: [
-                {
-                    type: 'category',
-                    label: 'SQL',
-                    items: [
-                        'sql_ddl',
-                        'sql_dml',
-                        'sql_queries',
-                    ],
-                },
-                'writing_data',
                 'hoodie_streaming_ingestion',
+                'ingestion_flink',
+                'ingestion_kafka_connect',
+            ],
+        },
+        {
+            type: 'category',
+            label: 'Writing Tables',
+            items: [
+                'sql_ddl',
+                'sql_dml',
+                'writing_data',
+                'writing_tables_streaming_writes',
+            ],
+        },
+        {
+            type: 'category',
+            label: 'Reading Tables',
+            items: [
+                'sql_queries',
+                'reading_tables_batch_reads',
+                'reading_tables_streaming_reads',
             ],
         },
         {
@@ -76,6 +87,7 @@ module.exports = {
             items: [
                 'snapshot_exporter',
                 'precommit_validator',
+                'platform_services_post_commit_callback',
                 {
                     type: 'category',
                     label: 'Syncing to Catalogs',
@@ -89,6 +101,20 @@ module.exports = {
                 }
             ],
         },
+        {
+            type: 'category',
+            label: 'Operations',
+            items: [
+                'performance',
+                'deployment',
+                'cli',
+                'metrics',
+                'encryption',
+                'troubleshooting',
+                'tuning-guide',
+                'flink_tuning',
+            ],
+        },
         {
             type: 'category',
             label: 'Configurations',
@@ -113,20 +139,6 @@ module.exports = {
                 },
             ],
         },
-        {
-            type: 'category',
-            label: 'Operations',
-            items: [
-                'performance',
-                'deployment',
-                'cli',
-                'metrics',
-                'encryption',
-                'troubleshooting',
-                'tuning-guide',
-                'flink_tuning',
-            ],
-        },
         {
             type: 'category',
             label: 'Frequently Asked Questions(FAQs)',
@@ -135,7 +147,7 @@ module.exports = {
                 'faq_general',
                 'faq_design_and_concepts',
                 'faq_writing_tables',
-                'faq_querying_tables',
+                'faq_reading_tables',
                 'faq_table_services',
                 'faq_storage',
                 'faq_integrations',
diff --git a/website/versioned_docs/version-0.14.1/faq.md 
b/website/versioned_docs/version-0.14.1/faq.md
index e212008f37c..b3629b4377a 100644
--- a/website/versioned_docs/version-0.14.1/faq.md
+++ b/website/versioned_docs/version-0.14.1/faq.md
@@ -9,7 +9,7 @@ The FAQs are split into following pages. Please refer to the 
specific pages for
 - [General](/docs/next/faq_general)
 - [Design & Concepts](/docs/next/faq_design_and_concepts)
 - [Writing Tables](/docs/next/faq_writing_tables)
-- [Querying Tables](/docs/next/faq_querying_tables)
+- [Querying Tables](/docs/next/faq_reading_tables)
 - [Table Services](/docs/next/faq_table_services)
 - [Storage](/docs/next/faq_storage)
 - [Integrations](/docs/next/faq_integrations)

