alexeykudinkin commented on code in PR #7714:
URL: https://github.com/apache/hudi/pull/7714#discussion_r1082080586
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala:
##########
@@ -210,6 +220,26 @@ object HoodieInternalRowUtils {
}
}
+ def getCachedAvroSerializer(schema: Schema): HoodieAvroSerializer = {
+ val avroSerializerMap = avroSerializerMapThreadLocal.get()
+ if (!avroSerializerMap.containsKey(schema)) {
Review Comment:
Lookups with an Avro schema as the key are costly -- let's minimize the number
of lookups we need to make:
- We can use `getOrElseUpdate` to get/update in one lookup
- We can store the pair (StructType, SerDe) to avoid the need to do 2 lookups
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java:
##########
@@ -192,16 +198,25 @@ public boolean hasNext() {
}
@Override
- public IndexedRecord next() {
+ public HoodieRecord next() {
try {
int recordLength = this.dis.readInt();
BinaryDecoder decoder =
DecoderFactory.get().binaryDecoder(this.content,
this.dis.getNumberOfBytesRead(),
recordLength, this.decoderCache.get());
this.decoderCache.set(decoder);
- IndexedRecord record = this.reader.read(null, decoder);
+ IndexedRecord indexedRecord = this.reader.read(null, decoder);
this.dis.skipBytes(recordLength);
this.readRecords++;
- return record;
+ if (recordType == HoodieRecordType.SPARK) {
Review Comment:
We should hide this within the reader
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java:
##########
@@ -261,7 +281,16 @@ public static HoodieAvroDataBlock getBlock(byte[] content,
Schema readerSchema,
int recordLength = dis.readInt();
Decoder decoder = DecoderFactory.get().binaryDecoder(content,
dis.getNumberOfBytesRead(), recordLength, null);
IndexedRecord record = reader.read(null, decoder);
- records.add(new HoodieAvroIndexedRecord(record));
+ if (recordType == HoodieRecordType.SPARK) {
Review Comment:
Same comment as above regarding the reader
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -329,8 +329,12 @@ object HoodieSparkSqlWriter {
DataSourceUtils.createHoodieClient(jsc, dataFileSchemaStr, path,
tblName, mapAsJavaMap(finalOpts))
}.asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
val writeConfig = client.getConfig
- if (writeConfig.getRecordMerger.getRecordType ==
HoodieRecordType.SPARK && tableType == HoodieTableType.MERGE_ON_READ &&
writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) !=
HoodieLogBlockType.PARQUET_DATA_BLOCK) {
- throw new
UnsupportedOperationException(s"${writeConfig.getRecordMerger.getClass.getName}
only support parquet log.")
+ val logBlockType =
writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK)
Review Comment:
We should leverage `pickLogDataBlockFormat` from `AppendHandle`
- We should move it to `WriteConfig`
- Make it accept `BaseFileFormat`
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -329,8 +329,12 @@ object HoodieSparkSqlWriter {
DataSourceUtils.createHoodieClient(jsc, dataFileSchemaStr, path,
tblName, mapAsJavaMap(finalOpts))
}.asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
val writeConfig = client.getConfig
- if (writeConfig.getRecordMerger.getRecordType ==
HoodieRecordType.SPARK && tableType == HoodieTableType.MERGE_ON_READ &&
writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) !=
HoodieLogBlockType.PARQUET_DATA_BLOCK) {
- throw new
UnsupportedOperationException(s"${writeConfig.getRecordMerger.getClass.getName}
only support parquet log.")
+ val logBlockType =
writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK)
+ if (writeConfig.getRecordMerger.getRecordType ==
HoodieRecordType.SPARK
Review Comment:
Let's extract this to a standalone method `validateFileFormatsConfig`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]