Hey Aaron,

thanks for preparing the example. I've checked it out and tried it with a similar setup (12 task managers with one slot each, running the job with a parallelism of 12).
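(For reference, a setup like that is usually expressed with a couple of entries in flink-conf.yaml — a sketch, with the job manager hostname as a placeholder:)

```yaml
# flink-conf.yaml (sketch; "master-host" is a placeholder)
jobmanager.rpc.address: master-host
taskmanager.numberOfTaskSlots: 1
```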
I couldn't reproduce the problem. What have you configured in the "slaves" file? I think Flink does not allow you to run multiple task managers on a single machine with the startup scripts. Can you provide some information on how you start the system? Thanks for helping out with this.

– Ufuk

On 24 Jun 2015, at 05:09, Aaron Jackson <ajack...@pobox.com> wrote:

> Yes, the task manager continues running. I have put together a test app to
> demonstrate the problem, and in doing so noticed some oddities. The problem
> manifests itself on a simple join (I originally believed it was the
> distinct; I was wrong).
> • When the source is generated via fromCollection(), it works fine.
> • When the source is generated via readCsvFile() where the file URL is
>   of the form file:/// it fails.
> • When the source is generated via JDBCInputFormat it fails.
> • My real app uses the JDBCInputFormat, but I converted it to
>   work off data that might be in a file.
> In all cases, I stopped the cluster and restarted the cluster. Then I ran
> the application twice: once to make sure the error occurred on a clean
> cluster, and once again on a cluster that had previously had a failed job.
> You can find the application at
> https://github.com/ajaxx/flink-examples/tree/master/FlinkErrorWithFile.
>
> Please let me know if there is anything I can do to help.
>
> Aaron
>
> On Tue, Jun 23, 2015 at 1:50 AM, Ufuk Celebi <u...@apache.org> wrote:
> Hey Aaron,
>
> thanks for reporting the issue.
>
> You are right that the Exception is thrown during a shuffle. The receiver
> initiates a TCP connection to receive all the data for the join. A failing
> connect usually means that the respective TaskManager is not running.
>
> Can you check whether all expected task managers are running? You can use
> the web interface of the job manager for this (http://jm-address:8081).
>
> Some further questions:
> - Are you running in standalone/cluster mode (e.g. slaves file configured
>   and the bin/start-cluster.sh script used)?
> - Is this reproducible?
>
> – Ufuk
>
> On 23 Jun 2015, at 07:11, Aaron Jackson <ajack...@pobox.com> wrote:
>
> > Hello,
> >
> > I have a process that works fine with Flink 0.8.1, but I decided to test
> > it against 0.9.0-milestone-1. I have 12 task managers across 3 machines,
> > so it's a small setup.
> >
> > The process fails with the following message. It appears that it's
> > attempting to do a shuffle in response to my join request. I checked all
> > 3 machines and there are no issues with the hostname on any of them. But
> > the host being reported as "localhost" makes me wonder if I haven't
> > missed something obvious.
> >
> > I noticed this exception in one of the Travis CI builds, so I'm hoping
> > it's something obvious I've missed.
> >
> > 06/23/2015 05:03:00  Join (Join at run(Job.java:137))(11/12) switched to RUNNING
> > 06/23/2015 05:03:00  Join (Join at run(Job.java:176))(9/12) switched to RUNNING
> > 06/23/2015 05:03:00  Join (Join at run(Job.java:176))(12/12) switched to RUNNING
> > 06/23/2015 05:03:00  Join (Join at run(Job.java:137))(12/12) switched to FAILED
> > java.lang.Exception: The data preparation for task 'Join (Join at
> > run(Job.java:137))' caused an error: Connecting the channel failed:
> > Connection refused: localhost/127.0.0.1:46229
> >     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:472)
> >     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> >     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
> >     at java.lang.Thread.run(Thread.java:745)
> > Caused by: java.io.IOException: Connecting the channel failed: Connection
> > refused: localhost/127.0.0.1:46229
> >     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:193)
> >     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:129)
> >     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:65)
> >     at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:57)
> >     at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:106)
> >     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:305)
> >     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:328)
> >     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:76)
> >     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> >     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> >     at org.apache.flink.runtime.operators.hash.MutableHashTable.buildInitialTable(MutableHashTable.java:696)
> >     at org.apache.flink.runtime.operators.hash.MutableHashTable.open(MutableHashTable.java:440)
> >     at org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator.open(NonReusingBuildSecondHashMatchIterator.java:85)
> >     at org.apache.flink.runtime.operators.MatchDriver.prepare(MatchDriver.java:160)
> >     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
> >     ... 3 more
> >
> > Thanks
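(For anyone following along: the standalone setup being asked about amounts to listing the worker hosts in conf/slaves and launching everything from the master — a sketch, with placeholder hostnames:)

```
# conf/slaves — one line per host that should run a TaskManager
# (hostnames below are placeholders)
worker-1
worker-2
worker-3
```

With that file in place, running bin/start-cluster.sh on the master starts the job manager there and one TaskManager on each listed host. If a TaskManager ends up advertising itself as "localhost", it is also worth checking that each worker's hostname resolves to an externally reachable address rather than 127.0.0.1.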
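(A side note on reading the trace: "Connection refused: localhost/127.0.0.1:46229" is not Flink-specific — it simply means no process was accepting TCP connections on that address and port at that moment. A quick way to check a suspect data port is a plain TCP probe; this is a minimal, self-contained sketch, with the class name, timeout, and port taken as illustrative rather than from any Flink API:)

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortProbe {

    // Attempt a TCP connect; true means some process is listening on host:port.
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // "Connection refused" (nothing bound to the port) and timeouts land here.
            return false;
        }
    }

    public static void main(String[] args) {
        // 46229 is the data port from the stack trace above.
        System.out.println("localhost:46229 listening? "
                + isListening("localhost", 46229, 1000));
    }
}
```

Running a probe like this from the machine named in the error — and against the TaskManager's real hostname as well as localhost — helps distinguish a TaskManager that died from a TaskManager that is alive but advertising the wrong address.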