Hi Venkata.

300 requests per minute works out to about 200 ms per request, which should
be a normal response time for sending a file if there isn't any speed
limitation (how big are the files?).

Have you changed the parallelism to be higher than 1? I also recommend
limiting the source parallelism, because the source can consume from Kafka
very quickly and create backpressure.
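
As a rough back-of-the-envelope check of those numbers, here is a minimal
sketch in plain Java. The class and method names are made up for
illustration, and it assumes that writes are issued one after another per
subtask and that throughput scales linearly with the sink parallelism, which
is optimistic in practice.

```java
/** Hypothetical helper for estimating S3 write throughput; not part of Flink. */
final class S3WriteEstimate {

    private static final double MILLIS_PER_MINUTE = 60_000.0;

    /** Average time per request if requests are issued strictly one after another. */
    static double msPerRequest(double requestsPerMinute) {
        return MILLIS_PER_MINUTE / requestsPerMinute;
    }

    /**
     * Projected requests per minute for a given per-request latency,
     * assuming each parallel subtask writes sequentially and independently.
     */
    static double requestsPerMinute(double msPerRequest, int parallelism) {
        return parallelism * MILLIS_PER_MINUTE / msPerRequest;
    }

    public static void main(String[] args) {
        double latency = msPerRequest(300); // the 300 writes/min observed
        System.out.println("ms per request: " + latency);
        System.out.println("writes/min at parallelism 8: " + requestsPerMinute(latency, 8));
    }
}
```

Under those assumptions a single sequential writer at 200 ms per request
tops out at 300 writes per minute, which matches what you are seeing, so
the sink parallelism is the first thing I would check.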

I don't have much experience with StreamingFileSink, because I ended up
using a custom S3Sink, but I did have some issues writing to S3 because the
requests weren't parallelised. Check this thread,

On Sun, May 31, 2020 at 1:32 AM venkata sateesh kolluru <
vkollur...@gmail.com> wrote:

> Hello,
> I have posted the same on Stack Overflow but didn't get any response, so I
> am posting it here for help.
> https://stackoverflow.com/questions/62068787/flink-s3-write-performance-optimization?noredirect=1#comment109814428_62068787
> Details:
> I am working on a Flink application on Kubernetes (EKS) which consumes data
> from Kafka and writes it to S3.
> We have around 120 million xml messages of size 4TB in kafka. Consuming
> from kafka is super fast.
> These are just string messages from kafka.
> There is high back pressure while writing to s3. We are not even hitting
> the s3 PUT request limit, which is around 3500 requests/sec. I am seeing
> only 300 writes per minute to S3, which is very slow.
> I am using StreamingFileSink to write to s3 with the rolling policy set to
> OnCheckpointRollingPolicy.
> Using flink-s3-fs-hadoop-*.jar and s3:// as path (not using any s3a or s3p)
> Other than this I don't have any config related to s3
>     StreamingFileSink<Tuple3<String, String, String>> sink = StreamingFileSink
>             .forRowFormat(new Path("s3://BUCKET"),
>                     (Tuple3<String, String, String> element, OutputStream stream) -> {
>                         PrintStream out = new PrintStream(stream);
>                         out.println(element.f2);
>                     })
>             // Determine component type for each record
>             .withBucketAssigner(new CustomBucketAssigner())
>             .withRollingPolicy(OnCheckpointRollingPolicy.build())
>             .withBucketCheckInterval(TimeUnit.MINUTES.toMillis(1))
>             .build();
> Is there anything that we can optimize for s3 in StreamingFileSink or in
> flink-conf.yaml?
> Like using a bulk format or any config params like fs.s3.maxThreads etc.
> For checkpointing too I am using s3:// instead of s3p or s3a
> env.setStateBackend((StateBackend) new RocksDBStateBackend("s3://checkpoint_bucket", true));
> env.enableCheckpointing(300000);
