felixcheung commented on a change in pull request #3346: [ZEPPELIN-4066]. Introduce ProcessLauncher to encapsulate process launch URL: https://github.com/apache/zeppelin/pull/3346#discussion_r273780922
########## File path: zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java ########## @@ -235,52 +186,64 @@ public boolean isUserImpersonated() { } public boolean isRunning() { - return running.get(); + return interpreterProcessLauncher != null && interpreterProcessLauncher.isRunning(); } - private static class ProcessLogOutputStream extends LogOutputStream { - - private Logger logger; - OutputStream out; + @Override + public String getErrorMessage() { + return this.interpreterProcessLauncher != null ? this.interpreterProcessLauncher.getErrorMessage() : ""; + } - public ProcessLogOutputStream(Logger logger) { - this.logger = logger; - } + private class InterpreterProcessLauncher extends ProcessLauncher { - @Override - protected void processLine(String s, int i) { - this.logger.debug(s); + public InterpreterProcessLauncher(CommandLine commandLine, + Map<String, String> envs) { + super(commandLine, envs); } @Override - public void write(byte [] b) throws IOException { - super.write(b); - - if (out != null) { - synchronized (this) { - if (out != null) { - out.write(b); + public void waitForReady(int timeout) { + synchronized (this) { + if (state != State.RUNNING) { + try { + wait(timeout); + } catch (InterruptedException e) { + LOGGER.error("Remote interpreter is not accessible"); } } } + this.stopCatchLaunchOutput(); + if (state == State.LAUNCHED) { + onTimeout(); + } } @Override - public void write(byte [] b, int offset, int len) throws IOException { - super.write(b, offset, len); + public void onProcessRunning() { + super.onProcessRunning(); + synchronized(this) { + notify(); + } + } - if (out != null) { + @Override + public void onProcessComplete(int exitValue) { + LOGGER.warn("Process is exited with exit value " + exitValue); + // For yarn-cluster mode, client process will exit with exit value 0 + // after submitting spark app. So don't move to EXITED state when exitValue is 0. + if (exitValue != 0) { + state = State.EXITED; Review comment: so EXITED is unexpected termination? perhaps this should be TERMINATED? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services