Hello Friends,

I’m working on a performance improvement that reads additional Parquet files in the middle of a lambda, and I’m running into some issues. This is what I’d like to do:
ds.mapPartitions(x => {
  // read a parquet file in and perform an operation with x
})

Here’s my current POC code, but I’m getting nonsense back from the row reader.

import java.nio.file.Files

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch

import com.amazon.horizon.azulene.util.SparkFileUtils._

case class TestRow(a: Int, b: Int, c: String) // simple test fixture: two ints and a string

spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

val data = List(
  TestRow(1, 1, "asdf"),
  TestRow(2, 1, "asdf"),
  TestRow(3, 1, "asdf"),
  TestRow(4, 1, "asdf")
)

val df = spark.createDataFrame(data)

val folder = Files.createTempDirectory("azulene-test")
val folderPath = folder.toAbsolutePath.toString + "/"
df.write.mode("overwrite").parquet(folderPath)

val files = spark.fs.listStatus(folder.toUri)
val file = files(1) // skip the _SUCCESS file

val partitionSchema = StructType(Seq.empty)
val dataSchema = df.schema
val fileFormat = new ParquetFileFormat()

val path = file.getPath
val status = spark.fs.getFileStatus(path)

val pFile = new PartitionedFile(
  partitionValues = InternalRow.empty, // this should be empty for non-partitioned values
  filePath = path.toString,
  start = 0,
  length = status.getLen
)

val readFile: (PartitionedFile) => Iterator[Any] = // Iterator[InternalRow]
  fileFormat.buildReaderWithPartitionValues(
    sparkSession = spark,
    dataSchema = dataSchema,
    partitionSchema = partitionSchema, // this should be empty for non-partitioned fields
    requiredSchema = dataSchema,
    filters = Seq.empty,
    options = Map.empty,
    hadoopConf = spark.sparkContext.hadoopConfiguration // or: relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)
  )

import scala.collection.JavaConverters._

val rows = readFile(pFile).flatMap(_ match {
  case r: InternalRow => Seq(r) // This doesn't work. Vectorized mode is doing something screwy.
  case b: ColumnarBatch => b.rowIterator().asScala
}).toList

println(rows)
// List([0,1,5b,2000000004,66647361])
// ?? this is wrong, I think ??

Has anyone attempted something similar?

Cheers
Andrew
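P.S. For what it’s worth, here’s the follow-up snippet I’ve been using to sanity-check the rows. It’s just a sketch, written on the assumption that the reader is handing back UnsafeRows (whose toString dumps the raw 8-byte words in hex) rather than genuinely corrupt data; the ordinals and types below simply mirror the TestRow schema from the POC above.

// Continues from the POC above: `rows` is the List[InternalRow] built there.
// Read each field back out positionally with its declared type instead of
// relying on InternalRow.toString.
rows.foreach { r =>
  val c0 = r.getInt(0)                 // first int column of TestRow
  val c1 = r.getInt(1)                 // second int column of TestRow
  val c2 = r.getUTF8String(2).toString // the string column comes back as a UTF8String
  println(s"($c0, $c1, $c2)")
}

If that prints (1, 1, asdf) and so on, then I suspect the hex-looking output is just the UnsafeRow encoding rather than a broken read, but I’d appreciate a second opinion on that.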