Thanks. Yes, *env.execute* is called and checkpoints are enabled. I think the problem is where to place *writer.close* to flush the cache. If I place it in the sink right after the write, e.g. addSink { writer.write; writer.close }, then only the first record is included in the file and the rest of the stream is not.
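As a side note (a sketch, not code from the thread): one way to avoid closing the writer per record is to make the sink a RichSinkFunction that creates the writer in open() and closes it once in close(). The class name and constructor arguments below are made up, and on an unbounded Kafka job close() only runs at termination, so this alone still does not flush data on checkpoints.

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }

// Sketch: the schema is passed as a string and parsed in open(),
// so the sink itself stays serializable.
class ParquetRichSink(outputPath: String, schemaString: String) extends RichSinkFunction[GenericRecord] {

  @transient private var writer: ParquetWriter[GenericRecord] = _

  override def open(parameters: Configuration): Unit = {
    val schema = new Schema.Parser().parse(schemaString)
    writer = AvroParquetWriter.builder[GenericRecord](new Path(outputPath))
      .withSchema(schema)
      .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
      .build()
  }

  override def invoke(value: GenericRecord): Unit = writer.write(value)

  // Runs once when the task shuts down; this is where the Parquet footer is written.
  override def close(): Unit = if (writer != null) writer.close()
}

Usage would then be stream.addSink(new ParquetRichSink("/tmp/out.parquet", schemaString)) rather than calling writer.close inside the sink lambda.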
On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas <k.klou...@data-artisans.com> wrote:

Hi again Avi,

In the first example that you posted (the one with the Kafka source), do you call env.execute()?

Cheers,
Kostas

On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <k.klou...@data-artisans.com> wrote:

Hi Avi,

In the last snippet that you posted, you have not activated checkpoints.

Checkpoints are needed for the StreamingFileSink to produce results, especially in the case of BulkWriters (like Parquet), where the part file is rolled upon reception of a checkpoint and the part is finalised (i.e. "committed") when the checkpoint completes successfully.

Could you please enable checkpointing and make sure that the job runs long enough for at least some checkpoints to be completed?

Thanks a lot,
Kostas

On Thu, Nov 29, 2018 at 7:03 AM Avi Levi <avi.l...@bluevoyant.com> wrote:

Check out this little app. You can see that the file is created but no data is written, even for a single record.

import io.eels.component.parquet.ParquetWriterConfig
import org.apache.avro.Schema
import org.apache.avro.generic.{ GenericData, GenericRecord }
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import scala.io.Source
import org.apache.flink.streaming.api.scala._

object Tester extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  def now = System.currentTimeMillis()
  val path = new Path(s"test-$now.parquet")
  val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
  val schema: Schema = new Schema.Parser().parse(schemaString)
  val compressionCodecName = CompressionCodecName.SNAPPY
  val config = ParquetWriterConfig()
  val genericReocrd: GenericRecord = new GenericData.Record(schema)
  genericReocrd.put("name", "test_b")
  genericReocrd.put("code", "NoError")
  genericReocrd.put("ts", 100L)
  val stream = env.fromElements(genericReocrd)
  val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
    .withSchema(schema)
    .withCompressionCodec(compressionCodecName)
    .withPageSize(config.pageSize)
    .withRowGroupSize(config.blockSize)
    .withDictionaryEncoding(config.enableDictionary)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .withValidation(config.validating)
    .build()

  writer.write(genericReocrd)
  stream.addSink { r =>
    println(s"In Sink $r")
    writer.write(r)
  }
  env.execute()
  // writer.close()
}

On Thu, Nov 29, 2018 at 6:57 AM vipul singh <neoea...@gmail.com> wrote:

Can you try closing the writer?

AvroParquetWriter has an internal buffer. Try doing a .close() in snapshot() (since you are checkpointing, this method will be called).
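To illustrate that suggestion (a sketch, not code from the thread): a sink implementing CheckpointedFunction can close the current file in snapshotState(), which flushes the buffered row groups and footer, and then roll to a new part file. The class name and file-naming scheme are made up, and unlike the StreamingFileSink this simplified version has no exactly-once guarantees and no cleanup on recovery.

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter

// Hypothetical sketch: flush on every checkpoint by closing the writer and opening a new part file.
class RollingParquetSink(baseDir: String, schemaString: String)
    extends RichSinkFunction[GenericRecord] with CheckpointedFunction {

  @transient private var writer: ParquetWriter[GenericRecord] = _
  @transient private var schema: Schema = _
  private var part: Long = 0L

  private def openWriter(): Unit = {
    val subtask = getRuntimeContext.getIndexOfThisSubtask
    writer = AvroParquetWriter.builder[GenericRecord](new Path(s"$baseDir/part-$subtask-$part.parquet"))
      .withSchema(schema)
      .build()
  }

  override def open(parameters: Configuration): Unit = {
    schema = new Schema.Parser().parse(schemaString)
    openWriter()
  }

  override def invoke(value: GenericRecord): Unit = writer.write(value)

  // Called on each checkpoint barrier: closing the writer flushes buffered data to the current part file.
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    writer.close()
    part = context.getCheckpointId
    openWriter()
  }

  override def initializeState(context: FunctionInitializationContext): Unit = ()

  override def close(): Unit = if (writer != null) writer.close()
}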
On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <avi.l...@bluevoyant.com> wrote:

Thanks Rafi,
I am actually not using assignTimestampsAndWatermarks; I will try to add it as you suggested. However, it seems that the messages are repeating in the stream over and over: even if I push a single message to the queue manually, that message repeats indefinitely.

Cheers
Avi

On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:

Hi Avi,

I can't see the part where you use assignTimestampsAndWatermarks. If this part is not set properly, it's possible that watermarks are not sent and nothing will be written to your Sink.

See here for more details:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission

Hope this helps,
Rafi
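For reference, a minimal self-contained sketch (not from the thread) of assigning timestamps and watermarks on a stream of such records, tolerating 10 seconds of out-of-orderness. The DnsRequest case class mirrors the fields used in the job below; the values and the fromElements source are placeholders standing in for the Kafka consumer.

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Mirrors the fields used in the job below (qname, rcode, ts as epoch millis).
case class DnsRequest(qname: String, rcode: String, ts: Long)

object WatermarkExample extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  // Placeholder input; in the real job this would be the Kafka source.
  val stream2: DataStream[DnsRequest] =
    env.fromElements(DnsRequest("example.com", "NoError", System.currentTimeMillis()))

  // Extract the event timestamp from each record and emit watermarks,
  // allowing events to arrive up to 10 seconds out of order.
  val withWatermarks = stream2.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor[DnsRequest](Time.seconds(10)) {
      override def extractTimestamp(r: DnsRequest): Long = r.ts
    })

  withWatermarks.print()
  env.execute("watermark-example")
}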
On Wed, Nov 28, 2018, 21:22 Avi Levi <avi.l...@bluevoyant.com> wrote:

Hi,

I am trying to implement a Parquet writer as a SinkFunction. The pipeline consists of Kafka as the source and a Parquet file as the sink, but it seems like the stream is repeating itself in an endless loop and the Parquet file is not written. Can someone please help me with this?

object ParquetSinkWriter {
  private val path = new Path("tmp/pfile")
  private val schemaString = Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
  private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
  private val compressionCodecName = CompressionCodecName.SNAPPY
  private val config = ParquetWriterConfig()
  val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
    .withSchema(avroSchema)
    .withCompressionCodec(compressionCodecName)
    .withPageSize(config.pageSize)
    .withRowGroupSize(config.blockSize)
    .withDictionaryEncoding(config.enableDictionary)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .withValidation(config.validating)
    .build()
}

class ParquetSinkWriter(path: Path, avroSchema: Schema) extends SinkFunction[GenericRecord] {
  import ParquetSinkWriter._
  override def invoke(value: GenericRecord): Unit = {
    println(s"ADDING TO File : $value") // getting this output
    writer.write(value) // the output is not written to the file
  }
}

// main app
object StreamingJob extends App {
  implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(500)
  env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
  env.getCheckpointConfig.setCheckpointTimeout(600)
  env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
  env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
  val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
  env.setStateBackend(backend)
  val writer = new ParquetSinkWriter(outputPath, schema)

  val stream2: DataStream[DnsRequest] = env.addSource(//consume from kafka)
  stream2.map { r =>
    println(s"MAPPING $r") // this output keeps repeating in a loop
    val genericReocrd: GenericRecord = new GenericData.Record(schema)
    genericReocrd.put("qname", r.qname)
    genericReocrd.put("rcode", r.rcode)
    genericReocrd.put("ts", r.ts)
    genericReocrd
  }.addSink(writer)

Thanks for your help
Avi

--
Thanks,
Vipul
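For completeness, a rough sketch of the checkpoint-driven approach Kostas describes above: replace the hand-rolled AvroParquetWriter with a StreamingFileSink using a bulk Parquet writer, so part files are rolled on checkpoints and committed when checkpoints complete. This assumes Flink 1.7+ with the flink-parquet dependency available; the inline schema, paths, intervals and the placeholder source are made up for illustration.

import org.apache.avro.Schema
import org.apache.avro.generic.{ GenericData, GenericRecord }
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

object ParquetStreamingJob {

  // Tiny inline Avro schema so the example is self-contained.
  val SchemaJson: String =
    """{"type":"record","name":"DnsRequest","fields":[
      |{"name":"qname","type":"string"},
      |{"name":"rcode","type":"string"},
      |{"name":"ts","type":"long"}]}""".stripMargin

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Part files are finalised only when a checkpoint completes, so checkpointing is required.
    env.enableCheckpointing(5000)

    val schema = new Schema.Parser().parse(SchemaJson)

    // Unbounded placeholder source so the job runs long enough for checkpoints;
    // in the real pipeline this would be the mapped Kafka stream.
    val records: DataStream[GenericRecord] = env.addSource(new SourceFunction[GenericRecord] {
      @volatile private var running = true
      override def run(ctx: SourceFunction.SourceContext[GenericRecord]): Unit = {
        val localSchema = new Schema.Parser().parse(SchemaJson) // parse here to avoid serializing the Schema with the closure
        while (running) {
          val r: GenericRecord = new GenericData.Record(localSchema)
          r.put("qname", "example.com")
          r.put("rcode", "NoError")
          r.put("ts", System.currentTimeMillis())
          ctx.collect(r)
          Thread.sleep(1000)
        }
      }
      override def cancel(): Unit = running = false
    })

    // Roll a new part file on every checkpoint and commit it when the checkpoint completes.
    val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
      .forBulkFormat(new Path("/tmp/parquet-out"), ParquetAvroWriters.forGenericRecord(schema))
      .build()

    records.addSink(sink)
    env.execute("parquet-streaming-file-sink")
  }
}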