Hi all,

@Venkata, do you have many small files being created, as Arvid suggested? If so, then I tend to agree that S3 is probably not the best sink, although I did not get that impression from your description. In addition, instead of PrintStream you can have a look at the code of SimpleStringEncoder in Flink [1] for a more efficient implementation.

Cheers,
Kostas

[1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringEncoder.java
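As a rough illustration of Kostas' suggestion, here is a minimal sketch of an Encoder modeled on SimpleStringEncoder for the Tuple3 records discussed below; the class name Tuple3ContentEncoder and the choice to write only f2 are assumptions drawn from the thread, not code from it:

    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.common.serialization.Encoder;
    import org.apache.flink.api.java.tuple.Tuple3;

    // Hypothetical encoder in the spirit of SimpleStringEncoder: it writes f2 as
    // UTF-8 bytes straight to the part-file stream instead of wrapping the stream
    // in a new PrintStream for every record.
    public class Tuple3ContentEncoder implements Encoder<Tuple3<String, String, String>> {

        private static final byte[] LINE_SEPARATOR = "\n".getBytes(StandardCharsets.UTF_8);

        @Override
        public void encode(Tuple3<String, String, String> element, OutputStream stream) throws IOException {
            stream.write(element.f2.getBytes(StandardCharsets.UTF_8));
            stream.write(LINE_SEPARATOR);
        }
    }

It would plug into the existing builder in place of the lambda, e.g. StreamingFileSink.forRowFormat(new Path(...), new Tuple3ContentEncoder()).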
On Fri, Jun 5, 2020 at 7:56 PM Arvid Heise <ar...@ververica.com> wrote:

> Hi Venkata,
>
> are the many small files intended, or are they rather an artifact of how we
> commit on checkpointing? If the latter, FLINK-11499 [1] should help you. The
> design is close to done; unfortunately the implementation will not make it
> into 1.11.
>
> In any case, I'd look at the parameter fs.s3a.connection.maximum, as you
> store both state and data on S3. I'd probably go with slots * 3 or even
> higher.
>
> Lastly, the way you output elements also looks a bit suspicious. PrintStream
> is not known for great performance. I'm also surprised that it works without
> manual flushing.
>
> [1] https://issues.apache.org/jira/browse/FLINK-11499
>
> On Mon, Jun 1, 2020 at 9:52 AM Jörn Franke <jornfra...@gmail.com> wrote:
>
>> I think S3 is the wrong storage backend for these volumes of small
>> messages. Try to use a NoSQL database, or write multiple messages into one
>> file in S3 (10000 or 100000).
>>
>> If you still want to go with your scenario, then try a network-optimized
>> instance, use s3a in Flink, and configure S3 entropy.
>>
>> On 31.05.2020 at 15:30, venkata sateesh` kolluru <vkollur...@gmail.com>
>> wrote:
>>
>> Hi David,
>>
>> The average size of each file is around 30 KB and I have a checkpoint
>> interval of 5 minutes. Some files are even 1 KB; because of checkpointing,
>> some files are merged into one big file of around 300 MB.
>>
>> With 120 million files and 4 TB, at a transfer rate of 300 per minute it
>> would take weeks to write to S3.
>>
>> I have tried to increase the parallelism of the sink but I don't see any
>> improvement.
>>
>> The sink record is Tuple3<String,String,String>; the actual content of the
>> file is f2. This content is written to <s3 bucket>/f0/f1/part*-*
>>
>> I guess the prefix determination in the CustomBucketAssigner won't be
>> causing this delay?
>>
>> Could you please shed some light on writing a custom S3 sink?
>>
>> Thanks
>>
>> On Sun, May 31, 2020, 6:34 AM David Magalhães <speeddra...@gmail.com>
>> wrote:
>>
>>> Hi Venkata.
>>>
>>> 300 requests per minute is about 200 ms per request, which should be a
>>> normal response time for sending a file if there isn't any speed
>>> limitation (how big are the files?).
>>>
>>> Have you set the parallelism higher than 1? I also recommend limiting the
>>> source parallelism, because it can consume pretty fast from Kafka and
>>> create some kind of backpressure.
>>>
>>> I don't have much experience with StreamingFileSink, because I ended up
>>> using a custom S3Sink, but I did have some issues writing to S3 because
>>> the requests weren't parallelised. Check this thread:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallisation-of-S3-write-sink-td34064.html#a34070
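A minimal sketch of what David's parallelism suggestion could look like, assuming a SourceFunction-based Kafka source and the StreamingFileSink from the question below; the method name wire and the values 4 and 16 are placeholders, not recommendations from the thread:

    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    public class ParallelismSketch {

        // Sketch only: 'source' stands in for the Kafka consumer and 's3Sink' for
        // the StreamingFileSink from the question; the parallelism values are
        // illustrative.
        static void wire(StreamExecutionEnvironment env,
                         SourceFunction<Tuple3<String, String, String>> source,
                         SinkFunction<Tuple3<String, String, String>> s3Sink) {

            DataStream<Tuple3<String, String, String>> records = env
                    .addSource(source)
                    .setParallelism(4);   // keep the source modest so it cannot outrun the S3 writes

            records
                    .addSink(s3Sink)
                    .setParallelism(16);  // more sink subtasks -> more concurrent uploads to S3
        }
    }

The point is simply that the sink gets more subtasks than the source, so uploads to S3 happen concurrently rather than the fast Kafka source piling up backpressure on a single writer.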
>>> On Sun, May 31, 2020 at 1:32 AM venkata sateesh` kolluru <
>>> vkollur...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I have posted the same question on Stack Overflow but didn't get any
>>>> response, so posting it here for help.
>>>>
>>>> https://stackoverflow.com/questions/62068787/flink-s3-write-performance-optimization?noredirect=1#comment109814428_62068787
>>>>
>>>> Details:
>>>>
>>>> I am working on a Flink application on Kubernetes (EKS) which consumes
>>>> data from Kafka and writes it to S3.
>>>>
>>>> We have around 120 million XML messages, about 4 TB in total, in Kafka.
>>>> Consuming from Kafka is super fast. These are just string messages from
>>>> Kafka.
>>>>
>>>> There is high backpressure while writing to S3. We are not even hitting
>>>> the S3 PUT request limit, which is around 3500 requests/sec; I am seeing
>>>> only 300 writes per minute to S3, which is very slow.
>>>>
>>>> I am using StreamingFileSink to write to S3 with the rolling policy set
>>>> to OnCheckpointRollingPolicy.
>>>>
>>>> I am using flink-s3-fs-hadoop-*.jar and s3:// as the path (not s3a or
>>>> s3p).
>>>>
>>>> Other than this I don't have any configuration related to S3.
>>>>
>>>> StreamingFileSink<Tuple3<String, String, String>> sink = StreamingFileSink
>>>>     .forRowFormat(new Path("s3://BUCKET"),
>>>>         (Tuple3<String, String, String> element, OutputStream stream) -> {
>>>>             PrintStream out = new PrintStream(stream);
>>>>             out.println(element.f2);
>>>>         })
>>>>     // Determine component type for each record
>>>>     .withBucketAssigner(new CustomBucketAssigner())
>>>>     .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>>>     .withBucketCheckInterval(TimeUnit.MINUTES.toMillis(1))
>>>>     .build();
>>>>
>>>> Is there anything that we can optimize for S3 in StreamingFileSink or in
>>>> flink-conf.yaml? For example, using a bulk format, or any config params
>>>> like fs.s3.maxThreads, etc.
>>>>
>>>> For checkpointing too I am using s3:// instead of s3p or s3a:
>>>>
>>>> env.setStateBackend((StateBackend) new
>>>>     RocksDBStateBackend("s3://checkpoint_bucket", true));
>>>> env.enableCheckpointing(300000);
>>>>
>
> --
> Arvid Heise | Senior Java Developer
> Ververica GmbH
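Relating to the "bulk format or any config params" question above, here is a minimal sketch of a row-format sink that rolls part files on size and time via DefaultRollingPolicy rather than on every checkpoint, which is one way to end up with fewer, larger objects on S3; the bucket path and the thresholds are placeholders, and the builder methods shown are those of the Flink 1.10 API:

    import java.util.concurrent.TimeUnit;

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

    public class LargerPartFilesSketch {

        // Hypothetical alternative to OnCheckpointRollingPolicy for the row format:
        // part files roll when they reach ~128 MB, after 15 minutes, or after 5
        // minutes of inactivity, so far fewer small objects land on S3.
        static StreamingFileSink<String> buildSink() {
            return StreamingFileSink
                    .forRowFormat(new Path("s3://my-bucket/output"),
                            new SimpleStringEncoder<String>("UTF-8"))
                    .withRollingPolicy(
                            DefaultRollingPolicy.builder()
                                    .withMaxPartSize(128L * 1024 * 1024)
                                    .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                    .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                    .build())
                    .build();
        }
    }

Note that bulk formats (e.g. Parquet) can only roll on checkpoints, so a policy like this only applies to the row format used in the question.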