Thanks. My setup is actually 3 task managers x 4 slots each. I played with the parallelism and found that at low values the error did not occur, so I can only conclude that some form of data shuffling is happening that is sensitive to the data source. Yes, it seems a little odd to me as well. Out of curiosity, did you load the file into HDFS or read it from a local file system (e.g. file:///tmp/data.csv)? So far, my results show that HDFS does not appear to be sensitive to this issue.
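In case it's useful, here is roughly how the two variants compare in the job. This is a sketch only; the paths and the namenode address are placeholders, not my actual values:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class SourceUriCheck {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Variant 1: local filesystem URI. This is the variant that fails for me.
        // Note that every task manager must be able to read the same path.
        DataSet<Tuple2<Integer, String>> fromLocalFs = env
                .readCsvFile("file:///tmp/data.csv")
                .types(Integer.class, String.class);

        // Variant 2: the same file loaded into HDFS. This one has not hit the
        // error so far. ("namenode:8020" is a placeholder for your cluster.)
        DataSet<Tuple2<Integer, String>> fromHdfs = env
                .readCsvFile("hdfs://namenode:8020/tmp/data.csv")
                .types(Integer.class, String.class);

        // This only builds the plan; wire either DataSet into the actual join
        // and call env.execute() to compare behavior.
    }
}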
I updated the example to include my configuration and slaves file, but for brevity, I'll include the configurable bits here:

jobmanager.rpc.address: host01
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 512
taskmanager.heap.mb: 2048
taskmanager.numberOfTaskSlots: 4
parallelization.degree.default: 1
jobmanager.web.port: 8081
webclient.port: 8080
taskmanager.network.numberOfBuffers: 8192
taskmanager.tmp.dirs: /datassd/flink/tmp

And the slaves file:

host01
host02
host03

I did notice an extra empty line at the end of the slaves file. And while I highly doubt it makes ANY difference, I'm still going to re-run with it removed.

Thanks for looking into it.

Aaron

On Wed, Jun 24, 2015 at 4:26 AM, Ufuk Celebi <u...@apache.org> wrote:
> Hey Aaron,
>
> thanks for preparing the example. I've checked it out and tried it with a similar setup (12 task managers with 1 slot each, running the job with a parallelism of 12).
>
> I couldn't reproduce the problem. What have you configured in the "slaves" file? I think Flink does not allow you to run multiple task managers on a single machine with the startup scripts. Can you provide some information on how you start the system?
>
> Thanks for helping out with this.
>
> – Ufuk
>
> On 24 Jun 2015, at 05:09, Aaron Jackson <ajack...@pobox.com> wrote:
>
> > Yes, the task manager continues running. I have put together a test app to demonstrate the problem, and in doing so noticed some oddities. The problem manifests itself on a simple join (I originally believed it was the distinct; I was wrong).
> > • When the source is generated via fromCollection(), it works fine.
> > • When the source is generated via readCsvFile() where the file URL is of the form file:/// it fails.
> > • When the source is generated via JDBCInputFormat it fails.
> > • My real app uses the JDBCInputFormat, but I converted it to work off data that might be in a file.
> > In all cases, I stopped and restarted the cluster, then ran the application twice: once to make sure the error occurred on a clean cluster, and once again on a cluster that had previously had a failed job. You can find the application at https://github.com/ajaxx/flink-examples/tree/master/FlinkErrorWithFile.
> >
> > Please let me know if there is anything I can do to help.
> >
> > Aaron
> >
> > On Tue, Jun 23, 2015 at 1:50 AM, Ufuk Celebi <u...@apache.org> wrote:
> > Hey Aaron,
> >
> > thanks for reporting the issue.
> >
> > You are right that the exception is thrown during a shuffle. The receiver initiates a TCP connection to receive all the data for the join. A failing connect usually means that the respective TaskManager is not running.
> >
> > Can you check whether all expected task managers are running? You can use the web interface of the job manager for this (http://jm-address:8081).
> >
> > Some further questions:
> > - Are you running in standalone/cluster mode (e.g. slaves file configured and bin/start-cluster.sh script used)?
> > - Is this reproducible?
> >
> > – Ufuk
> >
> > On 23 Jun 2015, at 07:11, Aaron Jackson <ajack...@pobox.com> wrote:
> >
> > > Hello,
> > >
> > > I have a process that works fine with Flink 0.8.1, but I decided to test it against 0.9.0-milestone-1. I have 12 task managers across 3 machines, so it's a small setup.
> > >
> > > The process fails with the following message. It appears that it's attempting to do a shuffle in response to my join request. I checked all 3 machines and there are no issues with the hostname on any of them.
> > > But the host being reported as "localhost" makes me wonder if I haven't missed something obvious.
> > >
> > > I noticed this exception in one of the Travis CI builds, so I'm hoping it's something obvious I've missed.
> > >
> > > 06/23/2015 05:03:00  Join (Join at run(Job.java:137))(11/12) switched to RUNNING
> > > 06/23/2015 05:03:00  Join (Join at run(Job.java:176))(9/12) switched to RUNNING
> > > 06/23/2015 05:03:00  Join (Join at run(Job.java:176))(12/12) switched to RUNNING
> > > 06/23/2015 05:03:00  Join (Join at run(Job.java:137))(12/12) switched to FAILED
> > > java.lang.Exception: The data preparation for task 'Join (Join at run(Job.java:137))' , caused an error: Connecting the channel failed: Connection refused: localhost/127.0.0.1:46229
> > >     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:472)
> > >     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > >     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
> > >     at java.lang.Thread.run(Thread.java:745)
> > > Caused by: java.io.IOException: Connecting the channel failed: Connection refused: localhost/127.0.0.1:46229
> > >     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:193)
> > >     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:129)
> > >     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:65)
> > >     at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:57)
> > >     at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:106)
> > >     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:305)
> > >     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:328)
> > >     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:76)
> > >     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> > >     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> > >     at org.apache.flink.runtime.operators.hash.MutableHashTable.buildInitialTable(MutableHashTable.java:696)
> > >     at org.apache.flink.runtime.operators.hash.MutableHashTable.open(MutableHashTable.java:440)
> > >     at org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator.open(NonReusingBuildSecondHashMatchIterator.java:85)
> > >     at org.apache.flink.runtime.operators.MatchDriver.prepare(MatchDriver.java:160)
> > >     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
> > >     ... 3 more
> > >
> > > Thanks
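PS: For anyone following the thread who doesn't want to pull the repo, the repro boils down to roughly the following. This is a schematic sketch, not the actual code: the class name, paths, and sample data are mine, and the JDBCInputFormat variant is omitted. The real app is at the GitHub link above.

import java.util.Arrays;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem.WriteMode;

public class JoinRepro {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Collection-backed source: the join succeeds with this one.
        DataSet<Tuple2<Integer, String>> left = env.fromCollection(Arrays.asList(
                new Tuple2<Integer, String>(1, "a"),
                new Tuple2<Integer, String>(2, "b")));

        // File-backed source: swapping this in (or the JDBCInputFormat variant)
        // is what triggers the "Connection refused" during the shuffle.
        DataSet<Tuple2<Integer, String>> right = env
                .readCsvFile("file:///tmp/data.csv")
                .types(Integer.class, String.class);

        // The failure surfaces while the join's hash table is being built,
        // i.e. while one side is shuffled to the receiving task managers.
        left.join(right)
                .where(0)
                .equalTo(0)
                .writeAsText("file:///tmp/join-out", WriteMode.OVERWRITE);

        env.execute("join repro");
    }
}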