rahil-c commented on code in PR #18098:
URL: https://github.com/apache/hudi/pull/18098#discussion_r2956620746
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala:
##########
@@ -1996,4 +1997,105 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
HoodieSparkSqlTestBase.enableComplexKeygenValidation(spark, tableName)
checkAnswer(query)(expectedRowsAfter: _*)
}
+
+ test("test create table with BLOB column") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |CREATE TABLE $tableName (
+ | id BIGINT,
+ | video BLOB COMMENT 'Product demonstration video'
+ |) USING hudi
+ |LOCATION '${tmp.getCanonicalPath}'
+ |TBLPROPERTIES (
+ | primaryKey = 'id'
+ |)
+ """.stripMargin)
+
+ // Verify schema has hudi_blob metadata
+ val schema = spark.table(tableName).schema
+ val videoField = schema.find(_.name == "video").get
+
assertTrue(videoField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD))
+ assertEquals(HoodieSchemaType.BLOB.name(),
videoField.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD))
+ assertEquals("Product demonstration video",
videoField.metadata.getString("comment"))
+
+ // Verify structure matches blob schema
+ assertTrue(videoField.dataType.isInstanceOf[StructType])
+ assertEquals(BlobType(), videoField.dataType)
+ }
+ }
+
+ test("test create table with multiple BLOB columns") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |CREATE TABLE $tableName (
+ | id BIGINT,
+ | video BLOB,
+ | thumbnail blob,
+ | metadata MAP<STRING, STRING>,
+ | audio BLOB NOT NULL
+ |) USING hudi
+ |LOCATION '${tmp.getCanonicalPath}'
+ |TBLPROPERTIES (
+ | primaryKey = 'id'
+ |)
+ """.stripMargin)
+
+ val schema = spark.table(tableName).schema
+
+ // Verify all BLOB columns have the metadata
+ val blobColumns = Seq("video", "thumbnail", "audio")
+ blobColumns.foreach { colName =>
+ val field = schema.find(_.name == colName).get
+ assertTrue(field.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD))
+ assertEquals(HoodieSchemaType.BLOB.name(),
field.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD))
+ assertTrue(field.dataType.isInstanceOf[StructType])
+
+ if (colName == "audio") {
+ assertFalse(field.nullable)
+ } else {
+ assertTrue(field.nullable)
+ }
+
+ val blobStruct = field.dataType.asInstanceOf[StructType]
+ assertEquals(BlobType(), blobStruct)
+ }
+ }
+ }
+
+ test("test BLOB in nested struct") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |CREATE TABLE $tableName (
+ | id BIGINT,
+ | media STRUCT<title: STRING, content: BLOB>
+ |) USING hudi
+ |LOCATION '${tmp.getCanonicalPath}'
+ |TBLPROPERTIES (
+ | primaryKey = 'id'
+ |)
+ """.stripMargin)
+
+ val schema = spark.table(tableName).schema
+ val mediaField = schema.find(_.name == "media").get
+ assertTrue(mediaField.dataType.isInstanceOf[StructType])
+
+ val mediaStruct = mediaField.dataType.asInstanceOf[StructType]
+ val contentField = mediaStruct.find(_.name == "content").get
+
+ // Verify nested BLOB has metadata
+
assertTrue(contentField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD))
+ assertEquals(HoodieSchemaType.BLOB.name(),
contentField.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD))
+
+ // Verify structure
+ assertTrue(contentField.dataType.isInstanceOf[StructType])
+ val blobStruct = contentField.dataType.asInstanceOf[StructType]
+ assertEquals(BlobType(), blobStruct)
+ }
+ }
Review Comment:
@the-other-tim-brown I think I was also seeing issues when I added tests
like this, where I would use `read_blob` in a `WHERE` clause:
```
@Test
def testReadBlobInWhereClauseShouldFail(): Unit = {
val filePath = createTestFile(tempDir, "where.bin", 10000)
val df = sparkSession.createDataFrame(Seq(
(1, filePath, 0L, 100L),
(2, filePath, 100L, 100L),
(3, filePath, 200L, 100L)
)).toDF("id", "external_path", "offset", "length")
.withColumn("file_info",
blobStructCol("file_info", col("external_path"), col("offset"),
col("length")))
.select("id", "file_info")
df.createOrReplaceTempView("where_table")
// read_blob() in WHERE clause: ReadBlobRule only matches Project nodes,
// so the ReadBlobExpression in the Filter node is never resolved.
// Since ReadBlobExpression is Unevaluable, this should either:
// (a) fail with a clear error during analysis, or
// (b) work correctly
// Currently it crashes at code generation with an opaque error.
val ex = assertThrows(classOf[Exception], () => {
sparkSession.sql("""
SELECT id
FROM where_table
WHERE length(read_blob(file_info)) > 50
""").collect()
})
// Demonstrates the bug: error message is unhelpful.
// Should either be supported or produce a clear analysis error like:
// "read_blob() can only be used in SELECT projections, not in
// WHERE/HAVING/ORDER BY"
assertTrue(
ex.getMessage.contains("Unevaluable") ||
ex.getMessage.contains("cannot be evaluated"),
s"Expected Unevaluable error but got: ${ex.getMessage}"
)
}
```
I was hitting the following error:
```
Cannot generate code for expression: read_blob(input[1,
struct<type:string,data:binary,reference:struct<external_path:string,offset:bigint,length:bigint,managed:boolean>>,
false]) ==>
```
Would this also happen on a `HAVING` query?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]