Thanks Kostas. I will definitely look into that. but is the
StreamingFileSink also support setting the batch size by size and/or by
time interval like bucketing sink ?

On Sun, Dec 2, 2018 at 5:09 PM Kostas Kloudas <k.klou...@data-artisans.com>
wrote:

> Hi Avi,
>
> The ParquetAvroWriters cannot be used with the BucketingSink.
>
> In fact the StreamingFIleSink is the "evolution" of the BucketingSink and
> it supports
> all the functionality that the BucketingSink supports.
>
> Given this, why not using the StreamingFileSink?
>
> On Sat, Dec 1, 2018 at 7:56 AM Avi Levi <avi.l...@bluevoyant.com> wrote:
>
>> Thanks looks good.
>> Do you know a way to use PaquetWriter or ParquetAvroWriters with a 
>> BucketingSink
>> file
>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/filesystem_sink.html#bucketing-file-sink>
>> ? something like :
>>
>> val bucketingSink = new BucketingSink[String]("/base/path")
>> bucketingSink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm"))
>> bucketingSink.setWriter(ParquetAvroWriters.forGenericRecord(schema))
>> bucketingSink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
>> bucketingSink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins
>>
>>
>> On Fri, Nov 30, 2018 at 3:59 PM Kostas Kloudas <
>> k.klou...@data-artisans.com> wrote:
>>
>>> And for a Java example which is actually similar to your pipeline,
>>> you can check the ParquetStreamingFileSinkITCase.
>>>
>>>
>>>
>>> On Fri, Nov 30, 2018 at 2:39 PM Kostas Kloudas <
>>> k.klou...@data-artisans.com> wrote:
>>>
>>>> Hi Avi,
>>>>
>>>> At a first glance I am not seeing anything wrong with your code.
>>>> Did you verify that there are elements flowing in your pipeline and
>>>> that checkpoints are actually completed?
>>>> And also can you check the logs at Job and Task Manager for anything
>>>> suspicious?
>>>>
>>>> Unfortunately, we do not allow specifying encoding and other parameters
>>>> to your writer, which is an omission
>>>> on our part and this should be fixed. Could you open a JIRA for that?
>>>>
>>>> If you want to know more about Flink's Parquet-Avro writer, feel free
>>>> to have a look at the ParquetAvroWriters
>>>> class.
>>>>
>>>> Cheers,
>>>> Kostas
>>>>
>>>>
>>>> On Thu, Nov 29, 2018 at 6:58 PM Avi Levi <avi.l...@bluevoyant.com>
>>>> wrote:
>>>>
>>>>> Thanks a lot Kostas, but the file not created . what am I doing wrong?
>>>>> BTW how can you set the encoding etc' in Flink's Avro - Parquet writer?
>>>>>
>>>>> object Tester extends App {
>>>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>   def now = System.currentTimeMillis()
>>>>>   val path = new Path(s"test-$now.parquet")
>>>>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>>>>   val streamingSink = StreamingFileSink.forBulkFormat( path,
>>>>>   ParquetAvroWriters.forGenericRecord(schema))
>>>>>   .build()
>>>>>   env.enableCheckpointing(100)
>>>>>   val stream = env.addSource(FirstSeenQueueImpl.consumer).map{ r =>
>>>>>     val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>>>>     genericReocrd.put("name", r.name)
>>>>>     genericReocrd.put("code", r.code.asString)
>>>>>     genericReocrd.put("ts", r.ts)
>>>>>     genericReocrd
>>>>>   }
>>>>>     stream.addSink { r =>
>>>>>         println(s"In Sink $r") //getting this line
>>>>>         streamingSink
>>>>>     }
>>>>>   env.execute()
>>>>> }
>>>>>
>>>>> Cheers
>>>>> Avi
>>>>>
>>>>> On Thu, Nov 29, 2018 at 1:36 PM Kostas Kloudas <
>>>>> k.klou...@data-artisans.com> wrote:
>>>>>
>>>>>>
>>>>>> Sorry, previously I got confused and I assumed you were using Flink's
>>>>>> StreamingFileSink.
>>>>>>
>>>>>> Could you try to use Flink's Avro - Parquet writer?
>>>>>>
>>>>>> StreamingFileSink.forBulkFormat(
>>>>>>       Path...(MY_PATH),
>>>>>>       ParquetAvroWriters.forGenericRecord(MY_SCHEMA))
>>>>>> .build()
>>>>>>
>>>>>>
>>>>>> Cheers,
>>>>>> Kostas
>>>>>>
>>>>>> On Thu, Nov 29, 2018 at 12:25 PM Avi Levi <avi.l...@bluevoyant.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks.
>>>>>>> yes, the *env.execute* is called and enabled checkpoints
>>>>>>> I think the problem is where to place the *writer.close *to flush
>>>>>>> the cache
>>>>>>> If I'll place on the sink after the write event e.g
>>>>>>> addSink{
>>>>>>> writer.write
>>>>>>> writer.close
>>>>>>> }
>>>>>>> in this case only the first record will be included in the file but
>>>>>>> not the rest of the stream.
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas <
>>>>>>> k.klou...@data-artisans.com> wrote:
>>>>>>>
>>>>>>>> Hi again Avi,
>>>>>>>>
>>>>>>>> In the first example that you posted (the one with the Kafka
>>>>>>>> source), do you call env.execute()?
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Kostas
>>>>>>>>
>>>>>>>> On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <
>>>>>>>> k.klou...@data-artisans.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Avi,
>>>>>>>>>
>>>>>>>>> In the last snippet that you posted, you have not activated
>>>>>>>>> checkpoints.
>>>>>>>>>
>>>>>>>>> Checkpoints are needed for the StreamingFileSink to produce
>>>>>>>>> results, especially in the case of BulkWriters (like Parquet) where
>>>>>>>>> the part file is rolled upon reception of a checkpoint and the
>>>>>>>>> part is finalised (i.e. "committed") when the checkpoint gets 
>>>>>>>>> completed
>>>>>>>>> successfully.
>>>>>>>>>
>>>>>>>>> Could you please enable checkpointing and make sure that the job
>>>>>>>>> runs long enough for at least some checkpoints to be completed?
>>>>>>>>>
>>>>>>>>> Thanks a lot,
>>>>>>>>> Kostas
>>>>>>>>>
>>>>>>>>> On Thu, Nov 29, 2018 at 7:03 AM Avi Levi <avi.l...@bluevoyant.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Checkout this little App. you can see that the file is created
>>>>>>>>>> but no data is written. even for a single record
>>>>>>>>>>
>>>>>>>>>> import io.eels.component.parquet.ParquetWriterConfig
>>>>>>>>>> import org.apache.avro.Schema
>>>>>>>>>> import org.apache.avro.generic.{ GenericData, GenericRecord }
>>>>>>>>>> import 
>>>>>>>>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>>>>>>>>> import org.apache.hadoop.fs.Path
>>>>>>>>>> import org.apache.parquet.avro.AvroParquetWriter
>>>>>>>>>> import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
>>>>>>>>>> import org.apache.parquet.hadoop.metadata.CompressionCodecName
>>>>>>>>>> import scala.io.Source
>>>>>>>>>> import org.apache.flink.streaming.api.scala._
>>>>>>>>>>
>>>>>>>>>> object Tester extends App {
>>>>>>>>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>>   def now = System.currentTimeMillis()
>>>>>>>>>>   val path = new Path(s"test-$now.parquet")
>>>>>>>>>>   val schemaString = 
>>>>>>>>>> Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
>>>>>>>>>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>>>>>>>>>   val compressionCodecName = CompressionCodecName.SNAPPY
>>>>>>>>>>   val config = ParquetWriterConfig()
>>>>>>>>>>   val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>>>>>>>>>   genericReocrd.put("name", "test_b")
>>>>>>>>>>   genericReocrd.put("code", "NoError")
>>>>>>>>>>   genericReocrd.put("ts", 100L)
>>>>>>>>>>   val stream = env.fromElements(genericReocrd)
>>>>>>>>>>   val writer: ParquetWriter[GenericRecord] = 
>>>>>>>>>> AvroParquetWriter.builder[GenericRecord](path)
>>>>>>>>>>     .withSchema(schema)
>>>>>>>>>>     .withCompressionCodec(compressionCodecName)
>>>>>>>>>>     .withPageSize(config.pageSize)
>>>>>>>>>>     .withRowGroupSize(config.blockSize)
>>>>>>>>>>     .withDictionaryEncoding(config.enableDictionary)
>>>>>>>>>>     .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>>>>>>>     .withValidation(config.validating)
>>>>>>>>>>     .build()
>>>>>>>>>>
>>>>>>>>>>   writer.write(genericReocrd)
>>>>>>>>>>   stream.addSink { r =>
>>>>>>>>>>     println(s"In Sink $r")
>>>>>>>>>>     writer.write(r)
>>>>>>>>>>   }
>>>>>>>>>>   env.execute()
>>>>>>>>>>   //  writer.close()
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Nov 29, 2018 at 6:57 AM vipul singh <neoea...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Can you try closing the writer?
>>>>>>>>>>>
>>>>>>>>>>> AvroParquetWriter has an internal buffer. Try doing a .close()
>>>>>>>>>>> in snapshot()( since you are checkpointing hence this method will 
>>>>>>>>>>> be called)
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <
>>>>>>>>>>> avi.l...@bluevoyant.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks Rafi,
>>>>>>>>>>>> I am actually not using assignTimestampsAndWatermarks , I will
>>>>>>>>>>>> try to add it as you suggested. however it seems that the messages 
>>>>>>>>>>>> I
>>>>>>>>>>>> repeating in the stream over and over even if I am pushing single 
>>>>>>>>>>>> message
>>>>>>>>>>>> manually to the queue, that message will repeat infinity
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers
>>>>>>>>>>>> Avi
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <
>>>>>>>>>>>> rafi.ar...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Avi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I can't see the part where you use
>>>>>>>>>>>>> assignTimestampsAndWatermarks.
>>>>>>>>>>>>> If this part in not set properly, it's possible that
>>>>>>>>>>>>> watermarks are not sent and nothing will be written to your Sink.
>>>>>>>>>>>>>
>>>>>>>>>>>>> See here for more details:
>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hope this helps,
>>>>>>>>>>>>> Rafi
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Nov 28, 2018, 21:22 Avi Levi <avi.l...@bluevoyant.com
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am trying to implement Parquet Writer as SinkFunction. The
>>>>>>>>>>>>>> pipeline consists of kafka as source and parquet file as a sink 
>>>>>>>>>>>>>> however it
>>>>>>>>>>>>>> seems like the stream is repeating itself like endless loop and 
>>>>>>>>>>>>>> the parquet
>>>>>>>>>>>>>> file is not written . can someone please help me with this?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> object ParquetSinkWriter{
>>>>>>>>>>>>>>   private val path = new Path("tmp/pfile")
>>>>>>>>>>>>>>   private val schemaString =
>>>>>>>>>>>>>> Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
>>>>>>>>>>>>>>   private val avroSchema: Schema = new
>>>>>>>>>>>>>> Schema.Parser().parse(schemaString)
>>>>>>>>>>>>>>   private val compressionCodecName =
>>>>>>>>>>>>>> CompressionCodecName.SNAPPY
>>>>>>>>>>>>>>   private   val config = ParquetWriterConfig()
>>>>>>>>>>>>>>   val writer: ParquetWriter[GenericRecord] =
>>>>>>>>>>>>>> AvroParquetWriter.builder[GenericRecord](path)
>>>>>>>>>>>>>>     .withSchema(avroSchema)
>>>>>>>>>>>>>>     .withCompressionCodec(compressionCodecName)
>>>>>>>>>>>>>>     .withPageSize(config.pageSize)
>>>>>>>>>>>>>>     .withRowGroupSize(config.blockSize)
>>>>>>>>>>>>>>     .withDictionaryEncoding(config.enableDictionary)
>>>>>>>>>>>>>>     .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>>>>>>>>>>>     .withValidation(config.validating)
>>>>>>>>>>>>>>     .build()
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> class ParquetSinkWriter(path: Path, avroSchema: Schema)
>>>>>>>>>>>>>> extends SinkFunction[GenericRecord] {
>>>>>>>>>>>>>>   import ParquetSinkWriter._
>>>>>>>>>>>>>>   override def invoke(value: GenericRecord): Unit = {
>>>>>>>>>>>>>>     println(s"ADDING TO File : $value") // getting this
>>>>>>>>>>>>>> output
>>>>>>>>>>>>>>     writer.write(value) //the output is not written to the
>>>>>>>>>>>>>> file
>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> //main app
>>>>>>>>>>>>>> object StreamingJob extends App  {
>>>>>>>>>>>>>>  implicit val env: StreamExecutionEnvironment =
>>>>>>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>>>>>>   env.enableCheckpointing(500)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>>>>>>>>>>>>>>   env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>>>>>>>>>>>>>>   env.getCheckpointConfig.setCheckpointTimeout(600)
>>>>>>>>>>>>>>   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
>>>>>>>>>>>>>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> env.setRestartStrategy(RestartStrategies.failureRateRestart(2,
>>>>>>>>>>>>>> Time.seconds(3), Time.seconds(3)))
>>>>>>>>>>>>>>   val backend: StateBackend = new
>>>>>>>>>>>>>> RocksDBStateBackend("file:///tmp/rocksdb", true)
>>>>>>>>>>>>>>   env.setStateBackend(backend)
>>>>>>>>>>>>>>   val writer = new ParquetSinkWriter(outputPath, schema)
>>>>>>>>>>>>>>   *val stream2: DataStream[DnsRequest] =
>>>>>>>>>>>>>> env.addSource(//consume from kafka)*
>>>>>>>>>>>>>> *stream2.map { r =>*
>>>>>>>>>>>>>> *    println(s"MAPPING $r") //this output keeps repeating in
>>>>>>>>>>>>>> a loop*
>>>>>>>>>>>>>> *    val genericReocrd: GenericRecord = new
>>>>>>>>>>>>>> GenericData.Record(schema)*
>>>>>>>>>>>>>> *    genericReocrd.put("qname", r.qname)*
>>>>>>>>>>>>>> *    genericReocrd.put("rcode", r.rcode)*
>>>>>>>>>>>>>> *    genericReocrd.put("ts", r.ts)*
>>>>>>>>>>>>>> *    genericReocrd*
>>>>>>>>>>>>>> *  }.addSink(writer) *
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for your help
>>>>>>>>>>>>>> Avi
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Vipul
>>>>>>>>>>>
>>>>>>>>>>

Reply via email to