Hi,

Gentle ping to see if there are any other concerns or anything that seems to be missing from the FLIP.

Brs
di.wu

> On Mar 25, 2024, at 17:52, Feng Jin <jinfeng1...@gmail.com> wrote:
>
> Hi Di
>
> Thank you for your patience and explanation.
>
> If this is a server-side configuration, we currently cannot modify it in the client configuration.
> If Doris supports client-side configuration in the future, we can reconsider whether to support it.
>
> I currently have no other questions regarding this FLIP. LGTM.
>
> Best,
> Feng
>
> On Mon, Mar 25, 2024 at 3:42 PM wudi <676366...@qq.com.invalid> wrote:
>
>> Hi, Feng
>>
>> Yes, if the StreamLoad transaction timeout is very short, you may encounter this situation.
>>
>> The timeout for StreamLoad transactions is controlled by the streaming_label_keep_max_second
>> parameter [1] in FE (Frontend), and the default value is 12 hours. Currently, it is a global
>> transaction configuration and cannot be set separately for a specific transaction.
>>
>> However, as I understand it, the default 12-hour timeout should cover most cases unless you are
>> restarting from a checkpoint that occurred a long time ago. What do you think?
>>
>> [1] https://github.com/apache/doris/blob/master/fe/fe-common/src/main/java/org/apache/doris/common/Config.java#L163-L168
>>
>> Brs
>> di.wu
>>
>>> On Mar 25, 2024, at 11:45, Feng Jin <jinfeng1...@gmail.com> wrote:
>>>
>>> Hi Di
>>>
>>> Thanks for your reply.
>>>
>>> The timeout I'm referring to here is not the commit timeout, but rather the timeout for a
>>> single StreamLoad transaction.
>>>
>>> Let's say we have set the transaction timeout for StreamLoad to be 10 minutes. Now, imagine
>>> there is a Flink job with two subtasks. Due to significant data skew and backpressure issues,
>>> subtask 0 and subtask 1 are processing at different speeds. Subtask 0 finishes processing this
>>> checkpoint first, while subtask 1 takes another 10 minutes to complete its processing. At this
>>> point, the job's checkpoint is done. However, since subtask 0 has been waiting for subtask 1
>>> all along, its corresponding StreamLoad transaction closes after more than 10 minutes have
>>> passed, by which time the server has already cleaned up this transaction, leading to a failed
>>> commit.
>>> Therefore, I would like to know whether in such situations we can avoid this problem by
>>> setting a longer lifespan for transactions.
>>>
>>> Best,
>>> Feng
>>>
>>> On Fri, Mar 22, 2024 at 10:24 PM wudi <676366...@qq.com.invalid> wrote:
>>>
>>>> Hi, Feng,
>>>>
>>>> 1. Are you suggesting that when a commit gets stuck, we can interrupt the commit request
>>>> using a timeout parameter? Currently, there is no such parameter. In my understanding, in a
>>>> two-phase commit, checkpoint must be enabled, so the commit timeout is essentially the
>>>> checkpoint timeout. Therefore, it seems unnecessary to add an additional parameter here.
>>>> What do you think?
>>>>
>>>> 2. In addition to deleting checkpoints to re-consume data, the Connector also provides an
>>>> option to ignore commit errors [1]. However, this option is only used for error recovery
>>>> scenarios, such as when a transaction is cleared by the server but you want to reuse the
>>>> upstream offset from the checkpoint.
>>>>
>>>> 3. Also, thank you for pointing out the issue with the parameter. It has already been
>>>> addressed [2], but the FLIP changes were overlooked. The FLIP has now been updated.
>>>>
>>>> [1] https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java#L150-L160
>>>> [2] https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java#L89-L98
>>>>
>>>> Brs
>>>> di.wu
>>>>
>>>>> On Mar 22, 2024, at 18:28, Feng Jin <jinfeng1...@gmail.com> wrote:
>>>>>
>>>>> Hi Di,
>>>>>
>>>>> Thank you for the update, as well as for quickly implementing the corresponding
>>>>> capabilities, including filter push down and project push down.
>>>>>
>>>>> Regarding the transaction timeout, I still have some doubts. I would like to confirm
>>>>> whether we can control this timeout parameter in the connector, such as setting it to
>>>>> 10 minutes or 1 hour.
>>>>> Also, when a transaction is cleared by the server, the commit operation of the connector
>>>>> will fail, leading to job failure. In this case, can users only choose to delete the
>>>>> checkpoint and re-consume historical data?
>>>>>
>>>>> There is also a small question regarding the parameters
>>>>> `doris.request.connect.timeout.ms` and `doris.request.read.timeout.ms`:
>>>>> can we change them to Duration type and remove the "ms" suffix?
>>>>> This way, all time parameters can be kept uniform in type as Duration.
>>>>>
>>>>> Best,
>>>>> Feng
>>>>>
>>>>> On Fri, Mar 22, 2024 at 4:46 PM wudi <676366...@qq.com.invalid> wrote:
>>>>>
>>>>>> Hi, Feng,
>>>>>> Thank you, that's a great suggestion!
>>>>>>
>>>>>> I have already implemented FilterPushDown and removed that parameter on
>>>>>> DorisDynamicTableSource [1], and also updated the FLIP.
>>>>>>
>>>>>> Regarding the mention of [Doris also aborts transactions], it may not have been
>>>>>> described accurately. It mainly refers to the automatic expiration of long-running
>>>>>> transactions in Doris that have not been committed for a prolonged period.
>>>>>>
>>>>>> As for two-phase commit, when a commit fails, the checkpoint will also fail, and the
>>>>>> job will be continuously retried.
>>>>>>
>>>>>> [1] https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java#L58
>>>>>>
>>>>>> Brs
>>>>>> di.wu
>>>>>>
>>>>>>> On Mar 15, 2024, at 14:53, Feng Jin <jinfeng1...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Di
>>>>>>>
>>>>>>> Thank you for initiating this FLIP, +1 for this.
>>>>>>>
>>>>>>> Regarding the option `doris.filter.query` of the Doris source table:
>>>>>>>
>>>>>>> Can we directly implement the FilterPushDown capability of Flink Source, like the
>>>>>>> Jdbc Source [1], instead of introducing an option?
>>>>>>>
>>>>>>> Regarding two-phase commit,
>>>>>>>
>>>>>>>> At the same time, Doris will also abort transactions that have not been committed
>>>>>>>> for a long time
>>>>>>>
>>>>>>> Can we control the transaction timeout in the connector?
>>>>>>> And can we control the behavior when a timeout occurs, i.e. whether to discard by
>>>>>>> default or trigger job failure?
>>>>>>>
>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-16024
>>>>>>>
>>>>>>> Best,
>>>>>>> Feng
>>>>>>>
>>>>>>> On Tue, Mar 12, 2024 at 12:12 AM Ferenc Csaky <ferenc.cs...@pm.me.invalid> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Thanks for driving this, +1 for the FLIP.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Ferenc
>>>>>>>>
>>>>>>>> On Monday, March 11th, 2024 at 15:17, Ahmed Hamdy <hamdy10...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>> Thanks for the proposal, +1 for the FLIP.
>>>>>>>>>
>>>>>>>>> Best Regards
>>>>>>>>> Ahmed Hamdy
>>>>>>>>>
>>>>>>>>> On Mon, 11 Mar 2024 at 15:12, wudi 676366...@qq.com.invalid wrote:
>>>>>>>>>
>>>>>>>>>> Hi, Leonard
>>>>>>>>>> Thank you for your suggestion.
>>>>>>>>>> I referred to other Connectors [1], modified the naming and types of the relevant
>>>>>>>>>> parameters [2], and also updated the FLIP.
>>>>>>>>>>
>>>>>>>>>> [1] https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/overview/
>>>>>>>>>> [2] https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
>>>>>>>>>>
>>>>>>>>>> Brs,
>>>>>>>>>> di.wu
>>>>>>>>>>
>>>>>>>>>>> On Mar 7, 2024, at 14:33, Leonard Xu xbjt...@gmail.com wrote:
>>>>>>>>>>>
>>>>>>>>>>> Thanks wudi for the update. The FLIP generally looks good to me; I only have
>>>>>>>>>>> two minor suggestions:
>>>>>>>>>>>
>>>>>>>>>>> (1) The suffix `.s` in the config option doris.request.query.timeout.s looks
>>>>>>>>>>> strange to me. Could we change the value type of all time-interval-related
>>>>>>>>>>> options to Duration?
>>>>>>>>>>>
>>>>>>>>>>> (2) Could you check and improve all config options like `doris.exec.mem.limit`
>>>>>>>>>>> to make them follow Flink config option naming and value types?
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Leonard
>>>>>>>>>>>
>>>>>>>>>>>>> On Mar 6, 2024, at 06:12, Jing Ge j...@ververica.com.INVALID wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Di,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for your proposal. +1 for the contribution. I'd like to know your
>>>>>>>>>>>>> thoughts about the following questions:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. According to your clarification of exactly-once (thanks for it, BTW), no
>>>>>>>>>>>>> PreCommitTopology is required. Since TwoPhaseCommittingSink is deprecated [2],
>>>>>>>>>>>>> does it make sense to let DorisSink [1] implement SupportsCommitter before
>>>>>>>>>>>>> turning the Doris connector into a Flink connector?
>>>>>>>>>>>>> 2. OLAP engines are commonly used as the tail/downstream of a data pipeline to
>>>>>>>>>>>>> support, e.g., ad-hoc queries or cubes with feasible pre-aggregation.
>>>>>>>>>>>>> Just out of curiosity, would you like to share some real use cases that will
>>>>>>>>>>>>> use OLAP engines as the source of a streaming data pipeline? Or will it only
>>>>>>>>>>>>> be used as the source for batch?
>>>>>>>>>>>>> 3. The E2E test only covers the sink [3], if I am not mistaken. Would you like
>>>>>>>>>>>>> to test the source in E2E too?
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1] https://github.com/apache/doris-flink-connector/blob/43e0e5cf9b832854ea228fb093077872e3a311b6/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java#L55
>>>>>>>>>>>>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Enhance+and+synchronize+Sink+API+to+match+the+Source+API
>>>>>>>>>>>>> [3] https://github.com/apache/doris-flink-connector/blob/43e0e5cf9b832854ea228fb093077872e3a311b6/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java#L96
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>> Jing
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Mar 5, 2024 at 11:18 AM wudi 676366...@qq.com.invalid wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi, Jeyhun Karimov.
>>>>>>>>>>>>>> Thanks for your question.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - How to ensure Exactly-Once?
>>>>>>>>>>>>>> 1. When the Checkpoint Barrier arrives, DorisSink will trigger the precommit
>>>>>>>>>>>>>> API of StreamLoad to complete the persistence of data in Doris (the data will
>>>>>>>>>>>>>> not be visible at this time), and will also pass this TxnID to the Committer.
>>>>>>>>>>>>>> 2. When this Checkpoint of the entire Job is completed, the Committer will
>>>>>>>>>>>>>> call the commit API of StreamLoad and commit the TxnID to make the
>>>>>>>>>>>>>> transaction visible.
>>>>>>>>>>>>>> 3. When the task is restarted, any Txn with a successful precommit and a
>>>>>>>>>>>>>> failed commit will be aborted based on the label-prefix, by calling Doris'
>>>>>>>>>>>>>> abort API. (At the same time, Doris will also abort transactions that have
>>>>>>>>>>>>>> not been committed for a long time.)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> PS: This part of the content has also been updated in the FLIP.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - The default table model in Doris is Duplicate
>>>>>>>>>>>>>> (https://doris.apache.org/docs/data-table/data-model/), which does not have a
>>>>>>>>>>>>>> primary key, so batch writing may cause data duplication. The Unique model
>>>>>>>>>>>>>> has a primary key, which ensures idempotent writes and thus achieves
>>>>>>>>>>>>>> Exactly-Once.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Brs,
>>>>>>>>>>>>>> di.wu
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mar 2, 2024, at 17:50, Jeyhun Karimov je.kari...@gmail.com wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for the proposal. +1 for the FLIP.
>>>>>>>>>>>>>>> I have a few questions:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - How exactly will the combination of the two (Stream Load's two-phase
>>>>>>>>>>>>>>> commit and Flink's two-phase commit) ensure e2e exactly-once semantics?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - The FLIP proposes to combine Doris's batch writing with the primary key
>>>>>>>>>>>>>>> table to achieve Exactly-Once semantics. Could you elaborate more on that?
>>>>>>>>>>>>>>> Why is it not the default behavior but a workaround?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sat, Mar 2, 2024 at 10:14 AM Yanquan Lv decq12y...@gmail.com wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for driving this.
>>>>>>>>>>>>>>>> The content is very detailed; I recommend adding a section on the Test
>>>>>>>>>>>>>>>> Plan for completeness.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Jan 25, 2024 at 15:40, Di Wu d...@apache.org wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Previously, we had some discussions about contributing the Flink Doris
>>>>>>>>>>>>>>>>> Connector to the Flink community [1]. I want to further promote this
>>>>>>>>>>>>>>>>> work. I hope everyone will help participate in this FLIP discussion and
>>>>>>>>>>>>>>>>> provide more valuable opinions and suggestions.
>>>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> [1] https://lists.apache.org/thread/lvh8g9o6qj8bt3oh60q81z0o1cv3nn8p
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Brs,
>>>>>>>>>>>>>>>>> di.wu
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 2023/12/07 05:02:46 wudi wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> As discussed in the previous email [1], this is about contributing the
>>>>>>>>>>>>>>>>>> Flink Doris Connector to the Flink community.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Apache Doris [2] is a high-performance, real-time analytical database
>>>>>>>>>>>>>>>>>> based on MPP architecture. For scenarios where Flink is used for data
>>>>>>>>>>>>>>>>>> analysis, processing, or real-time writing on Doris, the Flink Doris
>>>>>>>>>>>>>>>>>> Connector is an effective tool.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> At the same time, contributing the Flink Doris Connector to the Flink
>>>>>>>>>>>>>>>>>> community will further expand the Flink Connectors ecosystem.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> So I would like to start an official discussion of FLIP-399: Flink
>>>>>>>>>>>>>>>>>> Connector Doris [3].
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Looking forward to comments, feedback and suggestions from the
>>>>>>>>>>>>>>>>>> community on the proposal.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> [1] https://lists.apache.org/thread/lvh8g9o6qj8bt3oh60q81z0o1cv3nn8p
>>>>>>>>>>>>>>>>>> [2] https://doris.apache.org/docs/dev/get-starting/what-is-apache-doris/
>>>>>>>>>>>>>>>>>> [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-399%3A+Flink+Connector+Doris
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Brs,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> di.wu
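
For readers following the transaction-timeout discussion in this thread, the skew scenario Feng describes can be illustrated with a toy model. This is only a minimal sketch under stated assumptions: `ToyTxnServer` and `run_checkpoint` are hypothetical names invented for illustration, not Doris or Flink Doris Connector APIs, and the model assumes the server's expiry clock starts when a subtask pre-commits and that every commit happens once the slowest subtask has finished.

```python
from dataclasses import dataclass, field


@dataclass
class ToyTxnServer:
    """Hypothetical stand-in for a server that expires uncommitted transactions."""
    timeout_s: int
    precommitted_at: dict = field(default_factory=dict)

    def precommit(self, txn_id: str, now_s: int) -> None:
        # Data is persisted but not yet visible; remember when that happened.
        self.precommitted_at[txn_id] = now_s

    def commit(self, txn_id: str, now_s: int) -> bool:
        # The commit succeeds only if the transaction has not yet expired.
        started = self.precommitted_at.pop(txn_id, None)
        if started is None:
            return False
        return now_s - started <= self.timeout_s


def run_checkpoint(server: ToyTxnServer, precommit_times_s: dict) -> dict:
    """Pre-commit each subtask at its own time, then commit all of them
    at the moment the slowest subtask finishes (checkpoint completion)."""
    for subtask, t in precommit_times_s.items():
        server.precommit(f"txn-{subtask}", t)
    checkpoint_done_s = max(precommit_times_s.values())
    return {
        subtask: server.commit(f"txn-{subtask}", checkpoint_done_s)
        for subtask in precommit_times_s
    }


# Subtask 0 pre-commits at t=0; subtask 1 needs 11 more minutes.
skewed = {0: 0, 1: 11 * 60}
print(run_checkpoint(ToyTxnServer(timeout_s=10 * 60), skewed))    # {0: False, 1: True}
print(run_checkpoint(ToyTxnServer(timeout_s=12 * 3600), skewed))  # {0: True, 1: True}
```

With a 10-minute timeout the fast subtask's transaction has expired by the time the checkpoint completes, so its commit fails; with the 12-hour default mentioned for streaming_label_keep_max_second, both commits go through in this model.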