I tested the TypeExtractor with your SourceA and SourceB types (adding proper setters and getters) and it correctly returned a PojoType. Thus, I would suspect that you haven’t specified the proper setters and getters in your implementation.
Cheers, Till On Tue, Feb 9, 2016 at 2:46 PM, Dominique Rondé < dominique.ro...@codecentric.de> wrote: > Here we go! > > ExecutionEnvironment env = > ExecutionEnvironment.createRemoteEnvironment("xxx.xxx.xxx.xxx", > 53408,"flink-job.jar"); > > > DataSource<String> datasourceA= env.readTextFile("hdfs://dev//sourceA/"); > DataSource<String> datasourceB= env.readTextFile("hdfs://dev//sourceB/"); > > DataSet<SourceA> sourceA= datasourceA.map(new SourceAMapper()); > DataSet<SourceB> sourceB= datasourceB.map(new SourceBMapper()); > > sourceA.join(sourceB).where("sessionId").equalTo("sessionId").print(); > > Thanks a lot! > Dominique > > > Am 09.02.2016 um 14:36 schrieb Till Rohrmann: > > Could you post the complete example code (Flink job including the type > definitions). For example, if the data sets are of type DataSet<Parent>, > then it will be treated as a GenericType. Judging from your pseudo code, > it looks fine on the first glance. > > Cheers, > Till > > > On Tue, Feb 9, 2016 at 2:25 PM, Dominique Rondé < > dominique.ro...@codecentric.de> wrote: > >> Sorry, i was out for lunch. Maybe the problem is that sessionID is a >> String? >> >> public abstract class Parent{ >> private Date eventDate; >> private EventType eventType; >> private String sessionId; >> >> public Parent() { } >> //GETTER & SETTER >> } >> >> public class SourceA extends Parent{ >> private Boolean outboundMessage; >> private String soapMessage; >> >> public SourceA () { >> super(); >> } >> //GETTER & SETTER >> } >> >> public class SourceB extends Parent{ >> private Integer id; >> private String username; >> >> public SourceB () { >> super(); >> } >> //GETTER & SETTER >> >> } >> >> Am 09.02.2016 um 12:06 schrieb Till Rohrmann: >> >> Could you share the code for your types SourceA and SourceB. It seems as >> if Flink does not recognize them to be POJOs because he assigned them the >> GenericType type. Either there is something wrong with the type >> extractor or your implementation does not fulfil the requirements for >> POJOs, as indicated by Chiwan. >> >> Cheers, >> Till >> >> >> On Tue, Feb 9, 2016 at 11:53 AM, Dominique Rondé < >> <dominique.ro...@codecentric.de>dominique.ro...@codecentric.de> wrote: >> >>> The fields in SourceA and SourceB are private but have public getters >>> and setters. The classes provide an empty and public constructor. >>> Am 09.02.2016 11:47 schrieb "Chiwan Park" < <chiwanp...@apache.org> >>> chiwanp...@apache.org>: >>> >>>> Oh, the fields in SourceA have public getters. Does the fields in >>>> SourceA have public setter? SourceA needs public setter for private fields. >>>> >>>> Regards, >>>> Chiwan Park >>>> >>>> > On Feb 9, 2016, at 7:45 PM, Chiwan Park < <chiwanp...@apache.org> >>>> chiwanp...@apache.org> wrote: >>>> > >>>> > Hi Dominique, >>>> > >>>> > It seems that `SourceA` is not dealt as POJO. Are all fields in >>>> SourceA public? There are some requirements for POJO classes [1]. >>>> > >>>> > [1]: >>>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#pojos >>>> > >>>> > Regards, >>>> > Chiwan Park >>>> > >>>> >> On Feb 9, 2016, at 7:42 PM, Dominique Rondé < >>>> <dominique.ro...@codecentric.de>dominique.ro...@codecentric.de> wrote: >>>> >> >>>> >> Hi folks, >>>> >> >>>> >> i try to join two datasets containing some PoJos. Each PoJo inherit >>>> a field "sessionId" from the parent class. The field is private but has a >>>> public getter. >>>> >> >>>> >> The join is like this: >>>> >> DataSet<Tuple2<SourceA,SourceB>> joinedDataSet = >>>> sourceA.join(SourceB).where("sessionId").equalTo("sessionId"); >>>> >> >>>> >> But the result is the following execption: >>>> >> >>>> >> Exception in thread "main" >>>> org.apache.flink.api.common.InvalidProgramException: This type >>>> (GenericType<x.y.z.service.eventstore.dto.SourceA>) cannot be used as key. >>>> >> at >>>> org.apache.flink.api.java.operators.Keys$ExpressionKeys.<init>(Keys.java:287) >>>> >> at >>>> org.apache.flink.api.java.operators.JoinOperator$JoinOperatorSets.where(JoinOperator.java:890) >>>> >> at >>>> x.y.z.eventstore.processing.pmc.PmcProcessor.main(PmcProcessor.java:55) >>>> >> >>>> >> I spend some time with google around but I don't get an idea what is >>>> wrong. I hope some of you can give me a hint... >>>> >> >>>> >> Greets >>>> >> Dominique >>>> >> >>>> > >>>> >>>> >> >> -- >> Dominique Rondé | Senior Consultant >> >> codecentric AG | Kreuznacherstrasse 30 | 60486 Frankfurt | Deutschland >> mobil: +49 (0) 172.7182592www.codecentric.de | blog.codecentric.de | >> www.meettheexperts.de | www.more4fi.de >> >> Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal >> Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns >> Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz >> >> > > -- > Dominique Rondé | Senior Consultant > > codecentric AG | Kreuznacherstrasse 30 | 60486 Frankfurt | Deutschland > mobil: +49 (0) 172.7182592www.codecentric.de | blog.codecentric.de | > www.meettheexperts.de | www.more4fi.de > > Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal > Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns > Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz > >