I think what you need is something like: table.executeInsert("MySink").*await()*
On Tue, Mar 8, 2022 at 8:24 PM Adesh Dsilva <adeshavin...@gmail.com> wrote: > Hi, > > I wrote a simple program using Flink Table API. > There is a Table source reading from an avro file, after doing some > operations the result is stored using a sink csv table using > `executeInsert()` > > The program works fine and creates a csv file however the flink command > does not wait for job to complete. > > ``` > adsilva@C02XH3ZSJGH6 flink-1.14.3 % ./bin/flink run > ~/IdeaProjects/quickstart/target/app-0.1.jar --input > part-v001-o000-r-00330.avro --output > ~/Downloads/dev-things/flink-1.14.3/output-8 > Job has been submitted with JobID ff6f2e89a11e135d4978f7e4d2b1e2bc > adsilva@C02XH3ZSJGH6 flink-1.14.3 % > ``` > > However if I write the same program as a mixture of DataStream and Table > API by taking input from DataStream but operating on it using Table API and > writing it using Table sink then it does wait for job completion. > > > ``` > adsilva@C02XH3ZSJGH6 flink-1.14.3 % ./bin/flink run > ~/IdeaProjects/quickstart/target/domains_stats_processor-0.1.jar --input > part-v001-o000-r-00330.avro --output > ~/Downloads/dev-things/flink-1.14.3/output-7 > Job has been submitted with JobID a00ffab083747ec0e421c1d6ab0822ea > Job has been submitted with JobID 84ccd320f8015a53c8807380b829e529 > Program execution finished > Job with JobID 84ccd320f8015a53c8807380b829e529 has finished. > Job Runtime: 31521 ms > ``` > > It nicely prints the runtime as well (Although not sure why its starting > two jobs here). > > Why does this happen? How can I make Flink wait for job completion with > pure Table API? > > Please help, Thank you!