Can you try closing the writer? AvroParquetWriter has an internal buffer. Try calling .close() in snapshotState() (since you are checkpointing, that method will be called on every checkpoint).
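Something along these lines (an untested sketch; the class and helper names are made up for illustration): have the sink extend RichSinkFunction and implement CheckpointedFunction, close the current Parquet file in snapshotState() so its footer gets written, and roll over to a new file for subsequent records.

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ParquetFileWriter, ParquetWriter}

// Sketch only: CheckpointedParquetSink, newWriter and basePath are illustrative names.
class CheckpointedParquetSink(basePath: String, avroSchema: Schema)
    extends RichSinkFunction[GenericRecord] with CheckpointedFunction {

  @transient private var writer: ParquetWriter[GenericRecord] = _

  // Each file gets a fresh path; a Parquet file cannot be appended to once closed.
  private def newWriter(): ParquetWriter[GenericRecord] =
    AvroParquetWriter.builder[GenericRecord](new Path(s"$basePath/part-${System.nanoTime()}"))
      .withSchema(avroSchema)
      .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
      .build()

  override def initializeState(context: FunctionInitializationContext): Unit = {
    writer = newWriter()
  }

  override def invoke(value: GenericRecord): Unit =
    writer.write(value)

  // Called on every checkpoint: closing the writer forces the Parquet footer to be
  // written, so the data becomes visible on disk; then roll to a new file.
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    writer.close()
    writer = newWriter()
  }

  override def close(): Unit =
    if (writer != null) writer.close()
}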
On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <avi.l...@bluevoyant.com> wrote:

> Thanks Rafi,
> I am actually not using assignTimestampsAndWatermarks; I will try to add
> it as you suggested. However, it seems that the messages are repeating in
> the stream over and over: even if I push a single message manually to the
> queue, that message repeats infinitely.
>
> Cheers
> Avi
>
>
> On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>
>> Hi Avi,
>>
>> I can't see the part where you use assignTimestampsAndWatermarks.
>> If this part is not set properly, it's possible that watermarks are not
>> sent and nothing will be written to your Sink.
>>
>> See here for more details:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>>
>> Hope this helps,
>> Rafi
>>
>> On Wed, Nov 28, 2018, 21:22 Avi Levi <avi.l...@bluevoyant.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to implement a Parquet writer as a SinkFunction. The pipeline
>>> consists of Kafka as the source and a Parquet file as the sink, however it
>>> seems like the stream is repeating itself in an endless loop and the
>>> Parquet file is not written. Can someone please help me with this?
>>>
>>> object ParquetSinkWriter {
>>>   private val path = new Path("tmp/pfile")
>>>   private val schemaString =
>>>     Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
>>>   private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
>>>   private val compressionCodecName = CompressionCodecName.SNAPPY
>>>   private val config = ParquetWriterConfig()
>>>   val writer: ParquetWriter[GenericRecord] =
>>>     AvroParquetWriter.builder[GenericRecord](path)
>>>       .withSchema(avroSchema)
>>>       .withCompressionCodec(compressionCodecName)
>>>       .withPageSize(config.pageSize)
>>>       .withRowGroupSize(config.blockSize)
>>>       .withDictionaryEncoding(config.enableDictionary)
>>>       .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>       .withValidation(config.validating)
>>>       .build()
>>> }
>>>
>>> class ParquetSinkWriter(path: Path, avroSchema: Schema) extends
>>>     SinkFunction[GenericRecord] {
>>>   import ParquetSinkWriter._
>>>   override def invoke(value: GenericRecord): Unit = {
>>>     println(s"ADDING TO File : $value") // getting this output
>>>     writer.write(value) // the output is not written to the file
>>>   }
>>> }
>>>
>>> // main app
>>> object StreamingJob extends App {
>>>   implicit val env: StreamExecutionEnvironment =
>>>     StreamExecutionEnvironment.getExecutionEnvironment
>>>   env.enableCheckpointing(500)
>>>   env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>>   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>>>   env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>>>   env.getCheckpointConfig.setCheckpointTimeout(600)
>>>   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
>>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>>   env.setRestartStrategy(RestartStrategies.failureRateRestart(2,
>>>     Time.seconds(3), Time.seconds(3)))
>>>   val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
>>>   env.setStateBackend(backend)
>>>   val writer = new ParquetSinkWriter(outputPath, schema)
>>>   val stream2: DataStream[DnsRequest] = env.addSource(/* consume from kafka */)
>>>   stream2.map { r =>
>>>     println(s"MAPPING $r") // this output keeps repeating in a loop
>>>     val genericRecord: GenericRecord = new GenericData.Record(schema)
>>>     genericRecord.put("qname", r.qname)
>>>     genericRecord.put("rcode", r.rcode)
>>>     genericRecord.put("ts", r.ts)
>>>     genericRecord
>>>   }.addSink(writer)
>>>
>>> Thanks for your help
>>> Avi
>>>

--
Thanks,
Vipul
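Regarding Rafi's suggestion above, a rough (untested) sketch of attaching a timestamp/watermark assigner to the Kafka source might look like the following. The topic name, properties, deserialization schema, and the assumption that DnsRequest.ts holds epoch-millisecond timestamps are all placeholders, to be replaced with whatever the job already uses.

import java.util.Properties

import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

// Placeholder Kafka settings; reuse the ones already configured in the job.
val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")
props.setProperty("group.id", "dns-parquet-sink")

// Placeholder: the DeserializationSchema[DnsRequest] the job already has.
val dnsRequestSchema: DeserializationSchema[DnsRequest] = ???

val consumer = new FlinkKafkaConsumer[DnsRequest]("dns-requests", dnsRequestSchema, props)

// Assign event-time timestamps (assumed: DnsRequest.ts is epoch millis)
// and tolerate up to 10 seconds of out-of-orderness.
consumer.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[DnsRequest](Time.seconds(10)) {
    override def extractTimestamp(element: DnsRequest): Long = element.ts
  })

val stream2: DataStream[DnsRequest] = env.addSource(consumer)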