Here we go!

ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("xxx.xxx.xxx.xxx", 53408, "flink-job.jar");

DataSource<String> datasourceA = env.readTextFile("hdfs://dev//sourceA/");
DataSource<String> datasourceB = env.readTextFile("hdfs://dev//sourceB/");

DataSet<SourceA> sourceA = datasourceA.map(new SourceAMapper());
DataSet<SourceB> sourceB = datasourceB.map(new SourceBMapper());

sourceA.join(sourceB).where("sessionId").equalTo("sessionId").print();

Thanks a lot!
Dominique

On 09.02.2016 at 14:36, Till Rohrmann wrote:

Could you post the complete example code (the Flink job including the type definitions)? For example, if the data sets are of type |DataSet<Parent>|, then the type will be treated as a |GenericType|. Judging from your pseudocode, it looks fine at first glance.

Cheers,
Till
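
The POJO requirements discussed in this thread (public class, public no-argument constructor, and every field either public or reachable through a public getter and setter) can be approximated with plain Java reflection. The following is only a minimal sketch under those assumptions: `PojoCheck`, `looksLikePojo`, `GetterAndSetter` and `GetterOnly` are hypothetical names made up for illustration, and this is not Flink's type extractor, which applies further checks (for example on field types).

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

public class PojoCheck {

    // Hypothetical helper, NOT part of Flink: approximates the documented POJO rules.
    public static boolean looksLikePojo(Class<?> clazz) {
        // Rule 1: the class itself must be public.
        if (!Modifier.isPublic(clazz.getModifiers())) {
            return false;
        }
        // Rule 2: a public no-argument constructor must exist.
        try {
            clazz.getConstructor(); // getConstructor only finds public constructors
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Rule 3: every instance field, including inherited ones, must be
        // public or have both a public getter and a public setter.
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int mods = f.getModifiers();
                if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || f.isSynthetic()) {
                    continue; // not part of the data fields we care about here
                }
                if (!Modifier.isPublic(mods) && !hasGetterAndSetter(clazz, f)) {
                    return false;
                }
            }
        }
        return true;
    }

    private static boolean hasGetterAndSetter(Class<?> clazz, Field f) {
        String name = f.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        boolean getter = false;
        boolean setter = false;
        for (Method m : clazz.getMethods()) { // public methods, including inherited ones
            String n = m.getName();
            if (m.getParameterCount() == 0 && (n.equals("get" + suffix) || n.equals("is" + suffix))) {
                getter = true;
            }
            if (m.getParameterCount() == 1 && n.equals("set" + suffix)) {
                setter = true;
            }
        }
        return getter && setter;
    }

    // Hypothetical demo type: private field with getter and setter.
    public static class GetterAndSetter {
        private String sessionId;
        public GetterAndSetter() { }
        public String getSessionId() { return sessionId; }
        public void setSessionId(String sessionId) { this.sessionId = sessionId; }
    }

    // Hypothetical demo type: private field with a getter only.
    public static class GetterOnly {
        private String sessionId;
        public GetterOnly() { }
        public String getSessionId() { return sessionId; }
    }

    public static void main(String[] args) {
        System.out.println(looksLikePojo(GetterAndSetter.class));
        System.out.println(looksLikePojo(GetterOnly.class));
    }
}
```

Running a check like this against the real `SourceA` and `SourceB` classes may help narrow down which rule, if any, is violated. A passing result does not guarantee that Flink will treat the class as a POJO.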


On Tue, Feb 9, 2016 at 2:25 PM, Dominique Rondé wrote:

    Sorry, I was out for lunch. Maybe the problem is that sessionId is
    a String?

    public abstract class Parent {
        private Date eventDate;
        private EventType eventType;
        private String sessionId;

        public Parent() { }
        // GETTER & SETTER
    }

    public class SourceA extends Parent {
        private Boolean outboundMessage;
        private String soapMessage;

        public SourceA() {
            super();
        }
        // GETTER & SETTER
    }

    public class SourceB extends Parent {
        private Integer id;
        private String username;

        public SourceB() {
            super();
        }
        // GETTER & SETTER
    }

    On 09.02.2016 at 12:06, Till Rohrmann wrote:

    Could you share the code for your types |SourceA| and |SourceB|?
    It seems as if Flink does not recognize them as POJOs, because
    it assigned them the |GenericType| type. Either there is
    something wrong with the type extractor, or your implementation
    does not fulfil the requirements for POJOs, as indicated by Chiwan.

    Cheers,
    Till


    On Tue, Feb 9, 2016 at 11:53 AM, Dominique Rondé wrote:

        The fields in SourceA and SourceB are private but have public
        getters and setters. Both classes provide a public no-argument
        constructor.

        On 09.02.2016 at 11:47, "Chiwan Park" wrote:

            Oh, the fields in SourceA have public getters. Do the
            fields in SourceA also have public setters? SourceA needs
            public setters for its private fields.

            Regards,
            Chiwan Park

            > On Feb 9, 2016, at 7:45 PM, Chiwan Park wrote:
            >
            > Hi Dominique,
            >
            > It seems that `SourceA` is not treated as a POJO. Are all
            > fields in SourceA public? There are some requirements for
            > POJO classes [1].
            >
            > [1]: https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#pojos
            >
            > Regards,
            > Chiwan Park
            >
            >> On Feb 9, 2016, at 7:42 PM, Dominique Rondé wrote:
            >>
            >> Hi folks,
            >>
            >> I am trying to join two datasets containing some POJOs. Each
            >> POJO inherits a field "sessionId" from the parent class.
            >> The field is private but has a public getter.
            >>
            >> The join is like this:
            >> DataSet<Tuple2<SourceA, SourceB>> joinedDataSet =
            >>     sourceA.join(sourceB).where("sessionId").equalTo("sessionId");
            >>
            >> But the result is the following exception:
            >>
            >> Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (GenericType<x.y.z.service.eventstore.dto.SourceA>) cannot be used as key.
            >>    at org.apache.flink.api.java.operators.Keys$ExpressionKeys.<init>(Keys.java:287)
            >>    at org.apache.flink.api.java.operators.JoinOperator$JoinOperatorSets.where(JoinOperator.java:890)
            >>    at x.y.z.eventstore.processing.pmc.PmcProcessor.main(PmcProcessor.java:55)
            >>
            >> I spent some time googling but I have no idea what is
            >> wrong. I hope some of you can give me a hint...
            >>
            >> Regards,
            >> Dominique
            >>
            >



--
Dominique Rondé | Senior Consultant

codecentric AG | Kreuznacherstrasse 30 | 60486 Frankfurt | Deutschland
mobil: +49 (0) 172.7182592
www.codecentric.de | blog.codecentric.de | www.meettheexperts.de | www.more4fi.de
Sitz der Gesellschaft: Solingen | HRB 25917 | Amtsgericht Wuppertal
Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz



