Hi, community!

I am working on building a data processing pipeline based on changelogs (CDC), and I 
have run into two problems. 


--(sql_0)--> Table A --(sql_1)---> Table B ---> other tables downstream
                     --(sql_2)---> Table C ---> other tables downstream


Table A is generated by sql_0. Table B is generated by sql_1 from input Table A, 
and Table C is generated by sql_2 from input Table A. Tables B and C each have 
further downstream tables built on them as part of our data model. 


- Problem 1. When the sql_0 logic changes, e.g. from "select * from xx where 
a>0" to "select * from xx where a<0", the data already produced by the filter 
"a>0" is wrong. I want a way to clear this wrong data from Table A and trigger 
the corresponding deletions in all downstream tables, then produce new data 
with the new filter "a<0". How can I change the RowKind of each row in Table A 
to RowKind.DELETE in Flink batch execution mode? An example for Flink 1.12.0 
would be very helpful.  
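To make the intended behaviour concrete, here is a plain-Java sketch (no Flink dependency, not Flink API) of the "correction changelog" described above: rows that the old filter produced but the new one would not become DELETE events, and rows that only the new filter produces become INSERT events. The local `RowKind` enum only mirrors the constant names of Flink's `org.apache.flink.types.RowKind`; `Change`, `correctionChangelog` and `apply` are hypothetical helpers for illustration, and the single integer column `a` is an assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.IntPredicate;

public class ChangelogCorrection {

    // Local stand-in whose names mirror Flink's RowKind; not the Flink class itself.
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Hypothetical changelog event: a row (just column "a" here) tagged with its kind.
    static final class Change {
        final RowKind kind;
        final int a;
        Change(RowKind kind, int a) { this.kind = kind; this.a = a; }
        @Override public String toString() { return kind + "(" + a + ")"; }
    }

    // Events that turn the output of oldFilter into the output of newFilter.
    static List<Change> correctionChangelog(int[] source, IntPredicate oldFilter, IntPredicate newFilter) {
        List<Change> log = new ArrayList<>();
        for (int a : source) {
            boolean wasIn = oldFilter.test(a);
            boolean isIn = newFilter.test(a);
            if (wasIn && !isIn) {
                log.add(new Change(RowKind.DELETE, a));   // retract a wrong row
            } else if (!wasIn && isIn) {
                log.add(new Change(RowKind.INSERT, a));   // add a corrected row
            }
        }
        return log;
    }

    // Apply a changelog to a materialized table stored as value -> multiplicity.
    static void apply(Map<Integer, Integer> table, List<Change> log) {
        for (Change c : log) {
            int delta = (c.kind == RowKind.INSERT || c.kind == RowKind.UPDATE_AFTER) ? 1 : -1;
            table.merge(c.a, delta, Integer::sum);
            table.remove(c.a, 0);                         // drop rows whose count reached zero
        }
    }

    public static void main(String[] args) {
        int[] source = {-2, -1, 0, 1, 2};

        // Table A as produced by the old sql_0 ("where a > 0").
        Map<Integer, Integer> tableA = new HashMap<>();
        for (int a : source) {
            if (a > 0) tableA.merge(a, 1, Integer::sum);
        }

        List<Change> log = correctionChangelog(source, a -> a > 0, a -> a < 0);
        System.out.println(log);                          // [INSERT(-2), INSERT(-1), DELETE(1), DELETE(2)]
        apply(tableA, log);
        System.out.println(new TreeMap<>(tableA));        // {-2=1, -1=1}
    }
}
```

Since the log contains only DELETE and INSERT events, downstream consumers such as Tables B and C could in principle apply the same events to stay consistent; whether a given Flink 1.12 batch source or sink accepts such rows is exactly the open question above.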


- Problem 2. I found that Flink launches a session cluster even though the runtime 
mode is "yarn-cluster". In batch execution mode, the cluster keeps running after 
all tasks have finished. How can I shut down the cluster? 




Thanks for any suggestions or replies!


Best Regards!
