Hi,

Generally speaking, you can control the batch (part file) size through
the RollingPolicy [1]. Unfortunately, bulk formats use the
OnCheckpointRollingPolicy, and AFAIK it does not allow adjusting its
behaviour based on part size. Maybe Kostas has an idea how to do that in
the least invasive way. For how to do it with non-bulk formats, you can
have a look at [2].
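
For illustration, here is a minimal sketch of what that looks like for a
row format (assuming Flink 1.7, a DataStream<String> called "input", and
the built-in DefaultRollingPolicy; the thresholds are just example values):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

// Row formats accept a custom RollingPolicy, so the part size is adjustable there.
StreamingFileSink<String> sink = StreamingFileSink
    .forRowFormat(new Path("/tmp/output"), new SimpleStringEncoder<String>("UTF-8"))
    .withRollingPolicy(
        DefaultRollingPolicy.create()
            .withMaxPartSize(128 * 1024 * 1024)    // roll the part file after ~128 MB
            .withRolloverInterval(15 * 60 * 1000)  // or after 15 minutes
            .withInactivityInterval(5 * 60 * 1000) // or after 5 minutes of inactivity
            .build())
    .build();

input.addSink(sink);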

I assume that you were using AvroParquetWriter. You can specify the
compression on the ParquetWriter, I guess the same way as before. The
code for doing it could look something like this:

StreamingFileSink.forBulkFormat(
    Path.fromLocalFile(folder),
    new ParquetWriterFactory<>(out ->
        AvroParquetWriter.<Datum>builder(out)
            .withSchema(...)            // your Avro schema
            .withDataModel(...)         // e.g. a GenericData/SpecificData instance
            .withCompressionCodec(...)  // e.g. CompressionCodecName.SNAPPY
            .build()))
    .build()
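
To make that concrete, here is a hedged sketch with the placeholders
filled in (Datum, folder and "input" are stand-ins for your Avro-generated
class, your output directory and your DataStream; SNAPPY is just an
example codec):

import org.apache.avro.specific.SpecificData;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

StreamingFileSink<Datum> sink = StreamingFileSink
    .forBulkFormat(
        Path.fromLocalFile(folder),
        new ParquetWriterFactory<Datum>(out ->
            AvroParquetWriter.<Datum>builder(out)
                .withSchema(Datum.getClassSchema())                 // schema of the generated Avro class
                .withDataModel(SpecificData.get())                  // data model for SpecificRecord types
                .withCompressionCodec(CompressionCodecName.SNAPPY)  // any Parquet codec works here
                .build()))
    .build();

input.addSink(sink);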

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.html

[2]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java

On 26/01/2019 08:18, Taher Koitawala wrote:
> Can someone please help with this?
>
> On Fri 25 Jan, 2019, 1:47 PM Taher Koitawala
> <taher.koitaw...@gslab.com> wrote:
>
>     Hi All,
>              Is there a way to specify /batch size/ and /compression/
>     properties when using StreamingFileSink just like we did in the
>     bucketing sink? The only parameters it accepts are the inactivity
>     bucket check interval and the Avro schema.
>
>               We have numerous Flink jobs pulling data from the same
>     Kafka topics but doing different operations, and each Flink job
>     writes files of a different size; we would like to make that
>     consistent.
>
>
>     Regards,
>     Taher Koitawala
>     GS Lab Pune
>     +91 8407979163
>
