Thanks, looks good.
Do you know a way to use ParquetWriter or ParquetAvroWriters with a
BucketingSink
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/filesystem_sink.html#bucketing-file-sink>?
Something like:

val bucketingSink = new BucketingSink[String]("/base/path")
bucketingSink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm"))
bucketingSink.setWriter(ParquetAvroWriters.forGenericRecord(schema))
bucketingSink.setBatchSize(1024 * 1024 * 400) // this is 400 MB
bucketingSink.setBatchRolloverInterval(20 * 60 * 1000) // this is 20 mins
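
For comparison, here is a minimal sketch of the same per-minute bucketing expressed with the StreamingFileSink discussed further down the thread. The reason for going that way is that BucketingSink.setWriter expects the older Writer interface, while ParquetAvroWriters.forGenericRecord returns a BulkWriter.Factory. The sketch assumes Flink 1.7, where the bucketing class is named DateTimeBucketAssigner; the builder's generics may differ in other releases. The object name BucketedParquetSink and the basePath parameter are made up for illustration. As far as I know, bulk formats roll part files on checkpoints, so the batch size and rollover interval above have no direct equivalent here.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner

// Hypothetical helper, not part of Flink (assumes the Flink 1.7 API).
object BucketedParquetSink {
  def build(schema: Schema, basePath: String): StreamingFileSink[GenericRecord] =
    StreamingFileSink
      .forBulkFormat(new Path(basePath), ParquetAvroWriters.forGenericRecord(schema))
      // One bucket (sub-directory) per minute, same pattern as in the BucketingSink snippet.
      .withBucketAssigner(new DateTimeBucketAssigner[GenericRecord]("yyyy-MM-dd--HHmm"))
      .build()
}
```

It would be attached with something like stream.addSink(BucketedParquetSink.build(schema, "/base/path")), with checkpointing enabled on the environment.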


On Fri, Nov 30, 2018 at 3:59 PM Kostas Kloudas <k.klou...@data-artisans.com>
wrote:

> And for a Java example which is actually similar to your pipeline,
> you can check the ParquetStreamingFileSinkITCase.
>
>
>
> On Fri, Nov 30, 2018 at 2:39 PM Kostas Kloudas <
> k.klou...@data-artisans.com> wrote:
>
>> Hi Avi,
>>
>> At first glance I do not see anything wrong with your code.
>> Did you verify that there are elements flowing in your pipeline and that
>> checkpoints are actually completed?
>> Also, can you check the JobManager and TaskManager logs for anything
>> suspicious?
>>
>> Unfortunately, we do not allow specifying encoding and other parameters
>> to your writer, which is an omission
>> on our part and this should be fixed. Could you open a JIRA for that?
>>
>> If you want to know more about Flink's Parquet-Avro writer, feel free to
>> have a look at the ParquetAvroWriters
>> class.
>>
>> Cheers,
>> Kostas
>>
>>
>> On Thu, Nov 29, 2018 at 6:58 PM Avi Levi <avi.l...@bluevoyant.com> wrote:
>>
>>> Thanks a lot Kostas, but the file is not created. What am I doing wrong?
>>> BTW, how can you set the encoding etc. in Flink's Avro-Parquet writer?
>>>
>>> object Tester extends App {
>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>   def now = System.currentTimeMillis()
>>>   val path = new Path(s"test-$now.parquet")
>>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>>   val streamingSink = StreamingFileSink
>>>     .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(schema))
>>>     .build()
>>>   env.enableCheckpointing(100)
>>>   val stream = env.addSource(FirstSeenQueueImpl.consumer).map { r =>
>>>     val genericRecord: GenericRecord = new GenericData.Record(schema)
>>>     genericRecord.put("name", r.name)
>>>     genericRecord.put("code", r.code.asString)
>>>     genericRecord.put("ts", r.ts)
>>>     genericRecord
>>>   }
>>>   stream.addSink { r =>
>>>     println(s"In Sink $r") // getting this line
>>>     streamingSink
>>>   }
>>>   env.execute()
>>> }
>>>
>>> Cheers
>>> Avi
>>>
>>> On Thu, Nov 29, 2018 at 1:36 PM Kostas Kloudas <
>>> k.klou...@data-artisans.com> wrote:
>>>
>>>>
>>>> Sorry, previously I got confused and I assumed you were using Flink's
>>>> StreamingFileSink.
>>>>
>>>> Could you try to use Flink's Avro - Parquet writer?
>>>>
>>>> StreamingFileSink.forBulkFormat(
>>>>       Path...(MY_PATH),
>>>>       ParquetAvroWriters.forGenericRecord(MY_SCHEMA))
>>>> .build()
>>>>
>>>>
>>>> Cheers,
>>>> Kostas
>>>>
>>>> On Thu, Nov 29, 2018 at 12:25 PM Avi Levi <avi.l...@bluevoyant.com>
>>>> wrote:
>>>>
>>>>> Thanks.
>>>>> Yes, *env.execute* is called and checkpoints are enabled.
>>>>> I think the problem is where to place *writer.close* to flush the
>>>>> cache.
>>>>> If I place it in the sink after the write, e.g.
>>>>> addSink{
>>>>> writer.write
>>>>> writer.close
>>>>> }
>>>>> then only the first record is included in the file, but
>>>>> not the rest of the stream.
>>>>>
>>>>>
>>>>> On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas <
>>>>> k.klou...@data-artisans.com> wrote:
>>>>>
>>>>>> Hi again Avi,
>>>>>>
>>>>>> In the first example that you posted (the one with the Kafka source),
>>>>>> do you call env.execute()?
>>>>>>
>>>>>> Cheers,
>>>>>> Kostas
>>>>>>
>>>>>> On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <
>>>>>> k.klou...@data-artisans.com> wrote:
>>>>>>
>>>>>>> Hi Avi,
>>>>>>>
>>>>>>> In the last snippet that you posted, you have not activated
>>>>>>> checkpoints.
>>>>>>>
>>>>>>> Checkpoints are needed for the StreamingFileSink to produce results,
>>>>>>> especially in the case of BulkWriters (like Parquet) where
>>>>>>> the part file is rolled upon reception of a checkpoint and the part
>>>>>>> is finalised (i.e. "committed") when the checkpoint gets completed
>>>>>>> successfully.
>>>>>>>
>>>>>>> Could you please enable checkpointing and make sure that the job
>>>>>>> runs long enough for at least some checkpoints to be completed?
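>>>>>>>
>>>>>>> As a rough sketch of that setup: checkpointing is enabled on the
>>>>>>> environment and the sink object itself is passed to addSink. The
>>>>>>> helper name attachParquetSink, the outputDir parameter and the
>>>>>>> 10-second interval are assumptions for illustration, not taken
>>>>>>> from your job.
>>>>>>>
>>>>>>> ```scala
>>>>>>> import org.apache.avro.Schema
>>>>>>> import org.apache.avro.generic.GenericRecord
>>>>>>> import org.apache.flink.core.fs.Path
>>>>>>> import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
>>>>>>> import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
>>>>>>> import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
>>>>>>>
>>>>>>> object CheckpointedParquetSink {
>>>>>>>   // Hypothetical helper: enables checkpointing and attaches a Parquet bulk sink.
>>>>>>>   def attachParquetSink(
>>>>>>>       env: StreamExecutionEnvironment,
>>>>>>>       records: DataStream[GenericRecord],
>>>>>>>       schema: Schema,
>>>>>>>       outputDir: String): Unit = {
>>>>>>>     // Bulk part files are rolled on each checkpoint and committed once it completes.
>>>>>>>     env.enableCheckpointing(10000L) // 10 s, an arbitrary illustrative interval
>>>>>>>     val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
>>>>>>>       .forBulkFormat(new Path(outputDir), ParquetAvroWriters.forGenericRecord(schema))
>>>>>>>       .build()
>>>>>>>     records.addSink(sink)
>>>>>>>   }
>>>>>>> }
>>>>>>> ```
>>>>>>>
>>>>>>> The job still has to call env.execute() and keep running until at
>>>>>>> least one checkpoint has completed before finished part files appear.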
>>>>>>>
>>>>>>> Thanks a lot,
>>>>>>> Kostas
>>>>>>>
>>>>>>> On Thu, Nov 29, 2018 at 7:03 AM Avi Levi <avi.l...@bluevoyant.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Check out this little app. You can see that the file is created but
>>>>>>>> no data is written, even for a single record.
>>>>>>>>
>>>>>>>> import io.eels.component.parquet.ParquetWriterConfig
>>>>>>>> import org.apache.avro.Schema
>>>>>>>> import org.apache.avro.generic.{ GenericData, GenericRecord }
>>>>>>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>>>>>>> import org.apache.hadoop.fs.Path
>>>>>>>> import org.apache.parquet.avro.AvroParquetWriter
>>>>>>>> import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
>>>>>>>> import org.apache.parquet.hadoop.metadata.CompressionCodecName
>>>>>>>> import scala.io.Source
>>>>>>>> import org.apache.flink.streaming.api.scala._
>>>>>>>>
>>>>>>>> object Tester extends App {
>>>>>>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>   def now = System.currentTimeMillis()
>>>>>>>>   val path = new Path(s"test-$now.parquet")
>>>>>>>>   val schemaString =
>>>>>>>>     Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
>>>>>>>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>>>>>>>   val compressionCodecName = CompressionCodecName.SNAPPY
>>>>>>>>   val config = ParquetWriterConfig()
>>>>>>>>   val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>>>>>>>   genericReocrd.put("name", "test_b")
>>>>>>>>   genericReocrd.put("code", "NoError")
>>>>>>>>   genericReocrd.put("ts", 100L)
>>>>>>>>   val stream = env.fromElements(genericReocrd)
>>>>>>>>   val writer: ParquetWriter[GenericRecord] =
>>>>>>>>     AvroParquetWriter.builder[GenericRecord](path)
>>>>>>>>       .withSchema(schema)
>>>>>>>>       .withCompressionCodec(compressionCodecName)
>>>>>>>>       .withPageSize(config.pageSize)
>>>>>>>>       .withRowGroupSize(config.blockSize)
>>>>>>>>       .withDictionaryEncoding(config.enableDictionary)
>>>>>>>>       .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>>>>>       .withValidation(config.validating)
>>>>>>>>       .build()
>>>>>>>>
>>>>>>>>   writer.write(genericReocrd)
>>>>>>>>   stream.addSink { r =>
>>>>>>>>     println(s"In Sink $r")
>>>>>>>>     writer.write(r)
>>>>>>>>   }
>>>>>>>>   env.execute()
>>>>>>>>   //  writer.close()
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Nov 29, 2018 at 6:57 AM vipul singh <neoea...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Can you try closing the writer?
>>>>>>>>>
>>>>>>>>> AvroParquetWriter has an internal buffer. Try doing a .close() in
>>>>>>>>> snapshot() (since you are checkpointing, this method will be
>>>>>>>>> called).
>>>>>>>>>
>>>>>>>>> On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <avi.l...@bluevoyant.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks Rafi,
>>>>>>>>>> I am actually not using assignTimestampsAndWatermarks; I will
>>>>>>>>>> try to add it as you suggested. However, it seems that the messages
>>>>>>>>>> are repeating in the stream over and over. Even if I push a single
>>>>>>>>>> message manually to the queue, that message repeats indefinitely.
>>>>>>>>>>
>>>>>>>>>> Cheers
>>>>>>>>>> Avi
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <rafi.ar...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Avi,
>>>>>>>>>>>
>>>>>>>>>>> I can't see the part where you use
>>>>>>>>>>> assignTimestampsAndWatermarks.
>>>>>>>>>>> If this part is not set properly, it's possible that watermarks
>>>>>>>>>>> are not sent and nothing will be written to your sink.
>>>>>>>>>>>
>>>>>>>>>>> See here for more details:
>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>>>>>>>>>>>
>>>>>>>>>>> Hope this helps,
>>>>>>>>>>> Rafi
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Nov 28, 2018, 21:22 Avi Levi <avi.l...@bluevoyant.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> I am trying to implement a Parquet writer as a SinkFunction. The
>>>>>>>>>>>> pipeline consists of Kafka as the source and a Parquet file as the
>>>>>>>>>>>> sink. However, it seems like the stream is repeating itself in an
>>>>>>>>>>>> endless loop, and the Parquet file is not written. Can someone
>>>>>>>>>>>> please help me with this?
>>>>>>>>>>>>
>>>>>>>>>>>> object ParquetSinkWriter {
>>>>>>>>>>>>   private val path = new Path("tmp/pfile")
>>>>>>>>>>>>   private val schemaString =
>>>>>>>>>>>>     Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
>>>>>>>>>>>>   private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
>>>>>>>>>>>>   private val compressionCodecName = CompressionCodecName.SNAPPY
>>>>>>>>>>>>   private val config = ParquetWriterConfig()
>>>>>>>>>>>>   val writer: ParquetWriter[GenericRecord] =
>>>>>>>>>>>>     AvroParquetWriter.builder[GenericRecord](path)
>>>>>>>>>>>>       .withSchema(avroSchema)
>>>>>>>>>>>>       .withCompressionCodec(compressionCodecName)
>>>>>>>>>>>>       .withPageSize(config.pageSize)
>>>>>>>>>>>>       .withRowGroupSize(config.blockSize)
>>>>>>>>>>>>       .withDictionaryEncoding(config.enableDictionary)
>>>>>>>>>>>>       .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>>>>>>>>>       .withValidation(config.validating)
>>>>>>>>>>>>       .build()
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> class ParquetSinkWriter(path: Path, avroSchema: Schema)
>>>>>>>>>>>>     extends SinkFunction[GenericRecord] {
>>>>>>>>>>>>   import ParquetSinkWriter._
>>>>>>>>>>>>   override def invoke(value: GenericRecord): Unit = {
>>>>>>>>>>>>     println(s"ADDING TO File : $value") // getting this output
>>>>>>>>>>>>     writer.write(value) // the output is not written to the file
>>>>>>>>>>>>   }
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> //main app
>>>>>>>>>>>> object StreamingJob extends App {
>>>>>>>>>>>>   implicit val env: StreamExecutionEnvironment =
>>>>>>>>>>>>     StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>>>>   env.enableCheckpointing(500)
>>>>>>>>>>>>   env.getCheckpointConfig.enableExternalizedCheckpoints(
>>>>>>>>>>>>     ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>>>>>>>>>>>   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>>>>>>>>>>>>   env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>>>>>>>>>>>>   env.getCheckpointConfig.setCheckpointTimeout(600)
>>>>>>>>>>>>   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
>>>>>>>>>>>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>>>>>>>>>>>   env.setRestartStrategy(
>>>>>>>>>>>>     RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
>>>>>>>>>>>>   val backend: StateBackend =
>>>>>>>>>>>>     new RocksDBStateBackend("file:///tmp/rocksdb", true)
>>>>>>>>>>>>   env.setStateBackend(backend)
>>>>>>>>>>>>   val writer = new ParquetSinkWriter(outputPath, schema)
>>>>>>>>>>>>   val stream2: DataStream[DnsRequest] =
>>>>>>>>>>>>     env.addSource(/* consume from kafka */)
>>>>>>>>>>>>   stream2.map { r =>
>>>>>>>>>>>>     println(s"MAPPING $r") // this output keeps repeating in a loop
>>>>>>>>>>>>     val genericRecord: GenericRecord = new GenericData.Record(schema)
>>>>>>>>>>>>     genericRecord.put("qname", r.qname)
>>>>>>>>>>>>     genericRecord.put("rcode", r.rcode)
>>>>>>>>>>>>     genericRecord.put("ts", r.ts)
>>>>>>>>>>>>     genericRecord
>>>>>>>>>>>>   }.addSink(writer)
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for your help
>>>>>>>>>>>> Avi
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Thanks,
>>>>>>>>> Vipul
>>>>>>>>>
>>>>>>>>
