Hi David,

The average size of each file is around 30 KB and I have a checkpoint interval of 5 minutes. Some files are even 1 KB; because of the checkpoint-based rolling, some files are merged into one big file of around 300 MB.
With 120 million files and 4 TB, if the transfer rate is 300 per minute, it is taking weeks to write to S3. I have tried to increase the parallelism of the sink but I don't see any improvement.

The sink record is a Tuple3<String,String,String>; the actual file content is f2. This content is written to <s3 bucket>/f0/f1/part*-*. I guess the prefix determination in the CustomBucketAssigner won't be causing this delay?

Could you please shed some light on writing a custom S3 sink?

Thanks

On Sun, May 31, 2020, 6:34 AM David Magalhães <speeddra...@gmail.com> wrote:

> Hi Venkata.
>
> 300 requests per minute looks like 200 ms per request, which should be a
> normal response time for sending a file if there isn't any speed limitation
> (how big are the files?).
>
> Have you changed the parallelism to be higher than 1? I also recommend
> limiting the source parallelism, because it can consume pretty fast from
> Kafka and create some backpressure.
>
> I don't have much experience with StreamingFileSink, because I ended up
> using a custom S3 sink, but I did have some issues writing to S3 because the
> requests weren't parallelised. Check this thread:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallisation-of-S3-write-sink-td34064.html#a34070
>
> On Sun, May 31, 2020 at 1:32 AM venkata sateesh` kolluru <
> vkollur...@gmail.com> wrote:
>
>> Hello,
>>
>> I have posted the same on Stack Overflow but didn't get any response, so
>> I am posting it here for help.
>>
>> https://stackoverflow.com/questions/62068787/flink-s3-write-performance-optimization?noredirect=1#comment109814428_62068787
>>
>> Details:
>>
>> I am working on a Flink application on Kubernetes (EKS) which consumes
>> data from Kafka and writes it to S3.
>>
>> We have around 120 million XML messages, about 4 TB in total, in Kafka.
>> Consuming from Kafka is super fast.
>>
>> These are just string messages from Kafka.
>>
>> There is high backpressure while writing to S3. We are not even hitting
>> the S3 PUT request limit, which is around 3500 requests/sec. I am seeing
>> only 300 writes per minute to S3, which is very slow.
>>
>> I am using StreamingFileSink to write to S3 with the rolling policy set
>> to OnCheckpointRollingPolicy.
>>
>> Using flink-s3-fs-hadoop-*.jar and s3:// as the path (not using s3a or
>> s3p).
>>
>> Other than this I don't have any config related to S3:
>>
>> StreamingFileSink<Tuple3<String, String, String>> sink = StreamingFileSink
>>     .forRowFormat(new Path("s3://BUCKET"),
>>         (Tuple3<String, String, String> element, OutputStream stream) -> {
>>             PrintStream out = new PrintStream(stream);
>>             out.println(element.f2);
>>         })
>>     // Determine component type for each record
>>     .withBucketAssigner(new CustomBucketAssigner())
>>     .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>     .withBucketCheckInterval(TimeUnit.MINUTES.toMillis(1))
>>     .build();
>>
>> Is there anything that we can optimize for S3 in StreamingFileSink or in
>> flink-conf.yaml?
>>
>> Like using a bulk format, or any config params like fs.s3.maxThreads, etc.
>>
>> For checkpointing too I am using s3:// instead of s3p or s3a:
>>
>> env.setStateBackend((StateBackend) new
>>     RocksDBStateBackend("s3://checkpoint_bucket", true));
>> env.enableCheckpointing(300000);
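
For reference, the CustomBucketAssigner itself is not shown in this thread. A minimal sketch of what such an assigner might look like for a Tuple3<String,String,String> element, assuming f0 and f1 map directly to the <s3 bucket>/f0/f1/ prefix described above:

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// Hypothetical assigner: builds the "f0/f1" prefix from the first two tuple
// fields, so each record lands under <bucket>/f0/f1/part-*-*.
public class CustomBucketAssigner
        implements BucketAssigner<Tuple3<String, String, String>, String> {

    @Override
    public String getBucketId(Tuple3<String, String, String> element, Context context) {
        // Plain string concatenation per record; this is cheap compared to the
        // S3 requests issued by the sink itself.
        return element.f0 + "/" + element.f1;
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

An assigner of this shape only does per-record string concatenation, so it is unlikely to be the source of the delay compared to the S3 writes themselves.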
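On the custom S3 sink question: a bare-bones sketch of the idea David described, using the AWS SDK v1 client inside a RichSinkFunction, could look like the following. This is illustrative only: it issues one PUT per record from each parallel subtask, the bucket name and key layout are assumptions based on the layout above, and it does not give the exactly-once guarantees that StreamingFileSink provides.

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import java.util.UUID;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Sketch of a custom S3 sink: one PUT per record per subtask, no exactly-once
// guarantees. Key layout (f0/f1/<uuid>) is an assumption for illustration.
public class SimpleS3Sink extends RichSinkFunction<Tuple3<String, String, String>> {

    private final String bucket;
    private transient AmazonS3 s3;

    public SimpleS3Sink(String bucket) {
        this.bucket = bucket;
    }

    @Override
    public void open(Configuration parameters) {
        // One client per parallel subtask; subtasks issue PUTs concurrently,
        // so higher sink parallelism means more simultaneous requests.
        s3 = AmazonS3ClientBuilder.defaultClient();
    }

    @Override
    public void invoke(Tuple3<String, String, String> element, Context context) {
        String key = element.f0 + "/" + element.f1 + "/" + UUID.randomUUID();
        s3.putObject(bucket, key, element.f2);
    }
}

Wiring it in with a higher parallelism than the Kafka source, e.g. stream.addSink(new SimpleS3Sink("my-bucket")).setParallelism(32), is what actually multiplies the concurrent PUTs, since each subtask writes synchronously.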
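On the config side, the Hadoop s3a client used by flink-s3-fs-hadoop has connection and thread-pool settings such as fs.s3a.connection.maximum and fs.s3a.threads.max. How (and under which prefix) flink-conf.yaml entries are forwarded to the s3a client depends on the Flink version, so treat the snippet below as a sketch to verify against the filesystem documentation for your release:

# flink-conf.yaml (sketch; verify key names and forwarding for your Flink version)
fs.s3a.connection.maximum: 128   # maximum simultaneous HTTP connections to S3
fs.s3a.threads.max: 64           # thread pool used by the s3a client for uploads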