Hi Venkata. 300 requests per minute works out to roughly 200ms per request (60s / 300), which should be a normal response time to send a file if there isn't any speed limitation (how big are the files?).

Have you changed the parallelism to be higher than 1? I also recommend limiting the source parallelism, because it can consume from Kafka pretty fast and create some kind of backpressure.
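Capping the source and widening the sink could look like the fragment below. This is only a rough sketch: the variable names (env, kafkaConsumer, sink) and the parallelism values are placeholders, not taken from your job.

    // Keep the Kafka source narrow so it can't outrun the S3 writes,
    // and give the sink more parallel subtasks for concurrent PUTs.
    DataStream<Tuple3<String, String, String>> messages = env
            .addSource(kafkaConsumer)   // your FlinkKafkaConsumer instance
            .setParallelism(2);         // small source parallelism

    messages
            .addSink(sink)              // your StreamingFileSink from below
            .setParallelism(8);         // more subtasks -> more concurrent writes

Each sink subtask writes its own part files, so raising the sink parallelism is usually the first lever for S3 write throughput.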
I don't have much experience with StreamingFileSink, because I ended up using a custom S3Sink (a rough skeleton of that idea is at the bottom of this mail, below the quote), but I did have some issues writing to S3 because the requests weren't parallelised. Check this thread:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallisation-of-S3-write-sink-td34064.html#a34070

On Sun, May 31, 2020 at 1:32 AM venkata sateesh` kolluru <vkollur...@gmail.com> wrote:

> Hello,
>
> I have posted the same in stackoverflow but didn't get any response, so
> posting it here for help:
>
> https://stackoverflow.com/questions/62068787/flink-s3-write-performance-optimization?noredirect=1#comment109814428_62068787
>
> Details:
>
> I am working on a Flink application on Kubernetes (EKS) which consumes
> data from Kafka and writes it to S3.
>
> We have around 120 million XML messages of size 4TB in Kafka. Consuming
> from Kafka is super fast.
>
> These are just string messages from Kafka.
>
> There is high backpressure while writing to S3. We are not even hitting
> the S3 PUT request limit, which is around 3500 requests/sec. I am seeing
> only 300 writes per minute to S3, which is very slow.
>
> I am using StreamingFileSink to write to S3 with the rolling policy
> OnCheckpointRollingPolicy.
>
> Using flink-s3-fs-hadoop-*.jar and s3:// as the path (not s3a or s3p).
>
> Other than this I don't have any config related to S3:
>
> StreamingFileSink<Tuple3<String, String, String>> sink = StreamingFileSink
>     .forRowFormat(new Path("s3://BUCKET"),
>         (Tuple3<String, String, String> element, OutputStream stream) -> {
>             // Write only the payload (third tuple field), one line per record
>             PrintStream out = new PrintStream(stream);
>             out.println(element.f2);
>         })
>     // Determine component type for each record
>     .withBucketAssigner(new CustomBucketAssigner())
>     .withRollingPolicy(OnCheckpointRollingPolicy.build())
>     .withBucketCheckInterval(TimeUnit.MINUTES.toMillis(1))
>     .build();
>
> Is there anything that we can optimize on S3 from StreamingFileSink or in
> flink-conf.yaml?
>
> Like using bulk format or any config params like fs.s3.maxThreads etc.
>
> For checkpointing too I am using s3:// instead of s3p or s3a:
>
> env.setStateBackend((StateBackend) new
>     RocksDBStateBackend("s3://checkpoint_bucket", true));
> env.enableCheckpointing(300000);
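Since I mentioned the custom S3Sink: here is a very rough skeleton of that idea. The class name, key layout and the AWS SDK v1 client are assumptions for illustration, not the code I actually run. Note that, unlike StreamingFileSink, this takes no part in checkpointing, so you lose the exactly-once guarantees on the written files.

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import java.util.UUID;

    // Minimal hand-rolled S3 sink: each parallel subtask gets its own client,
    // so PUT requests scale directly with the sink parallelism.
    public class SimpleS3Sink extends RichSinkFunction<Tuple3<String, String, String>> {

        private final String bucket;
        private transient AmazonS3 s3;

        public SimpleS3Sink(String bucket) {
            this.bucket = bucket;
        }

        @Override
        public void open(Configuration parameters) {
            // Region and credentials are picked up from the environment.
            s3 = AmazonS3ClientBuilder.defaultClient();
        }

        @Override
        public void invoke(Tuple3<String, String, String> element, Context context) {
            // f0/f1 become the key prefix, f2 is the payload. One object per
            // record is shown for brevity; in practice you'd buffer and batch
            // to avoid creating millions of tiny objects.
            String key = element.f0 + "/" + element.f1 + "/" + UUID.randomUUID();
            s3.putObject(bucket, key, element.f2);
        }
    }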