Hi Shammon,

The Flink job no longer exists once I close the execution environment, right?
Could you please try the attached code and confirm that the file is not
shared with any other job? In that code, the open reference to the file
remains until I shut down the running Java application.
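
For illustration, here is a minimal JDK-only sketch of how a class loader can
keep a jar file open until the loader itself is closed. It is not Flink code,
and it is only an assumption that something similar happens inside Flink for
jars registered with USING JAR. The class name JarHandleSketch, its methods,
and the hello.txt entry are all hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;

public class JarHandleSketch {

    /** Writes a tiny jar with one text entry so no real UDF jar is needed. */
    static Path createSampleJar() throws IOException {
        Path jar = Files.createTempFile("udf-sketch", ".jar");
        try (OutputStream out = Files.newOutputStream(jar);
             JarOutputStream jos = new JarOutputStream(out)) {
            jos.putNextEntry(new JarEntry("hello.txt"));
            jos.write("hello".getBytes(StandardCharsets.UTF_8));
            jos.closeEntry();
        }
        return jar;
    }

    /**
     * Opens the jar through a URLClassLoader, reads one entry, closes the
     * loader, and only then deletes the file. Returns true if it was deleted.
     */
    static boolean loadCloseAndDelete(Path jar) throws IOException {
        URLClassLoader loader = new URLClassLoader(new URL[] {jar.toUri().toURL()});
        try {
            // Reading a resource makes the loader open the underlying jar file.
            try (InputStream in = loader.getResourceAsStream("hello.txt")) {
                if (in == null) {
                    throw new IOException("hello.txt not found in " + jar);
                }
                in.read();
            }
        } finally {
            // URLClassLoader.close() releases the jar files the loader opened.
            loader.close();
        }
        return Files.deleteIfExists(jar);
    }

    public static void main(String[] args) throws IOException {
        Path jar = createSampleJar();
        boolean deleted = loadCloseAndDelete(jar);
        System.out.println("deleted=" + deleted + ", stillExists=" + Files.exists(jar));
    }
}
```

If the loader were left open instead, a Linux system would still let the file
be unlinked, but the disk space would typically stay allocated until the
process released the handle, which matches the symptom described in this
thread.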

On Thu, Apr 20, 2023 at 7:25 AM Shammon FY <zjur...@gmail.com> wrote:

> Hi Neha,
>
> Flink can delete a job's runtime data when the job terminates. But for
> external files such as the UDF jar files you mentioned, I think you need
> to manage them yourself. The files may be shared between jobs, so they
> cannot be deleted when a single Flink job exits.
>
> Best,
> Shammon FY
>
>
> On Wed, Apr 19, 2023 at 1:37 PM neha goyal <nehagoy...@gmail.com> wrote:
>
>> Adding to the above query, I have also tried dropping the tables and the
>> function, but with no luck.
>>
>> On Wed, Apr 19, 2023 at 11:01 AM neha goyal <nehagoy...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I am attaching sample code and a screenshot showing that Flink holds a
>>> reference to a jar file even after I close the StreamExecutionEnvironment.
>>>
>>> Because of this, the deleted file is not cleaned up from the disk and we
>>> are getting disk space alerts. The files are cleared from the disk when
>>> we restart our application.
>>> What is the way to gracefully shut down the Flink environment so that it
>>> releases all of its references to these resources?
>>>
>>> public class TestResourceRelease {
>>>
>>>     public void check(){
>>>         StreamExecutionEnvironment execEnv =
>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>         try{
>>>             StreamTableEnvironment env =
>>>                     StreamTableEnvironment.create(execEnv);
>>>             env.executeSql("CREATE FUNCTION IF NOT EXISTS EpochToDate AS "
>>>                     + "'com.my.udf.v2.EpochToDate' USING JAR "
>>>                     + "'file:///tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar'");
>>>             TypeInformation[] typeInformationArray = getTypeInfoArray();
>>>             String[] columnName = new String[]{"x", "y"};
>>>             KafkaSource<Row> source = KafkaSource.<Row>builder()
>>>                     .setStartingOffsets(OffsetsInitializer.latest())
>>>                     .setValueOnlyDeserializer(new JsonRowDeserializationSchema(
>>>                             Types.ROW_NAMED(columnName, typeInformationArray)))
>>>                     .setProperty("bootstrap.servers", "localhost:9092")
>>>                     .setTopics("test").build();
>>>
>>>             DataStream<Row> stream = execEnv.fromSource(source,
>>>                     WatermarkStrategy.noWatermarks(), "Kafka Source");
>>>             env.registerDataStream("test", stream);
>>>
>>>             Table table = env.fromDataStream(stream);
>>>             env.registerTable("my_test", table);
>>>             Table table1 = env.sqlQuery("SELECT `EpochToDate`(`x`) AS "
>>>                     + "`order_time`, `y` FROM `my_test`");
>>>             System.out.println("created the table");
>>>         }
>>>         catch (Exception e){
>>>             System.out.println(e);
>>>
>>>         }
>>>         finally {
>>>             try {
>>>                 execEnv.close();
>>>             } catch (Exception e) {
>>>                 e.printStackTrace();
>>>             }
>>>             File file = new File("/tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar");
>>>             file.delete();
>>>         }
>>>
>>>     }
>>>
>>>     public static TypeInformation[] getTypeInfoArray(){
>>>         TypeInformation[] typeInformations = new TypeInformation[2];
>>>         typeInformations[0] = org.apache.flink.table.api.Types.LONG();
>>>         typeInformations[1] = org.apache.flink.table.api.Types.LONG();
>>>         return typeInformations;
>>>     }
>>>
>>> }
>>>
>>>
