Yes, the task manager continues running.  I have put together a test app to
demonstrate the problem, and in doing so noticed some oddities.  The problem
manifests on a simple join (I originally believed it was the distinct; I
was wrong).

   - When the source is generated via fromCollection(), it works fine.
   - When the source is generated via readCsvFile() with a file URL of the
   form file:///, it fails.
   - When the source is generated via JDBCInputFormat, it fails.
      - My real app uses the JDBCInputFormat, but I converted it to work off
      data that might be in a file.
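The variants above can be sketched roughly as follows (Flink 0.9 DataSet
API; field types, sample data, and the CSV path are hypothetical — the real
job is in the linked repo):

```java
import java.util.Arrays;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class JoinRepro {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Variant 1: fromCollection() -- this one works.
        DataSet<Tuple2<Integer, String>> left = env.fromCollection(Arrays.asList(
                Tuple2.of(1, "a"), Tuple2.of(2, "b")));

        // Variant 2: readCsvFile() with a file:/// URL -- this one fails.
        // DataSet<Tuple2<Integer, String>> left = env
        //         .readCsvFile("file:///tmp/left.csv")
        //         .types(Integer.class, String.class);

        DataSet<Tuple2<Integer, String>> right = env.fromCollection(Arrays.asList(
                Tuple2.of(1, "x"), Tuple2.of(3, "y")));

        // The join forces a shuffle: the receiving side opens the TCP
        // connection that is being refused in the stack trace below.
        left.join(right).where(0).equalTo(0).print();
    }
}
```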

In all cases, I stopped and restarted the cluster, then ran the
application twice: once to confirm the error occurred on a clean cluster,
and once more on a cluster that had previously had a failed job.  You can
find the application at
https://github.com/ajaxx/flink-examples/tree/master/FlinkErrorWithFile.
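As a side note, the "Connection refused" in the trace below is the generic
no-listener TCP failure and is independent of Flink; a minimal, Flink-free
sketch (class and method names hypothetical):

```java
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class RefusedProbe {
    // Attempts a TCP connect to a local port with no listener and
    // reports the resulting failure mode.
    static String probe() throws IOException {
        int port;
        try (ServerSocket free = new ServerSocket(0)) {
            port = free.getLocalPort(); // free port; released on close
        }
        try (Socket s = new Socket()) {
            s.connect(new InetSocketAddress("127.0.0.1", port), 1000);
            return "connected";
        } catch (ConnectException e) {
            // Same failure the shuffle receiver hits when no TaskManager
            // is listening on the advertised address/port.
            return "refused";
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(probe());
    }
}
```

If a connect to the TaskManager's real hostname would succeed but the trace
shows "localhost", the TaskManager is advertising the wrong address.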

Please let me know if there is anything I can do to help.

Aaron

On Tue, Jun 23, 2015 at 1:50 AM, Ufuk Celebi <u...@apache.org> wrote:

> Hey Aaron,
>
> thanks for reporting the issue.
>
> You are right that the Exception is thrown during a shuffle. The receiver
> initiates a TCP connection to receive all the data for the join. A failing
> connect usually means that the respective TaskManager is not running.
>
> Can you check whether all expected task managers are running? You can use
> the web interface of the job manager for this (http://jm-address:8081).
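(Aside for the archive: if a TaskManager registers itself as "localhost",
its advertised hostname and shuffle port can be pinned per worker in
conf/flink-conf.yaml — key names as of the 0.9-era configuration, worth
verifying against the docs for your version:)

```
# conf/flink-conf.yaml on each worker
taskmanager.hostname: worker-1.example.com   # hostname this TaskManager advertises
taskmanager.data.port: 46229                 # fixed data (shuffle) port; 0 = random
```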
>
> Some further questions:
> - Are you running in standalone cluster mode (i.e. slaves file
> configured and bin/start-cluster.sh used)?
> - Is this reproducible?
>
> – Ufuk
>
> On 23 Jun 2015, at 07:11, Aaron Jackson <ajack...@pobox.com> wrote:
>
> > Hello,
> >
> > I have a process that works fine with flink 0.8.1 but I decided to test
> it against 0.9.0-milestone-1.  I have 12 task managers across 3 machines -
> so it's a small setup.
> >
> > The process fails with the message below.  It appears to be attempting a
> shuffle in response to my join.  I checked all 3 machines and there are no
> issues with the hostname on any of them, but the host being reported as
> "localhost" makes me wonder whether I have missed something obvious.
> >
> > I noticed this exception in one of the Travis CI builds, so I'm hoping
> it's something obvious I've missed.
> >
> > 06/23/2015 05:03:00     Join (Join at run(Job.java:137))(11/12) switched to RUNNING
> > 06/23/2015 05:03:00     Join (Join at run(Job.java:176))(9/12) switched to RUNNING
> > 06/23/2015 05:03:00     Join (Join at run(Job.java:176))(12/12) switched to RUNNING
> > 06/23/2015 05:03:00     Join (Join at run(Job.java:137))(12/12) switched to FAILED
> > java.lang.Exception: The data preparation for task 'Join (Join at run(Job.java:137))' , caused an error: Connecting the channel failed: Connection refused: localhost/127.0.0.1:46229
> >         at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:472)
> >         at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> >         at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
> >         at java.lang.Thread.run(Thread.java:745)
> > Caused by: java.io.IOException: Connecting the channel failed: Connection refused: localhost/127.0.0.1:46229
> >         at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:193)
> >         at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:129)
> >         at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:65)
> >         at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:57)
> >         at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:106)
> >         at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:305)
> >         at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:328)
> >         at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:76)
> >         at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> >         at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> >         at org.apache.flink.runtime.operators.hash.MutableHashTable.buildInitialTable(MutableHashTable.java:696)
> >         at org.apache.flink.runtime.operators.hash.MutableHashTable.open(MutableHashTable.java:440)
> >         at org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator.open(NonReusingBuildSecondHashMatchIterator.java:85)
> >         at org.apache.flink.runtime.operators.MatchDriver.prepare(MatchDriver.java:160)
> >         at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
> >         ... 3 more
> >
> > Thanks
>
>
