Thanks, looks good. Do you know a way to use ParquetWriter or ParquetAvroWriters with a BucketingSink <https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/filesystem_sink.html#bucketing-file-sink>? Something like:

val bucketingSink = new BucketingSink[String]("/base/path")
bucketingSink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm"))
bucketingSink.setWriter(ParquetAvroWriters.forGenericRecord(schema))
bucketingSink.setBatchSize(1024 * 1024 * 400)          // this is 400 MB
bucketingSink.setBatchRolloverInterval(20 * 60 * 1000) // this is 20 mins
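ParquetAvroWriters.forGenericRecord returns a BulkWriter.Factory meant for the StreamingFileSink, not a Writer implementation for the BucketingSink, so as far as I can tell the two don't plug together directly. A rough sketch of the closest equivalent using the StreamingFileSink route that comes up further down the thread, assuming the same schema and base path as above; the bucket assigner plays the role of the Bucketer, and since bulk formats roll their part files on each checkpoint there is no direct counterpart to setBatchSize / setBatchRolloverInterval:

import org.apache.avro.generic.GenericRecord
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner

// bucketed Parquet output: one time-based bucket directory per "yyyy-MM-dd--HHmm"
val parquetBucketingSink = StreamingFileSink
  .forBulkFormat(new Path("/base/path"), ParquetAvroWriters.forGenericRecord(schema))
  .withBucketAssigner(new DateTimeBucketAssigner[GenericRecord]("yyyy-MM-dd--HHmm"))
  .build()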
On Fri, Nov 30, 2018 at 3:59 PM Kostas Kloudas <k.klou...@data-artisans.com> wrote:

> And for a Java example which is actually similar to your pipeline,
> you can check the ParquetStreamingFileSinkITCase.
>
> On Fri, Nov 30, 2018 at 2:39 PM Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>
>> Hi Avi,
>>
>> At a first glance I am not seeing anything wrong with your code.
>> Did you verify that there are elements flowing in your pipeline and that checkpoints are actually completed?
>> And also, can you check the logs at the Job and Task Managers for anything suspicious?
>>
>> Unfortunately, we do not allow specifying encoding and other parameters for your writer, which is an omission
>> on our part and should be fixed. Could you open a JIRA for that?
>>
>> If you want to know more about Flink's Parquet-Avro writer, feel free to have a look at the ParquetAvroWriters class.
>>
>> Cheers,
>> Kostas
>>
>> On Thu, Nov 29, 2018 at 6:58 PM Avi Levi <avi.l...@bluevoyant.com> wrote:
>>
>>> Thanks a lot Kostas, but the file is not created. What am I doing wrong?
>>> BTW, how can you set the encoding etc. in Flink's Avro-Parquet writer?
>>>
>>> object Tester extends App {
>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>   def now = System.currentTimeMillis()
>>>   val path = new Path(s"test-$now.parquet")
>>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>>   val streamingSink = StreamingFileSink.forBulkFormat(path,
>>>       ParquetAvroWriters.forGenericRecord(schema))
>>>     .build()
>>>   env.enableCheckpointing(100)
>>>   val stream = env.addSource(FirstSeenQueueImpl.consumer).map { r =>
>>>     val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>>     genericReocrd.put("name", r.name)
>>>     genericReocrd.put("code", r.code.asString)
>>>     genericReocrd.put("ts", r.ts)
>>>     genericReocrd
>>>   }
>>>   stream.addSink { r =>
>>>     println(s"In Sink $r") // getting this line
>>>     streamingSink
>>>   }
>>>   env.execute()
>>> }
>>>
>>> Cheers
>>> Avi
>>>
>>> On Thu, Nov 29, 2018 at 1:36 PM Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>>
>>>> Sorry, previously I got confused and I assumed you were using Flink's StreamingFileSink.
>>>>
>>>> Could you try to use Flink's Avro-Parquet writer?
>>>>
>>>> StreamingFileSink.forBulkFormat(
>>>>     Path...(MY_PATH),
>>>>     ParquetAvroWriters.forGenericRecord(MY_SCHEMA))
>>>>   .build()
>>>>
>>>> Cheers,
>>>> Kostas
>>>>
>>>> On Thu, Nov 29, 2018 at 12:25 PM Avi Levi <avi.l...@bluevoyant.com> wrote:
>>>>
>>>>> Thanks.
>>>>> Yes, env.execute is called and checkpoints are enabled.
>>>>> I think the problem is where to place the writer.close to flush the cache.
>>>>> If I place it in the sink after the write event, e.g.
>>>>>
>>>>> addSink {
>>>>>   writer.write
>>>>>   writer.close
>>>>> }
>>>>>
>>>>> then only the first record is included in the file but not the rest of the stream.
>>>>>
>>>>> On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>>>>
>>>>>> Hi again Avi,
>>>>>>
>>>>>> In the first example that you posted (the one with the Kafka source), do you call env.execute()?
>>>>>>
>>>>>> Cheers,
>>>>>> Kostas
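One thing stands out in the Tester snippet quoted above: env.execute() is called and checkpointing is enabled, but the StreamingFileSink built as streamingSink is never actually attached to the stream. The addSink { ... } block defines an anonymous sink that only prints each record and discards the streamingSink value. A minimal sketch of the intended wiring, assuming the rest of the Tester object stays exactly as posted:

// attach the StreamingFileSink itself instead of wrapping it in a lambda
stream.addSink(streamingSink)
env.execute()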
>>>>>> On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>>>>>
>>>>>>> Hi Avi,
>>>>>>>
>>>>>>> In the last snippet that you posted, you have not activated checkpoints.
>>>>>>>
>>>>>>> Checkpoints are needed for the StreamingFileSink to produce results, especially in the case of BulkWriters (like Parquet), where
>>>>>>> the part file is rolled upon reception of a checkpoint and the part is finalised (i.e. "committed") when the checkpoint completes successfully.
>>>>>>>
>>>>>>> Could you please enable checkpointing and make sure that the job runs long enough for at least some checkpoints to be completed?
>>>>>>>
>>>>>>> Thanks a lot,
>>>>>>> Kostas
>>>>>>>
>>>>>>> On Thu, Nov 29, 2018 at 7:03 AM Avi Levi <avi.l...@bluevoyant.com> wrote:
>>>>>>>
>>>>>>>> Check out this little app. You can see that the file is created but no data is written, even for a single record:
>>>>>>>>
>>>>>>>> import io.eels.component.parquet.ParquetWriterConfig
>>>>>>>> import org.apache.avro.Schema
>>>>>>>> import org.apache.avro.generic.{ GenericData, GenericRecord }
>>>>>>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>>>>>>> import org.apache.hadoop.fs.Path
>>>>>>>> import org.apache.parquet.avro.AvroParquetWriter
>>>>>>>> import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
>>>>>>>> import org.apache.parquet.hadoop.metadata.CompressionCodecName
>>>>>>>> import scala.io.Source
>>>>>>>> import org.apache.flink.streaming.api.scala._
>>>>>>>>
>>>>>>>> object Tester extends App {
>>>>>>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>   def now = System.currentTimeMillis()
>>>>>>>>   val path = new Path(s"test-$now.parquet")
>>>>>>>>   val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
>>>>>>>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>>>>>>>   val compressionCodecName = CompressionCodecName.SNAPPY
>>>>>>>>   val config = ParquetWriterConfig()
>>>>>>>>   val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>>>>>>>   genericReocrd.put("name", "test_b")
>>>>>>>>   genericReocrd.put("code", "NoError")
>>>>>>>>   genericReocrd.put("ts", 100L)
>>>>>>>>   val stream = env.fromElements(genericReocrd)
>>>>>>>>   val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
>>>>>>>>     .withSchema(schema)
>>>>>>>>     .withCompressionCodec(compressionCodecName)
>>>>>>>>     .withPageSize(config.pageSize)
>>>>>>>>     .withRowGroupSize(config.blockSize)
>>>>>>>>     .withDictionaryEncoding(config.enableDictionary)
>>>>>>>>     .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>>>>>     .withValidation(config.validating)
>>>>>>>>     .build()
>>>>>>>>
>>>>>>>>   writer.write(genericReocrd)
>>>>>>>>   stream.addSink { r =>
>>>>>>>>     println(s"In Sink $r")
>>>>>>>>     writer.write(r)
>>>>>>>>   }
>>>>>>>>   env.execute()
>>>>>>>>   // writer.close()
>>>>>>>> }
>>>>>>>>
>>>>>>>> On Thu, Nov 29, 2018 at 6:57 AM vipul singh <neoea...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Can you try closing the writer?
>>>>>>>>>
>>>>>>>>> AvroParquetWriter has an internal buffer. Try doing a .close() in snapshot() (since you are checkpointing, this method will be called).
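For the hand-rolled AvroParquetWriter approach, one way to act on that suggestion is a RichSinkFunction that also implements CheckpointedFunction: close the current writer in snapshotState (close() is what flushes AvroParquetWriter's buffered row group to the file) and open a fresh writer for the next part. The sketch below is only an illustration of that idea, not Flink's own sink; the class name, path scheme, and part counter are made up, and it does not give the guarantees that the StreamingFileSink provides by committing part files only on completed checkpoints.

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName

class CheckpointedParquetSink(basePath: String, schemaString: String)
    extends RichSinkFunction[GenericRecord] with CheckpointedFunction {

  @transient private var schema: Schema = _
  @transient private var writer: ParquetWriter[GenericRecord] = _
  @transient private var partCounter: Int = 0

  private def openWriter(): Unit = {
    // one file per subtask and per roll, so OVERWRITE is not needed
    val subtask = getRuntimeContext.getIndexOfThisSubtask
    val path = new Path(s"$basePath/part-$subtask-$partCounter.parquet")
    writer = AvroParquetWriter.builder[GenericRecord](path)
      .withSchema(schema)
      .withCompressionCodec(CompressionCodecName.SNAPPY)
      .build()
    partCounter += 1
  }

  override def open(parameters: Configuration): Unit = {
    // parse on the task side; the schema string travels with the function instead of the Schema object
    schema = new Schema.Parser().parse(schemaString)
    openWriter()
  }

  override def invoke(value: GenericRecord): Unit = writer.write(value)

  // Roll on every checkpoint: close() flushes the internal buffer to the file,
  // then a new part file is opened so later records still have a writer.
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    writer.close()
    openWriter()
  }

  override def initializeState(context: FunctionInitializationContext): Unit = ()

  override def close(): Unit = if (writer != null) writer.close()
}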
>>>>>>>>> On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <avi.l...@bluevoyant.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks Rafi,
>>>>>>>>>> I am actually not using assignTimestampsAndWatermarks; I will try to add it as you suggested. However, it seems that the messages are repeating in the stream over and over: even if I push a single message manually to the queue, that message will repeat infinitely.
>>>>>>>>>>
>>>>>>>>>> Cheers
>>>>>>>>>> Avi
>>>>>>>>>>
>>>>>>>>>> On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Avi,
>>>>>>>>>>>
>>>>>>>>>>> I can't see the part where you use assignTimestampsAndWatermarks.
>>>>>>>>>>> If this part is not set properly, it's possible that watermarks are not sent and nothing will be written to your Sink.
>>>>>>>>>>>
>>>>>>>>>>> See here for more details:
>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>>>>>>>>>>>
>>>>>>>>>>> Hope this helps,
>>>>>>>>>>> Rafi
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Nov 28, 2018, 21:22 Avi Levi <avi.l...@bluevoyant.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> I am trying to implement a Parquet writer as a SinkFunction. The pipeline consists of Kafka as the source and a Parquet file as the sink, however it seems like the stream is repeating itself in an endless loop and the Parquet file is not written. Can someone please help me with this?
>>>>>>>>>>>>
>>>>>>>>>>>> object ParquetSinkWriter {
>>>>>>>>>>>>   private val path = new Path("tmp/pfile")
>>>>>>>>>>>>   private val schemaString = Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
>>>>>>>>>>>>   private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
>>>>>>>>>>>>   private val compressionCodecName = CompressionCodecName.SNAPPY
>>>>>>>>>>>>   private val config = ParquetWriterConfig()
>>>>>>>>>>>>   val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
>>>>>>>>>>>>     .withSchema(avroSchema)
>>>>>>>>>>>>     .withCompressionCodec(compressionCodecName)
>>>>>>>>>>>>     .withPageSize(config.pageSize)
>>>>>>>>>>>>     .withRowGroupSize(config.blockSize)
>>>>>>>>>>>>     .withDictionaryEncoding(config.enableDictionary)
>>>>>>>>>>>>     .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>>>>>>>>>     .withValidation(config.validating)
>>>>>>>>>>>>     .build()
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> class ParquetSinkWriter(path: Path, avroSchema: Schema) extends SinkFunction[GenericRecord] {
>>>>>>>>>>>>   import ParquetSinkWriter._
>>>>>>>>>>>>   override def invoke(value: GenericRecord): Unit = {
>>>>>>>>>>>>     println(s"ADDING TO File : $value") // getting this output
>>>>>>>>>>>>     writer.write(value) // the output is not written to the file
>>>>>>>>>>>>   }
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> // main app
>>>>>>>>>>>> object StreamingJob extends App {
>>>>>>>>>>>>   implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>>>>   env.enableCheckpointing(500)
>>>>>>>>>>>>   env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>>>>>>>>>>>   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>>>>>>>>>>>>   env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>>>>>>>>>>>>   env.getCheckpointConfig.setCheckpointTimeout(600)
>>>>>>>>>>>>   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
>>>>>>>>>>>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>>>>>>>>>>>   env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
>>>>>>>>>>>>   val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
>>>>>>>>>>>>   env.setStateBackend(backend)
>>>>>>>>>>>>   val writer = new ParquetSinkWriter(outputPath, schema)
>>>>>>>>>>>>
>>>>>>>>>>>>   val stream2: DataStream[DnsRequest] = env.addSource(/* consume from Kafka */)
>>>>>>>>>>>>   stream2.map { r =>
>>>>>>>>>>>>     println(s"MAPPING $r") // this output keeps repeating in a loop
>>>>>>>>>>>>     val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>>>>>>>>>>>     genericReocrd.put("qname", r.qname)
>>>>>>>>>>>>     genericReocrd.put("rcode", r.rcode)
>>>>>>>>>>>>     genericReocrd.put("ts", r.ts)
>>>>>>>>>>>>     genericReocrd
>>>>>>>>>>>>   }.addSink(writer)
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for your help
>>>>>>>>>>>> Avi
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Thanks,
>>>>>>>>> Vipul
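Pulling the thread's answers back into this original StreamingJob: checkpointing is already enabled here, so the shared, eagerly built AvroParquetWriter sink can be swapped for the StreamingFileSink + ParquetAvroWriters combination that Kostas recommends above. A rough sketch, assuming the stream2 and avroSchema values from the message above; the output path is made up, and note that the StreamingFileSink takes an org.apache.flink.core.fs.Path rather than the org.apache.hadoop.fs.Path used by AvroParquetWriter:

import org.apache.avro.generic.{ GenericData, GenericRecord }
import org.apache.flink.core.fs.{ Path => FlinkPath }
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

// bulk Parquet sink: part files are committed when checkpoints complete
val parquetSink: StreamingFileSink[GenericRecord] = StreamingFileSink
  .forBulkFormat(new FlinkPath("tmp/parquet-out"), ParquetAvroWriters.forGenericRecord(avroSchema))
  .build()

stream2.map { r =>
  val record: GenericRecord = new GenericData.Record(avroSchema)
  record.put("qname", r.qname)
  record.put("rcode", r.rcode)
  record.put("ts", r.ts)
  record
}.addSink(parquetSink)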