bithw1 opened a new issue #2298:
URL: https://github.com/apache/hudi/issues/2298
Hi,
I have the following code snippet, which creates 101 commits during writes; then I
use the hudi-cli command `cleans run` to perform a clean with all the default
configurations, such as `hoodie.cleaner.commits.retained`.
After the clean is done, it looks like nothing has changed:
1. 101 commits are there,
2. the data are there
3. the parquet files on hdfs are unchanged.
I would like to ask how to observe the effects of a clean. Thanks.
```
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.index.HoodieIndex
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
/**
 * One order row written to the Hudi COW table.
 *
 * @param name          order name; configured as the Hudi record key
 * @param price         order price (kept as a string in this demo)
 * @param creation_date creation timestamp; configured as the pre-combine field
 * @param dt            date string; configured as the Hudi partition-path field
 */
final case class MyOrder(
    name: String,
    price: String,
    creation_date: String,
    dt: String)
object COWCleanerTest {

  // Spark session with Kryo serialization (required by Hudi) and Hive support
  // enabled so the table can be synced to the Hive metastore.
  val spark = SparkSession.builder.appName("COWCleanerTest")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.warehouse.dir", "hdfs:///user/hive/warehouse")
    .enableHiveSupport().getOrCreate()

  val hudi_table = "hudi_hive_read_write_cow_cleaner_1"
  val base_path = s"/data/hudi_demo/$hudi_table"

  /**
   * Writes one batch of rows to the Hudi copy-on-write table.
   *
   * @param df    the batch to write
   * @param round write round; round 0 creates/overwrites the table, any later
   *              round appends a new commit
   */
  def run(df: DataFrame, round: Int): Unit = {
    val saveMode = round match {
      case 0 => SaveMode.Overwrite
      case _ => SaveMode.Append
    }
    df.write.format("hudi")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "name")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY,
        "creation_date")
      .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "xyz")
      .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, hudi_table)
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
      .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
      .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY,
        "jdbc:hive2://10.41.90.208:10000")
      .option(HoodieWriteConfig.TABLE_NAME, hudi_table)
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,
        "org.apache.hudi.hive.MultiPartKeysValueExtractor")
      .option(HoodieIndexConfig.INDEX_TYPE_PROP,
        HoodieIndex.IndexType.GLOBAL_BLOOM.name())
      .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "dt")
      // NOTE(review): auto-commit is disabled here — confirm this is intended,
      // as it affects when commits (and therefore cleans) take effect.
      .option(HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP, "false")
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      .mode(saveMode)
      .save(base_path)
  }

  def main(args: Array[String]): Unit = {
    import spark.implicits._
    val order = MyOrder("Key-0", "Price-0", "2020-11-18 14:43:00",
      "2020-11-19")
    // Create the table and insert 1 row. Fixed to pass round 0 so the
    // SaveMode.Overwrite branch of run() is actually taken; the original
    // passed 1, leaving the Overwrite case unreachable.
    run(spark.createDataset(Seq(order)).toDF(), 0)
    // Run 100 more times, inserting one row per commit (101 commits total).
    // NOTE(review): "14:43:" + i yields an out-of-range seconds component for
    // i > 59; harmless here since creation_date is a plain string, but confirm
    // if it is ever parsed as a timestamp.
    (1 to 100).foreach { i =>
      val order = MyOrder("Key-" + i, "Price-" + i, "2020-11-18 14:43:" + i,
        "2020-11-19")
      val insertData = spark.createDataset(Seq(order)).toDF()
      run(insertData, i)
    }
  }
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]