I implemented an Avro-to-Parquet writer which previously took an Avro
schema as a string in its constructor and passed it to the
AvroParquetWriter. Now I'm wondering whether there is a way to get the
schema from the element and pass it to the AvroParquetWriter. I tried
grabbing the schema from the element in the write method, but write is
called later than open, so that doesn't seem to work. I need to do this
because I'm sinking several Kafka topics in one app to S3, so different
messages need a different schema passed to the writer.

/**
 * Sink writer that stores Avro `GenericRecord`s in a Snappy-compressed Parquet file.
 *
 * The Avro schema is not supplied up front. `open` only remembers the target
 * path; the underlying `AvroParquetWriter` is created lazily on the first call
 * to `write`, using the schema carried by that first element. This is needed
 * because `write` is invoked after `open`, so no schema is available yet when
 * `open` runs.
 *
 * NOTE(review): every record written to one opened path is written with the
 * schema of the first record; callers presumably need to route records with
 * different schemas to different writer instances/paths -- confirm against the
 * sink's bucketing configuration.
 *
 * @tparam T record type; must expose its Avro schema via `getSchema`
 */
class ParquetSinkWriter[T <: GenericRecord]() extends Writer[T] {

  // Underlying Parquet writer; stays null until the first record arrives.
  @transient private var writer: ParquetWriter[T] = _
  // Destination recorded by `open`, consumed when the writer is lazily built.
  @transient private var path: Path = _

  /**
   * Writes one record, creating the Parquet writer from the record's own
   * schema if this is the first record since `open`.
   *
   * @param element record to append
   * @throws IllegalStateException if `open` has not been called yet
   */
  override def write(element: T): Unit = {
    if (writer == null) {
      if (path == null) {
        throw new IllegalStateException("ParquetSinkWriter.write called before open")
      }
      // The schema is only known once an element is in hand, so build here
      // rather than in `open`.
      writer = AvroParquetWriter.builder[T](path)
        .withSchema(element.getSchema)
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .build()
    }
    writer.write(element)
  }

  /** Returns a fresh, unopened writer of the same type. */
  override def duplicate(): ParquetSinkWriter[T] = new ParquetSinkWriter[T]()

  /** Closes the underlying writer if one was created; safe to call repeatedly. */
  override def close(): Unit = {
    Option(writer).foreach(_.close())
    // Drop the reference so a later `open` + `write` builds a new writer.
    writer = null
  }

  /** Data size reported by the underlying writer, or 0 if nothing has been written yet. */
  override def getPos: Long = Option(writer).fold(0L)(_.getDataSize)

  /** Same value as `getPos`; no explicit flush is performed on the underlying writer. */
  override def flush(): Long = Option(writer).fold(0L)(_.getDataSize)

  /**
   * Records the destination for the next file. The Parquet writer itself is
   * created on the first `write`, when a schema becomes available.
   *
   * @param fs   file system handle (unused; the Parquet builder works from `path`)
   * @param path destination file path
   */
  override def open(fs: FileSystem, path: Path): Unit = {
    // Close any writer left over from a previous file so it is not leaked.
    close()
    this.path = path
  }

}

Reply via email to