zentol commented on a change in pull request #15311:
URL: https://github.com/apache/flink/pull/15311#discussion_r598537653

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryService.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.exceptionhistory;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.executiongraph.JobStatusProvider;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
+import org.apache.flink.runtime.scheduler.ExceptionHistoryEntry;
+import org.apache.flink.runtime.util.BoundedFIFOQueue;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** {@code ExceptionHistoryService} collects failure causes. */
+public class ExceptionHistoryService {

Review comment:
       Introducing an extra component for this is fine, but the initial introduction should be part of a separate ticket.



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertexProvider.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+/**
+ * {@code ExecutionVertexProvider} offers methods for retrieving {@link AccessExecutionVertex}
+ * instances.
+ *
+ * @param <T> The type of {@code AccessExecutionVertex}.
+ */
+public interface ExecutionVertexProvider<T extends AccessExecutionVertex> {
+
+    /**
+     * Returns all {@link ExecutionVertex} instances that are handled by this provider.
+     *
+     * @return All {@code ExecutionVertex} instances.
+     */
+    Iterable<T> getAllExecutionVertices();
+
+    /**
+     * Returns the {@link ExecutionVertex} for the given {@link ExecutionVertexID}.
+     *
+     * @param executionVertexId The ID of the {@code ExecutionVertex}.
+     * @return The {@code ExecutionVertex}.
+     * @throws IllegalArgumentException if the passed {@code ExecutionVertexID} is not associated to
+     *     a {@code ExecutionVertex} handled by this provider.
+     */
+    T getExecutionVertex(ExecutionVertexID executionVertexId);

Review comment:
       This interface seems weird given that the history service only cares about executions.



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
##########
@@ -377,6 +378,28 @@ public ComponentMainThreadExecutor getJobMasterMainThreadExecutor() {
         return Optional.ofNullable(checkpointStorageName);
     }
 
+    @Override
+    public ExecutionVertex getExecutionVertex(final ExecutionVertexID executionVertexId) {
+        final Map<JobVertexID, ExecutionJobVertex> executionJobVertices = getAllVertices();
+        Preconditions.checkArgument(
+                executionJobVertices.containsKey(executionVertexId.getJobVertexId()),
+                "The passed executionVertexId {} is invalid: There is no ExecutionJobVertex with the associated JobVertexId {}.",
+                executionVertexId,
+                executionVertexId.getJobVertexId());
+
+        final ExecutionJobVertex executionJobVertex =
+                executionJobVertices.get(executionVertexId.getJobVertexId());

Review comment:
       Do we really need to add another method to the EG for something that is already well supported with existing APIs?
       ```
       AccessExecutionVertex vertex = accessExecutionGraph.getJobVertex(executionVertexId.getJobVertexId()).getTaskVertices()[executionVertexId.getSubtaskIndex()];
       ```



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryFactory.java
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.exceptionhistory;
+
+import org.apache.flink.runtime.scheduler.ExceptionHistoryEntry;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.Optional;
+
+public interface ExceptionHistoryEntryFactory {
+    Optional<ExceptionHistoryEntry> create(ExecutionVertexID executionVertexID);
+
+    Iterable<ExceptionHistoryEntry> extractAllFailures();

Review comment:
       This is unused in production code. It also seems like quite an odd interface TBH, and I would prefer the EG to actively pass in all failure causes.



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryService.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.exceptionhistory;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.executiongraph.JobStatusProvider;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
+import org.apache.flink.runtime.scheduler.ExceptionHistoryEntry;
+import org.apache.flink.runtime.util.BoundedFIFOQueue;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** {@code ExceptionHistoryService} collects failure causes. */
+public class ExceptionHistoryService {
+
+    private final Logger log;
+
+    private final JobStatusProvider jobStatusProvider;
+    private final ExceptionHistoryEntryFactory exceptionHistoryEntryFactory;
+
+    private final BoundedFIFOQueue<ExceptionHistoryEntry> entries;
+
+    public ExceptionHistoryService(
+            JobStatusProvider jobStatusProvider,
+            ExceptionHistoryEntryFactory exceptionHistoryEntryFactory,
+            int maxQueueSize,
+            Logger log) {
+        this(
+                jobStatusProvider,
+                exceptionHistoryEntryFactory,
+                new BoundedFIFOQueue<>(maxQueueSize),
+                log);
+    }
+
+    private ExceptionHistoryService(
+            JobStatusProvider jobStatusProvider,
+            ExceptionHistoryEntryFactory exceptionHistoryEntryFactory,
+            BoundedFIFOQueue<ExceptionHistoryEntry> entries,
+            Logger log) {
+        this.jobStatusProvider = jobStatusProvider;
+        this.exceptionHistoryEntryFactory = exceptionHistoryEntryFactory;
+        this.entries = entries;
+        this.log = log;
+    }
+
+    public void registerGlobalFailure(Throwable throwable) {
+        registerGlobalFailure(throwable, jobStatusProvider.getStatusTimestamp(JobStatus.FAILED));
+    }
+
+    public void registerGlobalFailure(@Nullable Throwable throwable, long timestamp) {
+        entries.add(ExceptionHistoryEntry.fromGlobalFailure(throwable, timestamp));
+        log.debug("Archive global failure.", throwable);
+    }
+
+    public void registerLocalFailure(FailureHandlingResult failureHandlingResult) {

Review comment:
       I'm not convinced that passing in an AccessExecution makes testing much more difficult. And then we could forego this whole exception entry factory business.



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryService.java
##########
@@ -36,38 +34,26 @@

     private final Logger log;

-    private final JobStatusProvider jobStatusProvider;

Review comment:
       Why not squash this into 8bd52856ac117d23df30dc9589b9739116a77a7a before opening the draft PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
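
A minimal, illustrative sketch of the alternative hinted at in the DefaultExecutionGraph and registerLocalFailure comments above: resolving the failed execution through the accessors that AccessExecutionGraph already exposes and handing the resulting AccessExecution straight to the history service. The ExceptionHistoryLookupSketch class name and the registerLocalFailure(cause, timestamp, execution) overload referenced in the comments are assumptions for illustration only; they are not part of the PR as submitted.

```
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;

/** Illustrative helper only; not part of the PR under review. */
final class ExceptionHistoryLookupSketch {

    private ExceptionHistoryLookupSketch() {}

    /**
     * Resolves the current execution attempt for the given vertex ID using only the accessors
     * that AccessExecutionGraph already exposes, i.e. without a dedicated
     * getExecutionVertex(ExecutionVertexID) method on the ExecutionGraph.
     */
    static AccessExecution resolveCurrentExecution(
            AccessExecutionGraph executionGraph, ExecutionVertexID executionVertexId) {
        // getJobVertex returns null for an unknown JobVertexID; callers would have to handle that.
        final AccessExecutionJobVertex jobVertex =
                executionGraph.getJobVertex(executionVertexId.getJobVertexId());

        // The resulting AccessExecution could then be handed straight to the history service,
        // e.g. via a hypothetical registerLocalFailure(cause, timestamp, execution) overload,
        // instead of going through an ExceptionHistoryEntryFactory.
        return jobVertex
                .getTaskVertices()[executionVertexId.getSubtaskIndex()]
                .getCurrentExecutionAttempt();
    }
}
```

With such a lookup the scheduler could build exception history entries directly from the AccessExecution it already has at hand, which is what would make the ExceptionHistoryEntryFactory indirection unnecessary.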