yihua commented on code in PR #18265:
URL: https://github.com/apache/hudi/pull/18265#discussion_r3036406437
##########
hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java:
##########
@@ -64,13 +65,17 @@ public class FileSystemBackedTableMetadata extends AbstractHoodieTableMetadata {
private static final int DEFAULT_LISTING_PARALLELISM = 1500;
+ @Getter
Review Comment:
🤖 Is `getDatabaseName()` guaranteed to be non-null here? For tables created
without a database name in the table config, this would store null. If the
catalog-backed path later uses this database name to look up partitions, that
could fail. Have you verified the behavior for tables without an explicit
database name?
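One way to make the missing case explicit is to avoid exposing a bare (possibly null) field through the generated getter. The sketch below is an illustration only. `DatabaseNameHolder` is a hypothetical stand-in for the metadata class, and returning `Optional` is an assumed design choice, not what the PR does.

```java
import java.util.Optional;

// Hypothetical stand-in for the metadata class (not Hudi's actual type).
class DatabaseNameHolder {
  // Raw value from the table config; may be null or empty for tables created
  // without an explicit database name.
  private final String databaseName;

  DatabaseNameHolder(String databaseNameFromConfig) {
    this.databaseName = databaseNameFromConfig;
  }

  // Returns an empty Optional for null or empty names, so callers must decide
  // how to handle a missing database instead of passing null along.
  Optional<String> getDatabaseName() {
    return Optional.ofNullable(databaseName).filter(name -> !name.isEmpty());
  }
}
```

With this shape, the catalog-backed path would have to handle `Optional.empty()` explicitly rather than building a lookup key from null.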
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java:
##########
@@ -134,6 +134,13 @@ List<String> getPartitionPathWithPathPrefixUsingFilterExpression(List<String> re
Types.RecordType partitionFields,
Expression expression) throws IOException;
+ default List<String> getPartitionPathWithPathPrefixUsingFilterExpression(List<String> relativePathPrefixes,
Review Comment:
🤖 Using `List<Object>` for `partitionPredicateExpressions` loses all type
safety. Callers will need to cast blindly, and any mismatch will only surface
as a runtime ClassCastException. Could this be a generic type parameter on the
interface, or would a more specific type (even a simple wrapper) work here to
avoid the raw Object list?
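A possible shape for the "simple wrapper" option is sketched below. `PartitionPredicates` and `as` are hypothetical names. Carrying a class token is an assumption whose purpose is to turn a blind cast into an early, descriptive failure at the boundary.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical typed wrapper for engine-specific partition predicates
// (for Spark, catalyst Expressions), replacing a raw List<Object>.
final class PartitionPredicates<E> {
  private final Class<E> elementType;
  private final List<E> expressions;

  PartitionPredicates(Class<E> elementType, List<E> expressions) {
    this.elementType = elementType;
    this.expressions = Collections.unmodifiableList(new ArrayList<>(expressions));
  }

  // Narrows to the type an implementation expects. A mismatch fails here with a
  // descriptive message instead of a ClassCastException deep inside the listing code.
  <T> List<T> as(Class<T> expectedType) {
    if (!expectedType.isAssignableFrom(elementType)) {
      throw new IllegalArgumentException("Expected partition predicates of type "
          + expectedType.getName() + " but got " + elementType.getName());
    }
    @SuppressWarnings("unchecked")
    List<T> narrowed = (List<T>) (List<?>) expressions;
    return narrowed;
  }
}
```

A generic type parameter on the interface method would also work. The wrapper has one advantage: the runtime check lives in a single place instead of in every implementation.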
##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -609,16 +608,9 @@ private void resetTableMetadata(HoodieTableMetadata newTableMetadata) {
tableMetadata = newTableMetadata;
}
- private static HoodieTableMetadata createMetadataTable(
- HoodieEngineContext engineContext,
- HoodieStorage storage,
- TableMetadataFactory metadataFactory,
- HoodieMetadataConfig metadataConfig,
- StoragePath basePath
- ) {
- HoodieTableMetadata newTableMetadata = metadataFactory.create(
- engineContext, storage, metadataConfig, basePath.toString(), true);
- return newTableMetadata;
+ protected HoodieTableMetadata createMetadataTable(HoodieEngineContext engineContext) {
+ return metaClient.getTableFormat().getMetadataFactory()
+ .create(engineContext, metaClient.getStorage(), metadataConfig, basePath.toString(), true);
}
Review Comment:
🤖 Making `createMetadataTable` protected (overridable) while it's called
from `doRefresh()` at line 201 during the base class constructor is risky. If a
subclass overrides this method and accesses subclass-specific fields (e.g.,
catalog reference, spark session), those fields won't be initialized yet when
the base constructor runs. Could you consider using lazy initialization or a
post-construction `init()` method instead?
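Here is a minimal illustration of the hazard and of the lazy-initialization alternative. The class names are hypothetical, and the `String` fields stand in for the metadata table and a subclass-only reference such as a catalog.

```java
// Base class whose constructor calls an overridable factory method, mirroring
// createMetadataTable() being reached via doRefresh() during construction.
class EagerIndex {
  protected final String metadata;

  EagerIndex() {
    this.metadata = createMetadata(); // dispatches to the subclass override
  }

  protected String createMetadata() {
    return "filesystem";
  }
}

class EagerCatalogIndex extends EagerIndex {
  private final String catalogName;

  EagerCatalogIndex(String catalogName) {
    super();                        // runs createMetadata() before the next line
    this.catalogName = catalogName;
  }

  @Override
  protected String createMetadata() {
    return "catalog:" + catalogName; // catalogName is still null during super()
  }
}

// Lazy alternative: the factory method is only invoked on first access,
// after every constructor in the hierarchy has completed.
class LazyIndex {
  private String metadata;

  protected String createMetadata() {
    return "filesystem";
  }

  protected final synchronized String getMetadata() {
    if (metadata == null) {
      metadata = createMetadata();
    }
    return metadata;
  }
}

class LazyCatalogIndex extends LazyIndex {
  private final String catalogName;

  LazyCatalogIndex(String catalogName) {
    this.catalogName = catalogName;
  }

  @Override
  protected String createMetadata() {
    return "catalog:" + catalogName;
  }
}
```

The eager variant silently produces `catalog:null`, while the lazy variant sees the initialized field.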
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/metadata/CatalogBackedTableMetadata.scala:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.engine.HoodieEngineContext
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.util.StringUtils
+import org.apache.hudi.internal.schema.Types
+import org.apache.hudi.storage.{HoodieStorage, StoragePath}
+import org.apache.hudi.util.PartitionPathFilterUtil
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.internal.SQLConf
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+class CatalogBackedTableMetadata(engineContext: HoodieEngineContext,
+ tableConfig: HoodieTableConfig,
+ storage: HoodieStorage,
+ datasetBasePath: String) extends
+ FileSystemBackedTableMetadata(engineContext, tableConfig, storage, datasetBasePath) with Logging {
+
+ private val sparkSession = engineContext.asInstanceOf[HoodieSparkEngineContext].getSqlContext.sparkSession
+ private val catalogTableName = tableConfig.getTableName
+ private lazy val catalogDatabaseName =
+ if (StringUtils.isNullOrEmpty(tableConfig.getDatabaseName)) {
+ sparkSession.sessionState.catalog.getCurrentDatabase
+ } else {
+ tableConfig.getDatabaseName
+ }
+ private lazy val tableIdentifier = TableIdentifier(catalogTableName, Some(catalogDatabaseName))
+ private lazy val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
+
Review Comment:
🤖 If the table isn't registered in the catalog (e.g., created via DataSource
API with just a path), `getTableMetadata` will throw `NoSuchTableException`.
Since `catalogTable` is accessed by `isPartitionedTable` and
`shouldUseCatalogPartitions` in every partition listing method, the exception
propagates up without any fallback to `super` (filesystem listing). Could you
wrap the catalog access in a `Try` and fall back to `super` when the table
isn't found?
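In the Scala class, this would most likely be a `lazy val` holding `Try(...).toOption`. The Java sketch below shows the same shape with hypothetical names. `TableNotFoundException` stands in for Spark's `NoSuchTableException`, and the suppliers stand in for the catalog table lookup, the catalog partition listing, and the `super` filesystem listing.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical stand-in for Spark's NoSuchTableException.
class TableNotFoundException extends RuntimeException {
  TableNotFoundException(String message) {
    super(message);
  }
}

// Sketch: resolve the catalog table at most once, and fall back to filesystem
// listing (the super(...) path) when the table is not registered.
class CatalogWithFallbackLister {
  private final Supplier<String> catalogTableLookup;      // may throw TableNotFoundException
  private final Supplier<List<String>> catalogListing;
  private final Supplier<List<String>> filesystemListing;
  private boolean resolved;
  private Optional<String> catalogTable = Optional.empty();

  CatalogWithFallbackLister(Supplier<String> catalogTableLookup,
                            Supplier<List<String>> catalogListing,
                            Supplier<List<String>> filesystemListing) {
    this.catalogTableLookup = catalogTableLookup;
    this.catalogListing = catalogListing;
    this.filesystemListing = filesystemListing;
  }

  private synchronized Optional<String> resolveCatalogTable() {
    if (!resolved) {
      try {
        catalogTable = Optional.of(catalogTableLookup.get());
      } catch (TableNotFoundException e) {
        catalogTable = Optional.empty(); // not registered: use filesystem listing
      }
      resolved = true;
    }
    return catalogTable;
  }

  List<String> getAllPartitionPaths() {
    return resolveCatalogTable().isPresent() ? catalogListing.get() : filesystemListing.get();
  }
}
```

The sketch catches only the not-found case, so other catalog failures (for example, connectivity errors) still surface instead of being masked by the fallback.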
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/metadata/CatalogBackedTableMetadata.scala:
##########
@@ -0,0 +1,123 @@
+class CatalogBackedTableMetadata(engineContext: HoodieEngineContext,
+ tableConfig: HoodieTableConfig,
+ storage: HoodieStorage,
+ datasetBasePath: String) extends
+ FileSystemBackedTableMetadata(engineContext, tableConfig, storage, datasetBasePath) with Logging {
+
+ private val sparkSession = engineContext.asInstanceOf[HoodieSparkEngineContext].getSqlContext.sparkSession
+ private val catalogTableName = tableConfig.getTableName
+ private lazy val catalogDatabaseName =
+ if (StringUtils.isNullOrEmpty(tableConfig.getDatabaseName)) {
Review Comment:
🤖 When `getDatabaseName` returns null/empty, this falls back to
`getCurrentDatabase`, which reflects the session's current database context —
not necessarily where the table is registered. If a user runs `USE
some_other_db` before querying, this would look up the wrong database and
either throw `NoSuchTableException` or find a different table with the same
name. Would it be safer to resolve the database from the catalog table's
metadata (e.g., via the table's location path) or at least document this
assumption?
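Below is a sketch of the "resolve via the table's location" idea. `DatabaseResolver` is hypothetical, and so is the `tableLocations` map, which stands in for whatever catalog lookup is actually available. Real code would also need to normalize paths (scheme, trailing slash) before comparing them.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

// Sketch: instead of trusting the session's current database, accept it only if
// the same-named table registered there points at this table's base path.
final class DatabaseResolver {
  private DatabaseResolver() {}

  // tableLocations: hypothetical catalog view, database -> (table name -> location).
  static Optional<String> resolve(String configuredDb, String tableName, String basePath,
                                  String currentDb,
                                  Map<String, Map<String, String>> tableLocations) {
    if (configuredDb != null && !configuredDb.isEmpty()) {
      return Optional.of(configuredDb); // explicit database in the table config wins
    }
    Map<String, String> tablesInCurrentDb =
        tableLocations.getOrDefault(currentDb, Collections.emptyMap());
    String location = tablesInCurrentDb.get(tableName);
    // A same-named table at a different location is treated as "not found".
    return basePath.equals(location) ? Optional.of(currentDb) : Optional.empty();
  }
}
```

Returning `Optional.empty()` lets the caller fall back to filesystem listing rather than reading partitions from an unrelated table.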
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala:
##########
@@ -122,6 +123,12 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
protected lazy val shouldFastBootstrap = configProperties.getBoolean(DATA_QUERIES_ONLY.key, false)
+ lazy val isPartitionListingViaCatalogEnabled: Boolean = {
+ configProperties.getBoolean(FILE_INDEX_PARTITION_LISTING_VIA_CATALOG.key,
+   FILE_INDEX_PARTITION_LISTING_VIA_CATALOG.defaultValue()) &&
+   !metaClient.getTableConfig.isMetadataTableAvailable
Review Comment:
🤖 I share this concern. The current condition requires the user to
explicitly disable MDT *and* enable the catalog config. But the motivation is
handling corrupted MDT — in that scenario, MDT is still technically enabled in
the table config, it's just broken. So the user would need to first disable
MDT, then enable catalog listing, which is two steps. It might be worth
checking whether MDT is *functional* (e.g., metadata table exists and is
accessible) rather than just whether the config is enabled.
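One way to fold a functional check into the condition is sketched below. `PartitionListingDecision` is hypothetical, and the `mdtProbe` is an assumed hook (for example, a cheap read from the metadata table) rather than an existing Hudi API.

```java
import java.util.function.BooleanSupplier;

// Sketch: use catalog listing when the flag is on and the metadata table is
// either unavailable per table config or fails a usability probe.
final class PartitionListingDecision {
  private PartitionListingDecision() {}

  static boolean useCatalogListing(boolean catalogListingFlag,
                                   boolean mdtAvailableInConfig,
                                   BooleanSupplier mdtProbe) {
    if (!catalogListingFlag) {
      return false;
    }
    if (!mdtAvailableInConfig) {
      return true;
    }
    try {
      return !mdtProbe.getAsBoolean(); // probe reports MDT unusable
    } catch (RuntimeException e) {
      return true;                     // probe threw: treat MDT as broken
    }
  }
}
```

The trade-off is an extra metadata-table read on every file index build, so the probe would need to be cheap or cached.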
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/metadata/CatalogBackedTableMetadata.scala:
##########
@@ -0,0 +1,123 @@
+class CatalogBackedTableMetadata(engineContext: HoodieEngineContext,
+ tableConfig: HoodieTableConfig,
+ storage: HoodieStorage,
+ datasetBasePath: String) extends
+ FileSystemBackedTableMetadata(engineContext, tableConfig, storage, datasetBasePath) with Logging {
+
+ private val sparkSession = engineContext.asInstanceOf[HoodieSparkEngineContext].getSqlContext.sparkSession
+ private val catalogTableName = tableConfig.getTableName
+ private lazy val catalogDatabaseName =
+ if (StringUtils.isNullOrEmpty(tableConfig.getDatabaseName)) {
+ sparkSession.sessionState.catalog.getCurrentDatabase
+ } else {
+ tableConfig.getDatabaseName
+ }
+ private lazy val tableIdentifier = TableIdentifier(catalogTableName, Some(catalogDatabaseName))
+ private lazy val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
+
+ private def isPartitionedTable: Boolean = {
+ catalogTable.partitionColumnNames.nonEmpty
+ }
+
+ private def shouldUseCatalogPartitions: Boolean = {
+ isPartitionedTable && catalogTable.tracksPartitionsInCatalog
+ }
+
+ override def getAllPartitionPaths():
+ util.List[String] =
+ if (!isPartitionedTable) {
+ util.Collections.emptyList()
+ } else if (shouldUseCatalogPartitions) {
+ sparkSession.sessionState.catalog.externalCatalog
+ .listPartitions(catalogDatabaseName, catalogTableName)
+ .map(catalogTablePartition => {
+ val partitionPathURI = new StoragePath(catalogTablePartition.location)
+ FSUtils.getRelativePartitionPath(dataBasePath, partitionPathURI)
+ }).asJava
+ } else {
+ super.getAllPartitionPaths()
+ }
+
+ override def getPartitionPathWithPathPrefixes(relativePathPrefixes: util.List[String]):
+ util.List[String] =
+ if (!isPartitionedTable) {
+ util.Collections.emptyList()
+ } else if (shouldUseCatalogPartitions) {
+ filterPartitionsBasedOnRelativePathPrefixes(relativePathPrefixes,
+ sparkSession.sessionState.catalog.externalCatalog
+ .listPartitions(catalogDatabaseName, catalogTableName))
+ } else {
+ super.getPartitionPathWithPathPrefixes(relativePathPrefixes)
+ }
+
+ override def getPartitionPathWithPathPrefixUsingFilterExpression(relativePathPrefix: util.List[String],
+     partitionFields: Types.RecordType,
+     pushedExpr: org.apache.hudi.expression.Expression,
+     partitionPredicateExpressions: util.List[Object]):
+ util.List[String] = {
+ if (!isPartitionedTable) {
+ util.Collections.emptyList()
+ } else if (shouldUseCatalogPartitions) {
+ val partitionPredicateExpressionSeq = partitionPredicateExpressions.asScala.map(_.asInstanceOf[Expression]).toSeq
Review Comment:
🤖 The `else` branch falls back to the 3-param `super` call (without
`partitionPredicateExpressions`), which means the Spark partition predicate
expressions are silently dropped when the catalog isn't available. Is this
intentional? In that case the filesystem listing only uses the Hudi
`pushedExpr` for filtering, which should still be correct, but I wanted to
confirm no filter information is lost here.
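If the extra pruning is worth keeping on the fallback path, one option is to apply the remaining predicates as a post-filter on the `super` result. The sketch assumes the Spark predicates have already been converted into path-level `Predicate<String>`s. That conversion step is hypothetical and not shown, and `FallbackFiltering` is an illustrative name.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Sketch: keep engine predicates on the fallback path by post-filtering the
// paths returned by the pushedExpr-based (3-arg) listing.
final class FallbackFiltering {
  private FallbackFiltering() {}

  // superResult: paths already pruned by pushedExpr.
  // extraPredicates: hypothetical path-level form of the engine predicates.
  static List<String> applyRemainingPredicates(List<String> superResult,
                                               List<Predicate<String>> extraPredicates) {
    if (extraPredicates == null || extraPredicates.isEmpty()) {
      return superResult; // nothing extra to apply
    }
    Predicate<String> allMatch = extraPredicates.stream().reduce(p -> true, Predicate::and);
    return superResult.stream().filter(allMatch).collect(Collectors.toList());
  }
}
```

Because the filesystem result is already correct under `pushedExpr`, this only narrows the list further. It cannot reintroduce partitions that `pushedExpr` excluded.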
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/metadata/CatalogBackedTableMetadata.scala:
##########
@@ -0,0 +1,92 @@
+package org.apache.hudi.metadata
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.engine.HoodieEngineContext
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.internal.schema.Types
+import org.apache.hudi.storage.{HoodieStorage, StoragePath}
+import org.apache.hudi.util.PartitionPathFilterUtil
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.internal.SQLConf
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+class CatalogBackedTableMetadata(engineContext: HoodieEngineContext,
+ tableConfig: HoodieTableConfig,
+ storage: HoodieStorage,
+ datasetBasePath: String) extends
+ FileSystemBackedTableMetadata(engineContext, tableConfig, storage, datasetBasePath) with Logging {
+
+ lazy val sparkSession = engineContext.asInstanceOf[HoodieSparkEngineContext].getSqlContext.sparkSession
+
+ override def getAllPartitionPaths():
+ util.List[String] = {
+ val catalogTablePartitionSeq =
+ sparkSession.sessionState.catalog.externalCatalog
+ .listPartitions(getDatabaseName, getTableName)
+ catalogTablePartitionSeq
+ .map(catalogTablePartition => {
+ val partitionPathURI = new StoragePath(catalogTablePartition.location)
+ FSUtils.getRelativePartitionPath(dataBasePath, partitionPathURI)
+ }).asJava
+ }
+
+ override def getPartitionPathWithPathPrefixes(relativePathPrefixes: util.List[String]):
+ util.List[String] = {
+ val catalogTablePartitionSeq =
+ sparkSession.sessionState.catalog.externalCatalog
+ .listPartitions(getDatabaseName, getTableName)
+ filterPartitionsBasedOnRelativePathPrefixs(relativePathPrefixes, catalogTablePartitionSeq)
+ }
+
+ override def getPartitionPathWithPathPrefixUsingFilterExpression(relativePathPrefix: util.List[String],
+     partitionFields: Types.RecordType,
+     pushedExpr: org.apache.hudi.expression.Expression,
Review Comment:
🤖 The author's point about it being part of the parent class signature is
fair, but I think the concern still stands. In `CatalogBackedTableMetadata`,
`partitionPredicateExpressions` *is* used (line 95:
`partitionPredicateExpressions.asScala.map(_.asInstanceOf[Expression])`). The
issue is that `pushedExpr` (the Hudi internal `Expression`) is ignored while
the Spark `Expression` list is used instead. If `partitionPredicateExpressions`
is null (which is possible since the interface's default method delegates to
the 3-arg version without it), this would NPE.
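A null-safe guard is cheap to add however the `pushedExpr` question is resolved. The sketch below uses hypothetical names. In the Scala override, the equivalent check could be written with `Option(partitionPredicateExpressions)`, which yields `None` for null.

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper: choose the listing strategy based on whether engine
// predicates were actually supplied, treating null the same as an empty list.
final class PredicateDispatch {
  private PredicateDispatch() {}

  static <R> R dispatch(List<?> partitionPredicateExpressions,
                        Supplier<R> withEnginePredicates,
                        Supplier<R> withPushedExprOnly) {
    boolean hasEnginePredicates =
        partitionPredicateExpressions != null && !partitionPredicateExpressions.isEmpty();
    return hasEnginePredicates ? withEnginePredicates.get() : withPushedExprOnly.get();
  }
}
```

Routing the null/empty case to the `pushedExpr` path also addresses the first half of the concern, because the Hudi expression is then used whenever no engine predicates are available.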
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.