Thanks Rafi, I am actually not using assignTimestampsAndWatermarks, I will try to add it as you suggested. However, it seems that the messages are repeating in the stream over and over: even if I push a single message to the queue manually, that message repeats infinitely.
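
If I'm reading the docs you linked correctly, I'd attach something like this right after the Kafka source (just a sketch on my side, assuming DnsRequest.ts is already an epoch-millis Long; the 10 seconds of allowed out-of-orderness is an arbitrary pick and I haven't run it yet):

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// extract the event time from each record and emit watermarks that trail
// the highest seen timestamp by 10 seconds
val withWatermarks: DataStream[DnsRequest] = stream2
  .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor[DnsRequest](Time.seconds(10)) {
      override def extractTimestamp(r: DnsRequest): Long = r.ts
    })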
Cheers
Avi

On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:

> Hi Avi,
>
> I can't see the part where you use assignTimestampsAndWatermarks.
> If this part is not set properly, it's possible that watermarks are not
> sent and nothing will be written to your sink.
>
> See here for more details:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>
> Hope this helps,
> Rafi
>
> On Wed, Nov 28, 2018, 21:22 Avi Levi <avi.l...@bluevoyant.com> wrote:
>
>> Hi,
>>
>> I am trying to implement a Parquet writer as a SinkFunction. The pipeline
>> consists of Kafka as the source and a Parquet file as the sink, however it
>> seems like the stream is repeating itself in an endless loop and the
>> Parquet file is not written. Can someone please help me with this?
>>
>> object ParquetSinkWriter {
>>   private val path = new Path("tmp/pfile")
>>   private val schemaString =
>>     Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
>>   private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
>>   private val compressionCodecName = CompressionCodecName.SNAPPY
>>   private val config = ParquetWriterConfig()
>>   val writer: ParquetWriter[GenericRecord] =
>>     AvroParquetWriter.builder[GenericRecord](path)
>>       .withSchema(avroSchema)
>>       .withCompressionCodec(compressionCodecName)
>>       .withPageSize(config.pageSize)
>>       .withRowGroupSize(config.blockSize)
>>       .withDictionaryEncoding(config.enableDictionary)
>>       .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>       .withValidation(config.validating)
>>       .build()
>> }
>>
>> class ParquetSinkWriter(path: Path, avroSchema: Schema) extends SinkFunction[GenericRecord] {
>>   import ParquetSinkWriter._
>>   override def invoke(value: GenericRecord): Unit = {
>>     println(s"ADDING TO File : $value") // getting this output
>>     writer.write(value) // the output is not written to the file
>>   }
>> }
>>
>> // main app
>> object StreamingJob extends App {
>>   implicit val env: StreamExecutionEnvironment =
>>     StreamExecutionEnvironment.getExecutionEnvironment
>>   env.enableCheckpointing(500)
>>   env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>>   env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>>   env.getCheckpointConfig.setCheckpointTimeout(600)
>>   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>   env.setRestartStrategy(RestartStrategies.failureRateRestart(2,
>>     Time.seconds(3), Time.seconds(3)))
>>   val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
>>   env.setStateBackend(backend)
>>   val writer = new ParquetSinkWriter(outputPath, schema)
>>
>>   val stream2: DataStream[DnsRequest] = env.addSource(/* consume from kafka */)
>>   stream2.map { r =>
>>     println(s"MAPPING $r") // this output keeps repeating in a loop
>>     val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>     genericReocrd.put("qname", r.qname)
>>     genericReocrd.put("rcode", r.rcode)
>>     genericReocrd.put("ts", r.ts)
>>     genericReocrd
>>   }.addSink(writer)
>> }
>>
>> Thanks for your help
>> Avi
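
P.S. One more thing I plan to try, in case it's related to the empty file: the ParquetWriter buffers rows and only produces a complete, readable file once it is closed, and my SinkFunction never closes it. I'm thinking of moving the writer into a RichSinkFunction so it gets created in open() and closed when the task shuts down. A rough, untested sketch of what I mean (the class and constructor arguments are just my own naming, not anything from Flink):

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.hadoop.{ParquetFileWriter, ParquetWriter}

class ClosingParquetSink(outputPath: String, schemaString: String)
    extends RichSinkFunction[GenericRecord] {

  // created per task in open() instead of in a companion object,
  // so the writer lives and dies with the sink instance
  @transient private var writer: ParquetWriter[GenericRecord] = _

  override def open(parameters: Configuration): Unit = {
    val schema = new Schema.Parser().parse(schemaString)
    writer = AvroParquetWriter.builder[GenericRecord](new Path(outputPath))
      .withSchema(schema)
      .withCompressionCodec(CompressionCodecName.SNAPPY)
      .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
      .build()
  }

  override def invoke(value: GenericRecord): Unit = writer.write(value)

  // closing the writer is what flushes the row groups and writes the Parquet footer
  override def close(): Unit = if (writer != null) writer.close()
}

If that still doesn't behave, I'll look at StreamingFileSink with a Parquet bulk writer, which looks like the more checkpoint-friendly way to do this.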