Hi Luke.

I hope this email finds you well. I agree with Shammon's suggestion. Here are a few hints that might help you further:

1. To create a PackagedProgram, you can use the PackagedProgram.Builder class.
2. You can build a JobGraph with the PackagedProgramUtils.createJobGraph method.
3. Initializing a RestClusterClient with your Flink cluster configuration will allow you to interact with the cluster.
4. By submitting the JobGraph, you will obtain a JobID.
5. Finally, you can use the JobID to communicate with your job within the Flink cluster.
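
Step 5 is where the cleanup question comes in: once you hold a JobID, you can poll the job status and run doSomeCleanupTasks() only after the job has reached a terminal state. Below is a minimal sketch of that waiting logic, not a definitive implementation. The class name JobCompletionWaiter and the method awaitTerminal are hypothetical names made up for illustration, and the status source is abstracted as a Supplier<String> so the sketch does not depend on any Flink classes. It assumes the status strings match Flink's JobStatus names, of which FINISHED, CANCELED and FAILED are the globally terminal ones.

```java
import java.time.Duration;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical helper (not part of Flink): waits until a job reports a terminal status.
final class JobCompletionWaiter {
    // Globally terminal Flink job states; anything else (e.g. RUNNING) means "keep waiting".
    private static final Set<String> TERMINAL = Set.of("FINISHED", "CANCELED", "FAILED");

    private JobCompletionWaiter() {}

    static boolean isTerminal(String status) {
        return TERMINAL.contains(status);
    }

    // Polls statusSupplier until it returns a terminal status, sleeping pollInterval
    // between polls. Throws IllegalStateException if the timeout elapses first.
    static String awaitTerminal(Supplier<String> statusSupplier,
                                Duration pollInterval,
                                Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            String status = statusSupplier.get();
            if (isTerminal(status)) {
                return status;
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new IllegalStateException("Job still in state " + status + " after " + timeout);
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for the job", e);
            }
        }
    }
}
```

With a RestClusterClient, the supplier could plausibly be something like () -> client.getJobStatus(jobId).join().name(); that wiring is an assumption about your setup. The cleanup would then run only if the returned status is FINISHED.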

I hope these suggestions prove helpful. Should you require any further assistance, please do not hesitate to reach out. The steps I described above are how I currently manage my Flink jobs.

Best,
Jiadong Lu

On 2023/5/13 2:00, Luke Xiong wrote:
Hi Weihua and Shammon,

Thanks for the pointers. I tried both; unfortunately, neither works.

Enabling "execution.attached" doesn't seem to make any difference from the default settings: doSomeCleanupTasks() is still called right away while the pipeline is running. And env.executeAsync().getJobStatus() causes an exception:

    org.apache.flink.util.FlinkRuntimeException: The Job Status cannot be requested when in Web Submission.

FYI, I am using 1.15 and the job is submitted with */jars/:jarid/run*

Regards,
Luke

On Fri, May 12, 2023 at 1:32 AM Weihua Hu <huweihua....@gmail.com> wrote:


    Hi, Luke

    You can enable "execution.attached", then env.execute() will wait
    until the job is finished.

    
    [1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#execution-attached

    Best,
    Weihua


    On Fri, May 12, 2023 at 8:59 AM Shammon FY <zjur...@gmail.com> wrote:

        Hi Luke,

        Maybe you can get a 'JobClient' after submitting the job and check
        the job status with 'JobClient.getJobStatus()'.

        Best,
        Shammon FY


        On Fri, May 12, 2023 at 2:58 AM Luke Xiong <leix...@gmail.com> wrote:

            Hi,

            My flink job needs to do something when the pipeline
            execution has ended. The job code is like this:

            createSomeStream().applySomeOperators();
            env.execute(jobName);
            doSomeCleanupTasks();

            It looks like doSomeCleanupTasks() can be called while the
            pipeline is still running. The job is for processing a
            bounded stream, so it doesn't run forever. Is it possible to
            achieve this so doSomeCleanupTasks is called only when the
            pipeline has processed all the data? This happens when the
            runtime mode is STREAMING. Would running it in BATCH mode
            make any difference?

            Regards,
            Luke

