Thanks a lot Kostas, but the file is not created. What am I doing wrong? BTW, how can you set the encoding etc. in Flink's Avro-Parquet writer?
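ParquetAvroWriters.forGenericRecord only takes a schema and does not expose compression or encoding settings. One possible route, a sketch only and assuming the ParquetBuilder and ParquetWriterFactory hooks in flink-parquet, is to build the writer factory yourself and pass it to forBulkFormat:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.formats.parquet.{ ParquetBuilder, ParquetWriterFactory }
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.io.OutputFile

object ParquetFactories {
  // Sketch: a writer factory that controls codec, dictionary encoding and page size.
  // The schema is passed as a String so the factory stays serializable and is
  // re-parsed when the writer is created (ParquetAvroWriters does the same).
  def forGenericRecordWithOptions(schemaString: String): ParquetWriterFactory[GenericRecord] = {
    val builder = new ParquetBuilder[GenericRecord] {
      override def createWriter(out: OutputFile): ParquetWriter[GenericRecord] =
        AvroParquetWriter
          .builder[GenericRecord](out)
          .withSchema(new Schema.Parser().parse(schemaString))
          .withCompressionCodec(CompressionCodecName.SNAPPY) // compression codec
          .withDictionaryEncoding(true)                      // dictionary encoding
          .withPageSize(1024 * 1024)                         // page size in bytes
          .build()
    }
    new ParquetWriterFactory[GenericRecord](builder)
  }
}

The result implements the same BulkWriter.Factory interface, so it can replace ParquetAvroWriters.forGenericRecord(schema) in StreamingFileSink.forBulkFormat.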
object Tester extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  def now = System.currentTimeMillis()
  val path = new Path(s"test-$now.parquet")
  val schema: Schema = new Schema.Parser().parse(schemaString)
  val streamingSink = StreamingFileSink.forBulkFormat(
    path,
    ParquetAvroWriters.forGenericRecord(schema))
    .build()
  env.enableCheckpointing(100)
  val stream = env.addSource(FirstSeenQueueImpl.consumer).map { r =>
    val genericReocrd: GenericRecord = new GenericData.Record(schema)
    genericReocrd.put("name", r.name)
    genericReocrd.put("code", r.code.asString)
    genericReocrd.put("ts", r.ts)
    genericReocrd
  }
  stream.addSink { r =>
    println(s"In Sink $r") // getting this line
    streamingSink
  }
  env.execute()
}

Cheers
Avi

On Thu, Nov 29, 2018 at 1:36 PM Kostas Kloudas <k.klou...@data-artisans.com> wrote:

>
> Sorry, previously I got confused and I assumed you were using Flink's
> StreamingFileSink.
>
> Could you try to use Flink's Avro-Parquet writer?
>
> StreamingFileSink.forBulkFormat(
>     Path...(MY_PATH),
>     ParquetAvroWriters.forGenericRecord(MY_SCHEMA))
>   .build()
>
> Cheers,
> Kostas
>
> On Thu, Nov 29, 2018 at 12:25 PM Avi Levi <avi.l...@bluevoyant.com> wrote:
>
>> Thanks.
>> Yes, env.execute is called and checkpoints are enabled.
>> I think the problem is where to place the writer.close to flush the cache.
>> If I place it in the sink right after the write, e.g.
>>
>> addSink {
>>   writer.write
>>   writer.close
>> }
>>
>> then only the first record is included in the file, but not the rest of the stream.
>>
>>
>> On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>
>>> Hi again Avi,
>>>
>>> In the first example that you posted (the one with the Kafka source), do
>>> you call env.execute()?
>>>
>>> Cheers,
>>> Kostas
>>>
>>> On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>>
>>>> Hi Avi,
>>>>
>>>> In the last snippet that you posted, you have not activated checkpoints.
>>>>
>>>> Checkpoints are needed for the StreamingFileSink to produce results,
>>>> especially in the case of BulkWriters (like Parquet), where
>>>> the part file is rolled upon reception of a checkpoint and the part is
>>>> finalised (i.e. "committed") when the checkpoint completes successfully.
>>>>
>>>> Could you please enable checkpointing and make sure that the job runs
>>>> long enough for at least some checkpoints to be completed?
>>>>
>>>> Thanks a lot,
>>>> Kostas
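For reference, note that in the Tester app at the top of this message the StreamingFileSink is built but never attached: addSink is given a lambda that only prints and returns the streamingSink value, so the Parquet writer receives no records. A minimal sketch of the same pipeline with the sink itself attached; FirstSeenQueueImpl, the record fields and the schema resource are placeholders taken from the snippets in this thread:

import org.apache.avro.Schema
import org.apache.avro.generic.{ GenericData, GenericRecord }
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._
import scala.io.Source

object TesterSketch extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // Bulk formats (Parquet) only finalise part files on completed checkpoints.
  env.enableCheckpointing(100)

  val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
  val schema: Schema = new Schema.Parser().parse(schemaString)

  // Note: forBulkFormat takes an org.apache.flink.core.fs.Path, not a Hadoop Path.
  val streamingSink = StreamingFileSink
    .forBulkFormat(new Path("test-output"), ParquetAvroWriters.forGenericRecord(schema))
    .build()

  env.addSource(FirstSeenQueueImpl.consumer) // placeholder source from the app above
    .map { r =>
      val record: GenericRecord = new GenericData.Record(schema)
      record.put("name", r.name)
      record.put("code", r.code.asString)
      record.put("ts", r.ts)
      record
    }
    .addSink(streamingSink) // the sink itself, not a println lambda

  env.execute()
}

With checkpointing enabled and the job running long enough, finished part files should then appear under the base path once checkpoints complete.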
>>>> On Thu, Nov 29, 2018 at 7:03 AM Avi Levi <avi.l...@bluevoyant.com> wrote:
>>>>
>>>>> Check out this little app. You can see that the file is created but no
>>>>> data is written, even for a single record.
>>>>>
>>>>> import io.eels.component.parquet.ParquetWriterConfig
>>>>> import org.apache.avro.Schema
>>>>> import org.apache.avro.generic.{ GenericData, GenericRecord }
>>>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>>>> import org.apache.hadoop.fs.Path
>>>>> import org.apache.parquet.avro.AvroParquetWriter
>>>>> import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
>>>>> import org.apache.parquet.hadoop.metadata.CompressionCodecName
>>>>> import scala.io.Source
>>>>> import org.apache.flink.streaming.api.scala._
>>>>>
>>>>> object Tester extends App {
>>>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>   def now = System.currentTimeMillis()
>>>>>   val path = new Path(s"test-$now.parquet")
>>>>>   val schemaString =
>>>>>     Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
>>>>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>>>>   val compressionCodecName = CompressionCodecName.SNAPPY
>>>>>   val config = ParquetWriterConfig()
>>>>>   val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>>>>   genericReocrd.put("name", "test_b")
>>>>>   genericReocrd.put("code", "NoError")
>>>>>   genericReocrd.put("ts", 100L)
>>>>>   val stream = env.fromElements(genericReocrd)
>>>>>   val writer: ParquetWriter[GenericRecord] =
>>>>>     AvroParquetWriter.builder[GenericRecord](path)
>>>>>       .withSchema(schema)
>>>>>       .withCompressionCodec(compressionCodecName)
>>>>>       .withPageSize(config.pageSize)
>>>>>       .withRowGroupSize(config.blockSize)
>>>>>       .withDictionaryEncoding(config.enableDictionary)
>>>>>       .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>>       .withValidation(config.validating)
>>>>>       .build()
>>>>>
>>>>>   writer.write(genericReocrd)
>>>>>   stream.addSink { r =>
>>>>>     println(s"In Sink $r")
>>>>>     writer.write(r)
>>>>>   }
>>>>>   env.execute()
>>>>>   // writer.close()
>>>>> }
>>>>>
>>>>>
>>>>> On Thu, Nov 29, 2018 at 6:57 AM vipul singh <neoea...@gmail.com> wrote:
>>>>>
>>>>>> Can you try closing the writer?
>>>>>>
>>>>>> AvroParquetWriter has an internal buffer. Try doing a .close() in
>>>>>> snapshot() (since you are checkpointing, this method will be called).
>>>>>>
>>>>>> On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <avi.l...@bluevoyant.com> wrote:
>>>>>>
>>>>>>> Thanks Rafi,
>>>>>>> I am actually not using assignTimestampsAndWatermarks; I will try
>>>>>>> to add it as you suggested. However, it seems that the messages are
>>>>>>> repeating in the stream over and over: even if I push a single message
>>>>>>> manually to the queue, that message repeats indefinitely.
>>>>>>>
>>>>>>> Cheers
>>>>>>> Avi
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Avi,
>>>>>>>>
>>>>>>>> I can't see the part where you use assignTimestampsAndWatermarks.
>>>>>>>> If this part is not set properly, it's possible that watermarks are
>>>>>>>> not sent and nothing will be written to your Sink.
>>>>>>>>
>>>>>>>> See here for more details:
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>>>>>>>>
>>>>>>>> Hope this helps,
>>>>>>>> Rafi
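To make Rafi's suggestion concrete, a rough sketch of assigning timestamps and watermarks directly on the Kafka consumer. The topic name, properties and timestamp extraction are placeholders, not details from this thread:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092") // placeholder
props.setProperty("group.id", "parquet-writer-test")     // placeholder

// Placeholder topic and schema; use the version-specific consumer
// (e.g. FlinkKafkaConsumer011) if that is what the job depends on.
val consumer = new FlinkKafkaConsumer[String]("requests", new SimpleStringSchema(), props)

// Emit watermarks from the consumer itself, tolerating 10 seconds of out-of-orderness.
consumer.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(10)) {
    override def extractTimestamp(element: String): Long =
      System.currentTimeMillis() // placeholder: extract the event time from the record instead
  })

Note that the bulk-format StreamingFileSink discussed above rolls its part files on checkpoints rather than on watermarks, so this mainly matters if event-time logic is added to the pipeline.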
>>>>>>>> On Wed, Nov 28, 2018, 21:22 Avi Levi <avi.l...@bluevoyant.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I am trying to implement a Parquet writer as a SinkFunction. The
>>>>>>>>> pipeline consists of Kafka as the source and a Parquet file as the sink,
>>>>>>>>> but it seems like the stream is repeating itself in an endless loop and
>>>>>>>>> the Parquet file is not written. Can someone please help me with this?
>>>>>>>>>
>>>>>>>>> object ParquetSinkWriter {
>>>>>>>>>   private val path = new Path("tmp/pfile")
>>>>>>>>>   private val schemaString =
>>>>>>>>>     Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
>>>>>>>>>   private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
>>>>>>>>>   private val compressionCodecName = CompressionCodecName.SNAPPY
>>>>>>>>>   private val config = ParquetWriterConfig()
>>>>>>>>>   val writer: ParquetWriter[GenericRecord] =
>>>>>>>>>     AvroParquetWriter.builder[GenericRecord](path)
>>>>>>>>>       .withSchema(avroSchema)
>>>>>>>>>       .withCompressionCodec(compressionCodecName)
>>>>>>>>>       .withPageSize(config.pageSize)
>>>>>>>>>       .withRowGroupSize(config.blockSize)
>>>>>>>>>       .withDictionaryEncoding(config.enableDictionary)
>>>>>>>>>       .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>>>>>>       .withValidation(config.validating)
>>>>>>>>>       .build()
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> class ParquetSinkWriter(path: Path, avroSchema: Schema) extends SinkFunction[GenericRecord] {
>>>>>>>>>   import ParquetSinkWriter._
>>>>>>>>>   override def invoke(value: GenericRecord): Unit = {
>>>>>>>>>     println(s"ADDING TO File : $value") // getting this output
>>>>>>>>>     writer.write(value) // the output is not written to the file
>>>>>>>>>   }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> // main app
>>>>>>>>> object StreamingJob extends App {
>>>>>>>>>   implicit val env: StreamExecutionEnvironment =
>>>>>>>>>     StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>   env.enableCheckpointing(500)
>>>>>>>>>   env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>>>>>>>>   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>>>>>>>>>   env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>>>>>>>>>   env.getCheckpointConfig.setCheckpointTimeout(600)
>>>>>>>>>   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
>>>>>>>>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>>>>>>>>   env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
>>>>>>>>>   val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
>>>>>>>>>   env.setStateBackend(backend)
>>>>>>>>>   val writer = new ParquetSinkWriter(outputPath, schema)
>>>>>>>>>
>>>>>>>>>   val stream2: DataStream[DnsRequest] = env.addSource(/* consume from kafka */)
>>>>>>>>>   stream2.map { r =>
>>>>>>>>>     println(s"MAPPING $r") // this output keeps repeating in a loop
>>>>>>>>>     val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>>>>>>>>     genericReocrd.put("qname", r.qname)
>>>>>>>>>     genericReocrd.put("rcode", r.rcode)
>>>>>>>>>     genericReocrd.put("ts", r.ts)
>>>>>>>>>     genericReocrd
>>>>>>>>>   }.addSink(writer)
>>>>>>>>>
>>>>>>>>> Thanks for your help
>>>>>>>>> Avi
>>>>>>>>>
>>>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Thanks,
>>>>>> Vipul
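For completeness, a rough sketch of what Vipul's suggestion (closing the writer when a checkpoint is taken) might look like for a hand-rolled sink like the one above. The class name, path handling and roll-per-checkpoint scheme are assumptions for illustration; unlike the StreamingFileSink it offers no exactly-once guarantees on failure:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter

// Sketch only: close the buffered writer whenever a checkpoint is taken and
// roll over to a fresh file, so buffered rows actually reach disk.
class CheckpointedParquetSink(basePath: String, schemaString: String)
    extends RichSinkFunction[GenericRecord] with CheckpointedFunction {

  @transient private var writer: ParquetWriter[GenericRecord] = _
  @transient private var schema: Schema = _
  private var partCounter = 0L

  private def openWriter(): Unit = {
    val subtask = getRuntimeContext.getIndexOfThisSubtask
    writer = AvroParquetWriter
      .builder[GenericRecord](new Path(s"$basePath/part-$subtask-$partCounter.parquet"))
      .withSchema(schema)
      .build()
    partCounter += 1
  }

  override def open(parameters: Configuration): Unit = {
    schema = new Schema.Parser().parse(schemaString) // Avro Schema is not serializable, parse it here
    openWriter()
  }

  override def invoke(value: GenericRecord): Unit = writer.write(value)

  // Called on every checkpoint: closing flushes AvroParquetWriter's internal buffer,
  // and since a ParquetWriter cannot be reopened, a new part file is started.
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    writer.close()
    openWriter()
  }

  override def initializeState(context: FunctionInitializationContext): Unit = ()

  override def close(): Unit = if (writer != null) writer.close()
}

Rolling to a new file on every checkpoint is what makes the close() safe, since a ParquetWriter cannot be reopened after closing; for production use, the StreamingFileSink approach discussed earlier in the thread remains the safer option.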