Check out this little app. You can see that the file is created but no data
is written, even for a single record.
import io.eels.component.parquet.ParquetWriterConfig
import org.apache.avro.Schema
import org.apache.avro.generic.{ GenericData, GenericRecord }
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import scala.io.Source
import org.apache.flink.streaming.api.scala._
object Tester extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  def now = System.currentTimeMillis()
  val path = new Path(s"test-$now.parquet")
  val schemaString =
    Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
  val schema: Schema = new Schema.Parser().parse(schemaString)
  val compressionCodecName = CompressionCodecName.SNAPPY
  val config = ParquetWriterConfig()

  val genericRecord: GenericRecord = new GenericData.Record(schema)
  genericRecord.put("name", "test_b")
  genericRecord.put("code", "NoError")
  genericRecord.put("ts", 100L)

  val stream = env.fromElements(genericRecord)

  val writer: ParquetWriter[GenericRecord] =
    AvroParquetWriter.builder[GenericRecord](path)
      .withSchema(schema)
      .withCompressionCodec(compressionCodecName)
      .withPageSize(config.pageSize)
      .withRowGroupSize(config.blockSize)
      .withDictionaryEncoding(config.enableDictionary)
      .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
      .withValidation(config.validating)
      .build()

  writer.write(genericRecord)

  stream.addSink { r =>
    println(s"In Sink $r")
    writer.write(r)
  }

  env.execute()
  // writer.close()
}
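
For reference, here is a rough, untested sketch of the close-on-checkpoint idea suggested below: a RichSinkFunction that also implements CheckpointedFunction, closes the ParquetWriter in snapshotState() (closing is what forces Parquet to flush its buffered pages and footer) and then rolls to a new part file. The class name CheckpointedParquetSink and the basePath/part-file naming are made up for illustration:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName

// Hypothetical sketch, not tested: one Parquet file per checkpoint interval.
class CheckpointedParquetSink(basePath: String, schemaString: String)
    extends RichSinkFunction[GenericRecord] with CheckpointedFunction {

  // The Avro Schema and the ParquetWriter are not serializable, so they are
  // created per task in open() rather than on the client.
  @transient private var schema: Schema = _
  @transient private var writer: ParquetWriter[GenericRecord] = _
  private var part = 0L

  private def openWriter(): ParquetWriter[GenericRecord] =
    AvroParquetWriter.builder[GenericRecord](new Path(s"$basePath/part-$part.parquet"))
      .withSchema(schema)
      .withCompressionCodec(CompressionCodecName.SNAPPY)
      .build()

  override def open(parameters: Configuration): Unit = {
    schema = new Schema.Parser().parse(schemaString)
    writer = openWriter()
  }

  override def invoke(value: GenericRecord): Unit =
    writer.write(value)

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // Closing the writer is what actually flushes the buffered pages and
    // writes the Parquet footer; afterwards roll to a new part file.
    writer.close()
    part += 1
    writer = openWriter()
  }

  override def initializeState(context: FunctionInitializationContext): Unit = ()

  override def close(): Unit =
    if (writer != null) writer.close()
}
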
On Thu, Nov 29, 2018 at 6:57 AM vipul singh <[email protected]> wrote:
> Can you try closing the writer?
>
> AvroParquetWriter has an internal buffer. Try doing a .close() in
> snapshot() (since you are checkpointing, this method will be called).
>
> On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <[email protected]> wrote:
>
>> Thanks Rafi,
>> I am actually not using assignTimestampsAndWatermarks; I will try to
>> add it as you suggested. However, the messages seem to repeat in the
>> stream over and over: even if I push a single message manually to the
>> queue, that message repeats infinitely.
>>
>> Cheers
>> Avi
>>
>>
>> On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <[email protected]> wrote:
>>
>>> Hi Avi,
>>>
>>> I can't see the part where you use assignTimestampsAndWatermarks.
>>> If this part is not set properly, it's possible that watermarks are not
>>> sent and nothing will be written to your Sink.
>>>
>>> See here for more details:
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>>>
>>> Hope this helps,
>>> Rafi
>>>
>>> On Wed, Nov 28, 2018, 21:22 Avi Levi <[email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to implement a Parquet writer as a SinkFunction. The pipeline
>>>> consists of Kafka as the source and a Parquet file as the sink, however it
>>>> seems like the stream repeats itself in an endless loop and the Parquet
>>>> file is never written. Can someone please help me with this?
>>>>
>>>> object ParquetSinkWriter {
>>>>   private val path = new Path("tmp/pfile")
>>>>   private val schemaString =
>>>>     Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
>>>>   private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
>>>>   private val compressionCodecName = CompressionCodecName.SNAPPY
>>>>   private val config = ParquetWriterConfig()
>>>>   val writer: ParquetWriter[GenericRecord] =
>>>>     AvroParquetWriter.builder[GenericRecord](path)
>>>>       .withSchema(avroSchema)
>>>>       .withCompressionCodec(compressionCodecName)
>>>>       .withPageSize(config.pageSize)
>>>>       .withRowGroupSize(config.blockSize)
>>>>       .withDictionaryEncoding(config.enableDictionary)
>>>>       .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>       .withValidation(config.validating)
>>>>       .build()
>>>> }
>>>>
>>>> class ParquetSinkWriter(path: Path, avroSchema: Schema)
>>>>     extends SinkFunction[GenericRecord] {
>>>>   import ParquetSinkWriter._
>>>>   override def invoke(value: GenericRecord): Unit = {
>>>>     println(s"ADDING TO File : $value") // getting this output
>>>>     writer.write(value) // the output is not written to the file
>>>>   }
>>>> }
>>>>
>>>> // main app
>>>> object StreamingJob extends App {
>>>>   implicit val env: StreamExecutionEnvironment =
>>>>     StreamExecutionEnvironment.getExecutionEnvironment
>>>>   env.enableCheckpointing(500)
>>>>   env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>>>   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>>>>   env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>>>>   env.getCheckpointConfig.setCheckpointTimeout(600)
>>>>   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
>>>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>>>   env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
>>>>   val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
>>>>   env.setStateBackend(backend)
>>>>
>>>>   val writer = new ParquetSinkWriter(outputPath, schema)
>>>>
>>>>   val stream2: DataStream[DnsRequest] = env.addSource(/* consume from kafka */)
>>>>   stream2.map { r =>
>>>>     println(s"MAPPING $r") // this output keeps repeating in a loop
>>>>     val genericRecord: GenericRecord = new GenericData.Record(schema)
>>>>     genericRecord.put("qname", r.qname)
>>>>     genericRecord.put("rcode", r.rcode)
>>>>     genericRecord.put("ts", r.ts)
>>>>     genericRecord
>>>>   }.addSink(writer)
>>>> }
>>>>
>>>> Thanks for your help
>>>> Avi
>>>>
>>>>
>
> --
> Thanks,
> Vipul
>
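
For completeness, here is a rough sketch of Rafi's assignTimestampsAndWatermarks suggestion, attached directly to the Kafka consumer as the linked docs describe. The FlinkKafkaConsumer011 class, the DnsRequest case-class shape, and the 10-second out-of-orderness bound are assumptions made for illustration (only the qname/rcode/ts fields appear in the quoted code, and the consumer construction is left as a placeholder because it is elided in the original):

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

// Assumed shape of the record type used in the quoted job (fields taken from the puts above).
case class DnsRequest(qname: String, rcode: String, ts: Long)

object WatermarkExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Event time must be enabled, otherwise the assigned timestamps/watermarks are ignored.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Placeholder for the existing consumer; topic, properties and deserializer are elided in the original.
    val kafkaSource: FlinkKafkaConsumer011[DnsRequest] = ???

    // Attach the assigner to the consumer itself so watermarks are generated per Kafka partition,
    // assuming ts is an epoch-millisecond event timestamp with up to 10 seconds of out-of-orderness.
    kafkaSource.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[DnsRequest](Time.seconds(10)) {
        override def extractTimestamp(element: DnsRequest): Long = element.ts
      })

    val stream: DataStream[DnsRequest] = env.addSource(kafkaSource)
    stream.print()
    env.execute("watermark-example")
  }
}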