Hi Ken,

In regard to oversized input splits, it seems to be a rare case beyond my
expectation. However it should be fixed definitely since input split can be
user-defined. We should not assume it must be small.
I agree with Stephan that maybe there is something unexpectedly involved in
the input splits.
And there is also a work-around way to solve this before we fixing it,
increasing the limit of RPC message size by explicitly configuring
"akka.framesize" in flink-conf.yaml.

To @Qi, also sorry to hear your bad experience. I'll take this issue but
I'm not sure I could catch up the releasing of 1.9. Hope things go well.


Stephan Ewen <se...@apache.org> 于2019年6月26日周三 下午10:50写道:

> Hi Ken!
>
> Sorry to hear you are going through this experience. The major focus on
> streaming so far means that the DataSet API has stability issues at scale.
> So, yes, batch mode in current Flink version can be somewhat tricky.
>
> It is a big focus of Flink 1.9 to fix the batch mode, finally, and by
> addressing batch specific scheduling / recovery / and shuffle issues.
>
> Let me go through the issues you found:
>
> *(1) Input splits and oversized RPC*
>
> Your explanation seems correct, timeout due to dropping oversized RPC
> message.
>
> I don't quite understand how that exactly happens, because the size limit
> is 10 MB and input splits should be rather small in most cases.
> Are you running custom sources which put large data into splits? Maybe
> accidentally, by having a large serialized closure in the splits?
>
> The fix would be this issue:
> https://issues.apache.org/jira/browse/FLINK-4399
>
> *(2) TM early release*
>
> The 1.8 version had a fix that should work for regular cases without
> fine-grained failure recovery.
> 1.9 should have a more general fix that also works for fine-grained
> recovery
>
> Are you trying to use the finer grained failover with the batch job?
> The finer-grained failover is not working in batch for 1.8, that is why it
> is not an advertised feature (it only works for streaming so far).
>
> The goal is that this works in the 1.9 release (aka the batch fixup
> release)
>
> (3) Hang in Processing
>
> I think a thread dump (jstack) from the TMs would be helpful to diagnose
> that.
> There are known issues with the current batch shuffle implementation,
> which is why 1.9 is getting a new bounded-blocking stream shuffle
> implementation.
>
> Best,
> Stephan
>
>
>
>
>
>
> On Mon, Jun 24, 2019 at 2:32 AM Ken Krugler <kkrugler_li...@transpac.com>
> wrote:
>
>> Hi all,
>>
>> I’ve been running a somewhat complex batch job (in EMR/YARN) with Flink
>> 1.8.0, and it regularly fails, but for varying reasons.
>>
>> Has anyone else had stability with 1.8.0 in batch mode and non-trivial
>> workflows?
>>
>> Thanks,
>>
>> — Ken
>>
>> *1. TimeoutException getting input splits*
>>
>> The batch job starts by processing a lot of files that live in S3. During
>> this phase, I sometimes see:
>>
>> 2019-06-20 01:20:22,659 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN
>> DataSource (at createInput(ExecutionEnvironment.java:549)
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> Map (ad
>> dailies) -> Filter (Filter at
>> createWorkflow(AdvertiserSimilarityWorkflow.java:34)) -> Filter (Filter at
>> createWorkflow(AdvertiserSimilarityWorkflow.java:36)) -> Filter (Filter at
>> createWorkflow(AdvertiserSimilarityWorkflow.java:38)) -> Map (Key
>> Extractor) -> Combine (Reduce at
>> createWorkflow(AdvertiserSimilarityWorkflow.java:41)) (31/32)
>> (8a8cbea47394c3d638910c36ac62d877) switched from RUNNING to FAILED.
>> java.lang.RuntimeException: Could not retrieve next input split.
>> at
>> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
>> at
>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by:
>> org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException:
>> Requesting the next input split failed.
>> at
>> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
>> at
>> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
>> ... 3 more
>> Caused by: java.util.concurrent.TimeoutException
>> at
>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>> at
>> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
>> ... 4 more
>> 2019-06-20 01:20:22,664 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink
>> Java Job at Thu Jun 20 01:11:28 UTC 2019 (5564b8980f40d788d7ef312318709e4d)
>> switched from state RUNNING to FAILING.
>> java.lang.RuntimeException: Could not retrieve next input split.
>> at
>> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
>> at
>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by:
>> org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException:
>> Requesting the next input split failed.
>> at
>> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
>> at
>> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
>> ... 3 more
>> Caused by: java.util.concurrent.TimeoutException
>> at
>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>> at
>> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
>> ... 4 more
>>
>> I saw bjb...@gmail.com’s email recently about a similar issue:
>>
>> I figured this out myself. In my yarn container logs I saw this
>> warning/error,
>>
>> akka.remote.OversizedPayloadException: Discarding oversized payload sent
>> to Actor[akka.tcp://flink@HOST:43911/temp/$n]: max allowed size 10485760
>> bytes, actual size of encoded class
>> org.apache.flink.runtime.jobmaster.SerializedInputSplit was 15728643 bytes.
>>
>> Looking into this there is a max frame size for Akka which in flink can
>> be set with akka.framesize and is set to 10MB by default. Increasing this
>> past the size of my side input fixed the issue. I'm guessing this is due to
>> creating the side input PCollection from memory using the Create.of APIs.
>>
>>
>> But no such akka.remote.OversizedPayloadException appears in any of my
>> log files.
>>
>> *2. TM released too soon?*
>>
>> Sometimes it fails with "Connecting the channel failed: Connecting to
>> remote task manager xxx has failed. This might indicate that the remote
>> task manager has been lost”
>>
>> I’d run into this previously with 1.7.2, but thought that 1.8.0 had the
>> fix for https://issues.apache.org/jira/browse/FLINK-10941, and thus I’d
>> avoid the problem, but it seems like there’s still an issue.
>>
>> I’m running 3 TMs on three servers, each with 32 slots. When the job
>> fails, the servers are under heavy CPU load.
>>
>> From the logs, I see the JobManager releasing two of the TMs, then
>> requesting two new containers. One of these requests gets filled, and that
>> new TM starts getting tasks for its slots.
>>
>> But then soon afterwards that new TM and the one original TM still left
>> around start failing because they aren’t getting data from (I think) the
>> other TM that was released.
>>
>> Any thoughts on what’s going wrong? Is the bug not actually fully fixed?
>> Or is there some TM timeout value that I should bump?
>>
>> In the job manager log file I see where the two TMs are getting
>> released...
>>
>> 2019-05-17 17:42:50,215 INFO  
>> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl      - Releasing 
>> idle slot [d947cd800b0ef2671259c7b048c3f7fc].
>> 2019-05-17 17:43:38,942 INFO  org.apache.flink.yarn.YarnResourceManager      
>>                - Stopping container container_1558074033518_0003_01_000002.
>> 2019-05-17 17:43:38,978 INFO  org.apache.flink.yarn.YarnResourceManager      
>>                - Closing TaskExecutor connection 
>> container_1558074033518_0003_01_000002 because: TaskExecutor exceeded the 
>> idle timeout.
>> 2019-05-17 17:43:38,978 INFO  org.apache.flink.yarn.YarnResourceManager      
>>                - Stopping container container_1558074033518_0003_01_000004.
>> 2019-05-17 17:43:38,998 INFO  org.apache.flink.yarn.YarnResourceManager      
>>                - Closing TaskExecutor connection 
>> container_1558074033518_0003_01_000004 because: TaskExecutor exceeded the 
>> idle timeout.
>> 2019-05-17 17:43:39,005 WARN  org.apache.flink.yarn.YarnResourceManager      
>>                - Discard registration from TaskExecutor 
>> container_1558074033518_0003_01_000002 at 
>> (akka.tcp://flink@ip-10-28-81-66.ec2.internal:36311/user/taskmanager_0) 
>> because the framework did not recognize it
>> 2019-05-17 17:43:39,006 WARN  org.apache.flink.yarn.YarnResourceManager      
>>                - Discard registration from TaskExecutor 
>> container_1558074033518_0003_01_000004 at 
>> (akka.tcp://flink@ip-10-47-197-146.ec2.internal:44403/user/taskmanager_0) 
>> because the framework did not recognize it
>>
>>
>> And then later on the requests for the replacement TMs.
>>
>> 2019-05-17 17:45:01,655 INFO  org.apache.flink.yarn.YarnResourceManager      
>>                - Requesting new TaskExecutor container with resources 
>> <memory:44000, vCores:32>. Number pending requests 1.
>>
>> 2019-05-17 17:45:01,662 INFO org.apache.flink.yarn.YarnResourceManager -
>> Requesting new TaskExecutor container with resources <memory:44000,
>> vCores:32>. Number pending requests 2.
>>
>> And then one of the requests is satisfied:
>>
>> 2019-05-17 17:45:04,360 INFO  org.apache.flink.yarn.YarnResourceManager      
>>                - Received new container: 
>> container_1558074033518_0003_01_000006 - Remaining pending container 
>> requests: 2
>> 2019-05-17 17:45:04,360 INFO  org.apache.flink.yarn.YarnResourceManager      
>>                - Removing container request Capability[<memory:44000, 
>> vCores:32>]Priority[1]. Pending container requests 1.
>> 2019-05-17 17:45:04,836 INFO  org.apache.flink.yarn.YarnResourceManager      
>>                - Creating container launch context for TaskManagers
>> 2019-05-17 17:45:04,837 INFO  org.apache.flink.yarn.YarnResourceManager      
>>                - Starting TaskManagers
>>
>>
>> So it seems like TMs are being allocated, but soon afterwards:
>>
>> 2019-05-17 17:45:12,907 INFO  
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN Map 
>> (Map at createWorkflow(AdvertiserSimilarityWorkflow.java:127)) -> Map (Key 
>> Extractor) (73/96) (1e16d6ca293330933b7cece67644635f) switched from RUNNING 
>> to FAILED.
>> java.io.IOException: Connecting the channel failed: Connecting to remote 
>> task manager + 'ip-10-28-81-66.ec2.internal/10.28.81.66:40317' has failed. 
>> This might indicate that the remote task manager has been lost.
>>      at 
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
>>      at 
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:133)
>>      at 
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:69)
>>      at 
>> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:60)
>>      at 
>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:166)
>>      at 
>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:494)
>>      at 
>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:525)
>>      at 
>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:508)
>>      at 
>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>>      at 
>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>>      at 
>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>      at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:101)
>>      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>      at 
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>      at java.lang.Thread.run(Thread.java:748)
>>
>>
>> On one of the TMs that was released, I see at the end of its log:
>>
>> 2019-05-17 17:42:50,217 INFO
>> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable      - Free slot
>> TaskSlot(index:3, state:ACTIVE, resource profile:
>> ResourceProfile{cpuCores=1.7976931348623157E308, heapMemoryInMB=2147
>> 483647, directMemoryInMB=2147483647, nativeMemoryInMB=2147483647,
>> networkMemoryInMB=2147483647}, allocationId:
>> e3e7b383fe2db6376c82e5f3be7e02cb, jobId: eff57179c5c0e7d475c3b69d1a063017).
>> 2019-05-17 17:42:50,217 INFO
>> org.apache.flink.runtime.taskexecutor.JobLeaderService        - Remove job
>> eff57179c5c0e7d475c3b69d1a063017 from job leader monitoring.
>> 2019-05-17 17:42:50,217 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close
>> JobManager connection for job eff57179c5c0e7d475c3b69d1a063017.
>> 2019-05-17 17:42:50,222 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close
>> JobManager connection for job eff57179c5c0e7d475c3b69d1a063017.
>> 2019-05-17 17:42:50,222 INFO
>> org.apache.flink.runtime.taskexecutor.JobLeaderService        - Cannot
>> reconnect to job eff57179c5c0e7d475c3b69d1a063017 because it is not
>> registered.
>> 2019-05-17 17:43:38,982 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close
>> ResourceManager connection 2c23394abcc92fbc068529591fbf7ceb.
>> 2019-05-17 17:43:38,982 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Connecting
>> to ResourceManager
>> akka.tcp://flink@ip-10-30-52-224.ec2.internal:35979/user/resourcemanager(0000000000000000000000000
>> 0000000).
>> 2019-05-17 17:43:38,988 INFO
>> org.apache.flink.yarn.YarnTaskExecutorRunner                  - RECEIVED
>> SIGNAL 15: SIGTERM. Shutting down as requested.
>> 2019-05-17 17:43:38,988 INFO
>> org.apache.flink.runtime.blob.PermanentBlobCache              - Shutting
>> down BLOB cache
>> 2019-05-17 17:43:38,989 INFO
>> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  -
>> Shutting down TaskExecutorLocalStateStoresManager.
>> 2019-05-17 17:43:38,990 INFO
>> org.apache.flink.runtime.blob.TransientBlobCache              - Shutting
>> down BLOB cache
>> 2019-05-17 17:43:38,991 INFO
>> org.apache.flink.runtime.filecache.FileCache                  - removed
>> file cache directory
>> /mnt/yarn/usercache/hadoop/appcache/application_1558074033518_0003/flink-dist-cache-a5e9444f-6bb6-4
>> da1-9067-8d2e7351cb61
>> 2019-05-17 17:43:38,991 INFO
>> org.apache.flink.runtime.filecache.FileCache                  - removed
>> file cache directory
>> /mnt1/yarn/usercache/hadoop/appcache/application_1558074033518_0003/flink-dist-cache-925eed9f-fcf5-
>> 4d48-8ac9-bce29e9116ef
>> 2019-05-17 17:43:39,004 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Resolved
>> ResourceManager address, beginning registration
>> 2019-05-17 17:43:39,004 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor            -
>> Registration at ResourceManager attempt 1 (timeout=100ms)
>> 2019-05-17 17:43:39,012 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor            -
>> Registration at ResourceManager was declined: unrecognized TaskExecutor
>> 2019-05-17 17:43:39,012 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Pausing and
>> re-attempting registration in 30000 ms
>>
>> And in the replacement TM that was started, it fails with:
>>
>> 2019-05-17 17:45:12,048 ERROR
>> org.apache.flink.runtime.operators.BatchTask                  - Error in
>> task code:  Map (Key Extractor) (34/96)
>> java.io.IOException: Connecting the channel failed: Connecting to remote
>> task manager + 'ip-10-47-197-146.ec2.internal/10.47.197.146:39133' has
>> failed. This might indicate that the remote task manager has been lost.
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
>>
>> Where the TM it’s trying to connect to is the one that was released and
>> hasn’t been restarted yet.
>>
>> *3. Hang in processing*
>>
>> Sometimes it finishes the long-running (10 hour) operator, and then the
>> two downstream operators get stuck (these have a different parallelism, so
>> there’s a rebalance)
>>
>> In the most recent example of this, they processed about 20% of the data
>> emitted by the long running operator. There are no errors in any of the
>> logs. The last real activity in the jobmanager.log shows that all of the
>> downstream operators were deployed...
>>
>> 2019-06-22 14:58:36,648
>> INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN
>> Map (Packed features) -> Map (Key Extractor)
>> (7/32) (4a13a1d471c0ed5c2d9e66d2e4a98fd9) switched from DEPLOYING to
>> RUNNING.
>>
>> Then nothing anywhere, until this msg starts appearing in the log file
>> every 5 seconds or so…
>>
>> 2019-06-22 22:56:11,303
>> INFO  org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl         -
>> Updating with new AMRMToken
>>
>>
>>
>>

Reply via email to