Yes. Instead of calling execute on each table, create a StatementSet using
your StreamTableEnvironment (tableEnv.createStatementSet) and use addInsert
and finally .execute when you want to run the job.




On Sat, Apr 17, 2021, 03:20 tbud <tejasub1...@gmail.com> wrote:

> If I want to run two different select queries on a flink table created from
> the dataStream, the blink-planner runs them as two different jobs. Is there
> a way to combine them and run as a single job ? Example code :
>
> /StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(4);
>
> System.out.println("Running credit scores : ");
>
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>
> DataStream<String> recordsStream =
>         env.readTextFile("src/main/resources/credit_trial.csv");
>
> DataStream<CreditRecord> creditStream = recordsStream
>         .filter((FilterFunction<String>) line -> !line.contains(
>                 "Loan ID,Customer ID,Loan Status,Current Loan
> Amount,Term,Credit Score,Annual Income,Years in current job" +
>                         ",Home Ownership,Purpose,Monthly Debt,Years of
> Credit History,Months since last delinquent,Number of Open Accounts," +
>                         "Number of Credit Problems,Current Credit
> Balance,Maximum Open Credit,Bankruptcies,Tax Liens"))
>         .map(new MapFunction<String, CreditRecord>() {
>
>             @Override
>             public CreditRecord map(String s) throws Exception {
>
>                 String[] fields = s.split(",");
>
>                 return new CreditRecord(fields[0], fields[2],
> Double.parseDouble(fields[3]),
>                         fields[4], fields[5].trim().equals("")?0.0:
> Double.parseDouble(fields[5]),
>
> fields[6].trim().equals("")?0.0:Double.parseDouble(fields[6]),
>                         fields[8], Double.parseDouble(fields[15]));
>             }
>         });
> tableEnv.createTemporaryView("CreditDetails", creditStream);
>         Table creditDetailsTable = tableEnv.from("CreditDetails");
>
> Table resultsTable = creditDetailsTable.select($("*"))
>         .filter($("loanStatus").isEqual("Charged Off"));
>
> TableResult result = resultsTable.execute();
>
> result.print();
>
>
> Table resultsTable2 = creditDetailsTable.select($("*"))
>         .filter($("loanStatus").isEqual("Fully Paid"));
>
> TableResult result2 = resultsTable2.execute();
>
> result2.print();/
>
> The above code creates 2 different jobs, but I don't want that, I want it
> to
> run in a single job. Is there any way out ?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>

Reply via email to