Hi Di,

Thanks for your reply.

The timeout I'm referring to here is not the commit timeout, but rather the
timeout of a single StreamLoad transaction.

Let's say the StreamLoad transaction timeout is set to 10 minutes, and a
Flink job runs with two subtasks. Because of significant data skew and
backpressure, the two subtasks process at very different speeds: subtask 0
finishes its part of the checkpoint quickly, while subtask 1 needs another
10 minutes. Only then does the job-level checkpoint complete. Since subtask
0 has been waiting for subtask 1 the whole time, more than 10 minutes have
passed before its StreamLoad transaction is committed, and by that point
the server has already cleaned up the transaction, so the commit fails.
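
To make the timing concrete, here is a minimal sketch of the kind of job
setup I have in mind (the parallelism, intervals, and timeouts below are
illustrative values only, not taken from the connector):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SkewedCheckpointScenario {
        public static void main(String[] args) {
            // Two subtasks with heavy data skew between them.
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(2);

            // Checkpoints are allowed to run much longer than the (assumed)
            // 10-minute StreamLoad transaction timeout on the Doris side.
            env.enableCheckpointing(60_000L);                              // every minute
            env.getCheckpointConfig().setCheckpointTimeout(30L * 60_000L); // up to 30 min

            // Subtask 0 pre-commits its StreamLoad transaction early and then
            // waits for subtask 1 before the global checkpoint completes. If
            // that wait exceeds the server-side transaction timeout, the
            // server aborts the transaction and the later commit fails.
            // (The actual pipeline is omitted; this only illustrates timing.)
        }
    }
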
Therefore, I would like to know whether, in such situations, we can avoid
this problem by setting a longer lifespan for the transactions (a rough
sketch of what I mean follows below).
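
Assuming the sink forwards user-supplied stream load properties to the
server (I have the stream load "timeout" property, in seconds, and the
connector's sink.properties.* passthrough in mind; please correct me if
those are not the right knobs), the idea would be roughly:

    import java.util.Properties;

    import org.apache.doris.flink.cfg.DorisExecutionOptions;

    public class LongerTxnTimeoutSketch {
        public static void main(String[] args) {
            // Stream load properties to send with every load request. The
            // "timeout" value (seconds) is assumed to control how long a
            // pre-committed transaction may stay open before the FE aborts it.
            Properties streamLoadProps = new Properties();
            streamLoadProps.setProperty("timeout", "3600"); // 1 hour instead of 10 min

            // Assuming DorisExecutionOptions passes these properties through
            // to stream load (the SQL equivalent would be sink.properties.*).
            DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
                    .setLabelPrefix("doris-sink-demo")   // illustrative label prefix
                    .setStreamLoadProp(streamLoadProps)
                    .build();

            // executionOptions would then be handed to DorisSink.builder();
            // the rest of the job is omitted here.
        }
    }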


Best,
Feng


On Fri, Mar 22, 2024 at 10:24 PM wudi <676366...@qq.com.invalid> wrote:

> Hi, Feng,
>
> 1. Are you suggesting that when a commit gets stuck, we can interrupt the
> commit request using a timeout parameter? Currently, there is no such
> parameter. In my understanding, in a two-phase commit, checkpoint must be
> enabled, so the commit timeout is essentially the checkpoint timeout.
> Therefore, it seems unnecessary to add an additional parameter here. What
> do you think?
>
> 2. In addition to deleting checkpoints to re-consume data again, the
> Connector also provides an option to ignore commit errors[1]. However, this
> option is only used for error recovery scenarios, such as when a
> transaction is cleared by the server but you want to reuse the upstream
> offset from the checkpoint.
>
> 3. Also, thank you for pointing out the issue with the parameter. It has
> already been addressed[2], but the FLIP changes were overlooked. It has
> been updated.
>
> [1]
> https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java#L150-L160
> [2]
> https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java#L89-L98
>
> Brs
> di.wu
>
>
>
> > On Mar 22, 2024, at 18:28, Feng Jin <jinfeng1...@gmail.com> wrote:
> >
> > Hi Di,
> >
> > Thank you for the update, as well as quickly implementing corresponding
> > capabilities including filter push down and project push down.
> >
> > Regarding the transaction timeout, I still have some doubts. I would like
> > to confirm if we can control this timeout parameter in the connector,
> such
> > as setting it to 10 minutes or 1 hour.
> > Also, when a transaction is cleared by the server, the commit operation
> of
> > the connector will fail, leading to job failure. In this case, can users
> > only choose to delete the checkpoint and re-consume historical data?
> >
> > There is also a small question regarding the parameters
> > doris.request.connect.timeout.ms and doris.request.read.timeout.ms:
> > can we change them to Duration type and remove the "ms" suffix?
> > This way, all time parameters can be kept uniform in type as Duration.
> >
> >
> > Best,
> > Feng
> >
> > On Fri, Mar 22, 2024 at 4:46 PM wudi <676366...@qq.com.invalid> wrote:
> >
> >> Hi, Feng,
> >> Thank you, that's a great suggestion !
> >>
> >> I have already implemented FilterPushDown and removed that parameter on
> >> DorisDynamicTableSource[1], and also updated FLIP.
> >>
> >> Regarding the mention of [Doris also aborts transactions], it may not
> have
> >> been described accurately. It mainly refers to the automatic expiration
> of
> >> long-running transactions in Doris that have not been committed for a
> >> prolonged period.
> >>
> >> As for two-phase commit, when a commit fails, the checkpoint will also
> >> fail, and the job will be continuously retried.
> >>
> >> [1]
> >>
> https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java#L58
> >>
> >> Brs
> >> di.wu
> >>
> >>
> >>> On Mar 15, 2024, at 14:53, Feng Jin <jinfeng1...@gmail.com> wrote:
> >>>
> >>> Hi Di
> >>>
> >>> Thank you for initiating this FLIP, +1 for this.
> >>>
> >>> Regarding the option `doris.filter.query` of doris source table
> >>>
> >>> Can we directly implement the FilterPushDown capability of Flink Source
> >>> like Jdbc Source [1] instead of introducing an option?
> >>>
> >>>
> >>> Regarding two-phase commit,
> >>>
> >>>> At the same time, Doris will also abort transactions that have not
> been
> >>> committed for a long time
> >>>
> >>> Can we control the transaction timeout in the connector?
> >>> And control the behavior when timeout occurs, whether to discard by
> >> default
> >>> or trigger job failure?
> >>>
> >>>
> >>> [1]. https://issues.apache.org/jira/browse/FLINK-16024
> >>>
> >>> Best,
> >>> Feng
> >>>
> >>>
> >>> On Tue, Mar 12, 2024 at 12:12 AM Ferenc Csaky
> <ferenc.cs...@pm.me.invalid
> >>>
> >>> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> Thanks for driving this, +1 for the FLIP.
> >>>>
> >>>> Best,
> >>>> Ferenc
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On Monday, March 11th, 2024 at 15:17, Ahmed Hamdy <
> hamdy10...@gmail.com
> >>>
> >>>> wrote:
> >>>>
> >>>>>
> >>>>>
> >>>>> Hello,
> >>>>> Thanks for the proposal, +1 for the FLIP.
> >>>>>
> >>>>> Best Regards
> >>>>> Ahmed Hamdy
> >>>>>
> >>>>>
> >>>>> On Mon, 11 Mar 2024 at 15:12, wudi 676366...@qq.com.invalid wrote:
> >>>>>
> >>>>>> Hi, Leonard
> >>>>>> Thank you for your suggestion.
> >>>>>> I referred to other Connectors[1], modified the naming and types of
> >>>>>> relevant parameters[2], and also updated FLIP.
> >>>>>>
> >>>>>> [1]
> >>>>>>
> >>>>
> >>
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/overview/
> >>>>>> [2]
> >>>>>>
> >>>>
> >>
> https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
> >>>>>>
> >>>>>> Brs,
> >>>>>> di.wu
> >>>>>>
> >>>>>>> On Mar 7, 2024, at 14:33, Leonard Xu xbjt...@gmail.com wrote:
> >>>>>>>
> >>>>>>> Thanks wudi for the updating, the FLIP generally looks good to me,
> I
> >>>>>>> only left two minor suggestions:
> >>>>>>>
> >>>>>>> (1) The suffix `.s` in configoption doris.request.query.timeout.s
> >>>> looks
> >>>>>>> strange to me, could we change all time interval related option
> >>>> value type
> >>>>>>> to Duration ?
> >>>>>>>
> >>>>>>> (2) Could you check and improve all config options like
> >>>>>>> `doris.exec.mem.limit` to make them to follow flink config option
> >>>> naming
> >>>>>>> and value type?
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Leonard
> >>>>>>>
> >>>>>>>>> On Mar 6, 2024, at 06:12, Jing Ge j...@ververica.com.INVALID wrote:
> >>>>>>>>>
> >>>>>>>>> Hi Di,
> >>>>>>>>>
> >>>>>>>>> Thanks for your proposal. +1 for the contribution. I'd like to
> >>>> know
> >>>>>>>>> your
> >>>>>>>>> thoughts about the following questions:
> >>>>>>>>>
> >>>>>>>>> 1. According to your clarification of the exactly-once, thanks
> >>>> for it
> >>>>>>>>> BTW,
> >>>>>>>>> no PreCommitTopology is required. Does it make sense to let
> >>>>>>>>> DorisSink[1]
> >>>>>>>>> implement SupportsCommitter, since the TwoPhaseCommittingSink is
> >>>>>>>>> deprecated[2] before turning the Doris connector into a Flink
> >>>>>>>>> connector?
> >>>>>>>>> 2. OLAP engines are commonly used as the tail/downstream of a
> >>>> data
> >>>>>>>>> pipeline
> >>>>>>>>> to support further e.g. ad-hoc query or cube with feasible
> >>>>>>>>> pre-aggregation.
> >>>>>>>>> Just out of curiosity, would you like to share some real use
> >>>> cases that
> >>>>>>>>> will use OLAP engines as the source of a streaming data
> >>>> pipeline? Or it
> >>>>>>>>> will only be used as the source for the batch?
> >>>>>>>>> 3. The E2E test only covered sink[3], if I am not mistaken.
> >>>> Would you
> >>>>>>>>> like
> >>>>>>>>> to test the source in E2E too?
> >>>>>>>>>
> >>>>>>>>> [1]
> >>>>>>
> >>>>>>
> >>>>
> >>
> https://github.com/apache/doris-flink-connector/blob/43e0e5cf9b832854ea228fb093077872e3a311b6/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java#L55
> >>>>>>
> >>>>>>>>> [2]
> >>>>>>
> >>>>>>
> >>>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Enhance+and+synchronize+Sink+API+to+match+the+Source+API
> >>>>>>
> >>>>>>>>> [3]
> >>>>>>
> >>>>>>
> >>>>
> >>
> https://github.com/apache/doris-flink-connector/blob/43e0e5cf9b832854ea228fb093077872e3a311b6/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java#L96
> >>>>>>
> >>>>>>>>> Best regards,
> >>>>>>>>> Jing
> >>>>>>>>>
> >>>>>>>>> On Tue, Mar 5, 2024 at 11:18 AM wudi 676366...@qq.com.invalid
> >>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi, Jeyhun Karimov.
> >>>>>>>>>> Thanks for your question.
> >>>>>>>>>>
> >>>>>>>>>> - How to ensure Exactly-Once?
> >>>>>>>>>> 1. When the Checkpoint Barrier arrives, DorisSink will trigger
> >>>> the
> >>>>>>>>>> precommit api of StreamLoad to complete the persistence of
> >>>> data in
> >>>>>>>>>> Doris
> >>>>>>>>>> (the data will not be visible at this time), and will also
> >>>> pass this
> >>>>>>>>>> TxnID
> >>>>>>>>>> to the Committer.
> >>>>>>>>>> 2. When this Checkpoint of the entire Job is completed, the
> >>>> Committer
> >>>>>>>>>> will
> >>>>>>>>>> call the commit api of StreamLoad and commit TxnID to complete
> >>>> the
> >>>>>>>>>> visibility of the transaction.
> >>>>>>>>>> 3. When the task is restarted, the Txn with successful
> >>>> precommit and
> >>>>>>>>>> failed commit will be aborted based on the label-prefix, and
> >>>> Doris'
> >>>>>>>>>> abort
> >>>>>>>>>> API will be called. (At the same time, Doris will also abort
> >>>>>>>>>> transactions
> >>>>>>>>>> that have not been committed for a long time)
> >>>>>>>>>>
> >>>>>>>>>> ps: At the same time, this part of the content has been
> >>>> updated in
> >>>>>>>>>> FLIP
> >>>>>>>>>>
> >>>>>>>>>> - Because the default table model in Doris is Duplicate (
> >>>>>>>>>> https://doris.apache.org/docs/data-table/data-model/), which does
> >>>>>>>>>> not have a primary key, batch writing may cause data duplication,
> >>>>>>>>>> but the UNIQ model has a primary key, which ensures the
> >>>>>>>>>> idempotence of writing, thus achieving Exactly-Once.
> >>>>>>>>>>
> >>>>>>>>>> Brs,
> >>>>>>>>>> di.wu
> >>>>>>>>>>
> >>>>>>>>>>> On Mar 2, 2024, at 17:50, Jeyhun Karimov je.kari...@gmail.com wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hi,
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for the proposal. +1 for the FLIP.
> >>>>>>>>>>> I have a few questions:
> >>>>>>>>>>>
> >>>>>>>>>>> - How exactly the two (Stream Load's two-phase commit and
> >>>> Flink's
> >>>>>>>>>>> two-phase
> >>>>>>>>>>> commit) combination will ensure the e2e exactly-once
> >>>> semantics?
> >>>>>>>>>>>
> >>>>>>>>>>> - The FLIP proposes to combine Doris's batch writing with the
> >>>>>>>>>>> primary key
> >>>>>>>>>>> table to achieve Exactly-Once semantics. Could you elaborate
> >>>> more on
> >>>>>>>>>>> that?
> >>>>>>>>>>> Why it is not the default behavior but a workaround?
> >>>>>>>>>>>
> >>>>>>>>>>> Regards,
> >>>>>>>>>>> Jeyhun
> >>>>>>>>>>>
> >>>>>>>>>>> On Sat, Mar 2, 2024 at 10:14 AM Yanquan Lv
> >>>> decq12y...@gmail.com
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Thanks for driving this.
> >>>>>>>>>>>> The content is very detailed, it is recommended to add a
> >>>> section on
> >>>>>>>>>>>> Test
> >>>>>>>>>>>> Plan for more completeness.
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Thu, Jan 25, 2024 at 15:40, Di Wu d...@apache.org wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Previously, we had some discussions about contributing
> >>>> Flink Doris
> >>>>>>>>>>>>> Connector to the Flink community [1]. I want to further
> >>>> promote
> >>>>>>>>>>>>> this
> >>>>>>>>>>>>> work.
> >>>>>>>>>>>>> I hope everyone will help participate in this FLIP
> >>>> discussion and
> >>>>>>>>>>>>> provide
> >>>>>>>>>>>>> more valuable opinions and suggestions.
> >>>>>>>>>>>>> Thanks.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>
> >>>> https://lists.apache.org/thread/lvh8g9o6qj8bt3oh60q81z0o1cv3nn8p
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Brs,
> >>>>>>>>>>>>> di.wu
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 2023/12/07 05:02:46 wudi wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> As discussed in the previous email [1], about
> >>>> contributing the
> >>>>>>>>>>>>>> Flink
> >>>>>>>>>>>>>> Doris Connector to the Flink community.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Apache Doris[2] is a high-performance, real-time
> >>>> analytical
> >>>>>>>>>>>>>> database
> >>>>>>>>>>>>>> based on MPP architecture, for scenarios where Flink
> >>>> is used for
> >>>>>>>>>>>>>> data
> >>>>>>>>>>>>>> analysis, processing, or real-time writing on Doris,
> >>>> Flink Doris
> >>>>>>>>>>>>>> Connector
> >>>>>>>>>>>>>> is an effective tool.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> At the same time, Contributing Flink Doris Connector
> >>>> to the Flink
> >>>>>>>>>>>>>> community will further expand the Flink Connectors
> >>>> ecosystem.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> So I would like to start an official discussion
> >>>> FLIP-399: Flink
> >>>>>>>>>>>>>> Connector Doris[3].
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Looking forward to comments, feedbacks and suggestions
> >>>> from the
> >>>>>>>>>>>>>> community on the proposal.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>>
> >>>> https://lists.apache.org/thread/lvh8g9o6qj8bt3oh60q81z0o1cv3nn8p
> >>>>>>>>>>>>>> [2]
> >>>>>>
> >>>>>>
> https://doris.apache.org/docs/dev/get-starting/what-is-apache-doris/
> >>>>>>
> >>>>>>>>>>>>>> [3]
> >>>>>>
> >>>>>>
> >>>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-399%3A+Flink+Connector+Doris
> >>>>>>
> >>>>>>>>>>>>>> Brs,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> di.wu
> >>>>
> >>
> >>
> >
>
>
