ehurheap commented on issue #9807:
URL: https://github.com/apache/hudi/issues/9807#issuecomment-1756516663
```
object UUIDRecordKeyDeleter {
// Logger used by the methods of this object, obtained via loggerForClass for this object's class.
private val log = loggerForClass(UUIDRecordKeyDeleter.getClass)
/**
 * Reads the Hudi table at `tablePath` and keeps only the rows matching `predicate`.
 *
 * The read is issued with the given `queryType` and with the
 * `HoodieMetadataConfig.ENABLE` read option set to "false".
 *
 * @param tablePath path handed to the Hudi reader's `load`
 * @param predicate row filter applied to the loaded table
 * @param queryType value passed for the `QUERY_TYPE` read option
 * @param spark     session used to build the reader
 * @return the loaded table restricted to rows matching `predicate`
 */
def query(tablePath: String, predicate: Column, queryType: String)(implicit
    spark: SparkSession
): DataFrame = {
  val hudiReader = spark.read
    .format("hudi")
    .option(QUERY_TYPE.key(), queryType)
    .option(HoodieMetadataConfig.ENABLE.key(), "false")
  val table = hudiReader.load(tablePath)
  // `filter` and `where` are aliases on Dataset.
  table.filter(predicate)
}
// Computes record keys of the records matching the predicate
/**
 * Builds the Hudi keys (record key + partition path) of every record that a
 * snapshot query on `tablePath` returns for `predicate`.
 *
 * @param tablePath path of the Hudi table to query
 * @param predicate filter selecting the records whose keys are wanted
 * @param spark     session used to run the query
 * @return one `HoodieKey` per matching row, as a `JavaRDD`
 */
def computeRecordKeys(
    tablePath: String,
    predicate: Column
)(implicit
    spark: SparkSession
): JavaRDD[HoodieKey] = {
  // Bean encoder picked up implicitly by the Dataset `map` below.
  implicit val keyEncoder: Encoder[HoodieKey] = Encoders.bean(classOf[HoodieKey])

  // Only the two Hudi metadata columns are needed to rebuild a key.
  val keyColumns = query(tablePath, predicate, QUERY_TYPE_SNAPSHOT_OPT_VAL)
    .select(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD)

  // Column 0 is the record key, column 1 the partition path (select order above).
  val hoodieKeys = keyColumns.map(row => new HoodieKey(row.getString(0), row.getString(1)))
  hoodieKeys.toJavaRDD
}
// Deletes all records with the provided record keys without triggering
// compaction.
/**
 * Issues a Hudi delete for the given record keys under a freshly started commit.
 *
 * A write client is created with `buildWriteClient`, a commit is started, the
 * delete is executed and its write statuses are collected to the driver to fill
 * in the returned stats. The write client is closed in all cases.
 *
 * @param config         write-client settings (table path, parallelism, ...)
 * @param envConfig      environment settings forwarded to `buildWriteClient`
 * @param dataLakeRecord descriptor of the target table
 * @param recordKeys     keys of the records to delete
 * @param spark          session whose SparkContext backs the write client
 * @return stats with `totalDeleted` and `totalPartitionsDeleted` populated
 */
def deleteRecords(
    config: WriteClientConfig,
    envConfig: EnvConfig,
    dataLakeRecord: DataLakeRecord[_, _],
    recordKeys: JavaRDD[HoodieKey]
)(implicit
    spark: SparkSession
): DeleteStats = {
  var stats = DeleteStats()
  val writeClient =
    buildWriteClient(config, envConfig, dataLakeRecord, spark.sparkContext)
  // Declared outside the try so the catch/finally log lines can name the instant;
  // it stays empty if startCommit() itself fails.
  var deleteInstant: String = ""
  try {
    deleteInstant = writeClient.startCommit()
    val statuses: mutable.Seq[WriteStatus] =
      writeClient.delete(recordKeys, deleteInstant).collect().asScala
    stats = stats.copy(
      totalDeleted = getTotalDeletesFromWriteStatuses(statuses),
      totalPartitionsDeleted = getPartitionsFromWriteStatuses(statuses).size
    )
  } catch {
    // NOTE(review): Throwable is caught on purpose here because the handler is
    // logErrorAndExit; presumably it terminates the job - confirm before narrowing.
    case t: Throwable =>
      // The message must stay on one physical line: a single-quoted interpolated
      // string cannot contain a raw newline.
      logErrorAndExit(s"Delete operation failed for instant $deleteInstant due to ", t)
  } finally {
    log.info(s"Finished delete operation for instant $deleteInstant")
    writeClient.close()
  }
  stats
}
/**
 * Schedules (and optionally executes) a compaction restricted to the partitions
 * that still contain records matching `predicate` in a read-optimized query.
 *
 * Nothing happens unless `config.scheduleCompaction` is set, and the compaction
 * is skipped when the read-optimized query finds no matching partition. When
 * `config.executeCompaction` is set, pending compactions on overlapping
 * partitions are retried first and the newly scheduled compaction is run.
 *
 * @param config         write-client settings; `scheduleCompaction` and
 *                       `executeCompaction` gate the two phases
 * @param envConfig      environment settings forwarded to the write client
 * @param dataLakeRecord descriptor of the target table
 * @param predicate      filter identifying the records of interest
 * @param spark          session used for the partition query
 * @return stats with the `compaction*` fields populated when a compaction was
 *         executed, otherwise the default `DeleteStats()`
 */
def compactReadOptimizedRecordsMatchingPredicate(
    config: WriteClientConfig,
    envConfig: EnvConfig,
    dataLakeRecord: DataLakeRecord[_, _],
    predicate: Column
)(implicit spark: SparkSession): DeleteStats = {
  import spark.implicits._
  var stats = DeleteStats()
  if (config.scheduleCompaction) {
    val partitions = query(config.tablePath, predicate, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
      .select(col(HoodieRecord.PARTITION_PATH_METADATA_FIELD))
      .distinct()
      .map(r => r.getString(0))
      .collect()
      .toList
    if (partitions.isEmpty) {
      log.warn(
        s"Skipping compaction on table ${dataLakeRecord.tableName} as no records were found"
      )
    } else {
      log.info(
        s"Compacting ${partitions.size} partition paths in table " +
          s"${dataLakeRecord.tableName}: ${partitions.take(10).mkString(",")}..."
      )
      val writeClient =
        compactingWriteClient(config, dataLakeRecord.tableName, envConfig, partitions)
      // The outer try/finally closes the client even when the recovery step
      // below throws; that failure still propagates to the caller because only
      // the inner try has a catch clause.
      try {
        // Retry any pending (requested/inflight) compactions targeting overlapping partitions.
        // This is essentially a recovery process - if a previous run failed after scheduling a
        // compaction, the file groups in that compaction plan will be excluded from the new
        // scheduled plan and their compaction won't be retried.
        // The compaction execution uses locks therefore it is safe to execute the same
        // compaction plan concurrently.
        // The lake API limits GDPR deletion jobs to one at a time per env_id, therefore such
        // conflict is unlikely (but still possible with mutations).
        // todo: get stats from this compaction to aggregate into final stats
        if (config.executeCompaction) {
          CompactionHelper.executeCompactionsWithOverlappingPartitions(
            config.tablePath,
            partitions,
            writeClient
          )
        }
        try {
          val hoodieConfig = writeClient.getConfig
          // Truncated to 100 characters to keep the log line short.
          val includedPartitions =
            hoodieConfig.getString("hoodie.compaction.include.partitions").take(100)
          // Each label and its value sit on the same line so stripMargin yields
          // one "key: value" row per setting.
          log.info(
            s"""Scheduling compaction on table ${dataLakeRecord.tableName}:
               | hoodie.compaction.strategy: ${hoodieConfig.getCompactionStrategy}
               | "hoodie.compaction.include.partitions": ${includedPartitions}...
               | hoodie.compact.schedule.inline: ${hoodieConfig.scheduleInlineCompaction()}
               | hoodie.compact.inline: ${hoodieConfig.inlineCompactionEnabled()}
               | hoodie.write.lock.provider: ${hoodieConfig.getLockProviderClass}
               | hoodie.metadata.enable: ${hoodieConfig.getMetadataConfig.enabled()}
               |""".stripMargin
          )
          val instant =
            writeClient.scheduleCompaction(org.apache.hudi.common.util.Option.empty())
          if (instant.isPresent) {
            if (config.executeCompaction) {
              log.info(
                s"Running compaction on table ${dataLakeRecord.tableName} at " +
                  s"instant: ${instant.get}"
              )
              val metadata =
                writeClient.compact(instant.get).getCommitMetadata
              stats = stats.copy(
                compactionDeletes = metadata.get.getTotalRecordsDeleted,
                compactionTotalFiles = metadata.get.getTotalLogFilesCompacted,
                compactionTotalPartitions = metadata.get.fetchTotalPartitionsWritten
              )
              log.info(
                s"Successfully completed compaction on table " +
                  s"${dataLakeRecord.tableName} at instant: ${instant.get}"
              )
            }
          } else {
            log.warn(
              s"Unable to schedule compaction on table " +
                s"${dataLakeRecord.tableName}, see Hudi logs for reason."
            )
          }
        } catch {
          // NOTE(review): Throwable is caught because the handler is logErrorAndExit;
          // presumably it terminates the job - confirm before narrowing.
          case e: Throwable =>
            logErrorAndExit(s"Compaction failed on table ${dataLakeRecord.tableName}!", e)
        }
      } finally {
        writeClient.close()
      }
    }
  }
  stats
}
// Deletes all records matching the predicate and runs compaction afterwards.
// Deletion and compaction are only executed if there are records matching the
// predicate.
/**
 * Deletes every record matching `predicate`, then runs the compaction step.
 *
 * A snapshot query counts the matching records first. When the count is zero
 * the delete is skipped; otherwise the record keys are computed and deleted
 * through the record-key overload of `deleteRecords`. In both cases
 * `compactReadOptimizedRecordsMatchingPredicate` is then invoked, which
 * performs its own read-optimized check before compacting.
 *
 * @param config         write-client settings (table path, compaction flags, ...)
 * @param envConfig      environment settings forwarded to the write clients
 * @param dataLakeRecord descriptor of the target table
 * @param predicate      filter selecting the records to delete
 * @param spark          session used for the queries and the write client
 * @return stats combining the pre-delete count, the delete results and the
 *         compaction results
 */
def deleteRecords(
    config: WriteClientConfig,
    envConfig: EnvConfig,
    dataLakeRecord: DataLakeRecord[_, _],
    predicate: Column
)(implicit
    spark: SparkSession
): DeleteStats = {
  val recordCount = query(config.tablePath, predicate, QUERY_TYPE_SNAPSHOT_OPT_VAL).count()
  val foundStats = DeleteStats().copy(totalFoundBeforeDelete = recordCount)

  val afterDelete =
    if (recordCount == 0) {
      log.warn(
        s"Skipping delete operation on table ${dataLakeRecord.tableName} as no records were found"
      )
      // No early return here: the compaction step below is still attempted.
      foundStats
    } else {
      log.warn(s"Deleting $recordCount records from table ${dataLakeRecord.tableName}")
      val keysToDelete = computeRecordKeys(config.tablePath, predicate)
      val deleteStats = deleteRecords(config, envConfig, dataLakeRecord, keysToDelete)
      foundStats.copy(
        totalDeleted = deleteStats.totalDeleted,
        totalPartitionsDeleted = deleteStats.totalPartitionsDeleted
      )
    }

  val compactionStats =
    compactReadOptimizedRecordsMatchingPredicate(config, envConfig, dataLakeRecord, predicate)
  afterDelete.copy(
    compactionDeletes = compactionStats.compactionDeletes,
    compactionTotalFiles = compactionStats.compactionTotalFiles,
    compactionTotalPartitions = compactionStats.compactionTotalPartitions
  )
}
/**
 * Creates the Spark write client used for delete operations.
 *
 * The write config disables inline compaction, inline compaction scheduling,
 * auto-archive, auto-clean and the metadata table. Lock and metrics settings
 * come from `WriterOptions.lockOptions` and `metricsOptions`.
 *
 * @param config         supplies region, statsD host, write parallelism and table path
 * @param envConfig      environment settings forwarded to the lock and metrics options
 * @param datalakeRecord descriptor of the target table (its `tableName` is used)
 * @param sparkContext   context wrapped into the Hudi engine context
 * @return a new `SparkRDDWriteClient` for the table
 */
def buildWriteClient(
    config: WriteClientConfig,
    envConfig: EnvConfig,
    datalakeRecord: DataLakeRecord[_, _],
    sparkContext: SparkContext
): SparkRDDWriteClient[_] = {
  val lockProps = new Properties()
  lockProps.putAll(
    WriterOptions.lockOptions(datalakeRecord.tableName, config.region, envConfig).asJava
  )

  val metricsProps = new Properties()
  metricsProps.putAll(
    metricsOptions(config.statsDHost, envConfig, datalakeRecord.tableName).asJava
  )

  // Table services are all switched off for this client.
  val compactionConfig = HoodieCompactionConfig
    .newBuilder()
    .withInlineCompaction(false)
    .withScheduleInlineCompaction(false)
    .withMaxNumDeltaCommitsBeforeCompaction(1)
    .build()
  val archivalConfig = HoodieArchivalConfig.newBuilder().withAutoArchive(false).build()
  val cleanConfig = HoodieCleanConfig.newBuilder().withAutoClean(false).build()
  val metadataConfig = HoodieMetadataConfig.newBuilder().enable(false).build()
  val lockConfig = HoodieLockConfig.newBuilder().fromProperties(lockProps).build()
  val metricsConfig = HoodieMetricsConfig.newBuilder().fromProperties(metricsProps).build()

  val writeConfig = HoodieWriteConfig
    .newBuilder()
    .withCompactionConfig(compactionConfig)
    .withArchivalConfig(archivalConfig)
    .withCleanConfig(cleanConfig)
    .withMetadataConfig(metadataConfig)
    .withLockConfig(lockConfig)
    .withMetricsConfig(metricsConfig)
    .withDeleteParallelism(config.writeParallelism)
    .withPath(config.tablePath)
    .forTable(datalakeRecord.tableName)
    .build()

  val javaContext = JavaSparkContext.fromSparkContext(sparkContext)
  val engineContext: HoodieEngineContext = new HoodieSparkEngineContext(javaContext)
  new SparkRDDWriteClient(engineContext, writeConfig)
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]