rmetzger commented on a change in pull request #14678:
URL: https://github.com/apache/flink/pull/14678#discussion_r568567754



##########
File path: docs/deployment/advanced/platform.md
##########
@@ -0,0 +1,50 @@
+---
+title: "Customizable Features for Platform Users"
+nav-title: platform
+nav-parent_id: advanced
+nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+Flink provides a set of customizable features for users to extend from the 
default behavior through the plugin framework.
+
+## Customize Failure Listener
+Flink provides the pluggable failure listener interface for users to register 
multiple instances, which are called each 

Review comment:
       ```suggestion
   Flink provides a pluggable failure listener interface for users to register 
multiple instances, which are called each 
   ```

##########
File path: docs/deployment/advanced/platform.md
##########
@@ -0,0 +1,50 @@
+---
+title: "Customizable Features for Platform Users"
+nav-title: platform
+nav-parent_id: advanced
+nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+Flink provides a set of customizable features for users to extend from the 
default behavior through the plugin framework.
+
+## Customize Failure Listener
+Flink provides the pluggable failure listener interface for users to register 
multiple instances, which are called each 
+time an exception reported at runtime. The default failure listener is only to 
record the failure count and emit the metric

Review comment:
       ```suggestion
   time an exception is reported at runtime. The default failure listener only 
records the failure count and emits the metric
   ```

##########
File path: docs/deployment/advanced/platform.md
##########
@@ -0,0 +1,50 @@
+---
+title: "Customizable Features for Platform Users"
+nav-title: platform
+nav-parent_id: advanced
+nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+Flink provides a set of customizable features for users to extend from the 
default behavior through the plugin framework.
+
+## Customize Failure Listener
+Flink provides the pluggable failure listener interface for users to register 
multiple instances, which are called each 
+time an exception reported at runtime. The default failure listener is only to 
record the failure count and emit the metric
+"numJobFailure" for the job. The purpose of these listeners is to build 
metrics based on the exceptions, make call to external

Review comment:
       ```suggestion
   "numJobFailure" for the job. The purpose of these listeners is to build 
metrics based on the exceptions, make calls to external
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
##########
@@ -103,7 +105,8 @@ public SchedulerNG createInstance(
                 ExecutionDeploymentTracker executionDeploymentTracker,
                 long initializationTimestamp,
                 ComponentMainThreadExecutor mainThreadExecutor,
-                JobStatusListener jobStatusListener) {
+                JobStatusListener jobStatusListener,
+                List<FailureListener> failureListenerFactory) {

Review comment:
       list != factory

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/plugin/jar/failurelistener/TestFailureListener.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.plugin.jar.failurelistener;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.failurelistener.FailureListener;
+import org.apache.flink.metrics.MetricGroup;
+
+/** Implementation of {@link FailureListener} for plugin loading test. */
+public class TestFailureListener implements FailureListener {
+
+    @Override
+    public void init(JobID jobID, String jobName, MetricGroup metricGroup) {}

Review comment:
       I don't think the init() method is really needed. All this information 
can be passed into the factory.

##########
File path: docs/deployment/advanced/platform.md
##########
@@ -0,0 +1,50 @@
+---
+title: "Customizable Features for Platform Users"
+nav-title: platform
+nav-parent_id: advanced
+nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+Flink provides a set of customizable features for users to extend from the 
default behavior through the plugin framework.
+
+## Customize Failure Listener
+Flink provides the pluggable failure listener interface for users to register 
multiple instances, which are called each 
+time an exception reported at runtime. The default failure listener is only to 
record the failure count and emit the metric
+"numJobFailure" for the job. The purpose of these listeners is to build 
metrics based on the exceptions, make call to external
+systems or classify the exceptions otherwise. For example, it can distinguish 
whether it is a flink runtime error or an 

Review comment:
       ```suggestion
   systems or classify the exceptions otherwise. For example, it can be used to 
