Hi Avi,

I can't see the part where you use assignTimestampsAndWatermarks.
If this part is not set properly, it's possible that watermarks are not
emitted, and then nothing will be written to your sink.

See here for more details:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission

Hope this helps,
Rafi

On Wed, Nov 28, 2018, 21:22 Avi Levi <avi.l...@bluevoyant.com> wrote:

> Hi,
>
> I am trying to implement a Parquet writer as a SinkFunction. The pipeline
> consists of Kafka as the source and a Parquet file as the sink; however, it
> seems like the stream is repeating itself in an endless loop and the Parquet
> file is not written. Can someone please help me with this?
>
> object ParquetSinkWriter{
>   private val path = new Path("tmp/pfile")
>   private val schemaString =
> Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
>   private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
>   private val compressionCodecName = CompressionCodecName.SNAPPY
>   private   val config = ParquetWriterConfig()
>   val writer: ParquetWriter[GenericRecord] =
> AvroParquetWriter.builder[GenericRecord](path)
>     .withSchema(avroSchema)
>     .withCompressionCodec(compressionCodecName)
>     .withPageSize(config.pageSize)
>     .withRowGroupSize(config.blockSize)
>     .withDictionaryEncoding(config.enableDictionary)
>     .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>     .withValidation(config.validating)
>     .build()
> }
>
> class ParquetSinkWriter(path: Path, avroSchema: Schema) extends
> SinkFunction[GenericRecord] {
>   import ParquetSinkWriter._
>   override def invoke(value: GenericRecord): Unit = {
>     println(s"ADDING TO File : $value") // getting this output
>     writer.write(value) //the output is not written to the file
>   }
> }
>
> //main app
> object StreamingJob extends App  {
>  implicit val env: StreamExecutionEnvironment =
> StreamExecutionEnvironment.getExecutionEnvironment
>   env.enableCheckpointing(500)
>
> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>
> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>   env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>   env.getCheckpointConfig.setCheckpointTimeout(600)
>   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>   env.setRestartStrategy(RestartStrategies.failureRateRestart(2,
> Time.seconds(3), Time.seconds(3)))
>   val backend: StateBackend = new
> RocksDBStateBackend("file:///tmp/rocksdb", true)
>   env.setStateBackend(backend)
>   val writer = new ParquetSinkWriter(outputPath, schema)
>   *val stream2: DataStream[DnsRequest] = env.addSource(//consume from
> kafka)*
> *stream2.map { r =>*
> *    println(s"MAPPING $r") //this output keeps repeating in a loop*
> *    val genericReocrd: GenericRecord = new GenericData.Record(schema)*
> *    genericReocrd.put("qname", r.qname)*
> *    genericReocrd.put("rcode", r.rcode)*
> *    genericReocrd.put("ts", r.ts)*
> *    genericReocrd*
> *  }.addSink(writer) *
>
> Thanks for your help
> Avi
>
>

Reply via email to