I think what you need is something like:

table.executeInsert("MySink").await()
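
That is, call await() on the TableResult returned by executeInsert() so the client blocks until the insert job reaches a terminal state. A minimal sketch, assuming a batch TableEnvironment and already-registered "MySource" and "MySink" tables (the names here are placeholders, not from your program):

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class WaitForInsert {
    public static void main(String[] args) throws Exception {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // ... register "MySource" and "MySink" via tableEnv.executeSql(...) ...

        Table table = tableEnv.from("MySource");

        // executeInsert() only submits the job and returns immediately;
        // await() blocks the client until the job finishes (or fails).
        TableResult result = table.executeInsert("MySink");
        result.await();
    }
}
```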


On Tue, Mar 8, 2022 at 8:24 PM Adesh Dsilva <adeshavin...@gmail.com> wrote:

> Hi,
>
> I wrote a simple program using the Flink Table API.
> There is a Table source reading from an Avro file; after doing some
> operations, the result is stored in a CSV sink table using
> `executeInsert()`.
>
> The program works fine and creates a CSV file; however, the flink command
> does not wait for the job to complete.
>
> ```
> adsilva@C02XH3ZSJGH6 flink-1.14.3 % ./bin/flink run
> ~/IdeaProjects/quickstart/target/app-0.1.jar --input
> part-v001-o000-r-00330.avro --output
> ~/Downloads/dev-things/flink-1.14.3/output-8
> Job has been submitted with JobID ff6f2e89a11e135d4978f7e4d2b1e2bc
> adsilva@C02XH3ZSJGH6 flink-1.14.3 %
> ```
>
> However, if I write the same program as a mixture of DataStream and Table
> API, taking input from a DataStream but operating on it with the Table API
> and writing it with a Table sink, then it does wait for job completion.
>
>
> ```
> adsilva@C02XH3ZSJGH6 flink-1.14.3 % ./bin/flink run
> ~/IdeaProjects/quickstart/target/domains_stats_processor-0.1.jar --input
> part-v001-o000-r-00330.avro --output
> ~/Downloads/dev-things/flink-1.14.3/output-7
> Job has been submitted with JobID a00ffab083747ec0e421c1d6ab0822ea
> Job has been submitted with JobID 84ccd320f8015a53c8807380b829e529
> Program execution finished
> Job with JobID 84ccd320f8015a53c8807380b829e529 has finished.
> Job Runtime: 31521 ms
> ```
>
> It nicely prints the runtime as well (although I'm not sure why it's
> starting two jobs here).
>
> Why does this happen? How can I make Flink wait for job completion with
> pure Table API?
>
> Please help, Thank you!
