gongyining created ZEPPELIN-5510: ------------------------------------ Summary: There are some logic problems, which may lead to thread leaks Key: ZEPPELIN-5510 URL: https://issues.apache.org/jira/browse/ZEPPELIN-5510 Project: Zeppelin Issue Type: Improvement Components: Interpreters Affects Versions: 0.10.0 Reporter: gongyining
{code:java} public void addJob(InterpreterContext context, JobClient jobClient) { String paragraphId = context.getParagraphId(); JobClient previousJobClient = this.jobs.put(paragraphId, jobClient); long checkInterval = Long.parseLong(properties.getProperty("zeppelin.flink.job.check_interval", "1000")); FlinkJobProgressPoller thread = new FlinkJobProgressPoller(flinkWebUrl, jobClient.getJobID(), context, checkInterval); thread.setName("JobProgressPoller-Thread-" + paragraphId); thread.start(); this.jobProgressPollerMap.put(jobClient.getJobID(), thread); if (previousJobClient != null) { LOGGER.warn("There's another Job {} that is associated with paragraph {}", jobClient.getJobID(), paragraphId); } } {code} There are some problems with this code.It may cause thread leak.I think it shoud be changed to this {code:java} public void addJob(InterpreterContext context, JobClient jobClient) { String paragraphId = context.getParagraphId(); JobClient previousJobClient = this.jobs.put(paragraphId, jobClient); if (previousJobClient != null) { LOGGER.warn("There's another Job {} that is associated with paragraph {}", jobClient.getJobID(), paragraphId); return; } long checkInterval = Long.parseLong(properties.getProperty("zeppelin.flink.job.check_interval", "1000")); FlinkJobProgressPoller thread = new FlinkJobProgressPoller(flinkWebUrl, jobClient.getJobID(), context, checkInterval); thread.setName("JobProgressPoller-Thread-" + paragraphId); thread.start(); this.jobProgressPollerMap.put(jobClient.getJobID(), thread); } {code} If previousJobClient is not null.We shouldn't start threading again. -- This message was sent by Atlassian Jira (v8.3.4#803005)