liviazhu commented on code in PR #52375:
URL: https://github.com/apache/spark/pull/52375#discussion_r2360524133


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala:
##########
@@ -1133,4 +1133,98 @@ class FileMetadataStructSuite extends QueryTest with 
SharedSparkSession {
       assert(selectSingleRowDf.count() === 1)
     }
   }
+
+  Seq("true", "false").foreach { sideCharPadding =>
+    test(s"SPARK-53625: file metadata in streaming with char type, " +
+      s"sideCharPadding=$sideCharPadding") {
+      withSQLConf(SQLConf.READ_SIDE_CHAR_PADDING.key -> sideCharPadding) {
+        withTempDir { dir =>
+          import scala.jdk.CollectionConverters._
+
+          val metadata = new MetadataBuilder()
+            .putString("__CHAR_VARCHAR_TYPE_STRING", "char(1)")
+            .build()
+          val charSchemaStruct = new StructType()
+            .add(StructField("char_col", StringType, metadata = metadata))
+
+          val data: Seq[Row] = Seq(Row("A"), Row("B"))
+          val df = spark.createDataFrame(data.asJava, charSchemaStruct)
+          df.coalesce(1).write.format("json")
+            .save(dir.getCanonicalPath + "/source/new-streaming-data")
+
+          val streamDf = spark.readStream.format("json")
+            .schema(charSchemaStruct)
+            .load(dir.getCanonicalPath + "/source/new-streaming-data")
+            .select("*", "_metadata")
+
+          val streamQuery0 = streamDf
+            .writeStream.format("json")
+            .option("checkpointLocation", dir.getCanonicalPath + 
"/target/checkpoint")
+            .trigger(Trigger.AvailableNow())
+            .start(dir.getCanonicalPath + "/target/new-streaming-data")
+
+          streamQuery0.awaitTermination()
+          assert(streamQuery0.lastProgress.numInputRows == 2L)
+
+          val newDF = spark.read.format("json")
+            .load(dir.getCanonicalPath + "/target/new-streaming-data")
+
+          val sourceFile = new File(dir, 
"/source/new-streaming-data").listFiles()
+            .filter(_.getName.endsWith(".json")).head
+          val sourceFileMetadata = Map(
+            METADATA_FILE_PATH -> sourceFile.toURI.toString,
+            METADATA_FILE_NAME -> sourceFile.getName,
+            METADATA_FILE_SIZE -> sourceFile.length(),
+            METADATA_FILE_BLOCK_START -> 0,
+            METADATA_FILE_BLOCK_LENGTH -> sourceFile.length(),
+            METADATA_FILE_MODIFICATION_TIME -> new 
Timestamp(sourceFile.lastModified())
+          )
+
+          // SELECT * will have: char_col, _metadata of 
/source/new-streaming-data
+          assert(newDF.select("*").columns.toSet == Set("char_col", 
"_metadata"))
+          // Verify the data is expected
+          checkAnswer(
+            newDF.select(col("char_col"),
+              col(METADATA_FILE_PATH), col(METADATA_FILE_NAME),
+              col(METADATA_FILE_SIZE), col(METADATA_FILE_BLOCK_START),
+              col(METADATA_FILE_BLOCK_LENGTH),
+              // since we are writing _metadata to a json file,
+              // we should explicitly cast the column to timestamp type
+              to_timestamp(col(METADATA_FILE_MODIFICATION_TIME))),
+            Seq(
+              Row(
+                "A",
+                sourceFileMetadata(METADATA_FILE_PATH),
+                sourceFileMetadata(METADATA_FILE_NAME),
+                sourceFileMetadata(METADATA_FILE_SIZE),
+                sourceFileMetadata(METADATA_FILE_BLOCK_START),
+                sourceFileMetadata(METADATA_FILE_BLOCK_LENGTH),
+                sourceFileMetadata(METADATA_FILE_MODIFICATION_TIME)),
+              Row(
+                "B",
+                sourceFileMetadata(METADATA_FILE_PATH),
+                sourceFileMetadata(METADATA_FILE_NAME),
+                sourceFileMetadata(METADATA_FILE_SIZE),
+                sourceFileMetadata(METADATA_FILE_BLOCK_START),
+                sourceFileMetadata(METADATA_FILE_BLOCK_LENGTH),
+                sourceFileMetadata(METADATA_FILE_MODIFICATION_TIME))
+            )
+          )
+
+          checkAnswer(
+            newDF.where(s"$METADATA_FILE_SIZE > 0").select(METADATA_FILE_SIZE),

Review Comment:
   Yeah just a sanity check



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to