com/amazonaws/services/s3/transfer/TransferManager.class is in flink-s3-fs-hadoop-1.11.2.jar, which sits in the plugins directory and, AFAIK, should get a dedicated ClassLoader per plugin. So does it make sense that these classes remain loaded beyond the job, and that the executor service for multipart uploads outlives it as well?
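
To make my expectation concrete, this is roughly the SDK-level teardown I would assume happens somewhere in the filesystem's close path (a sketch only, not Flink's actual code; the try/finally is mine, TransferManager.shutdownNow is the real SDK call):

import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.transfer.{TransferManager, TransferManagerBuilder}

// Sketch: the builder creates an internal ExecutorService by default, which is
// where the "s3-transfer-manager-worker-*" threads come from.
val tm: TransferManager = TransferManagerBuilder.standard()
  .withS3Client(AmazonS3ClientBuilder.defaultClient())
  .build()

try {
  // ... multipart uploads issued on behalf of the writer go through tm here ...
} finally {
  // shutdownNow(false) stops the internal ExecutorService without closing the
  // shared S3 client; if it is never called, the worker threads keep a
  // reference chain back to whichever ClassLoader created them.
  tm.shutdownNow(false)
}
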
I guess the questions are:

* Why are these classes not being GCed? It seems that these threads reference objects loaded by the job ClassLoader, and thus the job ClassLoader is not being GCed.
* Does flink-s3-fs-hadoop-1.11.2.jar need to be in the plugins directory, as has been advised, or can it be part of the uber jar?

On Wed, Feb 10, 2021 at 9:27 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:

> We do put the flink-hadoop-uber*.jar in the Flink lib ( and thus available
> to the root classloader ). That still does not explain the executor
> service outliving the job.
>
> On Tue, Feb 9, 2021 at 6:49 PM Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
>> Hello folks,
>> We see threads from
>> https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-s3/src/main/java/com/amazonaws/services/s3/transfer/internal/TransferManagerUtils.java#L49
>> outlive a batch job that writes Parquet files to S3, causing a ClassLoader
>> leak. Is this a known issue? Logically, a close on the TransferManager
>> should close the ExecutorService ( and thus the threads ).
>>
>> The code is fairly straightforward:
>>
>> val job = new Job()
>> val hadoopOutFormat = new HadoopOutputFormat[Void, GenericRecord](
>>   new AvroParquetOutputFormat(),
>>   job
>> )
>> AvroParquetOutputFormat.setSchema(job, schema)
>> FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(path))
>> ParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY)
>> ParquetOutputFormat.setEnableDictionary(job, true) // do we need this?
>>
>> and then an output.
>>
>> This is using:
>>
>> scalaVersion := "2.12.12"
>> flinkVersion = "1.11.2"
>> hadoopVersion = "2.8.3"
>>
>> Regards
>>
>> Vishal
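
P.S. For anyone trying to reproduce this: the leaked workers are easy to spot by name, since TransferManagerUtils gives them the "s3-transfer-manager-worker-" prefix. A rough check, using only JDK calls, that I run on a TaskManager after the batch job has finished (sketch only):

import scala.collection.JavaConverters._

// List surviving TransferManager worker threads and the ClassLoader each one
// pins through its context ClassLoader.
val leaked = Thread.getAllStackTraces.keySet.asScala
  .filter(_.getName.startsWith("s3-transfer-manager-worker-"))

leaked.foreach { t =>
  println(s"${t.getName} alive=${t.isAlive} contextClassLoader=${t.getContextClassLoader}")
}
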