FileSystems must not be bundled in the user jar.
You must place them in lib/ or plugins/; bundling them in the user jar
breaks our assumption that they exist for the lifetime of the cluster
(which in turn means we don't really have to worry about cleaning them up).
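For reference, with the standard distribution the S3 filesystem jar ships
under opt/ and only needs to be copied into its own sub-directory of
plugins/; a minimal layout (names assuming Flink 1.11.2) looks like:
flink-1.11.2/
  plugins/
    s3-fs-hadoop/
      flink-s3-fs-hadoop-1.11.2.jar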
On 2/10/2021 4:01 PM, Vishal Santoshi wrote:
com/amazonaws/services/s3/transfer/TransferManager.class is in
flink-s3-fs-hadoop-1.11.2.jar, which is in plugins/ and AFAIK should
therefore get a dedicated ClassLoader per plugin. So does it make sense
that these classes remain beyond the job, and that the executor service
for multipart upload does too?
I guess the questions are:
* Why are the classes not being GCed? It seems these threads reference
objects loaded by the job ClassLoader, and thus that ClassLoader is not
being GCed.
* Does flink-s3-fs-hadoop-1.11.2.jar need to be in plugins/ as has been
advised ... or can it be part of the uber jar? (A quick way to check which
ClassLoader is actually serving these classes is sketched after this list.)
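A purely illustrative way to check that at runtime (where you run it,
e.g. in the job's main(), is an assumption):

import scala.util.{Failure, Success, Try}

// Hypothetical diagnostic: see which ClassLoader, if any, serves the AWS classes.
Try(Class.forName("com.amazonaws.services.s3.transfer.TransferManager")) match {
  case Success(cls) =>
    // Resolvable from user code => the class came from lib/ or the uber jar;
    // the printed ClassLoader tells you which.
    println(s"TransferManager loaded by: ${cls.getClassLoader}")
  case Failure(_: ClassNotFoundException) =>
    // Not visible to user code => it lives only inside an isolated plugin ClassLoader.
    println("TransferManager is not visible to the user ClassLoader (plugin-isolated)")
  case Failure(e) => throw e
}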
On Wed, Feb 10, 2021 at 9:27 AM Vishal Santoshi
<[email protected]> wrote:
We do put the flink-hadoop-uber*.jar in the Flink lib/ (and thus
available to the root classloader). That still does not explain
the executor service outliving the job.
On Tue, Feb 9, 2021 at 6:49 PM Vishal Santoshi
<[email protected]> wrote:
Hello folks,
We see threads from
https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-s3/src/main/java/com/amazonaws/services/s3/transfer/internal/TransferManagerUtils.java#L49
outlive a batch job that writes Parquet files to S3, causing
a ClassLoader leak. Is this a known issue? Logically a close
on the TransferManager should shut down the ExecutorService (and
thus the threads).
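For what it is worth, the SDK does expose such a close; a minimal
sketch, assuming you somehow had a handle on the TransferManager instance
(in practice the flink-s3-fs-hadoop filesystem creates and owns it
internally, so user code never sees it):

import com.amazonaws.services.s3.transfer.TransferManager

// Hypothetical helper: tm is owned by the filesystem, not by user code.
def releaseTransferManager(tm: TransferManager): Unit = {
  // shutdownNow(false) stops the internal ExecutorService (and thus its
  // worker threads) without also closing the underlying AmazonS3 client.
  tm.shutdownNow(false)
}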
The code is fairly straightforward:

val job = new Job()
val hadoopOutFormat = new HadoopOutputFormat[Void, GenericRecord](
  new AvroParquetOutputFormat(),
  job
)
AvroParquetOutputFormat.setSchema(job, schema)
FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(path))
ParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY)
ParquetOutputFormat.setEnableDictionary(job, true) // do we need this?
and then the format is wired in as the output of the DataSet.
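A minimal sketch of that last step, assuming the DataSet is named records
and typed DataSet[(Void, GenericRecord)] (that part was not shown above):

// Hypothetical names: records and env are not in the snippet above.
records.output(hadoopOutFormat)
env.execute("parquet-to-s3")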
This is using
scalaVersion := "2.12.12"
flinkVersion = "1.11.2"
hadoopVersion = "2.8.3"
Regards
Vishal