Hi Venkata, are the many small files intended or is it rather an issue of our commit on checkpointing? If so then FLINK-11499 [1] should help you. Design is close to done, unfortunately implementation will not make it into 1.11.
In any case, I'd look at the parameter fs.s3a.connection.maximum, as you store both state and data on S3. I'd probably go with slot*3 or even higher. Lastly, the way you output elements looks also a bit suspicious. PrintStream is not known for great performance. I'm also surprised that it works without manual flushing. [1] https://issues.apache.org/jira/browse/FLINK-11499 On Mon, Jun 1, 2020 at 9:52 AM Jörn Franke <jornfra...@gmail.com> wrote: > I think S3 is a wrong storage backend for this volumes of small messages. > Try to use a NoSQL database or write multiple messages into one file in S3 > (10000 or 100000) > > If you still want to go with your scenario then try a network optimized > instance and use s3a in Flink and configure s3 entropy. > > Am 31.05.2020 um 15:30 schrieb venkata sateesh` kolluru < > vkollur...@gmail.com>: > > > Hi David, > > The avg size of each file is around 30KB and I have checkpoint interval of > 5 minutes. Some files are even 1 kb, because of checkpoint some files are > merged into 1 big file around 300MB. > > With 120 million files and 4Tb, if the rate of transfer is 300 per minute, > it is taking weeks to write to s3. > > I have tried to increase parallelism of sink but I dont see any > improvement. > > The sink record is Tuple3<String,String,String>, the actual content of > file is f2. This is content is written to <s3 bucket>/f0/f1/part*-* > > I guess the prefix determination in custombucketassigner wont be causing > this delay? > > Could you please shed some light on writing custom s3 sink ? > > Thanks > > > On Sun, May 31, 2020, 6:34 AM David Magalhães <speeddra...@gmail.com> > wrote: > >> Hi Venkata. >> >> 300 requests per minute look like a 200ms per request, which should be a >> normal response time to send a file if there isn't any speed limitation >> (how big are the files?). >> >> Have you changed the parallelization to be higher than 1? I also >> recommend to limit the source parallelization, because it can consume >> pretty fast from Kafka and create some kind of backpressure. >> >> I don't any much experience with StreamingFileSink, because I've ended up >> using a custom S3Sink, but I did have some issues writing to S3 because the >> request wasn't parallelised. Check this thread, >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallisation-of-S3-write-sink-td34064.html#a34070 >> >> On Sun, May 31, 2020 at 1:32 AM venkata sateesh` kolluru < >> vkollur...@gmail.com> wrote: >> >>> Hello, >>> >>> I have posted the same in stackoverflow but didnt get any response. So >>> posting it here for help. >>> >>> >>> https://stackoverflow.com/questions/62068787/flink-s3-write-performance-optimization?noredirect=1#comment109814428_62068787 >>> >>> Details: >>> >>> I am working on a flink application on kubernetes(eks) which consumes >>> data from kafka and write it to s3. >>> >>> We have around 120 million xml messages of size 4TB in kafka. Consuming >>> from kafka is super fast. >>> >>> These are just string messages from kafka. >>> >>> There is a high back pressure while writing to s3. We are not even >>> hitting the s3 PUT request limit which is arounf 3500 requests/sec. I am >>> seeing only 300 writes per minute to S3 which is very slow. >>> >>> I am using StreamFileSink to write to s3 with Rolling policy as >>> OnCheckpointPolicy. >>> >>> Using flink-s3-fs-hadoop-*.jar and s3:// as path (not using any s3a or >>> s3p) >>> >>> Other than this I dont have any config related to s3 >>> >>> StreamingFileSink<Tuple3<String,String, String>> sink = >>> StreamingFileSink >>> .forRowFormat(new Path(s3://BUCKET), >>> (Tuple3<String,String, String> element, OutputStream >>> stream) -> { >>> PrintStream out = new PrintStream(stream); >>> out.println(element.f2); >>> }) >>> // Determine component type for each record >>> .withBucketAssigner(new CustomBucketAssigner()) >>> .withRollingPolicy(OnCheckpointRollingPolicy.build()) >>> .withBucketCheckInterval((TimeUnit.MINUTES.toMillis(1))) >>> .build(); >>> >>> Is there anything that we can optimize on s3 from streamfilesink or in >>> flink-conf.xml ? >>> >>> Like using bulkformat or any config params like fs.s3.maxThreads etc. >>> >>> For checkpointing too I am using s3:// instead of s3p or s3a >>> >>> env.setStateBackend((StateBackend) new >>> RocksDBStateBackend(s3://checkpoint_bucket, true)); >>> env.enableCheckpointing(300000); >>> >>> >>> -- Arvid Heise | Senior Java Developer <https://www.ververica.com/> Follow us @VervericaData -- Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference Stream Processing | Event Driven | Real Time -- Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany -- Ververica GmbH Registered at Amtsgericht Charlottenburg: HRB 158244 B Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng