All jobs running in a Flink session cluster talk to the same blob server.
The time when tasks are submitted depends on the job; for streaming jobs
all tasks are deployed when the job starts running; in case of batch
jobs the submission can be staggered.
I'm only aware of 2 cases where we transfer data via the blob server;
a) retrieval of jars required for the user code to run (this is what
you see in the stack trace)
b) retrieval of TaskInformation, which _should_ only happen if your job
is quite large, but let's assume it does.
For a) there should be at most numberOfSlots * numberOfTaskExecutors
concurrent connections, in the worst case of each slot working on a
different job, as each would download the jars for their respective job.
If multiple slots are used for the same job at the same time, then the
job jar is only retrieved once.
For b) the limit should also be numberOfSlots * numberOfTaskExecutors;
it is done once per task, and there are only so many tasks that can run
at the same time.
Thus from what I can tell there should be at most 104 (26 task executors
* 2 slots * 2) concurrent attempts, of which only 54 should land in the
backlog.
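
To make that estimate easy to redo for other cluster sizes, here is a minimal sketch of the arithmetic. The function and parameter names are made up for illustration. The factor of 2 stands for cases a) and b), and 50 is the default concurrent-connection limit mentioned in the original mail.

```python
def blob_connection_estimate(task_executors, slots_per_executor,
                             transfers_per_slot=2, max_concurrent=50):
    """Return (worst-case simultaneous attempts, attempts left waiting in the backlog).

    transfers_per_slot=2 covers the jar download and the TaskInformation download.
    max_concurrent is the number of connections the blob server handles at once.
    """
    attempts = task_executors * slots_per_executor * transfers_per_slot
    waiting = max(0, attempts - max_concurrent)
    return attempts, waiting


attempts, waiting = blob_connection_estimate(task_executors=26, slots_per_executor=2)
print(attempts, waiting)
```

Either way the result is far below a backlog of 1000, which is why the OS-level limit looks like the more interesting suspect.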
Did you run into this issue before?
If not, is this application different from your existing applications?
Is the jar particularly big, or are the jobs particularly short-running
or more complex than others?
One thing to note is that the backlog relies entirely on OS
functionality, which can be subject to an upper limit enforced by the OS.
The configured backlog size is just a hint to the OS, but it may ignore
it; it appears that 128 is not an uncommon upper limit, but maybe there
are lower settings out there.
You can check this limit via sysctl -a | grep net.core.somaxconn
Maybe this value is set to 0, effectively disabling the backlog?
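
As a rough illustration of how the OS cap interacts with the configured value, the sketch below reads the limit from /proc (Linux only) and applies the truncation that Linux performs on a requested backlog. The helper names are hypothetical, and other operating systems may behave differently.

```python
from pathlib import Path

SOMAXCONN_PATH = Path("/proc/sys/net/core/somaxconn")  # Linux only


def read_somaxconn(path=SOMAXCONN_PATH):
    """Return the kernel's cap on listen backlogs, or None if it cannot be read."""
    try:
        return int(path.read_text().strip())
    except (OSError, ValueError):
        return None


def effective_backlog(requested, somaxconn):
    """Linux silently truncates a requested backlog to net.core.somaxconn."""
    if somaxconn is None:
        return requested  # unknown cap: assume the request is honored
    return min(requested, somaxconn)


cap = read_somaxconn()
print("somaxconn:", cap)
print("effective backlog for a request of 1000:", effective_backlog(1000, cap))
```

If this prints something like 128 for the effective backlog, then the configured 1000 never actually applied on that host.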
It may also be worthwhile to monitor the number of such
connections (netstat -ant | grep -c SYN_REC).
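
If netstat is not available on the box, roughly the same count can be taken from /proc/net/tcp on Linux, where the fourth column holds the socket state as a hex code (03 is SYN_RECV). This is only a sketch: the function name is hypothetical and the sample rows are invented to show the format, they are not taken from your cluster.

```python
SYN_RECV = "03"  # hex code for the SYN_RECV state in /proc/net/tcp


def count_syn_recv(proc_net_tcp_text, local_port=None):
    """Count sockets in SYN_RECV, optionally restricted to one local port."""
    count = 0
    for line in proc_net_tcp_text.splitlines()[1:]:  # skip the header row
        fields = line.split()
        if len(fields) < 4 or fields[3] != SYN_RECV:
            continue
        # local_address looks like HEXADDR:HEXPORT
        port = int(fields[1].rsplit(":", 1)[1], 16)
        if local_port is None or port == local_port:
            count += 1
    return count


# Invented sample in the /proc/net/tcp layout (columns truncated for brevity).
sample = """\
  sl  local_address rem_address   st tx_queue rx_queue
   0: 0E80300A:B589 0F80300A:C001 03 00000000:00000000
   1: 0E80300A:B589 1080300A:C002 01 00000000:00000000
   2: 0E80300A:1F90 1180300A:C003 03 00000000:00000000
"""
print(count_syn_recv(sample, local_port=46473))
```

On the JobManager host you would pass in the contents of /proc/net/tcp (and /proc/net/tcp6, since the JVM often listens on an IPv6 socket) together with the blob server port, and sample it repeatedly while the jobs are being submitted.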
@Nico Do you have any ideas?
On 10/1/2020 6:26 PM, Hailu, Andreas wrote:
Hi Chesnay, Till, thanks for responding.
@Chesnay:
Apologies, I said cores when I meant slots :) So a total of 26 Task
Managers with 2 slots each, for a grand total of 52 parallelism.
@Till:
For this application, we have a grand total of 78 jobs, with some of
them demanding more parallelism than others. Each job has multiple
operators – depending on the size of the data we’re operating on, we
could submit 1 whopper with 52 parallelism, or multiple smaller jobs
submitted in parallel with a sum of 52 parallelism. When does a task
submission to a `TaskExecutor` take place? Is that on job submission
or something else? I’m just curious as a parallelism of 52 seems on
the lower side to breach 1K connections in the queue, unless
interactions with the Blobserver are much more frequent than I think.
Is it possible that separate Flink jobs share the same Blobserver?
Because we have thousands of Flink applications running concurrently
in our YARN cluster.
// ah
From: Chesnay Schepler <ches...@apache.org>
Sent: Thursday, October 1, 2020 5:42 AM
To: Till Rohrmann <trohrm...@apache.org>; Hailu, Andreas
[Engineering] <andreas.ha...@ny.email.gs.com>
Cc: user@flink.apache.org
Subject: Re: Blobserver dying mid-application
It would also be good to know how many slots you have on each task
executor.
On 10/1/2020 11:21 AM, Till Rohrmann wrote:
Hi Andreas,
do the logs of the JM contain any information?
Theoretically, each task submission to a `TaskExecutor` can
trigger a new connection to the BlobServer. This depends a bit on
how large your TaskInformation is and whether this information is
being offloaded to the BlobServer. What you can definitely try to
do is to increase the blob.fetch.backlog in order to see whether
this solves the problem.
How many jobs do you submit to the Flink cluster, and on what
timeline? Maybe you can share a few more details about the
application you are running.
Cheers,
Till
On Wed, Sep 30, 2020 at 11:49 PM Hailu, Andreas
<andreas.ha...@gs.com> wrote:
Hello folks, I’m seeing application failures where our
Blobserver is refusing connections mid application:
2020-09-30 13:56:06,227 INFO
[flink-akka.actor.default-dispatcher-18]
org.apache.flink.runtime.taskexecutor.TaskExecutor -
Un-registering task and sending final execution state FINISHED
to JobManager for task DataSink (TextOutputFormat
(hdfs:/user/p2epda/lake/delp_prod/PROD/APPROVED/staging/datastore/MandateTradingLine/tmp_pipeline_collapse)
- UTF-8) 3d1890b47f4398d805cf0c1b54286f71.
2020-09-30 13:56:06,423 INFO
[flink-akka.actor.default-dispatcher-18]
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable -
Free slot TaskSlot(index:0, state:ACTIVE, resource profile:
ResourceProfile{cpuCores=1.7976931348623157E308,
heapMemoryInMB=2147483647, directMemoryInMB=2147483647,
nativeMemoryInMB=2147483647, networkMemoryInMB=2147483647,
managedMemoryInMB=3046}, allocationId:
e8be16ec74f16c795d95b89cd08f5c37, jobId:
e808de0373bd515224434b7ec1efe249).
2020-09-30 13:56:06,424 INFO
[flink-akka.actor.default-dispatcher-18]
org.apache.flink.runtime.taskexecutor.JobLeaderService -
Remove job e808de0373bd515224434b7ec1efe249 from job leader
monitoring.
2020-09-30 13:56:06,424 INFO
[flink-akka.actor.default-dispatcher-18]
org.apache.flink.runtime.taskexecutor.TaskExecutor - Close
JobManager connection for job e808de0373bd515224434b7ec1efe249.
2020-09-30 13:56:06,426 INFO
[flink-akka.actor.default-dispatcher-18]
org.apache.flink.runtime.taskexecutor.TaskExecutor - Close
JobManager connection for job e808de0373bd515224434b7ec1efe249.
2020-09-30 13:56:06,426 INFO
[flink-akka.actor.default-dispatcher-18]
org.apache.flink.runtime.taskexecutor.JobLeaderService -
Cannot reconnect to job e808de0373bd515224434b7ec1efe249
because it is not registered.
2020-09-30 13:56:09,918 INFO [CHAIN DataSource (dataset |
Read Staging From File System | AVRO) -> Map (Map at
readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter
(Filter at validateData(DAXTask.java:97)) -> FlatMap (FlatMap
at handleBloomFilter(PreMergeTask.java:187)) -> FlatMap
(FlatMap at collapsePipelineIfRequired(Task.java:160)) (1/1)]
org.apache.flink.runtime.blob.BlobClient - Downloading
48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
from d43723-430.dc.gs.com/10.48.128.14:46473
(retry 3)
2020-09-30 13:56:09,920 ERROR [CHAIN DataSource (dataset |
Read Staging From File System | AVRO) -> Map (Map at
readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter
(Filter at validateData(DAXTask.java:97)) -> FlatMap (FlatMap
at handleBloomFilter(PreMergeTask.java:187)) -> FlatMap
(FlatMap at collapsePipelineIfRequired(Task.java:160)) (1/1)]
org.apache.flink.runtime.blob.BlobClient - Failed to fetch
BLOB
48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
from d43723-430.dc.gs.com/10.48.128.14:46473
and store it
under
/fs/htmp/yarn/local/usercache/delp_prod/appcache/application_1599723434208_15328880/blobStore-e2888df1-c7be-4ce6-b6b6-58e7c24a140a/incoming/temp-00000004
Retrying...
2020-09-30 13:56:09,920 INFO [CHAIN DataSource (dataset |
Read Staging From File System | AVRO) -> Map (Map at
readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter
(Filter at validateData(DAXTask.java:97)) -> FlatMap (FlatMap
at handleBloomFilter(PreMergeTask.java:187)) -> FlatMap
(FlatMap at collapsePipelineIfRequired(Task.java:160)) (1/1)]
org.apache.flink.runtime.blob.BlobClient - Downloading
48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
from d43723-430.dc.gs.com/10.48.128.14:46473
(retry 4)
2020-09-30 13:56:09,922 ERROR [CHAIN DataSource (dataset |
Read Staging From File System | AVRO) -> Map (Map at
readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter
(Filter at validateData(DAXTask.java:97)) -> FlatMap (FlatMap
at handleBloomFilter(PreMergeTask.java:187)) -> FlatMap
(FlatMap at collapsePipelineIfRequired(Task.java:160)) (1/1)]
org.apache.flink.runtime.blob.BlobClient - Failed to fetch
BLOB
48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
from d43723-430.dc.gs.com/10.48.128.14:46473
and store it
under
/fs/htmp/yarn/local/usercache/delp_prod/appcache/application_1599723434208_15328880/blobStore-e2888df1-c7be-4ce6-b6b6-58e7c24a140a/incoming/temp-00000004
Retrying...
2020-09-30 13:56:09,922 INFO [CHAIN DataSource (dataset |
Read Staging From File System | AVRO) -> Map (Map at
readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter
(Filter at validateData(DAXTask.java:97)) -> FlatMap (FlatMap
at handleBloomFilter(PreMergeTask.java:187)) -> FlatMap
(FlatMap at collapsePipelineIfRequired(Task.java:160)) (1/1)]
org.apache.flink.runtime.blob.BlobClient - Downloading
48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
from d43723-430.dc.gs.com/10.48.128.14:46473
(retry 5)
2020-09-30 13:56:09,925 ERROR [CHAIN DataSource (dataset |
Read Staging From File System | AVRO) -> Map (Map at
readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter
(Filter at validateData(DAXTask.java:97)) -> FlatMap (FlatMap
at handleBloomFilter(PreMergeTask.java:187)) -> FlatMap
(FlatMap at collapsePipelineIfRequired(Task.java:160)) (1/1)]
org.apache.flink.runtime.blob.BlobClient - Failed to fetch
BLOB
48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
from d43723-430.dc.gs.com/10.48.128.14:46473
and store it
under
/fs/htmp/yarn/local/usercache/delp_prod/appcache/application_1599723434208_15328880/blobStore-e2888df1-c7be-4ce6-b6b6-58e7c24a140a/incoming/temp-00000004
No retries left.
java.io.IOException: Could not connect to BlobServer at
address d43723-430.dc.gs.com/10.48.128.14:46473
at
org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:100)
at
org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:143)
at
org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)
at
org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:202)
at
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
at
org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:915)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:595)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused
(Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at
org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:95)
... 8 more
Prior to the above connection refused error, I don’t see any
exceptions or failures. We’re running this application with
v1.9.2 on YARN, 26 Task Managers with 2 cores each, and with
the default BLOB server configurations. The application itself
then has many jobs it submits to the cluster. Does this sound
like a blob.fetch.backlog/concurrent-connections config
problem (defaulted to 1000 and 50 respectively)? I wasn’t sure
how chatty each TM is with the server. How can we tell if it’s
either a max concurrent-conn or backlog problem?
Best,
Andreas
------------------------------------------------------------------------
Your Personal Data: We may collect and process information
about you that may be subject to data protection laws. For
more information about how we use and disclose your personal
data, how we protect your information, our legal basis to use
your information, your rights and who you can contact, please
refer to: www.gs.com/privacy-notices
<http://www.gs.com/privacy-notices>