Hi Avi,

If Parquet is not a requirement, you can use the StreamingFileSink and
write plain text, if that works for you.
In that case you can set the batch (part-file) size and, more generally,
specify a custom RollingPolicy.

For example, I would recommend checking [1], where you will of course have
to adjust the Encoder and the RollingPolicy.

[1] https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-streaming-file-sink-test/src/main/java/StreamingFileSinkProgram.java
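
A minimal sketch of a row-format StreamingFileSink with its own Encoder and
rolling thresholds, assuming the Flink 1.7-era API (there the policy builder
is obtained via DefaultRollingPolicy.create(); I believe later releases rename
it to builder()). The Event type, CsvEventEncoder, the /tmp/flink-out path and
the concrete thresholds are made up for illustration; they are not taken from
the linked program.

```scala
import java.io.OutputStream
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.serialization.Encoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala._

// Hypothetical record type standing in for whatever the job produces.
case class Event(name: String, code: String, ts: Long)

// Custom Encoder (assumption: one comma-separated line per record is acceptable).
class CsvEventEncoder extends Encoder[Event] {
  override def encode(element: Event, stream: OutputStream): Unit = {
    val line = s"${element.name},${element.code},${element.ts}\n"
    stream.write(line.getBytes(StandardCharsets.UTF_8))
  }
}

object RowFormatSinkSketch {

  // Row formats write record by record, so part files can roll on size
  // and time rather than only on checkpoints.
  def buildSink(basePath: String): StreamingFileSink[Event] =
    StreamingFileSink
      .forRowFormat(new Path(basePath), new CsvEventEncoder)
      .withRollingPolicy(
        DefaultRollingPolicy.create()
          .withMaxPartSize(400L * 1024 * 1024)                   // roll at about 400 MB
          .withRolloverInterval(TimeUnit.MINUTES.toMillis(20))   // or after 20 minutes
          .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))  // or after 5 idle minutes
          .build())
      .build()

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpoints are still required: part files are only finalised
    // once a checkpoint completes.
    env.enableCheckpointing(60 * 1000)

    // Placeholder source; a real job would read from Kafka. A finite source
    // like this may finish before any checkpoint completes.
    env.fromElements(Event("test_b", "NoError", 100L))
      .addSink(buildSink("/tmp/flink-out"))

    env.execute("row-format-sink-sketch")
  }
}
```

The size and interval values mirror the 400 MB / 20 minute settings from the
BucketingSink snippet further down the thread; treat them as starting points.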

Cheers,
Kostas

On Mon, Dec 3, 2018 at 3:50 PM Avi Levi <avi.l...@bluevoyant.com> wrote:

> Thanks Kostas,
> OK, got it, so BucketingSink might not be a good choice here. Can you
> please advise what the best approach would be? I have a heavy load of data
> that I consume from Kafka, which I want to process and write to a file
> (it doesn't have to be Parquet). I thought that StreamingFileSink might be a
> good choice, but I guess I am doing something wrong there. If there is a
> good example for that, it would be great.
>
> BR
> Avi
>
> On Mon, Dec 3, 2018 at 4:11 PM Kostas Kloudas <k.klou...@data-artisans.com>
> wrote:
>
>> Hi Avi,
>>
>> For Bulk Formats like Parquet, unfortunately, we do not support setting
>> the batch size.
>> The part-files roll on every checkpoint. This is a known limitation and
>> there are plans to
>> alleviate it in the future.
>>
>> Setting the batch size (among other things) is supported for RowWise
>> formats.
>>
>> Cheers,
>> Kostas
>>
>> On Sun, Dec 2, 2018 at 9:29 PM Avi Levi <avi.l...@bluevoyant.com> wrote:
>>
>>> Thanks Kostas. I will definitely look into that. But does the
>>> StreamingFileSink also support setting the batch size by size and/or by
>>> time interval, like the BucketingSink?
>>>
>>> On Sun, Dec 2, 2018 at 5:09 PM Kostas Kloudas <
>>> k.klou...@data-artisans.com> wrote:
>>>
>>>> Hi Avi,
>>>>
>>>> The ParquetAvroWriters cannot be used with the BucketingSink.
>>>>
>>>> In fact, the StreamingFileSink is the "evolution" of the BucketingSink
>>>> and it supports
>>>> all the functionality that the BucketingSink supports.
>>>>
>>>> Given this, why not use the StreamingFileSink?
>>>>
>>>> On Sat, Dec 1, 2018 at 7:56 AM Avi Levi <avi.l...@bluevoyant.com>
>>>> wrote:
>>>>
>>>>> Thanks, looks good.
>>>>> Do you know a way to use ParquetWriter or ParquetAvroWriters with a
>>>>> BucketingSink
>>>>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/filesystem_sink.html#bucketing-file-sink>
>>>>> ? Something like:
>>>>>
>>>>> val bucketingSink = new BucketingSink[String]("/base/path")
>>>>> bucketingSink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm"))
>>>>> bucketingSink.setWriter(ParquetAvroWriters.forGenericRecord(schema))
>>>>> bucketingSink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
>>>>> bucketingSink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins
>>>>>
>>>>>
>>>>> On Fri, Nov 30, 2018 at 3:59 PM Kostas Kloudas <
>>>>> k.klou...@data-artisans.com> wrote:
>>>>>
>>>>>> And for a Java example which is actually similar to your pipeline,
>>>>>> you can check the ParquetStreamingFileSinkITCase.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Nov 30, 2018 at 2:39 PM Kostas Kloudas <
>>>>>> k.klou...@data-artisans.com> wrote:
>>>>>>
>>>>>>> Hi Avi,
>>>>>>>
>>>>>>> At first glance I am not seeing anything wrong with your code.
>>>>>>> Did you verify that there are elements flowing in your pipeline and
>>>>>>> that checkpoints are actually completed?
>>>>>>> Also, can you check the JobManager and TaskManager logs for anything
>>>>>>> suspicious?
>>>>>>>
>>>>>>> Unfortunately, we do not allow specifying encoding and other
>>>>>>> parameters to your writer, which is an omission
>>>>>>> on our part and this should be fixed. Could you open a JIRA for that?
>>>>>>>
>>>>>>> If you want to know more about Flink's Parquet-Avro writer, feel
>>>>>>> free to have a look at the ParquetAvroWriters
>>>>>>> class.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Kostas
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Nov 29, 2018 at 6:58 PM Avi Levi <avi.l...@bluevoyant.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks a lot Kostas, but the file is not created. What am I doing
>>>>>>>> wrong?
>>>>>>>> BTW, how can you set the encoding etc. in Flink's Avro-Parquet
>>>>>>>> writer?
>>>>>>>>
>>>>>>>> object Tester extends App {
>>>>>>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>   def now = System.currentTimeMillis()
>>>>>>>>   val path = new Path(s"test-$now.parquet")
>>>>>>>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>>>>>>>   val streamingSink = StreamingFileSink.forBulkFormat( path,
>>>>>>>>   ParquetAvroWriters.forGenericRecord(schema))
>>>>>>>>   .build()
>>>>>>>>   env.enableCheckpointing(100)
>>>>>>>>   val stream = env.addSource(FirstSeenQueueImpl.consumer).map{ r =>
>>>>>>>>     val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>>>>>>>     genericReocrd.put("name", r.name)
>>>>>>>>     genericReocrd.put("code", r.code.asString)
>>>>>>>>     genericReocrd.put("ts", r.ts)
>>>>>>>>     genericReocrd
>>>>>>>>   }
>>>>>>>>     stream.addSink { r =>
>>>>>>>>         println(s"In Sink $r") //getting this line
>>>>>>>>         streamingSink
>>>>>>>>     }
>>>>>>>>   env.execute()
>>>>>>>> }
>>>>>>>>
>>>>>>>> Cheers
>>>>>>>> Avi
>>>>>>>>
>>>>>>>> On Thu, Nov 29, 2018 at 1:36 PM Kostas Kloudas <
>>>>>>>> k.klou...@data-artisans.com> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Sorry, previously I got confused and I assumed you were using
>>>>>>>>> Flink's StreamingFileSink.
>>>>>>>>>
>>>>>>>>> Could you try to use Flink's Avro - Parquet writer?
>>>>>>>>>
>>>>>>>>> StreamingFileSink.forBulkFormat(
>>>>>>>>>       Path...(MY_PATH),
>>>>>>>>>       ParquetAvroWriters.forGenericRecord(MY_SCHEMA))
>>>>>>>>> .build()
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Kostas
>>>>>>>>>
>>>>>>>>> On Thu, Nov 29, 2018 at 12:25 PM Avi Levi <avi.l...@bluevoyant.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks.
>>>>>>>>>> Yes, *env.execute* is called and checkpoints are enabled.
>>>>>>>>>> I think the problem is where to place the *writer.close* to
>>>>>>>>>> flush the cache.
>>>>>>>>>> If I place it in the sink after the write, e.g.
>>>>>>>>>> addSink{
>>>>>>>>>> writer.write
>>>>>>>>>> writer.close
>>>>>>>>>> }
>>>>>>>>>> then only the first record will be included in the file,
>>>>>>>>>> but not the rest of the stream.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas <
>>>>>>>>>> k.klou...@data-artisans.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi again Avi,
>>>>>>>>>>>
>>>>>>>>>>> In the first example that you posted (the one with the Kafka
>>>>>>>>>>> source), do you call env.execute()?
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Kostas
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <
>>>>>>>>>>> k.klou...@data-artisans.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Avi,
>>>>>>>>>>>>
>>>>>>>>>>>> In the last snippet that you posted, you have not activated
>>>>>>>>>>>> checkpoints.
>>>>>>>>>>>>
>>>>>>>>>>>> Checkpoints are needed for the StreamingFileSink to produce
>>>>>>>>>>>> results, especially in the case of BulkWriters (like Parquet) where
>>>>>>>>>>>> the part file is rolled upon reception of a checkpoint and the
>>>>>>>>>>>> part is finalised (i.e. "committed") when the checkpoint gets 
>>>>>>>>>>>> completed
>>>>>>>>>>>> successfully.
>>>>>>>>>>>>
>>>>>>>>>>>> Could you please enable checkpointing and make sure that the
>>>>>>>>>>>> job runs long enough for at least some checkpoints to be completed?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks a lot,
>>>>>>>>>>>> Kostas
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Nov 29, 2018 at 7:03 AM Avi Levi <
>>>>>>>>>>>> avi.l...@bluevoyant.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Check out this little app. You can see that the file is created
>>>>>>>>>>>>> but no data is written, even for a single record.
>>>>>>>>>>>>>
>>>>>>>>>>>>> import io.eels.component.parquet.ParquetWriterConfig
>>>>>>>>>>>>> import org.apache.avro.Schema
>>>>>>>>>>>>> import org.apache.avro.generic.{ GenericData, GenericRecord }
>>>>>>>>>>>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>>>>>>>>>>>> import org.apache.hadoop.fs.Path
>>>>>>>>>>>>> import org.apache.parquet.avro.AvroParquetWriter
>>>>>>>>>>>>> import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
>>>>>>>>>>>>> import org.apache.parquet.hadoop.metadata.CompressionCodecName
>>>>>>>>>>>>> import scala.io.Source
>>>>>>>>>>>>> import org.apache.flink.streaming.api.scala._
>>>>>>>>>>>>>
>>>>>>>>>>>>> object Tester extends App {
>>>>>>>>>>>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>>>>>   def now = System.currentTimeMillis()
>>>>>>>>>>>>>   val path = new Path(s"test-$now.parquet")
>>>>>>>>>>>>>   val schemaString = 
>>>>>>>>>>>>> Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
>>>>>>>>>>>>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>>>>>>>>>>>>   val compressionCodecName = CompressionCodecName.SNAPPY
>>>>>>>>>>>>>   val config = ParquetWriterConfig()
>>>>>>>>>>>>>   val genericReocrd: GenericRecord = new 
>>>>>>>>>>>>> GenericData.Record(schema)
>>>>>>>>>>>>>   genericReocrd.put("name", "test_b")
>>>>>>>>>>>>>   genericReocrd.put("code", "NoError")
>>>>>>>>>>>>>   genericReocrd.put("ts", 100L)
>>>>>>>>>>>>>   val stream = env.fromElements(genericReocrd)
>>>>>>>>>>>>>   val writer: ParquetWriter[GenericRecord] = 
>>>>>>>>>>>>> AvroParquetWriter.builder[GenericRecord](path)
>>>>>>>>>>>>>     .withSchema(schema)
>>>>>>>>>>>>>     .withCompressionCodec(compressionCodecName)
>>>>>>>>>>>>>     .withPageSize(config.pageSize)
>>>>>>>>>>>>>     .withRowGroupSize(config.blockSize)
>>>>>>>>>>>>>     .withDictionaryEncoding(config.enableDictionary)
>>>>>>>>>>>>>     .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>>>>>>>>>>     .withValidation(config.validating)
>>>>>>>>>>>>>     .build()
>>>>>>>>>>>>>
>>>>>>>>>>>>>   writer.write(genericReocrd)
>>>>>>>>>>>>>   stream.addSink { r =>
>>>>>>>>>>>>>     println(s"In Sink $r")
>>>>>>>>>>>>>     writer.write(r)
>>>>>>>>>>>>>   }
>>>>>>>>>>>>>   env.execute()
>>>>>>>>>>>>>   //  writer.close()
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Nov 29, 2018 at 6:57 AM vipul singh <
>>>>>>>>>>>>> neoea...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Can you try closing the writer?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> AvroParquetWriter has an internal buffer. Try calling
>>>>>>>>>>>>>> .close() in snapshot() (since you are checkpointing, this method
>>>>>>>>>>>>>> will be called).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <
>>>>>>>>>>>>>> avi.l...@bluevoyant.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks Rafi,
>>>>>>>>>>>>>>> I am actually not using assignTimestampsAndWatermarks. I
>>>>>>>>>>>>>>> will try to add it as you suggested. However, it seems that the
>>>>>>>>>>>>>>> messages are repeating in the stream over and over: even if I push a
>>>>>>>>>>>>>>> single message manually to the queue, that message repeats
>>>>>>>>>>>>>>> indefinitely.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Cheers
>>>>>>>>>>>>>>> Avi
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <
>>>>>>>>>>>>>>> rafi.ar...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Avi,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I can't see the part where you use
>>>>>>>>>>>>>>>> assignTimestampsAndWatermarks.
>>>>>>>>>>>>>>>> If this part is not set properly, it's possible that
>>>>>>>>>>>>>>>> watermarks are not sent and nothing will be written to your
>>>>>>>>>>>>>>>> sink.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> See here for more details:
>>>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hope this helps,
>>>>>>>>>>>>>>>> Rafi
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Nov 28, 2018, 21:22 Avi Levi <
>>>>>>>>>>>>>>>> avi.l...@bluevoyant.com wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I am trying to implement a Parquet writer as a SinkFunction.
>>>>>>>>>>>>>>>>> The pipeline consists of Kafka as the source and a Parquet file as
>>>>>>>>>>>>>>>>> the sink. However, it seems like the stream is repeating itself in an
>>>>>>>>>>>>>>>>> endless loop and the Parquet file is not written. Can someone please
>>>>>>>>>>>>>>>>> help me with this?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> object ParquetSinkWriter{
>>>>>>>>>>>>>>>>>   private val path = new Path("tmp/pfile")
>>>>>>>>>>>>>>>>>   private val schemaString =
>>>>>>>>>>>>>>>>> Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
>>>>>>>>>>>>>>>>>   private val avroSchema: Schema = new
>>>>>>>>>>>>>>>>> Schema.Parser().parse(schemaString)
>>>>>>>>>>>>>>>>>   private val compressionCodecName =
>>>>>>>>>>>>>>>>> CompressionCodecName.SNAPPY
>>>>>>>>>>>>>>>>>   private   val config = ParquetWriterConfig()
>>>>>>>>>>>>>>>>>   val writer: ParquetWriter[GenericRecord] =
>>>>>>>>>>>>>>>>> AvroParquetWriter.builder[GenericRecord](path)
>>>>>>>>>>>>>>>>>     .withSchema(avroSchema)
>>>>>>>>>>>>>>>>>     .withCompressionCodec(compressionCodecName)
>>>>>>>>>>>>>>>>>     .withPageSize(config.pageSize)
>>>>>>>>>>>>>>>>>     .withRowGroupSize(config.blockSize)
>>>>>>>>>>>>>>>>>     .withDictionaryEncoding(config.enableDictionary)
>>>>>>>>>>>>>>>>>     .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>>>>>>>>>>>>>>     .withValidation(config.validating)
>>>>>>>>>>>>>>>>>     .build()
>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> class ParquetSinkWriter(path: Path, avroSchema: Schema)
>>>>>>>>>>>>>>>>> extends SinkFunction[GenericRecord] {
>>>>>>>>>>>>>>>>>   import ParquetSinkWriter._
>>>>>>>>>>>>>>>>>   override def invoke(value: GenericRecord): Unit = {
>>>>>>>>>>>>>>>>>     println(s"ADDING TO File : $value") // getting this output
>>>>>>>>>>>>>>>>>     writer.write(value) // the output is not written to the file
>>>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> //main app
>>>>>>>>>>>>>>>>> object StreamingJob extends App  {
>>>>>>>>>>>>>>>>>  implicit val env: StreamExecutionEnvironment =
>>>>>>>>>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>>>>>>>>>   env.enableCheckpointing(500)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>>>>>>>>>>>>>>>>>   env.getCheckpointConfig.setCheckpointTimeout(600)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
>>>>>>>>>>>>>>>>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> env.setRestartStrategy(RestartStrategies.failureRateRestart(2,
>>>>>>>>>>>>>>>>> Time.seconds(3), Time.seconds(3)))
>>>>>>>>>>>>>>>>>   val backend: StateBackend = new
>>>>>>>>>>>>>>>>> RocksDBStateBackend("file:///tmp/rocksdb", true)
>>>>>>>>>>>>>>>>>   env.setStateBackend(backend)
>>>>>>>>>>>>>>>>>   val writer = new ParquetSinkWriter(outputPath, schema)
>>>>>>>>>>>>>>>>>   val stream2: DataStream[DnsRequest] =
>>>>>>>>>>>>>>>>>     env.addSource(/* consume from kafka */)
>>>>>>>>>>>>>>>>>   stream2.map { r =>
>>>>>>>>>>>>>>>>>     println(s"MAPPING $r") // this output keeps repeating in a loop
>>>>>>>>>>>>>>>>>     val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>>>>>>>>>>>>>>>>     genericReocrd.put("qname", r.qname)
>>>>>>>>>>>>>>>>>     genericReocrd.put("rcode", r.rcode)
>>>>>>>>>>>>>>>>>     genericReocrd.put("ts", r.ts)
>>>>>>>>>>>>>>>>>     genericReocrd
>>>>>>>>>>>>>>>>>   }.addSink(writer)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for your help
>>>>>>>>>>>>>>>>> Avi
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Vipul
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
