Hi Vishal, if possible, could you create a memory dump? It would be interesting to know why the TransferManager is never released.
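In case it helps, a heap dump can be triggered from inside the JVM via the HotSpotDiagnostic MXBean; the sketch below is a minimal example and the output path is just an assumption. Running jmap -dump:live,format=b,file=heap.hprof <pid> against the affected TaskManager process works equally well from the outside.

    import java.lang.management.ManagementFactory
    import com.sun.management.HotSpotDiagnosticMXBean

    object HeapDump {
      // Writes an .hprof snapshot of the running JVM to `path`.
      def dump(path: String): Unit = {
        val bean = ManagementFactory.newPlatformMXBeanProxy(
          ManagementFactory.getPlatformMBeanServer,
          "com.sun.management:type=HotSpotDiagnostic",
          classOf[HotSpotDiagnosticMXBean])
        // The second argument ("live") restricts the dump to reachable objects,
        // which is what we want when looking for whatever still references the
        // ClassLoader.
        bean.dumpHeap(path, true)
      }
    }

    // e.g. HeapDump.dump("/tmp/taskmanager.hprof") on the affected TaskManager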
Note that it isn't possible to explicitly release the objects/classes loaded through a particular ClassLoader; all we can do is make sure the ClassLoader is no longer used, which leads to a full release once all object instances have been released. However, this doesn't seem to work in your case.

On Wed, Feb 10, 2021 at 6:43 PM Vishal Santoshi <[email protected]> wrote:

As in,
https://github.com/aws/aws-sdk-java/blob/41a577e3f667bf5efb3d29a46aaf210bf70483a1/aws-java-sdk-s3/src/main/java/com/amazonaws/services/s3/transfer/TransferManager.java#L2378
never gets called, because the TransferManager is never GCed...

On Wed, Feb 10, 2021 at 10:47 AM Vishal Santoshi <[email protected]> wrote:

Thank you. This is pretty much it. As you can see,

    "org.apache.flink" % "flink-shaded-hadoop-2-uber" % "2.8.3-10.0" % "provided",

so the FS is in lib/, and flink-s3-fs-hadoop-1.11.2.jar is in plugins/.

Is there something that looks weird to you in the below?

    val flinkDependencies = Seq(
      // Flink runtime?
      "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
      "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
      "org.apache.flink" % "flink-json" % flinkVersion % "provided",
      "org.apache.flink" % "flink-csv" % flinkVersion % "provided",
      "org.apache.flink" % "flink-table-common" % flinkVersion,
      "org.apache.flink" %% "flink-table-api-scala" % flinkVersion,
      "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion,
      "org.apache.flink" %% "flink-table-planner" % flinkVersion % "provided",
      "org.apache.flink" %% "flink-table-planner-blink" % flinkVersion % "provided",
      "org.apache.flink" %% "flink-clients" % flinkVersion,
      // For parquet output
      "org.apache.flink" %% "flink-parquet" % flinkVersion,
      "org.apache.flink" % "flink-avro" % flinkVersion,
      "org.apache.parquet" % "parquet-avro" % "1.11.0", // manually added?
      "org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion,
      "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
      // Add to /lib. provided so this doesn't bring in hadoop deps, which are
      // in the shaded jar instead
      // tried to put in $FLINK_HOME/lib but that didn't work?
      // Probably some other hadoop libs need to be treated the same way
      "com.amazonaws" % "aws-java-sdk" % "1.11.271", // use just S3?
      // Should be provided in prod?
      "org.apache.flink" % "flink-shaded-hadoop-2-uber" % "2.8.3-10.0" % "provided",
      // required for flink's shaded hadoop?
      "org.slf4j" % "slf4j-api" % "1.7.15",
      "org.slf4j" % "slf4j-log4j12" % "1.7.15" % "provided",
      "log4j" % "log4j" % "1.2.17" % "provided",
      // CLI
      "com.github.scopt" %% "scopt" % "3.7.1",
      // JSON validation
      "com.github.java-json-tools" % "json-schema-validator" % "2.2.14",
      // JSON parsing
      "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.11.1",
      "org.json4s" %% "json4s-jackson" % "3.6.9",
      // Non-terrible Json parsing
      "io.circe" %% "circe-core" % circeVersion,
      "io.circe" %% "circe-generic" % circeVersion,
      "io.circe" %% "circe-generic-extras" % circeVersion,
      "io.circe" %% "circe-parser" % circeVersion,
      // Testing
      "org.scalatest" %% "scalatest" % "3.0.4" % Test,
      "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test
    )
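One entry in the list above is worth a second look: the un-scoped "com.amazonaws" % "aws-java-sdk" dependency ends up inside the assembled user jar, so its TransferManager class is loaded by the job ClassLoader rather than by the plugin's. A minimal sketch of an alternative (an assumption on my part, valid only if nothing on the job's own classpath needs the SDK at runtime):

    // Hypothetical change, not from the thread: scope the SDK as "provided" so
    // it stays out of the user jar; the flink-s3-fs-hadoop plugin bundles its
    // own relocated AWS classes behind a dedicated plugin ClassLoader.
    "com.amazonaws" % "aws-java-sdk" % "1.11.271" % "provided",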
On Wed, Feb 10, 2021 at 10:14 AM Chesnay Schepler <[email protected]> wrote:

FileSystems must not be bundled in the user jar. You must place them in lib/ or plugins/, because by bundling them you break our assumption that they exist for the lifetime of the cluster (which in turn means we don't really have to worry about cleaning up).

On 2/10/2021 4:01 PM, Vishal Santoshi wrote:

com/amazonaws/services/s3/transfer/TransferManager.class is in flink-s3-fs-hadoop-1.11.2.jar, which is in plugins/ and thus, AFAIK, should get a dedicated per-plugin ClassLoader. So does it make sense that these classes remain beyond the job, and that the executor service for multipart upload does too?

I guess the questions are:

* Why are the classes not being GCed? It seems that these threads reference objects loaded by the job ClassLoader, and thus the job ClassLoader is not being GCed.
* Does flink-s3-fs-hadoop-1.11.2.jar need to be in plugins/, as has been advised, or can it be part of the uber jar?

On Wed, Feb 10, 2021 at 9:27 AM Vishal Santoshi <[email protected]> wrote:

We do put the flink-hadoop-uber*.jar in the Flink lib/ (and thus it is available to the root ClassLoader). That still does not explain the executor service outliving the job.

On Tue, Feb 9, 2021 at 6:49 PM Vishal Santoshi <[email protected]> wrote:

Hello folks,

We see threads from
https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-s3/src/main/java/com/amazonaws/services/s3/transfer/internal/TransferManagerUtils.java#L49
outlive a batch job that writes Parquet files to S3, causing a ClassLoader leak. Is this a known issue? Logically, a close on the TransferManager should close the ExecutorService (and thus the threads).

The code is fairly straightforward:

    val job = new Job()
    val hadoopOutFormat = new HadoopOutputFormat[Void, GenericRecord](
      new AvroParquetOutputFormat(),
      job
    )
    AvroParquetOutputFormat.setSchema(job, schema)
    FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(path))
    ParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY)
    ParquetOutputFormat.setEnableDictionary(job, true) // do we need this?

and then an output.

This is using:

    scalaVersion := "2.12.12"
    flinkVersion = "1.11.2"
    hadoopVersion = "2.8.3"

Regards,
Vishal
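On the last point above, that a close on the TransferManager should close the ExecutorService: the SDK does expose this explicitly, so code that owns the instance need not rely on the finalizer. A minimal sketch (in the plugin setup it is the Hadoop S3A filesystem that owns the TransferManager, so this is illustrative rather than a direct fix; bucketName, key, and localFile are placeholders):

    import java.io.File
    import com.amazonaws.services.s3.transfer.{TransferManager, TransferManagerBuilder}

    val bucketName = "some-bucket"                   // placeholder
    val key        = "some/key/part-0.parquet"       // placeholder
    val localFile  = new File("/tmp/part-0.parquet") // placeholder

    val transferManager: TransferManager = TransferManagerBuilder.standard().build()
    try {
      // upload() returns a handle backed by the TransferManager's internal
      // ExecutorService; waitForCompletion() blocks until the upload finishes.
      transferManager.upload(bucketName, key, localFile).waitForCompletion()
    } finally {
      // Shuts the internal ExecutorService down deterministically instead of
      // relying on finalization/GC; `true` also closes the underlying S3 client.
      transferManager.shutdownNow(true)
    }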
