As in, https://github.com/aws/aws-sdk-java/blob/41a577e3f667bf5efb3d29a46aaf210bf70483a1/aws-java-sdk-s3/src/main/java/com/amazonaws/services/s3/transfer/TransferManager.java#L2378 never gets called, because the TransferManager is never GCed...
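The SDK does expose an explicit shutdown for exactly this case; a minimal
sketch of what calling it would look like if our code owned the manager
(illustrative only; the s3 plugin builds its TransferManager internally):

  import com.amazonaws.services.s3.transfer.{TransferManager, TransferManagerBuilder}

  val tm: TransferManager = TransferManagerBuilder.standard().build()
  try {
    // multipart uploads run on the manager's internal worker pool,
    // i.e. the threads created in TransferManagerUtils (linked below)
  } finally {
    // Tears down the internal ExecutorService without closing the shared
    // AmazonS3 client; shutdownNow(true) would close the client as well.
    tm.shutdownNow(false)
  }

Nothing in the plugin's lifecycle ever reaches the equivalent of that
finally block, and the GC-driven fallback never fires, presumably because
the worker threads keep the manager (and its ClassLoader) reachable.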
On Wed, Feb 10, 2021 at 10:47 AM Vishal Santoshi <[email protected]> wrote:

> Thank you,
>
> This is pretty much it... so as you can see,
>
>   "org.apache.flink" % "flink-shaded-hadoop-2-uber" % "2.8.3-10.0" % "provided",
>
> So the FS is in the lib,
>
> and flink-s3-fs-hadoop-1.11.2.jar is in plugins.
>
> Is there something you see weird with the below?
>
>   val flinkDependencies = Seq(
>     // Flink-runtime?
>     "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
>     "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
>     "org.apache.flink" % "flink-json" % flinkVersion % "provided",
>     "org.apache.flink" % "flink-csv" % flinkVersion % "provided",
>     "org.apache.flink" % "flink-table-common" % flinkVersion,
>     "org.apache.flink" %% "flink-table-api-scala" % flinkVersion,
>     "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion,
>     "org.apache.flink" %% "flink-table-planner" % flinkVersion % "provided",
>     "org.apache.flink" %% "flink-table-planner-blink" % flinkVersion % "provided",
>     "org.apache.flink" %% "flink-clients" % flinkVersion,
>     // For parquet output
>     "org.apache.flink" %% "flink-parquet" % flinkVersion,
>     "org.apache.flink" % "flink-avro" % flinkVersion,
>     "org.apache.parquet" % "parquet-avro" % "1.11.0",
>     // manually added?
>     "org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion,
>     "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
>     // Add to /lib. provided so this doesn't bring in hadoop deps, which
>     // are in the shaded jar instead. Tried to put in $FLINK_HOME/lib but
>     // that didn't work? Probably some other hadoop libs need to be
>     // treated the same way.
>     "com.amazonaws" % "aws-java-sdk" % "1.11.271", // use just S3? Should be provided in prod?
>     "org.apache.flink" % "flink-shaded-hadoop-2-uber" % "2.8.3-10.0" % "provided",
>     // required for flink's shaded hadoop?
>     "org.slf4j" % "slf4j-api" % "1.7.15",
>     "org.slf4j" % "slf4j-log4j12" % "1.7.15" % "provided",
>     "log4j" % "log4j" % "1.2.17" % "provided",
>     // CLI
>     "com.github.scopt" %% "scopt" % "3.7.1",
>     // JSON validation
>     "com.github.java-json-tools" % "json-schema-validator" % "2.2.14",
>     // JSON parsing
>     "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.11.1",
>     "org.json4s" %% "json4s-jackson" % "3.6.9",
>     // Non-terrible Json parsing
>     "io.circe" %% "circe-core" % circeVersion,
>     "io.circe" %% "circe-generic" % circeVersion,
>     "io.circe" %% "circe-generic-extras" % circeVersion,
>     "io.circe" %% "circe-parser" % circeVersion,
>     // Testing
>     "org.scalatest" %% "scalatest" % "3.0.4" % Test,
>     "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test)
>
> On Wed, Feb 10, 2021 at 10:14 AM Chesnay Schepler <[email protected]> wrote:
>
>> FileSystems must not be bundled in the user jar.
>>
>> You must place them in lib/ or plugins/, because by bundling them you
>> break our assumption that they exist for the lifetime of the cluster
>> (which in turn means we don't really have to worry about cleaning up).
>>
>> On 2/10/2021 4:01 PM, Vishal Santoshi wrote:
>>
>> com/amazonaws/services/s3/transfer/TransferManager.class is in
>> flink-s3-fs-hadoop-1.11.2.jar, which is in the plugins and, AFAIK, should
>> have a dedicated ClassLoader per plugin. So does it make sense that these
>> classes remain beyond the job, and with them the executor service for
>> multipart upload?
>>
>> I guess the questions are:
>>
>> * Why are the classes not being GCed? It seems that these threads
>>   reference objects loaded by the JobClassLoader, and thus the
>>   JobClassLoader is not being GCed.
>> * Does the flink-s3-fs-hadoop-1.11.2.jar need to be in the plugins, as
>>   has been advised... Can it be part of the uber jar?
>>
>> On Wed, Feb 10, 2021 at 9:27 AM Vishal Santoshi <[email protected]> wrote:
>>
>>> We do put the flink-hadoop-uber*.jar in the flink lib ( and thus
>>> available to the root classloader ). That still does not explain the
>>> executor service outliving the job.
>>>
>>> On Tue, Feb 9, 2021 at 6:49 PM Vishal Santoshi <[email protected]> wrote:
>>>
>>>> Hello folks,
>>>> We see threads from
>>>> https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-s3/src/main/java/com/amazonaws/services/s3/transfer/internal/TransferManagerUtils.java#L49
>>>> outlive a batch job that writes Parquet files to S3, causing a
>>>> ClassLoader leak. Is this a known issue? Logically, a close on the
>>>> TransferManager should close the ExecutorService ( and thus the
>>>> threads ).
>>>>
>>>> The code is fairly straightforward:
>>>>
>>>>   val job = new Job()
>>>>   val hadoopOutFormat = new HadoopOutputFormat[Void, GenericRecord](
>>>>     new AvroParquetOutputFormat(),
>>>>     job
>>>>   )
>>>>   AvroParquetOutputFormat.setSchema(job, schema)
>>>>   FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(path))
>>>>   ParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY)
>>>>   ParquetOutputFormat.setEnableDictionary(job, true) // do we need this?
>>>>
>>>> and then an output.
>>>>
>>>> This is using:
>>>>
>>>>   scalaVersion := "2.12.12"
>>>>   flinkVersion = "1.11.2"
>>>>   hadoopVersion = "2.8.3"
>>>>
>>>> Regards
>>>>
>>>> Vishal
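
For what it's worth, the elided "and then an output" step would, with the
DataSet API, presumably be something along these lines (the records DataSet
and the job name are hypothetical; hadoopOutFormat is the one built in the
snippet above):

  import org.apache.avro.generic.GenericRecord
  import org.apache.flink.api.scala._

  val env = ExecutionEnvironment.getExecutionEnvironment
  // Hypothetical upstream: the (Void, GenericRecord) pairs to write;
  // the Void key is what the wrapped Parquet output format expects.
  val records: DataSet[(Void, GenericRecord)] = ???
  records.output(hadoopOutFormat)
  env.execute("write-parquet-to-s3")

Each parallel writer then goes through the s3a filesystem from the plugin,
which is where the TransferManager and its worker pool come from. Per the
plugin docs, the jar also has to sit in its own subfolder, e.g.
plugins/s3-fs-hadoop/flink-s3-fs-hadoop-1.11.2.jar, to get the dedicated
ClassLoader discussed above.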
