And for a Java example that is quite similar to your pipeline, you can check the ParquetStreamingFileSinkITCase.
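To make the above concrete, below is a minimal, self-contained Scala sketch of the pipeline being suggested. It is a sketch under assumptions, not code taken from the thread: it assumes Flink 1.6+/1.7 with the flink-parquet and flink-avro modules on the classpath, inlines a stand-in schema with the name/code/ts fields used in the snippets below, and replaces the Kafka consumer with a made-up unbounded TestSource so that checkpoints can complete before the job finishes. The two details that matter are that the StreamingFileSink is actually attached to the stream with addSink (in the Tester snippet quoted below it is only built, never registered) and that checkpointing is enabled, since Parquet part files are only rolled and committed on completed checkpoints.

import org.apache.avro.Schema
import org.apache.avro.generic.{ GenericData, GenericRecord }
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

// Made-up unbounded source standing in for the Kafka consumer from the thread,
// so the job keeps running and checkpoints can actually complete.
class TestSource extends SourceFunction[(String, String, Long)] {
  @volatile private var running = true
  override def run(ctx: SourceFunction.SourceContext[(String, String, Long)]): Unit =
    while (running) {
      ctx.collect(("test_a", "NoError", System.currentTimeMillis()))
      Thread.sleep(100)
    }
  override def cancel(): Unit = running = false
}

object ParquetSinkSketch extends App {
  // Stand-in schema with the fields used in the snippets below.
  val schemaString =
    """{"type":"record","name":"Request","fields":[
      |  {"name":"name","type":"string"},
      |  {"name":"code","type":"string"},
      |  {"name":"ts","type":"long"}
      |]}""".stripMargin
  val schema: Schema = new Schema.Parser().parse(schemaString)

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // Required: bulk part files are rolled on each checkpoint and committed
  // when the checkpoint completes. The interval is chosen arbitrarily.
  env.enableCheckpointing(1000)

  val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
    .forBulkFormat(new Path("/tmp/parquet-out"), ParquetAvroWriters.forGenericRecord(schema))
    .build()

  val stream: DataStream[GenericRecord] = env
    .addSource(new TestSource)
    .map { case (name, code, ts) =>
      val record: GenericRecord = new GenericData.Record(schema)
      record.put("name", name)
      record.put("code", code)
      record.put("ts", ts)
      record
    }

  // Register the StreamingFileSink itself; building it is not enough.
  stream.addSink(sink)
  env.execute("parquet-streaming-sink-sketch")
}

Once the first checkpoint completes, finished part files should appear under the date-time bucket subdirectories of /tmp/parquet-out instead of staying as hidden in-progress files.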
On Fri, Nov 30, 2018 at 2:39 PM Kostas Kloudas <k.klou...@data-artisans.com> wrote:

> Hi Avi,
>
> At a first glance I am not seeing anything wrong with your code.
> Did you verify that there are elements flowing in your pipeline and that checkpoints are actually completed?
> And also, can you check the logs at the Job and Task Manager for anything suspicious?
>
> Unfortunately, we do not allow specifying encoding and other parameters for your writer, which is an omission on our part and should be fixed. Could you open a JIRA for that?
>
> If you want to know more about Flink's Parquet-Avro writer, feel free to have a look at the ParquetAvroWriters class.
>
> Cheers,
> Kostas
>
>
> On Thu, Nov 29, 2018 at 6:58 PM Avi Levi <avi.l...@bluevoyant.com> wrote:
>
>> Thanks a lot Kostas, but the file is not created. What am I doing wrong?
>> BTW, how can you set the encoding etc. in Flink's Avro-Parquet writer?
>>
>> object Tester extends App {
>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>   def now = System.currentTimeMillis()
>>   val path = new Path(s"test-$now.parquet")
>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>   val streamingSink = StreamingFileSink.forBulkFormat(path,
>>       ParquetAvroWriters.forGenericRecord(schema))
>>     .build()
>>   env.enableCheckpointing(100)
>>   val stream = env.addSource(FirstSeenQueueImpl.consumer).map { r =>
>>     val genericRecord: GenericRecord = new GenericData.Record(schema)
>>     genericRecord.put("name", r.name)
>>     genericRecord.put("code", r.code.asString)
>>     genericRecord.put("ts", r.ts)
>>     genericRecord
>>   }
>>   stream.addSink { r =>
>>     println(s"In Sink $r") // getting this line
>>     streamingSink
>>   }
>>   env.execute()
>> }
>>
>> Cheers
>> Avi
>>
>> On Thu, Nov 29, 2018 at 1:36 PM Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>
>>>
>>> Sorry, previously I got confused and I assumed you were using Flink's StreamingFileSink.
>>>
>>> Could you try to use Flink's Avro-Parquet writer?
>>>
>>> StreamingFileSink.forBulkFormat(
>>>     Path...(MY_PATH),
>>>     ParquetAvroWriters.forGenericRecord(MY_SCHEMA))
>>>   .build()
>>>
>>>
>>> Cheers,
>>> Kostas
>>>
>>> On Thu, Nov 29, 2018 at 12:25 PM Avi Levi <avi.l...@bluevoyant.com> wrote:
>>>
>>>> Thanks.
>>>> Yes, *env.execute* is called and checkpoints are enabled.
>>>> I think the problem is where to place the *writer.close* to flush the cache.
>>>> If I place it in the sink after the write event, e.g.
>>>> addSink {
>>>>   writer.write
>>>>   writer.close
>>>> }
>>>> then only the first record will be included in the file but not the rest of the stream.
>>>>
>>>>
>>>> On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>>>
>>>>> Hi again Avi,
>>>>>
>>>>> In the first example that you posted (the one with the Kafka source), do you call env.execute()?
>>>>>
>>>>> Cheers,
>>>>> Kostas
>>>>>
>>>>> On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>>>>
>>>>>> Hi Avi,
>>>>>>
>>>>>> In the last snippet that you posted, you have not activated checkpoints.
>>>>>>
>>>>>> Checkpoints are needed for the StreamingFileSink to produce results, especially in the case of BulkWriters (like Parquet), where the part file is rolled upon reception of a checkpoint and the part is finalised (i.e. "committed") when the checkpoint gets completed successfully.
>>>>>>
>>>>>> Could you please enable checkpointing and make sure that the job runs long enough for at least some checkpoints to be completed?
>>>>>>
>>>>>> Thanks a lot,
>>>>>> Kostas
>>>>>>
>>>>>> On Thu, Nov 29, 2018 at 7:03 AM Avi Levi <avi.l...@bluevoyant.com> wrote:
>>>>>>
>>>>>>> Check out this little app. You can see that the file is created but no data is written, even for a single record.
>>>>>>>
>>>>>>> import io.eels.component.parquet.ParquetWriterConfig
>>>>>>> import org.apache.avro.Schema
>>>>>>> import org.apache.avro.generic.{ GenericData, GenericRecord }
>>>>>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>>>>>> import org.apache.hadoop.fs.Path
>>>>>>> import org.apache.parquet.avro.AvroParquetWriter
>>>>>>> import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
>>>>>>> import org.apache.parquet.hadoop.metadata.CompressionCodecName
>>>>>>> import scala.io.Source
>>>>>>> import org.apache.flink.streaming.api.scala._
>>>>>>>
>>>>>>> object Tester extends App {
>>>>>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>   def now = System.currentTimeMillis()
>>>>>>>   val path = new Path(s"test-$now.parquet")
>>>>>>>   val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
>>>>>>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>>>>>>   val compressionCodecName = CompressionCodecName.SNAPPY
>>>>>>>   val config = ParquetWriterConfig()
>>>>>>>   val genericRecord: GenericRecord = new GenericData.Record(schema)
>>>>>>>   genericRecord.put("name", "test_b")
>>>>>>>   genericRecord.put("code", "NoError")
>>>>>>>   genericRecord.put("ts", 100L)
>>>>>>>   val stream = env.fromElements(genericRecord)
>>>>>>>   val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
>>>>>>>     .withSchema(schema)
>>>>>>>     .withCompressionCodec(compressionCodecName)
>>>>>>>     .withPageSize(config.pageSize)
>>>>>>>     .withRowGroupSize(config.blockSize)
>>>>>>>     .withDictionaryEncoding(config.enableDictionary)
>>>>>>>     .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>>>>     .withValidation(config.validating)
>>>>>>>     .build()
>>>>>>>
>>>>>>>   writer.write(genericRecord)
>>>>>>>   stream.addSink { r =>
>>>>>>>     println(s"In Sink $r")
>>>>>>>     writer.write(r)
>>>>>>>   }
>>>>>>>   env.execute()
>>>>>>>   // writer.close()
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Nov 29, 2018 at 6:57 AM vipul singh <neoea...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Can you try closing the writer?
>>>>>>>>
>>>>>>>> AvroParquetWriter has an internal buffer. Try doing a .close() in snapshot() (since you are checkpointing, this method will be called).
>>>>>>>>
>>>>>>>> On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <avi.l...@bluevoyant.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks Rafi,
>>>>>>>>> I am actually not using assignTimestampsAndWatermarks; I will try to add it as you suggested. However, the messages seem to be repeating in the stream over and over: even if I push a single message manually to the queue, that message repeats infinitely.
>>>>>>>>>
>>>>>>>>> Cheers
>>>>>>>>> Avi
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Avi,
>>>>>>>>>>
>>>>>>>>>> I can't see the part where you use assignTimestampsAndWatermarks.
>>>>>>>>>> If this part is not set properly, it's possible that watermarks are not sent and nothing will be written to your Sink.
>>>>>>>>>>
>>>>>>>>>> See here for more details:
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>>>>>>>>>>
>>>>>>>>>> Hope this helps,
>>>>>>>>>> Rafi
>>>>>>>>>>
>>>>>>>>>> On Wed, Nov 28, 2018, 21:22 Avi Levi <avi.l...@bluevoyant.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I am trying to implement a Parquet writer as a SinkFunction. The pipeline consists of Kafka as the source and a Parquet file as the sink, however it seems like the stream is repeating itself in an endless loop and the Parquet file is not written. Can someone please help me with this?
>>>>>>>>>>>
>>>>>>>>>>> object ParquetSinkWriter {
>>>>>>>>>>>   private val path = new Path("tmp/pfile")
>>>>>>>>>>>   private val schemaString = Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
>>>>>>>>>>>   private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
>>>>>>>>>>>   private val compressionCodecName = CompressionCodecName.SNAPPY
>>>>>>>>>>>   private val config = ParquetWriterConfig()
>>>>>>>>>>>   val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
>>>>>>>>>>>     .withSchema(avroSchema)
>>>>>>>>>>>     .withCompressionCodec(compressionCodecName)
>>>>>>>>>>>     .withPageSize(config.pageSize)
>>>>>>>>>>>     .withRowGroupSize(config.blockSize)
>>>>>>>>>>>     .withDictionaryEncoding(config.enableDictionary)
>>>>>>>>>>>     .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>>>>>>>>     .withValidation(config.validating)
>>>>>>>>>>>     .build()
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> class ParquetSinkWriter(path: Path, avroSchema: Schema) extends SinkFunction[GenericRecord] {
>>>>>>>>>>>   import ParquetSinkWriter._
>>>>>>>>>>>   override def invoke(value: GenericRecord): Unit = {
>>>>>>>>>>>     println(s"ADDING TO File : $value") // getting this output
>>>>>>>>>>>     writer.write(value) // the output is not written to the file
>>>>>>>>>>>   }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> // main app
>>>>>>>>>>> object StreamingJob extends App {
>>>>>>>>>>>   implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>>>   env.enableCheckpointing(500)
>>>>>>>>>>>   env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>>>>>>>>>>   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>>>>>>>>>>>   env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>>>>>>>>>>>   env.getCheckpointConfig.setCheckpointTimeout(600)
>>>>>>>>>>>   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
>>>>>>>>>>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>>>>>>>>>>   env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
>>>>>>>>>>>   val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
>>>>>>>>>>>   env.setStateBackend(backend)
>>>>>>>>>>>   val writer = new ParquetSinkWriter(outputPath, schema)
>>>>>>>>>>>   val stream2: DataStream[DnsRequest] = env.addSource(/* consume from kafka */)
>>>>>>>>>>>   stream2.map { r =>
>>>>>>>>>>>     println(s"MAPPING $r") // this output keeps repeating in a loop
>>>>>>>>>>>     val genericRecord: GenericRecord = new GenericData.Record(schema)
>>>>>>>>>>>     genericRecord.put("qname", r.qname)
>>>>>>>>>>>     genericRecord.put("rcode", r.rcode)
>>>>>>>>>>>     genericRecord.put("ts", r.ts)
>>>>>>>>>>>     genericRecord
>>>>>>>>>>>   }.addSink(writer)
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your help
>>>>>>>>>>> Avi
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Thanks,
>>>>>>>> Vipul
>>>>>>>
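For completeness, here is a rough Scala sketch of the "close the writer in snapshot()" idea suggested by Vipul above. It is not code from the thread: the class name and file naming are made up for illustration, it gives no exactly-once guarantees, and the StreamingFileSink approach recommended earlier in the thread remains the better option. It mainly illustrates why the original sink stays empty: AvroParquetWriter buffers row groups in memory and only flushes them and writes the Parquet footer when it is closed, so records written in invoke() never reach the file unless the writer is closed and re-created, for example on each checkpoint.

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
import org.apache.parquet.hadoop.metadata.CompressionCodecName

class ParquetFlushingSink(pathPrefix: String, schemaString: String)
    extends RichSinkFunction[GenericRecord] with CheckpointedFunction {

  // The Avro Schema class is not serializable, so keep it transient and
  // re-parse it from the (serializable) schema string on the task manager.
  @transient private var schema: Schema = _
  @transient private var writer: ParquetWriter[GenericRecord] = _
  private var fileCounter: Long = 0L

  override def open(parameters: Configuration): Unit = {
    schema = new Schema.Parser().parse(schemaString)
    writer = createWriter()
  }

  private def createWriter(): ParquetWriter[GenericRecord] = {
    // One file per subtask and per checkpoint, to avoid overwriting data
    // that was already flushed.
    val subtask = getRuntimeContext.getIndexOfThisSubtask
    val path = new Path(s"$pathPrefix-$subtask-$fileCounter.parquet")
    fileCounter += 1
    AvroParquetWriter
      .builder[GenericRecord](path)
      .withSchema(schema)
      .withCompressionCodec(CompressionCodecName.SNAPPY)
      .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
      .build()
  }

  override def invoke(value: GenericRecord): Unit = {
    // Rows are only buffered in memory here; nothing reaches the file yet.
    writer.write(value)
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // Closing the writer is what actually flushes the buffered row groups and
    // writes the Parquet footer; then start a fresh file for the next records.
    writer.close()
    writer = createWriter()
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    // No Flink-managed state in this sketch; on recovery, records since the
    // last completed checkpoint may be lost or duplicated.
  }

  override def close(): Unit = if (writer != null) writer.close()
}

With something like this in place, a readable Parquet file should appear after each completed checkpoint; the trade-off compared to the StreamingFileSink is that the part files are not committed transactionally with the checkpoint.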