Hi!

Thanks for the clarification. Flink currently does not have the
functionality to "revert all operations till some point". What I would
suggest is still to discard the resulting tables and run the pipeline from
the point when the filtering logic is changed. If the pipeline has
processed some data from that point you can pause the pipeline and run a
batch job to catch up with all changed logic in one go, then resume the
pipeline and it will process records as normal.

vtygoss <vtyg...@126.com> 于2021年10月22日周五 下午4:31写道:

> Hi!
>
> Thanks for your reply! I think i didn't make myself clear for problem 1,
> so i draw a picture.
>
>
> 1."tables in DB": things start at the database, and we sync the tables'
> changelog to dynamic tables by CDC tool. Each changelog includes RowData
> and RowKind such as INSERT / UPDATE / DELETE.
>
> 2. "logics": modeling like ods / dwd ....
>
> 3. "Table 1": a table which has some downstream tables. Table 2 is
> produced by "count(1) from table_1" and input Table 1; Table 3 is produced
> by "udf(...) from table_1 where a>0" and input Table 1. And when an insert
> event or delete event occurs in Table 1, Table 2 and Table 3 will change
> accordingly, as does the downstream tables of Table 2 and Table 3.
>
>
> - problem: The logic which generates Table 1 changes from "select * from
> table_0 where a>0" to "select * from table_0 where a<0". The old data in
> Table 1 generated by filter "a>0" is error now, and all downstream tables
> of Table 1 are error too.  So I want to find an easy way to truncate
> error data in Table 1 and all downstream tables of Table 1, but truncating
> Table 1 does not emit deletion event of each record in Table 1, so
> truncating doesn't trigger the deletion of corresponding records in all
> downstream tables which i think is the most important. Now I want to read
> all records in Table 1 and modify the rowkind of each Row from
> RowKind.INSERT to RowKind.DELETE, but i didn't find correspond API in
> BatchTableEnvironment or BatchExecutionEnvironment, code as below.
>
>
> ```
>
> TableEnvironemnt tenv;
>
> Table t1 = tenv.from("table_1
> /*+OPTIONS('read.streaming.enabled'='false')*/")
>
> ....
>
> Table t2 = t1.map(row->row.setRowKind(RowKind.DELETE))
>
> t2.insertInto("table_1")
>
> ```
>
>
> The suggestion creating a new table based on new logic, "new Table 1' " as
> shown in pic. I think creating new table will not solute this problem
> unless createing all downstream tables of Table 1 for example Table 2', but
> it's too heavy.
>
>
> Thanks for your suggestion. Do you have any other suggestions?
>
>
> Best Regards!
>
>
>
>
>
>
>
>
>
> 在 2021年10月22日 10:55,Caizhi Weng<tsreape...@gmail.com> 写道:
>
> Hi!
>
> For problem 1, Flink does not support deleting specific records. As you're
> running a batch job, I suggest creating a new table based on the new filter
> condition. Even if you can delete the old records you'll still have to
> generate the new ones, so why not generate them directly into a new place?
>
> For problem 2, yarn-cluster is the mode for a yarn session cluster, which
> means the cluster will remain even after the job is finished. If you want
> to finish the Flink job as well as the yarn job, use yarn-per-job mode
> instead.
>
> vtygoss <vtyg...@126.com> 于2021年10月21日周四 下午2:46写道:
>
>>
>> Hi, community!
>>
>>
>> I am working on building data processing pipeline based on changelog(CDC)
>> and i met two problems.
>>
>>
>> --(sql_0)--> Table A --(sql_1)---> Table B --->other tables downstream
>>
>>                           --(sql_2)--->Table C---> other tables downstream
>>
>>
>> Table A is generated based on sql_0; Table B is generated based on sql_1
>> and input Table A; Table C is generated based on sql_2 and input Table A;
>> Table B and C have some downstream tables based on modeling.
>>
>>
>> - problem 1. When sql_0 logic is changed, e.g. from "select * from xx
>> where a>0" to " from xx where a<0", the data produced by filter "a>0" is
>> error. I want to find a way to clear the error data in Table A and
>> trigger the corresponding deletions of all tables downstream, then produce
>> new data by new filter a<0. So how to change the rowkind of each row in
>> Table A to RowKind.DELETE in Flink Batch execution mode? It will be very
>> nice if there is an use case of Flink 1.12.0.
>>
>>
>> - problem 2.  I found that Flink will launch a session cluster even
>> runtime mode is "yarn-cluster". In batch execution mode, the cluster still
>> run after all tasks finished. How to shutdown the cluster?
>>
>>
>>
>> Thanks for your any suggestion or reply!
>>
>>
>> Best Regards!
>>
>

Reply via email to