This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 734c9f5  [HUDI-2418] Support HiveSchemaProvider (#3671)
734c9f5 is described below

commit 734c9f5f2d6c3ea87c23cef962e69b0b68f72581
Author: 冯健 <fengjian...@gmail.com>
AuthorDate: Sun Dec 5 16:10:13 2021 +0800

    [HUDI-2418] Support HiveSchemaProvider (#3671)

    Co-authored-by: jian.feng <fengjian...@gmial.com>
---
 hudi-utilities/pom.xml                             |   7 ++
 .../hudi/utilities/schema/HiveSchemaProvider.java  |  99 ++++++++++++++++
 .../functional/TestHiveSchemaProvider.java         | 132 +++++++++++++++++++++
 ...ClientFunctionalTestHarnessWithHiveSupport.java |  32 +++++
 .../hive_schema_provider_source.avsc               | 103 ++++++++++++++++
 .../hive_schema_provider_target.avsc               | 103 ++++++++++++++++
 .../schema_registry.source_schema_tab.sql          |  12 ++
 .../schema_registry.target_schema_tab.sql          |  12 ++
 8 files changed, 500 insertions(+)

diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index 2e68039..a487125 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -235,6 +235,13 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
+      <artifactId>spark-hive_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
       <version>${spark.version}</version>
     </dependency>
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
new file mode 100644
index 0000000..219b1ae
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hudi.utilities.schema;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+
+public class HiveSchemaProvider extends SchemaProvider {
+
+  /**
+   * Configs supported.
+   */
+  public static class Config {
+    private static final String SOURCE_SCHEMA_DATABASE_PROP = "hoodie.deltastreamer.schemaprovider.source.schema.hive.database";
+    private static final String SOURCE_SCHEMA_TABLE_PROP = "hoodie.deltastreamer.schemaprovider.source.schema.hive.table";
+    private static final String TARGET_SCHEMA_DATABASE_PROP = "hoodie.deltastreamer.schemaprovider.target.schema.hive.database";
+    private static final String TARGET_SCHEMA_TABLE_PROP = "hoodie.deltastreamer.schemaprovider.target.schema.hive.table";
+  }
+
+  private static final Logger LOG = LogManager.getLogger(HiveSchemaProvider.class);
+
+  private final Schema sourceSchema;
+
+  private Schema targetSchema;
+
+  public HiveSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
+    super(props, jssc);
+    DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SOURCE_SCHEMA_TABLE_PROP));
+    String sourceSchemaDBName = props.getString(Config.SOURCE_SCHEMA_DATABASE_PROP, "default");
+    String sourceSchemaTableName = props.getString(Config.SOURCE_SCHEMA_TABLE_PROP);
+    SparkSession spark = SparkSession.builder().config(jssc.getConf()).enableHiveSupport().getOrCreate();
+    try {
+      TableIdentifier sourceSchemaTable = new TableIdentifier(sourceSchemaTableName, scala.Option.apply(sourceSchemaDBName));
+      StructType sourceSchema = spark.sessionState().catalog().getTableMetadata(sourceSchemaTable).schema();
+
+      this.sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(
+          sourceSchema,
+          sourceSchemaTableName,
+          "hoodie." + sourceSchemaDBName);
+
+      if (props.containsKey(Config.TARGET_SCHEMA_TABLE_PROP)) {
+        String targetSchemaDBName = props.getString(Config.TARGET_SCHEMA_DATABASE_PROP, "default");
+        String targetSchemaTableName = props.getString(Config.TARGET_SCHEMA_TABLE_PROP);
+        TableIdentifier targetSchemaTable = new TableIdentifier(targetSchemaTableName, scala.Option.apply(targetSchemaDBName));
+        StructType targetSchema = spark.sessionState().catalog().getTableMetadata(targetSchemaTable).schema();
+        this.targetSchema = AvroConversionUtils.convertStructTypeToAvroSchema(
+            targetSchema,
+            targetSchemaTableName,
+            "hoodie." + targetSchemaDBName);
+      }
+    } catch (NoSuchTableException | NoSuchDatabaseException e) {
+      String message = String.format("Can't find Hive table(s): %s", sourceSchemaTableName + "," + props.getString(Config.TARGET_SCHEMA_TABLE_PROP, ""));
+      throw new IllegalArgumentException(message, e);
+    }
+  }
+
+  @Override
+  public Schema getSourceSchema() {
+    return sourceSchema;
+  }
+
+  @Override
+  public Schema getTargetSchema() {
+    if (targetSchema != null) {
+      return targetSchema;
+    } else {
+      return super.getTargetSchema();
+    }
+  }
+}
\ No newline at end of file
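For context, a minimal usage sketch (not part of the commit itself): the provider is driven entirely by the four `hoodie.deltastreamer.schemaprovider.*.schema.hive.*` properties defined in Config above, and is typically instantiated reflectively via UtilHelpers.createSchemaProvider, as the functional test below does. The database/table names here are the test fixtures; `jsc` is assumed to be a JavaSparkContext backed by a Hive-enabled Spark session, and imports are as in the sources above.

    // Sketch: resolve source/target schemas from Hive table definitions.
    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.hive.database", "schema_registry");
    props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.hive.table", "source_schema_tab");
    // The target table is optional; when absent, getTargetSchema() falls back to
    // SchemaProvider's default behavior.
    props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.hive.database", "schema_registry");
    props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.hive.table", "target_schema_tab");

    SchemaProvider provider = UtilHelpers.createSchemaProvider(HiveSchemaProvider.class.getName(), props, jsc);
    Schema sourceSchema = provider.getSourceSchema();
    Schema targetSchema = provider.getTargetSchema();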
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHiveSchemaProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHiveSchemaProvider.java
new file mode 100644
index 0000000..414ad52
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHiveSchemaProvider.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.functional;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.schema.HiveSchemaProvider;
+import org.apache.hudi.utilities.testutils.SparkClientFunctionalTestHarnessWithHiveSupport;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Basic tests against {@link HiveSchemaProvider}.
+ */
+@Tag("functional")
+public class TestHiveSchemaProvider extends SparkClientFunctionalTestHarnessWithHiveSupport {
+  private static final Logger LOG = LogManager.getLogger(TestHiveSchemaProvider.class);
+  private static final TypedProperties PROPS = new TypedProperties();
+  private static final String SOURCE_SCHEMA_TABLE_NAME = "schema_registry.source_schema_tab";
+  private static final String TARGET_SCHEMA_TABLE_NAME = "schema_registry.target_schema_tab";
+
+  @BeforeAll
+  public static void init() {
+    Pair<String, String> dbAndTableName = parseDBAndTableName(SOURCE_SCHEMA_TABLE_NAME);
+    PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.hive.database", dbAndTableName.getLeft());
+    PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.hive.table", dbAndTableName.getRight());
+  }
+
+  @Test
+  public void testSourceSchema() throws Exception {
+    try {
+      createSchemaTable(SOURCE_SCHEMA_TABLE_NAME);
+      Schema sourceSchema = UtilHelpers.createSchemaProvider(HiveSchemaProvider.class.getName(), PROPS, jsc()).getSourceSchema();
+      Schema originalSchema = new Schema.Parser().parse(
+          UtilitiesTestBase.Helpers.readFile("delta-streamer-config/hive_schema_provider_source.avsc")
+      );
+      for (Schema.Field field : sourceSchema.getFields()) {
+        Schema.Field originalField = originalSchema.getField(field.name());
+        assertTrue(originalField != null);
+      }
+    } catch (HoodieException e) {
+      LOG.error("Failed to get source schema.", e);
+      throw e;
+    }
+  }
+
+  @Test
+  public void testTargetSchema() throws Exception {
+    try {
+      Pair<String, String> dbAndTableName = parseDBAndTableName(TARGET_SCHEMA_TABLE_NAME);
+      PROPS.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.hive.database", dbAndTableName.getLeft());
+      PROPS.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.hive.table", dbAndTableName.getRight());
+      createSchemaTable(SOURCE_SCHEMA_TABLE_NAME);
+      createSchemaTable(TARGET_SCHEMA_TABLE_NAME);
+      Schema targetSchema = UtilHelpers.createSchemaProvider(HiveSchemaProvider.class.getName(), PROPS, jsc()).getTargetSchema();
+      Schema originalSchema = new Schema.Parser().parse(
+          UtilitiesTestBase.Helpers.readFile("delta-streamer-config/hive_schema_provider_target.avsc"));
+      for (Schema.Field field : targetSchema.getFields()) {
+        Schema.Field originalField = originalSchema.getField(field.name());
+        assertTrue(originalField != null);
+      }
+    } catch (HoodieException e) {
+      LOG.error("Failed to get source/target schema.", e);
+      throw e;
+    }
+  }
+
+  @Test
+  public void testNotExistTable() {
+    String wrongName = "wrong_schema_tab";
+    PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.hive.table", wrongName);
+    Assertions.assertThrows(NoSuchTableException.class, () -> {
+      try {
+        UtilHelpers.createSchemaProvider(HiveSchemaProvider.class.getName(), PROPS, jsc()).getSourceSchema();
+      } catch (Throwable exception) {
+        while (exception.getCause() != null) {
+          exception = exception.getCause();
+        }
+        throw exception;
+      }
+    });
+  }
+
+  private static Pair<String, String> parseDBAndTableName(String fullName) {
+    String[] dbAndTableName = fullName.split("\\.");
+    if (dbAndTableName.length > 1) {
+      return new ImmutablePair<>(dbAndTableName[0], dbAndTableName[1]);
+    } else {
+      return new ImmutablePair<>("default", dbAndTableName[0]);
+    }
+  }
+
+  private void createSchemaTable(String fullName) throws IOException {
+    SparkSession spark = spark();
+    String createTableSQL = UtilitiesTestBase.Helpers.readFile(String.format("delta-streamer-config/%s.sql", fullName));
+    Pair<String, String> dbAndTableName = parseDBAndTableName(fullName);
+    spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", dbAndTableName.getLeft()));
+    spark.sql(createTableSQL);
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/SparkClientFunctionalTestHarnessWithHiveSupport.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/SparkClientFunctionalTestHarnessWithHiveSupport.java
new file mode 100644
index 0000000..fd59d63
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/SparkClientFunctionalTestHarnessWithHiveSupport.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.testutils;
+
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.spark.SparkConf;
+
+import java.util.Collections;
+
+public class SparkClientFunctionalTestHarnessWithHiveSupport extends SparkClientFunctionalTestHarness {
+
+  public SparkConf conf() {
+    return conf(Collections.singletonMap("spark.sql.catalogImplementation", "hive"));
+  }
+}
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/hive_schema_provider_source.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/hive_schema_provider_source.avsc
new file mode 100644
index 0000000..5b1c62b
--- /dev/null
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/hive_schema_provider_source.avsc
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+  "type": "record",
+  "name": "source_schema_tab",
+  "namespace": "hoodie.schema_registry",
+  "fields": [
+    {
+      "name": "id",
+      "type": [
+        "long",
+        "null"
+      ]
+    },
+    {
+      "name": "name",
+      "type": [
+        "string",
+        "null"
+      ]
+    },
+    {
+      "name": "num1",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "num2",
+      "type": [
+        "long",
+        "null"
+      ]
+    },
+    {
+      "name": "num3",
+      "type": [
+        {
+          "type": "fixed",
+          "name": "fixed",
+          "namespace": "hoodie.schema_registry.source_schema_tab.num3",
+          "size": 9,
+          "logicalType": "decimal",
+          "precision": 20,
+          "scale": 0
+        },
+        "null"
+      ]
+    },
+    {
+      "name": "num4",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "num5",
+      "type": [
+        "float",
+        "null"
+      ]
+    },
+    {
+      "name": "num6",
+      "type": [
+        "double",
+        "null"
+      ]
+    },
+    {
+      "name": "bool",
+      "type": [
+        "boolean",
+        "null"
+      ]
+    },
+    {
+      "name": "bin",
+      "type": [
+        "bytes",
+        "null"
+      ]
+    }
+  ]
+}
\ No newline at end of file
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/hive_schema_provider_target.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/hive_schema_provider_target.avsc
new file mode 100644
index 0000000..d3d95ed
--- /dev/null
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/hive_schema_provider_target.avsc
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+  "type": "record",
+  "name": "target_schema_tab",
+  "namespace": "hoodie.schema_registry",
+  "fields": [
+    {
+      "name": "id",
+      "type": [
+        "long",
+        "null"
+      ]
+    },
+    {
+      "name": "name",
+      "type": [
+        "string",
+        "null"
+      ]
+    },
+    {
+      "name": "num1",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "num2",
+      "type": [
+        "long",
+        "null"
+      ]
+    },
+    {
+      "name": "num3",
+      "type": [
+        {
+          "type": "fixed",
+          "name": "fixed",
+          "namespace": "hoodie.schema_registry.target_schema_tab.num3",
+          "size": 9,
+          "logicalType": "decimal",
+          "precision": 20,
+          "scale": 0
+        },
+        "null"
+      ]
+    },
+    {
+      "name": "num4",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "num5",
+      "type": [
+        "float",
+        "null"
+      ]
+    },
+    {
+      "name": "num6",
+      "type": [
+        "double",
+        "null"
+      ]
+    },
+    {
+      "name": "bool",
+      "type": [
+        "boolean",
+        "null"
+      ]
+    },
+    {
+      "name": "bin",
+      "type": [
+        "bytes",
+        "null"
+      ]
+    }
+  ]
+}
\ No newline at end of file
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/schema_registry.source_schema_tab.sql b/hudi-utilities/src/test/resources/delta-streamer-config/schema_registry.source_schema_tab.sql
new file mode 100644
index 0000000..b95ae0f
--- /dev/null
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/schema_registry.source_schema_tab.sql
@@ -0,0 +1,12 @@
+CREATE TABLE IF NOT EXISTS `schema_registry`.`source_schema_tab`(
+    `id` BIGINT,
+    `name` STRING,
+    `num1` INT,
+    `num2` BIGINT,
+    `num3` DECIMAL(20,0),
+    `num4` TINYINT,
+    `num5` FLOAT,
+    `num6` DOUBLE,
+    `bool` BOOLEAN,
+    `bin` BINARY
+)
\ No newline at end of file
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/schema_registry.target_schema_tab.sql b/hudi-utilities/src/test/resources/delta-streamer-config/schema_registry.target_schema_tab.sql
new file mode 100644
index 0000000..07f179f
--- /dev/null
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/schema_registry.target_schema_tab.sql
@@ -0,0 +1,12 @@
+CREATE TABLE IF NOT EXISTS `schema_registry`.`target_schema_tab`(
+    `id` BIGINT,
+    `name` STRING,
+    `num1` INT,
+    `num2` BIGINT,
+    `num3` DECIMAL(20,0),
+    `num4` TINYINT,
+    `num5` FLOAT,
+    `num6` DOUBLE,
+    `bool` BOOLEAN,
+    `bin` BINARY
+)
\ No newline at end of file
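For reference, a short sketch (again, not part of the commit) of the conversion these fixtures exercise: Spark reads the table metadata created by the DDL above into a StructType, and AvroConversionUtils produces the union-typed records in the .avsc files. Every column becomes a nullable union, Hive TINYINT (num4) surfaces as Avro int, and DECIMAL(20,0) (num3) becomes a 9-byte fixed with a decimal logical type. The `spark` session here is assumed to be Hive-enabled, as the test harness above configures.

    // Sketch: derive the Avro schema for schema_registry.source_schema_tab the
    // same way HiveSchemaProvider does internally.
    TableIdentifier table = new TableIdentifier("source_schema_tab", scala.Option.apply("schema_registry"));
    StructType struct = spark.sessionState().catalog().getTableMetadata(table).schema();
    Schema avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(
        struct, "source_schema_tab", "hoodie.schema_registry");
    // avroSchema should now match hive_schema_provider_source.avsc.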