This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 734c9f5  [HUDI-2418] Support HiveSchemaProvider (#3671)
734c9f5 is described below

commit 734c9f5f2d6c3ea87c23cef962e69b0b68f72581
Author: 冯健 <fengjian...@gmail.com>
AuthorDate: Sun Dec 5 16:10:13 2021 +0800

    [HUDI-2418] Support HiveSchemaProvider (#3671)
    
    
    Co-authored-by: jian.feng <fengjian...@gmial.com>
---
 hudi-utilities/pom.xml                             |   7 ++
 .../hudi/utilities/schema/HiveSchemaProvider.java  |  99 ++++++++++++++++
 .../functional/TestHiveSchemaProvider.java         | 132 +++++++++++++++++++++
 ...ClientFunctionalTestHarnessWithHiveSupport.java |  32 +++++
 .../hive_schema_provider_source.avsc               | 103 ++++++++++++++++
 .../hive_schema_provider_target.avsc               | 103 ++++++++++++++++
 .../schema_registry.source_schema_tab.sql          |  12 ++
 .../schema_registry.target_schema_tab.sql          |  12 ++
 8 files changed, 500 insertions(+)
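
For context, a minimal sketch (not part of this commit) of how the new
provider can be driven programmatically. The property keys and constructor
signature come from the HiveSchemaProvider class added below; the
database/table names and the Spark context are placeholders:

    import org.apache.avro.Schema;
    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.utilities.schema.HiveSchemaProvider;
    import org.apache.spark.api.java.JavaSparkContext;

    public class HiveSchemaProviderExample {
      public static Schema resolveSourceSchema(JavaSparkContext jssc) {
        TypedProperties props = new TypedProperties();
        // The database key is optional (defaults to "default"); the table
        // key is required.
        props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.hive.database", "schema_registry");
        props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.hive.table", "source_schema_tab");
        // The provider resolves the table through a Hive-enabled Spark
        // catalog and converts its StructType to an Avro schema.
        return new HiveSchemaProvider(props, jssc).getSourceSchema();
      }
    }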

diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index 2e68039..a487125 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -235,6 +235,13 @@
 
     <dependency>
       <groupId>org.apache.spark</groupId>
+      <artifactId>spark-hive_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
       <version>${spark.version}</version>
     </dependency>
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
new file mode 100644
index 0000000..219b1ae
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hudi.utilities.schema;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+
+public class HiveSchemaProvider extends SchemaProvider {
+
+  /**
+   * Configs supported.
+   */
+  public static class Config {
+    private static final String SOURCE_SCHEMA_DATABASE_PROP = "hoodie.deltastreamer.schemaprovider.source.schema.hive.database";
+    private static final String SOURCE_SCHEMA_TABLE_PROP = "hoodie.deltastreamer.schemaprovider.source.schema.hive.table";
+    private static final String TARGET_SCHEMA_DATABASE_PROP = "hoodie.deltastreamer.schemaprovider.target.schema.hive.database";
+    private static final String TARGET_SCHEMA_TABLE_PROP = "hoodie.deltastreamer.schemaprovider.target.schema.hive.table";
+  }
+
+  private static final Logger LOG = LogManager.getLogger(HiveSchemaProvider.class);
+
+  private final Schema sourceSchema;
+
+  private Schema targetSchema;
+
+  public HiveSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
+    super(props, jssc);
+    DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SOURCE_SCHEMA_TABLE_PROP));
+    String sourceSchemaDBName = props.getString(Config.SOURCE_SCHEMA_DATABASE_PROP, "default");
+    String sourceSchemaTableName = props.getString(Config.SOURCE_SCHEMA_TABLE_PROP);
+    SparkSession spark = SparkSession.builder().config(jssc.getConf()).enableHiveSupport().getOrCreate();
+    try {
+      TableIdentifier sourceSchemaTable = new TableIdentifier(sourceSchemaTableName, scala.Option.apply(sourceSchemaDBName));
+      StructType sourceSchema = spark.sessionState().catalog().getTableMetadata(sourceSchemaTable).schema();
+
+      this.sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(
+              sourceSchema,
+              sourceSchemaTableName,
+              "hoodie." + sourceSchemaDBName);
+
+      if (props.containsKey(Config.TARGET_SCHEMA_TABLE_PROP)) {
+        String targetSchemaDBName = props.getString(Config.TARGET_SCHEMA_DATABASE_PROP, "default");
+        String targetSchemaTableName = props.getString(Config.TARGET_SCHEMA_TABLE_PROP);
+        TableIdentifier targetSchemaTable = new TableIdentifier(targetSchemaTableName, scala.Option.apply(targetSchemaDBName));
+        StructType targetSchema = spark.sessionState().catalog().getTableMetadata(targetSchemaTable).schema();
+        this.targetSchema = AvroConversionUtils.convertStructTypeToAvroSchema(
+                targetSchema,
+                targetSchemaTableName,
+                "hoodie." + targetSchemaDBName);
+      }
+    } catch (NoSuchTableException | NoSuchDatabaseException e) {
+      String message = String.format("Can't find Hive table(s): %s", sourceSchemaTableName + "," + props.getString(Config.TARGET_SCHEMA_TABLE_PROP));
+      throw new IllegalArgumentException(message, e);
+    }
+  }
+
+  @Override
+  public Schema getSourceSchema() {
+    return sourceSchema;
+  }
+
+  @Override
+  public Schema getTargetSchema() {
+    if (targetSchema != null) {
+      return targetSchema;
+    } else {
+      return super.getTargetSchema();
+    }
+  }
+}
\ No newline at end of file
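
One behavioral note on the class above: the target table is optional. When
hoodie.deltastreamer.schemaprovider.target.schema.hive.table is unset,
targetSchema stays null and getTargetSchema() defers to the base
SchemaProvider. A sketch, assuming the base class falls back to the source
schema (the table name is a placeholder):

    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.utilities.schema.HiveSchemaProvider;
    import org.apache.spark.api.java.JavaSparkContext;

    public class TargetFallbackExample {
      public static boolean targetFallsBackToSource(JavaSparkContext jssc) {
        TypedProperties props = new TypedProperties();
        // Only the source table is configured; target properties are unset.
        props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.hive.table", "source_schema_tab");
        HiveSchemaProvider provider = new HiveSchemaProvider(props, jssc);
        // targetSchema stays null, so getTargetSchema() defers to the base
        // SchemaProvider (expected to return the source schema).
        return provider.getTargetSchema().equals(provider.getSourceSchema());
      }
    }
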
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHiveSchemaProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHiveSchemaProvider.java
new file mode 100644
index 0000000..414ad52
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHiveSchemaProvider.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.functional;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.schema.HiveSchemaProvider;
+import org.apache.hudi.utilities.testutils.SparkClientFunctionalTestHarnessWithHiveSupport;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Basic tests against {@link HiveSchemaProvider}.
+ */
+@Tag("functional")
+public class TestHiveSchemaProvider extends SparkClientFunctionalTestHarnessWithHiveSupport {
+  private static final Logger LOG = LogManager.getLogger(TestHiveSchemaProvider.class);
+  private static final TypedProperties PROPS = new TypedProperties();
+  private static final String SOURCE_SCHEMA_TABLE_NAME = "schema_registry.source_schema_tab";
+  private static final String TARGET_SCHEMA_TABLE_NAME = "schema_registry.target_schema_tab";
+
+  @BeforeAll
+  public static void init() {
+    Pair<String, String> dbAndTableName = paresDBAndTableName(SOURCE_SCHEMA_TABLE_NAME);
+    PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.hive.database", dbAndTableName.getLeft());
+    PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.hive.table", dbAndTableName.getRight());
+  }
+
+  @Test
+  public void testSourceSchema() throws Exception {
+    try {
+      createSchemaTable(SOURCE_SCHEMA_TABLE_NAME);
+      Schema sourceSchema = UtilHelpers.createSchemaProvider(HiveSchemaProvider.class.getName(), PROPS, jsc()).getSourceSchema();
+
+      Schema originalSchema = new Schema.Parser().parse(
+              UtilitiesTestBase.Helpers.readFile("delta-streamer-config/hive_schema_provider_source.avsc")
+      );
+      for (Schema.Field field : sourceSchema.getFields()) {
+        Schema.Field originalField = originalSchema.getField(field.name());
+        assertTrue(originalField != null);
+      }
+    } catch (HoodieException e) {
+      LOG.error("Failed to get source schema. ", e);
+      throw e;
+    }
+  }
+
+  @Test
+  public void testTargetSchema() throws Exception {
+    try {
+      Pair<String, String> dbAndTableName = paresDBAndTableName(TARGET_SCHEMA_TABLE_NAME);
+      PROPS.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.hive.database", dbAndTableName.getLeft());
+      PROPS.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.hive.table", dbAndTableName.getRight());
+      createSchemaTable(SOURCE_SCHEMA_TABLE_NAME);
+      createSchemaTable(TARGET_SCHEMA_TABLE_NAME);
+      Schema targetSchema = UtilHelpers.createSchemaProvider(HiveSchemaProvider.class.getName(), PROPS, jsc()).getTargetSchema();
+      Schema originalSchema = new Schema.Parser().parse(
+              UtilitiesTestBase.Helpers.readFile("delta-streamer-config/hive_schema_provider_target.avsc"));
+      for (Schema.Field field : targetSchema.getFields()) {
+        Schema.Field originalField = originalSchema.getField(field.name());
+        assertTrue(originalField != null);
+      }
+    } catch (HoodieException e) {
+      LOG.error("Failed to get source/target schema. ", e);
+      throw e;
+    }
+  }
+
+  @Test
+  public void testNotExistTable() {
+    String wrongName = "wrong_schema_tab";
+    PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.hive.table", wrongName);
+    Assertions.assertThrows(NoSuchTableException.class, () -> {
+      try {
+        UtilHelpers.createSchemaProvider(HiveSchemaProvider.class.getName(), PROPS, jsc()).getSourceSchema();
+      } catch (Throwable exception) {
+        while (exception.getCause() != null) {
+          exception = exception.getCause();
+        }
+        throw exception;
+      }
+    });
+  }
+
+  private static Pair<String, String> paresDBAndTableName(String fullName) {
+    String[] dbAndTableName = fullName.split("\\.");
+    if (dbAndTableName.length > 1) {
+      return new ImmutablePair<>(dbAndTableName[0], dbAndTableName[1]);
+    } else {
+      return new ImmutablePair<>("default", dbAndTableName[0]);
+    }
+  }
+
+  private void createSchemaTable(String fullName) throws IOException {
+    SparkSession spark = spark();
+    String createTableSQL = UtilitiesTestBase.Helpers.readFile(String.format("delta-streamer-config/%s.sql", fullName));
+    Pair<String, String> dbAndTableName = paresDBAndTableName(fullName);
+    spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", dbAndTableName.getLeft()));
+    spark.sql(createTableSQL);
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/SparkClientFunctionalTestHarnessWithHiveSupport.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/SparkClientFunctionalTestHarnessWithHiveSupport.java
new file mode 100644
index 0000000..fd59d63
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/SparkClientFunctionalTestHarnessWithHiveSupport.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.testutils;
+
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.spark.SparkConf;
+
+import java.util.Collections;
+
+public class SparkClientFunctionalTestHarnessWithHiveSupport extends SparkClientFunctionalTestHarness {
+
+  public SparkConf conf() {
+    return conf(Collections.singletonMap("spark.sql.catalogImplementation", "hive"));
+  }
+}
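
For reference, a sketch of building an equivalent Hive-enabled session
directly: setting spark.sql.catalogImplementation to "hive", as the harness
above does through SparkConf, has the same effect as calling
enableHiveSupport() on the builder (master and app name are placeholders):

    import org.apache.spark.sql.SparkSession;

    public class HiveSessionExample {
      public static SparkSession hiveSession() {
        return SparkSession.builder()
            .master("local[1]")
            .appName("hive-schema-provider-test")
            // Same effect as .enableHiveSupport()
            .config("spark.sql.catalogImplementation", "hive")
            .getOrCreate();
      }
    }
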
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/hive_schema_provider_source.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/hive_schema_provider_source.avsc
new file mode 100644
index 0000000..5b1c62b
--- /dev/null
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/hive_schema_provider_source.avsc
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+  "type": "record",
+  "name": "source_schema_tab",
+  "namespace": "hoodie.schema_registry",
+  "fields": [
+    {
+      "name": "id",
+      "type": [
+        "long",
+        "null"
+      ]
+    },
+    {
+      "name": "name",
+      "type": [
+        "string",
+        "null"
+      ]
+    },
+    {
+      "name": "num1",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "num2",
+      "type": [
+        "long",
+        "null"
+      ]
+    },
+    {
+      "name": "num3",
+      "type": [
+        {
+          "type": "fixed",
+          "name": "fixed",
+          "namespace": "hoodie.schema_registry.source_schema_tab.num3",
+          "size": 9,
+          "logicalType": "decimal",
+          "precision": 20,
+          "scale": 0
+        },
+        "null"
+      ]
+    },
+    {
+      "name": "num4",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "num5",
+      "type": [
+        "float",
+        "null"
+      ]
+    },
+    {
+      "name": "num6",
+      "type": [
+        "double",
+        "null"
+      ]
+    },
+    {
+      "name": "bool",
+      "type": [
+        "boolean",
+        "null"
+      ]
+    },
+    {
+      "name": "bin",
+      "type": [
+        "bytes",
+        "null"
+      ]
+    }
+  ]
+}
\ No newline at end of file
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/hive_schema_provider_target.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/hive_schema_provider_target.avsc
new file mode 100644
index 0000000..d3d95ed
--- /dev/null
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/hive_schema_provider_target.avsc
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+  "type": "record",
+  "name": "target_schema_tab",
+  "namespace": "hoodie.schema_registry",
+  "fields": [
+    {
+      "name": "id",
+      "type": [
+        "long",
+        "null"
+      ]
+    },
+    {
+      "name": "name",
+      "type": [
+        "string",
+        "null"
+      ]
+    },
+    {
+      "name": "num1",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "num2",
+      "type": [
+        "long",
+        "null"
+      ]
+    },
+    {
+      "name": "num3",
+      "type": [
+        {
+          "type": "fixed",
+          "name": "fixed",
+          "namespace": "hoodie.schema_registry.target_schema_tab.num3",
+          "size": 9,
+          "logicalType": "decimal",
+          "precision": 20,
+          "scale": 0
+        },
+        "null"
+      ]
+    },
+    {
+      "name": "num4",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "num5",
+      "type": [
+        "float",
+        "null"
+      ]
+    },
+    {
+      "name": "num6",
+      "type": [
+        "double",
+        "null"
+      ]
+    },
+    {
+      "name": "bool",
+      "type": [
+        "boolean",
+        "null"
+      ]
+    },
+    {
+      "name": "bin",
+      "type": [
+        "bytes",
+        "null"
+      ]
+    }
+  ]
+}
\ No newline at end of file
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/schema_registry.source_schema_tab.sql b/hudi-utilities/src/test/resources/delta-streamer-config/schema_registry.source_schema_tab.sql
new file mode 100644
index 0000000..b95ae0f
--- /dev/null
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/schema_registry.source_schema_tab.sql
@@ -0,0 +1,12 @@
+CREATE TABLE IF NOT EXISTS `schema_registry`.`source_schema_tab`(
+    `id` BIGINT,
+    `name` STRING,
+    `num1` INT,
+    `num2` BIGINT,
+    `num3` DECIMAL(20,0),
+    `num4` TINYINT,
+    `num5` FLOAT,
+    `num6` DOUBLE,
+    `bool` BOOLEAN,
+    `bin` BINARY
+)
\ No newline at end of file
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/schema_registry.target_schema_tab.sql b/hudi-utilities/src/test/resources/delta-streamer-config/schema_registry.target_schema_tab.sql
new file mode 100644
index 0000000..07f179f
--- /dev/null
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/schema_registry.target_schema_tab.sql
@@ -0,0 +1,12 @@
+CREATE TABLE IF NOT EXISTS `schema_registry`.`target_schema_tab`(
+    `id` BIGINT,
+    `name` STRING,
+    `num1` INT,
+    `num2` BIGINT,
+    `num3` DECIMAL(20,0),
+    `num4` TINYINT,
+    `num5` FLOAT,
+    `num6` DOUBLE,
+    `bool` BOOLEAN,
+    `bin` BINARY
+)
\ No newline at end of file
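
One detail worth noting in the fixtures above: the Hive column types do not
all map one-to-one onto Avro. TINYINT (num4) widens to Avro "int", and
DECIMAL(20,0) (num3) becomes a fixed type with a decimal logicalType. A
small sketch to reproduce the conversion, using the same AvroConversionUtils
helper the provider calls (the two-column struct is a hypothetical subset of
the test tables):

    import org.apache.avro.Schema;
    import org.apache.hudi.AvroConversionUtils;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    public class ConversionExample {
      public static Schema convert() {
        StructType struct = new StructType()
            .add("num4", DataTypes.ByteType)                  // Hive TINYINT
            .add("num3", DataTypes.createDecimalType(20, 0)); // Hive DECIMAL(20,0)
        // Same call the provider makes: (struct, record name, namespace).
        return AvroConversionUtils.convertStructTypeToAvroSchema(
            struct, "source_schema_tab", "hoodie.schema_registry");
      }
    }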
