Hi,
I am trying to implement a Parquet writer as a SinkFunction. The pipeline
consists of Kafka as the source and a Parquet file as the sink, but the
stream seems to repeat itself in an endless loop and the Parquet file is
never written. Can someone please help me with this?
object ParquetSinkWriter {
  private val path = new Path("tmp/pfile")
  private val schemaString =
    Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
  private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
  private val compressionCodecName = CompressionCodecName.SNAPPY
  private val config = ParquetWriterConfig()

  val writer: ParquetWriter[GenericRecord] =
    AvroParquetWriter.builder[GenericRecord](path)
      .withSchema(avroSchema)
      .withCompressionCodec(compressionCodecName)
      .withPageSize(config.pageSize)
      .withRowGroupSize(config.blockSize)
      .withDictionaryEncoding(config.enableDictionary)
      .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
      .withValidation(config.validating)
      .build()
}
class ParquetSinkWriter(path: Path, avroSchema: Schema)
    extends SinkFunction[GenericRecord] {
  import ParquetSinkWriter._

  override def invoke(value: GenericRecord): Unit = {
    println(s"ADDING TO File : $value") // getting this output
    writer.write(value) // but the output is not written to the file
  }
}
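
Could the problem be that the writer is never closed? As far as I understand,
ParquetWriter buffers rows in memory and only writes the file footer on
close(). Below is a rough, untested sketch of a RichSinkFunction that opens
and closes the writer per subtask; class and parameter names are just
placeholders, and the path/schema are passed as strings only because they may
not be Java-serializable in every version:

  import org.apache.avro.Schema
  import org.apache.avro.generic.GenericRecord
  import org.apache.flink.configuration.Configuration
  import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
  import org.apache.hadoop.fs.Path
  import org.apache.parquet.avro.AvroParquetWriter
  import org.apache.parquet.hadoop.metadata.CompressionCodecName
  import org.apache.parquet.hadoop.{ParquetFileWriter, ParquetWriter}

  class ParquetRichSinkWriter(outputPath: String, schemaJson: String)
      extends RichSinkFunction[GenericRecord] {

    // Created on the task manager in open(), not when the job graph is built
    @transient private var writer: ParquetWriter[GenericRecord] = _

    override def open(parameters: Configuration): Unit = {
      val avroSchema = new Schema.Parser().parse(schemaJson)
      writer = AvroParquetWriter.builder[GenericRecord](new Path(outputPath))
        .withSchema(avroSchema)
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
        .build()
    }

    override def invoke(value: GenericRecord): Unit = writer.write(value)

    override def close(): Unit = {
      // ParquetWriter only writes the footer when closed; without this the
      // output file stays empty or unreadable
      if (writer != null) writer.close()
    }
  }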
// main app
object StreamingJob extends App {
  implicit val env: StreamExecutionEnvironment =
    StreamExecutionEnvironment.getExecutionEnvironment

  env.enableCheckpointing(500)
  env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
  env.getCheckpointConfig.setCheckpointTimeout(600)
  env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
  env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))

  val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
  env.setStateBackend(backend)

  val writer = new ParquetSinkWriter(outputPath, schema)
  val stream2: DataStream[DnsRequest] = env.addSource(/* consume from Kafka */)

  stream2.map { r =>
    println(s"MAPPING $r") // this output keeps repeating in a loop
    val genericRecord: GenericRecord = new GenericData.Record(schema)
    genericRecord.put("qname", r.qname)
    genericRecord.put("rcode", r.rcode)
    genericRecord.put("ts", r.ts)
    genericRecord
  }.addSink(writer)
}
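
Or should I rather use Flink's built-in StreamingFileSink with the Parquet
bulk format instead of a hand-rolled SinkFunction? Roughly something like the
following (untested sketch; the output path is a placeholder and avroSchema is
the schema parsed above):

  import org.apache.avro.generic.GenericRecord
  import org.apache.flink.core.fs.Path
  import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
  import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

  // Bulk formats roll and finalize part files on each checkpoint, so
  // checkpointing has to stay enabled (it is, every 500 ms above)
  val parquetSink: StreamingFileSink[GenericRecord] =
    StreamingFileSink
      .forBulkFormat(
        new Path("file:///tmp/parquet-out"), // placeholder output directory
        ParquetAvroWriters.forGenericRecord(avroSchema))
      .build()

  // then: the mapped GenericRecord stream would go to .addSink(parquetSink)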
Thanks for your help
Avi