Hello Spark Team

I am trying to use the DataSourceV2 API from Spark 3.0. When writing a DataFrame, how do I get the user-specified schema inside my data source?

This is what I am trying to achieve:

val data = Seq(
   Row("one", 1, true, 12.34, 6L, date, Decimal(999.00), timestamp, 2f, byteVal, shortVal),
   Row("two", 1, false, 13.34, 7L, date, Decimal(3.3), timestamp, 3.59f, byteVal, shortVal)
)

val schema = new StructType()
   .add(StructField("name", StringType, true))
   .add(StructField("id", IntegerType, true))
   .add(StructField("flag", BooleanType, true))
   .add(StructField("salary", DoubleType, true))
   .add(StructField("phone", LongType, true))
   .add(StructField("dob", DateType, true))
   .add(StructField("weight", DecimalType(Constants.DECIMAL_PRECISION, 7), true))
   .add(StructField("time", TimestampType, true))
   .add(StructField("float", FloatType, true))
   .add(StructField("byte", ByteType, true))
   .add(StructField("short", ShortType, true))


val df = spark.createDataFrame(spark.sparkContext.parallelize(data),
   schema)

//Create a new manifest and add the entity to it
df.write.format("com.microsoft.cdm")
   .option("storage", storageAccountName)
   .option("container", outputContainer)
   .option("manifest", "/root/default.manifest.cdm.json")
   .option("entity", "TestEntity")
   .option("format", "parquet")
   .option("compression", "gzip")
   .option("appId", appid).option("appKey", appkey).option("tenantId", tenantid)
   .mode(SaveMode.Append)
   .save()

My custom DataSource implementation is as follows:

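import java.util

import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class DefaultSource extends DataSourceRegister with TableProvider {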

  override def shortName(): String = "spark-cdm"

  override def inferSchema(options: CaseInsensitiveStringMap): StructType = null

  override def inferPartitioning(options: CaseInsensitiveStringMap): Array[Transform] = {
    getTable(null, null, options).partitioning
  }

  override def supportsExternalMetadata = true

  override def getTable(
      schema: StructType,
      partitioning: Array[Transform],
      properties: util.Map[String, String]): Table = {
    println(schema)
    new MysqlTable(schema, properties)
  }
}

The schema printed in getTable is null. I am not sure how to get, inside the data source, the StructType I created for the DataFrame that I write with df.write. Any help would be appreciated.
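One possible direction, sketched under assumptions and worth checking against the exact Spark version: in Spark 3.0's DataFrameWriter.save(), a TableProvider that is not a file source appears to get the result of its own inferSchema() passed into getTable, which would explain the null above. The DataFrame's schema does seem to reach the source at write time, through LogicalWriteInfo.schema() in SupportsWrite.newWriteBuilder. The sketch below assumes the Spark 3.0.x connector API; SchemaAwareSource, SchemaAwareTable and SchemaAwareWriteBuilder are hypothetical names, the table declares TableCapability.ACCEPT_ANY_SCHEMA so the analyzer does not try to match the DataFrame's columns against the table's empty schema, and the BatchWrite is left as a placeholder.

```scala
import java.util

import scala.collection.JavaConverters._

import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical provider, same shape as DefaultSource above.
class SchemaAwareSource extends DataSourceRegister with TableProvider {
  override def shortName(): String = "spark-cdm-sketch" // hypothetical short name

  // Assumption: for a non-file source, save() passes this value into getTable,
  // so returning an empty schema (instead of null) avoids null handling later.
  override def inferSchema(options: CaseInsensitiveStringMap): StructType = new StructType()

  override def getTable(
      schema: StructType,
      partitioning: Array[Transform],
      properties: util.Map[String, String]): Table =
    new SchemaAwareTable(properties)
}

// Hypothetical table: the write schema is not known here, only at write time.
class SchemaAwareTable(properties: util.Map[String, String]) extends Table with SupportsWrite {
  override def name(): String = "SchemaAwareTable"

  override def schema(): StructType = new StructType()

  // ACCEPT_ANY_SCHEMA: do not compare the incoming columns with schema() above.
  override def capabilities(): util.Set[TableCapability] =
    Set(TableCapability.BATCH_WRITE, TableCapability.ACCEPT_ANY_SCHEMA).asJava

  // info.schema() is the schema of the DataFrame being written.
  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
    println(info.schema().treeString)
    new SchemaAwareWriteBuilder(info.schema(), info.options())
  }
}

// Hypothetical builder that keeps the write schema for the actual writer.
class SchemaAwareWriteBuilder(val writeSchema: StructType, val options: CaseInsensitiveStringMap)
    extends WriteBuilder {
  // Placeholder: return the existing BatchWrite implementation here.
  override def buildForBatch(): BatchWrite = ???
}
```

With this shape, a table like MysqlTable would receive the schema in newWriteBuilder rather than in its constructor, and could pass it on to whatever writes the rows.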

Thanks and regards,
Sricheta Ruj