Thanks Kostas. OK, got it, so BucketingSink might not be a good choice here. Could you please advise what the best approach would be? I have a heavy load of data that I consume from Kafka and want to process and write to files (it doesn't have to be Parquet). I thought StreamingFileSink might be a good choice, but I guess I am doing something wrong there. If there is a good example for that, it would be great. To make sure I understand your suggestions, I put two small sketches of what I have in mind below.
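First, is this roughly how the Parquet sink should be wired? This is just a sketch, assuming Flink 1.7's StreamingFileSink and reusing the schema file, the FirstSeenQueueImpl.consumer Kafka source and the field names from my earlier snippet (I'm not 100% sure I have the builder calls exactly right):

import org.apache.avro.Schema
import org.apache.avro.generic.{ GenericData, GenericRecord }
import org.apache.flink.core.fs.Path // Flink's Path, not org.apache.hadoop.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._
import scala.io.Source

object ParquetSinkSketch extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // checkpoints are required: bulk part files roll on checkpoint and are only
  // finalised ("committed") when the checkpoint completes
  env.enableCheckpointing(60 * 1000)

  val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
  val schema: Schema = new Schema.Parser().parse(schemaString)

  val parquetSink: StreamingFileSink[GenericRecord] =
    StreamingFileSink
      .forBulkFormat(new Path("/base/path"), ParquetAvroWriters.forGenericRecord(schema))
      .build()

  env.addSource(FirstSeenQueueImpl.consumer) // my existing Kafka consumer
    .map { r =>
      val genericRecord: GenericRecord = new GenericData.Record(schema)
      genericRecord.put("name", r.name)
      genericRecord.put("code", r.code.asString)
      genericRecord.put("ts", r.ts)
      genericRecord
    }
    .addSink(parquetSink) // attach the sink itself, not a lambda that only references it
  env.execute("parquet-sink-test")
}

I realise now that in my earlier test I passed a lambda to addSink and only referenced streamingSink inside it, so the sink was probably never attached to the stream at all.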
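And since the output doesn't have to be Parquet: if I switch to a row-wise format, is this how I get back the size/time-based rolling I had with the BucketingSink? Again only a sketch, assuming SimpleStringEncoder and DefaultRollingPolicy from the same Flink version, with the same 400 MB / 20 minute limits I used before:

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

// row-wise formats can roll by size and time, unlike the bulk (Parquet) format
val rowSink: StreamingFileSink[String] =
  StreamingFileSink
    .forRowFormat(new Path("/base/path"), new SimpleStringEncoder[String]("UTF-8"))
    .withRollingPolicy(
      DefaultRollingPolicy.create()
        .withMaxPartSize(1024L * 1024 * 400)    // roll at ~400 MB
        .withRolloverInterval(20 * 60 * 1000L)  // or after 20 minutes
        .withInactivityInterval(5 * 60 * 1000L) // or after 5 minutes without new records
        .build())
    .build()

// then e.g. stream.map(_.toString).addSink(rowSink), still with checkpointing enabled

Does that match what you meant by setting the batch size being supported for row-wise formats?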
BR Avi On Mon, Dec 3, 2018 at 4:11 PM Kostas Kloudas <k.klou...@data-artisans.com> wrote: > Hi Avi, > > For Bulk Formats like Parquet, unfortunately, we do not support setting > the batch size. > The part-files roll on every checkpoint. This is a known limitation and > there are plans to > alleviate it in the future. > > Setting the batch size (among other things) is supported for RowWise > formats. > > Cheers, > Kostas > > On Sun, Dec 2, 2018 at 9:29 PM Avi Levi <avi.l...@bluevoyant.com> wrote: > >> Thanks Kostas. I will definitely look into that. but is the >> StreamingFileSink also support setting the batch size by size and/or by >> time interval like bucketing sink ? >> >> On Sun, Dec 2, 2018 at 5:09 PM Kostas Kloudas < >> k.klou...@data-artisans.com> wrote: >> >>> Hi Avi, >>> >>> The ParquetAvroWriters cannot be used with the BucketingSink. >>> >>> In fact the StreamingFIleSink is the "evolution" of the BucketingSink >>> and it supports >>> all the functionality that the BucketingSink supports. >>> >>> Given this, why not using the StreamingFileSink? >>> >>> On Sat, Dec 1, 2018 at 7:56 AM Avi Levi <avi.l...@bluevoyant.com> wrote: >>> >>>> Thanks looks good. >>>> Do you know a way to use PaquetWriter or ParquetAvroWriters with a >>>> BucketingSink >>>> file >>>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/filesystem_sink.html#bucketing-file-sink> >>>> ? something like : >>>> >>>> val bucketingSink = new BucketingSink[String]("/base/path") >>>> bucketingSink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm")) >>>> bucketingSink.setWriter(ParquetAvroWriters.forGenericRecord(schema)) >>>> bucketingSink.setBatchSize(1024 * 1024 * 400) // this is 400 MB, >>>> bucketingSink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins >>>> >>>> >>>> On Fri, Nov 30, 2018 at 3:59 PM Kostas Kloudas < >>>> k.klou...@data-artisans.com> wrote: >>>> >>>>> And for a Java example which is actually similar to your pipeline, >>>>> you can check the ParquetStreamingFileSinkITCase. >>>>> >>>>> >>>>> >>>>> On Fri, Nov 30, 2018 at 2:39 PM Kostas Kloudas < >>>>> k.klou...@data-artisans.com> wrote: >>>>> >>>>>> Hi Avi, >>>>>> >>>>>> At a first glance I am not seeing anything wrong with your code. >>>>>> Did you verify that there are elements flowing in your pipeline and >>>>>> that checkpoints are actually completed? >>>>>> And also can you check the logs at Job and Task Manager for anything >>>>>> suspicious? >>>>>> >>>>>> Unfortunately, we do not allow specifying encoding and other >>>>>> parameters to your writer, which is an omission >>>>>> on our part and this should be fixed. Could you open a JIRA for that? >>>>>> >>>>>> If you want to know more about Flink's Parquet-Avro writer, feel free >>>>>> to have a look at the ParquetAvroWriters >>>>>> class. >>>>>> >>>>>> Cheers, >>>>>> Kostas >>>>>> >>>>>> >>>>>> On Thu, Nov 29, 2018 at 6:58 PM Avi Levi <avi.l...@bluevoyant.com> >>>>>> wrote: >>>>>> >>>>>>> Thanks a lot Kostas, but the file not created . what am I doing >>>>>>> wrong? >>>>>>> BTW how can you set the encoding etc' in Flink's Avro - Parquet >>>>>>> writer? 
>>>>>>> >>>>>>> object Tester extends App { >>>>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment >>>>>>> def now = System.currentTimeMillis() >>>>>>> val path = new Path(s"test-$now.parquet") >>>>>>> val schema: Schema = new Schema.Parser().parse(schemaString) >>>>>>> val streamingSink = StreamingFileSink.forBulkFormat( path, >>>>>>> ParquetAvroWriters.forGenericRecord(schema)) >>>>>>> .build() >>>>>>> env.enableCheckpointing(100) >>>>>>> val stream = env.addSource(FirstSeenQueueImpl.consumer).map{ r => >>>>>>> val genericReocrd: GenericRecord = new GenericData.Record(schema) >>>>>>> genericReocrd.put("name", r.name) >>>>>>> genericReocrd.put("code", r.code.asString) >>>>>>> genericReocrd.put("ts", r.ts) >>>>>>> genericReocrd >>>>>>> } >>>>>>> stream.addSink { r => >>>>>>> println(s"In Sink $r") //getting this line >>>>>>> streamingSink >>>>>>> } >>>>>>> env.execute() >>>>>>> } >>>>>>> >>>>>>> Cheers >>>>>>> Avi >>>>>>> >>>>>>> On Thu, Nov 29, 2018 at 1:36 PM Kostas Kloudas < >>>>>>> k.klou...@data-artisans.com> wrote: >>>>>>> >>>>>>>> >>>>>>>> Sorry, previously I got confused and I assumed you were using >>>>>>>> Flink's StreamingFileSink. >>>>>>>> >>>>>>>> Could you try to use Flink's Avro - Parquet writer? >>>>>>>> >>>>>>>> StreamingFileSink.forBulkFormat( >>>>>>>> Path...(MY_PATH), >>>>>>>> ParquetAvroWriters.forGenericRecord(MY_SCHEMA)) >>>>>>>> .build() >>>>>>>> >>>>>>>> >>>>>>>> Cheers, >>>>>>>> Kostas >>>>>>>> >>>>>>>> On Thu, Nov 29, 2018 at 12:25 PM Avi Levi <avi.l...@bluevoyant.com> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Thanks. >>>>>>>>> yes, the *env.execute* is called and enabled checkpoints >>>>>>>>> I think the problem is where to place the *writer.close *to flush >>>>>>>>> the cache >>>>>>>>> If I'll place on the sink after the write event e.g >>>>>>>>> addSink{ >>>>>>>>> writer.write >>>>>>>>> writer.close >>>>>>>>> } >>>>>>>>> in this case only the first record will be included in the file >>>>>>>>> but not the rest of the stream. >>>>>>>>> >>>>>>>>> >>>>>>>>> On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas < >>>>>>>>> k.klou...@data-artisans.com> wrote: >>>>>>>>> >>>>>>>>>> Hi again Avi, >>>>>>>>>> >>>>>>>>>> In the first example that you posted (the one with the Kafka >>>>>>>>>> source), do you call env.execute()? >>>>>>>>>> >>>>>>>>>> Cheers, >>>>>>>>>> Kostas >>>>>>>>>> >>>>>>>>>> On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas < >>>>>>>>>> k.klou...@data-artisans.com> wrote: >>>>>>>>>> >>>>>>>>>>> Hi Avi, >>>>>>>>>>> >>>>>>>>>>> In the last snippet that you posted, you have not activated >>>>>>>>>>> checkpoints. >>>>>>>>>>> >>>>>>>>>>> Checkpoints are needed for the StreamingFileSink to produce >>>>>>>>>>> results, especially in the case of BulkWriters (like Parquet) where >>>>>>>>>>> the part file is rolled upon reception of a checkpoint and the >>>>>>>>>>> part is finalised (i.e. "committed") when the checkpoint gets >>>>>>>>>>> completed >>>>>>>>>>> successfully. >>>>>>>>>>> >>>>>>>>>>> Could you please enable checkpointing and make sure that the job >>>>>>>>>>> runs long enough for at least some checkpoints to be completed? >>>>>>>>>>> >>>>>>>>>>> Thanks a lot, >>>>>>>>>>> Kostas >>>>>>>>>>> >>>>>>>>>>> On Thu, Nov 29, 2018 at 7:03 AM Avi Levi < >>>>>>>>>>> avi.l...@bluevoyant.com> wrote: >>>>>>>>>>> >>>>>>>>>>>> Checkout this little App. you can see that the file is created >>>>>>>>>>>> but no data is written. 
even for a single record >>>>>>>>>>>> >>>>>>>>>>>> import io.eels.component.parquet.ParquetWriterConfig >>>>>>>>>>>> import org.apache.avro.Schema >>>>>>>>>>>> import org.apache.avro.generic.{ GenericData, GenericRecord } >>>>>>>>>>>> import >>>>>>>>>>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment >>>>>>>>>>>> import org.apache.hadoop.fs.Path >>>>>>>>>>>> import org.apache.parquet.avro.AvroParquetWriter >>>>>>>>>>>> import org.apache.parquet.hadoop.{ ParquetFileWriter, >>>>>>>>>>>> ParquetWriter } >>>>>>>>>>>> import org.apache.parquet.hadoop.metadata.CompressionCodecName >>>>>>>>>>>> import scala.io.Source >>>>>>>>>>>> import org.apache.flink.streaming.api.scala._ >>>>>>>>>>>> >>>>>>>>>>>> object Tester extends App { >>>>>>>>>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment >>>>>>>>>>>> def now = System.currentTimeMillis() >>>>>>>>>>>> val path = new Path(s"test-$now.parquet") >>>>>>>>>>>> val schemaString = >>>>>>>>>>>> Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString >>>>>>>>>>>> val schema: Schema = new Schema.Parser().parse(schemaString) >>>>>>>>>>>> val compressionCodecName = CompressionCodecName.SNAPPY >>>>>>>>>>>> val config = ParquetWriterConfig() >>>>>>>>>>>> val genericReocrd: GenericRecord = new GenericData.Record(schema) >>>>>>>>>>>> genericReocrd.put("name", "test_b") >>>>>>>>>>>> genericReocrd.put("code", "NoError") >>>>>>>>>>>> genericReocrd.put("ts", 100L) >>>>>>>>>>>> val stream = env.fromElements(genericReocrd) >>>>>>>>>>>> val writer: ParquetWriter[GenericRecord] = >>>>>>>>>>>> AvroParquetWriter.builder[GenericRecord](path) >>>>>>>>>>>> .withSchema(schema) >>>>>>>>>>>> .withCompressionCodec(compressionCodecName) >>>>>>>>>>>> .withPageSize(config.pageSize) >>>>>>>>>>>> .withRowGroupSize(config.blockSize) >>>>>>>>>>>> .withDictionaryEncoding(config.enableDictionary) >>>>>>>>>>>> .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) >>>>>>>>>>>> .withValidation(config.validating) >>>>>>>>>>>> .build() >>>>>>>>>>>> >>>>>>>>>>>> writer.write(genericReocrd) >>>>>>>>>>>> stream.addSink { r => >>>>>>>>>>>> println(s"In Sink $r") >>>>>>>>>>>> writer.write(r) >>>>>>>>>>>> } >>>>>>>>>>>> env.execute() >>>>>>>>>>>> // writer.close() >>>>>>>>>>>> } >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On Thu, Nov 29, 2018 at 6:57 AM vipul singh <neoea...@gmail.com> >>>>>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Can you try closing the writer? >>>>>>>>>>>>> >>>>>>>>>>>>> AvroParquetWriter has an internal buffer. Try doing a .close() >>>>>>>>>>>>> in snapshot()( since you are checkpointing hence this method will >>>>>>>>>>>>> be called) >>>>>>>>>>>>> >>>>>>>>>>>>> On Wed, Nov 28, 2018 at 7:33 PM Avi Levi < >>>>>>>>>>>>> avi.l...@bluevoyant.com> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Thanks Rafi, >>>>>>>>>>>>>> I am actually not using assignTimestampsAndWatermarks , I >>>>>>>>>>>>>> will try to add it as you suggested. however it seems that the >>>>>>>>>>>>>> messages I >>>>>>>>>>>>>> repeating in the stream over and over even if I am pushing >>>>>>>>>>>>>> single message >>>>>>>>>>>>>> manually to the queue, that message will repeat infinity >>>>>>>>>>>>>> >>>>>>>>>>>>>> Cheers >>>>>>>>>>>>>> Avi >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch < >>>>>>>>>>>>>> rafi.ar...@gmail.com> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hi Avi, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I can't see the part where you use >>>>>>>>>>>>>>> assignTimestampsAndWatermarks. 
>>>>>>>>>>>>>>> If this part in not set properly, it's possible that >>>>>>>>>>>>>>> watermarks are not sent and nothing will be written to your >>>>>>>>>>>>>>> Sink. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> See here for more details: >>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hope this helps, >>>>>>>>>>>>>>> Rafi >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Wed, Nov 28, 2018, 21:22 Avi Levi < >>>>>>>>>>>>>>> avi.l...@bluevoyant.com wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I am trying to implement Parquet Writer as SinkFunction. >>>>>>>>>>>>>>>> The pipeline consists of kafka as source and parquet file as a >>>>>>>>>>>>>>>> sink however >>>>>>>>>>>>>>>> it seems like the stream is repeating itself like endless loop >>>>>>>>>>>>>>>> and the >>>>>>>>>>>>>>>> parquet file is not written . can someone please help me with >>>>>>>>>>>>>>>> this? >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> object ParquetSinkWriter{ >>>>>>>>>>>>>>>> private val path = new Path("tmp/pfile") >>>>>>>>>>>>>>>> private val schemaString = >>>>>>>>>>>>>>>> Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString >>>>>>>>>>>>>>>> private val avroSchema: Schema = new >>>>>>>>>>>>>>>> Schema.Parser().parse(schemaString) >>>>>>>>>>>>>>>> private val compressionCodecName = >>>>>>>>>>>>>>>> CompressionCodecName.SNAPPY >>>>>>>>>>>>>>>> private val config = ParquetWriterConfig() >>>>>>>>>>>>>>>> val writer: ParquetWriter[GenericRecord] = >>>>>>>>>>>>>>>> AvroParquetWriter.builder[GenericRecord](path) >>>>>>>>>>>>>>>> .withSchema(avroSchema) >>>>>>>>>>>>>>>> .withCompressionCodec(compressionCodecName) >>>>>>>>>>>>>>>> .withPageSize(config.pageSize) >>>>>>>>>>>>>>>> .withRowGroupSize(config.blockSize) >>>>>>>>>>>>>>>> .withDictionaryEncoding(config.enableDictionary) >>>>>>>>>>>>>>>> .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) >>>>>>>>>>>>>>>> .withValidation(config.validating) >>>>>>>>>>>>>>>> .build() >>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> class ParquetSinkWriter(path: Path, avroSchema: Schema) >>>>>>>>>>>>>>>> extends SinkFunction[GenericRecord] { >>>>>>>>>>>>>>>> import ParquetSinkWriter._ >>>>>>>>>>>>>>>> override def invoke(value: GenericRecord): Unit = { >>>>>>>>>>>>>>>> println(s"ADDING TO File : $value") // getting this >>>>>>>>>>>>>>>> output >>>>>>>>>>>>>>>> writer.write(value) //the output is not written to the >>>>>>>>>>>>>>>> file >>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> //main app >>>>>>>>>>>>>>>> object StreamingJob extends App { >>>>>>>>>>>>>>>> implicit val env: StreamExecutionEnvironment = >>>>>>>>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment >>>>>>>>>>>>>>>> env.enableCheckpointing(500) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) >>>>>>>>>>>>>>>> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500) >>>>>>>>>>>>>>>> env.getCheckpointConfig.setCheckpointTimeout(600) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> env.getCheckpointConfig.setFailOnCheckpointingErrors(false) >>>>>>>>>>>>>>>> env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> env.setRestartStrategy(RestartStrategies.failureRateRestart(2, >>>>>>>>>>>>>>>> Time.seconds(3), 
Time.seconds(3))) >>>>>>>>>>>>>>>> val backend: StateBackend = new >>>>>>>>>>>>>>>> RocksDBStateBackend("file:///tmp/rocksdb", true) >>>>>>>>>>>>>>>> env.setStateBackend(backend) >>>>>>>>>>>>>>>> val writer = new ParquetSinkWriter(outputPath, schema) >>>>>>>>>>>>>>>> *val stream2: DataStream[DnsRequest] = >>>>>>>>>>>>>>>> env.addSource(//consume from kafka)* >>>>>>>>>>>>>>>> *stream2.map { r =>* >>>>>>>>>>>>>>>> * println(s"MAPPING $r") //this output keeps repeating >>>>>>>>>>>>>>>> in a loop* >>>>>>>>>>>>>>>> * val genericReocrd: GenericRecord = new >>>>>>>>>>>>>>>> GenericData.Record(schema)* >>>>>>>>>>>>>>>> * genericReocrd.put("qname", r.qname)* >>>>>>>>>>>>>>>> * genericReocrd.put("rcode", r.rcode)* >>>>>>>>>>>>>>>> * genericReocrd.put("ts", r.ts)* >>>>>>>>>>>>>>>> * genericReocrd* >>>>>>>>>>>>>>>> * }.addSink(writer) * >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Thanks for your help >>>>>>>>>>>>>>>> Avi >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> -- >>>>>>>>>>>>> Thanks, >>>>>>>>>>>>> Vipul >>>>>>>>>>>>> >>>>>>>>>>>>