Hello,

I am attaching sample code and a screenshot showing that Flink keeps holding a
reference to a jar file even after I close the StreamExecutionEnvironment.

Because of this, the deleted file is not cleaned up from the disk and
we are getting disk space alerts. These files are only cleared from the
disk when we restart our application.
What is the proper way to gracefully shut down the Flink environment so that
it releases all references to the resources it holds?

public class TestResourceRelease {

    public void check(){
        StreamExecutionEnvironment execEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
        try{
            StreamTableEnvironment env = StreamTableEnvironment.create(execEnv);
            env.executeSql("CREATE FUNCTION IF NOT EXISTS EpochToDate
AS 'com.my.udf.v2.EpochToDate' USING JAR
'file:///tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar'");
            TypeInformation[] typeInformationArray = getTypeInfoArray();
            String[] columnName = new String[]{"x", "y"};
            KafkaSource<Row> source =
KafkaSource.<Row>builder().setStartingOffsets(OffsetsInitializer.latest())
                    .setValueOnlyDeserializer(new
JsonRowDeserializationSchema(Types.ROW_NAMED(columnName,
typeInformationArray)))
                    .setProperty("bootstrap.servers", "localhost:9092")
                    .setTopics("test").build();

            DataStream<Row> stream = execEnv.fromSource(source,
WatermarkStrategy.noWatermarks(), "Kafka Source");
            env.registerDataStream("test", stream);

            Table table = env.fromDataStream(stream);
            env.registerTable("my_test", table);
            Table table1 = env.sqlQuery("SELECT `EpochToDate`(`x`) AS
`order_time`, `y` FROM `my_test`");
            System.out.println("created the table");
        }
        catch (Exception e){
            System.out.println(e);

        }
        finally {
            try {
                execEnv.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
            File file = new
File("/tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar");
            file.delete();
        }

    }

    public static TypeInformation[] getTypeInfoArray(){
        TypeInformation[] typeInformations = new TypeInformation[2];
        typeInformations[0] = org.apache.flink.table.api.Types.LONG();
        typeInformations[1] = org.apache.flink.table.api.Types.LONG();
        return typeInformations;
    }

}

Reply via email to