Thanks for the update, Ken. The input splits seem to
be org.apache.hadoop.mapred.FileSplit. Nothing unusual stands out to me.
Internally they use org.apache.hadoop.mapreduce.lib.input.FileSplit, which
stores a Path, two longs (the start offset and length), and two String
arrays with hosts and host info. I would not expect these to exceed the 10
MB framesize limit.
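
If you want to rule that out, here is a minimal sketch for checking how big
a single split is when Java-serialized (this only approximates what the RPC
layer actually sends, and the class name is made up):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

public class SplitSizeCheck {

    // Java-serializes a split and returns the byte count, as a rough proxy
    // for the per-split payload the JobManager ships over RPC.
    static int serializedSize(Object split) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(split);
        }
        return bos.size();
    }
}

Anything in the megabyte range per split would point at the framesize
theory.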

If you see the problem happen again, it would also be helpful to save the
logs.

Cheers,
Till

On Tue, Jul 2, 2019 at 2:21 AM Ken Krugler <kkrugler_li...@transpac.com>
wrote:

> Hi Stephan,
>
> Thanks for responding, comments inline below…
>
> Regards,
>
> — Ken
>
> On Jun 26, 2019, at 7:50 AM, Stephan Ewen <se...@apache.org> wrote:
>
> Hi Ken!
>
> Sorry to hear you are going through this. The major focus on streaming so
> far means that the DataSet API has stability issues at scale. So, yes,
> batch mode in the current Flink version can be somewhat tricky.
>
> It is a big focus of Flink 1.9 to finally fix batch mode, by addressing
> batch-specific scheduling, recovery, and shuffle issues.
>
> Let me go through the issues you found:
>
> *(1) Input splits and oversized RPC*
>
> Your explanation seems correct: a timeout due to a dropped oversized RPC
> message.
>
> I don't quite understand how exactly that happens, because the size limit
> is 10 MB and input splits should be rather small in most cases.
> Are you running custom sources which put large data into splits? Maybe
> accidentally, by having a large serialized closure in the splits?
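>
> (Purely illustrative, and hypothetical rather than taken from any real
> job: the usual way a closure sneaks in is a non-static inner split class,
> which implicitly captures its enclosing source, so the source's large
> fields are serialized with every split.)
>
> import java.io.Serializable;
>
> // Hypothetical sketch: LeakySplit is a non-static inner class, so every
> // serialized instance drags along LeakySource.this, including the 20 MB
> // lookup table.
> public class LeakySource implements Serializable {
>     private final byte[] lookupTable = new byte[20 * 1024 * 1024];
>
>     public class LeakySplit implements Serializable {
>         final long start;
>         final long length;
>
>         LeakySplit(long start, long length) {
>             this.start = start;
>             this.length = length;
>         }
>     }
> }
>
> Declaring the split class static (or top-level) avoids the capture.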
>
>
> As per my email to Till, I don’t feel like I’m doing anything tricky,
> though I am reading Hadoop sequence files that contain Cascading
> Tuple/Tuple key/value data.
>
> The fix would be this issue:
> https://issues.apache.org/jira/browse/FLINK-4399
>
> *(2) TM early release*
>
> The 1.8 version has a fix that should work for regular cases without
> fine-grained failure recovery.
> 1.9 should have a more general fix that also works with fine-grained
> recovery.
>
> Are you trying to use the finer grained failover with the batch job?
>
>
> No, or at least I’m not doing anything special to enable it.
>
> Is there something I need to do to explicitly _disable_ it?
>
> The finer-grained failover does not work in batch mode in 1.8, which is
> why it is not an advertised feature (it only works for streaming so far).
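>
> (If you want to pin it explicitly, I believe the default in
> flink-conf.yaml is the full-restart strategy; please double-check the key
> and value against the docs for your version:)
>
> jobmanager.execution.failover-strategy: full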
>
> The goal is that this works in the 1.9 release (aka the batch fixup
> release)
>
> *(3) Hang in processing*
>
> I think a thread dump (jstack) from the TMs would be helpful to diagnose
> that.
> There are known issues with the current batch shuffle implementation,
> which is why 1.9 is getting a new bounded-blocking stream shuffle
> implementation.
>
>
> Next time it happens, I’ll dump the threads.
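>
> Something like this on each TM host, I assume (guessing that the TM
> process is the YarnTaskExecutorRunner JVM):
>
> # find the TaskManager JVM and dump its threads
> jstack $(pgrep -f YarnTaskExecutorRunner) > /tmp/tm-threads.txt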
>
> I should have done it this time, but was in a hurry to kill the EMR
> cluster as it had been costing money all night long :(
>
>
>
> On Mon, Jun 24, 2019 at 2:32 AM Ken Krugler <kkrugler_li...@transpac.com>
> wrote:
>
>> Hi all,
>>
>> I’ve been running a somewhat complex batch job (in EMR/YARN) with Flink
>> 1.8.0, and it regularly fails, but for varying reasons.
>>
>> Has anyone else gotten 1.8.0 to run stably in batch mode with
>> non-trivial workflows?
>>
>> Thanks,
>>
>> — Ken
>>
>> *1. TimeoutException getting input splits*
>>
>> The batch job starts by processing a lot of files that live in S3. During
>> this phase, I sometimes see:
>>
>> 2019-06-20 01:20:22,659 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN
>> DataSource (at createInput(ExecutionEnvironment.java:549)
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> Map (ad
>> dailies) -> Filter (Filter at
>> createWorkflow(AdvertiserSimilarityWorkflow.java:34)) -> Filter (Filter at
>> createWorkflow(AdvertiserSimilarityWorkflow.java:36)) -> Filter (Filter at
>> createWorkflow(AdvertiserSimilarityWorkflow.java:38)) -> Map (Key
>> Extractor) -> Combine (Reduce at
>> createWorkflow(AdvertiserSimilarityWorkflow.java:41)) (31/32)
>> (8a8cbea47394c3d638910c36ac62d877) switched from RUNNING to FAILED.
>> java.lang.RuntimeException: Could not retrieve next input split.
>> at
>> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
>> at
>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by:
>> org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException:
>> Requesting the next input split failed.
>> at
>> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
>> at
>> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
>> ... 3 more
>> Caused by: java.util.concurrent.TimeoutException
>> at
>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>> at
>> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
>> ... 4 more
>> 2019-06-20 01:20:22,664 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink
>> Java Job at Thu Jun 20 01:11:28 UTC 2019 (5564b8980f40d788d7ef312318709e4d)
>> switched from state RUNNING to FAILING.
>> java.lang.RuntimeException: Could not retrieve next input split.
>> at
>> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
>> at
>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by:
>> org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException:
>> Requesting the next input split failed.
>> at
>> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
>> at
>> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
>> ... 3 more
>> Caused by: java.util.concurrent.TimeoutException
>> at
>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>> at
>> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
>> ... 4 more
>>
>> I saw bjb...@gmail.com’s email recently about a similar issue:
>>
>> I figured this out myself. In my yarn container logs I saw this
>> warning/error,
>>
>> akka.remote.OversizedPayloadException: Discarding oversized payload sent
>> to Actor[akka.tcp://flink@HOST:43911/temp/$n]: max allowed size 10485760
>> bytes, actual size of encoded class
>> org.apache.flink.runtime.jobmaster.SerializedInputSplit was 15728643 bytes.
>>
>> Looking into this there is a max frame size for Akka which in flink can
>> be set with akka.framesize and is set to 10MB by default. Increasing this
>> past the size of my side input fixed the issue. I'm guessing this is due to
>> creating the side input PCollection from memory using the Create.of APIs.
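>>
>> (For reference, I assume the change they made looks roughly like this in
>> flink-conf.yaml; the exact value is just an example, with the default
>> being 10485760b:)
>>
>> akka.framesize: 20971520b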
>>
>>
>> But no such akka.remote.OversizedPayloadException appears in any of my
>> log files.
>>
>> *2. TM released too soon?*
>>
>> Sometimes it fails with "Connecting the channel failed: Connecting to
>> remote task manager xxx has failed. This might indicate that the remote
>> task manager has been lost”
>>
>> I’d run into this previously with 1.7.2, and thought that the 1.8.0 fix
>> for https://issues.apache.org/jira/browse/FLINK-10941 would avoid the
>> problem, but it seems like there’s still an issue.
>>
>> I’m running 3 TMs on three servers, each with 32 slots. When the job
>> fails, the servers are under heavy CPU load.
>>
>> From the logs, I see the JobManager releasing two of the TMs, then
>> requesting two new containers. One of these requests gets filled, and that
>> new TM starts getting tasks for its slots.
>>
>> But soon afterwards, that new TM and the remaining original TM start
>> failing because they aren’t getting data from (I think) the other TM
>> that was released.
>>
>> Any thoughts on what’s going wrong? Is the bug not actually fully fixed?
>> Or is there some TM timeout value that I should bump?
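>>
>> (The two timeouts I see in the logs below look configurable; these
>> flink-conf.yaml values are guesses on my part, in milliseconds:)
>>
>> # how long the SlotPool keeps an idle slot before releasing it
>> slot.idle.timeout: 600000
>> # how long the ResourceManager keeps an idle TaskExecutor before stopping it
>> resourcemanager.taskmanager-timeout: 600000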
>>
>> In the job manager log file I see where the two TMs are getting
>> released...
>>
>> 2019-05-17 17:42:50,215 INFO  
>> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl      - Releasing 
>> idle slot [d947cd800b0ef2671259c7b048c3f7fc].
>> 2019-05-17 17:43:38,942 INFO  org.apache.flink.yarn.YarnResourceManager      
>>                - Stopping container container_1558074033518_0003_01_000002.
>> 2019-05-17 17:43:38,978 INFO  org.apache.flink.yarn.YarnResourceManager      
>>                - Closing TaskExecutor connection 
>> container_1558074033518_0003_01_000002 because: TaskExecutor exceeded the 
>> idle timeout.
>> 2019-05-17 17:43:38,978 INFO  org.apache.flink.yarn.YarnResourceManager      
>>                - Stopping container container_1558074033518_0003_01_000004.
>> 2019-05-17 17:43:38,998 INFO  org.apache.flink.yarn.YarnResourceManager      
>>                - Closing TaskExecutor connection 
>> container_1558074033518_0003_01_000004 because: TaskExecutor exceeded the 
>> idle timeout.
>> 2019-05-17 17:43:39,005 WARN  org.apache.flink.yarn.YarnResourceManager      
>>                - Discard registration from TaskExecutor 
>> container_1558074033518_0003_01_000002 at 
>> (akka.tcp://flink@ip-10-28-81-66.ec2.internal:36311/user/taskmanager_0) 
>> because the framework did not recognize it
>> 2019-05-17 17:43:39,006 WARN  org.apache.flink.yarn.YarnResourceManager      
>>                - Discard registration from TaskExecutor 
>> container_1558074033518_0003_01_000004 at 
>> (akka.tcp://flink@ip-10-47-197-146.ec2.internal:44403/user/taskmanager_0) 
>> because the framework did not recognize it
>>
>>
>> And then, later on, the requests for the replacement TMs:
>>
>> 2019-05-17 17:45:01,655 INFO  org.apache.flink.yarn.YarnResourceManager      
>>                - Requesting new TaskExecutor container with resources 
>> <memory:44000, vCores:32>. Number pending requests 1.
>>
>> 2019-05-17 17:45:01,662 INFO org.apache.flink.yarn.YarnResourceManager -
>> Requesting new TaskExecutor container with resources <memory:44000,
>> vCores:32>. Number pending requests 2.
>>
>> And then one of the requests is satisfied:
>>
>> 2019-05-17 17:45:04,360 INFO  org.apache.flink.yarn.YarnResourceManager      
>>                - Received new container: 
>> container_1558074033518_0003_01_000006 - Remaining pending container 
>> requests: 2
>> 2019-05-17 17:45:04,360 INFO  org.apache.flink.yarn.YarnResourceManager      
>>                - Removing container request Capability[<memory:44000, 
>> vCores:32>]Priority[1]. Pending container requests 1.
>> 2019-05-17 17:45:04,836 INFO  org.apache.flink.yarn.YarnResourceManager      
>>                - Creating container launch context for TaskManagers
>> 2019-05-17 17:45:04,837 INFO  org.apache.flink.yarn.YarnResourceManager      
>>                - Starting TaskManagers
>>
>>
>> So it seems like TMs are being allocated, but soon afterwards:
>>
>> 2019-05-17 17:45:12,907 INFO  
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN Map 
>> (Map at createWorkflow(AdvertiserSimilarityWorkflow.java:127)) -> Map (Key 
>> Extractor) (73/96) (1e16d6ca293330933b7cece67644635f) switched from RUNNING 
>> to FAILED.
>> java.io.IOException: Connecting the channel failed: Connecting to remote 
>> task manager + 'ip-10-28-81-66.ec2.internal/10.28.81.66:40317' has failed. 
>> This might indicate that the remote task manager has been lost.
>>      at 
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
>>      at 
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:133)
>>      at 
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:69)
>>      at 
>> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:60)
>>      at 
>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:166)
>>      at 
>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:494)
>>      at 
>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:525)
>>      at 
>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:508)
>>      at 
>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>>      at 
>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>>      at 
>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>      at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:101)
>>      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>      at 
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>      at java.lang.Thread.run(Thread.java:748)
>>
>>
>> On one of the TMs that was released, I see at the end of its log:
>>
>> 2019-05-17 17:42:50,217 INFO
>> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable      - Free slot
>> TaskSlot(index:3, state:ACTIVE, resource profile:
>> ResourceProfile{cpuCores=1.7976931348623157E308, heapMemoryInMB=2147
>> 483647, directMemoryInMB=2147483647, nativeMemoryInMB=2147483647,
>> networkMemoryInMB=2147483647}, allocationId:
>> e3e7b383fe2db6376c82e5f3be7e02cb, jobId: eff57179c5c0e7d475c3b69d1a063017).
>> 2019-05-17 17:42:50,217 INFO
>> org.apache.flink.runtime.taskexecutor.JobLeaderService        - Remove job
>> eff57179c5c0e7d475c3b69d1a063017 from job leader monitoring.
>> 2019-05-17 17:42:50,217 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close
>> JobManager connection for job eff57179c5c0e7d475c3b69d1a063017.
>> 2019-05-17 17:42:50,222 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close
>> JobManager connection for job eff57179c5c0e7d475c3b69d1a063017.
>> 2019-05-17 17:42:50,222 INFO
>> org.apache.flink.runtime.taskexecutor.JobLeaderService        - Cannot
>> reconnect to job eff57179c5c0e7d475c3b69d1a063017 because it is not
>> registered.
>> 2019-05-17 17:43:38,982 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close
>> ResourceManager connection 2c23394abcc92fbc068529591fbf7ceb.
>> 2019-05-17 17:43:38,982 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Connecting
>> to ResourceManager
>> akka.tcp://flink@ip-10-30-52-224.ec2.internal:35979/user/resourcemanager(0000000000000000000000000
>> 0000000).
>> 2019-05-17 17:43:38,988 INFO
>> org.apache.flink.yarn.YarnTaskExecutorRunner                  - RECEIVED
>> SIGNAL 15: SIGTERM. Shutting down as requested.
>> 2019-05-17 17:43:38,988 INFO
>> org.apache.flink.runtime.blob.PermanentBlobCache              - Shutting
>> down BLOB cache
>> 2019-05-17 17:43:38,989 INFO
>> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  -
>> Shutting down TaskExecutorLocalStateStoresManager.
>> 2019-05-17 17:43:38,990 INFO
>> org.apache.flink.runtime.blob.TransientBlobCache              - Shutting
>> down BLOB cache
>> 2019-05-17 17:43:38,991 INFO
>> org.apache.flink.runtime.filecache.FileCache                  - removed
>> file cache directory
>> /mnt/yarn/usercache/hadoop/appcache/application_1558074033518_0003/flink-dist-cache-a5e9444f-6bb6-4
>> da1-9067-8d2e7351cb61
>> 2019-05-17 17:43:38,991 INFO
>> org.apache.flink.runtime.filecache.FileCache                  - removed
>> file cache directory
>> /mnt1/yarn/usercache/hadoop/appcache/application_1558074033518_0003/flink-dist-cache-925eed9f-fcf5-
>> 4d48-8ac9-bce29e9116ef
>> 2019-05-17 17:43:39,004 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Resolved
>> ResourceManager address, beginning registration
>> 2019-05-17 17:43:39,004 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor            -
>> Registration at ResourceManager attempt 1 (timeout=100ms)
>> 2019-05-17 17:43:39,012 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor            -
>> Registration at ResourceManager was declined: unrecognized TaskExecutor
>> 2019-05-17 17:43:39,012 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Pausing and
>> re-attempting registration in 30000 ms
>>
>> And in the replacement TM that was started, it fails with:
>>
>> 2019-05-17 17:45:12,048 ERROR
>> org.apache.flink.runtime.operators.BatchTask                  - Error in
>> task code:  Map (Key Extractor) (34/96)
>> java.io.IOException: Connecting the channel failed: Connecting to remote
>> task manager + 'ip-10-47-197-146.ec2.internal/10.47.197.146:39133' has
>> failed. This might indicate that the remote task manager has been lost.
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
>>
>> Where the TM it’s trying to connect to is the one that was released and
>> hasn’t been restarted yet.
>>
>> *3. Hang in processing*
>>
>> Sometimes it finishes the long-running (10-hour) operator, and then the
>> two downstream operators get stuck (these have a different parallelism,
>> so there’s a rebalance).
>>
>> In the most recent example of this, they processed about 20% of the data
>> emitted by the long running operator. There are no errors in any of the
>> logs. The last real activity in the jobmanager.log shows that all of the
>> downstream operators were deployed...
>>
>> 2019-06-22 14:58:36,648
>> INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN
>> Map (Packed features) -> Map (Key Extractor)
>> (7/32) (4a13a1d471c0ed5c2d9e66d2e4a98fd9) switched from DEPLOYING to
>> RUNNING.
>>
>> Then nothing anywhere, until this message starts appearing in the log
>> file every 5 seconds or so…
>>
>> 2019-06-22 22:56:11,303
>> INFO  org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl         -
>> Updating with new AMRMToken
>>
>>
>>
>>
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
>
>
