Hi guys, I have a simple batch job with a custom output formatter that writes to a local file.
public class JobHadoop { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.fromCollection(Sets.newHashSet("line1", "line2", "line3")) .map(line -> line + "dd") .write(new HadoopUsageOutputFormat(), "file:///tmp/out"); env.execute(JobHadoop.class.getName()); } } public class HadoopUsageOutputFormat extends FileOutputFormat<String> implements OutputFormat<String> { private static final Configuration DEFAULT_HADOOP_CONFIGURATION = new Configuration(); public static final String DEFAULT_LINE_DELIMITER = "\n"; private Writer writer; static { DEFAULT_HADOOP_CONFIGURATION.set("just.a.prop", "/var/test1"); } @Override public void open(int taskNumber, int numTasks) throws IOException { super.open(taskNumber, numTasks); writer = new OutputStreamWriter(new BufferedOutputStream(stream)); } @Override public void writeRecord(String record) throws IOException { writer.write(record); writer.write(DEFAULT_LINE_DELIMITER); } @Override public void close() throws IOException { if (writer != null) { this.writer.flush(); this.writer.close(); } super.close(); } } The problem is that after the job is finished, there is somewhere a memory leak that does not permit the blobStore of the job to be deleted. The number of such "deleted" files increases after each job run. Even if they are marked as deleted, there is somewhere a reference to the file in the JobManager process that keeps it from actual deletion. [cid:55491778-9e15-4f39-bb1a-637d855e68fb] Also, the problem reproduces only if I actually invoke the set method of Configuration: static { DEFAULT_HADOOP_CONFIGURATION.set("just.a.prop", "/var/test1"); } >From my observations, if I change the private static final Configuration DEFAULT_HADOOP_CONFIGURATION = new Configuration(); to a non-static field, then the problem does no reproduce any more. However, I'm interested if that's a normal behaviour or a bug/leak somewhere in Flink itself. Thanks, Vadim.