As in, the code at
https://github.com/aws/aws-sdk-java/blob/41a577e3f667bf5efb3d29a46aaf210bf70483a1/aws-java-sdk-s3/src/main/java/com/amazonaws/services/s3/transfer/TransferManager.java#L2378
never gets called, as it is never GCed...
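
To make the suspicion concrete, here is a minimal sketch of the general JVM mechanism. It is not the SDK's or Flink's code. PoolLeakSketch, newJobLoader and startPool are hypothetical names, and a plain URLClassLoader stands in for the per-job class loader. The sketch only shows that a pool worker stays alive after the work is done and keeps the context class loader it inherited from the thread that created it, until someone shuts the pool down explicitly.

```scala
import java.net.{URL, URLClassLoader}
import java.util.concurrent.{Callable, ExecutorService, Executors, TimeUnit}

object PoolLeakSketch {
  // Hypothetical stand-in for the per-job (user code) class loader.
  def newJobLoader(): ClassLoader =
    new URLClassLoader(Array.empty[URL], getClass.getClassLoader)

  // Starts a pool while `loader` is the context class loader, as job code
  // would, and returns the pool together with its single worker thread.
  def startPool(loader: ClassLoader): (ExecutorService, Thread) = {
    val caller = Thread.currentThread()
    val previous = caller.getContextClassLoader
    caller.setContextClassLoader(loader)
    try {
      val pool = Executors.newFixedThreadPool(1)
      // The worker thread is created lazily by the first submitted task and
      // inherits the context class loader of the submitting thread.
      val worker = pool.submit(new Callable[Thread] {
        override def call(): Thread = Thread.currentThread()
      }).get()
      (pool, worker)
    } finally caller.setContextClassLoader(previous)
  }

  def main(args: Array[String]): Unit = {
    val jobLoader = newJobLoader()
    val (pool, worker) = startPool(jobLoader)

    // The "job" is done, but nobody has called shutdown(): the worker is
    // parked waiting for tasks and still references the job's class loader.
    println(s"alive before shutdown: ${worker.isAlive}")
    println(s"pins job loader: ${worker.getContextClassLoader eq jobLoader}")

    // Only an explicit shutdown lets the worker exit.
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
    worker.join(5000)
    println(s"alive after shutdown: ${worker.isAlive}")
  }
}
```

A live thread is a GC root, so as long as the worker runs, everything it references (including that class loader) stays reachable. If cleanup is left to a hook that only runs once the owning object is collected, this would be consistent with what we are seeing.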

On Wed, Feb 10, 2021 at 10:47 AM Vishal Santoshi <[email protected]>
wrote:

> Thank you,
>
> This is pretty much it. So, as you can see,
>
>  "org.apache.flink" % "flink-shaded-hadoop-2-uber" % "2.8.3-10.0" % "provided",
>
> So the FS is in the lib
>
> and flink-s3-fs-hadoop-1.11.2.jar is in plugins
>
> Do you see anything weird in the dependencies below?
>
>
> val flinkDependencies = Seq(
>   // Flink-runtime?
>   "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
>   "org.apache.flink" % "flink-json" % flinkVersion % "provided",
>   "org.apache.flink" % "flink-csv" % flinkVersion % "provided",
>   "org.apache.flink" % "flink-table-common" % flinkVersion,
>   "org.apache.flink" %% "flink-table-api-scala" % flinkVersion,
>   "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion,
>   "org.apache.flink" %% "flink-table-planner" % flinkVersion % "provided",
>   "org.apache.flink" %% "flink-table-planner-blink" % flinkVersion % "provided",
>   "org.apache.flink" %% "flink-clients" % flinkVersion,
>   // For parquet output
>   "org.apache.flink" %% "flink-parquet" % flinkVersion,
>   "org.apache.flink" % "flink-avro" % flinkVersion,
>   "org.apache.parquet" % "parquet-avro" % "1.11.0",
>   // manually added?
>   "org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion,
>   "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
>   // Add to /lib. provided so this doesn't bring in hadoop deps, which are in the shaded jar instead
>   // tried to put in $FLINK_HOME/lib but that didn't work?
>   // Probably some other hadoop libs need to be treated the same way
>   "com.amazonaws" % "aws-java-sdk" % "1.11.271",
>   // use just S3?
>   // Should be provided in prod?
>   "org.apache.flink" % "flink-shaded-hadoop-2-uber" % "2.8.3-10.0" % "provided",
>   // required for flink's shaded hadoop?
>   "org.slf4j" % "slf4j-api" % "1.7.15",
>   "org.slf4j" % "slf4j-log4j12" % "1.7.15" % "provided",
>   "log4j" % "log4j" % "1.2.17" % "provided",
>   // CLI
>   "com.github.scopt" %% "scopt" % "3.7.1",
>   // JSON validation
>   "com.github.java-json-tools" % "json-schema-validator" % "2.2.14",
>   // JSON parsing
>   "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.11.1",
>   "org.json4s" %% "json4s-jackson" % "3.6.9",
>   // Non-terrible Json parsing
>   "io.circe" %% "circe-core" % circeVersion,
>   "io.circe" %% "circe-generic" % circeVersion,
>   "io.circe" %% "circe-generic-extras" % circeVersion,
>   "io.circe" %% "circe-parser" % circeVersion,
>   // Testing
>   "org.scalatest" %% "scalatest" % "3.0.4" % Test,
>   "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test)
>
>
>
> On Wed, Feb 10, 2021 at 10:14 AM Chesnay Schepler <[email protected]>
> wrote:
>
>> FileSystems must not be bundled in the user jar.
>>
>> You must place them in lib/ or plugins/, because by bundling it you break
>> our assumption that they exist for the lifetime of the cluster (which in
>> turn means we don't really have to worry about cleaning up).
>>
>> On 2/10/2021 4:01 PM, Vishal Santoshi wrote:
>>
>> com/amazonaws/services/s3/transfer/TransferManager.class  is in  
>> flink-s3-fs-hadoop-1.11.2.jar
>> which is in the plugins and that AFAIK should have a dedicated ClassLoader
>> per plugin. So does it make sense that these classes remain beyond the job
>> and so does the executor service for multipart upload ?
>>
>> I guess the questions are:
>>
>> * Why are the classes not being GCed? It seems that these
>> threads reference objects loaded by the job ClassLoader, and thus the job
>> ClassLoader is not being GCed.
>> * Does flink-s3-fs-hadoop-1.11.2.jar need to be in plugins, as
>> has been advised, or can it be part of the uber jar?
>>
>>
>>
>> On Wed, Feb 10, 2021 at 9:27 AM Vishal Santoshi <
>> [email protected]> wrote:
>>
>>> We do put the flink-hadoop-uber*.jar in the Flink lib (and thus it is
>>> available to the root classloader). That still does not explain the
>>> executor service outliving the job.
>>>
>>> On Tue, Feb 9, 2021 at 6:49 PM Vishal Santoshi <
>>> [email protected]> wrote:
>>>
>>>> Hello folks,
>>>> We see threads from
>>>> https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-s3/src/main/java/com/amazonaws/services/s3/transfer/internal/TransferManagerUtils.java#L49
>>>> outlive a batch job that writes Parquet files to S3, causing a ClassLoader
>>>> leak. Is this a known issue? Logically, a close on the TransferManager
>>>> should close the ExecutorService (and thus the threads).
>>>>
>>>> The code is fairly straightforward,
>>>>
>>>>     val job = new Job()
>>>>     val hadoopOutFormat = new HadoopOutputFormat[Void, GenericRecord](
>>>>       new AvroParquetOutputFormat(),
>>>>       job
>>>>     )
>>>>     AvroParquetOutputFormat.setSchema(job, schema)
>>>>     FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(path))
>>>>     ParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY)
>>>>     ParquetOutputFormat.setEnableDictionary(job, true)  // do we need this?
>>>>
>>>>      and then an output
>>>>
>>>> This is using
>>>>
>>>> scalaVersion := "2.12.12"
>>>> flinkVersion = "1.11.2"
>>>> hadoopVersion = "2.8.3"
>>>>
>>>>
>>>> Regards
>>>>
>>>> Vishal
>>>>
>>>>
>>
