[ https://issues.apache.org/jira/browse/FLINK-5107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683735#comment-15683735 ]

ASF GitHub Bot commented on FLINK-5107:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2837#discussion_r88886636
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java ---
    @@ -0,0 +1,139 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.util;
    +
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.ConcurrentModificationException;
    +import java.util.Iterator;
    +
    +/**
    + * This class implements a list (array based) that is physically bounded in maximum size, but can virtually grow beyond
    + * the bounded size. When the list grows beyond the size bound, elements are dropped from the head of the list (FIFO
    + * order). If dropped elements are accessed, a default element is returned instead.
    + * <p>
    + * TODO this class could eventually implement the whole actual List interface.
    + *
    + * @param <T> type of the list elements
    + */
    +public class EvictingBoundedList<T> implements Iterable<T> {
    +
    +   private final T defaultElement;
    +   private final Object[] elements;
    +   private int idx;
    +   private int count;
    +   private long modCount;
    +
    +   public EvictingBoundedList(int sizeLimit) {
    +           this(sizeLimit, null);
    +   }
    +
    +   public EvictingBoundedList(int sizeLimit, T defaultElement) {
    +           this.elements = new Object[sizeLimit];
    +           this.defaultElement = defaultElement;
    +           this.idx = 0;
    +           this.count = 0;
    +           this.modCount = 0;
    +   }
    +
    +   public int size() {
    +           return count;
    +   }
    +
    +   public boolean isEmpty() {
    +           return 0 == count;
    +   }
    +
    +   public boolean add(T t) {
    +           elements[idx] = t;
    +           idx = (idx + 1) % elements.length;
    +           ++count;
    +           ++modCount;
    +           return true;
    +   }
    +
    +   public void clear() {
    +           if (!isEmpty()) {
    +                   for (int i = 0; i < elements.length; ++i) {
    +                           elements[i] = null;
    +                   }
    +                   count = 0;
    +                   idx = 0;
    +                   ++modCount;
    +           }
    +   }
    +
    +   public T get(int index) {
    +           Preconditions.checkArgument(index >= 0 && index < count);
    +           return isDroppedIndex(index) ? getDefaultElement() : accessInternal(index % elements.length);
    +   }
    +
    +   public T set(int index, T element) {
    +           Preconditions.checkArgument(index >= 0 && index < count);
    +           ++modCount;
    +           if (isDroppedIndex(index)) {
    +                   return getDefaultElement();
    +           } else {
    +                   int idx = index % elements.length;
    +                   T old = accessInternal(idx);
    +                   elements[idx] = element;
    +                   return old;
    +           }
    +   }
    +
    +   public T getDefaultElement() {
    +           return defaultElement;
    +   }
    +
    +   private boolean isDroppedIndex(int idx) {
    +           return idx < count - elements.length;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private T accessInternal(int arrayIndex) {
    +           return (T) elements[arrayIndex];
    +   }
    +
    +   @Override
    +   public Iterator<T> iterator() {
    +           return new Iterator<T>() {
    +
    +                   int pos = 0;
    +                   final long oldModCount = modCount;
    +
    +                   @Override
    +                   public boolean hasNext() {
    +                           return pos < count;
    +                   }
    +
    +                   @Override
    +                   public T next() {
    +                           if (oldModCount != modCount) {
    +                                   throw new ConcurrentModificationException();
    +                           }
    +                           return get(pos++);
    --- End diff --
    
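    I think we should throw a `NoSuchElementException` if we see an `IllegalArgumentException` here. 
    `get` raises the latter through `Preconditions.checkArgument` once `pos` reaches `count`, whereas the 
    `Iterator` contract requires `next()` to throw `NoSuchElementException` when the iteration has no more elements.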
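
    A minimal sketch of how the iterator's `next()` could honour that contract. The class 
    `EvictingListSketch` is a hypothetical, simplified stand-in for `EvictingBoundedList`, not the code 
    from the pull request, and it assumes an up-front `hasNext()` check rather than catching the 
    `IllegalArgumentException`; either variant should give callers the same exception.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical, simplified stand-in for EvictingBoundedList (illustration only). */
class EvictingListSketch<T> implements Iterable<T> {

    private final Object[] elements;
    private final T defaultElement;
    private int count;
    private long modCount;

    EvictingListSketch(int sizeLimit, T defaultElement) {
        this.elements = new Object[sizeLimit];
        this.defaultElement = defaultElement;
    }

    void add(T t) {
        // Overwrites the oldest slot once the physical bound is reached.
        elements[count % elements.length] = t;
        ++count;
        ++modCount;
    }

    @SuppressWarnings("unchecked")
    T get(int index) {
        if (index < 0 || index >= count) {
            throw new IllegalArgumentException("Index out of range: " + index);
        }
        // Indices below (count - capacity) were evicted and map to the default element.
        boolean dropped = index < count - elements.length;
        return dropped ? defaultElement : (T) elements[index % elements.length];
    }

    @Override
    public Iterator<T> iterator() {
        return new Iterator<T>() {

            private int pos = 0;
            private final long expectedModCount = modCount;

            @Override
            public boolean hasNext() {
                return pos < count;
            }

            @Override
            public T next() {
                if (expectedModCount != modCount) {
                    throw new ConcurrentModificationException();
                }
                // Fail with the exception the Iterator contract prescribes,
                // instead of leaking get()'s IllegalArgumentException.
                if (!hasNext()) {
                    throw new NoSuchElementException("Iterator has no more elements.");
                }
                return get(pos++);
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }
}
```

    Checking `hasNext()` first also leaves `pos` unchanged when the iterator is exhausted, so 
    repeated calls to `next()` keep failing in the same way.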


> Job Manager goes out of memory from long history of prior execution attempts
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-5107
>                 URL: https://issues.apache.org/jira/browse/FLINK-5107
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>
> We have observed that the job manager can run out of memory during 
> long-running jobs with many vertices. Analysis of the heap dump shows that the 
> ever-growing history of prior execution attempts is the culprit.
> We should limit this history to the n most recent attempts.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
