tandonraghav opened a new issue #2151:
URL: https://github.com/apache/hudi/issues/2151
I have a use case where MongoDB oplogs are ingested into a Kafka topic via
Debezium.
These oplogs come from N collections, and since Hudi doesn't support writing
to multiple tables from a single DataSource, I read the oplogs with
Spark + Kafka DStreams, bucket them by collection name, and insert each bucket
into its own Hudi table (N Hudi tables corresponding to N collections).
- AFAIU, Hudi does not support async compaction with plain Spark (I am not
using Structured Streaming). So is there any other way to run async or
periodic compaction? Is there any tool that can be run periodically to do
compactions?
- If I use inline compaction, how do I compact every 15 minutes (or on some
other schedule) when there are no upserts on the table?
- There can be around 100 tables for these oplogs.
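Ideally I am looking for something I could run from cron per table, roughly like the sketch below. To be clear, this is unverified: the `HoodieCompactor` class in hudi-utilities and its flags are my assumption from browsing the bundle, so please correct me if the tool or its arguments look different:

```shell
# Assumed, unverified sketch: run hudi-utilities' HoodieCompactor on a schedule.
# The class name and flags below are my guess -- not a confirmed invocation.
spark-submit \
  --class org.apache.hudi.utilities.HoodieCompactor \
  hudi-utilities-bundle.jar \
  --base-path hdfs:///hudi/tables/my_table \
  --table-name my_table \
  --schema-file /path/to/schema.avsc \
  --parallelism 4
```

If something like this exists, I could schedule it per table even when the streaming job has no new upserts.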
Code Snippet (Spark Streaming)
````
// Read from Kafka into kafkaDf (Debezium oplog records)
List<Row> db_names = kafkaDf.select("source.ns").distinct().collectAsList();
for (int i = 0; i < db_names.size(); i++) {
    String dbName = db_names.get(i).getString(0);
    Dataset<Row> ds = kafkaDf.select("*")
        .where(kafkaDf.col("source.ns").equalTo(dbName));
    // Few other transformations
    // sanitizedDBName is dbName made safe for use as a table name
    persistDFInHudi(ds, sanitizedDBName, tablePath);
}

private void persistDFInHudi(Dataset<Row> ds, String dbName, String tablePath) {
    ds.write().format("org.apache.hudi")
        .options(QuickstartUtils.getQuickstartWriteConfigs())
        .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL())
        .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL())
        .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "ts_ms")
        .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_id")
        .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY(), MergeHudiPayload.class.getName())
        .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "db_name")
        .option(HoodieWriteConfig.TABLE_NAME, dbName)
        .option(HoodieCompactionConfig.INLINE_COMPACT_PROP, true)
        .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, 1)
        .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY(), false)
        .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY(), "true")
        .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), dbName)
        .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "db_name")
        .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY(), "false")
        .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(), MultiPartKeysValueExtractor.class.getName())
        .mode(SaveMode.Append)
        .save(tablePath);
}
````
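For completeness, `sanitizedDBName` in the snippet is just the Mongo namespace (`db.collection`) made safe for use as a Hive/Hudi table name, since dots are not allowed there. A minimal sketch of what that step does (the exact replacement rules here are illustrative, not the point of the question):

```java
// Illustrative sketch only: map a Mongo namespace like "myDb.myCollection"
// to a table-name-safe string by lowercasing and replacing any character
// that is not a lowercase letter, digit, or underscore.
public class NamespaceSanitizer {
    public static String sanitize(String ns) {
        return ns.toLowerCase().replaceAll("[^a-z0-9_]", "_");
    }

    public static void main(String[] args) {
        System.out.println(sanitize("myDb.myCollection")); // mydb_mycollection
    }
}
```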
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]