I tested the TypeExtractor with your SourceA and SourceB types (adding
proper setters and getters) and it correctly returned a PojoType. Thus, I
would suspect that you haven’t specified the proper setters and getters in
your implementation.
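
To make the getter/setter requirement concrete, here is a minimal sketch that approximates the POJO rules from the documentation using plain Java reflection. It is not Flink's TypeExtractor and only mirrors the documented rules roughly (public class, public no-argument constructor, every non-static, non-transient field either public or reachable through a public getter and setter). The names PojoCheck, WithSetters, GetterOnly and looksLikePojo are made up for illustration, and the nested Parent is only a stand-in for your class.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Stdlib-only approximation of the documented POJO rules.
// Assumption: this is an illustration, NOT Flink's actual TypeExtractor logic.
class PojoCheck {

    // Stand-in for the thread's Parent: private field with getter and setter.
    public abstract static class Parent {
        private String sessionId;
        public Parent() { }
        public String getSessionId() { return sessionId; }
        public void setSessionId(String sessionId) { this.sessionId = sessionId; }
    }

    // Hypothetical subclass with both accessors for its private field.
    public static class WithSetters extends Parent {
        private Integer id;
        public WithSetters() { super(); }
        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }
    }

    // Hypothetical subclass that forgot the setter.
    public static class GetterOnly extends Parent {
        private Integer id;
        public GetterOnly() { super(); }
        public Integer getId() { return id; }
    }

    // True if a public method with this name and parameter list exists
    // (inherited public methods are included by getMethod).
    static boolean hasPublicMethod(Class<?> clazz, String name, Class<?>... params) {
        try {
            Method m = clazz.getMethod(name, params);
            return Modifier.isPublic(m.getModifiers());
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    static boolean looksLikePojo(Class<?> clazz) {
        if (!Modifier.isPublic(clazz.getModifiers())) {
            return false;
        }
        try {
            clazz.getConstructor(); // finds only a public no-argument constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Walk the class and all of its superclasses, since fields are inherited.
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int mod = f.getModifiers();
                if (Modifier.isStatic(mod) || Modifier.isTransient(mod) || Modifier.isPublic(mod)) {
                    continue;
                }
                String name = f.getName();
                String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
                boolean getter = hasPublicMethod(clazz, "get" + suffix)
                        || hasPublicMethod(clazz, "is" + suffix);
                boolean setter = hasPublicMethod(clazz, "set" + suffix, f.getType());
                if (!getter || !setter) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("WithSetters: " + looksLikePojo(WithSetters.class));
        System.out.println("GetterOnly:  " + looksLikePojo(GetterOnly.class));
    }
}
```

In this sketch WithSetters passes the check and GetterOnly does not, because its private field id has no public setter. In Flink such a class falls back to GenericType, which cannot be addressed with a field expression like "sessionId".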

Cheers,
Till

On Tue, Feb 9, 2016 at 2:46 PM, Dominique Rondé <
dominique.ro...@codecentric.de> wrote:

> Here we go!
>
>   ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
>       "xxx.xxx.xxx.xxx", 53408, "flink-job.jar");
>
>   DataSource<String> datasourceA = env.readTextFile("hdfs://dev//sourceA/");
>   DataSource<String> datasourceB = env.readTextFile("hdfs://dev//sourceB/");
>
>   DataSet<SourceA> sourceA = datasourceA.map(new SourceAMapper());
>   DataSet<SourceB> sourceB = datasourceB.map(new SourceBMapper());
>
>   sourceA.join(sourceB).where("sessionId").equalTo("sessionId").print();
>
> Thanks a lot!
> Dominique
>
>
> On 09.02.2016 at 14:36, Till Rohrmann wrote:
>
> Could you post the complete example code (the Flink job including the type
> definitions)? For example, if the data sets are of type DataSet<Parent>,
> then they will be treated as a GenericType. Judging from your pseudo code,
> it looks fine at first glance.
>
> Cheers,
> Till
>
> On Tue, Feb 9, 2016 at 2:25 PM, Dominique Rondé <
> dominique.ro...@codecentric.de> wrote:
>
>> Sorry, I was out for lunch. Maybe the problem is that sessionId is a
>> String?
>>
>> public abstract class Parent{
>>   private Date eventDate;
>>   private EventType eventType;
>>   private String sessionId;
>>
>> public Parent() { }
>> //GETTER & SETTER
>> }
>>
>> public class SourceA extends Parent{
>>   private Boolean outboundMessage;
>>   private String soapMessage;
>>
>> public SourceA () {
>>     super();
>>  }
>> //GETTER & SETTER
>> }
>>
>> public class SourceB extends Parent{
>>   private Integer id;
>>   private String username;
>>
>> public SourceB () {
>>     super();
>>  }
>> //GETTER & SETTER
>>
>> }
>>
>> On 09.02.2016 at 12:06, Till Rohrmann wrote:
>>
>> Could you share the code for your types SourceA and SourceB? It seems as
>> if Flink does not recognize them as POJOs, because it assigned them the
>> GenericType type. Either there is something wrong with the type
>> extractor or your implementation does not fulfil the requirements for
>> POJOs, as indicated by Chiwan.
>>
>> Cheers,
>> Till
>>
>> On Tue, Feb 9, 2016 at 11:53 AM, Dominique Rondé <
>> dominique.ro...@codecentric.de> wrote:
>>
>>> The fields in SourceA and SourceB are private but have public getters
>>> and setters. The classes provide an empty and public constructor.
>>> On 09.02.2016 at 11:47, "Chiwan Park" <chiwanp...@apache.org> wrote:
>>>
>>>> Oh, the fields in SourceA have public getters. Do the fields in
>>>> SourceA also have public setters? SourceA needs public setters for its
>>>> private fields.
>>>>
>>>> Regards,
>>>> Chiwan Park
>>>>
>>>> > On Feb 9, 2016, at 7:45 PM, Chiwan Park <chiwanp...@apache.org> wrote:
>>>> >
>>>> > Hi Dominique,
>>>> >
>>>> > It seems that `SourceA` is not treated as a POJO. Are all fields in
>>>> SourceA public? There are some requirements for POJO classes [1].
>>>> >
>>>> > [1]:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#pojos
>>>> >
>>>> > Regards,
>>>> > Chiwan Park
>>>> >
>>>> >> On Feb 9, 2016, at 7:42 PM, Dominique Rondé <
>>>> dominique.ro...@codecentric.de> wrote:
>>>> >>
>>>> >> Hi folks,
>>>> >>
>>>> >> I am trying to join two datasets containing some POJOs. Each POJO inherits
>>>> a field "sessionId" from the parent class. The field is private but has a
>>>> public getter.
>>>> >>
>>>> >> The join is like this:
>>>> >> DataSet<Tuple2<SourceA,SourceB>> joinedDataSet =
>>>> sourceA.join(sourceB).where("sessionId").equalTo("sessionId");
>>>> >>
>>>> >> But the result is the following exception:
>>>> >>
>>>> >> Exception in thread "main"
>>>> org.apache.flink.api.common.InvalidProgramException: This type
>>>> (GenericType<x.y.z.service.eventstore.dto.SourceA>) cannot be used as key.
>>>> >>    at
>>>> org.apache.flink.api.java.operators.Keys$ExpressionKeys.<init>(Keys.java:287)
>>>> >>    at
>>>> org.apache.flink.api.java.operators.JoinOperator$JoinOperatorSets.where(JoinOperator.java:890)
>>>> >>    at
>>>> x.y.z.eventstore.processing.pmc.PmcProcessor.main(PmcProcessor.java:55)
>>>> >>
>>>> >> I spent some time googling around, but I have no idea what is
>>>> wrong. I hope some of you can give me a hint...
>>>> >>
>>>> >> Greets
>>>> >> Dominique
>>>> >>
>>>> >
>>>>
>>>>
>>
>> --
>> Dominique Rondé | Senior Consultant
>>
>> codecentric AG | Kreuznacherstrasse 30 | 60486 Frankfurt | Deutschland
>> mobil: +49 (0) 172.7182592 | www.codecentric.de | blog.codecentric.de |
>> www.meettheexperts.de | www.more4fi.de
>>
>> Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
>> Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
>> Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz
>>
>>
>
>
