Hi,
your guess is correct: I use Java all the time. Here is the complete
stack trace:
Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
    at org.apache.flink.client.program.Client.runBlocking(Client.java:367)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:345)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:312)
    at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:212)
    at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:189)
    at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:160)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:803)
    at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
    at org.apache.flink.api.java.DataSet.print(DataSet.java:1583)
    at x.y.z.eventstore.processing.pmc.PmcProcessor.main(PmcProcessor.java:103)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The data preparation for task 'CHAIN Join(Join at main(PmcProcessor.java:103)) -> FlatMap (collect())' , caused an error: Unsupported driver strategy for join driver: CO_GROUP_RAW
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Unsupported driver strategy for join driver: CO_GROUP_RAW
    at org.apache.flink.runtime.operators.JoinDriver.prepare(JoinDriver.java:193)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
    ... 3 more
On 09.02.2016 at 21:03, Fabian Hueske wrote:
Hi,
glad you could resolve the POJO issue, but the new error doesn't look
right.
The CO_GROUP_RAW strategy should only be used for programs that are
implemented against the Python DataSet API.
I guess that's not the case since all code snippets were Java so far.
Can you post the full stacktrace of the exception?
2016-02-09 20:13 GMT+01:00 Dominique Rondé <dominique.ro...@codecentric.de>:
Hi all,
I finally figured out that a getter for a boolean field may be the
source of the trouble. It seems that getBooleanField (as we use it)
is not the best choice. Now the plan is executed, but it fails with
a different error. :(
Caused by: java.lang.Exception: Unsupported driver strategy for join driver: CO_GROUP_RAW
Is there any link to documentation or example code that you would
recommend besides the official documentation?
But folks, thanks for your great support! A really nice community
here!
Greets
Dominique
On 09.02.2016 at 19:41, Till Rohrmann wrote:
I tested the |TypeExtractor| with your |SourceA| and |SourceB|
types (adding proper setters and getters) and it correctly
returned a |PojoType|. Thus, I would suspect that you haven’t
specified the proper setters and getters in your implementation.
Cheers,
Till
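The getter and setter requirement mentioned above can be checked by hand with plain reflection. The following is only a rough sketch that approximates the POJO rules from the programming guide (public class, public no-argument constructor, every field either public or reachable through a getter and a setter). It is not Flink's |TypeExtractor| and may disagree with it in corner cases. The names PojoCheck, looksLikePojo, WithSetter and GetterOnly are made up for illustration.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

public class PojoCheck {

    /** Rough approximation of the documented POJO rules; NOT Flink's TypeExtractor. */
    public static boolean looksLikePojo(Class<?> type) {
        if (!Modifier.isPublic(type.getModifiers())) {
            return false; // the class itself must be public
        }
        try {
            type.getConstructor(); // throws if there is no public no-argument constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Walk the class and its superclasses so inherited private fields are checked too.
        for (Class<?> c = type; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int mods = f.getModifiers();
                if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || f.isSynthetic()) {
                    continue; // assumption: such fields are not part of the data type
                }
                if (Modifier.isPublic(mods)) {
                    continue; // public fields need no accessors
                }
                boolean getter = hasPublicMethod(type, "get" + f.getName(), 0);
                boolean setter = hasPublicMethod(type, "set" + f.getName(), 1);
                if (!getter || !setter) {
                    return false;
                }
            }
        }
        return true;
    }

    // Simplification: the name is compared case-insensitively, types are not compared.
    private static boolean hasPublicMethod(Class<?> type, String name, int paramCount) {
        for (Method m : type.getMethods()) { // public methods, including inherited ones
            if (m.getName().equalsIgnoreCase(name) && m.getParameterCount() == paramCount) {
                return true;
            }
        }
        return false;
    }

    /** Hypothetical type with both accessors. */
    public static class WithSetter {
        private String sessionId;
        public WithSetter() { }
        public String getSessionId() { return sessionId; }
        public void setSessionId(String sessionId) { this.sessionId = sessionId; }
    }

    /** Hypothetical type whose private field has a getter but no setter. */
    public static class GetterOnly {
        private String sessionId;
        public GetterOnly() { }
        public String getSessionId() { return sessionId; }
    }

    public static void main(String[] args) {
        System.out.println(looksLikePojo(WithSetter.class)); // true
        System.out.println(looksLikePojo(GetterOnly.class)); // false
    }
}
```

Running a check like this against your own types shows quickly which field lacks an accessor, before the join is even built.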
On Tue, Feb 9, 2016 at 2:46 PM, Dominique Rondé <dominique.ro...@codecentric.de> wrote:
Here we go!
ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("xxx.xxx.xxx.xxx", 53408, "flink-job.jar");
DataSource<String> datasourceA = env.readTextFile("hdfs://dev//sourceA/");
DataSource<String> datasourceB = env.readTextFile("hdfs://dev//sourceB/");
DataSet<SourceA> sourceA = datasourceA.map(new SourceAMapper());
DataSet<SourceB> sourceB = datasourceB.map(new SourceBMapper());
sourceA.join(sourceB).where("sessionId").equalTo("sessionId").print();
Thanks a lot!
Dominique
On 09.02.2016 at 14:36, Till Rohrmann wrote:
Could you post the complete example code (the Flink job
including the type definitions)? For example, if the data
sets are of type |DataSet<Parent>|, then the type will be treated
as a |GenericType|. Judging from your pseudo code, it looks
fine at first glance.
Cheers,
Till
On Tue, Feb 9, 2016 at 2:25 PM, Dominique Rondé <dominique.ro...@codecentric.de> wrote:
Sorry, I was out for lunch. Maybe the problem is that
sessionId is a String?
public abstract class Parent {
    private Date eventDate;
    private EventType eventType;
    private String sessionId;
    public Parent() { }
    //GETTER & SETTER
}
public class SourceA extends Parent {
    private Boolean outboundMessage;
    private String soapMessage;
    public SourceA() {
        super();
    }
    //GETTER & SETTER
}
public class SourceB extends Parent {
    private Integer id;
    private String username;
    public SourceB() {
        super();
    }
    //GETTER & SETTER
}
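For readers following along, here is a sketch of what the elided "//GETTER & SETTER" parts could look like if every accessor is named after its field (get/set plus the capitalized field name). This is an assumption for illustration: the accessors actually used in the job are not shown in this thread. The holder class EventDtos is hypothetical, and only the fields relevant to the join key and the boolean field are spelled out.

```java
public class EventDtos {

    // In a real project each of these would be its own public top-level class.
    public abstract static class Parent {
        private String sessionId;

        public Parent() { }

        public String getSessionId() { return sessionId; }
        public void setSessionId(String sessionId) { this.sessionId = sessionId; }
    }

    public static class SourceA extends Parent {
        private Boolean outboundMessage;
        private String soapMessage;

        public SourceA() { super(); }

        // Accessor names mirror the field name: get/set + OutboundMessage.
        public Boolean getOutboundMessage() { return outboundMessage; }
        public void setOutboundMessage(Boolean outboundMessage) { this.outboundMessage = outboundMessage; }

        public String getSoapMessage() { return soapMessage; }
        public void setSoapMessage(String soapMessage) { this.soapMessage = soapMessage; }
    }

    public static void main(String[] args) {
        SourceA a = new SourceA();
        a.setSessionId("session-1");
        a.setOutboundMessage(Boolean.TRUE);
        System.out.println(a.getSessionId() + " " + a.getOutboundMessage()); // session-1 true
    }
}
```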
On 09.02.2016 at 12:06, Till Rohrmann wrote:
Could you share the code for your types |SourceA| and
|SourceB|? It seems as if Flink does not recognize them
as POJOs, because it assigned them the |GenericType|
type. Either there is something wrong with the type
extractor or your implementation does not fulfil the
requirements for POJOs, as indicated by Chiwan.
Cheers,
Till
On Tue, Feb 9, 2016 at 11:53 AM, Dominique Rondé <dominique.ro...@codecentric.de> wrote:
The fields in SourceA and SourceB are private but
have public getters and setters. The classes
provide an empty and public constructor.
On 09.02.2016 at 11:47, "Chiwan Park" <chiwanp...@apache.org> wrote:
Oh, the fields in SourceA have public getters.
Do the fields in SourceA also have public setters?
SourceA needs public setters for its private fields.
Regards,
Chiwan Park
> On Feb 9, 2016, at 7:45 PM, Chiwan Park <chiwanp...@apache.org> wrote:
>
> Hi Dominique,
>
> It seems that `SourceA` is not treated as a POJO. Are all fields in SourceA public? There are some requirements for POJO classes [1].
>
> [1]: https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#pojos
>
> Regards,
> Chiwan Park
>
>> On Feb 9, 2016, at 7:42 PM, Dominique Rondé <dominique.ro...@codecentric.de> wrote:
>>
>> Hi folks,
>>
>> I am trying to join two datasets containing POJOs. Each POJO inherits a field "sessionId" from the parent class. The field is private but has a public getter.
>>
>> The join is like this:
>> DataSet<Tuple2<SourceA,SourceB>> joinedDataSet = sourceA.join(sourceB).where("sessionId").equalTo("sessionId");
>>
>> But the result is the following exception:
>>
>> Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (GenericType<x.y.z.service.eventstore.dto.SourceA>) cannot be used as key.
>>     at org.apache.flink.api.java.operators.Keys$ExpressionKeys.<init>(Keys.java:287)
>>     at org.apache.flink.api.java.operators.JoinOperator$JoinOperatorSets.where(JoinOperator.java:890)
>>     at x.y.z.eventstore.processing.pmc.PmcProcessor.main(PmcProcessor.java:55)
>>
>> I spent some time googling around, but I have no idea what is wrong. I hope some of you can give me a hint...
>>
>> Greets
>> Dominique
>>
>
--
Dominique Rondé | Senior Consultant
codecentric AG | Kreuznacherstrasse 30 | 60486 Frankfurt | Deutschland
mobil: +49 (0) 172.7182592
www.codecentric.de | blog.codecentric.de | www.meettheexperts.de | www.more4fi.de
Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz