Starting this thread to follow up on an initial discussion with the Ververica 
and AWS teams during Flink Forward.
I'm investigating the performance of a Flink job that transports data from 
Kafka to an S3 sink.
We are using a BucketingSink to write Parquet files. The bucketing logic 
splits the messages into a folder per data type, tenant (customer), date-time, 
extraction ID, etc., so each file ends up in a folder structure composed of 
9-10 layers 
(s3_bucket:/1/2/3/4/5/6/7/8/9/myFile...)
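
For context, here is a minimal sketch of the kind of custom Bucketer involved. 
The Event type, its accessors, the class name and the exact path layout are 
placeholders for illustration only; the real job builds the 9-10 segments 
mentioned above.

import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
import org.apache.hadoop.fs.Path;

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Placeholder for the real message type; only these accessors are assumed.
interface Event {
    String getDataType();
    String getTenantId();
    String getExtractionId();
}

public class TenantTypeBucketer implements Bucketer<Event> {
    private static final DateTimeFormatter FMT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneOffset.UTC);

    @Override
    public Path getBucketPath(Clock clock, Path basePath, Event element) {
        // Every distinct combination of the segments below becomes its own
        // bucket, i.e. its own in-progress part file on S3.
        String dateTime = FMT.format(Instant.ofEpochMilli(clock.currentTimeMillis()));
        return new Path(basePath,
            element.getDataType()
                + "/" + element.getTenantId()
                + "/" + dateTime
                + "/" + element.getExtractionId());
    }
}

A bucketer like this is plugged into the sink with BucketingSink#setBucketer.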

If the data arrives as bursts of messages per tenant/type we see good write 
performance, but when the data is closer to a white-noise distribution across 
thousands of tenants, dozens of data types and multiple extraction IDs, we see 
a dramatic loss of performance (on the order of 300x).
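
To put a rough (assumed, not measured) number on why the distribution matters: 
with, say, 2,000 tenants x 20 data types x 3 extraction IDs, a white-noise 
stream can keep on the order of

      2,000 * 20 * 3 = 120,000

buckets active at once, each with its own in-progress part file on S3, while a 
per-tenant burst only touches a handful of buckets at any given moment.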

Attaching a debugger, the issue seems to be connected to the number of write 
handles open at the same time towards S3. More specifically: 
https://jira2.workday.com/secure/attachment/2947228/2947228_image-2019-06-23-22-46-43-980.png

Digging into the Hadoop libraries used to write to S3, I found some possible 
improvements from tuning the following s3a settings:
      fs.s3a.connection.maximum
      fs.s3a.threads.max
      fs.s3a.threads.core
      fs.s3a.max.total.tasks
But none of these made a big difference in throughput.
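
For reference, this is roughly how one would apply those settings to the sink 
(here via BucketingSink#setFSConfig; they can equally go into core-site.xml on 
the task managers). The values are example placeholders, not the ones used in 
the tests, and Event/TenantTypeBucketer are the placeholder types from the 
sketch above.

import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.hadoop.conf.Configuration;

public class SinkSetup {
    public static BucketingSink<Event> buildSink() {
        // Hadoop/s3a tuning knobs (example values only).
        Configuration s3aConf = new Configuration();
        s3aConf.setInt("fs.s3a.connection.maximum", 200); // simultaneous HTTP connections to S3
        s3aConf.setInt("fs.s3a.threads.max", 100);        // max threads in the s3a transfer pool
        s3aConf.setInt("fs.s3a.threads.core", 50);        // core size of that pool
        s3aConf.setInt("fs.s3a.max.total.tasks", 1000);   // queueable upload tasks

        BucketingSink<Event> sink = new BucketingSink<>("s3a://my-bucket/base-path");
        sink.setBucketer(new TenantTypeBucketer());
        sink.setFSConfig(s3aConf); // hand the Hadoop configuration to the sink's FileSystem
        return sink;
    }
}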

I hope to move the discussion forward here and see whether we can identify a 
clear issue in the logic or a possible workaround.

Note: the tests were done on Flink 1.8 with the Hadoop FileSystem 
(BucketingSink).
