Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5059#discussion_r152823141 --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java --- @@ -0,0 +1,1097 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.core.fs; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.util.function.SupplierWithException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import java.io.Closeable; +import java.io.IOException; +import java.net.URI; +import java.util.HashSet; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A file system that limits the number of concurrently open input streams, + * output streams, and total streams for a target file system. + * + * <p>This file system can wrap another existing file system in cases where + * the target file system cannot handle certain connection spikes and connections + * would fail in that case. This happens, for example, for very small HDFS clusters + * with few RPC handlers, when a large Flink job tries to build up many connections during + * a checkpoint. + * + * <p>The filesystem may track the progress of streams and close streams that have been + * inactive for too long, to avoid locked streams of taking up the complete pool. + * Rather than having a dedicated reaper thread, the calls that try to open a new stream + * periodically check the currently open streams once the limit of open streams is reached. 
+ */ +@Internal +public class LimitedConnectionsFileSystem extends FileSystem { + + private static final Logger LOG = LoggerFactory.getLogger(LimitedConnectionsFileSystem.class); + + /** The original file system to which connections are limited. */ + private final FileSystem originalFs; + + /** The lock that synchronizes connection bookkeeping. */ + private final ReentrantLock lock; + + /** Condition for threads that are blocking on the availability of new connections. */ + private final Condition available; + + /** The maximum number of concurrently open output streams. */ + private final int maxNumOpenOutputStreams; + + /** The maximum number of concurrently open input streams. */ + private final int maxNumOpenInputStreams; + + /** The maximum number of concurrently open streams (input + output). */ + private final int maxNumOpenStreamsTotal; + + /** The nanoseconds that a opening a stream may wait for availability. */ + private final long streamOpenTimeoutNanos; + + /** The nanoseconds that a stream may spend not writing any bytes before it is closed as inactive. */ + private final long streamInactivityTimeoutNanos; + + /** The set of currently open output streams. */ + @GuardedBy("lock") + private final HashSet<OutStream> openOutputStreams; + + /** The set of currently open input streams. */ + @GuardedBy("lock") + private final HashSet<InStream> openInputStreams; + + /** The number of output streams reserved to be opened. */ + @GuardedBy("lock") + private int numReservedOutputStreams; + + /** The number of input streams reserved to be opened. */ + @GuardedBy("lock") + private int numReservedInputStreams; + + // ------------------------------------------------------------------------ + + /** + * Creates a new output connection limiting file system. 
+ * + * <p>If streams are inactive (meaning not writing bytes) for longer than the given timeout, + * then they are terminated as "inactive", to prevent that the limited number of connections gets + * stuck on only blocked threads. + * + * @param originalFs The original file system to which connections are limited. + * @param maxNumOpenStreamsTotal The maximum number of concurrent open streams (0 means no limit). + */ + public LimitedConnectionsFileSystem(FileSystem originalFs, int maxNumOpenStreamsTotal) { + this(originalFs, maxNumOpenStreamsTotal, 0, 0); + } + + /** + * Creates a new output connection limiting file system. + * + * <p>If streams are inactive (meaning not writing bytes) for longer than the given timeout, + * then they are terminated as "inactive", to prevent that the limited number of connections gets + * stuck on only blocked threads. + * + * @param originalFs The original file system to which connections are limited. + * @param maxNumOpenStreamsTotal The maximum number of concurrent open streams (0 means no limit). + * @param streamOpenTimeout The maximum number of milliseconds that the file system will wait when + * no more connections are currently permitted. + * @param streamInactivityTimeout The milliseconds that a stream may spend not writing any + * bytes before it is closed as inactive. + */ + public LimitedConnectionsFileSystem( + FileSystem originalFs, + int maxNumOpenStreamsTotal, + long streamOpenTimeout, + long streamInactivityTimeout) { + this(originalFs, maxNumOpenStreamsTotal, 0, 0, streamOpenTimeout, streamInactivityTimeout); + } + + /** + * Creates a new output connection limiting file system, limiting input and output streams with + * potentially different quotas. + * + * <p>If streams are inactive (meaning not writing bytes) for longer than the given timeout, + * then they are terminated as "inactive", to prevent that the limited number of connections gets + * stuck on only blocked threads. 
+ * + * @param originalFs The original file system to which connections are limited. + * @param maxNumOpenStreamsTotal The maximum number of concurrent open streams (0 means no limit). + * @param maxNumOpenOutputStreams The maximum number of concurrent open output streams (0 means no limit). + * @param maxNumOpenInputStreams The maximum number of concurrent open input streams (0 means no limit). + * @param streamOpenTimeout The maximum number of milliseconds that the file system will wait when + * no more connections are currently permitted. + * @param streamInactivityTimeout The milliseconds that a stream may spend not writing any + * bytes before it is closed as inactive. + */ + public LimitedConnectionsFileSystem( + FileSystem originalFs, + int maxNumOpenStreamsTotal, + int maxNumOpenOutputStreams, + int maxNumOpenInputStreams, + long streamOpenTimeout, + long streamInactivityTimeout) { + + checkArgument(maxNumOpenStreamsTotal >= 0, "maxNumOpenStreamsTotal must be >= 0"); + checkArgument(maxNumOpenOutputStreams >= 0, "maxNumOpenOutputStreams must be >= 0"); + checkArgument(maxNumOpenInputStreams >= 0, "maxNumOpenInputStreams must be >= 0"); + checkArgument(streamOpenTimeout >= 0, "stream opening timeout must be >= 0 (0 means infinite timeout)"); + checkArgument(streamInactivityTimeout >= 0, "stream inactivity timeout must be >= 0 (0 means infinite timeout)"); + + this.originalFs = checkNotNull(originalFs, "originalFs"); + this.lock = new ReentrantLock(true); + this.available = lock.newCondition(); + this.openOutputStreams = new HashSet<>(); + this.openInputStreams = new HashSet<>(); + this.maxNumOpenStreamsTotal = maxNumOpenStreamsTotal; + this.maxNumOpenOutputStreams = maxNumOpenOutputStreams; + this.maxNumOpenInputStreams = maxNumOpenInputStreams; + + // assign nanos overflow aware + final long openTimeoutNanos = streamOpenTimeout * 1_000_000; + final long inactivityTimeoutNanos = streamInactivityTimeout * 1_000_000; + + this.streamOpenTimeoutNanos = + 
openTimeoutNanos >= streamOpenTimeout ? openTimeoutNanos : Long.MAX_VALUE; + + this.streamInactivityTimeoutNanos = + inactivityTimeoutNanos >= streamInactivityTimeout ? inactivityTimeoutNanos : Long.MAX_VALUE; + } + + // ------------------------------------------------------------------------ + + /** + * Gets the maximum number of concurrently open output streams. + */ + public int getMaxNumOpenOutputStreams() { + return maxNumOpenOutputStreams; + } + + /** + * Gets the maximum number of concurrently open input streams. + */ + public int getMaxNumOpenInputStreams() { + return maxNumOpenInputStreams; + } + + /** + * Gets the maximum number of concurrently open streams (input + output). + */ + public int getMaxNumOpenStreamsTotal() { + return maxNumOpenStreamsTotal; + } + + /** + * Gets the number of milliseconds that a opening a stream may wait for availability in the + * connection pool. + */ + public long getStreamOpenTimeout() { + return streamOpenTimeoutNanos / 1_000_000; + } + + /** + * Gets the milliseconds that a stream may spend not writing any bytes before it is closed as inactive. + */ + public long getStreamInactivityTimeout() { + return streamInactivityTimeoutNanos / 1_000_000; + } + + /** + * Gets the total number of open streams (input plus output). + */ + public int getTotalNumberOfOpenStreams() { + lock.lock(); + try { + return numReservedOutputStreams + numReservedInputStreams; + } finally { + lock.unlock(); + } + } + + /** + * Gets the number of currently open output streams. + */ + public int getNumberOfOpenOutputStreams() { + lock.lock(); + try { + return numReservedOutputStreams; + } + finally { + lock.unlock(); + } + } + + /** + * Gets the number of currently open input streams. 
+ */ + public int getNumberOfOpenInputStreams() { + return numReservedInputStreams; + } + + // ------------------------------------------------------------------------ + // input & output stream opening methods + // ------------------------------------------------------------------------ + + @Override + public FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException { + return createOutputStream(() -> originalFs.create(f, overwriteMode)); + } + + @Override + @Deprecated + @SuppressWarnings("deprecation") + public FSDataOutputStream create( + Path f, + boolean overwrite, + int bufferSize, + short replication, + long blockSize) throws IOException { + + return createOutputStream(() -> originalFs.create(f, overwrite, bufferSize, replication, blockSize)); + } + + @Override + public FSDataInputStream open(Path f, int bufferSize) throws IOException { + return createInputStream(() -> originalFs.open(f, bufferSize)); + } + + @Override + public FSDataInputStream open(Path f) throws IOException { + return createInputStream(() -> originalFs.open(f)); + } + + private FSDataOutputStream createOutputStream( + final SupplierWithException<FSDataOutputStream, IOException> streamOpener) throws IOException { + + final SupplierWithException<OutStream, IOException> wrappedStreamOpener = + () -> new OutStream(streamOpener.get(), this); + + return createStream(wrappedStreamOpener, openOutputStreams, true); + } + + private FSDataInputStream createInputStream( + final SupplierWithException<FSDataInputStream, IOException> streamOpener) throws IOException { + + final SupplierWithException<InStream, IOException> wrappedStreamOpener = + () -> new InStream(streamOpener.get(), this); + + return createStream(wrappedStreamOpener, openInputStreams, false); + } + + // ------------------------------------------------------------------------ + // other delegating file system methods + // ------------------------------------------------------------------------ + + @Override + public 
FileSystemKind getKind() { + return originalFs.getKind(); + } + + @Override + public boolean isDistributedFS() { + return originalFs.isDistributedFS(); + } + + @Override + public Path getWorkingDirectory() { + return originalFs.getWorkingDirectory(); + } + + @Override + public Path getHomeDirectory() { + return originalFs.getHomeDirectory(); + } + + @Override + public URI getUri() { + return originalFs.getUri(); + } + + @Override + public FileStatus getFileStatus(Path f) throws IOException { + return originalFs.getFileStatus(f); + } + + @Override + public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException { + return originalFs.getFileBlockLocations(file, start, len); + } + + @Override + public FileStatus[] listStatus(Path f) throws IOException { + return originalFs.listStatus(f); + } + + @Override + public boolean delete(Path f, boolean recursive) throws IOException { + return originalFs.delete(f, recursive); + } + + @Override + public boolean mkdirs(Path f) throws IOException { + return originalFs.mkdirs(f); + } + + @Override + public boolean rename(Path src, Path dst) throws IOException { + return originalFs.rename(src, dst); + } + + @Override + public boolean exists(Path f) throws IOException { + return originalFs.exists(f); + } + + @Override + @Deprecated + @SuppressWarnings("deprecation") + public long getDefaultBlockSize() { + return originalFs.getDefaultBlockSize(); + } + + // ------------------------------------------------------------------------ + + private <T extends StreamWithTimeout> T createStream( + final SupplierWithException<T, IOException> streamOpener, + final HashSet<T> openStreams, + final boolean output) throws IOException { + + final int outputLimit = output && maxNumOpenInputStreams > 0 ? maxNumOpenOutputStreams : Integer.MAX_VALUE; + final int inputLimit = !output && maxNumOpenInputStreams > 0 ? maxNumOpenInputStreams : Integer.MAX_VALUE; + final int totalLimit = maxNumOpenStreamsTotal > 0 ? 
maxNumOpenStreamsTotal : Integer.MAX_VALUE; + final int outputCredit = output ? 1 : 0; + final int inputCredit = output ? 0 : 1; + + // because waiting for availability may take long, we need to be interruptible here + // and handle interrupted exceptions as I/O errors + // even though the code is written to make sure the lock is held for a short time only, + // making the lock acquisition interruptible helps to guard against the cases where + // a supposedly fast operation (like 'getPos()' on a stream) actually takes long. + try { + lock.lockInterruptibly(); + try { + // some integrity checks + assert openOutputStreams.size() <= numReservedOutputStreams; + assert openInputStreams.size() <= numReservedInputStreams; + + // wait until there are few enough streams so we can open another + waitForAvailability(totalLimit, outputLimit, inputLimit); + + // We do not open the stream here in the locked scope because opening a stream + // could take a while. Holding the lock during that operation would block all concurrent + // attempts to try and open a stream, effectively serializing all calls to open the streams. + numReservedOutputStreams += outputCredit; + numReservedInputStreams += inputCredit; + } + finally { + lock.unlock(); + } + } + catch (InterruptedException e) { + // restore interruption flag + Thread.currentThread().interrupt(); + throw new IOException("interrupted before opening stream"); + } + + // open the stream outside the lock. + boolean success = false; + try { + final T out = streamOpener.get(); + + // add the stream to the set, need to re-acquire the lock + lock.lock(); + try { + openStreams.add(out); + } finally { + lock.unlock(); + } + + // good, can now return cleanly + success = true; + return out; + } + finally { + if (!success) { + // remove the reserved credit + // we must open this non-interruptibly, because this must succeed! 
+ lock.lock(); + try { + numReservedOutputStreams -= outputCredit; + numReservedInputStreams -= inputCredit; + available.signalAll(); + } finally { + lock.unlock(); + } + } + } + } + + @GuardedBy("lock") + private void waitForAvailability( + int totalLimit, + int outputLimit, + int inputLimit) throws InterruptedException, IOException { + + checkState(lock.isHeldByCurrentThread()); + + // compute the deadline of this operations + final long deadline; + if (streamOpenTimeoutNanos == 0) { + deadline = Long.MAX_VALUE; + } else { + long deadlineNanos = System.nanoTime() + streamOpenTimeoutNanos; + // check for overflow + deadline = deadlineNanos > 0 ? deadlineNanos : Long.MAX_VALUE; + } + + // wait for available connections + long timeLeft; + + if (streamInactivityTimeoutNanos == 0) { + // simple case: just wait + while ((timeLeft = (deadline - System.nanoTime())) > 0 && + !hasAvailability(totalLimit, outputLimit, inputLimit)) { + + available.await(timeLeft, TimeUnit.NANOSECONDS); + } + } + else { + // complex case: chase down inactive streams + final long checkIntervalNanos = (streamInactivityTimeoutNanos >>> 1) + 1; + + long now; + while ((timeLeft = (deadline - (now = System.nanoTime()))) > 0 && // while still within timeout + !hasAvailability(totalLimit, outputLimit, inputLimit)) { + + // check all streams whether there in one that has been inactive for too long + if (!(closeInactiveStream(openOutputStreams, now) || closeInactiveStream(openInputStreams, now))) { + // only wait if we did not manage to close any stream. + // otherwise eagerly check again if we have availability now (we should have!) 
+ long timeToWait = Math.min(checkIntervalNanos, timeLeft); + available.await(timeToWait, TimeUnit.NANOSECONDS); + } + } + } + + // check for timeout + // we check availability again to catch cases where the timeout expired while waiting + // to re-acquire the lock + if (timeLeft <= 0 && !hasAvailability(totalLimit, outputLimit, inputLimit)) { + throw new IOException(String.format( + "Timeout while waiting for an available stream/connect. " + + "limits: total=%d, input=%d, output=%d ; Open: input=%d, output=%d ; timeout: %d ms", + maxNumOpenStreamsTotal, maxNumOpenInputStreams, maxNumOpenOutputStreams, + numReservedInputStreams, numReservedOutputStreams, getStreamOpenTimeout())); + } + } + + @GuardedBy("lock") + private boolean hasAvailability(int totalLimit, int outputLimit, int inputLimit) { + return numReservedOutputStreams < outputLimit && + numReservedInputStreams < inputLimit && + numReservedOutputStreams + numReservedInputStreams < totalLimit; + } + + @GuardedBy("lock") + private boolean closeInactiveStream(HashSet<? extends StreamWithTimeout> streams, long nowNanos) { + for (StreamWithTimeout stream : streams) { + try { + // If the stream is closed already, it will be removed anyways, so we + // do not classify it as inactive. We also skip the check if another check happened too recently. 
+ if (stream.isClosed() || nowNanos < stream.getLastCheckTimestampNanos() + streamInactivityTimeoutNanos) { + // interval since last check not yet over + return false; + } + else if (!stream.checkNewBytesAndMark(nowNanos)) { + stream.closeDueToTimeout(); + return true; + } + } + catch (StreamTimeoutException ignored) { + // may happen due to races + } + catch (IOException e) { + // only log on debug level here, to avoid log spamming + LOG.debug("Could not check for stream progress to determine inactivity", e); + } + } + + return false; + } + + // ------------------------------------------------------------------------ + + /** + * Atomically removes the given output stream from the set of currently open output streams, + * and signals that new stream can now be opened. + */ + void unregisterOutputStream(OutStream stream) { + lock.lock(); + try { + // only decrement if we actually remove the stream + if (openOutputStreams.remove(stream)) { + numReservedOutputStreams--; + available.signalAll(); + } + } + finally { + lock.unlock(); + } + } + + /** + * Atomically removes the given input stream from the set of currently open input streams, + * and signals that new stream can now be opened. + */ + void unregisterInputStream(InStream stream) { + lock.lock(); + try { + // only decrement if we actually remove the stream + if (openInputStreams.remove(stream)) { + numReservedInputStreams--; + available.signalAll(); + } + } + finally { + lock.unlock(); + } + } + + // ------------------------------------------------------------------------ + + /** + * A special IOException, indicating a timeout in the data output stream. + */ + public static final class StreamTimeoutException extends IOException { + + private static final long serialVersionUID = -8790922066795901928L; + + public StreamTimeoutException() { + super("Stream closed due to inactivity timeout. 
" + + "This is done to prevent inactive streams from blocking the full " + + "pool of limited connections"); + } + + public StreamTimeoutException(StreamTimeoutException other) { + super(other.getMessage(), other); + } + } + + // ------------------------------------------------------------------------ + + /** + * Interface for streams that can be checked for inactivity. + */ + private interface StreamWithTimeout extends Closeable { + + /** + * Gets the timestamp when the last inactivity evaluation was made. + */ + long getLastCheckTimestampNanos(); + + /** + * Checks whether there were new bytes since the last time this method was invoked. + * This also sets the given timestamp, to be read via {@link #getLastCheckTimestampNanos()}. + * + * @return True, if there were new bytes, false if not. + */ + boolean checkNewBytesAndMark(long timestamp) throws IOException; + + /** + * Closes the stream asynchronously with a special exception that indicates closing due + * to lack of progress. + */ + void closeDueToTimeout() throws IOException; + + /** + * Checks whether the stream was closed already. + */ + boolean isClosed(); + } + + // ------------------------------------------------------------------------ + + /** + * A data output stream that wraps a given data output stream and un-registers + * from a given connection-limiting file system + * (via {@link LimitedConnectionsFileSystem#unregisterOutputStream(OutStream)} + * upon closing. + */ + private static final class OutStream extends FSDataOutputStream implements StreamWithTimeout { + + /** The original data output stream to write to. */ + private final FSDataOutputStream originalStream; + + /** The connection-limiting file system to un-register from. */ + private final LimitedConnectionsFileSystem fs; + + /** An exception with which the stream has been externally closed. 
*/ + private volatile StreamTimeoutException timeoutException; + + /** The number of bytes written the last time that the {@link #checkNewBytesAndMark(long)} + * method was called. It is important to initialize this with {@code -1} so that the + * first check (0 bytes) always appears to have made progress. */ + private volatile long lastCheckBytes = -1; + + /** The timestamp when the last inactivity evaluation was made. */ + private volatile long lastCheckTimestampNanos; + + /** Flag tracking whether the stream was already closed, for proper inactivity tracking. */ + private AtomicBoolean closed = new AtomicBoolean(); + + OutStream( + FSDataOutputStream originalStream, + LimitedConnectionsFileSystem fs) { + + this.originalStream = checkNotNull(originalStream); + this.fs = checkNotNull(fs); + } + + // --- FSDataOutputStream API implementation + + @Override + public void write(int b) throws IOException { + try { + originalStream.write(b); + } + catch (IOException e) { + handleIOException(e); + } + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + try { + originalStream.write(b, off, len); + } + catch (IOException e) { + handleIOException(e); + } + } + + @Override + public long getPos() throws IOException { + try { + return originalStream.getPos(); + } + catch (IOException e) { + handleIOException(e); + return -1; // silence the compiler + } + } + + @Override + public void flush() throws IOException { + try { + originalStream.flush(); + } + catch (IOException e) { + handleIOException(e); + } + } + + @Override + public void sync() throws IOException { + try { + originalStream.sync(); + } + catch (IOException e) { + handleIOException(e); + } + } + + @Override + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + try { + originalStream.close(); + } + catch (IOException e) { + handleIOException(e); + } + finally { + fs.unregisterOutputStream(this); + } + } + } + + @Override + public void 
closeDueToTimeout() throws IOException { + this.timeoutException = new StreamTimeoutException(); + close(); + } + + @Override + public boolean isClosed() { + return closed.get(); + } + + @Override + public long getLastCheckTimestampNanos() { + return lastCheckTimestampNanos; + } + + @Override + public boolean checkNewBytesAndMark(long timestamp) throws IOException { + // remember the time when checked + lastCheckTimestampNanos = timestamp; + + final long bytesNow = originalStream.getPos(); + if (bytesNow > lastCheckBytes) { + lastCheckBytes = bytesNow; + return true; + } + else { + return false; + } + } + + private void handleIOException(IOException exception) throws IOException { + if (timeoutException == null) { + throw exception; + } else { + // throw a new exception to capture this call's stack trace + throw new StreamTimeoutException(timeoutException); + } + } + } + + /** + * A data input stream that wraps a given data input stream and un-registers + * from a given connection-limiting file system + * (via {@link LimitedConnectionsFileSystem#unregisterInputStream(InStream)} + * upon closing. + */ + private static final class InStream extends FSDataInputStream implements StreamWithTimeout { + + /** The original data input stream to read from. */ + private final FSDataInputStream originalStream; + + /** The connection-limiting file system to un-register from. */ + private final LimitedConnectionsFileSystem fs; + + /** An exception with which the stream has been externally closed. */ + private volatile StreamTimeoutException timeoutException; + + /** The number of bytes written the last time that the {@link #checkNewBytesAndMark(long)} + * method was called. It is important to initialize this with {@code -1} so that the + * first check (0 bytes) always appears to have made progress. */ + private volatile long lastCheckBytes = -1; --- End diff -- `InStream` and `OutStream` have some overlap in the progress checking code. 
While they cannot share a common superclass for this functionality, the logic and fields around `checkNewBytesAndMark(...)` and the exception multiplexing could be extracted into a separate helper class, used by both streams, that handles all of the progress checking.
---