I think S3 is the wrong storage backend for this volume of small messages. Try a NoSQL database instead, or write multiple messages into one file in S3 (10,000 or 100,000 messages per file).
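A minimal, framework-free sketch of the batching idea (class and method names here are hypothetical, not from any library): accumulate messages in memory and hand one large payload to an uploader every N messages, so each S3 PUT carries a batch instead of a single small file.

```java
import java.util.function.Consumer;

// Sketch only: batch many small messages into one large payload before
// uploading, so each S3 PUT carries e.g. 10,000 messages instead of one.
public class MessageBatcher {
    private final int batchSize;
    private final StringBuilder buffer = new StringBuilder();
    private final Consumer<String> uploader; // e.g. performs one S3 PUT per flush
    private int count = 0;

    public MessageBatcher(int batchSize, Consumer<String> uploader) {
        this.batchSize = batchSize;
        this.uploader = uploader;
    }

    // Append one message; flush automatically when the batch is full.
    public void add(String message) {
        buffer.append(message).append('\n');
        if (++count >= batchSize) {
            flush();
        }
    }

    // Emit the current batch (if any) as one payload and reset the buffer.
    public void flush() {
        if (count > 0) {
            uploader.accept(buffer.toString());
            buffer.setLength(0);
            count = 0;
        }
    }
}
```

In a Flink job the same effect is usually achieved by keying/windowing before the sink, or by a rolling policy that rolls on size rather than on every checkpoint; the class above just illustrates the batching arithmetic.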
If you still want to go with your scenario, then try a network-optimized instance, use s3a in Flink, and configure S3 entropy.

> On 31.05.2020 at 15:30, venkata sateesh` kolluru <vkollur...@gmail.com> wrote:
>
> Hi David,
>
> The average size of each file is around 30 KB and I have a checkpoint interval of
> 5 minutes. Some files are even 1 KB; because of checkpointing, some files are
> merged into one big file of around 300 MB.
>
> With 120 million files and 4 TB, at a transfer rate of 300 files per minute, it
> would take weeks to write to S3.
>
> I have tried to increase the parallelism of the sink but I don't see any
> improvement.
>
> The sink record is Tuple3<String,String,String>; the actual content of the file
> is f2. This content is written to <s3 bucket>/f0/f1/part*-*
>
> I guess the prefix determination in the custom bucket assigner won't be causing
> this delay?
>
> Could you please shed some light on writing a custom S3 sink?
>
> Thanks
>
>> On Sun, May 31, 2020, 6:34 AM David Magalhães <speeddra...@gmail.com> wrote:
>> Hi Venkata.
>>
>> 300 requests per minute looks like 200 ms per request, which should be a
>> normal response time to send a file if there isn't any speed limitation (how
>> big are the files?).
>>
>> Have you changed the parallelism to be higher than 1? I also recommend
>> limiting the source parallelism, because it can consume pretty fast from
>> Kafka and create some kind of backpressure.
>>
>> I don't have much experience with StreamingFileSink, because I ended up
>> using a custom S3 sink, but I did have some issues writing to S3 because the
>> requests weren't parallelised. Check this thread:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallisation-of-S3-write-sink-td34064.html#a34070
>>
>>> On Sun, May 31, 2020 at 1:32 AM venkata sateesh` kolluru
>>> <vkollur...@gmail.com> wrote:
>>> Hello,
>>>
>>> I have posted the same question on Stack Overflow but didn't get any
>>> response, so I am posting it here for help.
>>>
>>> https://stackoverflow.com/questions/62068787/flink-s3-write-performance-optimization?noredirect=1#comment109814428_62068787
>>>
>>> Details:
>>>
>>> I am working on a Flink application on Kubernetes (EKS) which consumes data
>>> from Kafka and writes it to S3.
>>>
>>> We have around 120 million XML messages, about 4 TB in total, in Kafka.
>>> Consuming from Kafka is super fast.
>>>
>>> These are just string messages from Kafka.
>>>
>>> There is high backpressure while writing to S3. We are not even hitting the
>>> S3 PUT request limit, which is around 3500 requests/sec. I am seeing only
>>> 300 writes per minute to S3, which is very slow.
>>>
>>> I am using StreamingFileSink to write to S3 with OnCheckpointRollingPolicy
>>> as the rolling policy.
>>>
>>> I am using flink-s3-fs-hadoop-*.jar and s3:// as the path (not s3a or s3p).
>>>
>>> Other than this I don't have any config related to S3:
>>>
>>> StreamingFileSink<Tuple3<String, String, String>> sink = StreamingFileSink
>>>     .forRowFormat(new Path("s3://BUCKET"),
>>>         (Tuple3<String, String, String> element, OutputStream stream) -> {
>>>             PrintStream out = new PrintStream(stream);
>>>             out.println(element.f2);
>>>         })
>>>     // Determine component type for each record
>>>     .withBucketAssigner(new CustomBucketAssigner())
>>>     .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>>     .withBucketCheckInterval(TimeUnit.MINUTES.toMillis(1))
>>>     .build();
>>>
>>> Is there anything that we can optimize on S3 from StreamingFileSink or in
>>> flink-conf.yaml?
>>>
>>> Like using a bulk format, or config params like fs.s3.maxThreads etc.
>>>
>>> For checkpointing too I am using s3:// instead of s3p or s3a:
>>>
>>> env.setStateBackend((StateBackend) new RocksDBStateBackend("s3://checkpoint_bucket", true));
>>> env.enableCheckpointing(300000);
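For reference, the entropy injection suggested at the top of the thread is set in flink-conf.yaml. Flink replaces a marker string in the checkpoint/sink path with random characters, which spreads requests across S3 key prefixes and avoids per-prefix request throttling. A sketch (the key name and length below are example values):

```yaml
# Marker to replace in S3 paths, and how many random characters to inject.
s3.entropy.key: _entropy_
s3.entropy.length: 4
```

With this in place, a path such as s3://BUCKET/_entropy_/checkpoints would have the `_entropy_` marker substituted with 4 random characters at write time. Note that entropy injection helps with S3 request-rate throttling; it does not by itself fix a throughput problem caused by writing millions of tiny objects.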