Hi Avi,

For bulk formats like Parquet we unfortunately do not support setting the batch size:
the part files roll on every checkpoint. This is a known limitation and there are plans
to alleviate it in the future.

Setting the batch size (among other things) is supported for row-wise formats.

Cheers,
Kostas
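For reference, a minimal sketch of what such a row-wise configuration could look like,
written against the Flink 1.7 StreamingFileSink API (the path, encoder and thresholds
below are placeholders, not taken from this thread):

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

// Row-wise sinks can roll part files by size, rollover interval and inactivity,
// in addition to rolling on checkpoints.
val rowSink: StreamingFileSink[String] = StreamingFileSink
  .forRowFormat(new Path("/base/path"), new SimpleStringEncoder[String]("UTF-8"))
  .withRollingPolicy(
    DefaultRollingPolicy.create()
      .withMaxPartSize(1024L * 1024L * 400L) // ~400 MB, as in the BucketingSink snippet below
      .withRolloverInterval(20 * 60 * 1000L) // 20 minutes
      .withInactivityInterval(60 * 1000L)    // roll after 1 minute without new records
      .build())
  .build()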
On Sun, Dec 2, 2018 at 9:29 PM Avi Levi <avi.l...@bluevoyant.com> wrote:

> Thanks Kostas, I will definitely look into that. But does the StreamingFileSink also
> support setting the batch size, by size and/or by time interval, like the bucketing
> sink does?
>
> On Sun, Dec 2, 2018 at 5:09 PM Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>
>> Hi Avi,
>>
>> The ParquetAvroWriters cannot be used with the BucketingSink.
>>
>> In fact, the StreamingFileSink is the "evolution" of the BucketingSink and it
>> supports all the functionality that the BucketingSink supports.
>>
>> Given this, why not use the StreamingFileSink?
>>
>> On Sat, Dec 1, 2018 at 7:56 AM Avi Levi <avi.l...@bluevoyant.com> wrote:
>>
>>> Thanks, looks good.
>>> Do you know a way to use ParquetWriter or ParquetAvroWriters with a BucketingSink
>>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/filesystem_sink.html#bucketing-file-sink>?
>>> Something like:
>>>
>>> val bucketingSink = new BucketingSink[String]("/base/path")
>>> bucketingSink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm"))
>>> bucketingSink.setWriter(ParquetAvroWriters.forGenericRecord(schema))
>>> bucketingSink.setBatchSize(1024 * 1024 * 400)        // this is 400 MB
>>> bucketingSink.setBatchRolloverInterval(20 * 60 * 1000) // this is 20 mins
>>>
>>> On Fri, Nov 30, 2018 at 3:59 PM Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>>
>>>> And for a Java example which is actually similar to your pipeline,
>>>> you can check the ParquetStreamingFileSinkITCase.
>>>>
>>>> On Fri, Nov 30, 2018 at 2:39 PM Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>>>
>>>>> Hi Avi,
>>>>>
>>>>> At a first glance I am not seeing anything wrong with your code.
>>>>> Did you verify that there are elements flowing in your pipeline and that
>>>>> checkpoints are actually completed? Can you also check the Job and Task
>>>>> Manager logs for anything suspicious?
>>>>>
>>>>> Unfortunately, we do not allow specifying the encoding and other parameters
>>>>> for your writer, which is an omission on our part and should be fixed.
>>>>> Could you open a JIRA for that?
>>>>>
>>>>> If you want to know more about Flink's Parquet-Avro writer, feel free to
>>>>> have a look at the ParquetAvroWriters class.
>>>>>
>>>>> Cheers,
>>>>> Kostas
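As a rough illustration of the suggestion to move to the StreamingFileSink, a counterpart
of the BucketingSink snippet quoted above might look like the following (Flink 1.7 API;
the base path is a placeholder, schema is the Avro schema from the quoted code, and the
size/interval settings have no counterpart here because bulk parts roll on checkpoints):

import org.apache.avro.generic.GenericRecord
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner

// Parquet (bulk) sink bucketed by time, analogous to DateTimeBucketer("yyyy-MM-dd--HHmm").
val parquetSink: StreamingFileSink[GenericRecord] = StreamingFileSink
  .forBulkFormat(new Path("/base/path"), ParquetAvroWriters.forGenericRecord(schema))
  .withBucketAssigner(new DateTimeBucketAssigner[GenericRecord]("yyyy-MM-dd--HHmm"))
  .build()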
>>>>> On Thu, Nov 29, 2018 at 6:58 PM Avi Levi <avi.l...@bluevoyant.com> wrote:
>>>>>
>>>>>> Thanks a lot Kostas, but the file is not created. What am I doing wrong?
>>>>>> BTW, how can you set the encoding etc. in Flink's Avro-Parquet writer?
>>>>>>
>>>>>> object Tester extends App {
>>>>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>   def now = System.currentTimeMillis()
>>>>>>   val path = new Path(s"test-$now.parquet")
>>>>>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>>>>>   val streamingSink = StreamingFileSink
>>>>>>     .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(schema))
>>>>>>     .build()
>>>>>>   env.enableCheckpointing(100)
>>>>>>   val stream = env.addSource(FirstSeenQueueImpl.consumer).map { r =>
>>>>>>     val genericRecord: GenericRecord = new GenericData.Record(schema)
>>>>>>     genericRecord.put("name", r.name)
>>>>>>     genericRecord.put("code", r.code.asString)
>>>>>>     genericRecord.put("ts", r.ts)
>>>>>>     genericRecord
>>>>>>   }
>>>>>>   stream.addSink { r =>
>>>>>>     println(s"In Sink $r") // getting this line
>>>>>>     streamingSink
>>>>>>   }
>>>>>>   env.execute()
>>>>>> }
>>>>>>
>>>>>> Cheers
>>>>>> Avi
>>>>>>
>>>>>> On Thu, Nov 29, 2018 at 1:36 PM Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>>>>>
>>>>>>> Sorry, previously I got confused and I assumed you were using Flink's
>>>>>>> StreamingFileSink.
>>>>>>>
>>>>>>> Could you try to use Flink's Avro-Parquet writer?
>>>>>>>
>>>>>>> StreamingFileSink.forBulkFormat(
>>>>>>>     Path...(MY_PATH),
>>>>>>>     ParquetAvroWriters.forGenericRecord(MY_SCHEMA))
>>>>>>>   .build()
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Kostas
>>>>>>>
>>>>>>> On Thu, Nov 29, 2018 at 12:25 PM Avi Levi <avi.l...@bluevoyant.com> wrote:
>>>>>>>
>>>>>>>> Thanks.
>>>>>>>> Yes, env.execute is called and checkpoints are enabled.
>>>>>>>> I think the problem is where to place the writer.close to flush the cache.
>>>>>>>> If I place it in the sink after the write event, e.g.
>>>>>>>>
>>>>>>>> addSink { r =>
>>>>>>>>   writer.write(r)
>>>>>>>>   writer.close()
>>>>>>>> }
>>>>>>>>
>>>>>>>> then only the first record is included in the file but not the rest of
>>>>>>>> the stream.
>>>>>>>>
>>>>>>>> On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>>>>>>>
>>>>>>>>> Hi again Avi,
>>>>>>>>>
>>>>>>>>> In the first example that you posted (the one with the Kafka source),
>>>>>>>>> do you call env.execute()?
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Kostas
>>>>>>>>>
>>>>>>>>> On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Avi,
>>>>>>>>>>
>>>>>>>>>> In the last snippet that you posted, you have not activated checkpoints.
>>>>>>>>>>
>>>>>>>>>> Checkpoints are needed for the StreamingFileSink to produce results,
>>>>>>>>>> especially in the case of BulkWriters (like Parquet), where the part file
>>>>>>>>>> is rolled upon reception of a checkpoint and the part is finalised
>>>>>>>>>> (i.e. "committed") when the checkpoint completes successfully.
>>>>>>>>>>
>>>>>>>>>> Could you please enable checkpointing and make sure that the job runs
>>>>>>>>>> long enough for at least some checkpoints to be completed?
>>>>>>>>>>
>>>>>>>>>> Thanks a lot,
>>>>>>>>>> Kostas
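Putting these points together, here is a minimal sketch of how the Tester app quoted
above could be rewired. It reuses schemaString and FirstSeenQueueImpl from the quoted
code, so it only compiles against that project, and the output directory is a
placeholder. The key change is that the sink itself is passed to addSink: wrapping it
in a lambda that merely references it never attaches it to the stream, and checkpointing
stays enabled so the Parquet parts are rolled and committed.

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.core.fs.Path // note: Flink's Path, not org.apache.hadoop.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

object Tester extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(100) // bulk parts roll on checkpoints and are committed on completion

  val schema: Schema = new Schema.Parser().parse(schemaString)
  val streamingSink: StreamingFileSink[GenericRecord] = StreamingFileSink
    .forBulkFormat(new Path("/tmp/parquet-out"), ParquetAvroWriters.forGenericRecord(schema))
    .build()

  env.addSource(FirstSeenQueueImpl.consumer) // the queue/Kafka source from the quoted code
    .map { r =>
      val record: GenericRecord = new GenericData.Record(schema)
      record.put("name", r.name)
      record.put("code", r.code.asString)
      record.put("ts", r.ts)
      record
    }
    .addSink(streamingSink) // pass the sink itself; addSink { r => streamingSink } never attaches it

  env.execute()
}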
>>>>>>>>>> On Thu, Nov 29, 2018 at 7:03 AM Avi Levi <avi.l...@bluevoyant.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Check out this little app. You can see that the file is created but no
>>>>>>>>>>> data is written, even for a single record.
>>>>>>>>>>>
>>>>>>>>>>> import io.eels.component.parquet.ParquetWriterConfig
>>>>>>>>>>> import org.apache.avro.Schema
>>>>>>>>>>> import org.apache.avro.generic.{ GenericData, GenericRecord }
>>>>>>>>>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>>>>>>>>>> import org.apache.hadoop.fs.Path
>>>>>>>>>>> import org.apache.parquet.avro.AvroParquetWriter
>>>>>>>>>>> import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
>>>>>>>>>>> import org.apache.parquet.hadoop.metadata.CompressionCodecName
>>>>>>>>>>> import scala.io.Source
>>>>>>>>>>> import org.apache.flink.streaming.api.scala._
>>>>>>>>>>>
>>>>>>>>>>> object Tester extends App {
>>>>>>>>>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>>>   def now = System.currentTimeMillis()
>>>>>>>>>>>   val path = new Path(s"test-$now.parquet")
>>>>>>>>>>>   val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
>>>>>>>>>>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>>>>>>>>>>   val compressionCodecName = CompressionCodecName.SNAPPY
>>>>>>>>>>>   val config = ParquetWriterConfig()
>>>>>>>>>>>   val genericRecord: GenericRecord = new GenericData.Record(schema)
>>>>>>>>>>>   genericRecord.put("name", "test_b")
>>>>>>>>>>>   genericRecord.put("code", "NoError")
>>>>>>>>>>>   genericRecord.put("ts", 100L)
>>>>>>>>>>>   val stream = env.fromElements(genericRecord)
>>>>>>>>>>>   val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
>>>>>>>>>>>     .withSchema(schema)
>>>>>>>>>>>     .withCompressionCodec(compressionCodecName)
>>>>>>>>>>>     .withPageSize(config.pageSize)
>>>>>>>>>>>     .withRowGroupSize(config.blockSize)
>>>>>>>>>>>     .withDictionaryEncoding(config.enableDictionary)
>>>>>>>>>>>     .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>>>>>>>>     .withValidation(config.validating)
>>>>>>>>>>>     .build()
>>>>>>>>>>>
>>>>>>>>>>>   writer.write(genericRecord)
>>>>>>>>>>>   stream.addSink { r =>
>>>>>>>>>>>     println(s"In Sink $r")
>>>>>>>>>>>     writer.write(r)
>>>>>>>>>>>   }
>>>>>>>>>>>   env.execute()
>>>>>>>>>>>   // writer.close()
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Nov 29, 2018 at 6:57 AM vipul singh <neoea...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Can you try closing the writer?
>>>>>>>>>>>>
>>>>>>>>>>>> AvroParquetWriter has an internal buffer. Try doing a .close() in
>>>>>>>>>>>> snapshot() (since you are checkpointing, this method will be called).
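For completeness, here is a rough sketch of what that suggestion could look like for the
hand-rolled writer (hypothetical class and path layout, not from this thread). A
ParquetWriter cannot be written to after close(), so the sketch starts a new part file
on every checkpoint; it also offers none of the StreamingFileSink's failure guarantees,
which is why the thread converges on the StreamingFileSink instead.

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter

// Hypothetical sketch: flush AvroParquetWriter's buffer by closing it on every
// checkpoint, then continue in a fresh part file. The schema is passed as a JSON
// string so the sink function itself stays serializable.
class RollingParquetSink(schemaJson: String, basePath: String)
    extends RichSinkFunction[GenericRecord] with CheckpointedFunction {

  @transient private var schema: Schema = _
  @transient private var writer: ParquetWriter[GenericRecord] = _
  @transient private var partCounter: Int = 0

  private def openNextWriter(): Unit = {
    val path = new Path(s"$basePath/part-${getRuntimeContext.getIndexOfThisSubtask}-$partCounter.parquet")
    partCounter += 1
    writer = AvroParquetWriter.builder[GenericRecord](path).withSchema(schema).build()
  }

  override def open(parameters: Configuration): Unit = {
    schema = new Schema.Parser().parse(schemaJson)
    openNextWriter()
  }

  override def invoke(value: GenericRecord): Unit = writer.write(value)

  // Called on every checkpoint (checkpointing must be enabled): closing is the only
  // way to flush the Parquet buffer, so roll to a new file afterwards.
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    writer.close()
    openNextWriter()
  }

  override def initializeState(context: FunctionInitializationContext): Unit = ()

  override def close(): Unit = if (writer != null) writer.close()
}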
>>>>>>>>>>>> On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <avi.l...@bluevoyant.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks Rafi,
>>>>>>>>>>>>> I am actually not using assignTimestampsAndWatermarks; I will try to add
>>>>>>>>>>>>> it as you suggested. However, it seems that the messages are repeating in
>>>>>>>>>>>>> the stream over and over: even if I push a single message manually to the
>>>>>>>>>>>>> queue, that message repeats infinitely.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers
>>>>>>>>>>>>> Avi
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Avi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I can't see the part where you use assignTimestampsAndWatermarks.
>>>>>>>>>>>>>> If this part is not set properly, it's possible that watermarks are not
>>>>>>>>>>>>>> sent and nothing will be written to your sink.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> See here for more details:
>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hope this helps,
>>>>>>>>>>>>>> Rafi
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Nov 28, 2018, 21:22 Avi Levi <avi.l...@bluevoyant.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I am trying to implement a Parquet writer as a SinkFunction. The
>>>>>>>>>>>>>>> pipeline consists of Kafka as the source and a Parquet file as the sink,
>>>>>>>>>>>>>>> however it seems like the stream is repeating itself in an endless loop
>>>>>>>>>>>>>>> and the Parquet file is not written. Can someone please help me with this?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> object ParquetSinkWriter {
>>>>>>>>>>>>>>>   private val path = new Path("tmp/pfile")
>>>>>>>>>>>>>>>   private val schemaString =
>>>>>>>>>>>>>>>     Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
>>>>>>>>>>>>>>>   private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
>>>>>>>>>>>>>>>   private val compressionCodecName = CompressionCodecName.SNAPPY
>>>>>>>>>>>>>>>   private val config = ParquetWriterConfig()
>>>>>>>>>>>>>>>   val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
>>>>>>>>>>>>>>>     .withSchema(avroSchema)
>>>>>>>>>>>>>>>     .withCompressionCodec(compressionCodecName)
>>>>>>>>>>>>>>>     .withPageSize(config.pageSize)
>>>>>>>>>>>>>>>     .withRowGroupSize(config.blockSize)
>>>>>>>>>>>>>>>     .withDictionaryEncoding(config.enableDictionary)
>>>>>>>>>>>>>>>     .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>>>>>>>>>>>>     .withValidation(config.validating)
>>>>>>>>>>>>>>>     .build()
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> class ParquetSinkWriter(path: Path, avroSchema: Schema) extends SinkFunction[GenericRecord] {
>>>>>>>>>>>>>>>   import ParquetSinkWriter._
>>>>>>>>>>>>>>>   override def invoke(value: GenericRecord): Unit = {
>>>>>>>>>>>>>>>     println(s"ADDING TO File : $value") // getting this output
>>>>>>>>>>>>>>>     writer.write(value) // the output is not written to the file
>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> // main app
>>>>>>>>>>>>>>> object StreamingJob extends App {
>>>>>>>>>>>>>>>   implicit val env: StreamExecutionEnvironment =
>>>>>>>>>>>>>>>     StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>>>>>>>   env.enableCheckpointing(500)
>>>>>>>>>>>>>>>   env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>>>>>>>>>>>>>>   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>>>>>>>>>>>>>>>   env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>>>>>>>>>>>>>>>   env.getCheckpointConfig.setCheckpointTimeout(600)
>>>>>>>>>>>>>>>   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
>>>>>>>>>>>>>>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>>>>>>>>>>>>>>   env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
>>>>>>>>>>>>>>>   val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
>>>>>>>>>>>>>>>   env.setStateBackend(backend)
>>>>>>>>>>>>>>>   val writer = new ParquetSinkWriter(outputPath, schema)
>>>>>>>>>>>>>>>   val stream2: DataStream[DnsRequest] = env.addSource(/* consume from Kafka */)
>>>>>>>>>>>>>>>   stream2.map { r =>
>>>>>>>>>>>>>>>     println(s"MAPPING $r") // this output keeps repeating in a loop
>>>>>>>>>>>>>>>     val genericRecord: GenericRecord = new GenericData.Record(schema)
>>>>>>>>>>>>>>>     genericRecord.put("qname", r.qname)
>>>>>>>>>>>>>>>     genericRecord.put("rcode", r.rcode)
>>>>>>>>>>>>>>>     genericRecord.put("ts", r.ts)
>>>>>>>>>>>>>>>     genericRecord
>>>>>>>>>>>>>>>   }.addSink(writer)
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for your help
>>>>>>>>>>>>>>> Avi
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Vipul