KarmaGYZ commented on a change in pull request #18360:
URL: https://github.com/apache/flink/pull/18360#discussion_r801322086
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStore.java ########## @@ -20,36 +20,87 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.JobsOverview; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; +import org.apache.flink.util.ShutdownHookUtil; +import org.apache.flink.util.concurrent.ScheduledExecutor; + +import org.apache.flink.shaded.guava30.com.google.common.base.Ticker; +import org.apache.flink.shaded.guava30.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.io.IOException; import java.util.Collection; -import java.util.HashMap; -import java.util.Map; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** * {@link ExecutionGraphInfoStore} implementation which stores the {@link ArchivedExecutionGraph} in - * memory. + * memory. The memory store support to keep maximum job graphs and remove the timeout ones. */ public class MemoryExecutionGraphInfoStore implements ExecutionGraphInfoStore { - private final Map<JobID, ExecutionGraphInfo> serializableExecutionGraphInfos = new HashMap<>(4); + private static final Logger LOG = LoggerFactory.getLogger(MemoryExecutionGraphInfoStore.class); + + private final Cache<JobID, ExecutionGraphInfo> serializableExecutionGraphInfos; + + @Nullable private final ScheduledFuture<?> cleanupFuture; + + private final Thread shutdownHook; + + public MemoryExecutionGraphInfoStore() { + this(Time.milliseconds(0), 0, null, null); + } + + public MemoryExecutionGraphInfoStore( + Time expirationTime, + int maximumCapacity, + @Nullable ScheduledExecutor scheduledExecutor, + @Nullable Ticker ticker) { + final long expirationMills = expirationTime.toMilliseconds(); + CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder(); + if (expirationMills > 0) { + cacheBuilder.expireAfterWrite(expirationMills, TimeUnit.MILLISECONDS); + } + if (maximumCapacity > 0) { + cacheBuilder.maximumSize(maximumCapacity); + } + if (ticker != null) { + cacheBuilder.ticker(ticker); + } + + this.serializableExecutionGraphInfos = cacheBuilder.build(); + if (scheduledExecutor != null) { + this.cleanupFuture = + scheduledExecutor.scheduleWithFixedDelay( + serializableExecutionGraphInfos::cleanUp, + expirationTime.toMilliseconds(), + expirationTime.toMilliseconds(), + TimeUnit.MILLISECONDS); + } else { + this.cleanupFuture = null; + } + this.shutdownHook = ShutdownHookUtil.addShutdownHook(this, getClass().getSimpleName(), LOG); + } @Override public int size() { - return serializableExecutionGraphInfos.size(); + return (int) serializableExecutionGraphInfos.size(); Review comment: ```suggestion return Math.toIntExact(serializableExecutionGraphInfos.size()); ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStoreTest.java ########## @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; +import org.apache.flink.runtime.util.ManualTicker; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor; + +import org.apache.flink.shaded.guava30.com.google.common.base.Ticker; + +import org.hamcrest.Matchers; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStoreTestUtils.generateJobDetails; +import static org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStoreTestUtils.generateTerminalExecutionGraphInfos; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** Tests for the {@link MemoryExecutionGraphInfoStore}. */ +public class MemoryExecutionGraphInfoStoreTest extends TestLogger { + + /** + * Tests that we can put {@link ExecutionGraphInfo} into the {@link + * MemoryExecutionGraphInfoStore} and that the graph is persisted. + */ + @Test + public void testPut() throws IOException { + assertPutJobGraphWithStatus(JobStatus.FINISHED); + } + + /** Tests that a SUSPENDED job can be persisted. */ + @Test + public void testPutSuspendedJob() throws IOException { + assertPutJobGraphWithStatus(JobStatus.SUSPENDED); + } + + /** Tests that null is returned if we request an unknown JobID. */ + @Test + public void testUnknownGet() throws IOException { + + try (final MemoryExecutionGraphInfoStore executionGraphStore = + createMemoryExecutionGraphInfoStore()) { + assertThat(executionGraphStore.get(new JobID()), Matchers.nullValue()); + } + } + + /** Tests that we obtain the correct jobs overview. 
*/ + @Test + public void testStoredJobsOverview() throws IOException { + final int numberExecutionGraphs = 10; + final Collection<ExecutionGraphInfo> executionGraphInfos = + generateTerminalExecutionGraphInfos(numberExecutionGraphs); + + final List<JobStatus> jobStatuses = + executionGraphInfos.stream() + .map(ExecutionGraphInfo::getArchivedExecutionGraph) + .map(ArchivedExecutionGraph::getState) + .collect(Collectors.toList()); + + final JobsOverview expectedJobsOverview = JobsOverview.create(jobStatuses); + + try (final MemoryExecutionGraphInfoStore executionGraphInfoStore = + createMemoryExecutionGraphInfoStore()) { + for (ExecutionGraphInfo executionGraphInfo : executionGraphInfos) { + executionGraphInfoStore.put(executionGraphInfo); + } + + assertThat( + executionGraphInfoStore.getStoredJobsOverview(), + Matchers.equalTo(expectedJobsOverview)); + } + } + + /** Tests that we obtain the correct collection of available job details. */ + @Test + public void testAvailableJobDetails() throws IOException { + final int numberExecutionGraphs = 10; + final Collection<ExecutionGraphInfo> executionGraphInfos = + generateTerminalExecutionGraphInfos(numberExecutionGraphs); + + final Collection<JobDetails> jobDetails = generateJobDetails(executionGraphInfos); + + try (final MemoryExecutionGraphInfoStore executionGraphInfoStore = + createMemoryExecutionGraphInfoStore()) { + for (ExecutionGraphInfo executionGraphInfo : executionGraphInfos) { + executionGraphInfoStore.put(executionGraphInfo); + } + + assertThat( + executionGraphInfoStore.getAvailableJobDetails(), + Matchers.containsInAnyOrder(jobDetails.toArray())); + } + } + + /** Tests that an expired execution graph is removed from the execution graph store. */ + @Test + public void testExecutionGraphExpiration() throws Exception { + final Time expirationTime = Time.milliseconds(1L); + + final ManuallyTriggeredScheduledExecutor scheduledExecutor = + new ManuallyTriggeredScheduledExecutor(); + + final ManualTicker manualTicker = new ManualTicker(); + + try (final MemoryExecutionGraphInfoStore executionGraphInfoStore = + new MemoryExecutionGraphInfoStore( + expirationTime, Integer.MAX_VALUE, scheduledExecutor, manualTicker)) { + + final ExecutionGraphInfo executionGraphInfo = + new ExecutionGraphInfo( + new ArchivedExecutionGraphBuilder() + .setState(JobStatus.FINISHED) + .build()); + + executionGraphInfoStore.put(executionGraphInfo); + + // there should one execution graph + assertThat(executionGraphInfoStore.size(), Matchers.equalTo(1)); + + manualTicker.advanceTime(expirationTime.toMilliseconds(), TimeUnit.MILLISECONDS); + + // this should trigger the cleanup after expiration + scheduledExecutor.triggerScheduledTasks(); + + assertThat(executionGraphInfoStore.size(), Matchers.equalTo(0)); + + assertThat( + executionGraphInfoStore.get(executionGraphInfo.getJobId()), + Matchers.nullValue()); + + // check that the persisted file has been deleted Review comment: ```suggestion // check that the store is empty ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ExecutionGraphInfoStoreTestUtils.java ########## @@ -0,0 +1,235 @@ +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.blob.BlobServer; +import 
org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent; +import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; +import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava30.com.google.common.base.Ticker; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.junit.ClassRule; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; + +/** + * Test utils class for {@link FileExecutionGraphInfoStore} and {@link + * MemoryExecutionGraphInfoStore}. + */ +public class ExecutionGraphInfoStoreTestUtils { Review comment: I like the idea of extracting the shared utilities for memory and file store tests. I think this change should belong to a separate hotfix commit. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStoreTest.java ########## @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; +import org.apache.flink.runtime.util.ManualTicker; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor; + +import org.apache.flink.shaded.guava30.com.google.common.base.Ticker; + +import org.hamcrest.Matchers; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStoreTestUtils.generateJobDetails; +import static org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStoreTestUtils.generateTerminalExecutionGraphInfos; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** Tests for the {@link MemoryExecutionGraphInfoStore}. */ +public class MemoryExecutionGraphInfoStoreTest extends TestLogger { + + /** + * Tests that we can put {@link ExecutionGraphInfo} into the {@link + * MemoryExecutionGraphInfoStore} and that the graph is persisted. + */ + @Test + public void testPut() throws IOException { + assertPutJobGraphWithStatus(JobStatus.FINISHED); + } + + /** Tests that a SUSPENDED job can be persisted. */ + @Test + public void testPutSuspendedJob() throws IOException { + assertPutJobGraphWithStatus(JobStatus.SUSPENDED); + } + + /** Tests that null is returned if we request an unknown JobID. */ + @Test + public void testUnknownGet() throws IOException { + + try (final MemoryExecutionGraphInfoStore executionGraphStore = + createMemoryExecutionGraphInfoStore()) { + assertThat(executionGraphStore.get(new JobID()), Matchers.nullValue()); + } + } + + /** Tests that we obtain the correct jobs overview. 
*/ + @Test + public void testStoredJobsOverview() throws IOException { + final int numberExecutionGraphs = 10; + final Collection<ExecutionGraphInfo> executionGraphInfos = + generateTerminalExecutionGraphInfos(numberExecutionGraphs); + + final List<JobStatus> jobStatuses = + executionGraphInfos.stream() + .map(ExecutionGraphInfo::getArchivedExecutionGraph) + .map(ArchivedExecutionGraph::getState) + .collect(Collectors.toList()); + + final JobsOverview expectedJobsOverview = JobsOverview.create(jobStatuses); + + try (final MemoryExecutionGraphInfoStore executionGraphInfoStore = + createMemoryExecutionGraphInfoStore()) { + for (ExecutionGraphInfo executionGraphInfo : executionGraphInfos) { + executionGraphInfoStore.put(executionGraphInfo); + } + + assertThat( + executionGraphInfoStore.getStoredJobsOverview(), + Matchers.equalTo(expectedJobsOverview)); + } + } + + /** Tests that we obtain the correct collection of available job details. */ + @Test + public void testAvailableJobDetails() throws IOException { + final int numberExecutionGraphs = 10; + final Collection<ExecutionGraphInfo> executionGraphInfos = + generateTerminalExecutionGraphInfos(numberExecutionGraphs); + + final Collection<JobDetails> jobDetails = generateJobDetails(executionGraphInfos); + + try (final MemoryExecutionGraphInfoStore executionGraphInfoStore = + createMemoryExecutionGraphInfoStore()) { + for (ExecutionGraphInfo executionGraphInfo : executionGraphInfos) { + executionGraphInfoStore.put(executionGraphInfo); + } + + assertThat( + executionGraphInfoStore.getAvailableJobDetails(), + Matchers.containsInAnyOrder(jobDetails.toArray())); + } + } + + /** Tests that an expired execution graph is removed from the execution graph store. */ + @Test + public void testExecutionGraphExpiration() throws Exception { + final Time expirationTime = Time.milliseconds(1L); + + final ManuallyTriggeredScheduledExecutor scheduledExecutor = + new ManuallyTriggeredScheduledExecutor(); + + final ManualTicker manualTicker = new ManualTicker(); + + try (final MemoryExecutionGraphInfoStore executionGraphInfoStore = + new MemoryExecutionGraphInfoStore( + expirationTime, Integer.MAX_VALUE, scheduledExecutor, manualTicker)) { + + final ExecutionGraphInfo executionGraphInfo = + new ExecutionGraphInfo( + new ArchivedExecutionGraphBuilder() + .setState(JobStatus.FINISHED) + .build()); + + executionGraphInfoStore.put(executionGraphInfo); + + // there should one execution graph + assertThat(executionGraphInfoStore.size(), Matchers.equalTo(1)); + + manualTicker.advanceTime(expirationTime.toMilliseconds(), TimeUnit.MILLISECONDS); + + // this should trigger the cleanup after expiration + scheduledExecutor.triggerScheduledTasks(); + + assertThat(executionGraphInfoStore.size(), Matchers.equalTo(0)); + + assertThat( + executionGraphInfoStore.get(executionGraphInfo.getJobId()), + Matchers.nullValue()); + + // check that the persisted file has been deleted + assertThat(executionGraphInfoStore.size(), Matchers.equalTo(0)); + } + } + + /** Tests that all persisted files are cleaned up after closing the store. */ + @Test + public void testCloseCleansUp() throws IOException { Review comment: Better to use `try` clause here in case there is `IOException` from `Store#put`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
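For illustration, a minimal standalone sketch (plain Java, not part of the PR) of why `Math.toIntExact` is preferable to the plain `(int)` cast in the first suggestion above: the cast silently truncates a `Cache#size()` value larger than `Integer.MAX_VALUE`, while `Math.toIntExact` fails fast with an `ArithmeticException`.

```java
public class ToIntExactSketch {
    public static void main(String[] args) {
        long size = Integer.MAX_VALUE + 1L;

        // plain cast: silently wraps around and prints -2147483648
        System.out.println((int) size);

        // Math.toIntExact: throws java.lang.ArithmeticException ("integer overflow")
        System.out.println(Math.toIntExact(size));
    }
}
```

The expiration mechanism reviewed above is driven by Guava's `CacheBuilder#expireAfterWrite` together with an injectable `Ticker`, with the scheduled executor periodically calling `Cache#cleanUp()`. A self-contained sketch of that interaction, using plain Guava rather than Flink's shaded `guava30` classes and a hand-rolled ticker in place of `ManualTicker`:

```java
import com.google.common.base.Ticker;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ExpiringCacheSketch {
    public static void main(String[] args) {
        // manually advanced clock, mirroring what ManualTicker does in the test
        AtomicLong nanos = new AtomicLong();
        Ticker manualTicker =
                new Ticker() {
                    @Override
                    public long read() {
                        return nanos.get();
                    }
                };

        Cache<String, String> cache =
                CacheBuilder.newBuilder()
                        .expireAfterWrite(1, TimeUnit.MILLISECONDS)
                        .maximumSize(100)
                        .ticker(manualTicker)
                        .build();

        cache.put("job-1", "FINISHED");
        System.out.println(cache.size()); // 1

        // advance past the expiration time and run the maintenance pass,
        // as the scheduled cleanup task does for the store's cache
        nanos.addAndGet(TimeUnit.MILLISECONDS.toNanos(2));
        cache.cleanUp();
        System.out.println(cache.size()); // 0
    }
}
```

On the last comment about `testCloseCleansUp`: wrapping the store in try-with-resources guarantees `close()` runs even when `ExecutionGraphInfoStore#put` throws an `IOException`. A sketch of the intended shape, assuming the same helpers the other tests in the file already use:

```java
@Test
public void testCloseCleansUp() throws IOException {
    try (final MemoryExecutionGraphInfoStore executionGraphInfoStore =
            createMemoryExecutionGraphInfoStore()) {

        // put() may throw IOException; with try-with-resources the store is
        // still closed on that path as well
        executionGraphInfoStore.put(
                new ExecutionGraphInfo(
                        new ArchivedExecutionGraphBuilder()
                                .setState(JobStatus.FINISHED)
                                .build()));

        assertThat(executionGraphInfoStore.size(), Matchers.equalTo(1));
    }
}
```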