liviazhu commented on code in PR #52375:
URL: https://github.com/apache/spark/pull/52375#discussion_r2360524133
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala:
##########
@@ -1133,4 +1133,98 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       assert(selectSingleRowDf.count() === 1)
     }
   }
+
+  Seq("true", "false").foreach { sideCharPadding =>
+    test(s"SPARK-53625: file metadata in streaming with char type, " +
+      s"sideCharPadding=$sideCharPadding") {
+      withSQLConf(SQLConf.READ_SIDE_CHAR_PADDING.key -> sideCharPadding) {
+        withTempDir { dir =>
+          import scala.jdk.CollectionConverters._
+
+          val metadata = new MetadataBuilder()
+            .putString("__CHAR_VARCHAR_TYPE_STRING", "char(1)")
+            .build()
+          val charSchemaStruct = new StructType()
+            .add(StructField("char_col", StringType, metadata = metadata))
+
+          val data: Seq[Row] = Seq(Row("A"), Row("B"))
+          val df = spark.createDataFrame(data.asJava, charSchemaStruct)
+          df.coalesce(1).write.format("json")
+            .save(dir.getCanonicalPath + "/source/new-streaming-data")
+
+          val streamDf = spark.readStream.format("json")
+            .schema(charSchemaStruct)
+            .load(dir.getCanonicalPath + "/source/new-streaming-data")
+            .select("*", "_metadata")
+
+          val streamQuery0 = streamDf
+            .writeStream.format("json")
+            .option("checkpointLocation", dir.getCanonicalPath + "/target/checkpoint")
+            .trigger(Trigger.AvailableNow())
+            .start(dir.getCanonicalPath + "/target/new-streaming-data")
+
+          streamQuery0.awaitTermination()
+          assert(streamQuery0.lastProgress.numInputRows == 2L)
+
+          val newDF = spark.read.format("json")
+            .load(dir.getCanonicalPath + "/target/new-streaming-data")
+
+          val sourceFile = new File(dir, "/source/new-streaming-data").listFiles()
+            .filter(_.getName.endsWith(".json")).head
+          val sourceFileMetadata = Map(
+            METADATA_FILE_PATH -> sourceFile.toURI.toString,
+            METADATA_FILE_NAME -> sourceFile.getName,
+            METADATA_FILE_SIZE -> sourceFile.length(),
+            METADATA_FILE_BLOCK_START -> 0,
+            METADATA_FILE_BLOCK_LENGTH -> sourceFile.length(),
+            METADATA_FILE_MODIFICATION_TIME -> new Timestamp(sourceFile.lastModified())
+          )
+
+          // SELECT * will have: char_col, _metadata of /source/new-streaming-data
+          assert(newDF.select("*").columns.toSet == Set("char_col", "_metadata"))
+          // Verify the data is expected
+          checkAnswer(
+            newDF.select(col("char_col"),
+              col(METADATA_FILE_PATH), col(METADATA_FILE_NAME),
+              col(METADATA_FILE_SIZE), col(METADATA_FILE_BLOCK_START),
+              col(METADATA_FILE_BLOCK_LENGTH),
+              // since we are writing _metadata to a json file,
+              // we should explicitly cast the column to timestamp type
+              to_timestamp(col(METADATA_FILE_MODIFICATION_TIME))),
+            Seq(
+              Row(
+                "A",
+                sourceFileMetadata(METADATA_FILE_PATH),
+                sourceFileMetadata(METADATA_FILE_NAME),
+                sourceFileMetadata(METADATA_FILE_SIZE),
+                sourceFileMetadata(METADATA_FILE_BLOCK_START),
+                sourceFileMetadata(METADATA_FILE_BLOCK_LENGTH),
+                sourceFileMetadata(METADATA_FILE_MODIFICATION_TIME)),
+              Row(
+                "B",
+                sourceFileMetadata(METADATA_FILE_PATH),
+                sourceFileMetadata(METADATA_FILE_NAME),
+                sourceFileMetadata(METADATA_FILE_SIZE),
+                sourceFileMetadata(METADATA_FILE_BLOCK_START),
+                sourceFileMetadata(METADATA_FILE_BLOCK_LENGTH),
+                sourceFileMetadata(METADATA_FILE_MODIFICATION_TIME))
+            )
+          )
+
+          checkAnswer(
+            newDF.where(s"$METADATA_FILE_SIZE > 0").select(METADATA_FILE_SIZE),

Review Comment:
   Yeah just a sanity check
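For readers without the full diff in front of them: the truncated checkAnswer above is the sanity check under discussion, i.e. filtering on the hidden _metadata column's file_size after the stream's output has been round-tripped through JSON. Below is a minimal, self-contained sketch of that pattern; it is not the PR's test code, and the local SparkSession setup, temp paths, and object name are illustrative assumptions.

    import org.apache.spark.sql.SparkSession

    // Standalone sketch of the "file size is positive" sanity check on the
    // hidden _metadata struct exposed by Spark's file-based sources.
    object MetadataSizeSanityCheck {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[1]")
          .appName("metadata-size-sanity-check")
          .getOrCreate()
        import spark.implicits._

        // Write a tiny JSON file so the read below has a real file to scan.
        val dir = java.nio.file.Files.createTempDirectory("meta-check").toString
        val dataPath = s"$dir/data"
        Seq("A", "B").toDF("char_col").coalesce(1)
          .write.format("json").save(dataPath)

        // Project the per-file size out of the _metadata struct.
        val sizes = spark.read.format("json").load(dataPath)
          .select($"_metadata.file_size".as("file_size"))

        // The sanity check: every row must report a positive file size.
        assert(sizes.where($"file_size" > 0).count() == 2)

        spark.stop()
      }
    }

Since both rows come from the same single JSON file (the write is coalesced to one partition), each row reports that one file's size, which is why a simple "> 0" predicate plus a row count is enough to confirm the metadata survived the round trip.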