nsivabalan commented on a change in pull request #1004: [HUDI-328] Adding
delete api to HoodieWriteClient
URL: https://github.com/apache/incubator-hudi/pull/1004#discussion_r348211945
##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -72,131 +73,200 @@ private[hudi] object HoodieSparkSqlWriter {
parameters(OPERATION_OPT_KEY)
}
- // register classes & schemas
- val structName = s"${tblName.get}_record"
- val nameSpace = s"hoodie.${tblName.get}"
- sparkContext.getConf.registerKryoClasses(
- Array(classOf[org.apache.avro.generic.GenericData],
- classOf[org.apache.avro.Schema]))
- val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema,
structName, nameSpace)
- sparkContext.getConf.registerAvroSchemas(schema)
- log.info(s"Registered avro schema : ${schema.toString(true)}")
-
- // Convert to RDD[HoodieRecord]
- val keyGenerator =
DataSourceUtils.createKeyGenerator(toProperties(parameters))
- val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df,
structName, nameSpace)
- val hoodieAllIncomingRecords = genericRecords.map(gr => {
- val orderingVal = DataSourceUtils.getNestedFieldValAsString(
- gr, parameters(PRECOMBINE_FIELD_OPT_KEY)).asInstanceOf[Comparable[_]]
- DataSourceUtils.createHoodieRecord(gr,
- orderingVal, keyGenerator.getKey(gr),
parameters(PAYLOAD_CLASS_OPT_KEY))
- }).toJavaRDD()
+ var writeSuccessful: Boolean = false
+ var commitTime: String = null
+ var writeStatuses: JavaRDD[WriteStatus] = null
val jsc = new JavaSparkContext(sparkContext)
-
val basePath = new Path(parameters("path"))
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
var exists = fs.exists(new Path(basePath,
HoodieTableMetaClient.METAFOLDER_NAME))
- // Handle various save modes
- if (mode == SaveMode.ErrorIfExists && exists) {
- throw new HoodieException(s"hoodie dataset at $basePath already exists.")
- }
- if (mode == SaveMode.Ignore && exists) {
- log.warn(s"hoodie dataset at $basePath already exists. Ignoring & not
performing actual writes.")
- return (true, common.util.Option.empty())
- }
- if (mode == SaveMode.Overwrite && exists) {
- log.warn(s"hoodie dataset at $basePath already exists. Deleting existing
data & overwriting with new data.")
- fs.delete(basePath, true)
- exists = false
- }
+ if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
Review comment:
I couldn't get this working, and I also checked with a colleague. It isn't easy
to solve, so I'll leave it as is for now.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services