yihua commented on code in PR #5523:
URL: https://github.com/apache/hudi/pull/5523#discussion_r927107467


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java:
##########
@@ -50,22 +51,27 @@ public class HoodieInternalRow extends InternalRow {
 
   /**
    * Collection of meta-fields as defined by {@link HoodieRecord#HOODIE_META_COLUMNS}
+   *
+   * NOTE: {@code HoodieInternalRow} *always* overlays its own meta-fields even in case
+   *       when source row also contains them, to make sure these fields are mutable and
+   *       can be updated (for ex, {@link UnsafeRow} doesn't support mutations due to
+   *       its memory layout, as it persists field offsets)
    */
   private final UTF8String[] metaFields;
-  private final InternalRow row;
+  private final InternalRow sourceRow;
 
   /**
-   * Specifies whether source {@link #row} contains meta-fields
+   * Specifies whether source {@link #sourceRow} contains meta-fields
    */
-  private final boolean containsMetaFields;
+  private final boolean sourceContainsMetaFields;

Review Comment:
   Are these renames necessary in this PR?  Next time, I would prefer to have
them in a separate PR to reduce review overhead.



##########
hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java:
##########
@@ -174,7 +174,9 @@ public static Configuration getDefaultHadoopConf(KafkaConnectConfigs connectConf
    * @return Returns the record key columns separated by comma.
    */
   public static String getRecordKeyColumns(KeyGenerator keyGenerator) {
-    return String.join(",", keyGenerator.getRecordKeyFieldNames());
+    return keyGenerator.getRecordKeyFieldNames().stream()
+        .map(HoodieAvroUtils::getRootLevelFieldName)

Review Comment:
   What is the reason to get root-level fields for Kafka Connect?
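
For context, a minimal sketch of the behavioral difference this hunk introduces. It assumes that `HoodieAvroUtils::getRootLevelFieldName` keeps only the part of a (possibly nested) field name before the first dot, and that the truncated stream pipeline ends by joining the names with commas; neither is visible in the quoted diff. The class and method names below are hypothetical stand-ins, not Hudi's API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class RootLevelFieldNames {

  // Hypothetical stand-in for HoodieAvroUtils.getRootLevelFieldName (assumed behavior):
  // "user.address.zip" -> "user", "id" -> "id"
  static String rootLevelFieldName(String fieldName) {
    int dot = fieldName.indexOf('.');
    return dot < 0 ? fieldName : fieldName.substring(0, dot);
  }

  // Mirrors the shape of the new getRecordKeyColumns: map every record-key field
  // to its root-level name, then join with commas (the join step is an assumption,
  // since the diff is cut off after the map call)
  static String recordKeyColumns(List<String> recordKeyFieldNames) {
    return recordKeyFieldNames.stream()
        .map(RootLevelFieldNames::rootLevelFieldName)
        .collect(Collectors.joining(","));
  }

  public static void main(String[] args) {
    // Old behavior (plain String.join) would print "id,user.address.zip"
    System.out.println(recordKeyColumns(Arrays.asList("id", "user.address.zip"))); // prints "id,user"
  }
}
```

Under these assumptions, nested record-key fields collapse to their top-level column, which is the change the question above asks about.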



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java:
##########
@@ -19,132 +19,560 @@
 package org.apache.hudi.keygen;
 
 import org.apache.avro.generic.GenericRecord;
-import org.apache.hudi.ApiMaturityLevel;
 import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.client.utils.SparkRowSerDe;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils$;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
 import scala.Function1;
 
-import java.util.HashMap;
+import javax.annotation.concurrent.ThreadSafe;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+import static org.apache.hudi.common.util.CollectionUtils.tail;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static 
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
+import static 
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
+import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
+import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
 
 /**
- * Base class for the built-in key generators. Contains methods structured for
- * code reuse amongst them.
+ * Base class for all built-in key generators.
+ *
+ * NOTE: By default it implements all the methods of {@link 
SparkKeyGeneratorInterface}, which
+ *       by default however fallback to Avro implementation. For maximum 
performance (to avoid
+ *       conversion from Spark's internal data-types to Avro) you should 
override these methods
+ *       in your implementation.
+ *
+ * TODO rename to AvroFallbackBaseKeyGenerator
  */
+@ThreadSafe
 public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements 
SparkKeyGeneratorInterface {
 
-  private static final String STRUCT_NAME = "hoodieRowTopLevelField";
-  private static final String NAMESPACE = "hoodieRow";
-  private Function1<Row, GenericRecord> converterFn = null;
-  private final AtomicBoolean validatePartitionFields = new 
AtomicBoolean(false);
-  protected StructType structType;
+  private static final Logger LOG = 
LogManager.getLogger(BuiltinKeyGenerator.class);
+
+  private static final String COMPOSITE_KEY_FIELD_VALUE_INFIX = ":";
+
+  protected static final UTF8String HUDI_DEFAULT_PARTITION_PATH_UTF8 = 
UTF8String.fromString(HUDI_DEFAULT_PARTITION_PATH);
+  protected static final UTF8String NULL_RECORD_KEY_PLACEHOLDER_UTF8 = 
UTF8String.fromString(NULL_RECORDKEY_PLACEHOLDER);
+  protected static final UTF8String EMPTY_RECORD_KEY_PLACEHOLDER_UTF8 = 
UTF8String.fromString(EMPTY_RECORDKEY_PLACEHOLDER);
+
 
-  protected Map<String, Pair<List<Integer>, DataType>> recordKeySchemaInfo = 
new HashMap<>();
-  protected Map<String, Pair<List<Integer>, DataType>> partitionPathSchemaInfo 
= new HashMap<>();
+  protected transient volatile SparkRowConverter rowConverter;
+  protected transient volatile SparkRowAccessor rowAccessor;
 
   protected BuiltinKeyGenerator(TypedProperties config) {
     super(config);
   }
 
-  /**
-   * Fetch record key from {@link Row}.
-   *
-   * @param row instance of {@link Row} from which record key is requested.
-   * @return the record key of interest from {@link Row}.
-   */
   @Override
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public String getRecordKey(Row row) {
-    // TODO avoid conversion to avro
-    //      since converterFn is transient this will be repeatedly initialized 
over and over again
-    if (null == converterFn) {
-      converterFn = AvroConversionUtils.createConverterToAvro(row.schema(), 
STRUCT_NAME, NAMESPACE);
-    }
-    return getKey(converterFn.apply(row)).getRecordKey();
+    tryInitRowConverter(row.schema());
+    // NOTE: This implementation has considerable computational overhead and 
has to be overridden
+    //       to provide for optimal performance on Spark. This implementation 
provided exclusively
+    //       for compatibility reasons.
+    return getRecordKey(rowConverter.convertToAvro(row));
   }
 
   @Override
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  public String getRecordKey(InternalRow internalRow, StructType schema) {
-    try {
-      // TODO fix
-      buildFieldSchemaInfoIfNeeded(schema);
-      return RowKeyGeneratorHelper.getRecordKeyFromInternalRow(internalRow, 
getRecordKeyFields(), recordKeySchemaInfo, false);
-    } catch (Exception e) {
-      throw new HoodieException("Conversion of InternalRow to Row failed with 
exception", e);
+  public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
+    tryInitRowConverter(schema);
+    // NOTE: This implementation has considerable computational overhead and 
has to be overridden
+    //       to provide for optimal performance on Spark. This implementation 
provided exclusively
+    //       for compatibility reasons.
+    return 
UTF8String.fromString(getRecordKey(rowConverter.convertToAvro(internalRow)));
+  }
+
+  @Override
+  public String getPartitionPath(Row row) {
+    tryInitRowConverter(row.schema());
+    // NOTE: This implementation has considerable computational overhead and 
has to be overridden
+    //       to provide for optimal performance on Spark. This implementation 
provided exclusively
+    //       for compatibility reasons.
+    return getPartitionPath(rowConverter.convertToAvro(row));
+  }
+
+  @Override
+  public UTF8String getPartitionPath(InternalRow internalRow, StructType 
schema) {
+    tryInitRowConverter(schema);
+    // NOTE: This implementation has considerable computational overhead and 
has to be overridden
+    //       to provide for optimal performance on Spark. This implementation 
provided exclusively
+    //       for compatibility reasons.
+    GenericRecord avroRecord = rowConverter.convertToAvro(internalRow);
+    return UTF8String.fromString(getPartitionPath(avroRecord));
+  }
+
+  protected void tryInitRowAccessor(StructType schema) {
+    if (this.rowAccessor == null) {
+      synchronized (this) {
+        if (this.rowAccessor == null) {
+          this.rowAccessor = new SparkRowAccessor(schema);
+        }
+      }
     }
   }
+
   /**
-   * Fetch partition path from {@link Row}.
-   *
-   * @param row instance of {@link Row} from which partition path is requested
-   * @return the partition path of interest from {@link Row}.
+   * NOTE: This method has to stay final (so that it's easier for JIT compiler 
to apply certain
+   *       optimizations, like inlining)
    */
+  protected final String combinePartitionPath(Object... partitionPathParts) {

Review Comment:
   Should these util methods be static?



##########
hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTimer.java:
##########
@@ -79,4 +83,12 @@ public long endTimer() {
     }
     return timeInfoDeque.pop().stop();
   }
+
+  public static HoodieTimer start() {

Review Comment:
   +1



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java:
##########
@@ -42,32 +41,34 @@ public class HoodieInternalRowFileWriterFactory {
    * Factory method to assist in instantiating an instance of {@link HoodieInternalRowFileWriter}.
    * @param path path of the RowFileWriter.
    * @param hoodieTable instance of {@link HoodieTable} in use.
-   * @param config instance of {@link HoodieWriteConfig} to use.
+   * @param writeConfig instance of {@link HoodieWriteConfig} to use.
    * @param schema schema of the dataset in use.
    * @return the instantiated {@link HoodieInternalRowFileWriter}.
    * @throws IOException if format is not supported or if any exception during instantiating the RowFileWriter.
    *
    */
-  public static HoodieInternalRowFileWriter getInternalRowFileWriter(
-      Path path, HoodieTable hoodieTable, HoodieWriteConfig config, StructType schema)
+  public static HoodieInternalRowFileWriter getInternalRowFileWriter(Path path,
+                                                                     HoodieTable hoodieTable,
+                                                                     HoodieWriteConfig writeConfig,
+                                                                     StructType schema)
       throws IOException {
     final String extension = FSUtils.getFileExtension(path.getName());
     if (PARQUET.getFileExtension().equals(extension)) {
-      return newParquetInternalRowFileWriter(path, config, schema, hoodieTable);
+      return newParquetInternalRowFileWriter(path, hoodieTable, writeConfig, schema, tryInstantiateBloomFilter(writeConfig));
     }
     throw new UnsupportedOperationException(extension + " format not supported yet.");
   }
 
-  private static HoodieInternalRowFileWriter newParquetInternalRowFileWriter(
-      Path path, HoodieWriteConfig writeConfig, StructType structType, HoodieTable table)
+  private static HoodieInternalRowFileWriter newParquetInternalRowFileWriter(Path path,
+                                                                             HoodieTable table,
+                                                                             HoodieWriteConfig writeConfig,
+                                                                             StructType structType,
+                                                                             Option<BloomFilter> bloomFilterOpt
+  )
       throws IOException {
-    BloomFilter filter = BloomFilterFactory.createBloomFilter(
-            writeConfig.getBloomFilterNumEntries(),
-            writeConfig.getBloomFilterFPP(),
-            writeConfig.getDynamicBloomFilterMaxNumEntries(),
-            writeConfig.getBloomFilterType());

Review Comment:
   Why not instantiate the bloom filter here, since the write config is already
passed in?  That would avoid unnecessary changes.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java:
##########
@@ -18,20 +18,65 @@
 
 package org.apache.hudi.keygen;
 
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIMethod;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
 
 /**
- * Spark key generator interface.
+ * Spark-specific {@link KeyGenerator} interface extension allowing implementation to
+ * specifically implement record-key, partition-path generation w/o the need for (expensive)
+ * conversion from Spark internal representation (for ex, to Avro)
  */
 public interface SparkKeyGeneratorInterface extends KeyGeneratorInterface {
 
+  /**
+   * Extracts record key from Spark's {@link Row}
+   *
+   * @param row instance of {@link Row} from which record-key is extracted
+   * @return record's (primary) key
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   String getRecordKey(Row row);
 
-  String getRecordKey(InternalRow row, StructType schema);
+  /**
+   * Extracts record key from Spark's {@link InternalRow}
+   *
+   * NOTE: Difference b/w {@link Row} and {@link InternalRow} is that {@link InternalRow} could
+   *       internally hold just a binary representation of the data, while {@link Row} has it
+   *       deserialized into JVM-native representation (like {@code Integer}, {@code Long},
+   *       {@code String}, etc)
+   *
+   * @param row instance of {@link InternalRow} from which record-key is extracted
+   * @param schema schema {@link InternalRow} is adhering to
+   * @return record-key as instance of {@link UTF8String}
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  UTF8String getRecordKey(InternalRow row, StructType schema);

Review Comment:
   We need to call out breaking public API changes in release notes and docs 
(in this case the changes are unavoidable).  @alexeykudinkin could you create a 
Jira ticket for that?  cc @codope @xushiyan 



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java:
##########
@@ -19,132 +19,560 @@
 package org.apache.hudi.keygen;
 
 import org.apache.avro.generic.GenericRecord;
-import org.apache.hudi.ApiMaturityLevel;
 import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.client.utils.SparkRowSerDe;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils$;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
 import scala.Function1;
 
-import java.util.HashMap;
+import javax.annotation.concurrent.ThreadSafe;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+import static org.apache.hudi.common.util.CollectionUtils.tail;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static 
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
+import static 
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
+import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
+import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
 
 /**
- * Base class for the built-in key generators. Contains methods structured for
- * code reuse amongst them.
+ * Base class for all built-in key generators.
+ *
+ * NOTE: By default it implements all the methods of {@link 
SparkKeyGeneratorInterface}, which
+ *       by default however fallback to Avro implementation. For maximum 
performance (to avoid
+ *       conversion from Spark's internal data-types to Avro) you should 
override these methods
+ *       in your implementation.
+ *
+ * TODO rename to AvroFallbackBaseKeyGenerator
  */
+@ThreadSafe
 public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements 
SparkKeyGeneratorInterface {
 
-  private static final String STRUCT_NAME = "hoodieRowTopLevelField";
-  private static final String NAMESPACE = "hoodieRow";
-  private Function1<Row, GenericRecord> converterFn = null;
-  private final AtomicBoolean validatePartitionFields = new 
AtomicBoolean(false);
-  protected StructType structType;
+  private static final Logger LOG = 
LogManager.getLogger(BuiltinKeyGenerator.class);
+
+  private static final String COMPOSITE_KEY_FIELD_VALUE_INFIX = ":";
+
+  protected static final UTF8String HUDI_DEFAULT_PARTITION_PATH_UTF8 = 
UTF8String.fromString(HUDI_DEFAULT_PARTITION_PATH);
+  protected static final UTF8String NULL_RECORD_KEY_PLACEHOLDER_UTF8 = 
UTF8String.fromString(NULL_RECORDKEY_PLACEHOLDER);
+  protected static final UTF8String EMPTY_RECORD_KEY_PLACEHOLDER_UTF8 = 
UTF8String.fromString(EMPTY_RECORDKEY_PLACEHOLDER);
+
 
-  protected Map<String, Pair<List<Integer>, DataType>> recordKeySchemaInfo = 
new HashMap<>();
-  protected Map<String, Pair<List<Integer>, DataType>> partitionPathSchemaInfo 
= new HashMap<>();
+  protected transient volatile SparkRowConverter rowConverter;
+  protected transient volatile SparkRowAccessor rowAccessor;
 
   protected BuiltinKeyGenerator(TypedProperties config) {
     super(config);
   }
 
-  /**
-   * Fetch record key from {@link Row}.
-   *
-   * @param row instance of {@link Row} from which record key is requested.
-   * @return the record key of interest from {@link Row}.
-   */
   @Override
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)

Review Comment:
   Why remove the annotation?  Is it because it is already declared on the interface?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java:
##########
@@ -19,132 +19,560 @@
 package org.apache.hudi.keygen;
 
 import org.apache.avro.generic.GenericRecord;
-import org.apache.hudi.ApiMaturityLevel;
 import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.client.utils.SparkRowSerDe;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils$;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
 import scala.Function1;
 
-import java.util.HashMap;
+import javax.annotation.concurrent.ThreadSafe;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+import static org.apache.hudi.common.util.CollectionUtils.tail;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static 
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
+import static 
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
+import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
+import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
 
 /**
- * Base class for the built-in key generators. Contains methods structured for
- * code reuse amongst them.
+ * Base class for all built-in key generators.
+ *
+ * NOTE: By default it implements all the methods of {@link 
SparkKeyGeneratorInterface}, which
+ *       by default however fallback to Avro implementation. For maximum 
performance (to avoid
+ *       conversion from Spark's internal data-types to Avro) you should 
override these methods
+ *       in your implementation.
+ *
+ * TODO rename to AvroFallbackBaseKeyGenerator
  */
+@ThreadSafe
 public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements 
SparkKeyGeneratorInterface {
 
-  private static final String STRUCT_NAME = "hoodieRowTopLevelField";
-  private static final String NAMESPACE = "hoodieRow";
-  private Function1<Row, GenericRecord> converterFn = null;
-  private final AtomicBoolean validatePartitionFields = new 
AtomicBoolean(false);
-  protected StructType structType;
+  private static final Logger LOG = 
LogManager.getLogger(BuiltinKeyGenerator.class);
+
+  private static final String COMPOSITE_KEY_FIELD_VALUE_INFIX = ":";
+
+  protected static final UTF8String HUDI_DEFAULT_PARTITION_PATH_UTF8 = 
UTF8String.fromString(HUDI_DEFAULT_PARTITION_PATH);
+  protected static final UTF8String NULL_RECORD_KEY_PLACEHOLDER_UTF8 = 
UTF8String.fromString(NULL_RECORDKEY_PLACEHOLDER);
+  protected static final UTF8String EMPTY_RECORD_KEY_PLACEHOLDER_UTF8 = 
UTF8String.fromString(EMPTY_RECORDKEY_PLACEHOLDER);
+
 
-  protected Map<String, Pair<List<Integer>, DataType>> recordKeySchemaInfo = 
new HashMap<>();
-  protected Map<String, Pair<List<Integer>, DataType>> partitionPathSchemaInfo 
= new HashMap<>();
+  protected transient volatile SparkRowConverter rowConverter;
+  protected transient volatile SparkRowAccessor rowAccessor;
 
   protected BuiltinKeyGenerator(TypedProperties config) {
     super(config);
   }
 
-  /**
-   * Fetch record key from {@link Row}.
-   *
-   * @param row instance of {@link Row} from which record key is requested.
-   * @return the record key of interest from {@link Row}.
-   */
   @Override
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public String getRecordKey(Row row) {
-    // TODO avoid conversion to avro
-    //      since converterFn is transient this will be repeatedly initialized 
over and over again
-    if (null == converterFn) {
-      converterFn = AvroConversionUtils.createConverterToAvro(row.schema(), 
STRUCT_NAME, NAMESPACE);
-    }
-    return getKey(converterFn.apply(row)).getRecordKey();
+    tryInitRowConverter(row.schema());
+    // NOTE: This implementation has considerable computational overhead and 
has to be overridden
+    //       to provide for optimal performance on Spark. This implementation 
provided exclusively
+    //       for compatibility reasons.
+    return getRecordKey(rowConverter.convertToAvro(row));
   }
 
   @Override
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  public String getRecordKey(InternalRow internalRow, StructType schema) {
-    try {
-      // TODO fix
-      buildFieldSchemaInfoIfNeeded(schema);
-      return RowKeyGeneratorHelper.getRecordKeyFromInternalRow(internalRow, 
getRecordKeyFields(), recordKeySchemaInfo, false);
-    } catch (Exception e) {
-      throw new HoodieException("Conversion of InternalRow to Row failed with 
exception", e);
+  public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
+    tryInitRowConverter(schema);
+    // NOTE: This implementation has considerable computational overhead and 
has to be overridden
+    //       to provide for optimal performance on Spark. This implementation 
provided exclusively
+    //       for compatibility reasons.
+    return 
UTF8String.fromString(getRecordKey(rowConverter.convertToAvro(internalRow)));
+  }
+
+  @Override
+  public String getPartitionPath(Row row) {
+    tryInitRowConverter(row.schema());
+    // NOTE: This implementation has considerable computational overhead and 
has to be overridden
+    //       to provide for optimal performance on Spark. This implementation 
provided exclusively
+    //       for compatibility reasons.
+    return getPartitionPath(rowConverter.convertToAvro(row));
+  }
+
+  @Override
+  public UTF8String getPartitionPath(InternalRow internalRow, StructType 
schema) {
+    tryInitRowConverter(schema);
+    // NOTE: This implementation has considerable computational overhead and 
has to be overridden
+    //       to provide for optimal performance on Spark. This implementation 
provided exclusively
+    //       for compatibility reasons.
+    GenericRecord avroRecord = rowConverter.convertToAvro(internalRow);
+    return UTF8String.fromString(getPartitionPath(avroRecord));
+  }
+
+  protected void tryInitRowAccessor(StructType schema) {
+    if (this.rowAccessor == null) {
+      synchronized (this) {

Review Comment:
   Should the order be the opposite, i.e., first lock, then check and assign?
Otherwise, there is a slight chance that `rowAccessor` gets assigned twice.
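
For reference, the check / lock / re-check shape in the quoted hunk is commonly known as double-checked locking. Below is a minimal standalone sketch of that idiom; the class, field, and method names are hypothetical and not taken from the PR. It relies on the field being `volatile` (as `rowAccessor` is declared in the diff) and on the second null-check inside the `synchronized` block, which is what keeps a second thread from assigning the field again after it acquires the lock.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class LazyAccessorHolder {

  // Counts how many times the lazily created object is actually assigned
  final AtomicInteger assignments = new AtomicInteger();

  // volatile so that a fully constructed instance is safely published to other threads
  private volatile Object accessor;

  Object getOrInit() {
    if (accessor == null) {          // fast path: no locking once initialized
      synchronized (this) {
        if (accessor == null) {      // re-check under the lock: only the first thread assigns
          assignments.incrementAndGet();
          accessor = new Object();
        }
      }
    }
    return accessor;
  }

  // Releases numThreads threads at once against a fresh holder and reports
  // how many assignments happened
  static int assignmentsUnderContention(int numThreads) {
    LazyAccessorHolder holder = new LazyAccessorHolder();
    CountDownLatch startGate = new CountDownLatch(1);
    Thread[] threads = new Thread[numThreads];
    for (int i = 0; i < numThreads; i++) {
      threads[i] = new Thread(() -> {
        try {
          startGate.await();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
        holder.getOrInit();
      });
      threads[i].start();
    }
    startGate.countDown();
    try {
      for (Thread t : threads) {
        t.join();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return holder.assignments.get();
  }
}
```

Locking first and then checking would also assign at most once, at the cost of taking the lock on every call rather than only until the field is initialized.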



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
