Hi Di,

Thanks for your reply.

The timeout I'm referring to here is not the commit timeout, but the timeout of a single Stream Load transaction.

Say the Stream Load transaction timeout is set to 10 minutes, and a Flink job has two subtasks. Due to significant data skew and backpressure, subtask 0 and subtask 1 process at different speeds: subtask 0 finishes its data for this checkpoint first, while subtask 1 needs another 10 minutes to finish, and only then does the job's checkpoint complete. Because subtask 0 has been waiting for subtask 1 the whole time, its Stream Load transaction is only closed after more than 10 minutes have passed, by which point the server has already cleaned up that transaction, so the commit fails.

Therefore, I would like to know whether we can avoid this problem in such situations by giving the transaction a longer lifespan.
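For concreteness, the kind of configuration I have in mind is roughly the sketch below. The 'sink.properties.timeout' pass-through (value in seconds) is only my assumption of how the Stream Load timeout property could be set from the connector, not a confirmed option name, the connection values are placeholders, and it would presumably still be subject to the server-side stream load timeout limits:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DorisSinkTimeoutSketch {
    public static void main(String[] args) {
        // Plain Table API job; the DDL below only registers the sink table.
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        tEnv.executeSql(
                "CREATE TABLE doris_sink (\n"
                        + "  id INT,\n"
                        + "  name STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'doris',\n"
                        + "  'fenodes' = 'fe:8030',\n"            // placeholder
                        + "  'table.identifier' = 'db.tbl',\n"    // placeholder
                        + "  'username' = 'root',\n"
                        + "  'password' = '',\n"
                        + "  'sink.label-prefix' = 'doris-sink',\n"
                        // Assumed pass-through of the Stream Load 'timeout'
                        // property (seconds): keep the pre-committed transaction
                        // alive long enough to cover the slowest subtask, so the
                        // fast subtask's transaction is not cleaned up while it
                        // waits for the checkpoint to complete.
                        + "  'sink.properties.timeout' = '3600'\n"
                        + ")");
    }
}

My assumption is that the value only needs to cover the longest expected gap between a subtask's precommit and the final commit after the checkpoint completes, and that the server-side stream load timeout limit is raised accordingly.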
Best,
Feng

On Fri, Mar 22, 2024 at 10:24 PM wudi <676366...@qq.com.invalid> wrote:

> Hi, Feng,
>
> 1. Are you suggesting that when a commit gets stuck, we can interrupt the commit request using a timeout parameter? Currently, there is no such parameter. In my understanding, in a two-phase commit, checkpointing must be enabled, so the commit timeout is essentially the checkpoint timeout. Therefore, it seems unnecessary to add an additional parameter here. What do you think?
>
> 2. In addition to deleting checkpoints to re-consume data again, the Connector also provides an option to ignore commit errors[1]. However, this option is only used for error recovery scenarios, such as when a transaction has been cleared by the server but you want to reuse the upstream offset from the checkpoint.
>
> 3. Also, thank you for pointing out the issue with the parameter. It has already been addressed[2], but the FLIP changes were overlooked. It has been updated.
>
> [1] https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java#L150-L160
> [2] https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java#L89-L98
>
> Brs,
> di.wu
>
> On Mar 22, 2024, at 18:28, Feng Jin <jinfeng1...@gmail.com> wrote:
> >
> > Hi Di,
> >
> > Thank you for the update, as well as for quickly implementing the corresponding capabilities, including filter push-down and projection push-down.
> >
> > Regarding the transaction timeout, I still have some doubts. I would like to confirm whether we can control this timeout parameter in the connector, for example setting it to 10 minutes or 1 hour.
> > Also, when a transaction is cleared by the server, the commit operation of the connector will fail, leading to job failure. In this case, can users only choose to delete the checkpoint and re-consume historical data?
> >
> > There is also a small question regarding the parameters doris.request.connect.timeout.ms and doris.request.read.timeout.ms: can we change them to Duration type and remove the "ms" suffix? This way, all time-related parameters can be kept to a uniform Duration type.
> >
> > Best,
> > Feng
> >
> > On Fri, Mar 22, 2024 at 4:46 PM wudi <676366...@qq.com.invalid> wrote:
> >
> >> Hi, Feng,
> >> Thank you, that's a great suggestion!
> >>
> >> I have already implemented FilterPushDown and removed that parameter on DorisDynamicTableSource[1], and also updated the FLIP.
> >>
> >> Regarding the mention of [Doris also aborts transactions], it may not have been described accurately. It mainly refers to the automatic expiration of long-running transactions in Doris that have not been committed for a prolonged period.
> >>
> >> As for two-phase commit, when a commit fails, the checkpoint will also fail, and the job will be continuously retried.
> >>
> >> [1] https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java#L58
> >>
> >> Brs,
> >> di.wu
> >>
> >>> On Mar 15, 2024, at 14:53, Feng Jin <jinfeng1...@gmail.com> wrote:
> >>>
> >>> Hi Di,
> >>>
> >>> Thank you for initiating this FLIP, +1 for this.
> >>>
> >>> Regarding the option `doris.filter.query` of the Doris source table:
> >>> Can we directly implement the FilterPushDown capability of a Flink source, like the JDBC source [1], instead of introducing an option?
> >>>
> >>> Regarding two-phase commit:
> >>>
> >>>> At the same time, Doris will also abort transactions that have not been committed for a long time
> >>>
> >>> Can we control the transaction timeout in the connector?
> >>> And can we control the behavior when a timeout occurs, i.e. whether to discard by default or trigger job failure?
> >>>
> >>> [1] https://issues.apache.org/jira/browse/FLINK-16024
> >>>
> >>> Best,
> >>> Feng
> >>>
> >>> On Tue, Mar 12, 2024 at 12:12 AM Ferenc Csaky <ferenc.cs...@pm.me.invalid> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> Thanks for driving this, +1 for the FLIP.
> >>>>
> >>>> Best,
> >>>> Ferenc
> >>>>
> >>>> On Monday, March 11th, 2024 at 15:17, Ahmed Hamdy <hamdy10...@gmail.com> wrote:
> >>>>
> >>>>> Hello,
> >>>>> Thanks for the proposal, +1 for the FLIP.
> >>>>>
> >>>>> Best Regards
> >>>>> Ahmed Hamdy
> >>>>>
> >>>>> On Mon, 11 Mar 2024 at 15:12, wudi <676366...@qq.com.invalid> wrote:
> >>>>>
> >>>>>> Hi, Leonard,
> >>>>>> Thank you for your suggestion.
> >>>>>> I referred to other Connectors[1], modified the naming and types of the relevant parameters[2], and also updated the FLIP.
> >>>>>>
> >>>>>> [1] https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/overview/
> >>>>>> [2] https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
> >>>>>>
> >>>>>> Brs,
> >>>>>> di.wu
> >>>>>>
> >>>>>>> On Mar 7, 2024, at 14:33, Leonard Xu <xbjt...@gmail.com> wrote:
> >>>>>>>
> >>>>>>> Thanks wudi for the update, the FLIP generally looks good to me. I only left two minor suggestions:
> >>>>>>>
> >>>>>>> (1) The suffix `.s` in the config option doris.request.query.timeout.s looks strange to me, could we change all time-interval-related option value types to Duration?
> >>>>>>>
> >>>>>>> (2) Could you check and improve all config options like `doris.exec.mem.limit` to make them follow Flink config option naming and value types?
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Leonard
> >>>>>>>
> >>>>>>>>> On Mar 6, 2024, at 06:12, Jing Ge <j...@ververica.com.INVALID> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi Di,
> >>>>>>>>>
> >>>>>>>>> Thanks for your proposal. +1 for the contribution.
> >>>>>>>>> I'd like to know your thoughts about the following questions:
> >>>>>>>>>
> >>>>>>>>> 1. According to your clarification of the exactly-once semantics (thanks for it, BTW), no PreCommitTopology is required. Does it make sense to let DorisSink[1] implement SupportsCommitter, since the TwoPhaseCommittingSink is deprecated[2], before turning the Doris connector into a Flink connector?
> >>>>>>>>> 2. OLAP engines are commonly used as the tail/downstream of a data pipeline to support further e.g. ad-hoc queries or cubes with feasible pre-aggregation. Just out of curiosity, would you like to share some real use cases that use OLAP engines as the source of a streaming data pipeline? Or will they only be used as a source for batch?
> >>>>>>>>> 3. The E2E test only covers the sink[3], if I am not mistaken. Would you like to test the source in E2E too?
> >>>>>>>>>
> >>>>>>>>> [1] https://github.com/apache/doris-flink-connector/blob/43e0e5cf9b832854ea228fb093077872e3a311b6/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java#L55
> >>>>>>>>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Enhance+and+synchronize+Sink+API+to+match+the+Source+API
> >>>>>>>>> [3] https://github.com/apache/doris-flink-connector/blob/43e0e5cf9b832854ea228fb093077872e3a311b6/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java#L96
> >>>>>>>>>
> >>>>>>>>> Best regards,
> >>>>>>>>> Jing
> >>>>>>>>>
> >>>>>>>>> On Tue, Mar 5, 2024 at 11:18 AM wudi <676366...@qq.com.invalid> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi, Jeyhun Karimov.
> >>>>>>>>>> Thanks for your question.
> >>>>>>>>>>
> >>>>>>>>>> - How to ensure Exactly-Once?
> >>>>>>>>>> 1. When the Checkpoint Barrier arrives, DorisSink will trigger the precommit API of StreamLoad to complete the persistence of the data in Doris (the data will not be visible at this time), and will also pass this TxnID to the Committer.
> >>>>>>>>>> 2. When this Checkpoint of the entire Job is completed, the Committer will call the commit API of StreamLoad with the TxnID to make the transaction visible.
> >>>>>>>>>> 3. When the task is restarted, any Txn with a successful precommit but a failed commit will be aborted based on the label-prefix, using Doris' abort API.
> >>>>>>>>>> (At the same time, Doris will also abort transactions that have not been committed for a long time.)
> >>>>>>>>>>
> >>>>>>>>>> ps: This part of the content has also been updated in the FLIP.
> >>>>>>>>>>
> >>>>>>>>>> - Because the default table model in Doris is Duplicate (https://doris.apache.org/docs/data-table/data-model/), which does not have a primary key, batch writing may cause data duplication. The UNIQ model, however, has a primary key, which ensures the idempotence of writes and thus achieves Exactly-Once.
> >>>>>>>>>>
> >>>>>>>>>> Brs,
> >>>>>>>>>> di.wu
> >>>>>>>>>>
> >>>>>>>>>>> On Mar 2, 2024, at 17:50, Jeyhun Karimov <je.kari...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hi,
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for the proposal. +1 for the FLIP.
> >>>>>>>>>>> I have a few questions:
> >>>>>>>>>>>
> >>>>>>>>>>> - How exactly will the combination of the two (Stream Load's two-phase commit and Flink's two-phase commit) ensure the e2e exactly-once semantics?
> >>>>>>>>>>>
> >>>>>>>>>>> - The FLIP proposes to combine Doris's batch writing with the primary key table to achieve Exactly-Once semantics. Could you elaborate more on that? Why is it not the default behavior but a workaround?
> >>>>>>>>>>>
> >>>>>>>>>>> Regards,
> >>>>>>>>>>> Jeyhun
> >>>>>>>>>>>
> >>>>>>>>>>> On Sat, Mar 2, 2024 at 10:14 AM Yanquan Lv <decq12y...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Thanks for driving this.
> >>>>>>>>>>>> The content is very detailed; it is recommended to add a section on the Test Plan for more completeness.
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Thu, Jan 25, 2024 at 15:40, Di Wu <d...@apache.org> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Previously, we had some discussions about contributing the Flink Doris Connector to the Flink community [1]. I want to further promote this work. I hope everyone will help participate in this FLIP discussion and provide more valuable opinions and suggestions.
> >>>>>>>>>>>>> Thanks.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> [1] https://lists.apache.org/thread/lvh8g9o6qj8bt3oh60q81z0o1cv3nn8p
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Brs,
> >>>>>>>>>>>>> di.wu
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 2023/12/07 05:02:46 wudi wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> As discussed in the previous email [1], about contributing the Flink Doris Connector to the Flink community.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Apache Doris[2] is a high-performance, real-time analytical database based on the MPP architecture. For scenarios where Flink is used for data analysis, processing, or real-time writing to Doris, the Flink Doris Connector is an effective tool.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> At the same time, contributing the Flink Doris Connector to the Flink community will further expand the Flink Connectors ecosystem.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> So I would like to start an official discussion of FLIP-399: Flink Connector Doris[3].
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Looking forward to comments, feedback, and suggestions from the community on the proposal.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> [1] https://lists.apache.org/thread/lvh8g9o6qj8bt3oh60q81z0o1cv3nn8p
> >>>>>>>>>>>>>> [2] https://doris.apache.org/docs/dev/get-starting/what-is-apache-doris/
> >>>>>>>>>>>>>> [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-399%3A+Flink+Connector+Doris
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Brs,
> >>>>>>>>>>>>>> di.wu