Hi Vishal,

if you have the possibility, could you create a heap dump? It would be
interesting to know why the TransferManager is never released.
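
In case it helps: a heap dump can be taken from outside the process with
jmap (jmap -dump:live,format=b,file=<file> <pid>) or from inside the JVM.
Below is a minimal in-process sketch, assuming a HotSpot-based JVM. The
class name HeapDumpSketch and the output file name are made up for
illustration; in a Flink setup you would more likely point jmap at the
TaskManager process.

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.file.Files;
import java.nio.file.Path;

public class HeapDumpSketch {

    /** Writes an .hprof heap dump of the current JVM to the given path. */
    static Path dumpHeap(Path target) throws IOException {
        HotSpotDiagnosticMXBean bean =
                ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        // live = true restricts the dump to reachable objects, which is
        // what matters when hunting for whatever still references a class.
        // The target file must not exist yet and should end in ".hprof".
        bean.dumpHeap(target.toAbsolutePath().toString(), true);
        return target;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical output location, chosen only for this sketch.
        Path dir = Files.createTempDirectory("heapdump");
        Path file = dumpHeap(dir.resolve("sketch.hprof"));
        System.out.println("wrote " + file + " (" + Files.size(file) + " bytes)");
    }
}
```

The resulting file can be opened in a heap analyzer to follow the
reference chain from a GC root to the TransferManager.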

Note that it's impossible to explicitly release the objects/classes loaded
through a particular ClassLoader; all we can do is make sure that the
ClassLoader is no longer used, which leads to a full release once all
object instances are released. However, this doesn't seem to work in your
case.
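
To illustrate how a thread pool can keep a ClassLoader alive: a thread
inherits the context ClassLoader of the thread that created it, so a pool
that is never shut down pins that loader. This is a rough sketch using only
the JDK, not Flink or the AWS SDK. ClassLoaderPinSketch, Started and the
empty URLClassLoader standing in for a job ClassLoader are hypothetical,
and whether this is the actual reference path in your case is exactly what
the dump should tell us. System.gc() is only a hint, so the printed result
may vary by JVM.

```java
import java.lang.ref.WeakReference;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ClassLoaderPinSketch {

    /** A running pool plus a weak reference to the loader its worker inherited. */
    static final class Started {
        final ExecutorService pool;
        final WeakReference<ClassLoader> loaderRef;

        Started(ExecutorService pool, WeakReference<ClassLoader> loaderRef) {
            this.pool = pool;
            this.loaderRef = loaderRef;
        }
    }

    /** Starts a one-thread pool while a stand-in "job" loader is the context ClassLoader. */
    static Started startPoolUnderLoader() throws Exception {
        Thread current = Thread.currentThread();
        ClassLoader original = current.getContextClassLoader();
        // Stand-in for a per-job ClassLoader (assumption: it loads nothing itself).
        ClassLoader jobLoader = new URLClassLoader(new URL[0], original);
        current.setContextClassLoader(jobLoader);
        try {
            ExecutorService pool = Executors.newFixedThreadPool(1);
            // Submitting a task creates the worker thread, which inherits
            // the submitting thread's context ClassLoader.
            pool.submit(() -> { }).get();
            return new Started(pool, new WeakReference<>(jobLoader));
        } finally {
            current.setContextClassLoader(original);
        }
    }

    /** Requests GC a few times and reports whether the referent was cleared. */
    static boolean isCollected(WeakReference<?> ref) throws InterruptedException {
        for (int i = 0; i < 10 && ref.get() != null; i++) {
            System.gc();
            Thread.sleep(50);
        }
        return ref.get() == null;
    }

    public static void main(String[] args) throws Exception {
        Started started = startPoolUnderLoader();
        System.out.println("collected while pool is running: "
                + isCollected(started.loaderRef));

        // Shutting the pool down ends the worker thread and drops its reference.
        started.pool.shutdown();
        started.pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("collected after shutdown: "
                + isCollected(started.loaderRef));
    }
}
```

The point of the sketch is only that a single live thread is enough to
keep the loader, and everything loaded through it, reachable.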

On Wed, Feb 10, 2021 at 6:43 PM Vishal Santoshi <[email protected]>
wrote:

> As in
> https://github.com/aws/aws-sdk-java/blob/41a577e3f667bf5efb3d29a46aaf210bf70483a1/aws-java-sdk-s3/src/main/java/com/amazonaws/services/s3/transfer/TransferManager.java#L2378
> never gets called as it is never GCed...
>
> On Wed, Feb 10, 2021 at 10:47 AM Vishal Santoshi <
> [email protected]> wrote:
>
>> Thank you,
>>
>> This is pretty much it. So as you can see
>>
>>  "org.apache.flink" % "flink-shaded-hadoop-2-uber" % "2.8.3-10.0" % "provided",
>>
>> So the FS is in the lib
>>
>> and flink-s3-fs-hadoop-1.11.2.jar is in plugins
>>
>> Is there something you see weird with the below?
>>
>>
>> val flinkDependencies = Seq(
>>   // Flink-runtime?
>>   "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
>>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
>>   "org.apache.flink" % "flink-json" % flinkVersion % "provided",
>>   "org.apache.flink" % "flink-csv" % flinkVersion % "provided",
>>   "org.apache.flink" % "flink-table-common" % flinkVersion,
>>   "org.apache.flink" %% "flink-table-api-scala" % flinkVersion,
>>   "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion,
>>   "org.apache.flink" %% "flink-table-planner" % flinkVersion % "provided",
>>   "org.apache.flink" %% "flink-table-planner-blink" % flinkVersion % "provided",
>>   "org.apache.flink" %% "flink-clients" % flinkVersion,
>>   // For parquet output
>>   "org.apache.flink" %% "flink-parquet" % flinkVersion,
>>   "org.apache.flink" % "flink-avro" % flinkVersion,
>>   "org.apache.parquet" % "parquet-avro" % "1.11.0",
>>   // manually added?
>>   "org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion,
>>   "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
>>   // Add to /lib. provided so this doesn't bring in hadoop deps, which are in the shaded jar instead
>>   // tried to put in $FLINK_HOME/lib but that didn't work?
>>   // Probably some other hadoop libs need to be treated the same way
>>   "com.amazonaws" % "aws-java-sdk" % "1.11.271",
>>   // use just S3?
>>   // Should be provided in prod?
>>   "org.apache.flink" % "flink-shaded-hadoop-2-uber" % "2.8.3-10.0" % "provided",
>>   // required for flink's shaded hadoop?
>>   "org.slf4j" % "slf4j-api" % "1.7.15",
>>   "org.slf4j" % "slf4j-log4j12" % "1.7.15" % "provided",
>>   "log4j" % "log4j" % "1.2.17" % "provided",
>>   // CLI
>>   "com.github.scopt" %% "scopt" % "3.7.1",
>>   // JSON validation
>>   "com.github.java-json-tools" % "json-schema-validator" % "2.2.14",
>>   // JSON parsing
>>   "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.11.1",
>>   "org.json4s" %% "json4s-jackson" % "3.6.9",
>>   // Non-terrible Json parsing
>>   "io.circe" %% "circe-core" % circeVersion,
>>   "io.circe" %% "circe-generic" % circeVersion,
>>   "io.circe" %% "circe-generic-extras" % circeVersion,
>>   "io.circe" %% "circe-parser" % circeVersion,
>>   // Testing
>>   "org.scalatest" %% "scalatest" % "3.0.4" % Test,
>>   "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test)
>>
>>
>>
>> On Wed, Feb 10, 2021 at 10:14 AM Chesnay Schepler <[email protected]>
>> wrote:
>>
>>> FileSystems must not be bundled in the user jar.
>>>
>>> You must place them in lib/ or plugins/, because by bundling it you
>>> break our assumption that they exist for the lifetime of the cluster (which
>>> in turn means we don't really have to worry about cleaning up).
>>>
>>> On 2/10/2021 4:01 PM, Vishal Santoshi wrote:
>>>
>>> com/amazonaws/services/s3/transfer/TransferManager.class is in
>>> flink-s3-fs-hadoop-1.11.2.jar,
>>> which is in the plugins and AFAIK should have a dedicated ClassLoader
>>> per plugin. So does it make sense that these classes remain beyond the job,
>>> and so does the executor service for multipart upload?
>>>
>>> I guess the questions are
>>>
>>> * Why are the classes not being GCed? It seems that these threads
>>> reference objects loaded by the job ClassLoader, and thus the job
>>> ClassLoader is not being GCed.
>>> * Does the flink-s3-fs-hadoop-1.11.2.jar need to be in the plugins, as
>>> has been advised? Can it be part of the uber jar?
>>>
>>>
>>>
>>> On Wed, Feb 10, 2021 at 9:27 AM Vishal Santoshi <
>>> [email protected]> wrote:
>>>
>>>> We do put the flink-hadoop-uber*.jar in the flink lib (and it is thus
>>>> available to the root classloader). That still does not explain the
>>>> executor service outliving the job.
>>>>
>>>> On Tue, Feb 9, 2021 at 6:49 PM Vishal Santoshi <
>>>> [email protected]> wrote:
>>>>
>>>>> Hello folks,
>>>>> We see threads from
>>>>> https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-s3/src/main/java/com/amazonaws/services/s3/transfer/internal/TransferManagerUtils.java#L49
>>>>> outlive a batch job that writes Parquet files to S3, causing a
>>>>> ClassLoader leak. Is this a known issue? Logically, a close on the
>>>>> TransferManager should close the ExecutorService (and thus the threads).
>>>>>
>>>>> The code is fairly straightforward,
>>>>>
>>>>>     val job = new Job()
>>>>>     val hadoopOutFormat = new HadoopOutputFormat[Void, GenericRecord](
>>>>>       new AvroParquetOutputFormat(),
>>>>>       job
>>>>>     )
>>>>>     AvroParquetOutputFormat.setSchema(job, schema)
>>>>>     FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(path))
>>>>>     ParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY)
>>>>>     ParquetOutputFormat.setEnableDictionary(job, true)  // do we need this?
>>>>>
>>>>>      and then an output
>>>>>
>>>>> This is  using
>>>>>
>>>>>
>>>>> scalaVersion := "2.12.12"
>>>>> flinkVersion = "1.11.2"
>>>>> hadoopVersion = "2.8.3"
>>>>>
>>>>>
>>>>> Regards
>>>>>
>>>>> Vishal
>>>>>
>>>>>
>>>
