And for a Java example which is actually similar to your pipeline,
you can check the ParquetStreamingFileSinkITCase.
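
One thing that may be worth double-checking in the quoted Scala snippet:
stream.addSink { r => ...; streamingSink } passes a function to addSink, so
the StreamingFileSink is only the discarded result of that function and is
never attached to the stream. Below is a minimal sketch of attaching it
directly. It assumes the Flink 1.7-era Scala API with flink-parquet and
flink-avro on the classpath; the names ParquetSinkSketch, attachParquetSink
and outputDir are made up for illustration and are not part of Flink.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.core.fs.Path // Flink's Path, not org.apache.hadoop.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala.DataStream

object ParquetSinkSketch {
  // Hypothetical helper: builds the bulk-format sink and attaches it to the stream.
  def attachParquetSink(
      stream: DataStream[GenericRecord],
      schema: Schema,
      outputDir: String): DataStreamSink[GenericRecord] = {
    val sink: StreamingFileSink[GenericRecord] =
      StreamingFileSink
        .forBulkFormat(new Path(outputDir), ParquetAvroWriters.forGenericRecord(schema))
        .build()

    // Pass the sink itself to addSink. A function literal that merely returns
    // the sink would leave it out of the job graph.
    stream.addSink(sink)
  }
}
```

With checkpointing enabled, part files under outputDir should then be
finalised as checkpoints complete.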



On Fri, Nov 30, 2018 at 2:39 PM Kostas Kloudas <k.klou...@data-artisans.com>
wrote:

> Hi Avi,
>
> At first glance I am not seeing anything wrong with your code.
> Did you verify that there are elements flowing in your pipeline and that
> checkpoints are actually completed?
> Also, can you check the JobManager and TaskManager logs for anything
> suspicious?
>
> Unfortunately, we do not allow specifying encoding and other parameters to
> your writer, which is an omission
> on our part that should be fixed. Could you open a JIRA for that?
>
> If you want to know more about Flink's Parquet-Avro writer, feel free to
> have a look at the ParquetAvroWriters
> class.
>
> Cheers,
> Kostas
>
>
> On Thu, Nov 29, 2018 at 6:58 PM Avi Levi <avi.l...@bluevoyant.com> wrote:
>
>> Thanks a lot Kostas, but the file is not created. What am I doing wrong?
>> BTW, how can you set the encoding etc. in Flink's Avro-Parquet writer?
>>
>> object Tester extends App {
>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>   def now = System.currentTimeMillis()
>>   val path = new Path(s"test-$now.parquet")
>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>   val streamingSink = StreamingFileSink.forBulkFormat( path,
>>   ParquetAvroWriters.forGenericRecord(schema))
>>   .build()
>>   env.enableCheckpointing(100)
>>   val stream = env.addSource(FirstSeenQueueImpl.consumer).map{ r =>
>>     val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>     genericReocrd.put("name", r.name)
>>     genericReocrd.put("code", r.code.asString)
>>     genericReocrd.put("ts", r.ts)
>>     genericReocrd
>>   }
>>     stream.addSink { r =>
>>         println(s"In Sink $r") //getting this line
>>         streamingSink
>>     }
>>   env.execute()
>> }
>>
>> Cheers
>> Avi
>>
>> On Thu, Nov 29, 2018 at 1:36 PM Kostas Kloudas <
>> k.klou...@data-artisans.com> wrote:
>>
>>>
>>> Sorry, previously I got confused and I assumed you were using Flink's
>>> StreamingFileSink.
>>>
>>> Could you try to use Flink's Avro - Parquet writer?
>>>
>>> StreamingFileSink.forBulkFormat(
>>>       Path...(MY_PATH),
>>>       ParquetAvroWriters.forGenericRecord(MY_SCHEMA))
>>> .build()
>>>
>>>
>>> Cheers,
>>> Kostas
>>>
>>> On Thu, Nov 29, 2018 at 12:25 PM Avi Levi <avi.l...@bluevoyant.com>
>>> wrote:
>>>
>>>> Thanks.
>>>> Yes, *env.execute* is called and checkpoints are enabled.
>>>> I think the problem is where to place the *writer.close* to flush the
>>>> cache.
>>>> If I place it in the sink after the write call, e.g.
>>>> addSink{
>>>> writer.write
>>>> writer.close
>>>> }
>>>> then only the first record is included in the file, but not
>>>> the rest of the stream.
>>>>
>>>>
>>>> On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas <
>>>> k.klou...@data-artisans.com> wrote:
>>>>
>>>>> Hi again Avi,
>>>>>
>>>>> In the first example that you posted (the one with the Kafka source),
>>>>> do you call env.execute()?
>>>>>
>>>>> Cheers,
>>>>> Kostas
>>>>>
>>>>> On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <
>>>>> k.klou...@data-artisans.com> wrote:
>>>>>
>>>>>> Hi Avi,
>>>>>>
>>>>>> In the last snippet that you posted, you have not activated
>>>>>> checkpoints.
>>>>>>
>>>>>> Checkpoints are needed for the StreamingFileSink to produce results,
>>>>>> especially in the case of BulkWriters (like Parquet) where
>>>>>> the part file is rolled upon reception of a checkpoint and the part
>>>>>> is finalised (i.e. "committed") when the checkpoint gets completed
>>>>>> successfully.
>>>>>>
>>>>>> Could you please enable checkpointing and make sure that the job runs
>>>>>> long enough for at least some checkpoints to be completed?
>>>>>>
>>>>>> Thanks a lot,
>>>>>> Kostas
>>>>>>
>>>>>> On Thu, Nov 29, 2018 at 7:03 AM Avi Levi <avi.l...@bluevoyant.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Check out this little app. You can see that the file is created but
>>>>>>> no data is written, even for a single record.
>>>>>>>
>>>>>>> import io.eels.component.parquet.ParquetWriterConfig
>>>>>>> import org.apache.avro.Schema
>>>>>>> import org.apache.avro.generic.{ GenericData, GenericRecord }
>>>>>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>>>>>> import org.apache.hadoop.fs.Path
>>>>>>> import org.apache.parquet.avro.AvroParquetWriter
>>>>>>> import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
>>>>>>> import org.apache.parquet.hadoop.metadata.CompressionCodecName
>>>>>>> import scala.io.Source
>>>>>>> import org.apache.flink.streaming.api.scala._
>>>>>>>
>>>>>>> object Tester extends App {
>>>>>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>   def now = System.currentTimeMillis()
>>>>>>>   val path = new Path(s"test-$now.parquet")
>>>>>>>   val schemaString = 
>>>>>>> Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
>>>>>>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>>>>>>   val compressionCodecName = CompressionCodecName.SNAPPY
>>>>>>>   val config = ParquetWriterConfig()
>>>>>>>   val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>>>>>>   genericReocrd.put("name", "test_b")
>>>>>>>   genericReocrd.put("code", "NoError")
>>>>>>>   genericReocrd.put("ts", 100L)
>>>>>>>   val stream = env.fromElements(genericReocrd)
>>>>>>>   val writer: ParquetWriter[GenericRecord] = 
>>>>>>> AvroParquetWriter.builder[GenericRecord](path)
>>>>>>>     .withSchema(schema)
>>>>>>>     .withCompressionCodec(compressionCodecName)
>>>>>>>     .withPageSize(config.pageSize)
>>>>>>>     .withRowGroupSize(config.blockSize)
>>>>>>>     .withDictionaryEncoding(config.enableDictionary)
>>>>>>>     .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>>>>     .withValidation(config.validating)
>>>>>>>     .build()
>>>>>>>
>>>>>>>   writer.write(genericReocrd)
>>>>>>>   stream.addSink { r =>
>>>>>>>     println(s"In Sink $r")
>>>>>>>     writer.write(r)
>>>>>>>   }
>>>>>>>   env.execute()
>>>>>>>   //  writer.close()
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Nov 29, 2018 at 6:57 AM vipul singh <neoea...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Can you try closing the writer?
>>>>>>>>
>>>>>>>> AvroParquetWriter has an internal buffer. Try doing a .close() in
>>>>>>>> snapshot() (since you are checkpointing, this method will be
>>>>>>>> called).
>>>>>>>>
>>>>>>>> On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <avi.l...@bluevoyant.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks Rafi,
>>>>>>>>> I am actually not using assignTimestampsAndWatermarks; I will
>>>>>>>>> try to add it as you suggested. However, it seems that the messages
>>>>>>>>> are repeating in the stream over and over: even if I push a single
>>>>>>>>> message manually to the queue, that message repeats indefinitely.
>>>>>>>>>
>>>>>>>>> Cheers
>>>>>>>>> Avi
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <rafi.ar...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Avi,
>>>>>>>>>>
>>>>>>>>>> I can't see the part where you use
>>>>>>>>>> assignTimestampsAndWatermarks.
>>>>>>>>>> If this part is not set properly, it's possible that watermarks
>>>>>>>>>> are not sent and nothing will be written to your Sink.
>>>>>>>>>>
>>>>>>>>>> See here for more details:
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>>>>>>>>>>
>>>>>>>>>> Hope this helps,
>>>>>>>>>> Rafi
>>>>>>>>>>
>>>>>>>>>> On Wed, Nov 28, 2018, 21:22 Avi Levi <avi.l...@bluevoyant.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I am trying to implement a Parquet writer as a SinkFunction. The
>>>>>>>>>>> pipeline consists of Kafka as the source and a Parquet file as the
>>>>>>>>>>> sink. However, it seems like the stream is repeating itself in an
>>>>>>>>>>> endless loop and the Parquet file is not written. Can someone
>>>>>>>>>>> please help me with this?
>>>>>>>>>>>
>>>>>>>>>>> object ParquetSinkWriter{
>>>>>>>>>>>   private val path = new Path("tmp/pfile")
>>>>>>>>>>>   private val schemaString =
>>>>>>>>>>> Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
>>>>>>>>>>>   private val avroSchema: Schema = new
>>>>>>>>>>> Schema.Parser().parse(schemaString)
>>>>>>>>>>>   private val compressionCodecName = CompressionCodecName.SNAPPY
>>>>>>>>>>>   private   val config = ParquetWriterConfig()
>>>>>>>>>>>   val writer: ParquetWriter[GenericRecord] =
>>>>>>>>>>> AvroParquetWriter.builder[GenericRecord](path)
>>>>>>>>>>>     .withSchema(avroSchema)
>>>>>>>>>>>     .withCompressionCodec(compressionCodecName)
>>>>>>>>>>>     .withPageSize(config.pageSize)
>>>>>>>>>>>     .withRowGroupSize(config.blockSize)
>>>>>>>>>>>     .withDictionaryEncoding(config.enableDictionary)
>>>>>>>>>>>     .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>>>>>>>>     .withValidation(config.validating)
>>>>>>>>>>>     .build()
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> class ParquetSinkWriter(path: Path, avroSchema: Schema) extends
>>>>>>>>>>> SinkFunction[GenericRecord] {
>>>>>>>>>>>   import ParquetSinkWriter._
>>>>>>>>>>>   override def invoke(value: GenericRecord): Unit = {
>>>>>>>>>>>     println(s"ADDING TO File : $value") // getting this output
>>>>>>>>>>>     writer.write(value) //the output is not written to the file
>>>>>>>>>>>   }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> //main app
>>>>>>>>>>> object StreamingJob extends App  {
>>>>>>>>>>>  implicit val env: StreamExecutionEnvironment =
>>>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>>>   env.enableCheckpointing(500)
>>>>>>>>>>>
>>>>>>>>>>> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>>>>>>>>>>
>>>>>>>>>>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>>>>>>>>>>>   env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>>>>>>>>>>>   env.getCheckpointConfig.setCheckpointTimeout(600)
>>>>>>>>>>>   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
>>>>>>>>>>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>>>>>>>>>>   env.setRestartStrategy(RestartStrategies.failureRateRestart(2,
>>>>>>>>>>> Time.seconds(3), Time.seconds(3)))
>>>>>>>>>>>   val backend: StateBackend = new
>>>>>>>>>>> RocksDBStateBackend("file:///tmp/rocksdb", true)
>>>>>>>>>>>   env.setStateBackend(backend)
>>>>>>>>>>>   val writer = new ParquetSinkWriter(outputPath, schema)
>>>>>>>>>>>   val stream2: DataStream[DnsRequest] =
>>>>>>>>>>>     env.addSource(/* consume from kafka */)
>>>>>>>>>>>   stream2.map { r =>
>>>>>>>>>>>     println(s"MAPPING $r") // this output keeps repeating in a loop
>>>>>>>>>>>     val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>>>>>>>>>>     genericReocrd.put("qname", r.qname)
>>>>>>>>>>>     genericReocrd.put("rcode", r.rcode)
>>>>>>>>>>>     genericReocrd.put("ts", r.ts)
>>>>>>>>>>>     genericReocrd
>>>>>>>>>>>   }.addSink(writer)
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your help
>>>>>>>>>>> Avi
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Thanks,
>>>>>>>> Vipul
>>>>>>>>
>>>>>>>
