danny0405 commented on code in PR #7608:
URL: https://github.com/apache/hudi/pull/7608#discussion_r1063076973


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java:
##########
@@ -88,11 +95,44 @@ public DynamicTableSink createDynamicTableSink(Context context) {
     checkArgument(!StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.PATH)),
         "Option [path] should not be empty.");
     ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
+    mergeTableConfig(conf, schema);
     sanityCheck(conf, schema);
     setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);
     return new HoodieTableSink(conf, schema);
   }
 
+  /**
+   * fallback pk and pre-combine to table config if not provided
+   */
+  private void mergeTableConfig(Configuration conf, ResolvedSchema schema) {
+    String basePath = conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
+        new ValidationException("Option [path] should not be empty."));
+    Path metaPath = new CachingPath(basePath, METAFOLDER_NAME);
+    FileSystem fileSystem = FSUtils.getFs(metaPath, HadoopConfigurations.getHadoopConf(conf));
+    HoodieTableConfig tableConfig;
+    try {
+      tableConfig = new HoodieTableConfig(fileSystem, metaPath.toString(), null, null);
+    } catch (HoodieIOException e) {
+      LOG.info("Fail to get table config.", e);
+      return;
+    }
+
+    Map<String, String> propsMap = tableConfig.propsMap();
+    List<String> writeColumnNames = schema.getColumnNames();
+
+    if (!conf.contains(FlinkOptions.RECORD_KEY_FIELD) && !schema.getPrimaryKey().isPresent()
+        && propsMap.containsKey(HoodieTableConfig.RECORDKEY_FIELDS.key())
+        && writeColumnNames.contains(propsMap.get(HoodieTableConfig.RECORDKEY_FIELDS.key()))) {
+      conf.set(FlinkOptions.RECORD_KEY_FIELD, propsMap.get(HoodieTableConfig.RECORDKEY_FIELDS.key()));

Review Comment:
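   The `writeColumnNames.contains` check does not work when there are multiple pk fields: the record key config is a comma-separated string such as `id,name`, which never equals any single column name.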
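A minimal sketch of a per-field check, assuming the stored record key value is a plain comma-separated list of column names (possibly with surrounding whitespace). `RecordKeyCheck` and `allRecordKeysPresent` are hypothetical names for illustration only, not part of the Hudi or Flink API. The idea is to split the stored value and require every key field to be a column of the write schema, instead of comparing the whole string against the column list:

```java
import java.util.Arrays;
import java.util.List;

class RecordKeyCheck {

  /**
   * Hypothetical helper: returns true only if every field in the
   * comma-separated record key string is a column of the write schema.
   */
  static boolean allRecordKeysPresent(String recordKeyFields, List<String> columnNames) {
    if (recordKeyFields == null || recordKeyFields.trim().isEmpty()) {
      return false;
    }
    // Split the composite key and check each field individually.
    return Arrays.stream(recordKeyFields.split(","))
        .map(String::trim)
        .allMatch(columnNames::contains);
  }

  public static void main(String[] args) {
    List<String> columns = Arrays.asList("id", "name", "ts");
    // Single key: the whole-string check and the per-field check agree.
    System.out.println(columns.contains("id"));                   // true
    System.out.println(allRecordKeysPresent("id", columns));      // true
    // Composite key: List.contains compares the entire string "id,name".
    System.out.println(columns.contains("id,name"));              // false
    System.out.println(allRecordKeysPresent("id,name", columns)); // true
    System.out.println(allRecordKeysPresent("id,age", columns));  // false
  }
}
```

In the patch under review, the last condition of the `if` could then call such a helper with `propsMap.get(HoodieTableConfig.RECORDKEY_FIELDS.key())` and `writeColumnNames`; whether `allMatch` (all key fields present) is the right policy, rather than some partial fallback, is a design decision for the PR author.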


