Adding to the above query, I have tried dropping the tables and the function as well but no luck.
On Wed, Apr 19, 2023 at 11:01 AM neha goyal <nehagoy...@gmail.com> wrote: > Hello, > > I am attaching a sample code and screenshot where Flink is holding the > reference to a jar file even after I close the streamExecutionEnvironment. > > Due to this, the deleted file is not getting cleaned up from the disk and > we are getting disc space alerts. When we restart our application then > these files get cleared from the disk. > What is the way to gracefully shut down the Flink environment so that it > releases all the resources' references? > > public class TestResourceRelease { > > public void check(){ > StreamExecutionEnvironment execEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > try{ > StreamTableEnvironment env = > StreamTableEnvironment.create(execEnv); > env.executeSql("CREATE FUNCTION IF NOT EXISTS EpochToDate AS > 'com.my.udf.v2.EpochToDate' USING JAR > 'file:///tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar'"); > TypeInformation[] typeInformationArray = getTypeInfoArray(); > String[] columnName = new String[]{"x", "y"}; > KafkaSource<Row> source = > KafkaSource.<Row>builder().setStartingOffsets(OffsetsInitializer.latest()) > .setValueOnlyDeserializer(new > JsonRowDeserializationSchema(Types.ROW_NAMED(columnName, > typeInformationArray))) > .setProperty("bootstrap.servers", "localhost:9092") > .setTopics("test").build(); > > DataStream<Row> stream = execEnv.fromSource(source, > WatermarkStrategy.noWatermarks(), "Kafka Source"); > env.registerDataStream("test", stream); > > Table table = env.fromDataStream(stream); > env.registerTable("my_test", table); > Table table1 = env.sqlQuery("SELECT `EpochToDate`(`x`) AS > `order_time`, `y` FROM `my_test`"); > System.out.println("created the table"); > } > catch (Exception e){ > System.out.println(e); > > } > finally { > try { > execEnv.close(); > } catch (Exception e) { > e.printStackTrace(); > } > File file = new > File("/tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar"); > file.delete(); > } > > } > > public static TypeInformation[] getTypeInfoArray(){ > TypeInformation[] typeInformations = new TypeInformation[2]; > typeInformations[0] = org.apache.flink.table.api.Types.LONG(); > typeInformations[1] = org.apache.flink.table.api.Types.LONG(); > return typeInformations; > } > > } > >