Thanks.  My setup is actually 3 task managers x 4 slots.  I played with the
parallelism and found that at low values, the error did not occur.  I can
only conclude that some form of data shuffling is occurring that is
sensitive to the data source.  Yes, it seems a little odd to me as well.
Out of curiosity, did you load the file into HDFS or read it from a local
file system (e.g. file:///tmp/data.csv)?  So far, my results suggest that
HDFS is not sensitive to this issue.
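
To make the comparison concrete, here is a minimal sketch of the kind of
job I mean (the class name, paths, host, and field types are placeholders,
not the real app; this assumes the 0.9 DataSet API):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class SourceComparison {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Same data, two schemes: the local path fails for me, HDFS has not.
        String path = (args.length > 0 && args[0].equals("hdfs"))
                ? "hdfs://host01:9000/tmp/data.csv"
                : "file:///tmp/data.csv";

        DataSet<Tuple2<Long, String>> input = env
                .readCsvFile(path)
                .types(Long.class, String.class);

        // The error shows up on a simple join (a self-join here, for brevity).
        input.join(input).where(0).equalTo(0).print();
    }
}

Everything else (cluster, parallelism) stays the same between the two runs;
only the input URL changes.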

I updated the example to include my configuration and slaves file, but for
brevity, I'll include just the configurable bits here:

jobmanager.rpc.address: host01
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 512
taskmanager.heap.mb: 2048
taskmanager.numberOfTaskSlots: 4
parallelization.degree.default: 1
jobmanager.web.port: 8081
webclient.port: 8080
taskmanager.network.numberOfBuffers: 8192
taskmanager.tmp.dirs: /datassd/flink/tmp
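
(For what it's worth: 3 hosts x 4 slots gives 12 task slots in total, which
matches the parallelism-12 runs.  And if I remember the configuration docs
correctly, the suggested sizing for taskmanager.network.numberOfBuffers is
roughly #slots-per-TM^2 x #TMs x 4 = 4^2 x 3 x 4 = 192, so 8192 should be
more than enough here.)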

And the slaves file:

host01
host02
host03

I did notice an extra empty line at the end of the slaves file.  While I
highly doubt it makes ANY difference, I'm still going to re-run with it
removed.

Thanks for looking into it.

Aaron

On Wed, Jun 24, 2015 at 4:26 AM, Ufuk Celebi <u...@apache.org> wrote:

> Hey Aaron,
>
> thanks for preparing the example. I've checked it out and tried it with a
> similar setup (12 task managers with 1 slot each, running the job with a
> parallelism of 12).
>
> I couldn't reproduce the problem. What have you configured in the "slaves"
> file? I think Flink does not allow you to run multiple task managers on a
> single machine with the startup scripts. Can you provide some information
> on how you start the system?
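>
> (For reference, the standard standalone flow as I understand it: the
> "slaves" file lists one host per line, and bin/start-cluster.sh starts a
> single task manager per entry over SSH, so 12 task managers would normally
> require 12 entries.)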
>
> Thanks for helping out with this.
>
> – Ufuk
>
> On 24 Jun 2015, at 05:09, Aaron Jackson <ajack...@pobox.com> wrote:
>
> > Yes, the task manager continues running.  I have put together a test app
> > to demonstrate the problem, and in doing so noticed some oddities.  The
> > problem manifests itself on a simple join (I originally believed it was
> > the distinct; I was wrong).
> >       • When the source is generated via fromCollection(), it works fine.
> >       • When the source is generated via readCsvFile() where the file URL
> >         is of the form file:///, it fails.
> >       • When the source is generated via JDBCInputFormat, it fails.  (My
> >         real app uses the JDBCInputFormat, but I converted it to work off
> >         data that might be in a file; a sketch of the shape of the job is
> >         below.)
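> >
> > Roughly, the working variant looks like this (a sketch with placeholder
> > data and names; the failing variants differ only in how the source is
> > created, and the actual test app is linked below):
> >
> > import java.util.Arrays;
> > import org.apache.flink.api.java.DataSet;
> > import org.apache.flink.api.java.ExecutionEnvironment;
> > import org.apache.flink.api.java.tuple.Tuple2;
> >
> > public class JoinRepro {
> >     public static void main(String[] args) throws Exception {
> >         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> >         // Works: the source is built with fromCollection().
> >         DataSet<Tuple2<Long, String>> rows = env.fromCollection(Arrays.asList(
> >                 new Tuple2<Long, String>(1L, "a"),
> >                 new Tuple2<Long, String>(2L, "b")));
> >         // Fails for me: the same join when the source is readCsvFile()
> >         // with a file:/// URL, or a JDBCInputFormat, instead.
> >         rows.join(rows).where(0).equalTo(0).print();
> >     }
> > }
> >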
> > In all cases, I stopped and restarted the cluster, then ran the
> > application twice: once to make sure the error occurred on a clean
> > cluster, and once again on a cluster that had previously had a failed
> > job.  You can find the application at
> > https://github.com/ajaxx/flink-examples/tree/master/FlinkErrorWithFile.
> >
> > Please let me know if there is anything I can do to help.
> >
> > Aaron
> >
> > On Tue, Jun 23, 2015 at 1:50 AM, Ufuk Celebi <u...@apache.org> wrote:
> > Hey Aaron,
> >
> > thanks for reporting the issue.
> >
> > You are right that the Exception is thrown during a shuffle.  The
> > receiver initiates a TCP connection to receive all the data for the
> > join.  A failing connect usually means that the respective TaskManager
> > is not running.
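> >
> > (If I remember correctly, the port in the message, 46229 here, is the
> > task manager's data port, which is chosen at random by default, so it
> > will differ between runs.)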
> >
> > Can you check whether all expected task managers are running? You can
> > use the web interface of the job manager for this
> > (http://jm-address:8081).
> >
> > Some further questions:
> > - Are you running in standalone/cluster mode (e.g. slaves file
> > configured and the bin/start-cluster.sh script used)?
> > - Is this reproducible?
> >
> > – Ufuk
> >
> > On 23 Jun 2015, at 07:11, Aaron Jackson <ajack...@pobox.com> wrote:
> >
> > > Hello,
> > >
> > > I have a process that works fine with Flink 0.8.1, but I decided to
> > > test it against 0.9.0-milestone-1.  I have 12 task managers across 3
> > > machines, so it's a small setup.
> > >
> > > The process fails with the following message.  It appears that it's
> > > attempting to do a shuffle in response to my join.  I checked all 3
> > > machines, and there are no issues with the hostname on any of them.
> > > But the host being reported as "localhost" makes me wonder whether
> > > I've missed something obvious.
> > >
> > > I noticed this exception in one of the Travis CI builds, so I'm hoping
> > > it's something obvious I've missed.
> > >
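> > > One thing I still want to rule out (just a guess on my part): an
> > > /etc/hosts entry along the lines of
> > >
> > > 127.0.0.1   localhost host01
> > >
> > > on one of the machines, which could make a task manager register itself
> > > under the loopback address and show up as "localhost" to its peers.
> > >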
> > > 06/23/2015 05:03:00     Join (Join at run(Job.java:137))(11/12) switched to RUNNING
> > > 06/23/2015 05:03:00     Join (Join at run(Job.java:176))(9/12) switched to RUNNING
> > > 06/23/2015 05:03:00     Join (Join at run(Job.java:176))(12/12) switched to RUNNING
> > > 06/23/2015 05:03:00     Join (Join at run(Job.java:137))(12/12) switched to FAILED
> > > java.lang.Exception: The data preparation for task 'Join (Join at run(Job.java:137))' , caused an error: Connecting the channel failed: Connection refused: localhost/127.0.0.1:46229
> > >         at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:472)
> > >         at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > >         at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
> > >         at java.lang.Thread.run(Thread.java:745)
> > > Caused by: java.io.IOException: Connecting the channel failed: Connection refused: localhost/127.0.0.1:46229
> > >         at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:193)
> > >         at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:129)
> > >         at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:65)
> > >         at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:57)
> > >         at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:106)
> > >         at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:305)
> > >         at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:328)
> > >         at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:76)
> > >         at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> > >         at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> > >         at org.apache.flink.runtime.operators.hash.MutableHashTable.buildInitialTable(MutableHashTable.java:696)
> > >         at org.apache.flink.runtime.operators.hash.MutableHashTable.open(MutableHashTable.java:440)
> > >         at org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator.open(NonReusingBuildSecondHashMatchIterator.java:85)
> > >         at org.apache.flink.runtime.operators.MatchDriver.prepare(MatchDriver.java:160)
> > >         at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
> > >         ... 3 more
> > >
> > > Thanks
> >
> >
>
>
