If I want to run two different select queries on a Flink table created from
a DataStream, the Blink planner runs them as two different jobs. Is there a
way to combine them and run them as a single job? Example code:

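import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// CreditRecord is my own POJO (not shown here).
public class CreditScores {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        System.out.println("Running credit scores:");

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        DataStream<String> recordsStream =
                env.readTextFile("src/main/resources/credit_trial.csv");

        DataStream<CreditRecord> creditStream = recordsStream
                // Drop the CSV header line.
                .filter((FilterFunction<String>) line -> !line.contains(
                        "Loan ID,Customer ID,Loan Status,Current Loan Amount,Term,"
                        + "Credit Score,Annual Income,Years in current job,"
                        + "Home Ownership,Purpose,Monthly Debt,Years of Credit History,"
                        + "Months since last delinquent,Number of Open Accounts,"
                        + "Number of Credit Problems,Current Credit Balance,"
                        + "Maximum Open Credit,Bankruptcies,Tax Liens"))
                .map(new MapFunction<String, CreditRecord>() {

                    @Override
                    public CreditRecord map(String s) throws Exception {
                        String[] fields = s.split(",");

                        // Empty Credit Score / Annual Income cells become 0.0.
                        return new CreditRecord(
                                fields[0],                     // Loan ID
                                fields[2],                     // Loan Status
                                Double.parseDouble(fields[3]), // Current Loan Amount
                                fields[4],                     // Term
                                fields[5].trim().isEmpty() ? 0.0
                                        : Double.parseDouble(fields[5]), // Credit Score
                                fields[6].trim().isEmpty() ? 0.0
                                        : Double.parseDouble(fields[6]), // Annual Income
                                fields[8],                     // Home Ownership
                                Double.parseDouble(fields[15])); // Current Credit Balance
                    }
                });

        tableEnv.createTemporaryView("CreditDetails", creditStream);
        Table creditDetailsTable = tableEnv.from("CreditDetails");

        // First query: Table.execute() submits a job of its own.
        Table resultsTable = creditDetailsTable.select($("*"))
                .filter($("loanStatus").isEqual("Charged Off"));

        TableResult result = resultsTable.execute();

        result.print();

        // Second query: this execute() submits a second, separate job.
        Table resultsTable2 = creditDetailsTable.select($("*"))
                .filter($("loanStatus").isEqual("Fully Paid"));

        TableResult result2 = resultsTable2.execute();

        result2.print();
    }
}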

The above code creates two different jobs, but I want both queries to run
in a single job. Is there any way to do that?
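A minimal sketch of one possible approach, assuming Flink 1.11 or 1.12 with the Blink planner: instead of calling `Table.execute()` once per query (each call submits its own job), convert both result tables to DataStreams with `toAppendStream` and trigger everything with a single `env.execute()`. The class `SingleJobSketch`, the two-field `Credit` POJO and the sample rows are hypothetical stand-ins for `CreditRecord` and the CSV input. When the results go to declared sink tables instead, a `StatementSet` (from `tableEnv.createStatementSet()`, one `addInsert(...)` per sink, then one `execute()`) should be the pure Table API way to bundle several queries into one job.

```java
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class SingleJobSketch {

    // Hypothetical stand-in for CreditRecord: a Flink POJO with two fields.
    public static class Credit {
        public String loanId;
        public String loanStatus;

        public Credit() {}

        public Credit(String loanId, String loanStatus) {
            this.loanId = loanId;
            this.loanStatus = loanStatus;
        }
    }

    // Adds both filtered pipelines to the same environment without submitting anything.
    public static void buildPipelines(StreamExecutionEnvironment env) {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Hypothetical sample rows in place of the CSV file.
        tableEnv.createTemporaryView("CreditDetails", env.fromElements(
                new Credit("L1", "Charged Off"),
                new Credit("L2", "Fully Paid")));
        Table credit = tableEnv.from("CreditDetails");

        Table chargedOff = credit.filter($("loanStatus").isEqual("Charged Off"));
        Table fullyPaid = credit.filter($("loanStatus").isEqual("Fully Paid"));

        // Converting to DataStreams only adds operators to env; nothing runs yet.
        tableEnv.toAppendStream(chargedOff, Row.class).print("charged-off");
        tableEnv.toAppendStream(fullyPaid, Row.class).print("fully-paid");
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        buildPipelines(env);
        // One execute() call submits both pipelines together as a single job.
        env.execute("credit-scores");
    }
}
```

One difference to keep in mind: `DataStream.print()` writes from the task managers (the local console when run from the IDE), whereas `TableResult.print()` collects the rows back to the client.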



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/