Hi Fabian,

I've been trying out the cascading-flink 1.0 branch (updated to 
cascading-3.1-wip-56) with our cascading.utils project.

I ran into one initial challenge, where older Kryo versions don't work with 
Flink - it seems like it has to be 2.24.0, otherwise you get a no-such-method 
error (2.19) or an odd hang while Kryo is trying to read (2.21). So there was a 
bit of version management required. I noticed that Flink has a dependency on 
Chill 0.7.4, which depends on Kryo 2.21.
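
For anyone chasing a similar conflict, here is a minimal sketch in plain Java (not part of Flink or Cascading) that reports which jar actually supplies a class at runtime. The names ClasspathProbe and describe are made up for illustration; com.esotericsoftware.kryo.Kryo is the Kryo 2.x entry class. The version is read from the jar manifest, which not every jar fills in, so treat a missing version as "unknown" rather than as a problem.

```java
import java.security.CodeSource;

public class ClasspathProbe {

    /** Describes where the named class was loaded from, without initializing it. */
    public static String describe(String className) {
        try {
            Class<?> cls = Class.forName(className, false, ClasspathProbe.class.getClassLoader());

            // The code source is null for classes loaded by the bootstrap loader.
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            String location = (src == null || src.getLocation() == null)
                    ? "(bootstrap or unknown)"
                    : src.getLocation().toString();

            // Implementation-Version comes from the jar manifest and may be absent.
            Package pkg = cls.getPackage();
            String version = (pkg == null) ? null : pkg.getImplementationVersion();

            return className + " -> " + location
                    + ", version=" + (version == null ? "(not in manifest)" : version);
        } catch (ClassNotFoundException e) {
            return className + " -> not on classpath";
        }
    }

    public static void main(String[] args) {
        // Run with the same classpath as the failing job to see which Kryo jar wins.
        System.out.println(describe("com.esotericsoftware.kryo.Kryo"));
    }
}
```

Running this with the job's classpath should show whether the Kryo pulled in transitively (e.g. via Chill) or the explicitly pinned one is the copy being loaded.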

After that change, our tests run, but it looks like the Flink planner is 
ignoring the Scheme.setNumSinkParts() call.

E.g. Scheme.setNumSinkParts(1) should result in a single part-00000 file, and 
thus the upstream grouping should implicitly have a parallelism of 1.

This is described as a suggestion (e.g. if your Flow only has maps, then no 
such parallelism can be guaranteed), but many workflows rely on it when 
generating a small output file that has to be globally sorted.
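
To illustrate why the part count matters for global ordering, here is a toy model in plain Java. It is not Cascading or Flink code, and SinkPartsSketch and writeParts are hypothetical names: it hash-partitions keys across N "parts" and sorts each part, roughly what a grouping with parallelism N followed by a sink would produce. It assumes hash partitioning, which is only one way a planner might distribute keys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SinkPartsSketch {

    /**
     * Hash-partitions keys into numParts buckets and sorts each bucket,
     * mimicking one sorted part file per parallel grouping task.
     */
    public static List<List<String>> writeParts(List<String> keys, int numParts) {
        List<List<String>> parts = new ArrayList<>();
        for (int i = 0; i < numParts; i++) {
            parts.add(new ArrayList<>());
        }
        for (String key : keys) {
            // floorMod keeps the index non-negative even for negative hash codes.
            int index = Math.floorMod(key.hashCode(), numParts);
            parts.get(index).add(key);
        }
        for (List<String> part : parts) {
            Collections.sort(part);
        }
        return parts;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("pear", "apple", "fig", "kiwi");

        // One part: a single list that is sorted across all keys.
        System.out.println(writeParts(keys, 1));

        // Several parts: each list is sorted, but no single file holds the global order.
        System.out.println(writeParts(keys, 3));
    }
}
```

With one part, the only output is globally sorted; with more than one, each part is sorted on its own and a merge step would be needed to recover the global order, which is why workflows lean on the single-part setting.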

Thanks,

-- Ken

PS - Chris Wensel responded to the 
cascading.ComparePlatformsTest$CompareTestCase issue, and said:

> make sure you ‘exclude’ *TestCase from your unit test pattern.
> 
> all Cascading tests are *PlatformTest and *Test. there are no tests in 
> *TestCase




> From: Fabian Hueske
> Sent: March 30, 2016 2:04:15am PDT
> To: dev@flink.apache.org
> Subject: Re: Expected duration for cascading-flink tests?
> 
> Hi Ken,
> 
> regarding the failed tests:
> - cascading.JoinFieldedPipesPlatformTest$testJoinMergeGroupBy is expected
> to fail due to restrictions in the MR/Tez engines. If I remember correctly,
> this is about deadlocks that need to be resolved by splitting a job.
> Flink's optimizer detects such situations and places a dam breaker to
> resolve such a situation within a single job and is hence able to execute
> the job correctly.
> - cascading.ComparePlatformsTest$CompareTestCase: I think you are right on
> this one. When I implemented the runner, I did not find a way to make this
> test pass. It looked like an issue with the test itself, as you also
> assumed.
> 
> Btw. I already ported the runner to Flink 1.0 and bumped the Cascading 3.1
> WIP version, but haven't done an "official" release yet. You can find
> the code in the flink-1.0 branch [1]. With Flink 1.0, we also extended the
> support for outer joins. It might be possible to get rid of some of the
> HashJoin restrictions, but I have to take a closer look at how outer hash
> joins are done with Cascading MR/Tez.
> Anyway, I can do a Cascading-Flink release for Flink 1.0 soon and extend
> HashJoin support later.
> 
> Best, Fabian
> 
> [1] https://github.com/dataartisans/cascading-flink/tree/flink-1.0
> 
> 2016-03-30 6:08 GMT+02:00 Ken Krugler <kkrugler_li...@transpac.com>:
> 
>> Hi Fabian,
>> 
>>> From: Fabian Hueske
>>> Sent: March 29, 2016 3:51:08pm PDT
>>> To: dev@flink.apache.org
>>> Subject: Re: Expected duration for cascading-flink tests?
>>> 
>>> Hi Ken,
>>> 
>>> no, this is definitely not expected. The tests complete in about 30 mins
>>> on my machine.
>>> Is it possible that you have another Flink process running on your
>>> machine (maybe a debug thread in your IDE)? That could explain the
>>> "Address already in use" exceptions.
>> 
>> Good call - I'd run "bin/stop-local.sh" previously, but I see that there's
>> still the Flink process running.
>> 
>> Re-running bin/stop-local.sh displays "No jobmanager daemon to stop on
>> host Kens-MacBook-Air.local.", but still doesn't kill off the Flink process.
>> 
>> What might cause that situation?
>> 
>> In any case, I manually killed the process and started the build again,
>> and it finished in about 20 minutes, which is great.
>> 
>> I see the expected errors, e.g.
>> 
>> HashJoin does only support InnerJoin and LeftJoin but is
>> cascading.pipe.joiner.OuterJoin
>> 
>> though this one seems odd:
>> 
>>> testJoinMergeGroupBy(cascading.JoinFieldedPipesPlatformTest)  Time elapsed: 0.048 sec  <<< FAILURE!
>>> junit.framework.AssertionFailedError: planner should throw error on plan
>> 
>> FlinkTestPlatform needs to return true from supportsGroupByAfterMerge() -
>> assuming that this is actually the case (seems reasonable for Flink)
>> 
>> Though making that change requires cascading-wip-56 to avoid a compilation
>> error on the @Override.
>> 
>> There's also this one:
>> 
>>> Running cascading.ComparePlatformsTest$CompareTestCase
>>> Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.053 sec <<< FAILURE! - in cascading.ComparePlatformsTest$CompareTestCase
>>> warning(junit.framework.TestSuite$1)  Time elapsed: 0.009 sec  <<< FAILURE!
>>> junit.framework.AssertionFailedError: Class cascading.ComparePlatformsTest$CompareTestCase has no public constructor TestCase(String name) or TestCase()
>>>      at junit.framework.Assert.fail(Assert.java:57)
>>>      at junit.framework.TestCase.fail(TestCase.java:227)
>>>      at junit.framework.TestSuite$1.runTest(TestSuite.java:100)
>> 
>> 
>> But that seems like an issue with the Cascading test code. I'll check
>> w/Chris and see what he says.
>> 
>> Anyway, the build worked with the update to cascading-wip-56.
>> 
>> I also tried updating to Flink 1.0.0 (from 0.10.0), but so far I've run
>> into some compilation errors, e.g. in FlinkFlowStep.java it can't find the
>> JavaPlan class.
>> 
>> Thanks again for the help,
>> 
>> -- Ken
>> 
>> 
>> 
>>> Best, Fabian
>>> 
>>> 2016-03-29 20:36 GMT+02:00 Ken Krugler <kkrugler_li...@transpac.com>:
>>> 
>>>> An update (and a nudge)…
>>>> 
>>>> So far it's been more than 20 hours, and the tests are still running.
>>>> 
>>>> Most tests seem to fail with one of two different errors…
>>>> 
>>>> 1. Address already in use
>>>> 
>>>> cascading.flow.FlowException: [test] unhandled exception
>>>>       at cascading.flow.BaseFlow.complete(BaseFlow.java:977)
>>>>       at cascading.flow.FlowStrategiesPlatformTest.testSkipStrategiesReplace(FlowStrategiesPlatformTest.java:67)
>>>> Caused by: org.jboss.netty.channel.ChannelException: Failed to bind to: /127.0.0.1:6123
>>>>       at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
>>>>       …
>>>> Caused by: java.net.BindException: Address already in use
>>>>       …
>>>> 
>>>> 2. FlowStepJob.blockOnJob  throws a cascading.flow.FlowException
>>>> 
>>>> All caused by a 100 second timeout
>>>> 
>>>> Is the above expected?
>>>> 
>>>> Thanks,
>>>> 
>>>> -- Ken
>>>> 
>>>>> From: Ken Krugler
>>>>> Sent: March 28, 2016 3:39:12pm PDT
>>>>> To: dev@flink.apache.org
>>>>> Subject: Expected duration for cascading-flink tests?
>>>>> 
>>>>> Hi all,
>>>>> 
>>>>> I'm curious how long the tests are expected to take for cascading-flink.
>>>>> 
>>>>> I know that https://github.com/dataArtisans/cascading-flink recommends
>>>>> running mvn clean install with -DskipTests, but I was going to try updating
>>>>> to flink 1.0.0 (currently using 0.10.0) and cascading 3.1.0-wip-56
>>>>> (currently on wip-39), so I wanted to first verify that all tests passed
>>>>> before updating and then running the tests again.
>>>>> 
>>>>> In any case, the tests have been running for about 2.5 hours now. From
>>>>> what I can tell, it's legit - most of the time is tied to
>>>>> cascading.flow.planner.rule.RuleSetExec's call() method.
>>>>> 
>>>>> Maybe this is a sign that it's time for a new Mac :)
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> -- Ken





--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr




