Check out this little app. You can see that the file is created but no data is written, even for a single record:
import io.eels.component.parquet.ParquetWriterConfig
import org.apache.avro.Schema
import org.apache.avro.generic.{ GenericData, GenericRecord }
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import scala.io.Source
import org.apache.flink.streaming.api.scala._

object Tester extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  def now = System.currentTimeMillis()
  val path = new Path(s"test-$now.parquet")
  val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
  val schema: Schema = new Schema.Parser().parse(schemaString)
  val compressionCodecName = CompressionCodecName.SNAPPY
  val config = ParquetWriterConfig()

  val genericRecord: GenericRecord = new GenericData.Record(schema)
  genericRecord.put("name", "test_b")
  genericRecord.put("code", "NoError")
  genericRecord.put("ts", 100L)
  val stream = env.fromElements(genericRecord)

  val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
    .withSchema(schema)
    .withCompressionCodec(compressionCodecName)
    .withPageSize(config.pageSize)
    .withRowGroupSize(config.blockSize)
    .withDictionaryEncoding(config.enableDictionary)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .withValidation(config.validating)
    .build()

  writer.write(genericRecord)

  stream.addSink { r =>
    println(s"In Sink $r")
    writer.write(r)
  }

  env.execute()
  // writer.close()
}

On Thu, Nov 29, 2018 at 6:57 AM vipul singh <neoea...@gmail.com> wrote:

> Can you try closing the writer?
>
> AvroParquetWriter has an internal buffer. Try doing a .close() in
> snapshot() (since you are checkpointing, this method will be called).
>
> On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <avi.l...@bluevoyant.com> wrote:
>
>> Thanks Rafi,
>> I am actually not using assignTimestampsAndWatermarks; I will try to add it
>> as you suggested. However, the messages seem to be repeating in the stream
>> over and over: even if I push a single message manually to the queue, that
>> message repeats endlessly.
>>
>> Cheers
>> Avi
>>
>> On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>>
>>> Hi Avi,
>>>
>>> I can't see the part where you use assignTimestampsAndWatermarks.
>>> If this part is not set properly, it's possible that watermarks are not
>>> sent and nothing will be written to your Sink.
>>>
>>> See here for more details:
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>>>
>>> Hope this helps,
>>> Rafi
>>>
>>> On Wed, Nov 28, 2018, 21:22 Avi Levi <avi.l...@bluevoyant.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to implement a Parquet writer as a SinkFunction. The pipeline
>>>> consists of Kafka as source and a Parquet file as sink; however, it seems
>>>> like the stream is repeating itself in an endless loop and the Parquet
>>>> file is not written. Can someone please help me with this?
>>>>
>>>> object ParquetSinkWriter {
>>>>   private val path = new Path("tmp/pfile")
>>>>   private val schemaString =
>>>>     Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
>>>>   private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
>>>>   private val compressionCodecName = CompressionCodecName.SNAPPY
>>>>   private val config = ParquetWriterConfig()
>>>>   val writer: ParquetWriter[GenericRecord] =
>>>>     AvroParquetWriter.builder[GenericRecord](path)
>>>>       .withSchema(avroSchema)
>>>>       .withCompressionCodec(compressionCodecName)
>>>>       .withPageSize(config.pageSize)
>>>>       .withRowGroupSize(config.blockSize)
>>>>       .withDictionaryEncoding(config.enableDictionary)
>>>>       .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>       .withValidation(config.validating)
>>>>       .build()
>>>> }
>>>>
>>>> class ParquetSinkWriter(path: Path, avroSchema: Schema) extends SinkFunction[GenericRecord] {
>>>>   import ParquetSinkWriter._
>>>>   override def invoke(value: GenericRecord): Unit = {
>>>>     println(s"ADDING TO File : $value") // getting this output
>>>>     writer.write(value) // the output is not written to the file
>>>>   }
>>>> }
>>>>
>>>> // main app
>>>> object StreamingJob extends App {
>>>>   implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
>>>>   env.enableCheckpointing(500)
>>>>   env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>>>   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>>>>   env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>>>>   env.getCheckpointConfig.setCheckpointTimeout(600)
>>>>   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
>>>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>>>   env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
>>>>   val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
>>>>   env.setStateBackend(backend)
>>>>
>>>>   val writer = new ParquetSinkWriter(outputPath, schema)
>>>>   val stream2: DataStream[DnsRequest] = env.addSource(/* consume from kafka */)
>>>>   stream2.map { r =>
>>>>     println(s"MAPPING $r") // this output keeps repeating in a loop
>>>>     val genericRecord: GenericRecord = new GenericData.Record(schema)
>>>>     genericRecord.put("qname", r.qname)
>>>>     genericRecord.put("rcode", r.rcode)
>>>>     genericRecord.put("ts", r.ts)
>>>>     genericRecord
>>>>   }.addSink(writer)
>>>>
>>>> Thanks for your help
>>>> Avi
>>>>
>
> --
> Thanks,
> Vipul
>
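
For completeness, below is a minimal sketch of what Vipul's suggestion could look like. Because ParquetWriter exposes no flush() and only persists its buffered row groups and footer on close(), the sketch closes the current writer on every checkpoint (in snapshotState()) and rolls over to a new part file. The class name RollingParquetSink and the part-file naming scheme are invented for illustration, and this is not an exactly-once sink: records written after the last completed checkpoint can be lost or duplicated on restore.

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName

class RollingParquetSink(basePath: String, schemaString: String)
    extends RichSinkFunction[GenericRecord] with CheckpointedFunction {

  // ParquetWriter is not serializable, so everything is built per subtask in open()
  @transient private var writer: ParquetWriter[GenericRecord] = _
  @transient private var schema: Schema = _
  private var partCounter: Long = 0L

  // Hypothetical naming scheme: one file per subtask per checkpoint interval
  private def openWriter(): ParquetWriter[GenericRecord] = {
    val subtask = getRuntimeContext.getIndexOfThisSubtask
    val path = new Path(s"$basePath/part-$subtask-$partCounter.parquet")
    partCounter += 1
    AvroParquetWriter.builder[GenericRecord](path)
      .withSchema(schema)
      .withCompressionCodec(CompressionCodecName.SNAPPY)
      .build()
  }

  override def open(parameters: Configuration): Unit = {
    schema = new Schema.Parser().parse(schemaString)
    writer = openWriter()
  }

  override def invoke(value: GenericRecord): Unit =
    writer.write(value)

  // Called on every checkpoint: close() is the only way to force the buffered
  // row groups and the footer to disk, so close and start a new part file.
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    writer.close()
    writer = openWriter()
  }

  override def initializeState(context: FunctionInitializationContext): Unit = ()

  override def close(): Unit =
    if (writer != null) writer.close()
}

Rolling to a fresh file on each checkpoint is what makes the data visible: a Parquet file cannot be appended to once its footer is written, which is also why a single long-lived writer that is never closed typically leaves an empty or unreadable file behind. In the job above it would be attached in place of ParquetSinkWriter, e.g. stream2.map { ... }.addSink(new RollingParquetSink(...)).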