KnightChess commented on code in PR #5825:
URL: https://github.com/apache/hudi/pull/5825#discussion_r894120312
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/SqlTypedRecord.scala:
##########
@@ -17,27 +17,59 @@
package org.apache.spark.sql.hudi.command.payload
+import com.google.common.cache.CacheBuilder
import org.apache.avro.Schema
import org.apache.avro.generic.IndexedRecord
-import org.apache.hudi.{AvroConversionUtils, SparkAdapterSupport}
+import org.apache.hudi.HoodieSparkUtils.sparkAdapter
+import org.apache.hudi.AvroConversionUtils
+import org.apache.spark.sql.avro.HoodieAvroDeserializer
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.hudi.command.payload.SqlTypedRecord.{getAvroDeserializer, getSqlType}
+import org.apache.spark.sql.types.StructType
+
+import java.util.concurrent.Callable
/**
* A sql typed record which will convert the avro field to sql typed value.
*/
-class SqlTypedRecord(val record: IndexedRecord) extends IndexedRecord with SparkAdapterSupport {
+class SqlTypedRecord(val record: IndexedRecord) extends IndexedRecord {
-  private lazy val sqlType = AvroConversionUtils.convertAvroSchemaToStructType(getSchema)
-  private lazy val avroDeserializer = sparkAdapter.createAvroDeserializer(record.getSchema, sqlType)
-  private lazy val sqlRow = avroDeserializer.deserialize(record).get.asInstanceOf[InternalRow]
+  private lazy val sqlRow = getAvroDeserializer(getSchema).deserialize(record).get.asInstanceOf[InternalRow]
override def put(i: Int, v: Any): Unit = {
record.put(i, v)
}
override def get(i: Int): AnyRef = {
- sqlRow.get(i, sqlType(i).dataType)
+ sqlRow.get(i, getSqlType(getSchema)(i).dataType)
}
override def getSchema: Schema = record.getSchema
}
+
+object SqlTypedRecord {
+
+  private val sqlTypeCache = CacheBuilder.newBuilder().maximumSize(16).build[Schema, StructType]()
Review Comment:
16 is just the Guava cache default size. In batch mode, I think only one source and one target schema will be cached, but in streaming or Spark Thrift Server (or Livy/Kyuubi) scenarios, the source and target schemas will differ across queries. For long-running tasks, maybe someone can suggest a more appropriate size.

> I also met this issue recently, but I'm not sure it's a good idea if we don't have a way to invalidate the schema if we don't use it anymore.

That's why I use an LRU cache here, precisely for long-running tasks. We could also add an expiration strategy if necessary, but I think LRU eviction will be enough.
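
For illustration, a minimal sketch of what layering an expiration strategy on top of the LRU bound could look like. The `expireAfterAccess` window, the object name, and the exact shape of the `getSqlType` loader are illustrative assumptions, not what this PR ships:

```scala
import com.google.common.cache.{Cache, CacheBuilder}
import org.apache.avro.Schema
import org.apache.hudi.AvroConversionUtils
import org.apache.spark.sql.types.StructType

import java.util.concurrent.{Callable, TimeUnit}

object SqlTypedRecordSketch {

  // LRU-bounded cache plus time-based eviction, so schemas from
  // long-running services (Thrift Server, Livy, Kyuubi) are eventually
  // dropped even if the size bound is never reached.
  private val sqlTypeCache: Cache[Schema, StructType] =
    CacheBuilder.newBuilder()
      .maximumSize(16)                      // LRU bound, as in the PR
      .expireAfterAccess(1, TimeUnit.HOURS) // illustrative expiration window
      .build[Schema, StructType]()

  // Compute-if-absent via Guava's Callable overload, mirroring how a
  // cached getSqlType helper could be written.
  def getSqlType(schema: Schema): StructType =
    sqlTypeCache.get(schema, new Callable[StructType] {
      override def call(): StructType =
        AvroConversionUtils.convertAvroSchemaToStructType(schema)
    })
}
```

Guava applies both policies together, so a rarely-used schema is evicted by time even when the cache never reaches 16 entries.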