Hello, I am attaching a sample code and screenshot where Flink is holding the reference to a jar file even after I close the streamExecutionEnvironment.
Due to this, the deleted file is not getting cleaned up from the disk and we are getting disc space alerts. When we restart our application then these files get cleared from the disk. What is the way to gracefully shut down the Flink environment so that it releases all the resources' references? public class TestResourceRelease { public void check(){ StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); try{ StreamTableEnvironment env = StreamTableEnvironment.create(execEnv); env.executeSql("CREATE FUNCTION IF NOT EXISTS EpochToDate AS 'com.my.udf.v2.EpochToDate' USING JAR 'file:///tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar'"); TypeInformation[] typeInformationArray = getTypeInfoArray(); String[] columnName = new String[]{"x", "y"}; KafkaSource<Row> source = KafkaSource.<Row>builder().setStartingOffsets(OffsetsInitializer.latest()) .setValueOnlyDeserializer(new JsonRowDeserializationSchema(Types.ROW_NAMED(columnName, typeInformationArray))) .setProperty("bootstrap.servers", "localhost:9092") .setTopics("test").build(); DataStream<Row> stream = execEnv.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source"); env.registerDataStream("test", stream); Table table = env.fromDataStream(stream); env.registerTable("my_test", table); Table table1 = env.sqlQuery("SELECT `EpochToDate`(`x`) AS `order_time`, `y` FROM `my_test`"); System.out.println("created the table"); } catch (Exception e){ System.out.println(e); } finally { try { execEnv.close(); } catch (Exception e) { e.printStackTrace(); } File file = new File("/tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar"); file.delete(); } } public static TypeInformation[] getTypeInfoArray(){ TypeInformation[] typeInformations = new TypeInformation[2]; typeInformations[0] = org.apache.flink.table.api.Types.LONG(); typeInformations[1] = org.apache.flink.table.api.Types.LONG(); return typeInformations; } }