Aitozi commented on code in PR #21522:
URL: https://github.com/apache/flink/pull/21522#discussion_r1134248086


##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java:
##########
@@ -97,7 +102,54 @@ public class HiveTableUtil {
 
     private HiveTableUtil() {}
 
-    public static TableSchema createTableSchema(
+    public static org.apache.flink.table.api.Schema createSchema(
+            HiveConf hiveConf,
+            Table hiveTable,
+            HiveMetastoreClientWrapper client,
+            HiveShim hiveShim) {
+        List<FieldSchema> fields = getNonPartitionFields(hiveConf, hiveTable, 
hiveShim);
+
+        Set<String> notNullColumns =
+                client.getNotNullColumns(hiveConf, hiveTable.getDbName(), 
hiveTable.getTableName());
+        UniqueConstraint primaryKey =
+                client.getPrimaryKey(
+                                hiveTable.getDbName(),
+                                hiveTable.getTableName(),
+                                HiveTableUtil.relyConstraint((byte) 0))
+                        .orElse(null);
+
+        return createSchema(fields, hiveTable.getPartitionKeys(), 
notNullColumns, primaryKey);
+    }
+
+    /** Create a Flink's Schema from Hive table's columns and partition keys. 
*/
+    public static org.apache.flink.table.api.Schema createSchema(
+            List<FieldSchema> nonPartCols,
+            List<FieldSchema> partitionKeys,
+            Set<String> notNullColumns,
+            @Nullable UniqueConstraint primaryKey) {
+        List<FieldSchema> allCols = new ArrayList<>(nonPartCols);
+        allCols.addAll(partitionKeys);
+
+        // PK columns cannot be null
+        if (primaryKey != null) {
+            notNullColumns.addAll(primaryKey.getColumns());
+        }
+
+        Tuple2<String[], DataType[]> columnInformation =
+                extractColumnInformation(allCols, notNullColumns);
+
+        org.apache.flink.table.api.Schema.Builder builder =
+                org.apache.flink.table.api.Schema.newBuilder()
+                        .fromFields(columnInformation.f0, 
columnInformation.f1);
+        if (primaryKey != null) {
+            builder.primaryKeyNamed(
+                    primaryKey.getName(), primaryKey.getColumns().toArray(new 
String[0]));
+        }
+        return builder.build();
+    }
+
+    /** Create the Hive table's row type. */
+    public static DataType extractRowType(

Review Comment:
   extract a new method `extractHiveTableInfo`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to