Hi Shammon,

The Flink job no longer exists after I close the execution environment, right? Could you please try the attached code and confirm that I am not sharing the file with any other job? Until I close the running Java application, the file still has an open reference from the code mentioned.
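My suspicion (an assumption on my side, not something I have verified in Flink's source) is that this is the usual URLClassLoader behavior: a jar stays open until the classloader that loaded it is closed, so deleting the file only unlinks the path while the disk space stays in use. A small standalone demo of that behavior, independent of Flink:

    import java.io.InputStream;
    import java.net.URL;
    import java.net.URLClassLoader;

    public class JarHandleDemo {
        public static void main(String[] args) throws Exception {
            URL jarUrl = new URL("file:///tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar");
            URLClassLoader loader = new URLClassLoader(new URL[]{jarUrl});
            // Reading anything through the loader caches an open handle on the jar.
            try (InputStream in = loader.getResourceAsStream("META-INF/MANIFEST.MF")) {
                System.out.println("manifest found: " + (in != null));
            }
            // Deleting the jar at this point would unlink the path, but the disk
            // space is not reclaimed until the cached handle is released.
            loader.close(); // releases the handle; only now can the space be freed
        }
    }

This is what we see with the UDF jar: the delete "succeeds", but the space is only freed when the application (and hence the classloader) goes away.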
On Thu, Apr 20, 2023 at 7:25 AM Shammon FY <zjur...@gmail.com> wrote:

> Hi neha,
>
> Flink can delete runtime data for a job when it goes to termination. But
> for external files such as the UDF jar files you mentioned, I think you
> need to manage them yourself. The files may be shared between jobs, and
> cannot be deleted when one Flink job exits.
>
> Best,
> Shammon FY
>
> On Wed, Apr 19, 2023 at 1:37 PM neha goyal <nehagoy...@gmail.com> wrote:
>
>> Adding to the above query, I have tried dropping the tables and the
>> function as well, but no luck.
>>
>> On Wed, Apr 19, 2023 at 11:01 AM neha goyal <nehagoy...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I am attaching sample code and a screenshot where Flink is holding a
>>> reference to a jar file even after I close the StreamExecutionEnvironment.
>>>
>>> Because of this, the deleted file is not cleaned up from the disk and
>>> we are getting disk space alerts. When we restart our application, these
>>> files are cleared from the disk. What is the way to gracefully shut down
>>> the Flink environment so that it releases all resource references?
>>>
>>> import java.io.File;
>>>
>>> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
>>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>>> import org.apache.flink.api.common.typeinfo.Types;
>>> import org.apache.flink.connector.kafka.source.KafkaSource;
>>> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
>>> import org.apache.flink.formats.json.JsonRowDeserializationSchema;
>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.table.api.Table;
>>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>>> import org.apache.flink.types.Row;
>>>
>>> public class TestResourceRelease {
>>>
>>>     public void check() {
>>>         StreamExecutionEnvironment execEnv =
>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>         try {
>>>             StreamTableEnvironment env = StreamTableEnvironment.create(execEnv);
>>>             // Register the UDF from a jar on the local file system.
>>>             env.executeSql("CREATE FUNCTION IF NOT EXISTS EpochToDate AS "
>>>                     + "'com.my.udf.v2.EpochToDate' USING JAR "
>>>                     + "'file:///tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar'");
>>>
>>>             TypeInformation[] typeInformationArray = getTypeInfoArray();
>>>             String[] columnName = new String[]{"x", "y"};
>>>             KafkaSource<Row> source = KafkaSource.<Row>builder()
>>>                     .setStartingOffsets(OffsetsInitializer.latest())
>>>                     .setValueOnlyDeserializer(new JsonRowDeserializationSchema(
>>>                             Types.ROW_NAMED(columnName, typeInformationArray)))
>>>                     .setProperty("bootstrap.servers", "localhost:9092")
>>>                     .setTopics("test")
>>>                     .build();
>>>
>>>             DataStream<Row> stream = execEnv.fromSource(source,
>>>                     WatermarkStrategy.noWatermarks(), "Kafka Source");
>>>             env.registerDataStream("test", stream);
>>>
>>>             Table table = env.fromDataStream(stream);
>>>             env.registerTable("my_test", table);
>>>             // This query references the UDF loaded from the jar.
>>>             Table table1 = env.sqlQuery(
>>>                     "SELECT `EpochToDate`(`x`) AS `order_time`, `y` FROM `my_test`");
>>>             System.out.println("created the table");
>>>         } catch (Exception e) {
>>>             System.out.println(e);
>>>         } finally {
>>>             try {
>>>                 // Closing the environment should release the job's resources.
>>>                 execEnv.close();
>>>             } catch (Exception e) {
>>>                 e.printStackTrace();
>>>             }
>>>             // The jar is still held open at this point, so deleting it does
>>>             // not free the disk space until the JVM exits.
>>>             File file = new File("/tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar");
>>>             file.delete();
>>>         }
>>>     }
>>>
>>>     public static TypeInformation[] getTypeInfoArray() {
>>>         TypeInformation[] typeInformations = new TypeInformation[2];
>>>         typeInformations[0] = org.apache.flink.table.api.Types.LONG();
>>>         typeInformations[1] = org.apache.flink.table.api.Types.LONG();
>>>         return typeInformations;
>>>     }
>>> }
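P.S. If managing the jar ourselves is the expected approach, would something like the sketch below be the recommended pattern? This is only my assumption (the helper name and the use of deleteOnExit() are mine, not from Flink): give each job a private copy of the jar so no other job can hold it open, and let the JVM remove the copy at exit, which is when the open reference we observe is released anyway.

    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;

    public class UdfJarManager {
        // Hypothetical helper: copy the shared UDF jar to a job-private path.
        public static Path prepareJobLocalJar(Path sharedJar) throws Exception {
            Path jobCopy = Files.createTempFile("udf-job-", ".jar");
            Files.copy(sharedJar, jobCopy, StandardCopyOption.REPLACE_EXISTING);
            // Remove the copy when the JVM terminates, i.e. when the classloader
            // reference is finally released and the space can actually be freed.
            jobCopy.toFile().deleteOnExit();
            return jobCopy;
        }
    }

The CREATE FUNCTION statement would then point at jobCopy.toUri() instead of the shared /tmp path.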