I implemented an Avro-to-Parquet writer that previously took an Avro schema as a string in its constructor and passed it to the AvroParquetWriter. Now I'm wondering whether there is a way to get the schema from the element itself and pass that to the AvroParquetWriter instead. I tried grabbing the schema from the element in the write method, but write is called after open, so the schema is still unset when the writer is built and this doesn't work. I need this because I'm sinking several Kafka topics to S3 in one app, so different messages need different schemas passed to the writer.
class ParquetSinkWriter[T <: GenericRecord]() extends Writer[T] {

  @transient private var writer: ParquetWriter[T] = _
  @transient private var schema: Schema = _

  override def write(element: T): Unit = {
    schema = element.getSchema
    writer.write(element)
  }

  override def duplicate(): ParquetSinkWriter[T] = new ParquetSinkWriter[T]()

  override def close(): Unit = writer.close()

  override def getPos: Long = writer.getDataSize

  override def flush(): Long = writer.getDataSize

  override def open(fs: FileSystem, path: Path): Unit = {
    writer = AvroParquetWriter.builder[T](path)
      .withSchema(schema)
      .withCompressionCodec(CompressionCodecName.SNAPPY)
      .build()
  }
}
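One direction I am considering, shown here only as an untested sketch, is to remember the path in open and build the ParquetWriter lazily on the first call to write, when an element (and so its schema) is finally available. The class name LazyParquetSinkWriter is made up for illustration, and I am assuming Writer is Flink's org.apache.flink.streaming.connectors.fs.Writer. A single Parquet file can hold only one schema, so this also assumes records with different schemas are routed to different paths (for example one bucket per topic), which the sketch itself does not handle.

```scala
import org.apache.avro.generic.GenericRecord
import org.apache.flink.streaming.connectors.fs.Writer
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName

// Hypothetical variant of ParquetSinkWriter: the schema comes from the first element.
class LazyParquetSinkWriter[T <: GenericRecord] extends Writer[T] {

  @transient private var writer: ParquetWriter[T] = _
  @transient private var path: Path = _

  // open() only records where to write; no schema is known yet.
  override def open(fs: FileSystem, p: Path): Unit = {
    path = p
    writer = null
  }

  override def write(element: T): Unit = {
    if (writer == null) {
      // First element for this file: build the writer from its schema.
      // Assumption: every later element for this path shares that schema.
      writer = AvroParquetWriter.builder[T](path)
        .withSchema(element.getSchema)
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .build()
    }
    writer.write(element)
  }

  // Nothing has been written until the first element arrives.
  override def getPos: Long = if (writer == null) 0L else writer.getDataSize

  override def flush(): Long = getPos

  override def close(): Unit = {
    if (writer != null) {
      writer.close()
      writer = null
    }
  }

  override def duplicate(): LazyParquetSinkWriter[T] = new LazyParquetSinkWriter[T]()
}
```

With this shape the sink would be given new LazyParquetSinkWriter[GenericRecord]() with no schema argument. I am not sure how it interacts with the sink's checkpointing, since getPos and flush only report the Parquet writer's buffered data size, just as in my original class.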