bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280257096
 
 

 ##########
 File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
 ##########
 @@ -18,32 +18,179 @@
 
 package org.apache.flink.table.catalog.hive;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.GenericCatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.plan.stats.TableStats;
 
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-
+import java.util.stream.Collectors;
 
 /**
  * Utils to convert meta objects between Flink and Hive for GenericHiveMetastoreCatalog.
  */
 public class GenericHiveMetastoreCatalogUtil {
 
+       // Prefix used to distinguish properties created by Hive and Flink,
+       // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore.
+       private static final String FLINK_PROPERTY_PREFIX = "flink.";
+
+       // Flink tables should be stored as 'external' tables in Hive metastore
+       private static final Map<String, String> EXTERNAL_TABLE_PROPERTY = new HashMap<String, String>() {{
+               put("EXTERNAL", "TRUE");
+       }};
+
        private GenericHiveMetastoreCatalogUtil() {
        }
 
        // ------ Utils ------
 
        /**
-        * Creates a Hive database from CatalogDatabase.
+        * Creates a Hive database from a CatalogDatabase.
+        *
+        * @param databaseName name of the database
+        * @param catalogDatabase the CatalogDatabase instance
+        * @return a Hive database
         */
-       public static Database createHiveDatabase(String dbName, CatalogDatabase db) {
-               Map<String, String> props = db.getProperties();
+       public static Database createHiveDatabase(String databaseName, CatalogDatabase catalogDatabase) {
                return new Database(
-                       dbName,
-                       db.getDescription().isPresent() ? db.getDescription().get() : null,
+                       databaseName,
+                       catalogDatabase.getDescription().isPresent() ? catalogDatabase.getDescription().get() : null,
                        null,
-                       props);
+                       catalogDatabase.getProperties());
+       }
+
+       /**
+        * Creates a Hive table from a CatalogBaseTable.
+        *
+        * @param tablePath path of the table
+        * @param table the CatalogBaseTable instance
+        * @return a Hive table
+        */
+       public static Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) {
+               Map<String, String> properties = new HashMap<>(table.getProperties());
+
+               // Table description
+               if (table.getDescription().isPresent()) {
+                       properties.put(HiveTableConfig.TABLE_DESCRIPTION, table.getDescription().get());
+               }
+
+               Table hiveTable = new Table();
+               hiveTable.setDbName(tablePath.getDatabaseName());
+               hiveTable.setTableName(tablePath.getObjectName());
+               hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
+
+               // Table properties
+               hiveTable.setParameters(buildFlinkProperties(properties));
+               hiveTable.getParameters().putAll(EXTERNAL_TABLE_PROPERTY);
+
+               // Hive table's StorageDescriptor
+               StorageDescriptor sd = new StorageDescriptor();
+               sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>()));
+
+               List<FieldSchema> allColumns = createHiveColumns(table.getSchema());
+
+               // TODO: [FLINK-12240] Support view related operations in GenericHiveMetastoreCatalog
+               if (!(table instanceof CatalogTable)) {
+                       throw new UnsupportedOperationException();
+               }
+
+               // Table columns and partition keys
+               CatalogTable catalogTable = (CatalogTable) table;
+
+               if (catalogTable.isPartitioned()) {
+                       int partitionKeySize = catalogTable.getPartitionKeys().size();
+                       List<FieldSchema> regularColumns = allColumns.subList(0, allColumns.size() - partitionKeySize);
+                       List<FieldSchema> partitionColumns = allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size());
+
+                       sd.setCols(regularColumns);
+                       hiveTable.setPartitionKeys(partitionColumns);
+               } else {
+                       sd.setCols(allColumns);
+                       hiveTable.setPartitionKeys(new ArrayList<>());
+               }
+
+               hiveTable.setSd(sd);
+
+               return hiveTable;
+       }
+
+       /**
+        * Creates a CatalogBaseTable from a Hive table.
+        *
+        * @param hiveTable the Hive table
+        * @return a CatalogBaseTable
+        */
+       public static CatalogBaseTable createCatalogTable(Table hiveTable) {
+               // Table schema
+               TableSchema tableSchema = HiveCatalogBaseUtil.createTableSchema(
+                               hiveTable.getSd().getCols(), hiveTable.getPartitionKeys());
+
+               // Table properties
+               Map<String, String> properties = retrieveFlinkProperties(hiveTable.getParameters());
+
+               // Table description
+               String description = properties.remove(HiveTableConfig.TABLE_DESCRIPTION);
+
+               // Partition keys
+               List<String> partitionKeys = new ArrayList<>();
+
+               if (hiveTable.getPartitionKeys() != null && !hiveTable.getPartitionKeys().isEmpty()) {
+                       partitionKeys = hiveTable.getPartitionKeys().stream()
+                               .map(FieldSchema::getName)
+                               .collect(Collectors.toList());
+               }
+
+               // TODO: remove table stats from CatalogTable
+               return new GenericCatalogTable(
+                       tableSchema, new TableStats(0), partitionKeys, properties, description);
+       }
+
+       /**
+        * Creates Hive columns from a Flink TableSchema.
+        */
+       private static List<FieldSchema> createHiveColumns(TableSchema schema) {
+               String[] fieldNames = schema.getFieldNames();
+               TypeInformation<?>[] fieldTypes = schema.getFieldTypes();
+
+               List<FieldSchema> columns = new ArrayList<>(fieldNames.length);
+
+               for (int i = 0; i < fieldNames.length; i++) {
+                       columns.add(new FieldSchema(fieldNames[i], HiveTypeUtil.toHiveType(fieldTypes[i]), null));
+               }
+
+               return columns;
+       }
+
+       /**
+        * Filters out Hive-created properties, and returns Flink-created properties.
+        */
+       private static Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) {
 
 Review comment:
   I'd rather not mix Flink properties with Hive's; otherwise it will be very hard to keep them in sync, considering Hive's properties differ across metastore versions.
   
   I do agree we can add more generic properties for Flink tables, such as last modified time, but I believe that needs to stand on its own and not depend on Hive - e.g. we define Flink's own config for last modified time and decide how to store it. That can be done in a different JIRA. What do you think?
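   
   For clarity, the prefix round-trip being discussed could look roughly like this - a sketch of the idea only, not the actual implementation in this PR (the hunk above ends before the method bodies are shown, and the holder class name here is made up):
   
       import java.util.HashMap;
       import java.util.Map;
   
       class FlinkPropertySketch {
   
               private static final String FLINK_PROPERTY_PREFIX = "flink.";
   
               // Prefix every Flink-created key before writing it to the Hive metastore,
               // so Flink entries never collide with Hive-generated parameters.
               static Map<String, String> buildFlinkProperties(Map<String, String> properties) {
                       Map<String, String> prefixed = new HashMap<>(properties.size());
                       for (Map.Entry<String, String> e : properties.entrySet()) {
                               prefixed.put(FLINK_PROPERTY_PREFIX + e.getKey(), e.getValue());
                       }
                       return prefixed;
               }
   
               // Keep only the prefixed entries and strip the prefix; parameters Hive
               // added on its own (e.g. transient_lastDdlTime, EXTERNAL) are dropped.
               static Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) {
                       Map<String, String> flinkProperties = new HashMap<>();
                       for (Map.Entry<String, String> e : hiveTableParams.entrySet()) {
                               if (e.getKey().startsWith(FLINK_PROPERTY_PREFIX)) {
                                       flinkProperties.put(e.getKey().substring(FLINK_PROPERTY_PREFIX.length()), e.getValue());
                               }
                       }
                       return flinkProperties;
               }
       }
   
   With a scheme like this, retrieveFlinkProperties(buildFlinkProperties(props)) returns props unchanged no matter what parameters a given Hive version adds, which is what keeps the two property namespaces from drifting apart.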

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
