Hi Ken,

In regard to oversized input splits, this seems to be a rarer case than I expected. It should definitely be fixed, though, since input splits can be user-defined; we should not assume they are always small. I agree with Stephan that something unexpected may have ended up in the input splits.

There is also a workaround until we fix this: increase the RPC message size limit by explicitly setting "akka.framesize" in flink-conf.yaml.
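For example (the value below is only an illustration; it just needs to exceed your largest serialized split):

    # flink-conf.yaml -- the default is 10485760b (10 MB)
    akka.framesize: 104857600b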
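And for what it's worth, here is a minimal sketch (a hypothetical class, not from your job) of how a user-defined split can quietly exceed that limit: every non-transient field is serialized into the split the JobManager ships over RPC.

    import org.apache.flink.core.io.InputSplit;

    public class MyS3Split implements InputSplit {

        private final int splitNumber;
        private final String path;

        // PROBLEM: a large helper object serialized with the split. If one
        // serialized split exceeds akka.framesize, the JobManager's
        // getNextInputSplit() response is dropped by Akka and the task fails
        // with the TimeoutException seen below. A non-static inner class has
        // the same effect, since Java serialization drags the enclosing
        // instance along with it.
        private final byte[] lookupTable;

        public MyS3Split(int splitNumber, String path, byte[] lookupTable) {
            this.splitNumber = splitNumber;
            this.path = path;
            this.lookupTable = lookupTable;
        }

        @Override
        public int getSplitNumber() {
            return splitNumber;
        }

        public String getPath() {
            return path;
        }
    }

If something like this is going on, marking the field transient and loading the data on the TaskManager side (e.g. in the InputFormat's open()) keeps the splits small.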
To @Qi: I'm also sorry to hear about your bad experience. I'll take this issue, but I'm not sure the fix will make it into the 1.9 release. Hope things go well.

Stephan Ewen <se...@apache.org> wrote on Wed, Jun 26, 2019 at 10:50 PM:

> Hi Ken!
>
> Sorry to hear you are going through this experience. The major focus on streaming so far means that the DataSet API has stability issues at scale. So, yes, batch mode in the current Flink version can be somewhat tricky.
>
> It is a big focus of Flink 1.9 to finally fix batch mode, by addressing batch-specific scheduling, recovery, and shuffle issues.
>
> Let me go through the issues you found:
>
> *(1) Input splits and oversized RPC*
>
> Your explanation seems correct: a timeout due to a dropped oversized RPC message.
>
> I don't quite understand how exactly that happens, because the size limit is 10 MB and input splits should be rather small in most cases. Are you running custom sources which put large data into splits? Maybe accidentally, by having a large serialized closure in the splits?
>
> The fix would be this issue: https://issues.apache.org/jira/browse/FLINK-4399
>
> *(2) TM early release*
>
> The 1.8 version had a fix that should work for regular cases without fine-grained failure recovery. 1.9 should have a more general fix that also works for fine-grained recovery.
>
> Are you trying to use the fine-grained failover with the batch job? The fine-grained failover does not work in batch for 1.8, which is why it is not an advertised feature (it only works for streaming so far).
>
> The goal is that this works in the 1.9 release (aka the batch fixup release).
>
> *(3) Hang in Processing*
>
> I think a thread dump (jstack) from the TMs would be helpful to diagnose that. There are known issues with the current batch shuffle implementation, which is why 1.9 is getting a new bounded-blocking stream shuffle implementation.
>
> Best,
> Stephan
>
> On Mon, Jun 24, 2019 at 2:32 AM Ken Krugler <kkrugler_li...@transpac.com> wrote:
>
>> Hi all,
>>
>> I’ve been running a somewhat complex batch job (in EMR/YARN) with Flink 1.8.0, and it regularly fails, but for varying reasons.
>>
>> Has anyone else gotten stable runs with 1.8.0 in batch mode and non-trivial workflows?
>>
>> Thanks,
>>
>> — Ken
>>
>> *1. TimeoutException getting input splits*
>>
>> The batch job starts by processing a lot of files that live in S3. During this phase, I sometimes see:
>>
>> 2019-06-20 01:20:22,659 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN DataSource (at createInput(ExecutionEnvironment.java:549) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> Map (ad dailies) -> Filter (Filter at createWorkflow(AdvertiserSimilarityWorkflow.java:34)) -> Filter (Filter at createWorkflow(AdvertiserSimilarityWorkflow.java:36)) -> Filter (Filter at createWorkflow(AdvertiserSimilarityWorkflow.java:38)) -> Map (Key Extractor) -> Combine (Reduce at createWorkflow(AdvertiserSimilarityWorkflow.java:41)) (31/32) (8a8cbea47394c3d638910c36ac62d877) switched from RUNNING to FAILED.
>> java.lang.RuntimeException: Could not retrieve next input split.
>> at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
>> at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
>> at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
>> at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
>> ... 3 more
>> Caused by: java.util.concurrent.TimeoutException
>> at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>> at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
>> ... 4 more
>>
>> 2019-06-20 01:20:22,664 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Java Job at Thu Jun 20 01:11:28 UTC 2019 (5564b8980f40d788d7ef312318709e4d) switched from state RUNNING to FAILING.
>> java.lang.RuntimeException: Could not retrieve next input split.
>> at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
>> at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
>> at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
>> at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
>> ... 3 more
>> Caused by: java.util.concurrent.TimeoutException
>> at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>> at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
>> ... 4 more
>>
>> I saw bjb...@gmail.com’s email recently about a similar issue:
>>
>> I figured this out myself. In my yarn container logs I saw this warning/error:
>>
>> akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@HOST:43911/temp/$n]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.jobmaster.SerializedInputSplit was 15728643 bytes.
>>
>> Looking into this, there is a max frame size for Akka, which in Flink can be set with akka.framesize and is set to 10MB by default. Increasing this past the size of my side input fixed the issue. I'm guessing this is due to creating the side input PCollection from memory using the Create.of APIs.
>>
>> But no such akka.remote.OversizedPayloadException appears in any of my log files.
>>
>> *2. TM released too soon?*
>>
>> Sometimes it fails with "Connecting the channel failed: Connecting to remote task manager xxx has failed.
>> This might indicate that the remote task manager has been lost”
>>
>> I’d run into this previously with 1.7.2, but thought that 1.8.0 had the fix for https://issues.apache.org/jira/browse/FLINK-10941, and thus I’d avoid the problem, but it seems like there’s still an issue.
>>
>> I’m running 3 TMs on three servers, each with 32 slots. When the job fails, the servers are under heavy CPU load.
>>
>> From the logs, I see the JobManager releasing two of the TMs, then requesting two new containers. One of these requests gets filled, and that new TM starts getting tasks for its slots.
>>
>> But then soon afterwards that new TM and the one original TM still left around start failing because they aren’t getting data from (I think) the other TM that was released.
>>
>> Any thoughts on what’s going wrong? Is the bug not actually fully fixed? Or is there some TM timeout value that I should bump?
>>
>> In the job manager log file I see where the two TMs are getting released...
>>
>> 2019-05-17 17:42:50,215 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Releasing idle slot [d947cd800b0ef2671259c7b048c3f7fc].
>> 2019-05-17 17:43:38,942 INFO org.apache.flink.yarn.YarnResourceManager - Stopping container container_1558074033518_0003_01_000002.
>> 2019-05-17 17:43:38,978 INFO org.apache.flink.yarn.YarnResourceManager - Closing TaskExecutor connection container_1558074033518_0003_01_000002 because: TaskExecutor exceeded the idle timeout.
>> 2019-05-17 17:43:38,978 INFO org.apache.flink.yarn.YarnResourceManager - Stopping container container_1558074033518_0003_01_000004.
>> 2019-05-17 17:43:38,998 INFO org.apache.flink.yarn.YarnResourceManager - Closing TaskExecutor connection container_1558074033518_0003_01_000004 because: TaskExecutor exceeded the idle timeout.
>> 2019-05-17 17:43:39,005 WARN org.apache.flink.yarn.YarnResourceManager - Discard registration from TaskExecutor container_1558074033518_0003_01_000002 at (akka.tcp://flink@ip-10-28-81-66.ec2.internal:36311/user/taskmanager_0) because the framework did not recognize it
>> 2019-05-17 17:43:39,006 WARN org.apache.flink.yarn.YarnResourceManager - Discard registration from TaskExecutor container_1558074033518_0003_01_000004 at (akka.tcp://flink@ip-10-47-197-146.ec2.internal:44403/user/taskmanager_0) because the framework did not recognize it
>>
>> And then later on the requests for the replacement TMs.
>>
>> 2019-05-17 17:45:01,655 INFO org.apache.flink.yarn.YarnResourceManager - Requesting new TaskExecutor container with resources <memory:44000, vCores:32>. Number pending requests 1.
>> 2019-05-17 17:45:01,662 INFO org.apache.flink.yarn.YarnResourceManager - Requesting new TaskExecutor container with resources <memory:44000, vCores:32>. Number pending requests 2.
>>
>> And then one of the requests is satisfied:
>>
>> 2019-05-17 17:45:04,360 INFO org.apache.flink.yarn.YarnResourceManager - Received new container: container_1558074033518_0003_01_000006 - Remaining pending container requests: 2
>> 2019-05-17 17:45:04,360 INFO org.apache.flink.yarn.YarnResourceManager - Removing container request Capability[<memory:44000, vCores:32>]Priority[1]. Pending container requests 1.
>> 2019-05-17 17:45:04,836 INFO org.apache.flink.yarn.YarnResourceManager - Creating container launch context for TaskManagers
>> 2019-05-17 17:45:04,837 INFO org.apache.flink.yarn.YarnResourceManager - Starting TaskManagers
>>
>> So it seems like TMs are being allocated, but soon afterwards:
>>
>> 2019-05-17 17:45:12,907 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Map (Map at createWorkflow(AdvertiserSimilarityWorkflow.java:127)) -> Map (Key Extractor) (73/96) (1e16d6ca293330933b7cece67644635f) switched from RUNNING to FAILED.
>> java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'ip-10-28-81-66.ec2.internal/10.28.81.66:40317' has failed. This might indicate that the remote task manager has been lost.
>> at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
>> at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:133)
>> at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:69)
>> at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:60)
>> at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:166)
>> at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:494)
>> at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:525)
>> at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:508)
>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>> at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:101)
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> at java.lang.Thread.run(Thread.java:748)
>>
>> On one of the TMs that was released, I see at the end of its log:
>>
>> 2019-05-17 17:42:50,217 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable - Free slot TaskSlot(index:3, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.7976931348623157E308, heapMemoryInMB=2147483647, directMemoryInMB=2147483647, nativeMemoryInMB=2147483647, networkMemoryInMB=2147483647}, allocationId: e3e7b383fe2db6376c82e5f3be7e02cb, jobId: eff57179c5c0e7d475c3b69d1a063017).
>> 2019-05-17 17:42:50,217 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Remove job eff57179c5c0e7d475c3b69d1a063017 from job leader monitoring.
>> 2019-05-17 17:42:50,217 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager connection for job eff57179c5c0e7d475c3b69d1a063017.
>> 2019-05-17 17:42:50,222 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager connection for job eff57179c5c0e7d475c3b69d1a063017.
>> 2019-05-17 17:42:50,222 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Cannot reconnect to job eff57179c5c0e7d475c3b69d1a063017 because it is not registered.
>> 2019-05-17 17:43:38,982 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close ResourceManager connection 2c23394abcc92fbc068529591fbf7ceb.
>> 2019-05-17 17:43:38,982 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Connecting to ResourceManager akka.tcp://flink@ip-10-30-52-224.ec2.internal:35979/user/resourcemanager(00000000000000000000000000000000).
>> 2019-05-17 17:43:38,988 INFO org.apache.flink.yarn.YarnTaskExecutorRunner - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
>> 2019-05-17 17:43:38,988 INFO org.apache.flink.runtime.blob.PermanentBlobCache - Shutting down BLOB cache
>> 2019-05-17 17:43:38,989 INFO org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager - Shutting down TaskExecutorLocalStateStoresManager.
>> 2019-05-17 17:43:38,990 INFO org.apache.flink.runtime.blob.TransientBlobCache - Shutting down BLOB cache
>> 2019-05-17 17:43:38,991 INFO org.apache.flink.runtime.filecache.FileCache - removed file cache directory /mnt/yarn/usercache/hadoop/appcache/application_1558074033518_0003/flink-dist-cache-a5e9444f-6bb6-4da1-9067-8d2e7351cb61
>> 2019-05-17 17:43:38,991 INFO org.apache.flink.runtime.filecache.FileCache - removed file cache directory /mnt1/yarn/usercache/hadoop/appcache/application_1558074033518_0003/flink-dist-cache-925eed9f-fcf5-4d48-8ac9-bce29e9116ef
>> 2019-05-17 17:43:39,004 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Resolved ResourceManager address, beginning registration
>> 2019-05-17 17:43:39,004 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Registration at ResourceManager attempt 1 (timeout=100ms)
>> 2019-05-17 17:43:39,012 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Registration at ResourceManager was declined: unrecognized TaskExecutor
>> 2019-05-17 17:43:39,012 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Pausing and re-attempting registration in 30000 ms
>>
>> And in the replacement TM that was started, it fails with:
>>
>> 2019-05-17 17:45:12,048 ERROR org.apache.flink.runtime.operators.BatchTask - Error in task code: Map (Key Extractor) (34/96)
>> java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'ip-10-47-197-146.ec2.internal/10.47.197.146:39133' has failed. This might indicate that the remote task manager has been lost.
>> at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
>>
>> Where the TM it’s trying to connect to is the one that was released and hasn’t been restarted yet.
>>
>> *3. Hang in processing*
>>
>> Sometimes it finishes the long-running (10 hour) operator, and then the two downstream operators get stuck (these have a different parallelism, so there’s a rebalance).
>>
>> In the most recent example of this, they processed about 20% of the data emitted by the long-running operator. There are no errors in any of the logs. The last real activity in the jobmanager.log shows that all of the downstream operators were deployed...
>> 2019-06-22 14:58:36,648 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Map (Packed features) -> Map (Key Extractor) (7/32) (4a13a1d471c0ed5c2d9e66d2e4a98fd9) switched from DEPLOYING to RUNNING.
>>
>> Then nothing anywhere, until this msg starts appearing in the log file every 5 seconds or so…
>>
>> 2019-06-22 22:56:11,303 INFO org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl - Updating with new AMRMToken