Hi Ken, Changing the parallelism can affect the generation of input splits. I had a look at BinaryInputFormat, and it adds a bunch of empty input splits if the number of generated splits is less than the minimum number of splits (which is equal to the parallelism).
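In effect, the padding boils down to something like the following (a simplified, paraphrased sketch of the idea, not the actual Flink source; the helper name and parameters are made up for illustration, see the link below for the real code):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;

// Simplified sketch (paraphrased, NOT the actual Flink code) of what the split
// padding in BinaryInputFormat#createInputSplits effectively does.
public class SplitPaddingSketch {

    static FileInputSplit[] padToMinNumSplits(List<FileInputSplit> realSplits,
                                              int minNumSplits,
                                              Path lastFile,
                                              String[] hosts) {
        List<FileInputSplit> splits = new ArrayList<>(realSplits);
        // If fewer splits were generated than the requested minimum (== the source
        // parallelism), append empty (zero-length) splits so that every source task
        // still receives one.
        for (int index = splits.size(); index < minNumSplits; index++) {
            splits.add(new FileInputSplit(index, lastFile, 0L, 0L, hosts));
        }
        return splits.toArray(new FileInputSplit[0]);
    }
}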
See --> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java#L133 Maybe these empty splits cause the failure. IIRC, this was done because there was at some point (like several years ago...) the requirement that each source task would receive a split. I don't think this is still true. I'd try to remove these lines and see what happens. If that doesn't help, I'd try to add a bunch of log statements in the InputFormat to identify the point where it fails. Hope this helps, Fabian On Thu, Sep 19, 2019 at 9:25 AM Till Rohrmann <trohrm...@apache.org> wrote: > Good to hear that some of your problems have been solved, Ken. For the > UTFDataFormatException it is hard to tell. Usually it says that the input > has been produced using `writeUTF`. Could you maybe provide an example > program which reproduces the problem? Moreover, it would be helpful to see > how the input is generated and what exactly AdText does. > > Cheers, > Till > > On Wed, Sep 18, 2019 at 9:17 PM Ken Krugler <kkrugler_li...@transpac.com> > wrote: > >> Hi Till, >> >> I tried out 1.9.0 with my workflow, and I'm no longer running into the >> errors I described below, which is great! >> >> Just to recap, this is batch, per-job mode on YARN/EMR. >> >> Though I did run into a new issue, related to my previous problem when >> reading files written via SerializedOutputFormat. >> >> I would always get errors that look like: >> >> 2019-09-16 20:58:21,396 ERROR com.company.MyWorkflow - Exception reading >> from split #100 of file 's3://path-to-file/19' from 0 (state >> 28683/156308, block size 67108864) >> 2019-09-16 20:58:21,397 ERROR >> org.apache.flink.runtime.operators.BatchTask - Error in >> task code: CHAIN DataSource (at >> makePreparedDataSet(com.company.MyWorkflow.java:67) >> (com.company.MyWorkflow$AdTextInputFormat)) -> Map (Key Extractor) (4/12) >> java.io.UTFDataFormatException: malformed input around byte 51 >> at java.io.DataInputStream.readUTF(DataInputStream.java:656) >> at java.io.DataInputStream.readUTF(DataInputStream.java:564) >> at com.company.AdText.read(AdText.java:170) >> at >> org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:39) >> at >> org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:32) >> at >> org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:305) >> at com.company.MyWorkflow$AdTextInputFormat.nextRecord(MyWorkflow.java:90) >> at com.company.MyWorkflow$AdTextInputFormat.nextRecord(MyWorkflow.java:71) >> at >> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:195) >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) >> at java.lang.Thread.run(Thread.java:748) >> >> Which would imply (again) an issue with the read block size not being the >> same as what was used to write it. >> >> But I’d run this same data through a different workflow, without any >> issues. >> >> When I reduced the read parallelism of the failing workflow to match the >> succeeding workflow (was 12, dropped it to 4), the errors went away. >> >> So… I don’t know what the root issue is, but I have a workaround for now. >> >> Though it’s a reproducible problem, which I’d like to use to help track >> down the root cause. >> >> Any suggestions for how to debug further? 
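One thing that might be worth ruling out for the block-size mismatch: pin the block size explicitly on both the write side and the read side, instead of relying on the default, so the two cannot disagree. A rough, untested sketch (the helper methods are made up for illustration; AdText is your IOReadableWritable class from the stack trace, and 64 MB matches the block size in your log):

import com.company.AdText;  // Ken's record type from the stack trace, implements IOReadableWritable
import org.apache.flink.api.common.io.SerializedInputFormat;
import org.apache.flink.api.common.io.SerializedOutputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class BlockSizeSketch {

    // One shared constant for writing and reading (64 MB, matching the log output).
    private static final long BLOCK_SIZE = 64 * 1024 * 1024;

    // Write side: set the block size explicitly instead of using the default.
    static void writeAdTexts(DataSet<AdText> adTexts, String path) {
        SerializedOutputFormat<AdText> out = new SerializedOutputFormat<>();
        out.setBlockSize(BLOCK_SIZE);
        adTexts.write(out, path);
    }

    // Read side: use exactly the same block size that was used for writing.
    static DataSet<AdText> readAdTexts(ExecutionEnvironment env, String path) {
        SerializedInputFormat<AdText> in = new SerializedInputFormat<>();
        in.setBlockSize(BLOCK_SIZE);
        in.setFilePath(path);
        return env.createInput(in, TypeInformation.of(AdText.class));
    }
}

If the files were written with the default (native) block size, then setting an explicit block size only on the read side could produce exactly the kind of mismatch you are seeing, so it needs to be set (or left unset) consistently on both sides.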
>> >> Thanks, >> >> — Ken >> >> >> >> On Jul 1, 2019, at 2:57 AM, Till Rohrmann <trohrm...@apache.org> wrote: >> >> Hi Ken, >> >> in order to further debug your problems it would be helpful if you could >> share the log files on DEBUG level with us. >> >> For problem (2), I suspect that it has been caused by Flink releasing TMs >> too early. This should be fixed with FLINK-10941, which is part of Flink >> 1.8.1. The 1.8.1 release should be out very soonish. It would be great >> if you could try your program with this version or even the 1.8.1 RC to see >> whether the problem still occurs. But it could also be caused by using >> fine-grained recovery. So it might be worth trying to disable this feature if you >> turned it on. >> >> Thanks a lot! >> >> Cheers, >> Till >> >> On Thu, Jun 27, 2019 at 8:30 AM Biao Liu <mmyy1...@gmail.com> wrote: >> >>> Hi Ken again, >>> >>> In regard to the TimeoutException, I just realized that there is no >>> akka.remote.OversizedPayloadException in your log file. There might be some >>> other reason that caused this. >>> 1. Have you ever tried increasing the configuration "akka.ask.timeout"? >>> 2. Have you ever checked the garbage collection of the JM/TM? Maybe you need >>> to enable GC logging first. >>> >>> >>> On Thu, Jun 27, 2019 at 11:38 AM, Biao Liu <mmyy1...@gmail.com> wrote: >>> >>>> Hi Ken, >>>> >>>> In regard to oversized input splits, it seems to be a rare case that I >>>> didn't expect. However, it should definitely be fixed, since input splits can >>>> be user-defined. We should not assume they must be small. >>>> I agree with Stephan that maybe there is something unexpected >>>> included in the input splits. >>>> And there is also a workaround until we fix it: >>>> increase the RPC message size limit by explicitly configuring >>>> "akka.framesize" in flink-conf.yaml. >>>> >>>> To @Qi, I'm also sorry to hear about your bad experience. I'll take this issue, >>>> but I'm not sure I can get it in before the 1.9 release. Hope things go >>>> well. >>>> >>>> >>>> On Wed, Jun 26, 2019 at 10:50 PM, Stephan Ewen <se...@apache.org> wrote: >>>> >>>>> Hi Ken! >>>>> >>>>> Sorry to hear you are going through this experience. The major focus >>>>> on streaming so far means that the DataSet API has stability issues at >>>>> scale. >>>>> So, yes, batch mode in the current Flink version can be somewhat tricky. >>>>> >>>>> It is a big focus of Flink 1.9 to finally fix batch mode, by >>>>> addressing batch-specific scheduling / recovery / shuffle issues. >>>>> >>>>> Let me go through the issues you found: >>>>> >>>>> *(1) Input splits and oversized RPC* >>>>> >>>>> Your explanation seems correct: a timeout due to dropping an oversized RPC >>>>> message. >>>>> >>>>> I don't quite understand how exactly that happens, because the size >>>>> limit is 10 MB and input splits should be rather small in most cases. >>>>> Are you running custom sources which put large data into splits? Maybe >>>>> accidentally, by having a large serialized closure in the splits? >>>>> >>>>> The fix would be this issue: >>>>> https://issues.apache.org/jira/browse/FLINK-4399 >>>>> >>>>> *(2) TM early release* >>>>> >>>>> The 1.8 version had a fix that should work for regular cases without >>>>> fine-grained failure recovery. >>>>> 1.9 should have a more general fix that also works for fine-grained >>>>> recovery. >>>>> >>>>> Are you trying to use the finer-grained failover with the batch job? 
>>>>> The finer-grained failover is not working in batch for 1.8, that is >>>>> why it is not an advertised feature (it only works for streaming so far). >>>>> >>>>> The goal is that this works in the 1.9 release (aka the batch fixup >>>>> release) >>>>> >>>>> (3) Hang in Processing >>>>> >>>>> I think a thread dump (jstack) from the TMs would be helpful to >>>>> diagnose that. >>>>> There are known issues with the current batch shuffle implementation, >>>>> which is why 1.9 is getting a new bounded-blocking stream shuffle >>>>> implementation. >>>>> >>>>> Best, >>>>> Stephan >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> On Mon, Jun 24, 2019 at 2:32 AM Ken Krugler < >>>>> kkrugler_li...@transpac.com> wrote: >>>>> >>>>>> Hi all, >>>>>> >>>>>> I’ve been running a somewhat complex batch job (in EMR/YARN) with >>>>>> Flink 1.8.0, and it regularly fails, but for varying reasons. >>>>>> >>>>>> Has anyone else had stability with 1.8.0 in batch mode and >>>>>> non-trivial workflows? >>>>>> >>>>>> Thanks, >>>>>> >>>>>> — Ken >>>>>> >>>>>> *1. TimeoutException getting input splits* >>>>>> >>>>>> The batch job starts by processing a lot of files that live in S3. >>>>>> During this phase, I sometimes see: >>>>>> >>>>>> 2019-06-20 01:20:22,659 INFO >>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN >>>>>> DataSource (at createInput(ExecutionEnvironment.java:549) >>>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> Map >>>>>> (ad >>>>>> dailies) -> Filter (Filter at createWorkflow(MyWorkflow.java:34)) -> >>>>>> Filter >>>>>> (Filter at createWorkflow(MyWorkflow.java:36)) -> Filter (Filter at >>>>>> createWorkflow(MyWorkflow.java:38)) -> Map (Key Extractor) -> Combine >>>>>> (Reduce at createWorkflow(MyWorkflow.java:41)) (31/32) >>>>>> (8a8cbea47394c3d638910c36ac62d877) switched from RUNNING to FAILED. >>>>>> java.lang.RuntimeException: Could not retrieve next input split. >>>>>> at >>>>>> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367) >>>>>> at >>>>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160) >>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) >>>>>> at java.lang.Thread.run(Thread.java:748) >>>>>> Caused by: >>>>>> org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: >>>>>> Requesting the next input split failed. >>>>>> at >>>>>> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69) >>>>>> at >>>>>> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365) >>>>>> ... 3 more >>>>>> Caused by: java.util.concurrent.TimeoutException >>>>>> at >>>>>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771) >>>>>> at >>>>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) >>>>>> at >>>>>> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61) >>>>>> ... 4 more >>>>>> 2019-06-20 01:20:22,664 INFO >>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job >>>>>> Flink >>>>>> Java Job at Thu Jun 20 01:11:28 UTC 2019 >>>>>> (5564b8980f40d788d7ef312318709e4d) >>>>>> switched from state RUNNING to FAILING. >>>>>> java.lang.RuntimeException: Could not retrieve next input split. 
>>>>>> at >>>>>> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367) >>>>>> at >>>>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160) >>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) >>>>>> at java.lang.Thread.run(Thread.java:748) >>>>>> Caused by: >>>>>> org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: >>>>>> Requesting the next input split failed. >>>>>> at >>>>>> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69) >>>>>> at >>>>>> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365) >>>>>> ... 3 more >>>>>> Caused by: java.util.concurrent.TimeoutException >>>>>> at >>>>>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771) >>>>>> at >>>>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) >>>>>> at >>>>>> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61) >>>>>> ... 4 more >>>>>> >>>>>> I saw bjb...@gmail.com’s email recently about a similar issue: >>>>>> >>>>>> I figured this out myself. In my yarn container logs I saw this >>>>>> warning/error, >>>>>> >>>>>> akka.remote.OversizedPayloadException: Discarding oversized payload >>>>>> sent to Actor[akka.tcp://flink@HOST:43911/temp/$n]: max allowed size >>>>>> 10485760 bytes, actual size of encoded class >>>>>> org.apache.flink.runtime.jobmaster.SerializedInputSplit was 15728643 >>>>>> bytes. >>>>>> >>>>>> Looking into this there is a max frame size for Akka which in flink >>>>>> can be set with akka.framesize and is set to 10MB by default. Increasing >>>>>> this past the size of my side input fixed the issue. I'm guessing this is >>>>>> due to creating the side input PCollection from memory using the >>>>>> Create.of >>>>>> APIs. >>>>>> >>>>>> >>>>>> But no such akka.remote.OversizedPayloadException appears in any of >>>>>> my log files. >>>>>> >>>>>> *2. TM released too soon?* >>>>>> >>>>>> Sometimes it fails with "Connecting the channel failed: Connecting to >>>>>> remote task manager xxx has failed. This might indicate that the remote >>>>>> task manager has been lost” >>>>>> >>>>>> I’d run into this previously with 1.7.2, but thought that 1.8.0 had >>>>>> the fix for https://issues.apache.org/jira/browse/FLINK-10941, and >>>>>> thus I’d avoid the problem, but it seems like there’s still an issue. >>>>>> >>>>>> I’m running 3 TMs on three servers, each with 32 slots. When the job >>>>>> fails, the servers are under heavy CPU load. >>>>>> >>>>>> From the logs, I see the JobManager releasing two of the TMs, then >>>>>> requesting two new containers. One of these requests gets filled, and >>>>>> that >>>>>> new TM starts getting tasks for its slots. >>>>>> >>>>>> But then soon afterwards that new TM and the one original TM still >>>>>> left around start failing because they aren’t getting data from (I think) >>>>>> the other TM that was released. >>>>>> >>>>>> Any thoughts on what’s going wrong? Is the bug not actually fully >>>>>> fixed? Or is there some TM timeout value that I should bump? >>>>>> >>>>>> In the job manager log file I see where the two TMs are getting >>>>>> released... >>>>>> >>>>>> 2019-05-17 17:42:50,215 INFO >>>>>> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - >>>>>> Releasing idle slot [d947cd800b0ef2671259c7b048c3f7fc]. 
>>>>>> 2019-05-17 17:43:38,942 INFO org.apache.flink.yarn.YarnResourceManager >>>>>> - Stopping container >>>>>> container_1558074033518_0003_01_000002. >>>>>> 2019-05-17 17:43:38,978 INFO org.apache.flink.yarn.YarnResourceManager >>>>>> - Closing TaskExecutor connection >>>>>> container_1558074033518_0003_01_000002 because: TaskExecutor exceeded >>>>>> the idle timeout. >>>>>> 2019-05-17 17:43:38,978 INFO org.apache.flink.yarn.YarnResourceManager >>>>>> - Stopping container >>>>>> container_1558074033518_0003_01_000004. >>>>>> 2019-05-17 17:43:38,998 INFO org.apache.flink.yarn.YarnResourceManager >>>>>> - Closing TaskExecutor connection >>>>>> container_1558074033518_0003_01_000004 because: TaskExecutor exceeded >>>>>> the idle timeout. >>>>>> 2019-05-17 17:43:39,005 WARN org.apache.flink.yarn.YarnResourceManager >>>>>> - Discard registration from TaskExecutor >>>>>> container_1558074033518_0003_01_000002 at >>>>>> (akka.tcp://flink@ip-10-28-81-66.ec2.internal:36311/user/taskmanager_0) >>>>>> because the framework did not recognize it >>>>>> 2019-05-17 17:43:39,006 WARN org.apache.flink.yarn.YarnResourceManager >>>>>> - Discard registration from TaskExecutor >>>>>> container_1558074033518_0003_01_000004 at >>>>>> (akka.tcp://flink@ip-10-47-197-146.ec2.internal:44403/user/taskmanager_0) >>>>>> because the framework did not recognize it >>>>>> >>>>>> >>>>>> And then later on the requests for the replacement TMs. >>>>>> >>>>>> 2019-05-17 17:45:01,655 INFO org.apache.flink.yarn.YarnResourceManager >>>>>> - Requesting new TaskExecutor container with >>>>>> resources <memory:44000, vCores:32>. Number pending requests 1. >>>>>> >>>>>> 2019-05-17 17:45:01,662 INFO >>>>>> org.apache.flink.yarn.YarnResourceManager - Requesting new TaskExecutor >>>>>> container with resources <memory:44000, vCores:32>. Number pending >>>>>> requests >>>>>> 2. >>>>>> >>>>>> And then one of the requests is satisfied: >>>>>> >>>>>> 2019-05-17 17:45:04,360 INFO org.apache.flink.yarn.YarnResourceManager >>>>>> - Received new container: >>>>>> container_1558074033518_0003_01_000006 - Remaining pending container >>>>>> requests: 2 >>>>>> 2019-05-17 17:45:04,360 INFO org.apache.flink.yarn.YarnResourceManager >>>>>> - Removing container request >>>>>> Capability[<memory:44000, vCores:32>]Priority[1]. Pending container >>>>>> requests 1. >>>>>> 2019-05-17 17:45:04,836 INFO org.apache.flink.yarn.YarnResourceManager >>>>>> - Creating container launch context for TaskManagers >>>>>> 2019-05-17 17:45:04,837 INFO org.apache.flink.yarn.YarnResourceManager >>>>>> - Starting TaskManagers >>>>>> >>>>>> >>>>>> So it seems like TMs are being allocated, but soon afterwards: >>>>>> >>>>>> 2019-05-17 17:45:12,907 INFO >>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN >>>>>> Map (Map at createWorkflow(MyWorkflow.java:127)) -> Map (Key Extractor) >>>>>> (73/96) (1e16d6ca293330933b7cece67644635f) switched from RUNNING to >>>>>> FAILED. >>>>>> java.io.IOException: Connecting the channel failed: Connecting to remote >>>>>> task manager + 'ip-10-28-81-66.ec2.internal/10.28.81.66:40317' has >>>>>> failed. This might indicate that the remote task manager has been lost. 
>>>>>> at >>>>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196) >>>>>> at >>>>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:133) >>>>>> at >>>>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:69) >>>>>> at >>>>>> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:60) >>>>>> at >>>>>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:166) >>>>>> at >>>>>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:494) >>>>>> at >>>>>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:525) >>>>>> at >>>>>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:508) >>>>>> at >>>>>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86) >>>>>> at >>>>>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47) >>>>>> at >>>>>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73) >>>>>> at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:101) >>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503) >>>>>> at >>>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368) >>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) >>>>>> at java.lang.Thread.run(Thread.java:748) >>>>>> >>>>>> >>>>>> On one of the TMs that was released, I see at the end of its log: >>>>>> >>>>>> 2019-05-17 17:42:50,217 INFO >>>>>> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable - Free slot >>>>>> TaskSlot(index:3, state:ACTIVE, resource profile: >>>>>> ResourceProfile{cpuCores=1.7976931348623157E308, heapMemoryInMB=2147 >>>>>> 483647, directMemoryInMB=2147483647, nativeMemoryInMB=2147483647, >>>>>> networkMemoryInMB=2147483647}, allocationId: >>>>>> e3e7b383fe2db6376c82e5f3be7e02cb, jobId: >>>>>> eff57179c5c0e7d475c3b69d1a063017). >>>>>> 2019-05-17 17:42:50,217 INFO >>>>>> org.apache.flink.runtime.taskexecutor.JobLeaderService - Remove >>>>>> job >>>>>> eff57179c5c0e7d475c3b69d1a063017 from job leader monitoring. >>>>>> 2019-05-17 17:42:50,217 INFO >>>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor - Close >>>>>> JobManager connection for job eff57179c5c0e7d475c3b69d1a063017. >>>>>> 2019-05-17 17:42:50,222 INFO >>>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor - Close >>>>>> JobManager connection for job eff57179c5c0e7d475c3b69d1a063017. >>>>>> 2019-05-17 17:42:50,222 INFO >>>>>> org.apache.flink.runtime.taskexecutor.JobLeaderService - Cannot >>>>>> reconnect to job eff57179c5c0e7d475c3b69d1a063017 because it is not >>>>>> registered. >>>>>> 2019-05-17 17:43:38,982 INFO >>>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor - Close >>>>>> ResourceManager connection 2c23394abcc92fbc068529591fbf7ceb. 
>>>>>> 2019-05-17 17:43:38,982 INFO >>>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor - >>>>>> Connecting >>>>>> to ResourceManager >>>>>> akka.tcp://flink@ip-10-30-52-224.ec2.internal:35979/user/resourcemanager(0000000000000000000000000 >>>>>> 0000000). >>>>>> 2019-05-17 17:43:38,988 INFO >>>>>> org.apache.flink.yarn.YarnTaskExecutorRunner - RECEIVED >>>>>> SIGNAL 15: SIGTERM. Shutting down as requested. >>>>>> 2019-05-17 17:43:38,988 INFO >>>>>> org.apache.flink.runtime.blob.PermanentBlobCache - Shutting >>>>>> down BLOB cache >>>>>> 2019-05-17 17:43:38,989 INFO >>>>>> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager - >>>>>> Shutting down TaskExecutorLocalStateStoresManager. >>>>>> 2019-05-17 17:43:38,990 INFO >>>>>> org.apache.flink.runtime.blob.TransientBlobCache - Shutting >>>>>> down BLOB cache >>>>>> 2019-05-17 17:43:38,991 INFO >>>>>> org.apache.flink.runtime.filecache.FileCache - removed >>>>>> file cache directory >>>>>> /mnt/yarn/usercache/hadoop/appcache/application_1558074033518_0003/flink-dist-cache-a5e9444f-6bb6-4 >>>>>> da1-9067-8d2e7351cb61 >>>>>> 2019-05-17 17:43:38,991 INFO >>>>>> org.apache.flink.runtime.filecache.FileCache - removed >>>>>> file cache directory >>>>>> /mnt1/yarn/usercache/hadoop/appcache/application_1558074033518_0003/flink-dist-cache-925eed9f-fcf5- >>>>>> 4d48-8ac9-bce29e9116ef >>>>>> 2019-05-17 17:43:39,004 INFO >>>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor - Resolved >>>>>> ResourceManager address, beginning registration >>>>>> 2019-05-17 17:43:39,004 INFO >>>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor - >>>>>> Registration at ResourceManager attempt 1 (timeout=100ms) >>>>>> 2019-05-17 17:43:39,012 INFO >>>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor - >>>>>> Registration at ResourceManager was declined: unrecognized TaskExecutor >>>>>> 2019-05-17 17:43:39,012 INFO >>>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor - Pausing >>>>>> and >>>>>> re-attempting registration in 30000 ms >>>>>> >>>>>> And in the replacement TM that was started, it fails with: >>>>>> >>>>>> 2019-05-17 17:45:12,048 ERROR >>>>>> org.apache.flink.runtime.operators.BatchTask - Error in >>>>>> task code: Map (Key Extractor) (34/96) >>>>>> java.io.IOException: Connecting the channel failed: Connecting to >>>>>> remote task manager + 'ip-10-47-197-146.ec2.internal/ >>>>>> 10.47.197.146:39133' has failed. This might indicate that the remote >>>>>> task manager has been lost. >>>>>> at >>>>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196) >>>>>> >>>>>> Where the TM it’s trying to connect to is the one that was released >>>>>> and hasn’t been restarted yet. >>>>>> >>>>>> *3. Hang in processing* >>>>>> >>>>>> Sometimes it finishes the long-running (10 hour) operator, and then >>>>>> the two downstream operators get stuck (these have a different >>>>>> parallelism, >>>>>> so there’s a rebalance) >>>>>> >>>>>> In the most recent example of this, they processed about 20% of the >>>>>> data emitted by the long running operator. There are no errors in any of >>>>>> the logs. The last real activity in the jobmanager.log shows that all of >>>>>> the downstream operators were deployed... 
>>>>>> >>>>>> 2019-06-22 14:58:36,648 >>>>>> INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - >>>>>> CHAIN >>>>>> Map (Packed features) -> Map (Key Extractor) >>>>>> (7/32) (4a13a1d471c0ed5c2d9e66d2e4a98fd9) switched from DEPLOYING to >>>>>> RUNNING. >>>>>> >>>>>> Then nothing anywhere, until this msg starts appearing in the log >>>>>> file every 5 seconds or so… >>>>>> >>>>>> 2019-06-22 22:56:11,303 >>>>>> INFO org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl - >>>>>> Updating with new AMRMToken >>>>>> >>>>>> >>>>>> >>>>>> >> -------------------------- >> Ken Krugler >> +1 530-210-6378 >> http://www.scaleunlimited.com >> Custom big data solutions & training >> Flink, Solr, Hadoop, Cascading & Cassandra >> >>