Hi Venkata.

300 requests per minute look like a 200ms per request, which should be a
normal response time to send a file if there isn't any speed limitation
(how big are the files?).

Have you changed the parallelization to be higher than 1? I also recommend
to limit the source parallelization, because it can consume pretty fast
from Kafka and create some kind of backpressure.

I don't any much experience with StreamingFileSink, because I've ended up
using a custom S3Sink, but I did have some issues writing to S3 because the
request wasn't parallelised. Check this thread,
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallisation-of-S3-write-sink-td34064.html#a34070

On Sun, May 31, 2020 at 1:32 AM venkata sateesh` kolluru <
vkollur...@gmail.com> wrote:

> Hello,
>
> I have posted the same in stackoverflow but didnt get any response. So
> posting it here for help.
>
>
> https://stackoverflow.com/questions/62068787/flink-s3-write-performance-optimization?noredirect=1#comment109814428_62068787
>
> Details:
>
> I am working on a flink application on kubernetes(eks) which consumes data
> from kafka and write it to s3.
>
> We have around 120 million xml messages of size 4TB in kafka. Consuming
> from kafka is super fast.
>
> These are just string messages from kafka.
>
> There is a high back pressure while writing to s3. We are not even hitting
> the s3 PUT request limit which is arounf 3500 requests/sec. I am seeing
> only 300 writes per minute to S3 which is very slow.
>
> I am using StreamFileSink to write to s3 with Rolling policy as
> OnCheckpointPolicy.
>
> Using flink-s3-fs-hadoop-*.jar and s3:// as path (not using any s3a or s3p)
>
> Other than this I dont have any config related to s3
>
>     StreamingFileSink<Tuple3<String,String, String>> sink = StreamingFileSink
>             .forRowFormat(new Path(s3://BUCKET),
>                     (Tuple3<String,String, String> element, OutputStream 
> stream) -> {
>                         PrintStream out = new PrintStream(stream);
>                         out.println(element.f2);
>                     })
>             // Determine component type for each record
>             .withBucketAssigner(new CustomBucketAssigner())
>             .withRollingPolicy(OnCheckpointRollingPolicy.build())
>             .withBucketCheckInterval((TimeUnit.MINUTES.toMillis(1)))
>             .build();
>
> Is there anything that we can optimize on s3 from streamfilesink or in
> flink-conf.xml ?
>
> Like using bulkformat or any config params like fs.s3.maxThreads etc.
>
> For checkpointing too I am using s3:// instead of s3p or s3a
>
> env.setStateBackend((StateBackend) new 
> RocksDBStateBackend(s3://checkpoint_bucket, true));
>         env.enableCheckpointing(300000);
>
>
>

Reply via email to