Thanks for the mail, Bruno!!

On Wed, Jan 18, 2017 at 1:10 AM, Bruno Aranda <brunoara...@gmail.com> wrote:
> Sorry, something went wrong with the code for the Writer. Here it is again:
>
> import org.apache.avro.Schema
> import org.apache.flink.streaming.connectors.fs.Writer
> import org.apache.hadoop.fs.{FileSystem, Path}
> import org.apache.parquet.avro.AvroParquetWriter
> import org.apache.parquet.hadoop.ParquetWriter
> import org.apache.parquet.hadoop.metadata.CompressionCodecName
>
> @SerialVersionUID(1L)
> class MyAvroParquetWriter[T](schema: String) extends Writer[T] {
>
>   // The ParquetWriter is not serializable, so it is marked @transient
>   // and created lazily in open() on the task manager.
>   @transient private var writer: ParquetWriter[T] = _
>
>   override def open(fs: FileSystem, path: Path): Unit = {
>     writer = AvroParquetWriter.builder[T](path)
>       .withSchema(new Schema.Parser().parse(schema))
>       .withCompressionCodec(CompressionCodecName.SNAPPY)
>       .build()
>   }
>
>   override def write(element: T): Unit = writer.write(element)
>
>   override def duplicate(): Writer[T] = new MyAvroParquetWriter[T](schema)
>
>   override def close(): Unit = writer.close()
>
>   override def getPos: Long = writer.getDataSize
>
>   override def flush(): Long = writer.getDataSize
> }
>
> We use this library as a dependency: "org.apache.parquet" % "parquet-avro" %
> "1.8.1". We use this writer in a rolling sink and it seems fine so far.
>
> Cheers,
>
> Bruno
>
> On Wed, 18 Jan 2017 at 09:09 elmosca <brunoara...@gmail.com> wrote:
>
>> Hi Biswajit,
>>
>> We use the following Writer for Parquet using Avro conversion (using
>> Scala):
>>
>>
>>
>> Using this library as dependency: "org.apache.parquet" % "parquet-avro" %
>> "1.8.1". We use this writer in a rolling sink and it seems fine so far.
>>
>> Cheers,
>>
>> Bruno
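
For anyone wiring this up, hooking the writer into a RollingSink could look roughly like the sketch below. This is a minimal, untested outline: the base path, the schema JSON, the batch size, and the GenericRecord source are placeholders to replace with your own pipeline's values.

import org.apache.avro.generic.GenericRecord
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.fs.RollingSink

// Hypothetical Avro schema; replace with your real record schema.
val schemaJson =
  """{"type":"record","name":"Event","fields":[{"name":"id","type":"string"}]}"""

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Placeholder: in a real job this stream would come from Kafka, files, etc.
val stream: DataStream[GenericRecord] = ???

// Hypothetical output path; roll part files at roughly 128 MB, as reported
// by the writer's getPos (which delegates to ParquetWriter.getDataSize).
val sink = new RollingSink[GenericRecord]("/tmp/parquet-out")
  .setWriter(new MyAvroParquetWriter[GenericRecord](schemaJson))
  .setBatchSize(1024L * 1024 * 128)

stream.addSink(sink)
env.execute("avro-parquet-rolling-sink")

One thing to keep in mind with this setup: getPos is only an approximation here, since ParquetWriter buffers row groups in memory and getDataSize reflects the buffered size rather than bytes already on disk.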