Can you try closing the writer?

AvroParquetWriter has an internal buffer. Try calling .close() in
snapshotState() (since you have checkpointing enabled, that method will be
called on every checkpoint).
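
Roughly along these lines: a minimal, untested sketch based on the code
below (I kept only the schema/write-mode parts of your builder chain; note
that reopening the same path with OVERWRITE replaces the previous file, so
in practice you would roll to a new file per checkpoint):

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ParquetFileWriter, ParquetWriter}

class ClosingParquetSink(path: Path, avroSchema: Schema)
    extends SinkFunction[GenericRecord] with CheckpointedFunction {

  // built lazily so a fresh writer exists after every close
  @transient private var writer: ParquetWriter[GenericRecord] = _

  override def invoke(value: GenericRecord): Unit = {
    if (writer == null) writer = buildWriter()
    writer.write(value)
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // close() flushes AvroParquetWriter's internal buffer and writes the Parquet footer
    if (writer != null) {
      writer.close()
      writer = null
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    // no managed state needed for this sketch
  }

  // caveat: OVERWRITE on the same path replaces the file written at the previous
  // checkpoint; roll to a new path per checkpoint if you need to keep all data
  private def buildWriter(): ParquetWriter[GenericRecord] =
    AvroParquetWriter.builder[GenericRecord](path)
      .withSchema(avroSchema)
      .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
      .build()
}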

On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <avi.l...@bluevoyant.com> wrote:

> Thanks Rafi,
> I am actually not using assignTimestampsAndWatermarks; I will try to add
> it as you suggested. However, the messages seem to repeat in the stream
> over and over: even if I push a single message manually to the queue, that
> message repeats indefinitely.
>
> Cheers
> Avi
>
>
> On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>
>> Hi Avi,
>>
>> I can't see the part where you use assignTimestampsAndWatermarks.
>> If this part is not set properly, it's possible that watermarks are not
>> sent and nothing will be written to your Sink.
>>
>> See here for more details:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
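>>
>> For example, something along these lines on the Kafka source (a rough,
>> untested sketch: it assumes your DnsRequest has an epoch-millis ts field,
>> that topic / deserializer / props are your existing consumer arguments,
>> and FlinkKafkaConsumer stands in for whichever versioned consumer you use):
>>
>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
>> import org.apache.flink.streaming.api.windowing.time.Time
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
>>
>> val kafkaSource = new FlinkKafkaConsumer[DnsRequest](topic, deserializer, props)
>> // emit watermarks lagging 10 seconds behind the max event time seen so far
>> kafkaSource.assignTimestampsAndWatermarks(
>>   new BoundedOutOfOrdernessTimestampExtractor[DnsRequest](Time.seconds(10)) {
>>     override def extractTimestamp(element: DnsRequest): Long = element.ts
>>   })
>> val stream2 = env.addSource(kafkaSource)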
>>
>> Hope this helps,
>> Rafi
>>
>> On Wed, Nov 28, 2018, 21:22 Avi Levi <avi.l...@bluevoyant.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to implement a Parquet writer as a SinkFunction. The pipeline
>>> consists of Kafka as the source and a Parquet file as the sink; however,
>>> it seems like the stream is repeating itself in an endless loop and the
>>> Parquet file is never written. Can someone please help me with this?
>>>
>>> object ParquetSinkWriter {
>>>   private val path = new Path("tmp/pfile")
>>>   private val schemaString =
>>>     Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
>>>   private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
>>>   private val compressionCodecName = CompressionCodecName.SNAPPY
>>>   private val config = ParquetWriterConfig()
>>>   val writer: ParquetWriter[GenericRecord] =
>>>     AvroParquetWriter.builder[GenericRecord](path)
>>>       .withSchema(avroSchema)
>>>       .withCompressionCodec(compressionCodecName)
>>>       .withPageSize(config.pageSize)
>>>       .withRowGroupSize(config.blockSize)
>>>       .withDictionaryEncoding(config.enableDictionary)
>>>       .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>       .withValidation(config.validating)
>>>       .build()
>>> }
>>>
>>> class ParquetSinkWriter(path: Path, avroSchema: Schema)
>>>     extends SinkFunction[GenericRecord] {
>>>   import ParquetSinkWriter._
>>>   override def invoke(value: GenericRecord): Unit = {
>>>     println(s"ADDING TO File : $value") // getting this output
>>>     writer.write(value) // the output is not written to the file
>>>   }
>>> }
>>>
>>> //main app
>>> object StreamingJob extends App {
>>>   implicit val env: StreamExecutionEnvironment =
>>>     StreamExecutionEnvironment.getExecutionEnvironment
>>>   env.enableCheckpointing(500)
>>>   env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>>   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>>>   env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>>>   env.getCheckpointConfig.setCheckpointTimeout(600)
>>>   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
>>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>>   env.setRestartStrategy(RestartStrategies.failureRateRestart(2,
>>>     Time.seconds(3), Time.seconds(3)))
>>>   val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
>>>   env.setStateBackend(backend)
>>>   val writer = new ParquetSinkWriter(outputPath, schema)
>>>   val stream2: DataStream[DnsRequest] = env.addSource(/* consume from kafka */)
>>>   stream2.map { r =>
>>>     println(s"MAPPING $r") // this output keeps repeating in a loop
>>>     val genericRecord: GenericRecord = new GenericData.Record(schema)
>>>     genericRecord.put("qname", r.qname)
>>>     genericRecord.put("rcode", r.rcode)
>>>     genericRecord.put("ts", r.ts)
>>>     genericRecord
>>>   }.addSink(writer)
>>> }
>>>
>>> Thanks for your help
>>> Avi
>>>
>>>

-- 
Thanks,
Vipul
