Thanks Kostas. I will definitely look into that. But does the StreamingFileSink also support setting the batch size and/or a time-based rollover interval, like the BucketingSink does?
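[For reference, a minimal sketch of what size- and time-based rolling looks like on the StreamingFileSink, assuming the Flink 1.7-era API and a row-encoded format; the path and encoder are placeholders, not from this thread. For bulk formats such as Parquet, the sink instead rolls the part file on every checkpoint, as described further down in the thread.]

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

// Row-encoded sink with size/time based rolling, roughly equivalent to
// BucketingSink.setBatchSize / setBatchRolloverInterval.
val rowSink: StreamingFileSink[String] = StreamingFileSink
  .forRowFormat(new Path("/base/path"), new SimpleStringEncoder[String]("UTF-8"))
  .withRollingPolicy(
    DefaultRollingPolicy.create()
      .withMaxPartSize(1024L * 1024 * 400)   // roll at ~400 MB
      .withRolloverInterval(20 * 60 * 1000)  // roll every 20 minutes
      .withInactivityInterval(60 * 1000)     // roll after 1 minute without new records
      .build())
  .build()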
On Sun, Dec 2, 2018 at 5:09 PM Kostas Kloudas <k.klou...@data-artisans.com> wrote:

> Hi Avi,
>
> The ParquetAvroWriters cannot be used with the BucketingSink.
>
> In fact the StreamingFIleSink is the "evolution" of the BucketingSink and
> it supports
> all the functionality that the BucketingSink supports.
>
> Given this, why not using the StreamingFileSink?
>
> On Sat, Dec 1, 2018 at 7:56 AM Avi Levi <avi.l...@bluevoyant.com> wrote:
>
>> Thanks looks good.
>> Do you know a way to use PaquetWriter or ParquetAvroWriters with a
>> BucketingSink
>> file
>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/filesystem_sink.html#bucketing-file-sink>
>> ? something like :
>>
>> val bucketingSink = new BucketingSink[String]("/base/path")
>> bucketingSink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm"))
>> bucketingSink.setWriter(ParquetAvroWriters.forGenericRecord(schema))
>> bucketingSink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
>> bucketingSink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins
>>
>>
>> On Fri, Nov 30, 2018 at 3:59 PM Kostas Kloudas <
>> k.klou...@data-artisans.com> wrote:
>>
>>> And for a Java example which is actually similar to your pipeline,
>>> you can check the ParquetStreamingFileSinkITCase.
>>>
>>>
>>>
>>> On Fri, Nov 30, 2018 at 2:39 PM Kostas Kloudas <
>>> k.klou...@data-artisans.com> wrote:
>>>
>>>> Hi Avi,
>>>>
>>>> At a first glance I am not seeing anything wrong with your code.
>>>> Did you verify that there are elements flowing in your pipeline and
>>>> that checkpoints are actually completed?
>>>> And also can you check the logs at Job and Task Manager for anything
>>>> suspicious?
>>>>
>>>> Unfortunately, we do not allow specifying encoding and other parameters
>>>> to your writer, which is an omission
>>>> on our part and this should be fixed. Could you open a JIRA for that?
>>>>
>>>> If you want to know more about Flink's Parquet-Avro writer, feel free
>>>> to have a look at the ParquetAvroWriters
>>>> class.
>>>>
>>>> Cheers,
>>>> Kostas
>>>>
>>>>
>>>> On Thu, Nov 29, 2018 at 6:58 PM Avi Levi <avi.l...@bluevoyant.com>
>>>> wrote:
>>>>
>>>>> Thanks a lot Kostas, but the file not created . what am I doing wrong?
>>>>> BTW how can you set the encoding etc' in Flink's Avro - Parquet writer?
>>>>>
>>>>> object Tester extends App {
>>>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>   def now = System.currentTimeMillis()
>>>>>   val path = new Path(s"test-$now.parquet")
>>>>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>>>>   val streamingSink = StreamingFileSink.forBulkFormat( path,
>>>>>     ParquetAvroWriters.forGenericRecord(schema))
>>>>>     .build()
>>>>>   env.enableCheckpointing(100)
>>>>>   val stream = env.addSource(FirstSeenQueueImpl.consumer).map{ r =>
>>>>>     val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>>>>     genericReocrd.put("name", r.name)
>>>>>     genericReocrd.put("code", r.code.asString)
>>>>>     genericReocrd.put("ts", r.ts)
>>>>>     genericReocrd
>>>>>   }
>>>>>   stream.addSink { r =>
>>>>>     println(s"In Sink $r") //getting this line
>>>>>     streamingSink
>>>>>   }
>>>>>   env.execute()
>>>>> }
>>>>>
>>>>> Cheers
>>>>> Avi
>>>>>
>>>>> On Thu, Nov 29, 2018 at 1:36 PM Kostas Kloudas <
>>>>> k.klou...@data-artisans.com> wrote:
>>>>>
>>>>>>
>>>>>> Sorry, previously I got confused and I assumed you were using Flink's
>>>>>> StreamingFileSink.
>>>>>>
>>>>>> Could you try to use Flink's Avro - Parquet writer?
>>>>>>
>>>>>> StreamingFileSink.forBulkFormat(
>>>>>>   Path...(MY_PATH),
>>>>>>   ParquetAvroWriters.forGenericRecord(MY_SCHEMA))
>>>>>> .build()
>>>>>>
>>>>>>
>>>>>> Cheers,
>>>>>> Kostas
>>>>>>
>>>>>> On Thu, Nov 29, 2018 at 12:25 PM Avi Levi <avi.l...@bluevoyant.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks.
>>>>>>> yes, the *env.execute* is called and enabled checkpoints
>>>>>>> I think the problem is where to place the *writer.close *to flush
>>>>>>> the cache
>>>>>>> If I'll place on the sink after the write event e.g
>>>>>>> addSink{
>>>>>>>   writer.write
>>>>>>>   writer.close
>>>>>>> }
>>>>>>> in this case only the first record will be included in the file but
>>>>>>> not the rest of the stream.
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas <
>>>>>>> k.klou...@data-artisans.com> wrote:
>>>>>>>
>>>>>>>> Hi again Avi,
>>>>>>>>
>>>>>>>> In the first example that you posted (the one with the Kafka
>>>>>>>> source), do you call env.execute()?
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Kostas
>>>>>>>>
>>>>>>>> On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <
>>>>>>>> k.klou...@data-artisans.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Avi,
>>>>>>>>>
>>>>>>>>> In the last snippet that you posted, you have not activated
>>>>>>>>> checkpoints.
>>>>>>>>>
>>>>>>>>> Checkpoints are needed for the StreamingFileSink to produce
>>>>>>>>> results, especially in the case of BulkWriters (like Parquet) where
>>>>>>>>> the part file is rolled upon reception of a checkpoint and the
>>>>>>>>> part is finalised (i.e. "committed") when the checkpoint gets
>>>>>>>>> completed
>>>>>>>>> successfully.
>>>>>>>>>
>>>>>>>>> Could you please enable checkpointing and make sure that the job
>>>>>>>>> runs long enough for at least some checkpoints to be completed?
>>>>>>>>>
>>>>>>>>> Thanks a lot,
>>>>>>>>> Kostas
>>>>>>>>>
>>>>>>>>> On Thu, Nov 29, 2018 at 7:03 AM Avi Levi <avi.l...@bluevoyant.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Checkout this little App. you can see that the file is created
>>>>>>>>>> but no data is written.
>>>>>>>>>> even for a single record
>>>>>>>>>>
>>>>>>>>>> import io.eels.component.parquet.ParquetWriterConfig
>>>>>>>>>> import org.apache.avro.Schema
>>>>>>>>>> import org.apache.avro.generic.{ GenericData, GenericRecord }
>>>>>>>>>> import
>>>>>>>>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>>>>>>>>> import org.apache.hadoop.fs.Path
>>>>>>>>>> import org.apache.parquet.avro.AvroParquetWriter
>>>>>>>>>> import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
>>>>>>>>>> import org.apache.parquet.hadoop.metadata.CompressionCodecName
>>>>>>>>>> import scala.io.Source
>>>>>>>>>> import org.apache.flink.streaming.api.scala._
>>>>>>>>>>
>>>>>>>>>> object Tester extends App {
>>>>>>>>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>>   def now = System.currentTimeMillis()
>>>>>>>>>>   val path = new Path(s"test-$now.parquet")
>>>>>>>>>>   val schemaString =
>>>>>>>>>> Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
>>>>>>>>>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>>>>>>>>>   val compressionCodecName = CompressionCodecName.SNAPPY
>>>>>>>>>>   val config = ParquetWriterConfig()
>>>>>>>>>>   val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>>>>>>>>>   genericReocrd.put("name", "test_b")
>>>>>>>>>>   genericReocrd.put("code", "NoError")
>>>>>>>>>>   genericReocrd.put("ts", 100L)
>>>>>>>>>>   val stream = env.fromElements(genericReocrd)
>>>>>>>>>>   val writer: ParquetWriter[GenericRecord] =
>>>>>>>>>> AvroParquetWriter.builder[GenericRecord](path)
>>>>>>>>>>     .withSchema(schema)
>>>>>>>>>>     .withCompressionCodec(compressionCodecName)
>>>>>>>>>>     .withPageSize(config.pageSize)
>>>>>>>>>>     .withRowGroupSize(config.blockSize)
>>>>>>>>>>     .withDictionaryEncoding(config.enableDictionary)
>>>>>>>>>>     .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>>>>>>>     .withValidation(config.validating)
>>>>>>>>>>     .build()
>>>>>>>>>>
>>>>>>>>>>   writer.write(genericReocrd)
>>>>>>>>>>   stream.addSink { r =>
>>>>>>>>>>     println(s"In Sink $r")
>>>>>>>>>>     writer.write(r)
>>>>>>>>>>   }
>>>>>>>>>>   env.execute()
>>>>>>>>>>   // writer.close()
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Nov 29, 2018 at 6:57 AM vipul singh <neoea...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Can you try closing the writer?
>>>>>>>>>>>
>>>>>>>>>>> AvroParquetWriter has an internal buffer. Try doing a .close()
>>>>>>>>>>> in snapshot()( since you are checkpointing hence this method will
>>>>>>>>>>> be called)
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <
>>>>>>>>>>> avi.l...@bluevoyant.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks Rafi,
>>>>>>>>>>>> I am actually not using assignTimestampsAndWatermarks , I will
>>>>>>>>>>>> try to add it as you suggested. however it seems that the messages
>>>>>>>>>>>> I
>>>>>>>>>>>> repeating in the stream over and over even if I am pushing single
>>>>>>>>>>>> message
>>>>>>>>>>>> manually to the queue, that message will repeat infinity
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers
>>>>>>>>>>>> Avi
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <
>>>>>>>>>>>> rafi.ar...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Avi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I can't see the part where you use
>>>>>>>>>>>>> assignTimestampsAndWatermarks.
>>>>>>>>>>>>> If this part in not set properly, it's possible that
>>>>>>>>>>>>> watermarks are not sent and nothing will be written to your Sink.
>>>>>>>>>>>>>
>>>>>>>>>>>>> See here for more details:
>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hope this helps,
>>>>>>>>>>>>> Rafi
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Nov 28, 2018, 21:22 Avi Levi <avi.l...@bluevoyant.com
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am trying to implement Parquet Writer as SinkFunction. The
>>>>>>>>>>>>>> pipeline consists of kafka as source and parquet file as a sink
>>>>>>>>>>>>>> however it
>>>>>>>>>>>>>> seems like the stream is repeating itself like endless loop and
>>>>>>>>>>>>>> the parquet
>>>>>>>>>>>>>> file is not written . can someone please help me with this?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> object ParquetSinkWriter{
>>>>>>>>>>>>>>   private val path = new Path("tmp/pfile")
>>>>>>>>>>>>>>   private val schemaString =
>>>>>>>>>>>>>> Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
>>>>>>>>>>>>>>   private val avroSchema: Schema = new
>>>>>>>>>>>>>> Schema.Parser().parse(schemaString)
>>>>>>>>>>>>>>   private val compressionCodecName =
>>>>>>>>>>>>>> CompressionCodecName.SNAPPY
>>>>>>>>>>>>>>   private val config = ParquetWriterConfig()
>>>>>>>>>>>>>>   val writer: ParquetWriter[GenericRecord] =
>>>>>>>>>>>>>> AvroParquetWriter.builder[GenericRecord](path)
>>>>>>>>>>>>>>     .withSchema(avroSchema)
>>>>>>>>>>>>>>     .withCompressionCodec(compressionCodecName)
>>>>>>>>>>>>>>     .withPageSize(config.pageSize)
>>>>>>>>>>>>>>     .withRowGroupSize(config.blockSize)
>>>>>>>>>>>>>>     .withDictionaryEncoding(config.enableDictionary)
>>>>>>>>>>>>>>     .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>>>>>>>>>>>     .withValidation(config.validating)
>>>>>>>>>>>>>>     .build()
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> class ParquetSinkWriter(path: Path, avroSchema: Schema)
>>>>>>>>>>>>>> extends SinkFunction[GenericRecord] {
>>>>>>>>>>>>>>   import ParquetSinkWriter._
>>>>>>>>>>>>>>   override def invoke(value: GenericRecord): Unit = {
>>>>>>>>>>>>>>     println(s"ADDING TO File : $value") // getting this
>>>>>>>>>>>>>> output
>>>>>>>>>>>>>>     writer.write(value) //the output is not written to the
>>>>>>>>>>>>>> file
>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> //main app
>>>>>>>>>>>>>> object StreamingJob extends App {
>>>>>>>>>>>>>>   implicit val env: StreamExecutionEnvironment =
>>>>>>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>>>>>>   env.enableCheckpointing(500)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>   env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>>>>>>>>>>>>>>   env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>>>>>>>>>>>>>>   env.getCheckpointConfig.setCheckpointTimeout(600)
>>>>>>>>>>>>>>   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
>>>>>>>>>>>>>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>   env.setRestartStrategy(RestartStrategies.failureRateRestart(2,
>>>>>>>>>>>>>> Time.seconds(3), Time.seconds(3)))
>>>>>>>>>>>>>>   val backend: StateBackend = new
>>>>>>>>>>>>>> RocksDBStateBackend("file:///tmp/rocksdb", true)
>>>>>>>>>>>>>>   env.setStateBackend(backend)
>>>>>>>>>>>>>>   val writer = new ParquetSinkWriter(outputPath, schema)
>>>>>>>>>>>>>>   *val stream2: DataStream[DnsRequest] =
>>>>>>>>>>>>>> env.addSource(//consume from kafka)*
>>>>>>>>>>>>>>   *stream2.map { r =>*
>>>>>>>>>>>>>> *   println(s"MAPPING $r") //this output keeps repeating in
>>>>>>>>>>>>>> a loop*
>>>>>>>>>>>>>> *   val genericReocrd: GenericRecord = new
>>>>>>>>>>>>>> GenericData.Record(schema)*
>>>>>>>>>>>>>> *   genericReocrd.put("qname", r.qname)*
>>>>>>>>>>>>>> *   genericReocrd.put("rcode", r.rcode)*
>>>>>>>>>>>>>> *   genericReocrd.put("ts", r.ts)*
>>>>>>>>>>>>>> *   genericReocrd*
>>>>>>>>>>>>>> * }.addSink(writer) *
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for your help
>>>>>>>>>>>>>> Avi
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Vipul
>>>>>>>>>>
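[For reference, a minimal, self-contained sketch of the approach the thread converges on: StreamingFileSink.forBulkFormat with ParquetAvroWriters and checkpointing enabled, with the sink attached directly via addSink rather than wrapped in a lambda. The inline schema, the fromElements source, and the output path are stand-ins for the schema file, the Kafka/queue source, and the real path used above; it assumes the Flink 1.7-era API.]

import org.apache.avro.Schema
import org.apache.avro.generic.{ GenericData, GenericRecord }
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

object ParquetSinkSketch extends App {

  // Stand-in schema with the field names used in the thread's snippets.
  val schemaString =
    """{"type":"record","name":"Request","fields":[
      |{"name":"name","type":"string"},
      |{"name":"code","type":"string"},
      |{"name":"ts","type":"long"}]}""".stripMargin
  val schema: Schema = new Schema.Parser().parse(schemaString)

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // Bulk part files are rolled and committed only on completed checkpoints,
  // so checkpointing must be enabled and the job must run long enough for
  // at least one checkpoint to complete (a short-lived bounded source may
  // leave only in-progress files behind).
  env.enableCheckpointing(60 * 1000)

  val streamingSink: StreamingFileSink[GenericRecord] = StreamingFileSink
    .forBulkFormat(new Path("/tmp/parquet-out"), ParquetAvroWriters.forGenericRecord(schema))
    .build()

  env
    .fromElements(("test_b", "NoError", 100L)) // placeholder for the Kafka/queue source
    .map { case (name, code, ts) =>
      val record: GenericRecord = new GenericData.Record(schema)
      record.put("name", name)
      record.put("code", code)
      record.put("ts", ts)
      record
    }
    // Attach the StreamingFileSink itself; wrapping it in a lambda (as in the
    // snippets above) only prints and never registers the sink with the job graph.
    .addSink(streamingSink)

  env.execute("parquet-sink-sketch")
}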