rmetzger commented on a change in pull request #14678:
URL: https://github.com/apache/flink/pull/14678#discussion_r563667477
########## File path: docs/deployment/advanced/platform.md ##########
@@ -0,0 +1,49 @@
+---
+title: "Customizable Features for Platform Users"
+nav-title: platform
+nav-parent_id: advanced
+nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+Flink provides a set of customizable features for users to extend from the default behavior through the plugin framework.
+
+## Customize Failure Listener
+For each of execution exceptions in a flink job, it will be passed to the job master. The default failure listener is only
+to record the failure count and emit the metrics numJobFailure for the job. If you need an advanced classification on exceptions,

Review comment:
```suggestion
to record the failure count and emit the metric "numJobFailure" for the job. If you need an advanced classification on exceptions,
```

########## File path: docs/deployment/advanced/platform.md ##########
@@ -0,0 +1,49 @@
...
+## Customize Failure Listener
+For each of execution exceptions in a flink job, it will be passed to the job master. The default failure listener is only
+to record the failure count and emit the metrics numJobFailure for the job. If you need an advanced classification on exceptions,
+you can build a plugin to customize failure listener. For example, it can distinguish whether it is a flink runtime error or an

Review comment:
```suggestion
you can build a plugin to customize the failure listener. For example, it can distinguish whether it is a flink runtime error or an
```

########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/failurelistener/FailureListenerUtils.java ##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.failurelistener;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.failurelistener.FailureListener;
+import org.apache.flink.core.failurelistener.FailureListenerFactory;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/** Utils for creating failure listener. */
+public class FailureListenerUtils {
+
+    public static List<FailureListener> createFailureListener(

Review comment:
```suggestion
    public static List<FailureListener> getFailureListeners(
```

########## File path: docs/deployment/advanced/platform.md ##########
@@ -0,0 +1,49 @@
...
+## Customize Failure Listener
+For each of execution exceptions in a flink job, it will be passed to the job master. The default failure listener is only

Review comment:
```suggestion
Each execution exception in a Flink job will be passed to the JobManager. The default failure listener is only
```

########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java ##########
@@ -98,11 +103,25 @@ public FailureHandlingResult getGlobalFailureHandlingResult(final Throwable caus
                 true);
     }
+    /** @param failureListener the failure listener to be registered */
+    public void registerFailureListener(FailureListener failureListener) {
+        failureListeners.add(failureListener);
+    }
+
     private FailureHandlingResult handleFailure(
             final Throwable cause,
             final Set<ExecutionVertexID> verticesToRestart,
             final boolean globalFailure) {
+        try {
+            for (FailureListener listener : failureListeners) {
+                listener.onFailure(cause, globalFailure);
+            }
+        } catch (Throwable e) {
+            return FailureHandlingResult.unrecoverable(
+                    new JobException("Unexpected excepton in FailureListener", e), false);

Review comment:
```suggestion
                    new JobException("Unexpected exception in FailureListener", e), false);
```

########## File path: docs/deployment/advanced/platform.md ##########
@@ -0,0 +1,49 @@
...
+Licensed to the Apache Software Foundation (ASF) under one

Review comment:
This documentation page is hard to read in my opinion. It should first describe, at a high level, that a user can register multiple exception listeners, which are called each time an exception is reported at runtime. The purpose of these listeners is to build metrics based on the exceptions, make calls to external systems, or classify the exceptions in other ways. ...

########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/failurelistener/DefaultFailureListener.java ##########
@@ -0,0 +1,41 @@
...
+package org.apache.flink.runtime.failurelistener;
+
+import org.apache.flink.core.failurelistener.FailureListener;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.MetricNames;
+
+/**
+ * Default implementation {@link org.apache.flink.core.failurelistener.FailureListener} that record

Review comment:
```suggestion
 * Default implementation {@link org.apache.flink.core.failurelistener.FailureListener} that records
```

########## File path: flink-core/src/main/java/org/apache/flink/core/failurelistener/FailureListener.java ##########
@@ -0,0 +1,43 @@
...
+package org.apache.flink.core.failurelistener;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.MetricGroup;
+
+/** Failure listener to customize the behavior for each type of failures tracked in job manager. */
+@PublicEvolving
+public interface FailureListener {
+
+    /**
+     * Initialize the FailureListener with MetricGroup.
+     *
+     * @param jobName the name job whose failure will be subscribed by the listener
+     * @param metricGroup metrics group that the listener can add customized metrics definition.
+     */
+    void init(String jobName, MetricGroup metricGroup);

Review comment:
Why not also pass the JobId?

########## File path: docs/deployment/advanced/platform.md ##########
@@ -0,0 +1,49 @@
...
+## Customize Failure Listener
+For each of execution exceptions in a flink job, it will be passed to the job master. The default failure listener is only
+to record the failure count and emit the metrics numJobFailure for the job. If you need an advanced classification on exceptions,
+you can build a plugin to customize failure listener. For example, it can distinguish whether it is a flink runtime error or an
+application user logic error. With the accurate metrics, you may have better idea about the platform level metrics, for example
+failures due to network, platform reliability, etc.
+
+
+# Implement a plugin for your custom failure listener

Review comment:
Why is this heading one level higher than the one above? Shouldn't this be a "###" heading?
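The draft page says a custom listener could, for example, tell a Flink runtime error apart from an application (user logic) error. Below is a minimal sketch of such a listener under stated assumptions, not code from this PR:

- `SimpleFailureListener` is a hypothetical stand-in. It mirrors the `init(jobName, ...)` signature and the `onFailure(cause, globalFailure)` call shown in the diff.
- A plain `Map<String, Long>` of counters replaces Flink's `MetricGroup`, so the sketch runs without Flink.
- The counter names are illustrative.
- The rule that an `IOException` anywhere in the cause chain marks a platform failure is also illustrative.

```java
import java.io.IOException;
import java.util.Map;

/**
 * Hypothetical stand-in for the FailureListener interface in this PR. The PR's
 * init(...) takes an org.apache.flink.metrics.MetricGroup; a plain map of named
 * counters is used here so the sketch compiles without Flink on the classpath.
 */
interface SimpleFailureListener {
    void init(String jobName, Map<String, Long> counters);

    void onFailure(Throwable cause, boolean globalFailure);
}

/** Counts failures per category, separating platform failures from user-code failures. */
class ClassifyingFailureListener implements SimpleFailureListener {
    // Assumed bound on how deep the cause chain is walked (guards against cyclic causes).
    private static final int MAX_CAUSE_DEPTH = 32;

    // Not synchronized: assumes onFailure is never invoked concurrently.
    private Map<String, Long> counters;

    @Override
    public void init(String jobName, Map<String, Long> counters) {
        // jobName is unused in this sketch; a real listener might use it to tag metrics.
        this.counters = counters;
    }

    @Override
    public void onFailure(Throwable cause, boolean globalFailure) {
        // Increment the category counter, creating it with value 1 on first use.
        counters.merge(classify(cause), 1L, Long::sum);
        if (globalFailure) {
            counters.merge("globalFailures", 1L, Long::sum);
        }
    }

    /**
     * Hypothetical heuristic: an IOException anywhere in the cause chain is treated
     * as a platform failure (e.g. network); everything else is attributed to user code.
     */
    static String classify(Throwable cause) {
        Throwable current = cause;
        for (int depth = 0; current != null && depth < MAX_CAUSE_DEPTH; depth++) {
            if (current instanceof IOException) {
                return "platformFailures";
            }
            current = current.getCause();
        }
        return "userFailures";
    }
}
```

The listener walks the cause chain because the exception reaching the JobManager often wraps the root cause. A real plugin would instead register `Counter`s on the `MetricGroup` passed to `init`. It would be loaded through the plugin framework, presumably via the `FailureListenerFactory` that `FailureListenerUtils` imports.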