[ https://issues.apache.org/jira/browse/FLINK-8608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362241#comment-16362241 ]
ASF GitHub Bot commented on FLINK-8608: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5431#discussion_r167853480 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java --- @@ -262,32 +430,151 @@ protected void shutDown(boolean cleanupHaData) throws FlinkException { exception = ExceptionUtils.firstOrSuppressed(t, exception); } } - - terminationFuture.complete(true); } if (exception != null) { throw new FlinkException("Could not properly shut down the cluster services.", exception); } } + protected void stopClusterComponents() throws Exception { + synchronized (lock) { + Throwable exception = null; + + if (webMonitorEndpoint != null) { + webMonitorEndpoint.shutdown(Time.seconds(10L)); + } + + if (dispatcherLeaderRetrievalService != null) { + try { + dispatcherLeaderRetrievalService.stop(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + } + + if (dispatcher != null) { + try { + dispatcher.shutDown(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + } + + if (resourceManagerRetrievalService != null) { + try { + resourceManagerRetrievalService.stop(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + } + + if (resourceManager != null) { + try { + resourceManager.shutDown(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + } + + if (archivedExecutionGraphStore != null) { + try { + archivedExecutionGraphStore.close(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + } + + if (transientBlobCache != null) { + try { + transientBlobCache.close(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + } + + if (exception != null) { + throw new FlinkException("Could not properly 
shut down the session cluster entry point.", exception); + } + } + } + @Override public void onFatalError(Throwable exception) { LOG.error("Fatal error occurred in the cluster entrypoint.", exception); System.exit(RUNTIME_FAILURE_RETURN_CODE); } - protected abstract void startClusterComponents( + // -------------------------------------------------- + // Internal methods + // -------------------------------------------------- + + private void shutDownAndTerminate( + int returnCode, + ApplicationStatus applicationStatus, + @Nullable String diagnostics, --- End diff -- True, at the moment it won't be used. Initially I left it in because of the YARN-based cluster entrypoints. The old implementation actively deregisters from YARN, passing this as additional information. For the moment, I'll remove it. > Add MiniDispatcher for job mode > ------------------------------- > > Key: FLINK-8608 > URL: https://issues.apache.org/jira/browse/FLINK-8608 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination > Affects Versions: 1.5.0 > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > In order to properly support the job mode, we need a {{MiniDispatcher}} which > is started with a pre-initialized {{JobGraph}} and launches a single > {{JobManagerRunner}} with this job. Once the job is completed and if the > {{MiniDispatcher}} is running in detached mode, the {{MiniDispatcher}} should > terminate. -- This message was sent by Atlassian JIRA (v7.6.3#76005)