Hi Avi, If Parquet is not a requirement and plain text output is acceptable, then you can use the StreamingFileSink with a row format. In that case you can set the batch size and, more generally, specify a custom RollingPolicy.
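A rough sketch of what that could look like (an untested example written against the Flink 1.7 StreamingFileSink API; the path and the size/time thresholds are placeholders mirroring your earlier BucketingSink snippet):

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala._

object RowFormatSinkSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Pending part files are finalised when a checkpoint completes.
    env.enableCheckpointing(10000)

    // Placeholder source; in your job this would be the Kafka consumer.
    // With a bounded source like this the job may end before a checkpoint completes.
    val stream: DataStream[String] = env.fromElements("record-1", "record-2")

    val sink: StreamingFileSink[String] = StreamingFileSink
      .forRowFormat(new Path("/base/path"), new SimpleStringEncoder[String]("UTF-8"))
      .withRollingPolicy(
        DefaultRollingPolicy.create()
          .withMaxPartSize(1024L * 1024L * 400L)                // roll at ~400 MB (like setBatchSize)
          .withRolloverInterval(TimeUnit.MINUTES.toMillis(20))  // or after 20 minutes
          .withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) // or after 5 minutes without new records
          .build())
      .build()

    stream.addSink(sink)
    env.execute("row-format StreamingFileSink sketch")
  }
}

With a row format the part files roll according to the policy (size, time, inactivity) instead of on every checkpoint, which is the bulk-format limitation mentioned below.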
For a complete example, I would recommend checking [1], where you would of course adjust the Encoder and the RollingPolicy. https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-streaming-file-sink-test/src/main/java/StreamingFileSinkProgram.java Cheers, Kostas On Mon, Dec 3, 2018 at 3:50 PM Avi Levi <avi.l...@bluevoyant.com> wrote: > Thanks Kostas, > Ok, got it, so the BucketingSink might not be a good choice here. Can you > please advise what the best approach would be? I have a heavy load of data > that I consume from Kafka and want to process and write to a file > (it doesn't have to be Parquet). I thought that the StreamingFileSink might be a > good choice, but I guess I am doing something wrong there. If there is a > good example for that, it would be great. > > BR > Avi > > On Mon, Dec 3, 2018 at 4:11 PM Kostas Kloudas <k.klou...@data-artisans.com> > wrote: > >> Hi Avi, >> >> For bulk formats like Parquet, unfortunately, we do not support setting >> the batch size. >> The part files roll on every checkpoint. This is a known limitation and >> there are plans to >> alleviate it in the future. >> >> Setting the batch size (among other things) is supported for row-wise >> formats. >> >> Cheers, >> Kostas >> >> On Sun, Dec 2, 2018 at 9:29 PM Avi Levi <avi.l...@bluevoyant.com> wrote: >> >>> Thanks Kostas. I will definitely look into that. But does the >>> StreamingFileSink also support rolling the files by size and/or by >>> time interval like the BucketingSink? >>> >>> On Sun, Dec 2, 2018 at 5:09 PM Kostas Kloudas < >>> k.klou...@data-artisans.com> wrote: >>> >>>> Hi Avi, >>>> >>>> The ParquetAvroWriters cannot be used with the BucketingSink. >>>> >>>> In fact, the StreamingFileSink is the "evolution" of the BucketingSink >>>> and it supports >>>> all the functionality that the BucketingSink offers. >>>> >>>> Given this, why not use the StreamingFileSink? >>>> >>>> On Sat, Dec 1, 2018 at 7:56 AM Avi Levi <avi.l...@bluevoyant.com> >>>> wrote: >>>> >>>>> Thanks, looks good. >>>>> Do you know a way to use ParquetWriter or ParquetAvroWriters with a >>>>> BucketingSink >>>>> file >>>>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/filesystem_sink.html#bucketing-file-sink> >>>>> ? Something like: >>>>> >>>>> val bucketingSink = new BucketingSink[String]("/base/path") >>>>> bucketingSink.setBucketer(new >>>>> DateTimeBucketer[String]("yyyy-MM-dd--HHmm")) >>>>> bucketingSink.setWriter(ParquetAvroWriters.forGenericRecord(schema)) >>>>> bucketingSink.setBatchSize(1024 * 1024 * 400) // this is 400 MB >>>>> bucketingSink.setBatchRolloverInterval(20 * 60 * 1000) // this is 20 mins >>>>> >>>>> >>>>> On Fri, Nov 30, 2018 at 3:59 PM Kostas Kloudas < >>>>> k.klou...@data-artisans.com> wrote: >>>>> >>>>>> And for a Java example that is actually similar to your pipeline, >>>>>> you can check the ParquetStreamingFileSinkITCase. >>>>>> >>>>>> >>>>>> On Fri, Nov 30, 2018 at 2:39 PM Kostas Kloudas < >>>>>> k.klou...@data-artisans.com> wrote: >>>>>> >>>>>>> Hi Avi, >>>>>>> >>>>>>> At first glance I do not see anything wrong with your code. >>>>>>> Did you verify that there are elements flowing through your pipeline and >>>>>>> that checkpoints are actually being completed? >>>>>>> Could you also check the Job Manager and Task Manager logs for anything >>>>>>> suspicious? >>>>>>> >>>>>>> Unfortunately, we do not allow specifying the encoding and other >>>>>>> parameters for the writer, which is an omission >>>>>>> on our part and this should be fixed. Could you open a JIRA for that?
>>>>>>> >>>>>>> If you want to know more about Flink's Parquet-Avro writer, feel >>>>>>> free to have a look at the ParquetAvroWriters >>>>>>> class. >>>>>>> >>>>>>> Cheers, >>>>>>> Kostas >>>>>>> >>>>>>> >>>>>>> On Thu, Nov 29, 2018 at 6:58 PM Avi Levi <avi.l...@bluevoyant.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Thanks a lot Kostas, but the file is not created. What am I doing >>>>>>>> wrong? >>>>>>>> BTW, how can you set the encoding etc. in Flink's Avro-Parquet >>>>>>>> writer? >>>>>>>> >>>>>>>> object Tester extends App { >>>>>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment >>>>>>>> def now = System.currentTimeMillis() >>>>>>>> val path = new Path(s"test-$now.parquet") >>>>>>>> val schema: Schema = new Schema.Parser().parse(schemaString) >>>>>>>> val streamingSink = StreamingFileSink.forBulkFormat( path, >>>>>>>> ParquetAvroWriters.forGenericRecord(schema)) >>>>>>>> .build() >>>>>>>> env.enableCheckpointing(100) >>>>>>>> val stream = env.addSource(FirstSeenQueueImpl.consumer).map{ r => >>>>>>>> val genericReocrd: GenericRecord = new GenericData.Record(schema) >>>>>>>> genericReocrd.put("name", r.name) >>>>>>>> genericReocrd.put("code", r.code.asString) >>>>>>>> genericReocrd.put("ts", r.ts) >>>>>>>> genericReocrd >>>>>>>> } >>>>>>>> stream.addSink { r => >>>>>>>> println(s"In Sink $r") //getting this line >>>>>>>> streamingSink >>>>>>>> } >>>>>>>> env.execute() >>>>>>>> } >>>>>>>> >>>>>>>> Cheers >>>>>>>> Avi >>>>>>>> >>>>>>>> On Thu, Nov 29, 2018 at 1:36 PM Kostas Kloudas < >>>>>>>> k.klou...@data-artisans.com> wrote: >>>>>>>> >>>>>>>>> >>>>>>>>> Sorry, previously I got confused and I assumed you were using >>>>>>>>> Flink's StreamingFileSink. >>>>>>>>> >>>>>>>>> Could you try to use Flink's Avro-Parquet writer? >>>>>>>>> >>>>>>>>> StreamingFileSink.forBulkFormat( >>>>>>>>> Path...(MY_PATH), >>>>>>>>> ParquetAvroWriters.forGenericRecord(MY_SCHEMA)) >>>>>>>>> .build() >>>>>>>>> >>>>>>>>> >>>>>>>>> Cheers, >>>>>>>>> Kostas >>>>>>>>> >>>>>>>>> On Thu, Nov 29, 2018 at 12:25 PM Avi Levi <avi.l...@bluevoyant.com> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Thanks. >>>>>>>>>> Yes, *env.execute* is called and checkpoints are enabled. >>>>>>>>>> I think the problem is where to place the *writer.close* to >>>>>>>>>> flush the cache. >>>>>>>>>> If I place it in the sink right after the write, e.g. >>>>>>>>>> addSink { >>>>>>>>>> writer.write >>>>>>>>>> writer.close >>>>>>>>>> } >>>>>>>>>> then only the first record will be included in the file >>>>>>>>>> but not the rest of the stream. >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas < >>>>>>>>>> k.klou...@data-artisans.com> wrote: >>>>>>>>>> >>>>>>>>>>> Hi again Avi, >>>>>>>>>>> >>>>>>>>>>> In the first example that you posted (the one with the Kafka >>>>>>>>>>> source), do you call env.execute()? >>>>>>>>>>> >>>>>>>>>>> Cheers, >>>>>>>>>>> Kostas >>>>>>>>>>> >>>>>>>>>>> On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas < >>>>>>>>>>> k.klou...@data-artisans.com> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hi Avi, >>>>>>>>>>>> >>>>>>>>>>>> In the last snippet that you posted, you have not activated >>>>>>>>>>>> checkpoints. >>>>>>>>>>>> >>>>>>>>>>>> Checkpoints are needed for the StreamingFileSink to produce >>>>>>>>>>>> results, especially in the case of BulkWriters (like Parquet) where >>>>>>>>>>>> the part file is rolled upon reception of a checkpoint and the >>>>>>>>>>>> part is finalised (i.e. "committed") when the checkpoint gets >>>>>>>>>>>> completed >>>>>>>>>>>> successfully. 
>>>>>>>>>>>> >>>>>>>>>>>> Could you please enable checkpointing and make sure that the >>>>>>>>>>>> job runs long enough for at least some checkpoints to be completed? >>>>>>>>>>>> >>>>>>>>>>>> Thanks a lot, >>>>>>>>>>>> Kostas >>>>>>>>>>>> >>>>>>>>>>>> On Thu, Nov 29, 2018 at 7:03 AM Avi Levi < >>>>>>>>>>>> avi.l...@bluevoyant.com> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Checkout this little App. you can see that the file is created >>>>>>>>>>>>> but no data is written. even for a single record >>>>>>>>>>>>> >>>>>>>>>>>>> import io.eels.component.parquet.ParquetWriterConfig >>>>>>>>>>>>> import org.apache.avro.Schema >>>>>>>>>>>>> import org.apache.avro.generic.{ GenericData, GenericRecord } >>>>>>>>>>>>> import >>>>>>>>>>>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment >>>>>>>>>>>>> import org.apache.hadoop.fs.Path >>>>>>>>>>>>> import org.apache.parquet.avro.AvroParquetWriter >>>>>>>>>>>>> import org.apache.parquet.hadoop.{ ParquetFileWriter, >>>>>>>>>>>>> ParquetWriter } >>>>>>>>>>>>> import org.apache.parquet.hadoop.metadata.CompressionCodecName >>>>>>>>>>>>> import scala.io.Source >>>>>>>>>>>>> import org.apache.flink.streaming.api.scala._ >>>>>>>>>>>>> >>>>>>>>>>>>> object Tester extends App { >>>>>>>>>>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment >>>>>>>>>>>>> def now = System.currentTimeMillis() >>>>>>>>>>>>> val path = new Path(s"test-$now.parquet") >>>>>>>>>>>>> val schemaString = >>>>>>>>>>>>> Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString >>>>>>>>>>>>> val schema: Schema = new Schema.Parser().parse(schemaString) >>>>>>>>>>>>> val compressionCodecName = CompressionCodecName.SNAPPY >>>>>>>>>>>>> val config = ParquetWriterConfig() >>>>>>>>>>>>> val genericReocrd: GenericRecord = new >>>>>>>>>>>>> GenericData.Record(schema) >>>>>>>>>>>>> genericReocrd.put("name", "test_b") >>>>>>>>>>>>> genericReocrd.put("code", "NoError") >>>>>>>>>>>>> genericReocrd.put("ts", 100L) >>>>>>>>>>>>> val stream = env.fromElements(genericReocrd) >>>>>>>>>>>>> val writer: ParquetWriter[GenericRecord] = >>>>>>>>>>>>> AvroParquetWriter.builder[GenericRecord](path) >>>>>>>>>>>>> .withSchema(schema) >>>>>>>>>>>>> .withCompressionCodec(compressionCodecName) >>>>>>>>>>>>> .withPageSize(config.pageSize) >>>>>>>>>>>>> .withRowGroupSize(config.blockSize) >>>>>>>>>>>>> .withDictionaryEncoding(config.enableDictionary) >>>>>>>>>>>>> .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) >>>>>>>>>>>>> .withValidation(config.validating) >>>>>>>>>>>>> .build() >>>>>>>>>>>>> >>>>>>>>>>>>> writer.write(genericReocrd) >>>>>>>>>>>>> stream.addSink { r => >>>>>>>>>>>>> println(s"In Sink $r") >>>>>>>>>>>>> writer.write(r) >>>>>>>>>>>>> } >>>>>>>>>>>>> env.execute() >>>>>>>>>>>>> // writer.close() >>>>>>>>>>>>> } >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> On Thu, Nov 29, 2018 at 6:57 AM vipul singh < >>>>>>>>>>>>> neoea...@gmail.com> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Can you try closing the writer? >>>>>>>>>>>>>> >>>>>>>>>>>>>> AvroParquetWriter has an internal buffer. Try doing a >>>>>>>>>>>>>> .close() in snapshot()( since you are checkpointing hence this >>>>>>>>>>>>>> method will >>>>>>>>>>>>>> be called) >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Wed, Nov 28, 2018 at 7:33 PM Avi Levi < >>>>>>>>>>>>>> avi.l...@bluevoyant.com> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks Rafi, >>>>>>>>>>>>>>> I am actually not using assignTimestampsAndWatermarks , I >>>>>>>>>>>>>>> will try to add it as you suggested. 
however it seems that the >>>>>>>>>>>>>>> messages I >>>>>>>>>>>>>>> repeating in the stream over and over even if I am pushing >>>>>>>>>>>>>>> single message >>>>>>>>>>>>>>> manually to the queue, that message will repeat infinity >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Cheers >>>>>>>>>>>>>>> Avi >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch < >>>>>>>>>>>>>>> rafi.ar...@gmail.com> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Hi Avi, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I can't see the part where you use >>>>>>>>>>>>>>>> assignTimestampsAndWatermarks. >>>>>>>>>>>>>>>> If this part in not set properly, it's possible that >>>>>>>>>>>>>>>> watermarks are not sent and nothing will be written to your >>>>>>>>>>>>>>>> Sink. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> See here for more details: >>>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Hope this helps, >>>>>>>>>>>>>>>> Rafi >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Wed, Nov 28, 2018, 21:22 Avi Levi < >>>>>>>>>>>>>>>> avi.l...@bluevoyant.com wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> I am trying to implement Parquet Writer as SinkFunction. >>>>>>>>>>>>>>>>> The pipeline consists of kafka as source and parquet file as >>>>>>>>>>>>>>>>> a sink however >>>>>>>>>>>>>>>>> it seems like the stream is repeating itself like endless >>>>>>>>>>>>>>>>> loop and the >>>>>>>>>>>>>>>>> parquet file is not written . can someone please help me with >>>>>>>>>>>>>>>>> this? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> object ParquetSinkWriter{ >>>>>>>>>>>>>>>>> private val path = new Path("tmp/pfile") >>>>>>>>>>>>>>>>> private val schemaString = >>>>>>>>>>>>>>>>> Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString >>>>>>>>>>>>>>>>> private val avroSchema: Schema = new >>>>>>>>>>>>>>>>> Schema.Parser().parse(schemaString) >>>>>>>>>>>>>>>>> private val compressionCodecName = >>>>>>>>>>>>>>>>> CompressionCodecName.SNAPPY >>>>>>>>>>>>>>>>> private val config = ParquetWriterConfig() >>>>>>>>>>>>>>>>> val writer: ParquetWriter[GenericRecord] = >>>>>>>>>>>>>>>>> AvroParquetWriter.builder[GenericRecord](path) >>>>>>>>>>>>>>>>> .withSchema(avroSchema) >>>>>>>>>>>>>>>>> .withCompressionCodec(compressionCodecName) >>>>>>>>>>>>>>>>> .withPageSize(config.pageSize) >>>>>>>>>>>>>>>>> .withRowGroupSize(config.blockSize) >>>>>>>>>>>>>>>>> .withDictionaryEncoding(config.enableDictionary) >>>>>>>>>>>>>>>>> .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) >>>>>>>>>>>>>>>>> .withValidation(config.validating) >>>>>>>>>>>>>>>>> .build() >>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> class ParquetSinkWriter(path: Path, avroSchema: Schema) >>>>>>>>>>>>>>>>> extends SinkFunction[GenericRecord] { >>>>>>>>>>>>>>>>> import ParquetSinkWriter._ >>>>>>>>>>>>>>>>> override def invoke(value: GenericRecord): Unit = { >>>>>>>>>>>>>>>>> println(s"ADDING TO File : $value") // getting this >>>>>>>>>>>>>>>>> output >>>>>>>>>>>>>>>>> writer.write(value) //the output is not written to >>>>>>>>>>>>>>>>> the file >>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> //main app >>>>>>>>>>>>>>>>> object StreamingJob extends App { >>>>>>>>>>>>>>>>> implicit val env: StreamExecutionEnvironment = >>>>>>>>>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment >>>>>>>>>>>>>>>>> env.enableCheckpointing(500) >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> 
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500) >>>>>>>>>>>>>>>>> env.getCheckpointConfig.setCheckpointTimeout(600) >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> env.getCheckpointConfig.setFailOnCheckpointingErrors(false) >>>>>>>>>>>>>>>>> env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> env.setRestartStrategy(RestartStrategies.failureRateRestart(2, >>>>>>>>>>>>>>>>> Time.seconds(3), Time.seconds(3))) >>>>>>>>>>>>>>>>> val backend: StateBackend = new >>>>>>>>>>>>>>>>> RocksDBStateBackend("file:///tmp/rocksdb", true) >>>>>>>>>>>>>>>>> env.setStateBackend(backend) >>>>>>>>>>>>>>>>> val writer = new ParquetSinkWriter(outputPath, schema) >>>>>>>>>>>>>>>>> *val stream2: DataStream[DnsRequest] = >>>>>>>>>>>>>>>>> env.addSource(//consume from kafka)* >>>>>>>>>>>>>>>>> *stream2.map { r =>* >>>>>>>>>>>>>>>>> * println(s"MAPPING $r") //this output keeps repeating >>>>>>>>>>>>>>>>> in a loop* >>>>>>>>>>>>>>>>> * val genericReocrd: GenericRecord = new >>>>>>>>>>>>>>>>> GenericData.Record(schema)* >>>>>>>>>>>>>>>>> * genericReocrd.put("qname", r.qname)* >>>>>>>>>>>>>>>>> * genericReocrd.put("rcode", r.rcode)* >>>>>>>>>>>>>>>>> * genericReocrd.put("ts", r.ts)* >>>>>>>>>>>>>>>>> * genericReocrd* >>>>>>>>>>>>>>>>> * }.addSink(writer) * >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Thanks for your help >>>>>>>>>>>>>>>>> Avi >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> -- >>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>> Vipul >>>>>>>>>>>>>> >>>>>>>>>>>>>
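For reference, a minimal end-to-end sketch of the bulk-format (Parquet) approach discussed above. It is untested and assumes Flink 1.7 with flink-parquet and flink-avro on the classpath; the schema, the source and the output path are placeholders standing in for the request_schema.avsc file and the Kafka consumer used in the thread:

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

object ParquetBulkSinkSketch {

  // Placeholder schema standing in for request_schema.avsc.
  private val schemaString =
    """{"type":"record","name":"Request","fields":[
      |  {"name":"name","type":"string"},
      |  {"name":"code","type":"string"},
      |  {"name":"ts","type":"long"}]}""".stripMargin

  def main(args: Array[String]): Unit = {
    // Schema is Serializable as of Avro 1.8, so it can be captured by the map function below.
    val schema: Schema = new Schema.Parser().parse(schemaString)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Bulk formats roll and commit part files only on completed checkpoints,
    // so checkpointing must be enabled and the job must run long enough for
    // a few checkpoints to complete.
    env.enableCheckpointing(1000)

    // Placeholder source; in the original job this is the Kafka consumer.
    // With a bounded source the job may finish before any checkpoint completes,
    // in which case nothing would be committed.
    val input: DataStream[(String, String, Long)] =
      env.fromElements(("test_b", "NoError", 100L))

    val records: DataStream[GenericRecord] = input.map { case (name, code, ts) =>
      val record: GenericRecord = new GenericData.Record(schema)
      record.put("name", name)
      record.put("code", code)
      record.put("ts", ts)
      record
    }

    val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
      .forBulkFormat(new Path("/base/path"), ParquetAvroWriters.forGenericRecord(schema))
      .build()

    // The built StreamingFileSink is passed to addSink directly; it is not
    // referenced from inside another sink function.
    records.addSink(sink)

    env.execute("Parquet StreamingFileSink sketch")
  }
}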