[ https://issues.apache.org/jira/browse/FLINK-5107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683735#comment-15683735 ]
ASF GitHub Bot commented on FLINK-5107: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2837#discussion_r88886636 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java --- @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import org.apache.flink.util.Preconditions; + +import java.util.ConcurrentModificationException; +import java.util.Iterator; + +/** + * This class implements a list (array based) that is physically bounded in maximum size, but can virtually grow beyond + * the bounded size. When the list grows beyond the size bound, elements are dropped from the head of the list (FIFO + * order). If dropped elements are accessed, a default element is returned instead. + * <p> + * TODO this class could eventually implement the whole actual List interface. + * + * @param <T> type of the list elements + */ +public class EvictingBoundedList<T> implements Iterable<T> { + + private final T defaultElement; + private final Object[] elements; + private int idx; + private int count; + private long modCount; + + public EvictingBoundedList(int sizeLimit) { + this(sizeLimit, null); + } + + public EvictingBoundedList(int sizeLimit, T defaultElement) { + this.elements = new Object[sizeLimit]; + this.defaultElement = defaultElement; + this.idx = 0; + this.count = 0; + this.modCount = 0; + } + + public int size() { + return count; + } + + public boolean isEmpty() { + return 0 == count; + } + + public boolean add(T t) { + elements[idx] = t; + idx = (idx + 1) % elements.length; + ++count; + ++modCount; + return true; + } + + public void clear() { + if (!isEmpty()) { + for (int i = 0; i < elements.length; ++i) { + elements[i] = null; + } + count = 0; + idx = 0; + ++modCount; + } + } + + public T get(int index) { + Preconditions.checkArgument(index >= 0 && index < count); + return isDroppedIndex(index) ? getDefaultElement() : accessInternal(index % elements.length); + } + + public T set(int index, T element) { + Preconditions.checkArgument(index >= 0 && index < count); + ++modCount; + if (isDroppedIndex(index)) { + return getDefaultElement(); + } else { + int idx = index % elements.length; + T old = accessInternal(idx); + elements[idx] = element; + return old; + } + } + + public T getDefaultElement() { + return defaultElement; + } + + private boolean isDroppedIndex(int idx) { + return idx < count - elements.length; + } + + @SuppressWarnings("unchecked") + private T accessInternal(int arrayIndex) { + return (T) elements[arrayIndex]; + } + + @Override + public Iterator<T> iterator() { + return new Iterator<T>() { + + int pos = 0; + final long oldModCount = modCount; + + @Override + public boolean hasNext() { + return pos < count; + } + + @Override + public T next() { + if (oldModCount != modCount) { + throw new ConcurrentModificationException(); + } + return get(pos++); --- End diff -- I think we should throw an `NoSuchElementException` if we see an `IllegalArgumentException` here. > Job Manager goes out of memory from long history of prior execution attempts > ---------------------------------------------------------------------------- > > Key: FLINK-5107 > URL: https://issues.apache.org/jira/browse/FLINK-5107 > Project: Flink > Issue Type: Bug > Components: JobManager > Reporter: Stefan Richter > Assignee: Stefan Richter > > We have observed that the job manager can run out of memory during long > running jobs with many vertexes. Analysis of the heap dump shows, that the > ever-growing history of prior execution attempts is the culprit for this > problem. > We should limit this history to a number of n most recent attempts. -- This message was sent by Atlassian JIRA (v6.3.4#6332)