Sorry, I got confused earlier and assumed you were already using Flink's StreamingFileSink.

Could you try to use Flink's Avro/Parquet bulk writer?

StreamingFileSink
  .forBulkFormat(new Path(MY_PATH), ParquetAvroWriters.forGenericRecord(MY_SCHEMA))
  .build()
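Putting it together, something along these lines should work end-to-end. This is only a rough, untested sketch: the schema (with the name/code/ts fields from your test app), the output path, the TestSource that emits one record per second, and the job name are placeholders you would replace with your real schema and your Kafka stream, and I think you also need the flink-parquet dependency on the classpath. The important parts are that checkpointing is enabled and that the job keeps running, because part files are only finalised when a checkpoint completes:

import org.apache.avro.Schema
import org.apache.avro.generic.{ GenericData, GenericRecord }
import org.apache.flink.core.fs.Path // note: Flink's Path, not Hadoop's
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

// Placeholder source standing in for your Kafka consumer: emits one tuple per second
// so the job keeps running long enough for checkpoints to complete.
class TestSource extends SourceFunction[(String, String, Long)] {
  @volatile private var running = true
  override def run(ctx: SourceFunction.SourceContext[(String, String, Long)]): Unit =
    while (running) {
      ctx.collect(("test_b", "NoError", System.currentTimeMillis()))
      Thread.sleep(1000)
    }
  override def cancel(): Unit = running = false
}

object ParquetBulkSinkJob extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // Required: the StreamingFileSink rolls and commits part files on checkpoints.
  env.enableCheckpointing(10000)

  // Placeholder schema with the fields from your test app.
  val schemaString =
    """{"type":"record","name":"Request","fields":[
      |  {"name":"name","type":"string"},
      |  {"name":"code","type":"string"},
      |  {"name":"ts","type":"long"}]}""".stripMargin
  val schema: Schema = new Schema.Parser().parse(schemaString)

  val stream: DataStream[GenericRecord] = env.addSource(new TestSource).map {
    case (name, code, ts) =>
      val record: GenericRecord = new GenericData.Record(schema)
      record.put("name", name)
      record.put("code", code)
      record.put("ts", ts)
      record
  }

  val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
    .forBulkFormat(new Path("/tmp/parquet-out"), ParquetAvroWriters.forGenericRecord(schema))
    .build()

  stream.addSink(sink)
  env.execute("parquet-bulk-sink")
}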
Cheers,
Kostas

On Thu, Nov 29, 2018 at 12:25 PM Avi Levi <avi.l...@bluevoyant.com> wrote:

> Thanks.
> Yes, env.execute is called and checkpoints are enabled.
> I think the problem is where to place the writer.close to flush the cache.
> If I place it in the sink after the write event, e.g.
>
> addSink {
>   writer.write
>   writer.close
> }
>
> then only the first record is included in the file, but not the rest of the stream.
>
> On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>
>> Hi again Avi,
>>
>> In the first example that you posted (the one with the Kafka source), do you call env.execute()?
>>
>> Cheers,
>> Kostas
>>
>> On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>
>>> Hi Avi,
>>>
>>> In the last snippet that you posted, you have not activated checkpoints.
>>>
>>> Checkpoints are needed for the StreamingFileSink to produce results, especially in the case of BulkWriters (like Parquet), where the part file is rolled upon reception of a checkpoint and the part is finalised (i.e. "committed") when the checkpoint completes successfully.
>>>
>>> Could you please enable checkpointing and make sure that the job runs long enough for at least some checkpoints to complete?
>>>
>>> Thanks a lot,
>>> Kostas
>>>
>>> On Thu, Nov 29, 2018 at 7:03 AM Avi Levi <avi.l...@bluevoyant.com> wrote:
>>>
>>>> Check out this little app. You can see that the file is created but no data is written, even for a single record.
>>>>
>>>> import io.eels.component.parquet.ParquetWriterConfig
>>>> import org.apache.avro.Schema
>>>> import org.apache.avro.generic.{ GenericData, GenericRecord }
>>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>>> import org.apache.hadoop.fs.Path
>>>> import org.apache.parquet.avro.AvroParquetWriter
>>>> import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
>>>> import org.apache.parquet.hadoop.metadata.CompressionCodecName
>>>> import scala.io.Source
>>>> import org.apache.flink.streaming.api.scala._
>>>>
>>>> object Tester extends App {
>>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>   def now = System.currentTimeMillis()
>>>>   val path = new Path(s"test-$now.parquet")
>>>>   val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
>>>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>>>   val compressionCodecName = CompressionCodecName.SNAPPY
>>>>   val config = ParquetWriterConfig()
>>>>   val genericRecord: GenericRecord = new GenericData.Record(schema)
>>>>   genericRecord.put("name", "test_b")
>>>>   genericRecord.put("code", "NoError")
>>>>   genericRecord.put("ts", 100L)
>>>>   val stream = env.fromElements(genericRecord)
>>>>
>>>>   val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
>>>>     .withSchema(schema)
>>>>     .withCompressionCodec(compressionCodecName)
>>>>     .withPageSize(config.pageSize)
>>>>     .withRowGroupSize(config.blockSize)
>>>>     .withDictionaryEncoding(config.enableDictionary)
>>>>     .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>     .withValidation(config.validating)
>>>>     .build()
>>>>
>>>>   writer.write(genericRecord)
>>>>   stream.addSink { r =>
>>>>     println(s"In Sink $r")
>>>>     writer.write(r)
>>>>   }
>>>>   env.execute()
>>>>   // writer.close()
>>>> }
>>>>
>>>> On Thu, Nov 29, 2018 at 6:57 AM vipul singh <neoea...@gmail.com> wrote:
>>>>
>>>>> Can you try closing the writer?
>>>>>
>>>>> AvroParquetWriter has an internal buffer. Try doing a .close() in snapshot() (since you are checkpointing, this method will be called).
>>>>>
>>>>> On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <avi.l...@bluevoyant.com> wrote:
>>>>>
>>>>>> Thanks Rafi,
>>>>>> I am actually not using assignTimestampsAndWatermarks; I will try to add it as you suggested. However, it seems that the messages are repeating in the stream over and over: even if I push a single message manually to the queue, that message repeats infinitely.
>>>>>>
>>>>>> Cheers
>>>>>> Avi
>>>>>>
>>>>>> On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Avi,
>>>>>>>
>>>>>>> I can't see the part where you use assignTimestampsAndWatermarks.
>>>>>>> If this part is not set properly, it's possible that watermarks are not sent and nothing will be written to your sink.
>>>>>>>
>>>>>>> See here for more details:
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>>>>>>>
>>>>>>> Hope this helps,
>>>>>>> Rafi
>>>>>>>
>>>>>>> On Wed, Nov 28, 2018, 21:22 Avi Levi <avi.l...@bluevoyant.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I am trying to implement a Parquet writer as a SinkFunction. The pipeline consists of Kafka as the source and a Parquet file as the sink, however it seems like the stream is repeating itself in an endless loop and the Parquet file is not written. Can someone please help me with this?
>>>>>>>>
>>>>>>>> object ParquetSinkWriter {
>>>>>>>>   private val path = new Path("tmp/pfile")
>>>>>>>>   private val schemaString = Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
>>>>>>>>   private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
>>>>>>>>   private val compressionCodecName = CompressionCodecName.SNAPPY
>>>>>>>>   private val config = ParquetWriterConfig()
>>>>>>>>   val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
>>>>>>>>     .withSchema(avroSchema)
>>>>>>>>     .withCompressionCodec(compressionCodecName)
>>>>>>>>     .withPageSize(config.pageSize)
>>>>>>>>     .withRowGroupSize(config.blockSize)
>>>>>>>>     .withDictionaryEncoding(config.enableDictionary)
>>>>>>>>     .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>>>>>     .withValidation(config.validating)
>>>>>>>>     .build()
>>>>>>>> }
>>>>>>>>
>>>>>>>> class ParquetSinkWriter(path: Path, avroSchema: Schema) extends SinkFunction[GenericRecord] {
>>>>>>>>   import ParquetSinkWriter._
>>>>>>>>   override def invoke(value: GenericRecord): Unit = {
>>>>>>>>     println(s"ADDING TO File : $value") // getting this output
>>>>>>>>     writer.write(value) // the output is not written to the file
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> // main app
>>>>>>>> object StreamingJob extends App {
>>>>>>>>   implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>   env.enableCheckpointing(500)
>>>>>>>>   env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>>>>>>>   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>>>>>>>>   env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>>>>>>>>   env.getCheckpointConfig.setCheckpointTimeout(600)
>>>>>>>>   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
>>>>>>>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>>>>>>>   env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
>>>>>>>>   val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
>>>>>>>>   env.setStateBackend(backend)
>>>>>>>>   val writer = new ParquetSinkWriter(outputPath, schema)
>>>>>>>>
>>>>>>>>   val stream2: DataStream[DnsRequest] = env.addSource(/* consume from kafka */)
>>>>>>>>   stream2.map { r =>
>>>>>>>>     println(s"MAPPING $r") // this output keeps repeating in a loop
>>>>>>>>     val genericRecord: GenericRecord = new GenericData.Record(schema)
>>>>>>>>     genericRecord.put("qname", r.qname)
>>>>>>>>     genericRecord.put("rcode", r.rcode)
>>>>>>>>     genericRecord.put("ts", r.ts)
>>>>>>>>     genericRecord
>>>>>>>>   }.addSink(writer)
>>>>>>>>
>>>>>>>> Thanks for your help
>>>>>>>> Avi
>>>>>>>>
>>>>>
>>>>> --
>>>>> Thanks,
>>>>> Vipul
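P.S. If for some reason you prefer to keep a custom SinkFunction instead of the StreamingFileSink, what Vipul suggested above (closing the writer in snapshot so its buffer gets flushed) would look roughly like the sketch below. This is untested and only illustrative: the class name and the part-file naming are made up, and it does not give you the "commit only when the checkpoint completes" semantics that the StreamingFileSink provides, which is why I would still recommend the StreamingFileSink.

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter

// Rolls to a new part file on every checkpoint so the writer's in-memory buffer
// is flushed to the file. The schema is passed as a string because
// org.apache.avro.Schema may not be serializable.
class RollingParquetSink(basePath: String, schemaString: String)
    extends RichSinkFunction[GenericRecord] with CheckpointedFunction {

  @transient private var schema: Schema = _
  @transient private var writer: ParquetWriter[GenericRecord] = _
  private var part = 0

  private def openNewWriter(): Unit = {
    val subtask = getRuntimeContext.getIndexOfThisSubtask
    writer = AvroParquetWriter
      .builder[GenericRecord](new Path(s"$basePath/part-$subtask-$part.parquet"))
      .withSchema(schema)
      .build()
    part += 1
  }

  override def open(parameters: Configuration): Unit = {
    schema = new Schema.Parser().parse(schemaString)
    openNewWriter()
  }

  override def invoke(value: GenericRecord): Unit = writer.write(value)

  // Called on every checkpoint: close() flushes the buffered row group to the file,
  // then a fresh writer is opened for the records arriving after the checkpoint.
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    writer.close()
    openNewWriter()
  }

  override def initializeState(context: FunctionInitializationContext): Unit = ()

  override def close(): Unit = if (writer != null) writer.close()
}

In your job you would then keep checkpointing enabled, as you already have, and attach it with something like stream2.map { ... }.addSink(new RollingParquetSink("/tmp/parquet-out", schemaString)).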