Hi!

For problem 1, Flink does not support deleting specific records. Since you're
running a batch job, I suggest creating a new table based on the new filter
condition. Even if you could delete the old records, you would still have to
generate the new ones, so why not write them directly to a new location?
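
A minimal sketch of that approach, assuming a hypothetical table named
table_a_v2 has already been declared with the same schema as Table A (the
name and its connector options are placeholders, not something from your
setup):

```sql
-- Assumption: table_a_v2 is a hypothetical, already-declared table with the
-- same schema as Table A. Only the filter comes from your description.
INSERT INTO table_a_v2
SELECT * FROM xx WHERE a < 0;

-- sql_1 and sql_2 would then be rerun against table_a_v2 to rebuild
-- Table B, Table C and their downstream tables in new locations.
```

Once the new tables are verified, the old ones can be dropped or swapped out
by whatever mechanism your storage layer provides.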

For problem 2, yarn-cluster is the mode for a YARN session cluster, which
means the cluster remains even after the job has finished. If you want the
YARN application to end together with the Flink job, use yarn-per-job mode
instead.
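
As a rough illustration, and assuming the job is submitted through the Flink
CLI so that flink-conf.yaml is read at submission time, the deployment target
could be set like this:

```yaml
# flink-conf.yaml (fragment)
# Assumption: the job is submitted with the Flink CLI, which reads this file.
execution.target: yarn-per-job
```

The same target can also be passed per submission with the -t yarn-per-job
option of flink run, if you prefer not to change the cluster-wide config.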

On Thu, Oct 21, 2021 at 2:46 PM, vtygoss <vtyg...@126.com> wrote:

>
> Hi, community!
>
>
> I am working on building a data processing pipeline based on changelog (CDC)
> and I have run into two problems.
>
>
> --(sql_0)--> Table A --(sql_1)---> Table B --->other tables downstream
>
>                           --(sql_2)--->Table C---> other tables downstream
>
>
> Table A is generated based on sql_0; Table B is generated based on sql_1
> and input Table A; Table C is generated based on sql_2 and input Table A;
> Table B and C have some downstream tables based on modeling.
>
>
> - problem 1. When the logic of sql_0 changes, e.g. from "select * from xx
> where a>0" to "select * from xx where a<0", the data produced by the filter
> "a>0" becomes wrong. I want to find a way to clear the wrong data in Table A
> and trigger the corresponding deletions in all downstream tables, then
> produce new data with the new filter a<0. So how can I change the RowKind of
> each row in Table A to RowKind.DELETE in Flink batch execution mode? It
> would be very nice if there were an example for Flink 1.12.0.
>
>
> - problem 2. I found that Flink launches a session cluster even when the
> runtime mode is "yarn-cluster". In batch execution mode, the cluster keeps
> running after all tasks have finished. How can I shut down the cluster?
>
>
>
> Thanks for any suggestions or replies!
>
>
> Best Regards!
>
