Hi, 不建议在 TM 内部多个 Task 间共享变量,每个 Task 单独使用自己的资源,在 RichFunction open 时初始化资源,close 时释放资源。否则容易导致资源泄露
Best, Weihua On Tue, Jul 12, 2022 at 2:31 PM RS <tinyshr...@163.com> wrote: > Hi, > > > 如果是访问ES的话,Flink里面自带ES的connector,你可以直接使用,或者参考源码,source和sink接口都有对应的方法 > > > > 资源是否在一个线程里面,这个取决与你代码逻辑,如果在不同的线程或者进程的话,设计上,就不要用同一个EsClientHolder,各个不同阶段各自去new和close对象, > > > Thanks > > > 在 2022-07-12 12:35:31,"Bruce Zu" <zu.bruce.ch...@gmail.com> 写道: > > Flink team好, > > 我有一个很一般的问题,关于如何释放 Flink Job 中某个对象持有的资源。 > > > > 我是 Flink 的新用户。我搜索了很多,但没有找到相关文件。但我确信有一个标准的方法来解决它。 > > > >我的Flink 应用程序中需要访问 Elasticsearch 服务器。我们使用从 > >org.elasticsearch.client.RestHighLevelClient 扩展而来的类 EsClient 来完成查询工作, > >一旦不再使用它就需要调用它的`close`方法来释放资源。 > > > >所以我需要找到合适的地方来确保资源总是可以被释放,即使在调用的某个地方发生了一些异常 > > > >我现在能想到的是使用 `ThreadLocal` 将生成的 EsClient 对象保留在main class的 main 方法的开头,并且 > >在 main 方法结束时释放资源。 > > > >类似这样的伪代码: > >```java > >公共类 EsClientHolder { > > private static final ThreadLocal<EsClient> local = new > >InheritableThreadLocal<>(); > > > > public static final void createAndSetEsClient(EsClient esClient){ > > local.set(esClient); > > } > > > > private static final createAndSetEsClientBy(EsClientConfig > >esClientConfig){ > > EsClient instance = new EsClient(esClientConfig); > > createAndSetEsClient(instance) ; > > } > > > > private static final EsClient get() { > > EsClient c = local.get(); > > if(c == null){ > > throw new RuntimeException("确保在使用前创建并设置 EsClient 实例"); > > } > > return c; > > } > > > > private static final close()抛出 IOException { > > EsClient o = local.get(); > > if(o!= null){ > > o.close(); > > } > > } > > > >// 在 Fink 应用程序代码中的用法 > > public class main class { > > public static void main(String[] args) throws IOException { > > try { > > property prop = null; > > EsClientConfig configuration = getEsClientConfig(prop); > > EsClientHolder.createAndSetEsClientBy(config); > > // … > > SomeClass.method1(); > > other classes.method2(); > > // ... > > } at last { > > EsClientHolder.close(); > > } > > } > > } > > > >class SomeClass{ > > public void. method 1(){ > > // 1. Use EsClient in any calling method of any other class: > > EsClient esClient = EsClientHolder.get(); > > // … > > } > >} > >class other class { > > public void method 2() { > > // 2. Use EsClient in any calling method of any forked child thread > > new thread ( > > () -> { > > EsClient client = EsClientHolder.get(); > > // … > > }) > > . start(); > > // … > > } > >} > > > >``` > > > >我知道 TaskManager 是一个 Java JVM 进程,而 Task 是由一个 java Thread 执行的。 > > > >但我不知道 Flink 如何创建作业图,以及 Flink 最终如何将任务分配给线程以及这些线程之间的关系。 > > > >比如 Flink 把 SomeClass 的 method1 和 OtherClass 的 method2 分配到一个和运行 MainClass > >的线程不一样的线程, > >那么运行method1和mehod2的线程就没有办法拿到EsClient了。 > >这里我假设 MainClass 中的 main 方法将在一个线程中执行。如果不是,比如将 set() 和 close() > 拆分为在不同的线程中运行,则就 > >没有办法释放资源。 > > > >谢谢! >