Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6313#discussion_r202041863 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java --- @@ -84,52 +86,89 @@ public boolean contains(UK key) throws Exception { @Override public Iterable<Map.Entry<UK, UV>> entries() throws Exception { - return entriesStream()::iterator; + return entries(e -> e); } - private Stream<Map.Entry<UK, UV>> entriesStream() throws Exception { + private <R> Iterable<R> entries( + Function<Map.Entry<UK, UV>, R> resultMapper) throws Exception { Iterable<Map.Entry<UK, TtlValue<UV>>> withTs = original.entries(); - withTs = withTs == null ? Collections.emptyList() : withTs; - return StreamSupport - .stream(withTs.spliterator(), false) - .filter(this::unexpiredAndUpdateOrCleanup) - .map(TtlMapState::unwrapWithoutTs); - } - - private boolean unexpiredAndUpdateOrCleanup(Map.Entry<UK, TtlValue<UV>> e) { - UV unexpiredValue; - try { - unexpiredValue = getWithTtlCheckAndUpdate( - e::getValue, - v -> original.put(e.getKey(), v), - () -> original.remove(e.getKey())); - } catch (Exception ex) { - throw new FlinkRuntimeException(ex); - } - return unexpiredValue != null; - } - - private static <UK, UV> Map.Entry<UK, UV> unwrapWithoutTs(Map.Entry<UK, TtlValue<UV>> e) { - return new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue().getUserValue()); + return () -> new EntriesIterator<>(withTs == null ? 
Collections.emptyList() : withTs, resultMapper); } @Override public Iterable<UK> keys() throws Exception { - return entriesStream().map(Map.Entry::getKey)::iterator; + return entries(Map.Entry::getKey); } @Override public Iterable<UV> values() throws Exception { - return entriesStream().map(Map.Entry::getValue)::iterator; + return entries(Map.Entry::getValue); } @Override public Iterator<Map.Entry<UK, UV>> iterator() throws Exception { - return entriesStream().iterator(); + return entries().iterator(); } @Override public void clear() { original.clear(); } + + private class EntriesIterator<R> implements Iterator<R> { + private final Iterator<Map.Entry<UK, TtlValue<UV>>> originalIterator; + private final Function<Map.Entry<UK, UV>, R> resultMapper; + private Map.Entry<UK, UV> nextUnexpired = null; + private boolean rightAfterNextIsCalled = false; + + private EntriesIterator( + @Nonnull Iterable<Map.Entry<UK, TtlValue<UV>>> withTs, + @Nonnull Function<Map.Entry<UK, UV>, R> resultMapper) { + this.originalIterator = withTs.iterator(); + this.resultMapper = resultMapper; + } + + @Override + public boolean hasNext() { + rightAfterNextIsCalled = false; + while (nextUnexpired == null && originalIterator.hasNext()) { + nextUnexpired = getUnexpiredAndUpdateOrCleanup(originalIterator.next()); + } + return nextUnexpired != null; + } + + @Override + public R next() { + if (hasNext()) { + rightAfterNextIsCalled = true; + R result = resultMapper.apply(nextUnexpired); + nextUnexpired = null; + return result; + } + throw new NoSuchElementException(); + } + + @Override + public void remove() { + if (rightAfterNextIsCalled) { --- End diff -- The problem here is that after calling `next()` and then `hasNext()`, `originalIterator` might have been advanced by `hasNext()` while skipping expired entries, so it now has a different current element. This means that `remove` cannot be called on it consistently with the wrapping iterator's current element.
---