nsivabalan commented on a change in pull request #1004: [HUDI-328] Adding 
delete api to HoodieWriteClient
URL: https://github.com/apache/incubator-hudi/pull/1004#discussion_r348569104
 
 

 ##########
 File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 ##########
 @@ -72,131 +73,212 @@ private[hudi] object HoodieSparkSqlWriter {
         parameters(OPERATION_OPT_KEY)
       }
 
-    // register classes & schemas
-    val structName = s"${tblName.get}_record"
-    val nameSpace = s"hoodie.${tblName.get}"
-    sparkContext.getConf.registerKryoClasses(
-      Array(classOf[org.apache.avro.generic.GenericData],
-        classOf[org.apache.avro.Schema]))
-    val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, 
structName, nameSpace)
-    sparkContext.getConf.registerAvroSchemas(schema)
-    log.info(s"Registered avro schema : ${schema.toString(true)}")
-
-    // Convert to RDD[HoodieRecord]
-    val keyGenerator = 
DataSourceUtils.createKeyGenerator(toProperties(parameters))
-    val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, 
structName, nameSpace)
-    val hoodieAllIncomingRecords = genericRecords.map(gr => {
-      val orderingVal = DataSourceUtils.getNestedFieldValAsString(
-        gr, parameters(PRECOMBINE_FIELD_OPT_KEY)).asInstanceOf[Comparable[_]]
-      DataSourceUtils.createHoodieRecord(gr,
-        orderingVal, keyGenerator.getKey(gr), 
parameters(PAYLOAD_CLASS_OPT_KEY))
-    }).toJavaRDD()
+    var writeSuccessful: Boolean = false
+    var commitTime: String = null
+    var writeStatuses: JavaRDD[WriteStatus] = null
 
     val jsc = new JavaSparkContext(sparkContext)
-
     val basePath = new Path(parameters("path"))
     val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
     var exists = fs.exists(new Path(basePath, 
HoodieTableMetaClient.METAFOLDER_NAME))
 
-    // Handle various save modes
-    if (mode == SaveMode.ErrorIfExists && exists) {
-      throw new HoodieException(s"hoodie dataset at $basePath already exists.")
-    }
-    if (mode == SaveMode.Ignore && exists) {
-      log.warn(s"hoodie dataset at $basePath already exists. Ignoring & not 
performing actual writes.")
-      return (true, common.util.Option.empty())
-    }
-    if (mode == SaveMode.Overwrite && exists) {
-      log.warn(s"hoodie dataset at $basePath already exists. Deleting existing 
data & overwriting with new data.")
-      fs.delete(basePath, true)
-      exists = false
-    }
+    if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
+      // register classes & schemas
+      val structName = s"${tblName.get}_record"
+      val nameSpace = s"hoodie.${tblName.get}"
+      sparkContext.getConf.registerKryoClasses(
+        Array(classOf[org.apache.avro.generic.GenericData],
+          classOf[org.apache.avro.Schema]))
+      val schema = 
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, 
nameSpace)
+      sparkContext.getConf.registerAvroSchemas(schema)
+      log.info(s"Registered avro schema : ${schema.toString(true)}")
 
-    // Create the dataset if not present
-    if (!exists) {
-      HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, 
path.get, storageType,
-        tblName.get, "archived")
-    }
+      // Convert to RDD[HoodieRecord]
+      val keyGenerator = 
DataSourceUtils.createKeyGenerator(toProperties(parameters))
+      val genericRecords: RDD[GenericRecord] = 
AvroConversionUtils.createRdd(df, structName, nameSpace)
+      val hoodieAllIncomingRecords = genericRecords.map(gr => {
+        val orderingVal = DataSourceUtils.getNestedFieldValAsString(
+          gr, parameters(PRECOMBINE_FIELD_OPT_KEY)).asInstanceOf[Comparable[_]]
+        DataSourceUtils.createHoodieRecord(gr,
+          orderingVal, keyGenerator.getKey(gr), 
parameters(PAYLOAD_CLASS_OPT_KEY))
+      }).toJavaRDD()
 
-    // Create a HoodieWriteClient & issue the write.
-    val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, 
path.get, tblName.get,
-      mapAsJavaMap(parameters)
-    )
-
-    val hoodieRecords =
-      if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
-        DataSourceUtils.dropDuplicates(
-          jsc,
-          hoodieAllIncomingRecords,
-          mapAsJavaMap(parameters), client.getTimelineServer)
-      } else {
-        hoodieAllIncomingRecords
+      // Handle various save modes
+      if (mode == SaveMode.ErrorIfExists && exists) {
 
 Review comment:
   Do you mean immediately after write handling begins? This was the code flow 
that already existed. I didn't want to change any ordering without knowing the 
details, so I only worked on the delete path (i.e. the else block, lines 194–271).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to