Thanks, Di Wu, for driving this FLIP. The updated FLIP looks good to me; +1 to start a vote.
Best,
Leonard

> On Apr 7, 2024, at 12:52 PM, wudi <676366...@qq.com.INVALID> wrote:
>
> Hi,
>
> Since there have been no more comments for a while, I will start a vote thread if no further comments arrive within another day.
>
> Thanks,
> di.wu
>
> On Apr 1, 2024, at 5:52 PM, wudi <676366...@qq.com> wrote:
>
> Hi,
>
> Gentle ping to see whether there are any other concerns, or anything that seems to be missing from the FLIP.
>
> Brs,
> di.wu
>
> On Mar 25, 2024, at 5:52 PM, Feng Jin <jinfeng1...@gmail.com> wrote:
>
> Hi Di,
>
> Thank you for your patience and explanation.
>
> If this is a server-side configuration, we currently cannot modify it in the client configuration. If Doris supports client-side configuration in the future, we can reconsider whether to support it.
>
> I currently have no other questions regarding this FLIP. LGTM.
>
> Best,
> Feng
>
> On Mon, Mar 25, 2024 at 3:42 PM wudi <676366...@qq.com.invalid> wrote:
>
> Hi, Feng,
>
> Yes, if the StreamLoad transaction timeout is very short, you may encounter this situation.
>
> The timeout for StreamLoad transactions is controlled by the streaming_label_keep_max_second parameter [1] in the FE (Frontend), and its default value is 12 hours. Currently, it is a global transaction configuration and cannot be set separately for a specific transaction.
>
> However, I believe the default 12-hour timeout should cover most cases, unless you are restarting from a checkpoint taken a long time ago. What do you think?
>
> [1] https://github.com/apache/doris/blob/master/fe/fe-common/src/main/java/org/apache/doris/common/Config.java#L163-L168
>
> Brs,
> di.wu
>
> On Mar 25, 2024, at 11:45 AM, Feng Jin <jinfeng1...@gmail.com> wrote:
>
> Hi Di,
>
> Thanks for your reply.
>
> The timeout I'm referring to here is not the commit timeout, but rather the timeout for a single streamLoad transaction.
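The timing problem in this exchange (one subtask's Stream Load transaction is pre-committed early, but only committed once the whole checkpoint completes) can be illustrated with a small Java sketch. It assumes, for illustration only, that the server drops a pre-committed transaction once it is older than one fixed, global timeout (12 hours by default per wudi's reply, 10 minutes in Feng's example). `expiredBeforeCommit` is a hypothetical helper, not part of the Doris or Flink APIs.

```java
import java.time.Duration;
import java.time.Instant;

public class TxnTimeoutSketch {

    /**
     * Hypothetical helper: true if a transaction pre-committed at preCommitAt
     * would already have been cleaned up by the server when the commit is
     * issued at commitAt, assuming a single fixed server-side timeout.
     */
    static boolean expiredBeforeCommit(Instant preCommitAt, Instant commitAt, Duration serverTimeout) {
        return Duration.between(preCommitAt, commitAt).compareTo(serverTimeout) > 0;
    }

    public static void main(String[] args) {
        Instant preCommit = Instant.parse("2024-03-25T00:00:00Z");

        // Feng's scenario: subtask 0 pre-commits, then waits about 11 minutes
        // for the skewed subtask 1 before the checkpoint (and commit) completes.
        Instant commit = preCommit.plus(Duration.ofMinutes(11));
        System.out.println(expiredBeforeCommit(preCommit, commit, Duration.ofMinutes(10))); // true
        System.out.println(expiredBeforeCommit(preCommit, commit, Duration.ofHours(12)));   // false

        // Restoring from a checkpoint taken 13 hours earlier exceeds even the 12-hour default.
        Instant lateCommit = preCommit.plus(Duration.ofHours(13));
        System.out.println(expiredBeforeCommit(preCommit, lateCommit, Duration.ofHours(12))); // true
    }
}
```

Under this simplified model the only lever is the server-side limit, which is consistent with wudi's point that the timeout is a global FE setting rather than a per-transaction one.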
>
> Let's say we have set the transaction timeout for StreamLoad to 10 minutes. Now, imagine a Flink job with two subtasks. Due to significant data skew and backpressure, subtask 0 and subtask 1 are processing at different speeds. Subtask 0 finishes processing this checkpoint first, while subtask 1 takes another 10 minutes to complete its processing. At this point, the job's checkpoint is done. However, since subtask 0 has been waiting for subtask 1 all along, its corresponding streamLoad transaction is only committed after more than 10 minutes have passed, by which time the server has already cleaned up the transaction, leading to a failed commit.
>
> Therefore, I would like to know whether we can avoid this problem in such situations by setting a longer lifespan for transactions.
>
> Best,
> Feng
>
> On Fri, Mar 22, 2024 at 10:24 PM wudi <676366...@qq.com.invalid> wrote:
>
> Hi, Feng,
>
> 1. Are you suggesting that when a commit gets stuck, we can interrupt the commit request using a timeout parameter? Currently, there is no such parameter. In my understanding, checkpointing must be enabled for two-phase commit, so the commit timeout is essentially the checkpoint timeout. Therefore, it seems unnecessary to add an additional parameter here. What do you think?
>
> 2. In addition to deleting checkpoints to re-consume data, the connector also provides an option to ignore commit errors [1]. However, this option is only intended for error-recovery scenarios, such as when a transaction has been cleared by the server but you want to reuse the upstream offset from the checkpoint.
>
> 3. Also, thank you for pointing out the issue with the parameter. It has already been addressed [2], but the FLIP changes were overlooked.
> It has been updated.
>
> [1] https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java#L150-L160
> [2] https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java#L89-L98
>
> Brs,
> di.wu
>
> On Mar 22, 2024, at 6:28 PM, Feng Jin <jinfeng1...@gmail.com> wrote:
>
> Hi Di,
>
> Thank you for the update, as well as for quickly implementing the corresponding capabilities, including filter push-down and projection push-down.
>
> Regarding the transaction timeout, I still have some doubts. I would like to confirm whether we can control this timeout parameter in the connector, for example setting it to 10 minutes or 1 hour.
> Also, when a transaction is cleared by the server, the connector's commit operation will fail, leading to job failure. In this case, is the only option for users to delete the checkpoint and re-consume historical data?
>
> There is also a small question regarding the parameters `doris.request.connect.timeout.ms` and `doris.request.read.timeout.ms`: can we change them to the Duration type and remove the "ms" suffix? This way, all time parameters can be kept uniform as Duration.
>
> Best,
> Feng
>
> On Fri, Mar 22, 2024 at 4:46 PM wudi <676366...@qq.com.invalid> wrote:
>
> Hi, Feng,
>
> Thank you, that's a great suggestion!
>
> I have already implemented FilterPushDown, removed that parameter from DorisDynamicTableSource [1], and updated the FLIP.
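FilterPushDown replaces the `doris.filter.query` option with predicates that Flink hands to the source. In Flink's `SupportsFilterPushDown` ability, `applyFilters` reports which filters the source accepts and which ones Flink must still evaluate itself. The Java sketch below mimics that accepted/remaining split on a hypothetical, simplified `Predicate` record (not Flink's `ResolvedExpression`) and renders the accepted predicates as a SQL `WHERE` fragment; it is an illustration under those assumptions, not the connector's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class FilterPushDownSketch {

    /** Hypothetical, simplified predicate of the form: column op literal. */
    record Predicate(String column, String op, Object literal) {}

    /** Accepted filters are pushed to Doris; remaining ones stay in Flink. */
    record PushDownResult(List<Predicate> accepted, List<Predicate> remaining, String whereClause) {}

    private static final Set<String> SUPPORTED_OPS = Set.of("=", "<>", "<", "<=", ">", ">=");

    static PushDownResult applyFilters(List<Predicate> filters) {
        List<Predicate> accepted = new ArrayList<>();
        List<Predicate> remaining = new ArrayList<>();
        List<String> clauses = new ArrayList<>();
        for (Predicate p : filters) {
            String literal = toSqlLiteral(p.literal());
            if (SUPPORTED_OPS.contains(p.op()) && literal != null) {
                accepted.add(p);
                clauses.add(p.column() + " " + p.op() + " " + literal);
            } else {
                // Not translatable here: Flink evaluates it after reading the rows.
                remaining.add(p);
            }
        }
        return new PushDownResult(accepted, remaining, String.join(" AND ", clauses));
    }

    /** Renders numbers and strings as SQL literals; returns null for anything else. */
    private static String toSqlLiteral(Object value) {
        if (value instanceof Number) {
            return value.toString();
        }
        if (value instanceof String s) {
            return "'" + s.replace("'", "''") + "'"; // escape embedded single quotes
        }
        return null;
    }

    public static void main(String[] args) {
        PushDownResult result = applyFilters(List.of(
                new Predicate("age", ">", 18),
                new Predicate("city", "=", "Beijing"),
                new Predicate("name", "LIKE", "A%")));
        System.out.println(result.whereClause());       // age > 18 AND city = 'Beijing'
        System.out.println(result.remaining().size());  // 1
    }
}
```

Keeping untranslatable predicates in the "remaining" list is what lets a source push down only what it understands without changing query results.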
>
> Regarding the mention of [Doris also aborts transactions], it may not have been described accurately. It mainly refers to the automatic expiration of long-running transactions in Doris that have not been committed for a prolonged period.
>
> As for two-phase commit, when a commit fails, the checkpoint also fails, and the job will keep retrying.
>
> [1] https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java#L58
>
> Brs,
> di.wu
>
> On Mar 15, 2024, at 2:53 PM, Feng Jin <jinfeng1...@gmail.com> wrote:
>
> Hi Di,
>
> Thank you for initiating this FLIP, +1 for this.
>
> Regarding the option `doris.filter.query` of the Doris source table:
>
> Can we directly implement Flink's FilterPushDown capability, as the JDBC source does [1], instead of introducing an option?
>
> Regarding two-phase commit,
>
> > At the same time, Doris will also abort transactions that have not been committed for a long time
>
> Can we control the transaction timeout in the connector? And can we control the behavior when a timeout occurs, i.e., whether to discard by default or trigger a job failure?
>
> [1] https://issues.apache.org/jira/browse/FLINK-16024
>
> Best,
> Feng
>
> On Tue, Mar 12, 2024 at 12:12 AM Ferenc Csaky <ferenc.cs...@pm.me.invalid> wrote:
>
> Hi,
>
> Thanks for driving this, +1 for the FLIP.
>
> Best,
> Ferenc
>
> On Monday, March 11th, 2024 at 15:17, Ahmed Hamdy <hamdy10...@gmail.com> wrote:
>
> Hello,
> Thanks for the proposal, +1 for the FLIP.
>
> Best Regards
> Ahmed Hamdy
>
> On Mon, 11 Mar 2024 at 15:12, wudi <676366...@qq.com.invalid> wrote:
>
> Hi, Leonard,
> Thank you for your suggestion.
> Referring to other connectors [1], I modified the naming and types of the relevant parameters [2] and also updated the FLIP.
>
> [1] https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/overview/
> [2] https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
>
> Brs,
> di.wu
>
> On Mar 7, 2024, at 2:33 PM, Leonard Xu <xbjt...@gmail.com> wrote:
>
> Thanks, wudi, for the update. The FLIP generally looks good to me; I only have two minor suggestions:
>
> (1) The suffix `.s` in the config option `doris.request.query.timeout.s` looks strange to me. Could we change the value type of all time-interval-related options to Duration?
>
> (2) Could you check and improve all config options, such as `doris.exec.mem.limit`, so that they follow Flink's config option naming and value-type conventions?
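Leonard's and Feng's suggestion is to type time-related options as Duration, so the unit moves out of the key (`.s`, `.ms`) and into the value. Flink's `ConfigOptions` builder supports this via `durationType()`. The self-contained Java sketch below uses a hypothetical, simplified parser (not Flink's own) to show that differently written values normalize to the same `java.time.Duration`; the accepted unit set and the rule that a bare number means milliseconds are assumptions of this sketch.

```java
import java.time.Duration;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DurationOptionSketch {

    // Hypothetical format: an integer followed by an optional unit.
    private static final Pattern FORMAT = Pattern.compile("(\\d+)\\s*(ms|s|min|h|d)?");

    /** Parses values such as "500", "30s", "10 min" or "1h" into a Duration. */
    static Duration parse(String text) {
        Matcher m = FORMAT.matcher(text.trim().toLowerCase(Locale.ROOT));
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported duration: " + text);
        }
        long amount = Long.parseLong(m.group(1));
        String unit = m.group(2) == null ? "ms" : m.group(2); // bare number = milliseconds (assumption)
        return switch (unit) {
            case "ms" -> Duration.ofMillis(amount);
            case "s" -> Duration.ofSeconds(amount);
            case "min" -> Duration.ofMinutes(amount);
            case "h" -> Duration.ofHours(amount);
            default -> Duration.ofDays(amount); // "d"
        };
    }

    public static void main(String[] args) {
        // With a Duration-typed option, the key no longer needs to encode the unit.
        System.out.println(parse("3600s").equals(parse("1h"))); // true
        System.out.println(parse("10 min").toMillis());         // 600000
        System.out.println(parse("500").toMillis());            // 500
    }
}
```

Because the unit travels with the value, `3600s` and `1h` become interchangeable and every time option can share one type.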
>
> Best,
> Leonard
>
> On Mar 6, 2024, at 6:12 AM, Jing Ge <j...@ververica.com.INVALID> wrote:
>
> Hi Di,
>
> Thanks for your proposal. +1 for the contribution. I'd like to know your thoughts on the following questions:
>
> 1. According to your clarification of exactly-once (thanks for it, BTW), no PreCommitTopology is required. Does it make sense to let DorisSink [1] implement SupportsCommitter, since TwoPhaseCommittingSink is deprecated [2], before turning the Doris connector into a Flink connector?
> 2. OLAP engines are commonly used as the tail/downstream of a data pipeline to support, e.g., ad-hoc queries or cubes with feasible pre-aggregation. Just out of curiosity, would you like to share some real use cases that will use OLAP engines as the source of a streaming data pipeline? Or will it only be used as a source for batch jobs?
> 3. The E2E test only covers the sink [3], if I am not mistaken. Would you like to test the source in E2E too?
>
> [1] https://github.com/apache/doris-flink-connector/blob/43e0e5cf9b832854ea228fb093077872e3a311b6/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java#L55
> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Enhance+and+synchronize+Sink+API+to+match+the+Source+API
> [3] https://github.com/apache/doris-flink-connector/blob/43e0e5cf9b832854ea228fb093077872e3a311b6/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java#L96
>
> Best regards,
> Jing
>
> On Tue, Mar 5, 2024 at 11:18 AM wudi <676366...@qq.com.invalid> wrote:
>
> Hi, Jeyhun Karimov,
> Thanks for your question.
>
> - How to ensure exactly-once?
> 1. When the checkpoint barrier arrives, DorisSink calls the StreamLoad precommit API to persist the data in Doris (the data is not visible at this point) and passes the TxnID to the Committer.
> 2. When the checkpoint of the entire job completes, the Committer calls the StreamLoad commit API with the TxnID to make the transaction visible.
> 3. When the task is restarted, transactions that were precommitted successfully but failed to commit are aborted based on the label prefix, by calling Doris' abort API.
> (At the same time, Doris also aborts transactions that have not been committed for a long time.)
>
> PS: This part has also been updated in the FLIP.
>
> - Because the default table model in Doris is Duplicate (https://doris.apache.org/docs/data-table/data-model/), which has no primary key, batch writing may cause data duplication. The Unique model, however, has a primary key, which makes writes idempotent and thus achieves exactly-once.
>
> Brs,
> di.wu
>
> On Mar 2, 2024, at 5:50 PM, Jeyhun Karimov <je.kari...@gmail.com> wrote:
>
> Hi,
>
> Thanks for the proposal. +1 for the FLIP.
> I have a few questions:
>
> - How exactly will the combination of the two (Stream Load's two-phase commit and Flink's two-phase commit) ensure end-to-end exactly-once semantics?
>
> - The FLIP proposes to combine Doris's batch writing with the primary key table to achieve exactly-once semantics. Could you elaborate more on that? Why is it not the default behavior but a workaround?
>
> Regards,
> Jeyhun
>
> On Sat, Mar 2, 2024 at 10:14 AM Yanquan Lv <decq12y...@gmail.com> wrote:
>
> Thanks for driving this.
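wudi's Mar 5 answer to Jeyhun describes two separate guarantees: the two-phase flow (pre-commit on the checkpoint barrier, commit once the checkpoint completes, abort leftover transactions by label prefix on restart), and, for batch writes, idempotence from the Unique model's primary key. The Java sketch below models both against a hypothetical in-memory table; `MockDorisTable`, its methods, and the label format are illustrative assumptions, not the Doris Stream Load API or the connector's code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TwoPhaseCommitSketch {

    /** Hypothetical in-memory stand-in for a Doris table (not the Stream Load API). */
    static final class MockDorisTable {
        private final boolean uniqueModel;                       // true: Unique model, false: Duplicate model
        private final List<String[]> appended = new ArrayList<>();      // Duplicate-model rows
        private final Map<String, String> byKey = new LinkedHashMap<>(); // Unique-model rows
        private final Map<String, List<String[]>> pending = new LinkedHashMap<>(); // pre-committed, invisible

        MockDorisTable(boolean uniqueModel) {
            this.uniqueModel = uniqueModel;
        }

        /** Phase 1 (checkpoint barrier): persist rows under a label; not yet visible. */
        void preCommit(String label, List<String[]> rows) {
            pending.put(label, new ArrayList<>(rows));
        }

        /** Phase 2 (checkpoint complete): make the pre-committed rows visible. */
        void commit(String label) {
            List<String[]> rows = pending.remove(label);
            if (rows == null) {
                throw new IllegalStateException("Unknown or expired transaction: " + label);
            }
            write(rows);
        }

        /** Restart: abort pre-committed transactions whose label has the job's prefix. */
        int abortByLabelPrefix(String prefix) {
            int before = pending.size();
            pending.keySet().removeIf(label -> label.startsWith(prefix));
            return before - pending.size();
        }

        /** Batch (non-transactional) write: visible immediately. */
        void write(List<String[]> rows) {
            for (String[] row : rows) {
                if (uniqueModel) {
                    byKey.put(row[0], row[1]); // upsert by key: replaying is idempotent
                } else {
                    appended.add(row);         // plain append: replaying duplicates rows
                }
            }
        }

        int visibleRows() {
            return uniqueModel ? byKey.size() : appended.size();
        }
    }

    public static void main(String[] args) {
        List<String[]> batch = List.of(new String[] {"k1", "a"}, new String[] {"k2", "b"});

        // Two-phase flow on a Duplicate-model table.
        MockDorisTable table = new MockDorisTable(false);
        table.preCommit("job1_chk1", batch);
        System.out.println(table.visibleRows()); // 0: pre-committed data is invisible
        table.commit("job1_chk1");
        System.out.println(table.visibleRows()); // 2
        table.preCommit("job1_chk2", batch);     // the job fails before this commit
        System.out.println(table.abortByLabelPrefix("job1_")); // 1 leftover transaction aborted
        System.out.println(table.visibleRows()); // 2

        // Batch writes replayed after a failure: only the Unique model stays exact.
        MockDorisTable dupBatch = new MockDorisTable(false);
        MockDorisTable uniqBatch = new MockDorisTable(true);
        for (int attempt = 0; attempt < 2; attempt++) {
            dupBatch.write(batch);
            uniqBatch.write(batch);
        }
        System.out.println(dupBatch.visibleRows());  // 4
        System.out.println(uniqBatch.visibleRows()); // 2
    }
}
```

In this sketch a commit issued for an already-aborted label throws, loosely mirroring the failed commit Feng describes when the server has already cleaned up a transaction.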
> The content is very detailed; I recommend adding a Test Plan section for completeness.
>
> On Thu, Jan 25, 2024 at 3:40 PM Di Wu <d...@apache.org> wrote:
>
> Hi all,
>
> Previously, we had some discussions about contributing the Flink Doris Connector to the Flink community [1]. I want to push this work forward further.
> I hope everyone will participate in this FLIP discussion and share valuable opinions and suggestions.
> Thanks.
>
> [1] https://lists.apache.org/thread/lvh8g9o6qj8bt3oh60q81z0o1cv3nn8p
>
> Brs,
> di.wu
>
> On 2023/12/07 05:02:46 wudi wrote:
>
> Hi all,
>
> As discussed in the previous email [1], this is about contributing the Flink Doris Connector to the Flink community.
>
> Apache Doris [2] is a high-performance, real-time analytical database based on an MPP architecture. For scenarios where Flink is used for data analysis, processing, or real-time writing into Doris, the Flink Doris Connector is an effective tool.
>
> At the same time, contributing the Flink Doris Connector to the Flink community will further expand the Flink connector ecosystem.
>
> So I would like to start an official discussion on FLIP-399: Flink Connector Doris [3].
>
> Looking forward to comments, feedback, and suggestions from the community on the proposal.
>
> [1] https://lists.apache.org/thread/lvh8g9o6qj8bt3oh60q81z0o1cv3nn8p
> [2] https://doris.apache.org/docs/dev/get-starting/what-is-apache-doris/
> [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-399%3A+Flink+Connector+Doris
>
> Brs,
>
> di.wu