Thanks Rafi,
I am actually not using assignTimestampsAndWatermarks; I will try to add
it as you suggested. However, the messages seem to repeat in the stream
over and over: even if I push a single message manually to the queue,
that message repeats indefinitely.
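
For reference, this is roughly what I have in mind based on your
suggestion (just a sketch, assuming event time and that DnsRequest
carries an epoch-milliseconds ts field):

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// Assumption: DnsRequest.ts is an epoch-milliseconds Long; 10 seconds of
// allowed out-of-orderness is just a placeholder value.
val withTimestamps = stream2.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[DnsRequest](Time.seconds(10)) {
    override def extractTimestamp(element: DnsRequest): Long = element.ts
  })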

Cheers
Avi


On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:

> Hi Avi,
>
> I can't see the part where you use assignTimestampsAndWatermarks.
> If this part is not set properly, it's possible that watermarks are not
> emitted and nothing will be written to your Sink.
>
> See here for more details:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
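>
> As a rough, untested sketch (the topic name, the DnsRequest
> deserialization schema and the Kafka properties below are placeholders;
> use the versioned consumer class, e.g. FlinkKafkaConsumer011, if that is
> what is on your classpath), the assigner can also be attached directly
> to the Kafka consumer before adding it as a source:
>
> val consumer = new FlinkKafkaConsumer[DnsRequest](
>   "dns-topic", dnsDeserializationSchema, kafkaProperties)
> consumer.assignTimestampsAndWatermarks(
>   new BoundedOutOfOrdernessTimestampExtractor[DnsRequest](Time.seconds(10)) {
>     override def extractTimestamp(element: DnsRequest): Long = element.ts
>   })
> val stream2 = env.addSource(consumer)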
>
> Hope this helps,
> Rafi
>
> On Wed, Nov 28, 2018, 21:22 Avi Levi <avi.l...@bluevoyant.com> wrote:
>
>> Hi,
>>
>> I am trying to implement a Parquet writer as a SinkFunction. The pipeline
>> consists of Kafka as the source and a Parquet file as the sink. However,
>> it seems like the stream is repeating itself in an endless loop and the
>> Parquet file is never written. Can someone please help me with this?
>>
>> object ParquetSinkWriter {
>>   private val path = new Path("tmp/pfile")
>>   private val schemaString =
>>     Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
>>   private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
>>   private val compressionCodecName = CompressionCodecName.SNAPPY
>>   private val config = ParquetWriterConfig()
>>   val writer: ParquetWriter[GenericRecord] =
>>     AvroParquetWriter.builder[GenericRecord](path)
>>       .withSchema(avroSchema)
>>       .withCompressionCodec(compressionCodecName)
>>       .withPageSize(config.pageSize)
>>       .withRowGroupSize(config.blockSize)
>>       .withDictionaryEncoding(config.enableDictionary)
>>       .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>       .withValidation(config.validating)
>>       .build()
>> }
>>
>> class ParquetSinkWriter(path: Path, avroSchema: Schema)
>>     extends SinkFunction[GenericRecord] {
>>   import ParquetSinkWriter._
>>   override def invoke(value: GenericRecord): Unit = {
>>     println(s"ADDING TO File : $value") // getting this output
>>     writer.write(value) //the output is not written to the file
>>   }
>> }
>>
>> //main app
>> object StreamingJob extends App {
>>   implicit val env: StreamExecutionEnvironment =
>>     StreamExecutionEnvironment.getExecutionEnvironment
>>   env.enableCheckpointing(500)
>>   env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>>   env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>>   env.getCheckpointConfig.setCheckpointTimeout(600)
>>   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>   env.setRestartStrategy(
>>     RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
>>   val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
>>   env.setStateBackend(backend)
>>   val writer = new ParquetSinkWriter(outputPath, schema)
>>   val stream2: DataStream[DnsRequest] = env.addSource(/* consume from kafka */)
>>   stream2.map { r =>
>>     println(s"MAPPING $r") // this output keeps repeating in a loop
>>     val genericRecord: GenericRecord = new GenericData.Record(schema)
>>     genericRecord.put("qname", r.qname)
>>     genericRecord.put("rcode", r.rcode)
>>     genericRecord.put("ts", r.ts)
>>     genericRecord
>>   }.addSink(writer)
>>
>> Thanks for your help
>> Avi
>>
>>
