This is an automated email from the ASF dual-hosted git repository. xushiyan pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/asf-site by this push: new 4b59d41 [HUDI-2878] enhance hudi-quick-start guide for spark-sql (#4269) 4b59d41 is described below commit 4b59d41e0762ad561f7fa1272859d48a0966dbec Author: Yann Byron <biyan900...@gmail.com> AuthorDate: Fri Dec 10 17:05:46 2021 +0800 [HUDI-2878] enhance hudi-quick-start guide for spark-sql (#4269) --- website/docs/quick-start-guide.md | 417 +++++++++++++++++++++++++++++++------- 1 file changed, 342 insertions(+), 75 deletions(-) diff --git a/website/docs/quick-start-guide.md b/website/docs/quick-start-guide.md index 4abd431..1aa2801 100644 --- a/website/docs/quick-start-guide.md +++ b/website/docs/quick-start-guide.md @@ -31,17 +31,17 @@ From the extracted directory run spark-shell with Hudi as: ```scala // spark-shell for spark 3 spark-shell \ - --packages org.apache.hudi:hudi-spark3-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:3.0.1 \ + --packages org.apache.hudi:hudi-spark3-bundle_2.12:0.10.0,org.apache.spark:spark-avro_2.12:3.1.2 \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' // spark-shell for spark 2 with scala 2.12 spark-shell \ - --packages org.apache.hudi:hudi-spark-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:2.4.4 \ + --packages org.apache.hudi:hudi-spark-bundle_2.12:0.10.0,org.apache.spark:spark-avro_2.12:2.4.4 \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' // spark-shell for spark 2 with scala 2.11 spark-shell \ - --packages org.apache.hudi:hudi-spark-bundle_2.11:0.9.0,org.apache.spark:spark-avro_2.11:2.4.4 \ + --packages org.apache.hudi:hudi-spark-bundle_2.11:0.10.0,org.apache.spark:spark-avro_2.11:2.4.4 \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' ``` @@ -53,18 +53,18 @@ From the extracted directory run spark-sql with Hudi as: ```shell # spark sql for spark 3 -spark-sql --packages org.apache.hudi:hudi-spark3-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:3.0.1 \ +spark-sql --packages org.apache.hudi:hudi-spark3-bundle_2.12:0.10.0,org.apache.spark:spark-avro_2.12:3.1.2 \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' # spark-sql for spark 2 with scala 2.11 -spark-sql --packages org.apache.hudi:hudi-spark-bundle_2.11:0.9.0,org.apache.spark:spark-avro_2.11:2.4.4 \ +spark-sql --packages org.apache.hudi:hudi-spark-bundle_2.11:0.10.0,org.apache.spark:spark-avro_2.11:2.4.4 \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' # spark-sql for spark 2 with scala 2.12 spark-sql \ - --packages org.apache.hudi:hudi-spark-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:2.4.4 \ + --packages org.apache.hudi:hudi-spark-bundle_2.12:0.10.0,org.apache.spark:spark-avro_2.12:2.4.4 \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' ``` @@ -80,17 +80,17 @@ export PYSPARK_PYTHON=$(which python3) # for spark3 pyspark ---packages org.apache.hudi:hudi-spark3-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:3.0.1 +--packages org.apache.hudi:hudi-spark3-bundle_2.12:0.10.0,org.apache.spark:spark-avro_2.12:3.1.2 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' # for spark2 with scala 2.12 pyspark ---packages org.apache.hudi:hudi-spark-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:2.4.4 +--packages 
org.apache.hudi:hudi-spark-bundle_2.12:0.10.0,org.apache.spark:spark-avro_2.12:2.4.4 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' # for spark2 with scala 2.11 pyspark ---packages org.apache.hudi:hudi-spark-bundle_2.11:0.9.0,org.apache.spark:spark-avro_2.11:2.4.4 +--packages org.apache.hudi:hudi-spark-bundle_2.11:0.10.0,org.apache.spark:spark-avro_2.11:2.4.4 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' ``` @@ -100,7 +100,7 @@ pyspark :::note Please note the following <ul> <li>spark-avro module needs to be specified in --packages as it is not included with spark-shell by default</li> - <li>spark-avro and spark versions must match (we have used 3.0.1 for both above)</li> + <li>spark-avro and spark versions must match (we have used 3.1.2 for both above)</li> <li>we have used hudi-spark-bundle built for scala 2.12 since the spark-avro module used also depends on 2.12. If spark-avro_2.11 is used, correspondingly hudi-spark-bundle_2.11 needs to be used. </li> </ul> @@ -175,18 +175,161 @@ values={[ </TabItem> <TabItem value="sparksql"> +Spark SQL needs an explicit create table command. + +**Table Concepts** +- Table types: + Both of Hudi's table types (Copy-On-Write (COW) and Merge-On-Read (MOR)) can be created using Spark SQL. + + When creating the table, the table type can be specified using the **type** option: **type = 'cow'** represents a COW table, while **type = 'mor'** represents a MOR table. + +- Partitioned & Non-Partitioned table: + Users can create a partitioned table or a non-partitioned table in Spark SQL. + To create a partitioned table, use the **partitioned by** statement to specify the partition columns. + When there is no **partitioned by** statement in the create table command, the table is considered non-partitioned. + +- Managed & External table: + In general, Spark SQL supports two kinds of tables, namely managed and external. + If one specifies a location using the **location** statement or uses `create external table` explicitly, it is an external table; otherwise it is considered a managed table. + You can read more about external vs managed tables [here](https://sparkbyexamples.com/apache-hive/difference-between-hive-internal-tables-and-external-tables/). + +:::note +1. Since Hudi 0.10.0, `primaryKey` must be specified. This aligns Spark SQL with the Hudi datasource writer and resolves many behavioural discrepancies reported in previous versions. Tables without a primaryKey are no longer supported, and any hudi table created before 0.10.0 without a `primaryKey` needs to be recreated with a `primaryKey` field on 0.10.0. As with `hoodie.datasource.write.recordkey.field`, hudi uses `uuid` as the default primaryKey, so if you want `uuid` as your table's `primaryKey`, you can omit the `primaryKey` config in `tblproperties`. +2. `primaryKey`, `preCombineField` and `type` are case sensitive. +3. To specify `primaryKey`, `preCombineField`, `type` or other hudi configs, `tblproperties` is preferred over `options`. Spark SQL syntax is detailed here. +4. A new hudi table created by Spark SQL will set `hoodie.table.keygenerator.class` to `org.apache.hudi.keygen.ComplexKeyGenerator` and +`hoodie.datasource.write.hive_style_partitioning` to `true` by default; a short sketch making these defaults explicit follows below. +::: + +Let's go over some of the create table commands.
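Before the standard examples, here is a minimal, hedged sketch of the defaults called out in note 4 above. The table name and columns are made up for illustration, and whether every `hoodie.*` key is honored when passed through `tblproperties` can vary by Hudi release, so treat this as a sketch rather than a reference.

```sql
-- hypothetical sketch: spelling out the Spark SQL create-table defaults explicitly
create table hudi_defaults_sketch (
  id int,
  name string,
  ts bigint,
  dt string
) using hudi
tblproperties (
  primaryKey = 'id',          -- omit this to fall back to the default primaryKey 'uuid'
  preCombineField = 'ts',
  -- the next two values are what Spark SQL would apply by default anyway;
  -- shown here only to make them visible and easy to override
  hoodie.table.keygenerator.class = 'org.apache.hudi.keygen.ComplexKeyGenerator',
  hoodie.datasource.write.hive_style_partitioning = 'true'
)
partitioned by (dt);
```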
+ +**Create a Non-Partitioned Table** + ```sql --- -create table if not exists hudi_table2( - id int, - name string, +-- create a cow table, with default primaryKey 'uuid' and without preCombineField provided +create table hudi_cow_nonpcf_tbl ( + uuid int, + name string, price double +) using hudi; + + +-- create a mor non-partitioned table with preCombineField provided +create table hudi_mor_tbl ( + id int, + name string, + price double, + ts bigint ) using hudi -options ( - type = 'cow' +tblproperties ( + type = 'mor', + primaryKey = 'id', + preCombineField = 'ts' ); ``` +Here is an example of creating an external COW partitioned table. + +**Create Partitioned Table** + +```sql +-- create a partitioned, preCombineField-provided cow table +create table hudi_cow_pt_tbl ( + id bigint, + name string, + ts bigint, + dt string, + hh string +) using hudi +tblproperties ( + type = 'cow', + primaryKey = 'id', + preCombineField = 'ts' + ) +partitioned by (dt, hh) +location '/tmp/hudi/hudi_cow_pt_tbl'; +``` + +**Create Table for an existing Hudi Table** + +We can create a table on an existing hudi table (created with spark-shell or deltastreamer). This is useful to +read/write to/from a pre-existing hudi table. + +```sql +-- create an external hudi table based on an existing path + +-- for non-partitioned table +create table hudi_existing_tbl0 using hudi +location 'file:///tmp/hudi/dataframe_hudi_nonpt_table'; + +-- for partitioned table +create table hudi_existing_tbl1 using hudi +partitioned by (dt, hh) +location 'file:///tmp/hudi/dataframe_hudi_pt_table'; +``` + +:::tip +You don't need to specify the schema or any table properties; only the partition columns, if the table is partitioned, need to be declared. Hudi can automatically recognize the schema and configurations. +::: + +**CTAS** + +Hudi supports CTAS (Create Table As Select) on Spark SQL. <br/> +Note: to load data into a hudi table with better performance, CTAS uses **bulk insert** as the write operation. + +Example CTAS command to create a non-partitioned COW table without preCombineField. + +```sql +-- CTAS: create a non-partitioned cow table without preCombineField +create table hudi_ctas_cow_nonpcf_tbl +using hudi +tblproperties (primaryKey = 'id') +as +select 1 as id, 'a1' as name, 10 as price; +``` + +Example CTAS command to create a partitioned, primary key COW table. + +```sql +-- CTAS: create a partitioned, preCombineField-provided cow table +create table hudi_ctas_cow_pt_tbl +using hudi +tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 'ts') +partitioned by (dt) +as +select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-12-01' as dt; + +``` + +Example CTAS command to load data from another table. + +```sql +-- create managed parquet table +create table parquet_mngd using parquet location 'file:///tmp/parquet_dataset/*.parquet'; + +-- CTAS by loading data into hudi table +create table hudi_ctas_cow_pt_tbl2 using hudi location 'file:/tmp/hudi/hudi_tbl/' options ( + type = 'cow', + primaryKey = 'id', + preCombineField = 'ts' + ) +partitioned by (datestr) as select * from parquet_mngd; +``` + +**Create Table Properties** + +Users can set table properties while creating a hudi table. Critical options are listed here. + +| Parameter Name | Default | Introduction | +|------------------|--------|------------| +| primaryKey | uuid | The primary key field names of the table, separated by commas for multiple fields. Same as `hoodie.datasource.write.recordkey.field` | +| preCombineField | | The pre-combine field of the table. Same as `hoodie.datasource.write.precombine.field` | +| type | cow | The table type to create. type = 'cow' means a COPY-ON-WRITE table, while type = 'mor' means a MERGE-ON-READ table. Same as `hoodie.datasource.write.table.type` | + +To set any custom hudi config (like index type, max parquet size, etc.), see the "Set hudi config section"; a small sketch of passing such configs at create time follows below. +
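Since the "Set hudi config section" is only referenced here, the following is a minimal, assumption-laden sketch of passing extra `hoodie.*` configs as table properties at create time. The table name and chosen values are purely illustrative, and the exact set of keys accepted this way should be verified against your Hudi release.

```sql
-- hypothetical sketch: custom hudi configs supplied alongside the standard table properties
create table hudi_custom_cfg_tbl (
  id int,
  name string,
  ts bigint
) using hudi
tblproperties (
  primaryKey = 'id',
  preCombineField = 'ts',
  hoodie.index.type = 'BLOOM',                 -- illustrative index choice
  hoodie.parquet.max.file.size = '125829120'   -- illustrative max parquet file size in bytes
);
```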
</TabItem> </Tabs> @@ -268,26 +411,37 @@ Here we are using the default write operation : `upsert`. If you have a workload <TabItem value="sparksql"> ```sql -insert into hudi_table2 select 1, 'a1', 20; +-- insert into non-partitioned table +insert into hudi_cow_nonpcf_tbl select 1, 'a1', 20; +insert into hudi_mor_tbl select 1, 'a1', 20, 1000; --- insert static partition -insert into hudi_table2 partition(dt = '2021-01-02') select 1, 'a1'; +-- insert dynamic partition +insert into hudi_cow_pt_tbl partition (dt, hh) +select 1 as id, 'a1' as name, 1000 as ts, '2021-12-09' as dt, '10' as hh; --- insert overwrite table -insert overwrite table h0 select 1, 'a1', 20; +-- insert static partition +insert into hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='11') select 2, 'a2', 1000; ``` **NOTICE** +- By default, if `preCombineField` is provided, `insert into` uses `upsert` as the write operation; otherwise it uses `insert`. +- `bulk_insert` can also be used as the write operation; just set two configs, `hoodie.sql.bulk.insert.enable` and `hoodie.sql.insert.mode`. Example as follows: -- Insert mode : Hudi supports two insert modes when inserting data to a table with primary key(we call it pk-table as followed):<br/> - Using `strict` mode, insert statement will keep the primary key uniqueness constraint for COW table which do not allow - duplicate records. If a record already exists during insert, a HoodieDuplicateKeyException will be thrown - for COW table. For MOR table, updates are allowed to existing record.<br/> - Using `non-strict` mode, hudi uses the same code path used by `insert` operation in spark data source for the pk-table. <br/> - One can set the insert mode by using the config: **hoodie.sql.insert.mode** - -- Bulk Insert : By default, hudi uses the normal insert operation for insert statements. Users can set **hoodie.sql.bulk.insert.enable** - to true to enable the bulk insert for insert statement. +```sql +-- upsert mode for preCombineField-provided table +insert into hudi_mor_tbl select 1, 'a1_1', 20, 1001; +select id, name, price, ts from hudi_mor_tbl; +1 a1_1 20.0 1001 + +-- bulk_insert mode for preCombineField-provided table +set hoodie.sql.bulk.insert.enable=true; +set hoodie.sql.insert.mode=non-strict; + +insert into hudi_mor_tbl select 1, 'a1_2', 20, 1002; +select id, name, price, ts from hudi_mor_tbl; +1 a1_1 20.0 1001 +1 a1_2 20.0 1002 +``` </TabItem> </Tabs> @@ -447,6 +601,21 @@ denoted by the timestamp. Look for changes in `_hoodie_commit_time`, `rider`, `d Spark sql supports two kinds of DML to update hudi table: Merge-Into and Update. +### Update +**Syntax** +```sql +UPDATE tableIdentifier SET column = EXPRESSION(,column = EXPRESSION) [ WHERE boolExpression] +``` +**Case** +```sql +update hudi_mor_tbl set price = price * 2, ts = 1111 where id = 1; + +update hudi_cow_pt_tbl set name = 'a1_1', ts = 1001 where id = 1; +``` +:::note +The `Update` operation requires `preCombineField` to be specified.
+::: + ### MergeInto **Syntax** @@ -470,57 +639,34 @@ ON <merge_condition> ``` **Example** ```sql -merge into h0 as target -using ( - select id, name, price, flag from s -) source +-- source table using hudi for testing merging into non-partitioned table +create table merge_source (id int, name string, price double, ts bigint) using hudi +tblproperties (primaryKey = 'id', preCombineField = 'ts'); +insert into merge_source values (1, "old_a1", 22.22, 900), (2, "new_a2", 33.33, 2000), (3, "new_a3", 44.44, 2000); + +merge into hudi_mor_tbl as target +using merge_source as source on target.id = source.id when matched then update set * when not matched then insert * ; -merge into h0 +-- source table using parquet for testing merging into partitioned table +create table merge_source2 (id int, name string, flag string, dt string, hh string) using parquet; +insert into merge_source2 values (1, "new_a1", 'update', '2021-12-09', '10'), (2, "new_a2", 'delete', '2021-12-09', '11'), (3, "new_a3", 'insert', '2021-12-09', '12'); + +merge into hudi_cow_pt_tbl as target using ( - select id, name, price, flag from s + select id, name, '1000' as ts, flag, dt, hh from merge_source2 ) source -on h0.id = source.id -when matched and flag != 'delete' then update set id = source.id, name = source.name, price = source.price * 2 +on target.id = source.id +when matched and flag != 'delete' then + update set id = source.id, name = source.name, ts = source.ts, dt = source.dt, hh = source.hh when matched and flag = 'delete' then delete -when not matched then insert (id,name,price) values(id, name, price) +when not matched then + insert (id, name, ts, dt, hh) values(source.id, source.name, source.ts, source.dt, source.hh) ; -``` -**Notice** - -- The merge-on condition can be only on primary keys. Support to merge based on other fields will be added in future. -- Support for partial updates is supported for cow table. -e.g. -```sql - merge into h0 using s0 - on h0.id = s0.id - when matched then update set price = s0.price * 2 -``` -This works well for Cow-On-Write table which supports update based on the **price** field. -For Merge-on-Read table this will be supported in the future. -- Target table's fields cannot be the right-value of the update expression for Merge-On-Read table. -e.g. -```sql - merge into h0 using s0 - on h0.id = s0.id - when matched then update set id = s0.id, - name = h0.name, - price = s0.price + h0.price -``` -This works well for Cow-On-Write table, but not yet supported for Merge-On-Read table. -### Update -**Syntax** -```sql - UPDATE tableIdentifier SET column = EXPRESSION(,column = EXPRESSION) [ WHERE boolExpression] -``` -**Case** -```sql - update h0 set price = price + 20 where id = 1; - update h0 set price = price *2, name = 'a2' where id = 2; ``` </TabItem> @@ -579,7 +725,7 @@ val tripsIncrementalDF = spark.read.format("hudi"). tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show() -``` +``` </TabItem> <TabItem value="python"> @@ -723,11 +869,13 @@ Only `Append` mode is supported for delete operation. 
**Syntax** ```sql - DELETE FROM tableIdentifier [ WHERE BOOL_EXPRESSION] +DELETE FROM tableIdentifier [ WHERE BOOL_EXPRESSION] ``` **Example** ```sql -delete from h0 where id = 1; +delete from hudi_cow_nonpcf_tbl where uuid = 1; + +delete from hudi_mor_tbl where id % 2 = 0; ``` </TabItem> @@ -779,12 +927,131 @@ Only `Append` mode is supported for delete operation. See the [deletion section](/docs/writing_data#deletes) of the writing data page for more details. +## Insert Overwrite + +Generate some new trips, and overwrite all the partitions that are present in the input. This operation can be faster +than `upsert` for batch ETL jobs that recompute entire target partitions at once (as opposed to incrementally +updating the target tables). This is because we are able to completely bypass indexing, precombining and other repartitioning +steps in the upsert write path. + +<Tabs +defaultValue="scala" +values={[ +{ label: 'Scala', value: 'scala', }, +{ label: 'SparkSQL', value: 'sparksql', }, +]}> +<TabItem value="scala"> + +```scala +// spark-shell +spark. + read.format("hudi"). + load(basePath). + select("uuid","partitionpath"). + sort("partitionpath","uuid"). + show(100, false) + +val inserts = convertToStringList(dataGen.generateInserts(10)) +val df = spark. + read.json(spark.sparkContext.parallelize(inserts, 2)). + filter("partitionpath = 'americas/united_states/san_francisco'") +df.write.format("hudi"). + options(getQuickstartWriteConfigs). + option(OPERATION.key(),"insert_overwrite"). + option(PRECOMBINE_FIELD.key(), "ts"). + option(RECORDKEY_FIELD.key(), "uuid"). + option(PARTITIONPATH_FIELD.key(), "partitionpath"). + option(TBL_NAME.key(), tableName). + mode(Append). + save(basePath) + +// Should have different keys now for San Francisco alone, from query before. +spark. + read.format("hudi"). + load(basePath). + select("uuid","partitionpath"). + sort("partitionpath","uuid"). + show(100, false) +``` +</TabItem> + +<TabItem value="sparksql"> + +`insert overwrite` on a partitioned table uses the `INSERT_OVERWRITE` write operation, while on a non-partitioned table it uses `INSERT_OVERWRITE_TABLE`.
```sql +-- insert overwrite non-partitioned table +insert overwrite hudi_mor_tbl select 99, 'a99', 20.0, 900; +insert overwrite hudi_cow_nonpcf_tbl select 99, 'a99', 20.0; + +-- insert overwrite partitioned table with dynamic partition +insert overwrite table hudi_cow_pt_tbl select 10, 'a10', 1100, '2021-12-09', '10'; + +-- insert overwrite partitioned table with static partition +insert overwrite hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='12') select 13, 'a13', 1100; +``` +</TabItem> +</Tabs> + +## More Spark SQL Commands + +### AlterTable +**Syntax** +```sql +-- Alter table name +ALTER TABLE oldTableName RENAME TO newTableName + +-- Alter table add columns +ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*) + +-- Alter table column type +ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType + +-- Alter table properties +ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value') +``` +**Examples** +```sql +--rename to: +ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2; + +--add column: +ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string); + +--change column: +ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid bigint; + +--set properties: +alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10'); +``` +### Partition SQL Command +**Syntax** + +```sql +-- Drop Partition +ALTER TABLE tableIdentifier DROP PARTITION ( partition_col_name = partition_col_val [ , ... ] ) + +-- Show Partitions +SHOW PARTITIONS tableIdentifier +``` +**Examples** +```sql +--show partition: +show partitions hudi_cow_pt_tbl; + +--drop partition: +alter table hudi_cow_pt_tbl drop partition (dt='2021-12-09', hh='10'); +``` +:::note +Currently, the result of `show partitions` is based on the filesystem table path. It may not be accurate after deleting all of a partition's data or dropping a partition directly. + +::: ## Where to go from here? You can also do the quickstart by [building hudi yourself](https://github.com/apache/hudi#building-apache-hudi-from-source), and using `--jars <path to hudi_code>/packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.1?-*.*.*-SNAPSHOT.jar` in the spark-shell command above -instead of `--packages org.apache.hudi:hudi-spark3-bundle_2.12:0.9.0`. Hudi also supports scala 2.12. Refer [build with scala 2.12](https://github.com/apache/hudi#build-with-scala-212) +instead of `--packages org.apache.hudi:hudi-spark3-bundle_2.12:0.10.0`. Hudi also supports scala 2.12. Refer to [build with scala 2.12](https://github.com/apache/hudi#build-with-scala-212) for more info. Also, we used Spark here to showcase the capabilities of Hudi. However, Hudi can support multiple table types/query types and