Hi Ken,

Changing the parallelism can affect the generation of input splits.
I had a look at BinaryInputFormat, and it adds a bunch of empty input
splits if the number of generated splits is less than the minimum number of
splits (which is equal to the parallelism).

See -->
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java#L133

Maybe these empty splits cause the failure.
IIRC, this was done because there was at some point (like several years
ago...) the requirement that each source task would receive a split.
I don't think this is still true. I'd try to remove these lines and see
what happens.

If that doesn't help, I'd try to add a bunch of log statements in the
InputFormat to identify the point where it fails.

Hope this helps,
Fabian


Am Do., 19. Sept. 2019 um 09:25 Uhr schrieb Till Rohrmann <
trohrm...@apache.org>:

> Good to hear that some of your problems have been solved Ken. For the
> UTFDataFormatException it is hard to tell. Usually it says that the input
> has been produced using `writeUTF`. Cloud you maybe provide an example
> program which reproduces the problem? Moreover, it would be helpful to see
> how the input is generated and what AdText exactly does.
>
> Cheers,
> Till
>
> On Wed, Sep 18, 2019 at 9:17 PM Ken Krugler <kkrugler_li...@transpac.com>
> wrote:
>
>> Hi Till,
>>
>> I tried out 1.9.0 with my workflow, and I no longer am running into the
>> errors I described below, which is great!
>>
>> Just to recap, this is batch, per-job mode on YARN/EMR.
>>
>> Though I did run into a new issue, related to my previous problem when
>> reading files written via SerializedOutputFormat.
>>
>> I would always get errors that look like:
>>
>> 2019-09-16 20:58:21,396 ERROR com.company.MyWorkflow  - Exception reading
>> from split #100 of file 's3://path-to-file/19' from 0 (state
>> 28683/156308, block size 67108864)
>> 2019-09-16 20:58:21,397 ERROR
>> org.apache.flink.runtime.operators.BatchTask                  - Error in
>> task code:  CHAIN DataSource (at
>> makePreparedDataSet(com.company.MyWorkflow.java:67)
>> (com.company.MyWorkflow$AdTextInputFormat)) -> Map (Key Extractor) (4/12)
>> java.io.UTFDataFormatException: malformed input around byte 51
>> at java.io.DataInputStream.readUTF(DataInputStream.java:656)
>> at java.io.DataInputStream.readUTF(DataInputStream.java:564)
>> at com.company.AdText.read(AdText.java:170)
>> at
>> org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:39)
>> at
>> org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:32)
>> at
>> org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:305)
>> at com.company.MyWorkflow$AdTextInputFormat.nextRecord(MyWorkflow.java:90)
>> at com.company.MyWorkflow$AdTextInputFormat.nextRecord(MyWorkflow.java:71)
>> at
>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:195)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>> at java.lang.Thread.run(Thread.java:748)
>>
>> Which would imply (again) an issue with the read block size not being the
>> same as what was used to write it.
>>
>> But I’d run this same data through a different workflow, without any
>> issues.
>>
>> When I reduced the read parallelism of the failing workflow to match the
>> succeeding workflow (was 12, dropped it to 4), the errors went away.
>>
>> So…don’t know what’s the root issue, but I have a workaround for now.
>>
>> Though it’s a reproducible problem, which I’d like to use to help solve
>> the problem.
>>
>> Any suggestions for how to debug further?
>>
>> Thanks,
>>
>> — Ken
>>
>>
>>
>> On Jul 1, 2019, at 2:57 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>>
>> Hi Ken,
>>
>> in order to further debug your problems it would be helpful if you could
>> share the log files on DEBUG level with us.
>>
>> For problem (2), I suspect that it has been caused by Flink releasing TMs
>> too early. This should be fixed with FLINK-10941 which is part of Flink
>> 1.8.1. The 1.8.1 release should be released very soonish. It would be great
>> if you could try your program with this version or even the 1.8.1 RC to see
>> whether the problem still occurs. But it could also be caused by using fine
>> grained recovery. So it might be worth a try to disable this feature if you
>> turned it on.
>>
>> Thanks a lot!
>>
>> Cheers,
>> Till
>>
>> On Thu, Jun 27, 2019 at 8:30 AM Biao Liu <mmyy1...@gmail.com> wrote:
>>
>>> Hi Ken again,
>>>
>>> In regard to TimeoutException, I just realized that there is no
>>> akka.remote.OversizedPayloadException in your log file. There might be some
>>> other reason caused this.
>>> 1. Have you ever tried increasing the configuration "akka.ask.timeout"?
>>> 2. Have you ever checked the garbage collection of JM/TM? Maybe you need
>>> to enable printing GC log first.
>>>
>>>
>>> Biao Liu <mmyy1...@gmail.com> 于2019年6月27日周四 上午11:38写道:
>>>
>>>> Hi Ken,
>>>>
>>>> In regard to oversized input splits, it seems to be a rare case beyond
>>>> my expectation. However it should be fixed definitely since input split can
>>>> be user-defined. We should not assume it must be small.
>>>> I agree with Stephan that maybe there is something unexpectedly
>>>> involved in the input splits.
>>>> And there is also a work-around way to solve this before we fixing it,
>>>> increasing the limit of RPC message size by explicitly configuring
>>>> "akka.framesize" in flink-conf.yaml.
>>>>
>>>> To @Qi, also sorry to hear your bad experience. I'll take this issue
>>>> but I'm not sure I could catch up the releasing of 1.9. Hope things go 
>>>> well.
>>>>
>>>>
>>>> Stephan Ewen <se...@apache.org> 于2019年6月26日周三 下午10:50写道:
>>>>
>>>>> Hi Ken!
>>>>>
>>>>> Sorry to hear you are going through this experience. The major focus
>>>>> on streaming so far means that the DataSet API has stability issues at
>>>>> scale.
>>>>> So, yes, batch mode in current Flink version can be somewhat tricky.
>>>>>
>>>>> It is a big focus of Flink 1.9 to fix the batch mode, finally, and by
>>>>> addressing batch specific scheduling / recovery / and shuffle issues.
>>>>>
>>>>> Let me go through the issues you found:
>>>>>
>>>>> *(1) Input splits and oversized RPC*
>>>>>
>>>>> Your explanation seems correct, timeout due to dropping oversized RPC
>>>>> message.
>>>>>
>>>>> I don't quite understand how that exactly happens, because the size
>>>>> limit is 10 MB and input splits should be rather small in most cases.
>>>>> Are you running custom sources which put large data into splits? Maybe
>>>>> accidentally, by having a large serialized closure in the splits?
>>>>>
>>>>> The fix would be this issue:
>>>>> https://issues.apache.org/jira/browse/FLINK-4399
>>>>>
>>>>> *(2) TM early release*
>>>>>
>>>>> The 1.8 version had a fix that should work for regular cases without
>>>>> fine-grained failure recovery.
>>>>> 1.9 should have a more general fix that also works for fine-grained
>>>>> recovery
>>>>>
>>>>> Are you trying to use the finer grained failover with the batch job?
>>>>> The finer-grained failover is not working in batch for 1.8, that is
>>>>> why it is not an advertised feature (it only works for streaming so far).
>>>>>
>>>>> The goal is that this works in the 1.9 release (aka the batch fixup
>>>>> release)
>>>>>
>>>>> (3) Hang in Processing
>>>>>
>>>>> I think a thread dump (jstack) from the TMs would be helpful to
>>>>> diagnose that.
>>>>> There are known issues with the current batch shuffle implementation,
>>>>> which is why 1.9 is getting a new bounded-blocking stream shuffle
>>>>> implementation.
>>>>>
>>>>> Best,
>>>>> Stephan
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Jun 24, 2019 at 2:32 AM Ken Krugler <
>>>>> kkrugler_li...@transpac.com> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I’ve been running a somewhat complex batch job (in EMR/YARN) with
>>>>>> Flink 1.8.0, and it regularly fails, but for varying reasons.
>>>>>>
>>>>>> Has anyone else had stability with 1.8.0 in batch mode and
>>>>>> non-trivial workflows?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> — Ken
>>>>>>
>>>>>> *1. TimeoutException getting input splits*
>>>>>>
>>>>>> The batch job starts by processing a lot of files that live in S3.
>>>>>> During this phase, I sometimes see:
>>>>>>
>>>>>> 2019-06-20 01:20:22,659 INFO
>>>>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN
>>>>>> DataSource (at createInput(ExecutionEnvironment.java:549)
>>>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> Map 
>>>>>> (ad
>>>>>> dailies) -> Filter (Filter at createWorkflow(MyWorkflow.java:34)) -> 
>>>>>> Filter
>>>>>> (Filter at createWorkflow(MyWorkflow.java:36)) -> Filter (Filter at
>>>>>> createWorkflow(MyWorkflow.java:38)) -> Map (Key Extractor) -> Combine
>>>>>> (Reduce at createWorkflow(MyWorkflow.java:41)) (31/32)
>>>>>> (8a8cbea47394c3d638910c36ac62d877) switched from RUNNING to FAILED.
>>>>>> java.lang.RuntimeException: Could not retrieve next input split.
>>>>>> at
>>>>>> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
>>>>>> at
>>>>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by:
>>>>>> org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException:
>>>>>> Requesting the next input split failed.
>>>>>> at
>>>>>> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
>>>>>> at
>>>>>> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
>>>>>> ... 3 more
>>>>>> Caused by: java.util.concurrent.TimeoutException
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>>>>>> at
>>>>>> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
>>>>>> ... 4 more
>>>>>> 2019-06-20 01:20:22,664 INFO
>>>>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job 
>>>>>> Flink
>>>>>> Java Job at Thu Jun 20 01:11:28 UTC 2019 
>>>>>> (5564b8980f40d788d7ef312318709e4d)
>>>>>> switched from state RUNNING to FAILING.
>>>>>> java.lang.RuntimeException: Could not retrieve next input split.
>>>>>> at
>>>>>> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
>>>>>> at
>>>>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by:
>>>>>> org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException:
>>>>>> Requesting the next input split failed.
>>>>>> at
>>>>>> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
>>>>>> at
>>>>>> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
>>>>>> ... 3 more
>>>>>> Caused by: java.util.concurrent.TimeoutException
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>>>>>> at
>>>>>> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
>>>>>> ... 4 more
>>>>>>
>>>>>> I saw bjb...@gmail.com’s email recently about a similar issue:
>>>>>>
>>>>>> I figured this out myself. In my yarn container logs I saw this
>>>>>> warning/error,
>>>>>>
>>>>>> akka.remote.OversizedPayloadException: Discarding oversized payload
>>>>>> sent to Actor[akka.tcp://flink@HOST:43911/temp/$n]: max allowed size
>>>>>> 10485760 bytes, actual size of encoded class
>>>>>> org.apache.flink.runtime.jobmaster.SerializedInputSplit was 15728643 
>>>>>> bytes.
>>>>>>
>>>>>> Looking into this there is a max frame size for Akka which in flink
>>>>>> can be set with akka.framesize and is set to 10MB by default. Increasing
>>>>>> this past the size of my side input fixed the issue. I'm guessing this is
>>>>>> due to creating the side input PCollection from memory using the 
>>>>>> Create.of
>>>>>> APIs.
>>>>>>
>>>>>>
>>>>>> But no such akka.remote.OversizedPayloadException appears in any of
>>>>>> my log files.
>>>>>>
>>>>>> *2. TM released too soon?*
>>>>>>
>>>>>> Sometimes it fails with "Connecting the channel failed: Connecting to
>>>>>> remote task manager xxx has failed. This might indicate that the remote
>>>>>> task manager has been lost”
>>>>>>
>>>>>> I’d run into this previously with 1.7.2, but thought that 1.8.0 had
>>>>>> the fix for https://issues.apache.org/jira/browse/FLINK-10941, and
>>>>>> thus I’d avoid the problem, but it seems like there’s still an issue.
>>>>>>
>>>>>> I’m running 3 TMs on three servers, each with 32 slots. When the job
>>>>>> fails, the servers are under heavy CPU load.
>>>>>>
>>>>>> From the logs, I see the JobManager releasing two of the TMs, then
>>>>>> requesting two new containers. One of these requests gets filled, and 
>>>>>> that
>>>>>> new TM starts getting tasks for its slots.
>>>>>>
>>>>>> But then soon afterwards that new TM and the one original TM still
>>>>>> left around start failing because they aren’t getting data from (I think)
>>>>>> the other TM that was released.
>>>>>>
>>>>>> Any thoughts on what’s going wrong? Is the bug not actually fully
>>>>>> fixed? Or is there some TM timeout value that I should bump?
>>>>>>
>>>>>> In the job manager log file I see where the two TMs are getting
>>>>>> released...
>>>>>>
>>>>>> 2019-05-17 17:42:50,215 INFO  
>>>>>> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl      - 
>>>>>> Releasing idle slot [d947cd800b0ef2671259c7b048c3f7fc].
>>>>>> 2019-05-17 17:43:38,942 INFO  org.apache.flink.yarn.YarnResourceManager  
>>>>>>                    - Stopping container 
>>>>>> container_1558074033518_0003_01_000002.
>>>>>> 2019-05-17 17:43:38,978 INFO  org.apache.flink.yarn.YarnResourceManager  
>>>>>>                    - Closing TaskExecutor connection 
>>>>>> container_1558074033518_0003_01_000002 because: TaskExecutor exceeded 
>>>>>> the idle timeout.
>>>>>> 2019-05-17 17:43:38,978 INFO  org.apache.flink.yarn.YarnResourceManager  
>>>>>>                    - Stopping container 
>>>>>> container_1558074033518_0003_01_000004.
>>>>>> 2019-05-17 17:43:38,998 INFO  org.apache.flink.yarn.YarnResourceManager  
>>>>>>                    - Closing TaskExecutor connection 
>>>>>> container_1558074033518_0003_01_000004 because: TaskExecutor exceeded 
>>>>>> the idle timeout.
>>>>>> 2019-05-17 17:43:39,005 WARN  org.apache.flink.yarn.YarnResourceManager  
>>>>>>                    - Discard registration from TaskExecutor 
>>>>>> container_1558074033518_0003_01_000002 at 
>>>>>> (akka.tcp://flink@ip-10-28-81-66.ec2.internal:36311/user/taskmanager_0) 
>>>>>> because the framework did not recognize it
>>>>>> 2019-05-17 17:43:39,006 WARN  org.apache.flink.yarn.YarnResourceManager  
>>>>>>                    - Discard registration from TaskExecutor 
>>>>>> container_1558074033518_0003_01_000004 at 
>>>>>> (akka.tcp://flink@ip-10-47-197-146.ec2.internal:44403/user/taskmanager_0)
>>>>>>  because the framework did not recognize it
>>>>>>
>>>>>>
>>>>>> And then later on the requests for the replacement TMs.
>>>>>>
>>>>>> 2019-05-17 17:45:01,655 INFO  org.apache.flink.yarn.YarnResourceManager  
>>>>>>                    - Requesting new TaskExecutor container with 
>>>>>> resources <memory:44000, vCores:32>. Number pending requests 1.
>>>>>>
>>>>>> 2019-05-17 17:45:01,662 INFO
>>>>>> org.apache.flink.yarn.YarnResourceManager - Requesting new TaskExecutor
>>>>>> container with resources <memory:44000, vCores:32>. Number pending 
>>>>>> requests
>>>>>> 2.
>>>>>>
>>>>>> And then one of the requests is satisfied:
>>>>>>
>>>>>> 2019-05-17 17:45:04,360 INFO  org.apache.flink.yarn.YarnResourceManager  
>>>>>>                    - Received new container: 
>>>>>> container_1558074033518_0003_01_000006 - Remaining pending container 
>>>>>> requests: 2
>>>>>> 2019-05-17 17:45:04,360 INFO  org.apache.flink.yarn.YarnResourceManager  
>>>>>>                    - Removing container request 
>>>>>> Capability[<memory:44000, vCores:32>]Priority[1]. Pending container 
>>>>>> requests 1.
>>>>>> 2019-05-17 17:45:04,836 INFO  org.apache.flink.yarn.YarnResourceManager  
>>>>>>                    - Creating container launch context for TaskManagers
>>>>>> 2019-05-17 17:45:04,837 INFO  org.apache.flink.yarn.YarnResourceManager  
>>>>>>                    - Starting TaskManagers
>>>>>>
>>>>>>
>>>>>> So it seems like TMs are being allocated, but soon afterwards:
>>>>>>
>>>>>> 2019-05-17 17:45:12,907 INFO  
>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN 
>>>>>> Map (Map at createWorkflow(MyWorkflow.java:127)) -> Map (Key Extractor) 
>>>>>> (73/96) (1e16d6ca293330933b7cece67644635f) switched from RUNNING to 
>>>>>> FAILED.
>>>>>> java.io.IOException: Connecting the channel failed: Connecting to remote 
>>>>>> task manager + 'ip-10-28-81-66.ec2.internal/10.28.81.66:40317' has 
>>>>>> failed. This might indicate that the remote task manager has been lost.
>>>>>>  at 
>>>>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
>>>>>>  at 
>>>>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:133)
>>>>>>  at 
>>>>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:69)
>>>>>>  at 
>>>>>> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:60)
>>>>>>  at 
>>>>>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:166)
>>>>>>  at 
>>>>>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:494)
>>>>>>  at 
>>>>>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:525)
>>>>>>  at 
>>>>>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:508)
>>>>>>  at 
>>>>>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>>>>>>  at 
>>>>>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>>>>>>  at 
>>>>>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>>>>>  at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:101)
>>>>>>  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>>>>>  at 
>>>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>>>>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>>>>>  at java.lang.Thread.run(Thread.java:748)
>>>>>>
>>>>>>
>>>>>> On one of the TMs that was released, I see at the end of its log:
>>>>>>
>>>>>> 2019-05-17 17:42:50,217 INFO
>>>>>> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable      - Free slot
>>>>>> TaskSlot(index:3, state:ACTIVE, resource profile:
>>>>>> ResourceProfile{cpuCores=1.7976931348623157E308, heapMemoryInMB=2147
>>>>>> 483647, directMemoryInMB=2147483647, nativeMemoryInMB=2147483647,
>>>>>> networkMemoryInMB=2147483647}, allocationId:
>>>>>> e3e7b383fe2db6376c82e5f3be7e02cb, jobId: 
>>>>>> eff57179c5c0e7d475c3b69d1a063017).
>>>>>> 2019-05-17 17:42:50,217 INFO
>>>>>> org.apache.flink.runtime.taskexecutor.JobLeaderService        - Remove 
>>>>>> job
>>>>>> eff57179c5c0e7d475c3b69d1a063017 from job leader monitoring.
>>>>>> 2019-05-17 17:42:50,217 INFO
>>>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close
>>>>>> JobManager connection for job eff57179c5c0e7d475c3b69d1a063017.
>>>>>> 2019-05-17 17:42:50,222 INFO
>>>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close
>>>>>> JobManager connection for job eff57179c5c0e7d475c3b69d1a063017.
>>>>>> 2019-05-17 17:42:50,222 INFO
>>>>>> org.apache.flink.runtime.taskexecutor.JobLeaderService        - Cannot
>>>>>> reconnect to job eff57179c5c0e7d475c3b69d1a063017 because it is not
>>>>>> registered.
>>>>>> 2019-05-17 17:43:38,982 INFO
>>>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close
>>>>>> ResourceManager connection 2c23394abcc92fbc068529591fbf7ceb.
>>>>>> 2019-05-17 17:43:38,982 INFO
>>>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor            - 
>>>>>> Connecting
>>>>>> to ResourceManager
>>>>>> akka.tcp://flink@ip-10-30-52-224.ec2.internal:35979/user/resourcemanager(0000000000000000000000000
>>>>>> 0000000).
>>>>>> 2019-05-17 17:43:38,988 INFO
>>>>>> org.apache.flink.yarn.YarnTaskExecutorRunner                  - RECEIVED
>>>>>> SIGNAL 15: SIGTERM. Shutting down as requested.
>>>>>> 2019-05-17 17:43:38,988 INFO
>>>>>> org.apache.flink.runtime.blob.PermanentBlobCache              - Shutting
>>>>>> down BLOB cache
>>>>>> 2019-05-17 17:43:38,989 INFO
>>>>>> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  -
>>>>>> Shutting down TaskExecutorLocalStateStoresManager.
>>>>>> 2019-05-17 17:43:38,990 INFO
>>>>>> org.apache.flink.runtime.blob.TransientBlobCache              - Shutting
>>>>>> down BLOB cache
>>>>>> 2019-05-17 17:43:38,991 INFO
>>>>>> org.apache.flink.runtime.filecache.FileCache                  - removed
>>>>>> file cache directory
>>>>>> /mnt/yarn/usercache/hadoop/appcache/application_1558074033518_0003/flink-dist-cache-a5e9444f-6bb6-4
>>>>>> da1-9067-8d2e7351cb61
>>>>>> 2019-05-17 17:43:38,991 INFO
>>>>>> org.apache.flink.runtime.filecache.FileCache                  - removed
>>>>>> file cache directory
>>>>>> /mnt1/yarn/usercache/hadoop/appcache/application_1558074033518_0003/flink-dist-cache-925eed9f-fcf5-
>>>>>> 4d48-8ac9-bce29e9116ef
>>>>>> 2019-05-17 17:43:39,004 INFO
>>>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Resolved
>>>>>> ResourceManager address, beginning registration
>>>>>> 2019-05-17 17:43:39,004 INFO
>>>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor            -
>>>>>> Registration at ResourceManager attempt 1 (timeout=100ms)
>>>>>> 2019-05-17 17:43:39,012 INFO
>>>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor            -
>>>>>> Registration at ResourceManager was declined: unrecognized TaskExecutor
>>>>>> 2019-05-17 17:43:39,012 INFO
>>>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Pausing 
>>>>>> and
>>>>>> re-attempting registration in 30000 ms
>>>>>>
>>>>>> And in the replacement TM that was started, it fails with:
>>>>>>
>>>>>> 2019-05-17 17:45:12,048 ERROR
>>>>>> org.apache.flink.runtime.operators.BatchTask                  - Error in
>>>>>> task code:  Map (Key Extractor) (34/96)
>>>>>> java.io.IOException: Connecting the channel failed: Connecting to
>>>>>> remote task manager + 'ip-10-47-197-146.ec2.internal/
>>>>>> 10.47.197.146:39133' has failed. This might indicate that the remote
>>>>>> task manager has been lost.
>>>>>> at
>>>>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
>>>>>>
>>>>>> Where the TM it’s trying to connect to is the one that was released
>>>>>> and hasn’t been restarted yet.
>>>>>>
>>>>>> *3. Hang in processing*
>>>>>>
>>>>>> Sometimes it finishes the long-running (10 hour) operator, and then
>>>>>> the two downstream operators get stuck (these have a different 
>>>>>> parallelism,
>>>>>> so there’s a rebalance)
>>>>>>
>>>>>> In the most recent example of this, they processed about 20% of the
>>>>>> data emitted by the long running operator. There are no errors in any of
>>>>>> the logs. The last real activity in the jobmanager.log shows that all of
>>>>>> the downstream operators were deployed...
>>>>>>
>>>>>> 2019-06-22 14:58:36,648
>>>>>> INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - 
>>>>>> CHAIN
>>>>>> Map (Packed features) -> Map (Key Extractor)
>>>>>> (7/32) (4a13a1d471c0ed5c2d9e66d2e4a98fd9) switched from DEPLOYING to
>>>>>> RUNNING.
>>>>>>
>>>>>> Then nothing anywhere, until this msg starts appearing in the log
>>>>>> file every 5 seconds or so…
>>>>>>
>>>>>> 2019-06-22 22:56:11,303
>>>>>> INFO  org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl         -
>>>>>> Updating with new AMRMToken
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>> --------------------------
>> Ken Krugler
>> +1 530-210-6378
>> http://www.scaleunlimited.com
>> Custom big data solutions & training
>> Flink, Solr, Hadoop, Cascading & Cassandra
>>
>>

Reply via email to