This is the second of two recovery problems I'm seeing running Flink in 
Kubernetes.  I'm posting them in separate messages for brevity and because the 
second is not directly related to the first.  Any advice is appreciated.  First 
problem: 
https://lists.apache.org/thread.html/a663a8ce2f697e6d207cb59eff1f77dbb8bd745e3f44aab09866ab46@%3Cuser.flink.apache.org%3E

Setup:
Flink 1.6.3 in Kubernetes (flink:1.6.3-hadoop28-scala_2.11).  One JobManager 
and two TaskManagers (TM_1, TM_2).  Each pod has 4 CPUs.  Each TaskManager has 
16 task slots.  High availability is enabled.  S3 (s3a) for storage.  RocksDB 
with incremental snapshots.  It doesn't matter whether local recovery is enabled - 
I've replicated the problem with local recovery both enabled and disabled.  The 
value of "fs.s3a.connection.maximum" is 128.

Problem:
When a job dies, Flink + Hadoop neither re-uses its existing connections to S3 
nor closes them and creates new ones.

Replication Steps:
Create a job with a parallelism of 16 - all processing is occurring on TM_1.  
After a checkpoint has been taken, delete TM_1.  The job is canceled on TM_1, 
deployed and restored successfully on TM_2, and a new TaskManager (TM_3) is 
created and successfully registers with the JobManager.  No work is scheduled 
on TM_3.  After another checkpoint is taken, delete TM_2.  The job is canceled 
on TM_2, and Flink attempts to deploy it to TM_3, but the deployment fails with 
"org.apache.flink.fs.s3hadoop.shaded.org.apache.http.conn.ConnectionPoolTimeoutException:
 Timeout waiting for connection from pool".  Flink attempts to recover by 
canceling on TM_3 and deploying on TM_4, but Flink does not release 
the task slots on TM_3 (TM_3 now has no free task slots).  The job is deployed 
to TM_4 which again fails with "ConnectionPoolTimeoutException: Timeout waiting 
for connection from pool".  Flink attempts to recover by canceling on TM_4, but 
does not release the task slots on TM_4 (TM_4 now has no free task slots).  As 
there are 0 available slots, the job is now caught in a SCHEDULED state.

Actual Behavior:
Shaded Hadoop does not release its hold on S3 connections when the job dies.

Expected Behavior:
Hadoop should be told to release connections when the job dies, or should 
re-use existing connections.
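
As a toy illustration of the suspected failure mode, here is a minimal Python 
sketch.  It is not Flink, Hadoop or AWS SDK code: BoundedPool and 
rounds_until_timeout are hypothetical names, and the numbers of connections 
held per round are assumptions chosen to match the setup above (pool maximum 
128, parallelism 16).  Where the real leak occurs (per task, per stream or per 
restart) is not established here.  The sketch only shows that a bounded pool 
whose leases are never returned eventually fails with a pool timeout, while one 
whose leases are released on cancel does not.

```python
import queue


class BoundedPool:
    """Toy stand-in for an HTTP connection pool with a fixed maximum size."""

    def __init__(self, maximum):
        self._free = queue.Queue()
        for conn_id in range(maximum):
            self._free.put(conn_id)

    def lease(self, timeout=0.01):
        # Wait briefly for a free connection, then give up with a timeout error.
        try:
            return self._free.get(timeout=timeout)
        except queue.Empty:
            raise TimeoutError("Timeout waiting for connection from pool") from None

    def release(self, conn_id):
        self._free.put(conn_id)


def rounds_until_timeout(maximum, leases_per_round, release_on_cancel, limit=100):
    """Return the first round whose lease times out, or None if none does."""
    pool = BoundedPool(maximum)
    for round_no in range(1, limit + 1):
        try:
            held = [pool.lease() for _ in range(leases_per_round)]
        except TimeoutError:
            return round_no
        if release_on_cancel:
            # Expected behavior: connections go back to the pool on cancel.
            for conn_id in held:
                pool.release(conn_id)
        # Otherwise the leases are simply dropped and never returned.
    return None


if __name__ == "__main__":
    print(rounds_until_timeout(128, 16, release_on_cancel=False))
    print(rounds_until_timeout(128, 16, release_on_cancel=True))
```

In this model, with 128 connections and 16 leases per round, the pool is 
drained after eight rounds and the ninth times out when nothing is released.  
With release on cancel, no round times out.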

Log Snip:
2019-01-10 20:03:40,191 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map (8/16) (aaa18fa82aa555a51474d49ac14665e7) switched from RUNNING to FAILED.
java.io.InterruptedIOException: getFileStatus on s3a://my-s3-bucket/stream-cluster/prod/checkpoints/83d7cb3e6d08318ef2c27878d0fe1bbd: org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:125)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:101)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1571)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.innerMkdirs(S3AFileSystem.java:1507)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.mkdirs(S3AFileSystem.java:1482)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1913)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:170)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:112)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:83)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:443)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:399)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:257)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1038)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:742)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:716)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4194)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4141)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1256)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1232)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
    ... 12 more
Caused by: org.apache.flink.fs.s3hadoop.shaded.org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:292)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:269)
    at sun.reflect.GeneratedMethodAccessor45.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.conn.$Proxy10.get(Unknown Source)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:191)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1181)
    at org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1030)
    ... 24 more
2019-01-10 20:03:40,192 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job streamProcessorJob (83d7cb3e6d08318ef2c27878d0fe1bbd) switched from state RUNNING to FAILING.

Many thanks,

John Stone