distinguish whether it is a Flink runtime error or an 
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/failurelistener/FailureListenerUtils.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.failurelistener;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.failurelistener.FailureListener;
+import org.apache.flink.core.failurelistener.FailureListenerFactory;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/** Utils for creating failure listener. */
+public class FailureListenerUtils {
+
+    public static List<FailureListener> getFailureListerners(

Review comment:
       Doesn't it make sense to return a `Set<FailureListener>` here already? 
otherwise, we might pass duplicate listeners  that will be deduped later 
anyways.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
##########
@@ -124,7 +125,8 @@
             final ExecutionDeploymentTracker executionDeploymentTracker,
             long initializationTimestamp,
             final ComponentMainThreadExecutor mainThreadExecutor,
-            final JobStatusListener jobStatusListener)
+            final JobStatusListener jobStatusListener,
+            final List<FailureListener> failureListeners)

Review comment:
       ```suggestion
               final Set<FailureListener> failureListeners)
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/failurelistener/FailureListenerUtils.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.failurelistener;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.failurelistener.FailureListener;
+import org.apache.flink.core.failurelistener.FailureListenerFactory;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/** Utils for creating failure listener. */
+public class FailureListenerUtils {
+
+    public static List<FailureListener> getFailureListerners(
+            Configuration configuration, JobManagerJobMetricGroup metricGroup) 
{

Review comment:
       I think it is better to pass the MetricGroup here, instead of the 
JobManagerJobMetricGroup.

##########
File path: docs/deployment/advanced/platform.md
##########
@@ -0,0 +1,50 @@
+---
+title: "Customizable Features for Platform Users"
+nav-title: platform
+nav-parent_id: advanced
+nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+Flink provides a set of customizable features for users to extend from the 
default behavior through the plugin framework.
+
+## Customize Failure Listener
+Flink provides the pluggable failure listener interface for users to register 
multiple instances, which are called each 
+time an exception reported at runtime. The default failure listener is only to 
record the failure count and emit the metric
+"numJobFailure" for the job. The purpose of these listeners is to build 
metrics based on the exceptions, make call to external
+systems or classify the exceptions otherwise. For example, it can distinguish 
whether it is a flink runtime error or an 
+application user logic error. With the accurate metrics, you may have better 
idea about the platform level metrics, 

Review comment:
       ```suggestion
   application user logic error. With accurate metrics, you may have a better 
idea about platform level metrics, 
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/failurelistener/FailureListenerUtils.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.failurelistener;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.failurelistener.FailureListener;
+import org.apache.flink.core.failurelistener.FailureListenerFactory;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/** Utils for creating failure listener. */
+public class FailureListenerUtils {
+

Review comment:
       ```suggestion
   public enum FailureListenerUtils {
       ;
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -333,10 +340,15 @@ public void onUnknownDeploymentsOf(
     }
 
     private SchedulerNG createScheduler(
+            Configuration configuration,
             ExecutionDeploymentTracker executionDeploymentTracker,
             JobManagerJobMetricGroup jobManagerJobMetricGroup,
             JobStatusListener jobStatusListener)
             throws Exception {
+
+        List<FailureListener> failureListeners =
+                FailureListenerUtils.getFailureListerners(configuration, 
jobManagerJobMetricGroup);

Review comment:
       I think it would be better to initialize the failure listeners in the 
JobMaster constructor, and pass the set into this method. In the JobMaster 
constructor, you have access to all the required fields (jobid, jobname, config 
etc.)

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -333,10 +340,15 @@ public void onUnknownDeploymentsOf(
     }
 
     private SchedulerNG createScheduler(
+            Configuration configuration,
             ExecutionDeploymentTracker executionDeploymentTracker,
             JobManagerJobMetricGroup jobManagerJobMetricGroup,
             JobStatusListener jobStatusListener)
             throws Exception {
+
+        List<FailureListener> failureListeners =

Review comment:
       ```suggestion
           Set<FailureListener> failureListeners =
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/failurelistener/FailureListenerUtils.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.failurelistener;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.failurelistener.FailureListener;
+import org.apache.flink.core.failurelistener.FailureListenerFactory;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/** Utils for creating failure listener. */
+public class FailureListenerUtils {
+
+    public static List<FailureListener> getFailureListerners(

Review comment:
       ```suggestion
       public static List<FailureListener> getFailureListeners(
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to