Yes, the task manager continues running. I have put together a test app to demonstrate the problem, and in doing so noticed some oddities. The problem manifests on a simple join (I originally believed it was the distinct; I was wrong).
- When the source is generated via fromCollection(), it works fine.
- When the source is generated via readCsvFile() where the file URL is of the form file:/// it fails.
- When the source is generated via JDBCInputFormat it fails.
- My real app uses the JDBCInputFormat, but I converted it to work off data that might be in a file.

In all cases, I stopped and restarted the cluster, then ran the application twice: once to make sure the error occurred on a clean cluster, and once again on a cluster that had previously had a failed job.

You can find the application at https://github.com/ajaxx/flink-examples/tree/master/FlinkErrorWithFile.

Please let me know if there is anything I can do to help.

Aaron

On Tue, Jun 23, 2015 at 1:50 AM, Ufuk Celebi <u...@apache.org> wrote:

> Hey Aaron,
>
> thanks for reporting the issue.
>
> You are right that the exception is thrown during a shuffle. The receiver
> initiates a TCP connection to receive all the data for the join. A failing
> connect usually means that the respective TaskManager is not running.
>
> Can you check whether all expected task managers are running? You can use
> the web interface of the job manager for this (http://jm-address:8081).
>
> Some further questions:
> - Are you running in standalone/cluster mode (e.g. slaves file
>   configured and bin/start-cluster.sh script used)?
> - Is this reproducible?
>
> – Ufuk
>
> On 23 Jun 2015, at 07:11, Aaron Jackson <ajack...@pobox.com> wrote:
>
> > Hello,
> >
> > I have a process that works fine with Flink 0.8.1, but I decided to test
> > it against 0.9.0-milestone-1. I have 12 task managers across 3 machines,
> > so it's a small setup.
> >
> > The process fails with the following message. It appears that it's
> > attempting to do a shuffle in response to my join request. I checked all 3
> > machines and there are no issues with the hostname on any of them.
> > But the host being reported as "localhost" makes me wonder whether I
> > have missed something obvious.
> >
> > I noticed this exception in one of the Travis CI builds, so I'm hoping
> > it's something obvious I've missed.
> >
> > 06/23/2015 05:03:00  Join (Join at run(Job.java:137))(11/12) switched to RUNNING
> > 06/23/2015 05:03:00  Join (Join at run(Job.java:176))(9/12) switched to RUNNING
> > 06/23/2015 05:03:00  Join (Join at run(Job.java:176))(12/12) switched to RUNNING
> > 06/23/2015 05:03:00  Join (Join at run(Job.java:137))(12/12) switched to FAILED
> > java.lang.Exception: The data preparation for task 'Join (Join at
> > run(Job.java:137))' , caused an error: Connecting the channel failed:
> > Connection refused: localhost/127.0.0.1:46229
> >     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:472)
> >     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> >     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
> >     at java.lang.Thread.run(Thread.java:745)
> > Caused by: java.io.IOException: Connecting the channel failed:
> > Connection refused: localhost/127.0.0.1:46229
> >     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:193)
> >     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:129)
> >     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:65)
> >     at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:57)
> >     at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:106)
> >     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:305)
> >     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:328)
> >     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:76)
> >     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> >     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> >     at org.apache.flink.runtime.operators.hash.MutableHashTable.buildInitialTable(MutableHashTable.java:696)
> >     at org.apache.flink.runtime.operators.hash.MutableHashTable.open(MutableHashTable.java:440)
> >     at org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator.open(NonReusingBuildSecondHashMatchIterator.java:85)
> >     at org.apache.flink.runtime.operators.MatchDriver.prepare(MatchDriver.java:160)
> >     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
> >     ... 3 more
> >
> > Thanks
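[Editorial note] Ufuk's diagnosis is that "Connection refused" on the shuffle means nothing is listening on the TaskManager's data port. One quick way to confirm that, independent of Flink, is a plain TCP connect attempt, which is essentially what Flink's netty client does first. This is only a sketch, not part of the thread: the class name `PortProbe` is made up, and the host and port (localhost:46229) are taken from the stack trace above; substitute whatever address the exception actually reports.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortProbe {

    /**
     * Attempt a plain TCP connect to host:port with the given timeout.
     * Returns true if the connect succeeds, false if it fails (e.g.
     * "Connection refused", meaning no process is listening there).
     */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // A refused connection lands here -- the same failure mode
            // the shuffle reported in the stack trace.
            return false;
        }
    }

    public static void main(String[] args) {
        // Host and port taken from the exception message; adjust as needed.
        String host = "localhost";
        int port = 46229;
        System.out.println(host + ":" + port + " reachable? "
                + isReachable(host, port, 2000));
    }
}
```

If this prints `reachable? false` while the TaskManager is supposedly up, the data port is not actually open on that interface, which points back at the hostname/interface configuration Aaron suspects.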