FileSystems must not be bundled in the user jar.

You must place them in lib/ or plugins/, because bundling them in the user jar breaks our assumption that they exist for the lifetime of the cluster (which in turn means we don't really have to worry about cleaning up).
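
For example, with the standard 1.11 distribution layout each filesystem plugin sits in its own subdirectory under plugins/, roughly:

    flink-1.11.2/
        lib/
        plugins/
            s3-fs-hadoop/
                flink-s3-fs-hadoop-1.11.2.jar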

On 2/10/2021 4:01 PM, Vishal Santoshi wrote:
com/amazonaws/services/s3/transfer/TransferManager.class is in flink-s3-fs-hadoop-1.11.2.jar, which is in plugins/ and AFAIK should therefore get a dedicated ClassLoader per plugin. So does it make sense that these classes remain loaded beyond the job, and that the executor service for multipart uploads does too?

I guess the questions are:

* Why are the classes not being GCed? It seems that these threads reference objects loaded by the job ClassLoader, and thus the job ClassLoader is not being GCed.
* Does flink-s3-fs-hadoop-1.11.2.jar need to be in plugins/ as has been advised ... or can it be part of the uber jar?



On Wed, Feb 10, 2021 at 9:27 AM Vishal Santoshi <[email protected]> wrote:

    We do put the flink-hadoop-uber*.jar in the Flink lib/ (and thus
    available to the root classloader). That still does not explain the
    executor service outliving the job.

    On Tue, Feb 9, 2021 at 6:49 PM Vishal Santoshi
    <[email protected]> wrote:

        Hello folks,
        We see threads from
        https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-s3/src/main/java/com/amazonaws/services/s3/transfer/internal/TransferManagerUtils.java#L49
        outlive a batch job that writes Parquet files to S3, causing a
        ClassLoader leak. Is this a known issue? Logically, a close on
        the TransferManager should close the ExecutorService (and thus
        the threads).
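
        To illustrate what I mean, a minimal sketch with the plain AWS
        SDK v1 (outside Flink; this is not the Flink/Hadoop code path,
        just the API I would expect it to call):

        import com.amazonaws.services.s3.AmazonS3ClientBuilder
        import com.amazonaws.services.s3.transfer.TransferManagerBuilder

        // by default the TransferManager creates its own internal
        // thread pool for multipart uploads
        val tm = TransferManagerBuilder.standard()
          .withS3Client(AmazonS3ClientBuilder.defaultClient())
          .build()
        try {
          // ... tm.upload(...) calls for the multipart uploads ...
        } finally {
          // shutdownNow() stops the internal ExecutorService (and, by
          // default, also shuts down the wrapped AmazonS3 client)
          tm.shutdownNow()
        }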

        The code is fairly straightforward,

        val job = new Job()
        val hadoopOutFormat = new HadoopOutputFormat[Void, GenericRecord](
          new AvroParquetOutputFormat(),
          job
        )
        AvroParquetOutputFormat.setSchema(job, schema)
        FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(path))
        ParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY)
        ParquetOutputFormat.setEnableDictionary(job, true) // do we need this?

        and then an output() on the DataSet with that format.
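
        Roughly, that output step looks like the following (assuming
        the DataSet of Tuple2[Void, GenericRecord] is called `records`
        and the ExecutionEnvironment is `env`; both names are mine,
        not from the actual code):

        records.output(hadoopOutFormat)
        env.execute("write parquet to s3")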

        This is using


        scalaVersion := "2.12.12"
        flinkVersion = "1.11.2"
        hadoopVersion = "2.8.3"


        Regards

        Vishal

