Hi Till,

Thanks for following up.
I’ve got answers to other emails on this thread pending, but wanted to respond to this one now.

> On Jul 1, 2019, at 7:20 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>
> Quick addition for problem (1): The AkkaRpcActor should serialize the response if it is a remote RPC and send an AkkaRpcException if the response's size exceeds the maximum frame size. This should be visible at the call site, since the future should be completed with this exception. I'm wondering why you don't see this exception.
>
> It could be helpful to better understand the input splits your program is generating. Is it simply a `FileInputSplit` or did you implement a custom InputSplitAssigner with custom InputSplits?

I’m reading from about 10K files stored in S3. These are files created using Cascading, so each one is a Hadoop SequenceFile containing a (Cascading) Tuple for the key, and a Tuple for the value.

Removing some logic cruft, it looks like…

    Job job = Job.getInstance();
    job.getConfiguration().set("io.serializations", "cascading.tuple.hadoop.TupleSerialization");
    FileInputFormat.addInputPath(job, new Path(inputDir));

    HadoopInputFormat<Tuple, Tuple> inputFormat = HadoopInputs.createHadoopInput(
        new SequenceFileInputFormat<Tuple, Tuple>(), Tuple.class, Tuple.class, job);
    Configuration parameters = new Configuration();
    parameters.setBoolean("recursive.file.enumeration", true);
    inputFormat.configure(parameters);

    DataSet<AdDaily> adDailies = env.createInput(inputFormat)
        .map(new CreateAdDaily())
        .name("ad dailies");

Thanks again,

— Ken

> On Mon, Jul 1, 2019 at 11:57 AM Till Rohrmann <trohrm...@apache.org> wrote:
> Hi Ken,
>
> in order to further debug your problems it would be helpful if you could share the log files on DEBUG level with us.
>
> For problem (2), I suspect that it has been caused by Flink releasing TMs too early. This should be fixed with FLINK-10941, which is part of Flink 1.8.1. The 1.8.1 release should be out very soon.
> It would be great if you could try your program with this version or even the 1.8.1 RC to see whether the problem still occurs. But it could also be caused by using fine-grained recovery. So it might be worth a try to disable this feature if you turned it on.
>
> Thanks a lot!
>
> Cheers,
> Till
>
> On Thu, Jun 27, 2019 at 8:30 AM Biao Liu <mmyy1...@gmail.com> wrote:
> Hi Ken again,
>
> In regard to the TimeoutException, I just realized that there is no akka.remote.OversizedPayloadException in your log file. There might be some other reason that caused this.
> 1. Have you tried increasing the configuration "akka.ask.timeout"?
> 2. Have you checked the garbage collection of the JM/TM? You may need to enable GC logging first.
>
>
> On Thu, Jun 27, 2019 at 11:38 AM Biao Liu <mmyy1...@gmail.com> wrote:
> Hi Ken,
>
> In regard to oversized input splits, this seems to be a rarer case than I expected. However, it should definitely be fixed, since input splits can be user-defined. We should not assume they must be small.
> I agree with Stephan that maybe there is something unexpectedly included in the input splits.
> There is also a workaround until we fix it: increase the RPC message size limit by explicitly configuring "akka.framesize" in flink-conf.yaml.
>
> To @Qi, also sorry to hear about your bad experience. I'll take this issue, but I'm not sure I can get it done in time for the 1.9 release. Hope things go well.
>
>
> On Wed, Jun 26, 2019 at 10:50 PM Stephan Ewen <se...@apache.org> wrote:
> Hi Ken!
>
> Sorry to hear you are going through this experience. The major focus on streaming so far means that the DataSet API has stability issues at scale. So, yes, batch mode in the current Flink version can be somewhat tricky.
>
> A big focus of Flink 1.9 is to finally fix batch mode, by addressing batch-specific scheduling, recovery, and shuffle issues.
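Problem (1) turns on two different behaviours for an RPC reply that is too large for the Akka frame: either the message is dropped, so the caller's bounded wait on its future ends in a `TimeoutException` (which is what Ken's stack trace shows inside `RpcInputSplitProvider.getNextInputSplit`), or, as Till describes, the sender checks the encoded size and completes the future with an explicit error. The sketch below models both paths with plain JDK classes. It is a hypothetical illustration, not Flink's `AkkaRpcActor` or `RpcInputSplitProvider` code: the class and method names, the use of Java serialization to measure size, and `IOException` as the error type are all assumptions; only the 10485760-byte default limit comes from the thread.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of an oversized RPC reply; not Flink code.
public class FrameSizeSketch {

    // Default akka.framesize quoted in the thread: 10485760 bytes (10 MiB).
    static final int MAX_FRAME_BYTES = 10 * 1024 * 1024;

    // Size after Java serialization, used here as a stand-in for the RPC encoding.
    static int serializedSize(Serializable payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(payload);
        }
        return bytes.size();
    }

    // Sender side as Till describes it: check the encoded size first and,
    // if it is too large, fail the future instead of silently dropping the reply.
    static CompletableFuture<Serializable> reply(Serializable payload, int maxFrameBytes) {
        CompletableFuture<Serializable> future = new CompletableFuture<>();
        try {
            int size = serializedSize(payload);
            if (size > maxFrameBytes) {
                future.completeExceptionally(new IOException(
                        "Payload of " + size + " bytes exceeds frame size of " + maxFrameBytes + " bytes"));
            } else {
                future.complete(payload);
            }
        } catch (IOException e) {
            future.completeExceptionally(e);
        }
        return future;
    }

    // Caller side: a bounded wait on the future, similar to the
    // CompletableFuture.get(timeout) frame in Ken's stack trace.
    static String callerView(CompletableFuture<Serializable> future, long timeoutMillis) {
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return "ok";
        } catch (TimeoutException e) {
            return "TimeoutException"; // the reply never arrived
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage(); // explicit error from the sender
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        byte[] split = new byte[2048];
        // Fits comfortably under the default limit.
        System.out.println(callerView(reply(split, MAX_FRAME_BYTES), 100));
        // A tiny limit stands in for an oversized split without allocating 15 MB.
        System.out.println(callerView(reply(split, 1024), 100));
        // A silently dropped message: nobody ever completes this future.
        System.out.println(callerView(new CompletableFuture<>(), 100));
    }
}
```

Running `main` prints `ok`, then a `failed: Payload of ... bytes exceeds frame size of 1024 bytes` line, then `TimeoutException` for the future that is never completed. Biao's `akka.framesize` workaround roughly corresponds to raising `maxFrameBytes` here, and his `akka.ask.timeout` suggestion is analogous to raising `timeoutMillis`; the latter only helps if the reply eventually arrives.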
>
> Let me go through the issues you found:
>
> (1) Input splits and oversized RPC
>
> Your explanation seems correct: the timeout is due to an oversized RPC message being dropped.
>
> I don't quite understand how exactly that happens, because the size limit is 10 MB and input splits should be rather small in most cases.
> Are you running custom sources which put large data into splits? Maybe accidentally, by having a large serialized closure in the splits?
>
> The fix would be this issue: https://issues.apache.org/jira/browse/FLINK-4399
>
> (2) TM early release
>
> The 1.8 version had a fix that should work for regular cases without fine-grained failure recovery.
> 1.9 should have a more general fix that also works for fine-grained recovery.
>
> Are you trying to use the finer-grained failover with the batch job?
> The finer-grained failover does not work in batch for 1.8; that is why it is not an advertised feature (it only works for streaming so far).
>
> The goal is that this works in the 1.9 release (aka the batch fixup release).
>
> (3) Hang in Processing
>
> I think a thread dump (jstack) from the TMs would be helpful to diagnose that.
> There are known issues with the current batch shuffle implementation, which is why 1.9 is getting a new bounded-blocking stream shuffle implementation.
>
> Best,
> Stephan
>
>
> On Mon, Jun 24, 2019 at 2:32 AM Ken Krugler <kkrugler_li...@transpac.com> wrote:
> Hi all,
>
> I’ve been running a somewhat complex batch job (in EMR/YARN) with Flink 1.8.0, and it regularly fails, but for varying reasons.
>
> Has anyone else had stability with 1.8.0 in batch mode and non-trivial workflows?
>
> Thanks,
>
> — Ken
>
> 1. TimeoutException getting input splits
>
> The batch job starts by processing a lot of files that live in S3.
> During this phase, I sometimes see:
>
> 2019-06-20 01:20:22,659 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN DataSource (at createInput(ExecutionEnvironment.java:549) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> Map (ad dailies) -> Filter (Filter at createWorkflow(AdvertiserSimilarityWorkflow.java:34)) -> Filter (Filter at createWorkflow(AdvertiserSimilarityWorkflow.java:36)) -> Filter (Filter at createWorkflow(AdvertiserSimilarityWorkflow.java:38)) -> Map (Key Extractor) -> Combine (Reduce at createWorkflow(AdvertiserSimilarityWorkflow.java:41)) (31/32) (8a8cbea47394c3d638910c36ac62d877) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Could not retrieve next input split.
>     at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
>     at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
>     at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
>     ... 3 more
> Caused by: java.util.concurrent.TimeoutException
>     at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>     at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
>     ... 4 more
> 2019-06-20 01:20:22,664 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Java Job at Thu Jun 20 01:11:28 UTC 2019 (5564b8980f40d788d7ef312318709e4d) switched from state RUNNING to FAILING.
> java.lang.RuntimeException: Could not retrieve next input split.
>     at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
>     at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
>     at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
>     ... 3 more
> Caused by: java.util.concurrent.TimeoutException
>     at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>     at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
>     ... 4 more
>
> I saw bjb...@gmail.com’s email recently about a similar issue:
>
> >> I figured this out myself. In my yarn container logs I saw this warning/error,
> >>
> >> akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@HOST:43911/temp/$n]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.jobmaster.SerializedInputSplit was 15728643 bytes.
> >>
> >> Looking into this there is a max frame size for Akka which in flink can be set with akka.framesize and is set to 10MB by default. Increasing this past the size of my side input fixed the issue. I'm guessing this is due to creating the side input PCollection from memory using the Create.of APIs.
>
> But no such akka.remote.OversizedPayloadException appears in any of my log files.
>
> 2. TM released too soon?
>
> Sometimes it fails with "Connecting the channel failed: Connecting to remote task manager xxx has failed. This might indicate that the remote task manager has been lost."
>
> I’d run into this previously with 1.7.2, but thought that 1.8.0 had the fix for https://issues.apache.org/jira/browse/FLINK-10941, and thus I’d avoid the problem, but it seems like there’s still an issue.
>
> I’m running 3 TMs on three servers, each with 32 slots. When the job fails, the servers are under heavy CPU load.
>
> From the logs, I see the JobManager releasing two of the TMs, then requesting two new containers. One of these requests gets filled, and that new TM starts getting tasks for its slots.
>
> But then soon afterwards that new TM and the one original TM still left around start failing because they aren’t getting data from (I think) the other TM that was released.
>
> Any thoughts on what’s going wrong? Is the bug not actually fully fixed? Or is there some TM timeout value that I should bump?
>
> In the job manager log file I see where the two TMs are getting released...
>
> 2019-05-17 17:42:50,215 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Releasing idle slot [d947cd800b0ef2671259c7b048c3f7fc].
> 2019-05-17 17:43:38,942 INFO org.apache.flink.yarn.YarnResourceManager - Stopping container container_1558074033518_0003_01_000002.
> 2019-05-17 17:43:38,978 INFO org.apache.flink.yarn.YarnResourceManager - Closing TaskExecutor connection container_1558074033518_0003_01_000002 because: TaskExecutor exceeded the idle timeout.
> 2019-05-17 17:43:38,978 INFO org.apache.flink.yarn.YarnResourceManager - Stopping container container_1558074033518_0003_01_000004.
> 2019-05-17 17:43:38,998 INFO org.apache.flink.yarn.YarnResourceManager - Closing TaskExecutor connection container_1558074033518_0003_01_000004 because: TaskExecutor exceeded the idle timeout.
> 2019-05-17 17:43:39,005 WARN org.apache.flink.yarn.YarnResourceManager - Discard registration from TaskExecutor container_1558074033518_0003_01_000002 at (akka.tcp://flink@ip-10-28-81-66.ec2.internal:36311/user/taskmanager_0) because the framework did not recognize it
> 2019-05-17 17:43:39,006 WARN org.apache.flink.yarn.YarnResourceManager - Discard registration from TaskExecutor container_1558074033518_0003_01_000004 at (akka.tcp://flink@ip-10-47-197-146.ec2.internal:44403/user/taskmanager_0) because the framework did not recognize it
>
> And then later on the requests for the replacement TMs.
>
> 2019-05-17 17:45:01,655 INFO org.apache.flink.yarn.YarnResourceManager - Requesting new TaskExecutor container with resources <memory:44000, vCores:32>. Number pending requests 1.
> 2019-05-17 17:45:01,662 INFO org.apache.flink.yarn.YarnResourceManager - Requesting new TaskExecutor container with resources <memory:44000, vCores:32>. Number pending requests 2.
>
> And then one of the requests is satisfied:
>
> 2019-05-17 17:45:04,360 INFO org.apache.flink.yarn.YarnResourceManager - Received new container: container_1558074033518_0003_01_000006 - Remaining pending container requests: 2
> 2019-05-17 17:45:04,360 INFO org.apache.flink.yarn.YarnResourceManager - Removing container request Capability[<memory:44000, vCores:32>]Priority[1]. Pending container requests 1.
> 2019-05-17 17:45:04,836 INFO org.apache.flink.yarn.YarnResourceManager - Creating container launch context for TaskManagers
> 2019-05-17 17:45:04,837 INFO org.apache.flink.yarn.YarnResourceManager - Starting TaskManagers
>
> So it seems like TMs are being allocated, but soon afterwards:
>
> 2019-05-17 17:45:12,907 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Map (Map at createWorkflow(AdvertiserSimilarityWorkflow.java:127)) -> Map (Key Extractor) (73/96) (1e16d6ca293330933b7cece67644635f) switched from RUNNING to FAILED.
> java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'ip-10-28-81-66.ec2.internal/10.28.81.66:40317' has failed. This might indicate that the remote task manager has been lost.
>     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
>     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:133)
>     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:69)
>     at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:60)
>     at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:166)
>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:494)
>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:525)
>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:508)
>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>     at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:101)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748)
>
> On one of the TMs that were released, I see at the end of its log:
>
> 2019-05-17 17:42:50,217 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable - Free slot TaskSlot(index:3, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.7976931348623157E308, heapMemoryInMB=2147483647, directMemoryInMB=2147483647, nativeMemoryInMB=2147483647, networkMemoryInMB=2147483647}, allocationId: e3e7b383fe2db6376c82e5f3be7e02cb, jobId: eff57179c5c0e7d475c3b69d1a063017).
> 2019-05-17 17:42:50,217 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Remove job eff57179c5c0e7d475c3b69d1a063017 from job leader monitoring.
> 2019-05-17 17:42:50,217 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager connection for job eff57179c5c0e7d475c3b69d1a063017.
> 2019-05-17 17:42:50,222 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager connection for job eff57179c5c0e7d475c3b69d1a063017.
> 2019-05-17 17:42:50,222 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Cannot reconnect to job eff57179c5c0e7d475c3b69d1a063017 because it is not registered.
> 2019-05-17 17:43:38,982 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close ResourceManager connection 2c23394abcc92fbc068529591fbf7ceb.
> 2019-05-17 17:43:38,982 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Connecting to ResourceManager akka.tcp://flink@ip-10-30-52-224.ec2.internal:35979/user/resourcemanager(00000000000000000000000000000000).
> 2019-05-17 17:43:38,988 INFO org.apache.flink.yarn.YarnTaskExecutorRunner - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
> 2019-05-17 17:43:38,988 INFO org.apache.flink.runtime.blob.PermanentBlobCache - Shutting down BLOB cache
> 2019-05-17 17:43:38,989 INFO org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager - Shutting down TaskExecutorLocalStateStoresManager.
> 2019-05-17 17:43:38,990 INFO org.apache.flink.runtime.blob.TransientBlobCache - Shutting down BLOB cache
> 2019-05-17 17:43:38,991 INFO org.apache.flink.runtime.filecache.FileCache - removed file cache directory /mnt/yarn/usercache/hadoop/appcache/application_1558074033518_0003/flink-dist-cache-a5e9444f-6bb6-4da1-9067-8d2e7351cb61
> 2019-05-17 17:43:38,991 INFO org.apache.flink.runtime.filecache.FileCache - removed file cache directory /mnt1/yarn/usercache/hadoop/appcache/application_1558074033518_0003/flink-dist-cache-925eed9f-fcf5-4d48-8ac9-bce29e9116ef
> 2019-05-17 17:43:39,004 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Resolved ResourceManager address, beginning registration
> 2019-05-17 17:43:39,004 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Registration at ResourceManager attempt 1 (timeout=100ms)
> 2019-05-17 17:43:39,012 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Registration at ResourceManager was declined: unrecognized TaskExecutor
> 2019-05-17 17:43:39,012 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Pausing and re-attempting registration in 30000 ms
>
> And in the replacement TM that was started, it fails with:
>
> 2019-05-17 17:45:12,048 ERROR org.apache.flink.runtime.operators.BatchTask - Error in task code: Map (Key Extractor) (34/96)
> java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'ip-10-47-197-146.ec2.internal/10.47.197.146:39133' has failed. This might indicate that the remote task manager has been lost.
>     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
>
> The TM it’s trying to connect to is the one that was released and hasn’t been restarted yet.
>
> 3. Hang in processing
>
> Sometimes it finishes the long-running (10-hour) operator, and then the two downstream operators get stuck (these have a different parallelism, so there’s a rebalance).
>
> In the most recent example of this, they processed about 20% of the data emitted by the long-running operator. There are no errors in any of the logs. The last real activity in the jobmanager.log shows that all of the downstream operators were deployed...
>
> 2019-06-22 14:58:36,648 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Map (Packed features) -> Map (Key Extractor) (7/32) (4a13a1d471c0ed5c2d9e66d2e4a98fd9) switched from DEPLOYING to RUNNING.
>
> Then nothing anywhere, until this message starts appearing in the log file every 5 seconds or so…
>
> 2019-06-22 22:56:11,303 INFO org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl - Updating with new AMRMToken

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
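For problem (3), Stephan asks for a thread dump (jstack) from the TMs. Running `jstack <pid>` against each TaskManager JVM is the usual route. If that is awkward on the YARN nodes, the JDK's `ThreadMXBean` can produce a comparable snapshot from inside a JVM. The sketch below is an illustration under that assumption, not a Flink facility: `ThreadDumpSketch` and `dumpThreads` are made-up names, and it only sees the JVM it runs in, so it would have to be triggered inside the TaskManager process to be useful there.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical helper: a jstack-like dump of the current JVM using only the JDK.
public class ThreadDumpSketch {

    static String dumpThreads() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        StringBuilder sb = new StringBuilder();
        // (true, true): also collect the monitors and ownable synchronizers each thread holds,
        // which helps when deciding whether threads are blocked on each other.
        for (ThreadInfo info : mx.dumpAllThreads(true, true)) {
            sb.append('"').append(info.getThreadName()).append("\" state=")
              .append(info.getThreadState());
            if (info.getLockName() != null) {
                sb.append(" waiting on ").append(info.getLockName());
            }
            sb.append('\n');
            // Walk the full stack; ThreadInfo.toString() would truncate it after a few frames.
            for (StackTraceElement frame : info.getStackTrace()) {
                sb.append("    at ").append(frame).append('\n');
            }
            sb.append('\n');
        }
        long[] deadlocked = mx.findDeadlockedThreads(); // null when no deadlock is found
        sb.append("deadlocked threads: ")
          .append(deadlocked == null ? 0 : deadlocked.length).append('\n');
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(dumpThreads());
    }
}
```

Taking two or three dumps a few seconds apart and checking which threads stay in the same frames is a common way to distinguish a true hang from very slow progress.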