If I run two different select queries on a Flink table created from a DataStream, the Blink planner runs them as two different jobs. Is there a way to combine them and run them as a single job? Example code:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
System.out.println("Running credit scores : ");

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
DataStream<String> recordsStream = env.readTextFile("src/main/resources/credit_trial.csv");

DataStream<CreditRecord> creditStream = recordsStream
        .filter((FilterFunction<String>) line -> !line.contains(
                "Loan ID,Customer ID,Loan Status,Current Loan Amount,Term,Credit Score,Annual Income,Years in current job"
                + ",Home Ownership,Purpose,Monthly Debt,Years of Credit History,Months since last delinquent,Number of Open Accounts,"
                + "Number of Credit Problems,Current Credit Balance,Maximum Open Credit,Bankruptcies,Tax Liens"))
        .map(new MapFunction<String, CreditRecord>() {
            @Override
            public CreditRecord map(String s) throws Exception {
                String[] fields = s.split(",");
                return new CreditRecord(fields[0], fields[2],
                        Double.parseDouble(fields[3]), fields[4],
                        fields[5].trim().equals("") ? 0.0 : Double.parseDouble(fields[5]),
                        fields[6].trim().equals("") ? 0.0 : Double.parseDouble(fields[6]),
                        fields[8], Double.parseDouble(fields[15]));
            }
        });

tableEnv.createTemporaryView("CreditDetails", creditStream);
Table creditDetailsTable = tableEnv.from("CreditDetails");

Table resultsTable = creditDetailsTable.select($("*"))
        .filter($("loanStatus").isEqual("Charged Off"));
TableResult result = resultsTable.execute();
result.print();

Table resultsTable2 = creditDetailsTable.select($("*"))
        .filter($("loanStatus").isEqual("Fully Paid"));
TableResult result2 = resultsTable2.execute();
result2.print();

The above code creates two different jobs, but I don't want that; I want everything to run as a single job. Is there any way to do this?
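Would something like the rough sketch below be a reasonable way to do it, or is there a better option (e.g. a StatementSet)? This is only an idea, not something I'm sure is intended usage: it reuses the same env, tableEnv, creditDetailsTable and CreditRecord from the code above (plus an import of org.apache.flink.api.java.tuple.Tuple2), and keeps both queries inside the DataStream topology so that a single env.execute() submits one job, instead of calling Table.execute() twice.

// Rebuild the two filtered tables as before
Table chargedOff = creditDetailsTable.select($("*"))
        .filter($("loanStatus").isEqual("Charged Off"));
Table fullyPaid = creditDetailsTable.select($("*"))
        .filter($("loanStatus").isEqual("Fully Paid"));

// Convert both tables back to DataStreams instead of executing them as
// separate Table API jobs (toRetractStream returns (isAdd, record) pairs)
DataStream<Tuple2<Boolean, CreditRecord>> chargedOffStream =
        tableEnv.toRetractStream(chargedOff, CreditRecord.class);
DataStream<Tuple2<Boolean, CreditRecord>> fullyPaidStream =
        tableEnv.toRetractStream(fullyPaid, CreditRecord.class);

chargedOffStream.print("charged-off");
fullyPaidStream.print("fully-paid");

// Single submission: both queries should end up in this one job
env.execute("credit-scores-single-job");

If that is not the recommended approach, a pointer to the intended API would be much appreciated.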