tandonraghav opened a new issue #2151:
URL: https://github.com/apache/hudi/issues/2151


   I have a use case where Mongo oplogs are ingested into a Kafka topic via Debezium.
   
   These oplogs come from N collections, and since Hudi doesn't support inserting into multiple tables from a single DataSource write,
   
   I am reading the oplogs (Spark + Kafka DStreams), bucketing them by collection name, and then inserting each bucket into its own Hudi table (N Hudi tables corresponding to N collections).
   
   - AFAIU, Hudi does not support async compaction with plain Spark (I am not using Structured Streaming). Is there any other way to run async compaction, or a periodic one? Is there any tool that can be run periodically to do compactions?
   
   - If I am using inline compaction, how do I compact every 15 minutes (or periodically) when there are no upserts on the table?
   - There can be around 100 tables for these oplogs.
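   
   For the "tool that can be run periodically" part, one option I am considering is the Hudi CLI that ships with the project, which can schedule and execute compactions for a MOR table outside the writer. A rough sketch of such a session (the table path is a placeholder, and exact flags should be checked against the CLI help for the Hudi version in use):
   
   ````
   hudi-> connect --path /path/to/hudi/table
   hudi-> compactions show all
   hudi-> compaction schedule
   hudi-> compaction run --compactionInstant <instant> --parallelism 2
   ````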
    
   Code Snippet (Spark Streaming)
   
   ````
   // Read from Kafka into kafkaDf, then fan out one Hudi table per collection
   List<Row> dbNames = kafkaDf.select("source.ns").distinct().collectAsList();
   
   for (Row row : dbNames) {
       String dbName = row.getString(0);
       Dataset<Row> ds = kafkaDf.where(kafkaDf.col("source.ns").equalTo(dbName));
       // Few other transformations
       persistDFInHudi(ds, sanitizedDBName, tablePath);
   }
   
   private void persistDFInHudi(Dataset<Row> ds, String dbName, String tablePath) {
       ds.write().format("org.apache.hudi")
               .options(QuickstartUtils.getQuickstartWriteConfigs())
               .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL())
               .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL())
               .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "ts_ms")
               .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_id")
               .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY(), MergeHudiPayload.class.getName())
               .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "db_name")
               .option(HoodieWriteConfig.TABLE_NAME, dbName)
               .option(HoodieCompactionConfig.INLINE_COMPACT_PROP, true)
               .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, 1)
               .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY(), false)
               .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY(), "true")
               .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), dbName)
               .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "db_name")
               // note: the key is HIVE_ASSUME_DATE_PARTITION_OPT_KEY(), not the _VAL() default
               .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY(), "false")
               .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(), MultiPartKeysValueExtractor.class.getName())
               .mode(SaveMode.Append)
               .save(tablePath);
   }
   ````
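   
   For compacting on a fixed cadence even when a table gets no upserts, another option I am looking at is the standalone compactor job in hudi-utilities, driven by cron. A hedged sketch (paths, jar name, and memory settings are placeholders, and the flag names should be verified against the HoodieCompactor usage for your Hudi version):
   
   ````
   # crontab: every 15 minutes, compact one table (loop/repeat for the ~100 tables)
   */15 * * * * spark-submit \
     --class org.apache.hudi.utilities.HoodieCompactor \
     /path/to/hudi-utilities-bundle.jar \
     --base-path hdfs:///data/hudi/<table> \
     --table-name <table> \
     --schema-file /path/to/schema.avsc \
     --parallelism 2
   ````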


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

