rmetzger commented on a change in pull request #14678: URL: https://github.com/apache/flink/pull/14678#discussion_r568567754
########## File path: docs/deployment/advanced/platform.md ########## @@ -0,0 +1,50 @@ +--- +title: "Customizable Features for Platform Users" +nav-title: platform +nav-parent_id: advanced +nav-pos: 3 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +Flink provides a set of customizable features for users to extend from the default behavior through the plugin framework. + +## Customize Failure Listener +Flink provides the pluggable failure listener interface for users to register multiple instances, which are called each Review comment: ```suggestion Flink provides a pluggable failure listener interface for users to register multiple instances, which are called each ``` ########## File path: docs/deployment/advanced/platform.md ########## @@ -0,0 +1,50 @@ +--- +title: "Customizable Features for Platform Users" +nav-title: platform +nav-parent_id: advanced +nav-pos: 3 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +Flink provides a set of customizable features for users to extend from the default behavior through the plugin framework. + +## Customize Failure Listener +Flink provides the pluggable failure listener interface for users to register multiple instances, which are called each +time an exception reported at runtime. The default failure listener is only to record the failure count and emit the metric Review comment: ```suggestion time an exception is reported at runtime. The default failure listener only records the failure count and emits the metric ``` ########## File path: docs/deployment/advanced/platform.md ########## @@ -0,0 +1,50 @@ +--- +title: "Customizable Features for Platform Users" +nav-title: platform +nav-parent_id: advanced +nav-pos: 3 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +Flink provides a set of customizable features for users to extend from the default behavior through the plugin framework. + +## Customize Failure Listener +Flink provides the pluggable failure listener interface for users to register multiple instances, which are called each +time an exception reported at runtime. The default failure listener is only to record the failure count and emit the metric +"numJobFailure" for the job. The purpose of these listeners is to build metrics based on the exceptions, make call to external Review comment: ```suggestion "numJobFailure" for the job. The purpose of these listeners is to build metrics based on the exceptions, make calls to external ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java ########## @@ -103,7 +105,8 @@ public SchedulerNG createInstance( ExecutionDeploymentTracker executionDeploymentTracker, long initializationTimestamp, ComponentMainThreadExecutor mainThreadExecutor, - JobStatusListener jobStatusListener) { + JobStatusListener jobStatusListener, + List<FailureListener> failureListenerFactory) { Review comment: list != factory ########## File path: flink-tests/src/test/java/org/apache/flink/test/plugin/jar/failurelistener/TestFailureListener.java ########## @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.plugin.jar.failurelistener; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.failurelistener.FailureListener; +import org.apache.flink.metrics.MetricGroup; + +/** Implementation of {@link FailureListener} for plugin loading test. */ +public class TestFailureListener implements FailureListener { + + @Override + public void init(JobID jobID, String jobName, MetricGroup metricGroup) {} Review comment: I don't think the init() method is really needed. All this information can be passed into the factory. ########## File path: docs/deployment/advanced/platform.md ########## @@ -0,0 +1,50 @@ +--- +title: "Customizable Features for Platform Users" +nav-title: platform +nav-parent_id: advanced +nav-pos: 3 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +Flink provides a set of customizable features for users to extend from the default behavior through the plugin framework. + +## Customize Failure Listener +Flink provides the pluggable failure listener interface for users to register multiple instances, which are called each +time an exception reported at runtime. The default failure listener is only to record the failure count and emit the metric +"numJobFailure" for the job. The purpose of these listeners is to build metrics based on the exceptions, make call to external +systems or classify the exceptions otherwise. For example, it can distinguish whether it is a flink runtime error or an Review comment: ```suggestion systems or classify the exceptions otherwise. For example, it can be used to distinguish whether it is a Flink runtime error or an ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/failurelistener/FailureListenerUtils.java ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.failurelistener; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.failurelistener.FailureListener; +import org.apache.flink.core.failurelistener.FailureListenerFactory; +import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.core.plugin.PluginUtils; +import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** Utils for creating failure listener. */ +public class FailureListenerUtils { + + public static List<FailureListener> getFailureListerners( Review comment: Doesn't it make sense to return a `Set<FailureListener>` here already? Otherwise, we might pass duplicate listeners that will be deduped later anyway. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java ########## @@ -124,7 +125,8 @@ final ExecutionDeploymentTracker executionDeploymentTracker, long initializationTimestamp, final ComponentMainThreadExecutor mainThreadExecutor, - final JobStatusListener jobStatusListener) + final JobStatusListener jobStatusListener, + final List<FailureListener> failureListeners) Review comment: ```suggestion final Set<FailureListener> failureListeners) ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/failurelistener/FailureListenerUtils.java ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.failurelistener; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.failurelistener.FailureListener; +import org.apache.flink.core.failurelistener.FailureListenerFactory; +import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.core.plugin.PluginUtils; +import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** Utils for creating failure listener. */ +public class FailureListenerUtils { + + public static List<FailureListener> getFailureListerners( + Configuration configuration, JobManagerJobMetricGroup metricGroup) { Review comment: I think it is better to pass the MetricGroup here, instead of the JobManagerJobMetricGroup. ########## File path: docs/deployment/advanced/platform.md ########## @@ -0,0 +1,50 @@ +--- +title: "Customizable Features for Platform Users" +nav-title: platform +nav-parent_id: advanced +nav-pos: 3 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +Flink provides a set of customizable features for users to extend from the default behavior through the plugin framework. + +## Customize Failure Listener +Flink provides the pluggable failure listener interface for users to register multiple instances, which are called each +time an exception reported at runtime. The default failure listener is only to record the failure count and emit the metric +"numJobFailure" for the job. The purpose of these listeners is to build metrics based on the exceptions, make call to external +systems or classify the exceptions otherwise. For example, it can distinguish whether it is a flink runtime error or an +application user logic error. With the accurate metrics, you may have better idea about the platform level metrics, Review comment: ```suggestion application user logic error. With accurate metrics, you may have a better idea about platform level metrics, ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/failurelistener/FailureListenerUtils.java ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.failurelistener; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.failurelistener.FailureListener; +import org.apache.flink.core.failurelistener.FailureListenerFactory; +import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.core.plugin.PluginUtils; +import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** Utils for creating failure listener. */ +public class FailureListenerUtils { + Review comment: ```suggestion public enum FailureListenerUtils { ; ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ########## @@ -333,10 +340,15 @@ public void onUnknownDeploymentsOf( } private SchedulerNG createScheduler( + Configuration configuration, ExecutionDeploymentTracker executionDeploymentTracker, JobManagerJobMetricGroup jobManagerJobMetricGroup, JobStatusListener jobStatusListener) throws Exception { + + List<FailureListener> failureListeners = + FailureListenerUtils.getFailureListerners(configuration, jobManagerJobMetricGroup); Review comment: I think it would be better to initialize the failure listeners in the JobMaster constructor, and pass the set into this method. In the JobMaster constructor, you have access to all the required fields (jobid, jobname, config etc.) 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ########## @@ -333,10 +340,15 @@ public void onUnknownDeploymentsOf( } private SchedulerNG createScheduler( + Configuration configuration, ExecutionDeploymentTracker executionDeploymentTracker, JobManagerJobMetricGroup jobManagerJobMetricGroup, JobStatusListener jobStatusListener) throws Exception { + + List<FailureListener> failureListeners = Review comment: ```suggestion Set<FailureListener> failureListeners = ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/failurelistener/FailureListenerUtils.java ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.failurelistener; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.failurelistener.FailureListener; +import org.apache.flink.core.failurelistener.FailureListenerFactory; +import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.core.plugin.PluginUtils; +import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** Utils for creating failure listener. */ +public class FailureListenerUtils { + + public static List<FailureListener> getFailureListerners( Review comment: ```suggestion public static List<FailureListener> getFailureListeners( ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org