jonvex commented on code in PR #12001:
URL: https://github.com/apache/hudi/pull/12001#discussion_r1775418456


##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -316,6 +316,13 @@ public final class HoodieMetadataConfig extends HoodieConfig {
       .withDocumentation("Initializes the metadata table by reading from the file system when the table is first created. Enabled by default. "
           + "Warning: This should only be disabled when manually constructing the metadata table outside of typical Hudi writer flows.");
 
+  public static final ConfigProperty<Boolean> FUNCTIONAL_INDEX_ENABLE_PROP = ConfigProperty
+      .key(METADATA_PREFIX + ".functional.index.enable")

Review Comment:
   the equivalent record index config is formatted as `.record.index.enable`

   So that matches this config. However, for the functional index, the configs are `.index.functional.file.group.count` and `.index.functional.parallelism`,

   and the equivalent partition stats config is `.index.partition.stats.enable`

   I think since the record index configs are 0.14 but the other index configs are 1.0, we should probably just change them all to match the record index ones. So `.partition.stats.index.enable`, `.functional.index.file.group.count`, `.functional.index.parallelism`. Maybe we can create some more static prefix strings?
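   Roughly what I mean by shared prefix strings, as a standalone sketch (the constant names and values here are illustrative only, not existing Hudi code):

```java
// Illustrative sketch only: hypothetical shared prefixes for MDT index
// configs, so every key in one index family starts the same way.
public class IndexPrefixSketch {
    static final String METADATA_PREFIX = "hoodie.metadata";
    static final String RECORD_INDEX_PREFIX = METADATA_PREFIX + ".record.index";
    static final String FUNCTIONAL_INDEX_PREFIX = METADATA_PREFIX + ".functional.index";
    static final String PARTITION_STATS_INDEX_PREFIX = METADATA_PREFIX + ".partition.stats.index";

    public static void main(String[] args) {
        // Keys derived from one prefix stay consistent by construction.
        System.out.println(RECORD_INDEX_PREFIX + ".enable");
        System.out.println(FUNCTIONAL_INDEX_PREFIX + ".enable");
        System.out.println(FUNCTIONAL_INDEX_PREFIX + ".file.group.count");
        System.out.println(PARTITION_STATS_INDEX_PREFIX + ".enable");
    }
}
```

   That way renaming a family means changing one constant instead of hunting down every key string.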
   
   
   
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1050,6 +1053,10 @@ public void update(HoodieCommitMetadata commitMetadata, HoodieData<HoodieRecord>
    * Update functional index from {@link HoodieCommitMetadata}.
    */
   private void updateFunctionalIndexIfPresent(HoodieCommitMetadata commitMetadata, String instantTime, Map<String, HoodieData<HoodieRecord>> partitionToRecordMap) {
+    // if functional index is disabled, then just return
+    // if functional index is disabled, then just return

Review Comment:
   I don't think this comment is useful, but it's pretty harmless. You can decide whether you want it to stay.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala:
##########
@@ -424,6 +424,78 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test Enable and Disable Functional Index") {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      withTempDir { tmp =>
+        // create a simple partitioned mor table and insert some records
+        val tableName = generateTableName
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | options (
+             |  primaryKey ='id',
+             |  type = 'mor',
+             |  preCombineField = 'ts'
+             | )
+             | partitioned by(ts)
+             | location '$basePath'
+       """.stripMargin)
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+        // create functional index and verify
+        spark.sql(s"create index idx_datestr on $tableName using column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')")
+        val metaClient = createMetaClient(spark, basePath)
+        assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains("func_index_idx_datestr"))
+        assertTrue(metaClient.getIndexMetadata.isPresent)
+        var functionalIndexMetadata = metaClient.getIndexMetadata.get()
+        assertEquals(1, functionalIndexMetadata.getIndexDefinitions.size())
+        assertEquals("func_index_idx_datestr", functionalIndexMetadata.getIndexDefinitions.get("func_index_idx_datestr").getIndexName)
+
+        // verify functional index records by querying metadata table
+        val result2DF = spark.sql(
+          s"select * from hudi_metadata('$tableName') where type=3"
+        )
+        assert(result2DF.count() == 1)

Review Comment:
   Would it be difficult to also validate the data in addition to the count?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -527,6 +527,9 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeFunctionalIndexPartiti
   }
 
   private Set<String> getFunctionalIndexPartitionsToInit() {
+    if (dataMetaClient.getIndexMetadata().isEmpty()) {

Review Comment:
   Are we adding this now because the index metadata was previously always assumed to exist, since we couldn't disable the index?
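   For context, the guard boils down to an early return on an empty Optional; a minimal standalone sketch with simplified stand-in types (not the real Hudi classes):

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Simplified stand-in for the metadata-writer guard; real Hudi types differ.
public class FunctionalIndexGuard {
    // Plays the role of dataMetaClient.getIndexMetadata(): empty when no
    // index metadata exists, e.g. because the functional index is disabled
    // or was never initialized.
    static Optional<Map<String, String>> indexMetadata = Optional.empty();

    static Set<String> getFunctionalIndexPartitionsToInit() {
        if (indexMetadata.isEmpty()) {
            // Early return: nothing to initialize without index metadata.
            return Collections.emptySet();
        }
        return indexMetadata.get().keySet();
    }

    public static void main(String[] args) {
        System.out.println(getFunctionalIndexPartitionsToInit().isEmpty());
    }
}
```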



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala:
##########
@@ -424,6 +424,78 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test Enable and Disable Functional Index") {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      withTempDir { tmp =>
+        // create a simple partitioned mor table and insert some records
+        val tableName = generateTableName
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | options (
+             |  primaryKey ='id',
+             |  type = 'mor',
+             |  preCombineField = 'ts'
+             | )
+             | partitioned by(ts)
+             | location '$basePath'
+       """.stripMargin)
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+        // create functional index and verify
+        spark.sql(s"create index idx_datestr on $tableName using column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')")
+        val metaClient = createMetaClient(spark, basePath)
+        assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains("func_index_idx_datestr"))
+        assertTrue(metaClient.getIndexMetadata.isPresent)
+        var functionalIndexMetadata = metaClient.getIndexMetadata.get()
+        assertEquals(1, functionalIndexMetadata.getIndexDefinitions.size())
+        assertEquals("func_index_idx_datestr", functionalIndexMetadata.getIndexDefinitions.get("func_index_idx_datestr").getIndexName)
+
+        // verify functional index records by querying metadata table
+        val result2DF = spark.sql(
+          s"select * from hudi_metadata('$tableName') where type=3"
+        )
+        assert(result2DF.count() == 1)
+
+        // disable functional index
+        spark.sql("set hoodie.metadata.functional.index.enable=false")
+        // do another insert after initializing the index
+        spark.sql(s"insert into $tableName values(4, 'a4', 10, 10000000)")
+        // check query result
+        checkAnswer(s"select id, name from $tableName where from_unixtime(ts, 'yyyy-MM-dd') = '1970-04-26'")(
+          Seq(4, "a4")
+        )
+        // verify there are no new updates to functional index
+        val result3DF = spark.sql(
+          s"select * from hudi_metadata('$tableName') where type=3"
+        )
+        assert(result3DF.count() == 1)
+
+        // enable functional index
+        spark.sql("set hoodie.metadata.functional.index.enable=true")
+        // do another insert after initializing the index
+        spark.sql(s"insert into $tableName values(5, 'a5', 10, 10000000)")

Review Comment:
   It's hard to visually tell that the two 10000000 values are the same unless they are right next to each other, which they are not:
   ```
   spark.sql(s"insert into $tableName values(4, 'a4', 10, 10000000)")
   spark.sql(s"insert into $tableName values(5, 'a5', 10, 10000000)")
   ```



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala:
##########
@@ -424,6 +424,78 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test Enable and Disable Functional Index") {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      withTempDir { tmp =>
+        // create a simple partitioned mor table and insert some records
+        val tableName = generateTableName
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | options (
+             |  primaryKey ='id',
+             |  type = 'mor',
+             |  preCombineField = 'ts'
+             | )
+             | partitioned by(ts)
+             | location '$basePath'
+       """.stripMargin)
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+        // create functional index and verify
+        spark.sql(s"create index idx_datestr on $tableName using column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')")
+        val metaClient = createMetaClient(spark, basePath)
+        assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains("func_index_idx_datestr"))
+        assertTrue(metaClient.getIndexMetadata.isPresent)
+        var functionalIndexMetadata = metaClient.getIndexMetadata.get()
+        assertEquals(1, functionalIndexMetadata.getIndexDefinitions.size())
+        assertEquals("func_index_idx_datestr", functionalIndexMetadata.getIndexDefinitions.get("func_index_idx_datestr").getIndexName)

Review Comment:
   ```
   assertEquals("func_index_idx_datestr", functionalIndexMetadata.getIndexDefinitions.get("func_index_idx_datestr").getIndexName)
   ```
   This seems kinda reflexive?
   



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala:
##########
@@ -424,6 +424,78 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test Enable and Disable Functional Index") {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      withTempDir { tmp =>
+        // create a simple partitioned mor table and insert some records
+        val tableName = generateTableName
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | options (
+             |  primaryKey ='id',
+             |  type = 'mor',
+             |  preCombineField = 'ts'
+             | )
+             | partitioned by(ts)
+             | location '$basePath'
+       """.stripMargin)
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+        // create functional index and verify
+        spark.sql(s"create index idx_datestr on $tableName using column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')")
+        val metaClient = createMetaClient(spark, basePath)
+        assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains("func_index_idx_datestr"))
+        assertTrue(metaClient.getIndexMetadata.isPresent)
+        var functionalIndexMetadata = metaClient.getIndexMetadata.get()
+        assertEquals(1, functionalIndexMetadata.getIndexDefinitions.size())
+        assertEquals("func_index_idx_datestr", functionalIndexMetadata.getIndexDefinitions.get("func_index_idx_datestr").getIndexName)
+
+        // verify functional index records by querying metadata table
+        val result2DF = spark.sql(

Review Comment:
   there's no result1DF? also usually see the f lowercase when we do Df (but I 
don't really care if you want to keep it uppercase)



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -316,6 +316,13 @@ public final class HoodieMetadataConfig extends HoodieConfig {
       .withDocumentation("Initializes the metadata table by reading from the file system when the table is first created. Enabled by default. "
           + "Warning: This should only be disabled when manually constructing the metadata table outside of typical Hudi writer flows.");
 
+  public static final ConfigProperty<Boolean> FUNCTIONAL_INDEX_ENABLE_PROP = ConfigProperty

Review Comment:
   Mark advanced? I feel like every MDT config will be on the advanced side, so should we have a higher threshold (meaning we only mark really advanced configs as advanced in HoodieMetadataConfig)?
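   For reference, marking it would just be one more call in the builder chain; here's a sketch against a stub builder (the stub only mimics the fluent-API shape, it is not the real ConfigProperty, and the default value shown is arbitrary):

```java
// Stub fluent builder mimicking the shape of a config-property API, only
// to show where a markAdvanced() call would sit in the chain.
public class AdvancedConfigSketch {
    static class Prop<T> {
        final String key;
        T defaultValue;
        boolean advanced;
        private Prop(String key) { this.key = key; }
        static <T> Prop<T> key(String key) { return new Prop<>(key); }
        Prop<T> defaultValue(T v) { this.defaultValue = v; return this; }
        Prop<T> markAdvanced() { this.advanced = true; return this; }
    }

    // The markAdvanced() call flags the property; everything else is as before.
    static final Prop<Boolean> FUNCTIONAL_INDEX_ENABLE = Prop
        .<Boolean>key("hoodie.metadata.functional.index.enable")
        .defaultValue(true)
        .markAdvanced();

    public static void main(String[] args) {
        System.out.println(FUNCTIONAL_INDEX_ENABLE.advanced);
    }
}
```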



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -316,6 +316,13 @@ public final class HoodieMetadataConfig extends HoodieConfig {
       .withDocumentation("Initializes the metadata table by reading from the file system when the table is first created. Enabled by default. "
           + "Warning: This should only be disabled when manually constructing the metadata table outside of typical Hudi writer flows.");
 
+  public static final ConfigProperty<Boolean> FUNCTIONAL_INDEX_ENABLE_PROP = ConfigProperty
+      .key(METADATA_PREFIX + ".functional.index.enable")

Review Comment:
   But I think there is an argument that the index part should go first, so that all the MDT index configs have the same prefix. WDYT?



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala:
##########
@@ -424,6 +424,78 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test Enable and Disable Functional Index") {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      withTempDir { tmp =>
+        // create a simple partitioned mor table and insert some records
+        val tableName = generateTableName
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | options (
+             |  primaryKey ='id',
+             |  type = 'mor',
+             |  preCombineField = 'ts'
+             | )
+             | partitioned by(ts)
+             | location '$basePath'
+       """.stripMargin)
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+        // create functional index and verify
+        spark.sql(s"create index idx_datestr on $tableName using column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')")
+        val metaClient = createMetaClient(spark, basePath)
+        assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains("func_index_idx_datestr"))
+        assertTrue(metaClient.getIndexMetadata.isPresent)
+        var functionalIndexMetadata = metaClient.getIndexMetadata.get()
+        assertEquals(1, functionalIndexMetadata.getIndexDefinitions.size())
+        assertEquals("func_index_idx_datestr", functionalIndexMetadata.getIndexDefinitions.get("func_index_idx_datestr").getIndexName)
+
+        // verify functional index records by querying metadata table
+        val result2DF = spark.sql(
+          s"select * from hudi_metadata('$tableName') where type=3"
+        )
+        assert(result2DF.count() == 1)
+
+        // disable functional index
+        spark.sql("set hoodie.metadata.functional.index.enable=false")
+        // do another insert after initializing the index
+        spark.sql(s"insert into $tableName values(4, 'a4', 10, 10000000)")
+        // check query result
+        checkAnswer(s"select id, name from $tableName where from_unixtime(ts, 'yyyy-MM-dd') = '1970-04-26'")(
+          Seq(4, "a4")
+        )
+        // verify there are no new updates to functional index
+        val result3DF = spark.sql(
+          s"select * from hudi_metadata('$tableName') where type=3"
+        )
+        assert(result3DF.count() == 1)
+
+        // enable functional index
+        spark.sql("set hoodie.metadata.functional.index.enable=true")
+        // do another insert after initializing the index

Review Comment:
   The comment `do another insert after initializing the index` on lines 469 and 483 confuses me a bit. Seems like maybe one of them should be different.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala:
##########
@@ -424,6 +424,78 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test Enable and Disable Functional Index") {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      withTempDir { tmp =>
+        // create a simple partitioned mor table and insert some records
+        val tableName = generateTableName
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | options (
+             |  primaryKey ='id',
+             |  type = 'mor',
+             |  preCombineField = 'ts'
+             | )
+             | partitioned by(ts)
+             | location '$basePath'
+       """.stripMargin)
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+        // create functional index and verify
+        spark.sql(s"create index idx_datestr on $tableName using column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')")
+        val metaClient = createMetaClient(spark, basePath)
+        assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains("func_index_idx_datestr"))
+        assertTrue(metaClient.getIndexMetadata.isPresent)
+        var functionalIndexMetadata = metaClient.getIndexMetadata.get()
+        assertEquals(1, functionalIndexMetadata.getIndexDefinitions.size())
+        assertEquals("func_index_idx_datestr", functionalIndexMetadata.getIndexDefinitions.get("func_index_idx_datestr").getIndexName)
+
+        // verify functional index records by querying metadata table
+        val result2DF = spark.sql(
+          s"select * from hudi_metadata('$tableName') where type=3"
+        )
+        assert(result2DF.count() == 1)
+
+        // disable functional index
+        spark.sql("set hoodie.metadata.functional.index.enable=false")
+        // do another insert after initializing the index
+        spark.sql(s"insert into $tableName values(4, 'a4', 10, 10000000)")
+        // check query result
+        checkAnswer(s"select id, name from $tableName where from_unixtime(ts, 'yyyy-MM-dd') = '1970-04-26'")(
+          Seq(4, "a4")
+        )
+        // verify there are no new updates to functional index
+        val result3DF = spark.sql(
+          s"select * from hudi_metadata('$tableName') where type=3"
+        )
+        assert(result3DF.count() == 1)

Review Comment:
   > Would it be difficult to also validate the data in addition to the count?
   
   here as well



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
