Hi Flink team, I have a general question about how to release the resources held by an object in a Flink job.
Thank you in advance! I am new to Flink. I searched a lot but did not find any related documentation, although I am sure there is a standard way to handle this.

In our case, we need to access an Elasticsearch server. We use a class `EsClient`, which extends `org.elasticsearch.client.RestHighLevelClient`, to run queries. Its `close` method must be called once the client is no longer needed, in order to release resources. So I need to find the right place to do that, so that the resource is always released, even if an exception is thrown somewhere in a method invoked from the main class or any other class.

What I have come up with so far is to keep the `EsClient` object in a `ThreadLocal` that is set at the start of the main class's `main` method, and to release the resource at the end of `main`. The pseudocode (Java) looks like this:

```java
import java.io.IOException;
import java.util.Properties;

public class EsClientHolder {
    // InheritableThreadLocal: threads started by the owning thread inherit the value.
    private static final ThreadLocal<EsClient> local = new InheritableThreadLocal<>();

    public static void createAndSetEsClient(EsClient esClient) {
        local.set(esClient);
    }

    public static void createAndSetEsClientBy(EsClientConfig esClientConfig) {
        // Pseudocode: building the client from esClientConfig is omitted here.
        EsClient instance = new EsClient();
        local.set(instance);
    }

    public static EsClient get() {
        EsClient c = local.get();
        if (c == null) {
            throw new RuntimeException(
                "Make sure to create and set the EsClient instance before using it");
        }
        return c;
    }

    public static void close() throws IOException {
        EsClient o = local.get();
        if (o != null) {
            o.close();
            local.remove(); // do not leave a closed client behind in this thread
        }
    }
}

// Usage in Flink application code
public class MainClass {
    public static void main(String[] args) throws IOException {
        try {
            Properties prop = null; // placeholder
            EsClientConfig config = getEsClientConfig(prop);
            EsClientHolder.createAndSetEsClientBy(config);
            // …
            SomeClass.method1();
            OtherClass.method2();
            // ...
        } finally {
            EsClientHolder.close();
        }
    }
}

class SomeClass {
    public static void method1() {
        // 1. use the EsClient in any invoked method of any other class:
        EsClient esClient = EsClientHolder.get();
        // …
    }
}

class OtherClass {
    public static void method2() {
        // 2. use the EsClient in any invoked method of any forked child thread
        new Thread(() -> {
            EsClient client = EsClientHolder.get();
            // …
        }).start();
        // …
    }
}
```

I understand that a TaskManager is a JVM process and that a task is executed by a Java thread. But I do not know how Flink creates the job graph, how it eventually assigns tasks to threads, or how those threads relate to each other.

For example, if Flink moves `method1` of `SomeClass` and `method2` of `OtherClass` to a thread other than the one running `MainClass`, then the thread running `method1` and `method2` has no way to get the `EsClient`. Here I am assuming that the `main` method of `MainClass` is executed in a single thread. If it is not, and `set()` and `close()` end up running in different threads, then there is no way to release the resource.

Thank you!
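
P.S. To make the thread concern concrete, here is a minimal plain-Java sketch (no Flink involved). It only illustrates how `InheritableThreadLocal` behaves, not how Flink actually schedules work. The class name `ThreadLocalVisibilityDemo`, the `String` that stands in for the `EsClient`, and the two single-thread executors are all hypothetical and exist only for this illustration. A thread created after `set()` by the thread that called `set()` inherits the value, while a thread that already existed before `set()` does not.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalVisibilityDemo {
    // Hypothetical stand-in for EsClientHolder: a String plays the role of the EsClient.
    private static final ThreadLocal<String> HOLDER = new InheritableThreadLocal<>();

    /**
     * Returns a two-element array:
     * [0] the value seen by a thread created before set(),
     * [1] the value seen by a thread created after set().
     */
    static String[] run() throws Exception {
        Callable<String> readHolder = HOLDER::get;

        ExecutorService early = Executors.newSingleThreadExecutor();
        ExecutorService late = Executors.newSingleThreadExecutor();
        try {
            // Run a no-op task so that the "early" worker thread is created before set().
            early.submit(() -> { }).get();

            HOLDER.set("client");

            // The "late" worker thread is created by this first submit, i.e. after set().
            String seenByLate = late.submit(readHolder).get();
            String seenByEarly = early.submit(readHolder).get();
            return new String[] {seenByEarly, seenByLate};
        } finally {
            HOLDER.remove();
            early.shutdown();
            late.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] seen = run();
        System.out.println("thread created before set(): " + seen[0]); // null
        System.out.println("thread created after set():  " + seen[1]); // client
    }
}
```

So if the code that uses the client runs on a thread that is not a descendant of the thread that called `set()`, my `EsClientHolder.get()` would throw, which is exactly the situation I am worried about.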