Re: [VOTE] FLIP-147: Support Checkpoint After Tasks Finished

2021-07-01 Thread Arvid Heise
Looks good: +1 (binding)

On Tue, Jun 29, 2021 at 5:06 AM 刘建刚  wrote:

> +1 (binding)
>
> Best
> liujiangang
>
> Piotr Nowojski  于2021年6月29日周二 上午2:05写道:
>
> > +1 (binding)
> >
> > Piotrek
> >
> > pon., 28 cze 2021 o 12:48 Dawid Wysakowicz 
> > napisał(a):
> >
> > > +1 (binding)
> > >
> > > Best,
> > >
> > > Dawid
> > >
> > > On 28/06/2021 10:45, Yun Gao wrote:
> > > > Hi all,
> > > >
> > > > For FLIP-147[1], which aims to support checkpoints after tasks
> > > finished and to modify the operator
> > > > API and implementation to ensure the commit of the last piece of data,
> > > since after the last vote
> > > > we have more discussions[2][3] and a few updates, including changes
> to
> > > PublicEvolving API,
> > > > I'd like to have another VOTE on the current state of the FLIP.
> > > >
> > > > The vote will last at least 72 hours (Jul 1st), following the
> consensus
> > > > voting process.
> > > >
> > > > thanks,
> > > >  Yun
> > > >
> > > >
> > > > [1] https://cwiki.apache.org/confluence/x/mw-ZCQ
> > > > [2]
> > >
> >
> https://lists.apache.org/thread.html/r400da9898ff66fd613c25efea15de440a86f14758ceeae4950ea25cf%40%3Cdev.flink.apache.org
> > > > [3]
> > >
> >
> https://lists.apache.org/thread.html/r3953df796ef5ac67d5be9f2251a95ad72efbca31f1d1555d13e71197%40%3Cdev.flink.apache.org%3E
> > >
> > >
> >
>


[jira] [Created] (FLINK-23201) The check on alignmentDurationNanos seems to be too strict

2021-07-01 Thread Jun Qin (Jira)
Jun Qin created FLINK-23201:
---

 Summary: The check on alignmentDurationNanos seems to be too strict
 Key: FLINK-23201
 URL: https://issues.apache.org/jira/browse/FLINK-23201
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Metrics
Affects Versions: 1.12.2
Reporter: Jun Qin


The check on alignmentDurationNanos seems to be too strict at the line:

https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java#L74

This may cause a job to fail when doing stop-with-savepoint.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23202) RpcService should fail result futures if messages could not be sent

2021-07-01 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-23202:
-

 Summary: RpcService should fail result futures if messages could 
not be sent
 Key: FLINK-23202
 URL: https://issues.apache.org/jira/browse/FLINK-23202
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.12.4, 1.13.1
Reporter: Till Rohrmann
 Fix For: 1.14.0


The {{RpcService}} should fail result futures if messages could not be sent. 
This would speed up the failure detection mechanism because it would not rely 
on the timeout. One way to achieve this could be to listen to the dead letters 
and then send a {{Failure}} message back to the sender.
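
For illustration, a minimal sketch (an assumption about one possible approach, not 
the actual implementation) using classic Akka APIs: subscribe to dead letters and 
reply to the original sender with a {{Status.Failure}}, which completes the 
corresponding ask future exceptionally.

{code}
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.DeadLetter;
import akka.actor.Props;
import akka.actor.Status;

public class DeadLetterListener extends AbstractActor {

    // Register the listener on the actor system used by the RpcService.
    public static void install(ActorSystem system) {
        ActorRef listener = system.actorOf(Props.create(DeadLetterListener.class));
        system.eventStream().subscribe(listener, DeadLetter.class);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(DeadLetter.class, dead ->
                        // Telling the sender of the undeliverable message about the
                        // failure fails its pending ask future immediately instead of
                        // letting it run into the ask timeout.
                        dead.sender().tell(
                                new Status.Failure(new RuntimeException(
                                        "Could not deliver message to " + dead.recipient())),
                                ActorRef.noSender()))
                .build();
    }
}
{code}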



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Job Recovery Time on TM Lost

2021-07-01 Thread Till Rohrmann
The analysis of Gen is correct. Flink currently uses its heartbeat as the
primary means to detect dead TaskManagers. This means that Flink will take
at least `heartbeat.timeout` time before the system recovers. Even if the
cancellation happens fast (e.g. by having configured a low
akka.ask.timeout), then Flink will still try to deploy tasks onto the dead
TaskManager until it is marked as dead and its slots are released (unless
the ResourceManager does not get a signal from the underlying resource
management system that a container/pod has died). One way to improve the
situation is to introduce logic which can react to a ConnectionException
and then blacklists or releases a TaskManager, for example. This is
currently not implemented in Flink, though.
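
For reference, a small illustrative snippet (values are the Flink 1.13 defaults to
the best of my knowledge; adjust for your setup) showing the timeouts discussed
above set programmatically:

```
import org.apache.flink.configuration.Configuration;

public class RecoveryTimeoutDefaults {
    public static Configuration create() {
        Configuration conf = new Configuration();
        // interval between heartbeat requests (ms)
        conf.setLong("heartbeat.interval", 10_000L);
        // time until an unresponsive TaskManager is considered dead (ms)
        conf.setLong("heartbeat.timeout", 50_000L);
        // timeout for RPC ask futures; governs how fast cancellation RPCs fail
        conf.setString("akka.ask.timeout", "10 s");
        return conf;
    }
}
```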

Concerning the cancellation operation: Flink currently does not listen to
the dead letters of Akka. This means that the `akka.ask.timeout` is the
primary means to fail the future result of a rpc which could not be sent.
This is also an improvement we should add to Flink's RpcService. I've
created a JIRA issue for this problem [1].

[1] https://issues.apache.org/jira/browse/FLINK-23202

Cheers,
Till

On Wed, Jun 30, 2021 at 6:33 PM Lu Niu  wrote:

> Thanks Gen! cc flink-dev to collect more inputs.
>
> Best
> Lu
>
> On Wed, Jun 30, 2021 at 12:55 AM Gen Luo  wrote:
>
>> I'm also wondering here.
>>
>> In my opinion, it's because the JM can not confirm whether the TM is lost
>> or it's a temporary network trouble and will recover soon, since I can see
>> in the log that akka has got a Connection refused but JM still sends a
>> heartbeat request to the lost TM until it reaches heartbeat timeout. But
>> I'm not sure if it's indeed designed like this.
>>
>> I would really appreciate it if anyone who knows more details could
>> answer. Thanks.
>>
>


Improvements of left state in TemporalRowTimeJoinOperator

2021-07-01 Thread Tony Wei
Hi Experts,

Recently, I was learning how temporal table join works in Flink via reading
the source code of
TemporalRowTimeJoinOperator, and I found these comments in the source code:

/**
>  * Mapping from artificial row index (generated by `nextLeftIndex`)
> into the left side `Row`. We
>  * can not use List to accumulate Rows, because we need efficient
> deletes of the oldest rows.
>  *
>  * TODO: this could be OrderedMultiMap[Jlong, Row] indexed by row's
> timestamp, to avoid full
>  * map traversals (if we have lots of rows on the state that exceed
> `currentWatermark`).
>  */
> private transient MapState<Long, RowData> leftState;
>

AFAIK, Flink currently does not support such a complex map state
(OrderedMultiMap), so
I tried to implement a similar one [1] that meets the requirement on top of the
existing map state, at the cost of
some space overhead. I would like some feedback from you about this
implementation.

Before explaining the overhead and the trade-off between the current implementation
and mine, let me first
give a brief introduction to my implementation.

First, I implemented a min-heap state that I can use to extract the
earliest row time of the buffered left rows.
Based on this heap, I implemented a data structure similar to an adjacency
list, which I can use to
simulate a MapState<Long, List<Row>> state and achieve putting a new value
without deserializing the
whole list of values, which an actual MapState<Long, List<Row>> state would do.
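
To make the idea concrete, here is a minimal sketch (illustrative only; the names
are made up and it is not the code in [1]) of the bucket part: instead of
MapState<Long, List<Row>>, keep a per-timestamp counter plus one state entry per
row, so appending never deserializes a whole list. The ordering of the timestamps
would come from the separate min-heap state described above.

```
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.data.RowData;

public class AppendOnlyBuckets {

    // rowTime -> number of rows buffered for that row time
    private final MapState<Long, Integer> bucketSize;
    // (rowTime, index within the bucket) -> buffered row
    private final MapState<Tuple2<Long, Integer>, RowData> rows;

    public AppendOnlyBuckets(
            MapState<Long, Integer> bucketSize,
            MapState<Tuple2<Long, Integer>, RowData> rows) {
        this.bucketSize = bucketSize;
        this.rows = rows;
    }

    /** O(1) state accesses per append; no list (de)serialization. */
    public void append(long rowTime, RowData row) throws Exception {
        Integer size = bucketSize.get(rowTime);
        int index = size == null ? 0 : size;
        rows.put(Tuple2.of(rowTime, index), row);
        bucketSize.put(rowTime, index + 1);
    }

    /** Reads and removes all rows of one bucket when it is emitted. */
    public List<RowData> drain(long rowTime) throws Exception {
        List<RowData> result = new ArrayList<>();
        Integer size = bucketSize.get(rowTime);
        if (size != null) {
            for (int i = 0; i < size; i++) {
                Tuple2<Long, Integer> key = Tuple2.of(rowTime, i);
                result.add(rows.get(key));
                rows.remove(key);
            }
            bucketSize.remove(rowTime);
        }
        return result;
    }
}
```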

Regarding time complexity and space complexity, let's assume these
conditions:

   1. The total number of all left rows buffered in state is "N".
   2. The distinct number of row times among these buffered rows is "R".
   3. The number of emitted results is approximately "K" each time.
   4. The distinct number of row times among these emitted results is "P".

                                     current implementation        my implementation
"processElement1" time complexity    O(1)                          O(log R)
"extract left rows to emit"
    time complexity                  O(N)                          O(K + P log R)
space complexity                     N + 1 (1 for nextLeftIndex)   2N + 2R + 2

From this table, we know that the space overhead is more than two times that of
the current implementation,
but the benefit in "extract" time complexity is not always significant, because
it depends on
many conditions.

Please let me know what you think about such an implementation. Is it
worthwhile or not?
Besides, I'm not sure if there is already a similar effort on this, for
example, supporting OrderedMultiMap
or OrderedMap state for general purposes. It would be great if you could point
me to those JIRA issues.
Any feedback is welcome. Thank you in advance.

[1]
https://github.com/tony810430/flink/commit/0d5d9d70df0e85b0ac161ffa89c11249a0b3db2a

best regards,


[jira] [Created] (FLINK-23203) Program cannot parse the parameter value with special characters

2021-07-01 Thread Jacob.Q.Cao (Jira)
Jacob.Q.Cao created FLINK-23203:
---

 Summary: Program cannot parse the parameter value with special 
characters
 Key: FLINK-23203
 URL: https://issues.apache.org/jira/browse/FLINK-23203
 Project: Flink
  Issue Type: Bug
Reporter: Jacob.Q.Cao


*When I start a Flink job with a shell command, the program cannot parse a 
parameter value containing special characters.*

 
h3. _Job Shell Command:_

./bin/flink run-application -t yarn-application 
-Dyarn.application.name="Test_Flink_Job" -Dtaskmanager.numberOfTaskSlots=1 
-Djobmanager.memory.process.size=10240m 
-Dtaskmanager.memory.process.size=10240m -c com.jacob.main 
/opt/app/Flink/JAR/test.jar *--test 'test#123'*

 

As shown in the above command, the test parameter value should be "test#123", 
but the program only receives "test".

 

I have tried quotation marks, backslashes, etc. to escape the "#", but it still 
has no effect.
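
For reference, a minimal sketch (an assumption about how the job reads its
arguments; the actual job code may differ) that reproduces the observation:

{code}
import org.apache.flink.api.java.utils.ParameterTool;

public class ArgParsingCheck {
    public static void main(String[] args) {
        ParameterTool params = ParameterTool.fromArgs(args);
        // Expected: test#123
        // Observed when submitted via the command above: test
        System.out.println(params.get("test"));
    }
}
{code}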

 

!test.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23204) Provide StateBackends access to MailboxExecutor

2021-07-01 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-23204:
-

 Summary: Provide StateBackends access to MailboxExecutor
 Key: FLINK-23204
 URL: https://issues.apache.org/jira/browse/FLINK-23204
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends, Runtime / Task
Reporter: Roman Khachatryan
 Fix For: 1.14.0


StateBackends are assumed to be not-thread-safe and accessed from the task 
thread only.

In ChangelogStateBackend, there are (more) async operations. In addition to the 
usual methods, the task thread is needed for:
 * DFS writer: collect so far uploaded changes; handle upload results after 
completion
 * ChangelogKeyedStateBackend: combining state handles upon upload completion 
by writer
 * ChangelogKeyedStateBackend: materialization - take snapshot (sync phase); 
handle results of the async phase 

Direct synchronization could be used instead, but executing the above in the task 
thread would simplify the code (and likely improve performance).

The only way to do this is via the MailboxExecutor (because the task thread runs 
mailbox actions in a loop until shutdown).
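
A minimal sketch (an assumption, not the planned implementation; the
MailboxExecutor package is part of what this ticket proposes to move) of how an
async upload callback could hand its result back to the task thread:

{code}
import java.util.concurrent.CompletableFuture;
// location as of 1.13; this ticket proposes moving the interface to flink-core
import org.apache.flink.streaming.api.operators.MailboxExecutor;

public class UploadCallbackSketch {

    private final MailboxExecutor mailboxExecutor;

    public UploadCallbackSketch(MailboxExecutor mailboxExecutor) {
        this.mailboxExecutor = mailboxExecutor;
    }

    /** Called from the uploader's I/O thread pool. */
    public void onUploadCompleted(CompletableFuture<Object> upload) {
        upload.thenAccept(result ->
                // re-enqueue the handling onto the task thread, so the
                // non-thread-safe backend state can be touched without locking
                mailboxExecutor.execute(
                        () -> combineStateHandles(result),
                        "combine changelog state handles"));
    }

    private void combineStateHandles(Object result) {
        // runs in the task thread
    }
}
{code}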

 

However, it is currently created in StreamTask and the classes reside in 
flink-streaming-java. So one subtask is to change its creation/lifecycle and move 
the classes. The target location is flink-core (at least for the interfaces) and 
flink-runtime/flink-core (for the implementations).

 

---

Another subtask is to actually expose it to state backends (can be extracted to 
a separate task).

StateBackend.createKeyedStateBackend already has an Environment/TaskStateManager 
argument which could be used.

However, Environment
 # is available to the user (via getContainingTask)
 # has too wide a scope (e.g. InputGates are not needed in state backends)
 # has too many responsibilities - also true for TaskStateManager, which has 
e.g. reportIncompleteTaskStateSnapshots

Probably, there is a better way to expose it.

 

---

Note that MailboxExecutor will likely be used in future in other places like 
ProcessFunction.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Feedback Collection Jira Bot

2021-07-01 Thread Stephan Ewen
It is true that the bot surfaces problems that are there (not enough
committer attention sometimes), but it also "rubs salt in the wound" of
contributors, and that is tricky.

We can try it out with the extended periods (although I think that in
reality we probably need even longer periods) and see how it goes.

One thing I would suggest is to never let the bot unassign issues. It just
strikes me as very cold and disrespectful to be unassigned by a bot from an
issue in which I invested time and energy. (The committers don't even take
the time to talk to me and explain why the contribution will not go
forward).
Unassignment should come from another person, possibly in response to a
ping from the bot. I think that makes a big difference in contributor
treatment.



On Wed, Jun 30, 2021 at 12:30 PM Till Rohrmann  wrote:

> I agree that we shouldn't discourage contributions.
>
> For me the main idea of the bot is not to clean up the JIRA but to improve
> our communication and expectation management with the community. There are
> many things we could do but for a lot of things we don't have the time and
> capacity. Then to say at some point that we won't do something is just
> being honest. This also shows when looking at the JIRA numbers of the
> merged commits. We very rarely resolve tickets which are older than x days
> and if we do, then we usually create a new ticket for the problem.
>
> The fact that we see some tickets with available pull requests go stale is
> the symptom that we don't value them to be important enough or
> allocate enough time for external contributions imo. Otherwise, they would
> have gotten the required attention and been merged. In such a case, raising
> awareness by pinging the watchers of the respective ticket is probably
> better than silently ignoring the PR. Also adding labels to filter for
> these PRs should help to get them the required attention. But also here, it
> happens very rarely that we actually merge a PR that is older than y days.
> Ideally we avoid this situation altogether by only assigning contributors
> to tickets for which a committer has review capacity. However, this does
> not seem to always work.
>
> In some sense, the JIRA bot shows us the things, which fall through the
> cracks, more explicitly (which is probably not different than before). Of
> course we should try to find the time periods for when to ping or
> de-prioritize tickets that work best for the community.
>
> +1 for the proposed changes (extended time periods, "Not a Priority",
> default priority and fixVersion).
>
> @Piotr, I think we have the priorities defined here [1]. Maybe it is enough
> to share the link so that everyone can check whether her assumptions are
> correct.
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Jira+Process
>
> Cheers,
> Till
>
> On Wed, Jun 30, 2021 at 10:59 AM Piotr Nowojski 
> wrote:
>
> > > * Introduce "Not a Priority" priority and stop closing tickets.
> >
> > +1 for this one (I also like the name you proposed for this Konstantin)
> >
> > I also have no objections to other proposals that you summarised. Just a
> > remark, that independently of this discussion we might want to revisit or
> > reconfirm the priorities and their definition/interpretation across all
> > contributors.
> >
> > Best,
> > Piotrek
> >
> > śr., 30 cze 2021 o 10:15 Konstantin Knauf 
> napisał(a):
> >
> > > Hi everyone,
> > >
> > > Thank you for the additional comments and suggestions.
> > >
> > > @Stephan, Kurt: I agree that we shouldn't discourage or dishearten
> > > contributors, and probably 14 days until a ticket becomes
> > "stale-assigned"
> > > are too few. That's why I've already proposed to increase that to 30
> > days.
> > > Similarly the times for Major/Critical tickets can be increased. From
> my
> > > perspective, the root causes are the following:
> > >
> > > * tickets are opened with the wrong priority (see
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Jira+Process#FlinkJiraProcess-TicketsPriorities
> > > ).
> > > Here it might help to change the default priority.
> > > * committers don't have the time to review tickets or don't bring
> > community
> > > contributions to a resolution. The Jira bot makes this fact more
> visible.
> > > Without the Jira Bot no external contributor would get more attention,
> > and
> > > no external contribution would be merged faster. Ideally, it'd be the
> > > opposite and committers would actively monitor tickets with labels
> > > "stale-assigned" and "pull-request-available" in order to review those
> > with
> > > priority. That's also why I am not a fan of excluding tickets with
> > > "pull-request-available" from the bot. The bot can help to make these
> > > tickets visible to reviewers.
> > >
> > > @Jing Zhang: That's a problem. We should try to change the permissions
> > > accordingly or need to find a different solution.
> > >
> > > @Piotr, Kurt: Instead of closing tickets, we could introduc

[VOTE] Release 1.13.2, release candidate #1

2021-07-01 Thread Yun Tang
Hi everyone,
Please review and vote on the release candidate #1 for the version 1.13.2, as 
follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)


The complete staging area is available for your review, which includes:
* JIRA release notes [1],
* the official Apache source release and binary convenience releases to be 
deployed to dist.apache.org [2], which are signed with the key with fingerprint 
78A306590F1081CC6794DC7F62DAD618E07CF996 [3],
* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag "release-1.13.2-rc1" [5],
* website pull request listing the new release and adding announcement blog 
post [6].

The vote will be open for at least 72 hours. It is adopted by majority 
approval, with at least 3 PMC affirmative votes.

Best,
Yun Tang

[1] 
https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12350218&styleName=&projectId=12315522
[2] https://dist.apache.org/repos/dist/dev/flink/flink-1.13.2-rc1/
[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4] https://repository.apache.org/content/repositories/orgapacheflink-1429/
[5] https://github.com/apache/flink/releases/tag/release-1.13.2-rc1
[6] https://github.com/apache/flink-web/pull/453



[DISCUSS] FLIP-173: Support DAG of algorithms (Flink ML)

2021-07-01 Thread Dong Lin
Hi all,

Zhipeng, Fan (cc'ed) and I are opening this thread to discuss two different
designs to extend the Flink ML API to support more use-cases, e.g. expressing a
DAG of preprocessing and training logic. These two designs have been
documented in FLIP-173.

We have different opinions on the usability and the ease-of-understanding
of the proposed APIs. It would be really useful to have comments on these
designs from the open source community and to learn your preferences.

To facilitate the discussion, we have summarized our design principles and
opinions in this Google doc.
Code snippets for a few example use-cases are also provided in this doc to
demonstrate the difference between these two solutions.

This Flink ML API is super important to the future of the Flink ML library.
Please feel free to reply to this email thread or comment in the Google doc
directly.

Thank you!
Dong, Zhipeng, Fan


Re: Job Recovery Time on TM Lost

2021-07-01 Thread Yang Wang
Since you are deploying Flink workloads on Yarn, the Flink ResourceManager
should get the container
completion event after the heartbeat of Yarn NM->Yarn RM->Flink RM, which
is 8 seconds by default.
And the Flink ResourceManager will release the dead TaskManager container once
it receives the completion event.
As a result, Flink will not deploy tasks onto the dead TaskManagers.


I think most of the time cost in Phase 1 might be cancelling the tasks on
the dead TaskManagers.


Best,
Yang


Till Rohrmann  wrote on Thu, Jul 1, 2021 at 4:49 PM:

> The analysis of Gen is correct. Flink currently uses its heartbeat as the
> primary means to detect dead TaskManagers. This means that Flink will take
> at least `heartbeat.timeout` time before the system recovers. Even if the
> cancellation happens fast (e.g. by having configured a low
> akka.ask.timeout), then Flink will still try to deploy tasks onto the dead
> TaskManager until it is marked as dead and its slots are released (unless
> the ResourceManager does not get a signal from the underlying resource
> management system that a container/pod has died). One way to improve the
> situation is to introduce logic which can react to a ConnectionException
> and then black lists or releases a TaskManager, for example. This is
> currently not implemented in Flink, though.
>
> Concerning the cancellation operation: Flink currently does not listen to
> the dead letters of Akka. This means that the `akka.ask.timeout` is the
> primary means to fail the future result of a rpc which could not be sent.
> This is also an improvement we should add to Flink's RpcService. I've
> created a JIRA issue for this problem [1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-23202
>
> Cheers,
> Till
>
> On Wed, Jun 30, 2021 at 6:33 PM Lu Niu  wrote:
>
>> Thanks Gen! cc flink-dev to collect more inputs.
>>
>> Best
>> Lu
>>
>> On Wed, Jun 30, 2021 at 12:55 AM Gen Luo  wrote:
>>
>>> I'm also wondering here.
>>>
>>> In my opinion, it's because the JM can not confirm whether the TM is
>>> lost or it's a temporary network trouble and will recover soon, since I can
>>> see in the log that akka has got a Connection refused but JM still sends a
>>> heartbeat request to the lost TM until it reaches heartbeat timeout. But
>>> I'm not sure if it's indeed designed like this.
>>>
>>> I would really appreciate it if anyone who knows more details could
>>> answer. Thanks.
>>>
>>


Re: [VOTE] FLIP-150: Introduce Hybrid Source

2021-07-01 Thread Thomas Weise
+1 (binding)


On Thu, Jul 1, 2021 at 8:13 AM Arvid Heise  wrote:

> +1 (binding)
>
> Thank you and Thomas for driving this
>
> On Thu, Jul 1, 2021 at 7:50 AM 蒋晓峰  wrote:
>
> > Hi everyone,
> >
> >
> >
> >
> > Thanks for all the feedback to Hybrid Source so far. Based on the
> > discussion[1] we seem to have consensus, so I would like to start a vote
> on
> > FLIP-150 for which the FLIP has also been updated[2].
> >
> >
> >
> >
> > The vote will last for at least 72 hours (Sun, Jul 4th 12:00 GMT) unless
> > there is an objection or insufficient votes.
> >
> >
> >
> >
> > Thanks,
> >
> > Nicholas Jiang
> >
> >
> >
> >
> > [1]
> >
> https://lists.apache.org/thread.html/r94057d19f0df2a211695820375502d60cddeeab5ad27057c1ca988d6%40%3Cdev.flink.apache.org%3E
> >
> > [2]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source
>


[jira] [Created] (FLINK-23205) Relax Time Intervals of Flink Jira Bot

2021-07-01 Thread Konstantin Knauf (Jira)
Konstantin Knauf created FLINK-23205:


 Summary: Relax Time Intervals of Flink Jira Bot
 Key: FLINK-23205
 URL: https://issues.apache.org/jira/browse/FLINK-23205
 Project: Flink
  Issue Type: Improvement
Reporter: Konstantin Knauf
Assignee: Konstantin Knauf


We'd like to relax the time intervals after which the Flink Jira Bot considers 
a ticket stale.

* stale-assigned only after 30 days (instead of 14 days)
* stale-critical only after 14 days (instead of 7 days)
* stale-major only after 60 days (instead of 30 days)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23206) Flink Jira Bot Moves Tickets to "Not a Priority" Instead of Closing Them

2021-07-01 Thread Konstantin Knauf (Jira)
Konstantin Knauf created FLINK-23206:


 Summary: Flink Jira Bot Moves Tickets to "Not a Priority" Instead 
of Closing Them
 Key: FLINK-23206
 URL: https://issues.apache.org/jira/browse/FLINK-23206
 Project: Flink
  Issue Type: Improvement
Reporter: Konstantin Knauf
Assignee: Konstantin Knauf






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23207) Flink Jira Bot to Ignore Tickets with fixVersion set

2021-07-01 Thread Konstantin Knauf (Jira)
Konstantin Knauf created FLINK-23207:


 Summary: Flink Jira Bot to Ignore Tickets with fixVersion set
 Key: FLINK-23207
 URL: https://issues.apache.org/jira/browse/FLINK-23207
 Project: Flink
  Issue Type: Improvement
Reporter: Konstantin Knauf
Assignee: Konstantin Knauf






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Better user experience in the WindowAggregate upon Changelog (contains update message)

2021-07-01 Thread Jark Wu
Thanks Jing for bringing up this topic,

The emit strategy configs are annotated as Experimental and are not public in
the documentation.
However, I see this is a very useful feature which many users are looking
for.
I have shared these configs in response to many questions like "how to handle late
events in SQL".
Thus, I think it's time to make the configuration public and explicitly
document it. In the long
term, we would like to propose an EMIT syntax for SQL, but until then we
can get more
valuable feedback from users when they are using the configs.
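
For context, this is how users set these (currently experimental) options today,
e.g. via TableConfig (a sketch of user code, not part of any proposal; the keys
are the ones discussed in this thread):

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LateFireConfigExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // fire again for records arriving after the watermark passed the window end
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.emit.late-fire.enabled", "true");
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.emit.late-fire.delay", "0 s");
        // keep window state for one extra hour so late updates can still be applied
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.emit.allow-lateness", "1 h");
    }
}
```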

Regarding the exposed configuration, I prefer proposal (2).
But it would be better not to expose `table.exec.emit.late-fire.enabled` on
docs and we can
remove it in the next version.

Best,
Jark


On Tue, 29 Jun 2021 at 11:09, JING ZHANG  wrote:

> When WindowAggregate works upon Changelog which contains update messages,
> UPDATE BEFORE message may be dropped as a late message. [1]
>
> In order to handle late UB message, user needs to set *all* the following
> 3 parameters:
>
> (1) enable late fire by setting
>
> table.exec.emit.late-fire.enabled : true
>
> (2) set per record emit behavior for late records by setting
>
> table.exec.emit.late-fire.delay : 0 s
>
> (3) keep window state for extra time after window is fired by setting
>
> table.exec.emit.allow-lateness : 1 h  // or table.exec.state.ttl: 1h
>
>
> The solution has two disadvantages:
>
> (1) Users may not realize that UB messages may be dropped as a late event,
> so they will not set related parameters.
>
> (2) When users look for a solution to solve the dropped UB messages
> problem, the current solution is a bit inconvenient for them because they
> need to set all the 3 parameters. Besides, some configurations have overlap
> ability.
>
>
> Now there are two proposals to simplify the 3 parameters a little.
>
> (1) Users only need set table.exec.emit.allow-lateness (just like the
> behavior on Datastream, user only need set allow-lateness), framework could
> automatically set `table.exec.emit.late-fire.enabled` to true and set
> `table.exec.emit.late-fire.delay` to 0s.
>
> And in the later version, we deprecate `table.exec.emit.late-fire.delay`
> and `table.exec.emit.late-fire.enabled`.
>
>
> (2) Users need set `table.exec.emit.late-fire.enabled` to true and set
> `table.exec.state.ttl`; the framework could automatically set
> `table.exec.emit.late-fire.delay` to 0s.
>
> And in the later version, we deprecate `table.exec.emit.late-fire.delay`
> and `table.exec.emit.allow-lateness `.
>
>
> Please let me know what you think about the issue.
>
> Thank you.
>
> [1] https://issues.apache.org/jira/browse/FLINK-22781
>
>
> Best regards,
> JING ZHANG
>
>
>
>


[jira] [Created] (FLINK-23208) Late processing timers need to wait 1ms at least to be fired

2021-07-01 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-23208:
--

 Summary: Late processing timers need to wait 1ms at least to be 
fired
 Key: FLINK-23208
 URL: https://issues.apache.org/jira/browse/FLINK-23208
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.11.0
Reporter: Jiayi Liao


The problem comes from the code below:

```
public static long getProcessingTimeDelay(long processingTimestamp, long currentTimestamp) {
    // delay the firing of the timer by 1 ms to align the semantics with watermark. A watermark
    // T says we won't see elements in the future with a timestamp smaller or equal to T.
    // With processing time, we therefore need to delay firing the timer by one ms.
    return Math.max(processingTimestamp - currentTimestamp, 0) + 1;
}
```

Assuming a Flink job creates 1 timer per millisecond and is able to consume
1 timer/ms, here is what will happen:

* Timestamp1 (1st ms): timer1 is registered and will be triggered at Timestamp2.
* Timestamp2 (2nd ms): timer2 is registered and timer1 is triggered.
* Timestamp3 (3rd ms): timer3 is registered and timer1 is consumed. After this, 
{{InternalTimerServiceImpl}} registers the next timer, which is timer2, and timer2 
will be triggered at Timestamp4 (it has to wait at least 1 ms).
* Timestamp4 (4th ms): timer4 is registered and timer2 is triggered.
* Timestamp5 (5th ms): timer5 is registered and timer2 is consumed. After this, 
{{InternalTimerServiceImpl}} registers the next timer, which is timer3, and timer3 
will be triggered at Timestamp6 (it has to wait at least 1 ms).

As we can see, the job is capable of consuming 1 timer/ms, but it actually ends up 
consuming only 0.5 timers/ms.
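
A sketch of one possible adjustment (not necessarily the fix that will be merged):
keep the 1 ms alignment delay only for timers that are not already late.

```
public static long getProcessingTimeDelay(long processingTimestamp, long currentTimestamp) {
    if (processingTimestamp >= currentTimestamp) {
        // the timer is in the future: keep the 1 ms delay to align with watermark semantics
        return processingTimestamp - currentTimestamp + 1;
    }
    // the timer is already late: fire it immediately instead of waiting another millisecond
    return 0;
}
```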



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23209) Timeout heartbeat if the heartbeat target is no longer reachable

2021-07-01 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-23209:
-

 Summary: Timeout heartbeat if the heartbeat target is no longer 
reachable
 Key: FLINK-23209
 URL: https://issues.apache.org/jira/browse/FLINK-23209
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.12.4, 1.13.1
Reporter: Till Rohrmann
 Fix For: 1.14.0, 1.12.5, 1.13.2


With FLINK-23202 it should now be possible to see when a remote RPC endpoint is 
no longer reachable. This can be used by the {{HeartbeatManager}} to mark a 
heartbeat target as no longer reachable. That way, it is possible for Flink to 
react faster to losses of components w/o having to wait for the heartbeat 
timeout to expire. This will result in faster recoveries (e.g. if a 
{{TaskExecutor}} dies).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Job Recovery Time on TM Lost

2021-07-01 Thread Till Rohrmann
A quick addition, I think with FLINK-23202 it should now also be possible
to improve the heartbeat mechanism in the general case. We can leverage the
unreachability exception thrown if a remote target is no longer reachable
to mark a heartbeat target as no longer reachable [1]. This can then be
considered as if the heartbeat timeout had been triggered. That way we
should detect lost TaskExecutors as fast as our heartbeat interval allows.

[1] https://issues.apache.org/jira/browse/FLINK-23209

Cheers,
Till

On Thu, Jul 1, 2021 at 1:46 PM Yang Wang  wrote:

> Since you are deploying Flink workloads on Yarn, the Flink ResourceManager
> should get the container
> completion event after the heartbeat of Yarn NM->Yarn RM->Flink RM, which
> is 8 seconds by default.
> And Flink ResourceManager will release the dead TaskManager container once
> received the completion event.
> As a result, Flink will not deploy tasks onto the dead TaskManagers.
>
>
> I think most of the time cost in Phase 1 might be cancelling the tasks on
> the dead TaskManagers.
>
>
> Best,
> Yang
>
>
> Till Rohrmann  于2021年7月1日周四 下午4:49写道:
>
>> The analysis of Gen is correct. Flink currently uses its heartbeat as the
>> primary means to detect dead TaskManagers. This means that Flink will take
>> at least `heartbeat.timeout` time before the system recovers. Even if the
>> cancellation happens fast (e.g. by having configured a low
>> akka.ask.timeout), then Flink will still try to deploy tasks onto the dead
>> TaskManager until it is marked as dead and its slots are released (unless
>> the ResourceManager does not get a signal from the underlying resource
>> management system that a container/pod has died). One way to improve the
>> situation is to introduce logic which can react to a ConnectionException
>> and then black lists or releases a TaskManager, for example. This is
>> currently not implemented in Flink, though.
>>
>> Concerning the cancellation operation: Flink currently does not listen to
>> the dead letters of Akka. This means that the `akka.ask.timeout` is the
>> primary means to fail the future result of a rpc which could not be sent.
>> This is also an improvement we should add to Flink's RpcService. I've
>> created a JIRA issue for this problem [1].
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-23202
>>
>> Cheers,
>> Till
>>
>> On Wed, Jun 30, 2021 at 6:33 PM Lu Niu  wrote:
>>
>>> Thanks Gen! cc flink-dev to collect more inputs.
>>>
>>> Best
>>> Lu
>>>
>>> On Wed, Jun 30, 2021 at 12:55 AM Gen Luo  wrote:
>>>
 I'm also wondering here.

 In my opinion, it's because the JM can not confirm whether the TM is
 lost or it's a temporary network trouble and will recover soon, since I can
 see in the log that akka has got a Connection refused but JM still sends a
 heartbeat request to the lost TM until it reaches heartbeat timeout. But
 I'm not sure if it's indeed designed like this.

 I would really appreciate it if anyone who knows more details could
 answer. Thanks.

>>>


[jira] [Created] (FLINK-23210) Running HA per-job cluster (hashmap, sync) end-to-end test failed on azure

2021-07-01 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-23210:


 Summary: Running HA per-job cluster (hashmap, sync) end-to-end 
test failed on azure
 Key: FLINK-23210
 URL: https://issues.apache.org/jira/browse/FLINK-23210
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.13.1
Reporter: Dawid Wysakowicz


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=19776&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=ff888d9b-cd34-53cc-d90f-3e446d355529&l=11631

{code}
Jul 01 12:54:55 
Jul 01 12:54:55 
==
Jul 01 12:54:55 Running 'Running HA per-job cluster (hashmap, sync) end-to-end 
test'
Jul 01 12:54:55 
==
Jul 01 12:54:55 TEST_DATA_DIR: 
/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-55217860509
Jul 01 12:54:55 Flink dist directory: 
/home/vsts/work/1/s/flink-dist/target/flink-1.13-SNAPSHOT-bin/flink-1.13-SNAPSHOT
Jul 01 12:54:55 Flink dist directory: 
/home/vsts/work/1/s/flink-dist/target/flink-1.13-SNAPSHOT-bin/flink-1.13-SNAPSHOT
Jul 01 12:54:55 Starting zookeeper daemon on host fv-az91-551.
Jul 01 12:54:55 Running on HA mode: parallelism=4, backend=hashmap, 
asyncSnapshots=false, incremSnapshots=false and zk=3.5.
Jul 01 12:54:56 Starting standalonejob daemon on host fv-az91-551.
Jul 01 12:54:56 Start 1 more task managers
Jul 01 12:54:58 Starting taskexecutor daemon on host fv-az91-551.
Jul 01 12:55:02 Job () is not yet running.
Jul 01 12:55:07 Job () is running.
Jul 01 12:55:07 Running JM watchdog @ 351161
Jul 01 12:55:07 Running TM watchdog @ 351162
Jul 01 12:55:07 Waiting for text Completed checkpoint [1-9]* for job 
 to appear 2 of times in logs...
Jul 01 12:55:08 Killed JM @ 350374
Jul 01 12:55:08 Waiting for text Completed checkpoint [1-9]* for job 
 to appear 2 of times in logs...
grep: /home/vsts/work/_temp/debug_files/flink-logs/*standalonejob-1*.log: No 
such file or directory
grep: /home/vsts/work/_temp/debug_files/flink-logs/*standalonejob-1*.log: No 
such file or directory
Jul 01 12:55:09 Killed TM @ 350614
grep: /home/vsts/work/_temp/debug_files/flink-logs/*standalonejob-1*.log: No 
such file or directory
Jul 01 12:55:10 Starting standalonejob daemon on host fv-az91-551.
Jul 01 12:55:55 Killed TM @ 351701
Jul 01 12:55:55 Killed JM @ 351836
Jul 01 12:55:55 Waiting for text Completed checkpoint [1-9]* for job 
 to appear 2 of times in logs...
grep: /home/vsts/work/_temp/debug_files/flink-logs/*standalonejob-2*.log: No 
such file or directory
grep: /home/vsts/work/_temp/debug_files/flink-logs/*standalonejob-2*.log: No 
such file or directory
Jul 01 12:55:57 Starting standalonejob daemon on host fv-az91-551.
grep: /home/vsts/work/_temp/debug_files/flink-logs/*standalonejob-2*.log: No 
such file or directory
Jul 01 12:56:44 Killed TM @ 353554
Jul 01 12:56:56 Killed TM @ 355735
Jul 01 13:06:00 A timeout occurred waiting for Completed checkpoint [1-9]* for 
job  to appear 2 of times in logs.
Jul 01 13:06:00 Stopping job timeout watchdog (with pid=349933)
Jul 01 13:06:00 Killing JM watchdog @ 351161
Jul 01 13:06:00 Killing TM watchdog @ 351162
Jul 01 13:06:00 [FAIL] Test script contains errors.
Jul 01 13:06:00 Checking of logs skipped.
Jul 01 13:06:00 
Jul 01 13:06:00 [FAIL] 'Running HA per-job cluster (hashmap, sync) end-to-end 
test' failed after 11 minutes and 5 seconds! Test exited with exit code 1
Jul 01 13:06:00 

{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] FLIP-150: Introduce Hybrid Source

2021-07-01 Thread Steven Wu
+1 (non-binding)

On Thu, Jul 1, 2021 at 4:59 AM Thomas Weise  wrote:

> +1 (binding)
>
>
> On Thu, Jul 1, 2021 at 8:13 AM Arvid Heise  wrote:
>
> > +1 (binding)
> >
> > Thank you and Thomas for driving this
> >
> > On Thu, Jul 1, 2021 at 7:50 AM 蒋晓峰  wrote:
> >
> > > Hi everyone,
> > >
> > >
> > >
> > >
> > > Thanks for all the feedback to Hybrid Source so far. Based on the
> > > discussion[1] we seem to have consensus, so I would like to start a
> vote
> > on
> > > FLIP-150 for which the FLIP has also been updated[2].
> > >
> > >
> > >
> > >
> > > The vote will last for at least 72 hours (Sun, Jul 4th 12:00 GMT)
> unless
> > > there is an objection or insufficient votes.
> > >
> > >
> > >
> > >
> > > Thanks,
> > >
> > > Nicholas Jiang
> > >
> > >
> > >
> > >
> > > [1]
> > >
> >
> https://lists.apache.org/thread.html/r94057d19f0df2a211695820375502d60cddeeab5ad27057c1ca988d6%40%3Cdev.flink.apache.org%3E
> > >
> > > [2]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source
> >
>


Re: Job Recovery Time on TM Lost

2021-07-01 Thread Lu Niu
Thanks Till and Yang for the help! Also thanks Till for the quick fix!

I did another test yesterday. In this test, I intentionally throw an exception
from the source operator:
```
if (runtimeContext.getIndexOfThisSubtask() == 1
&& errorFrenquecyInMin > 0
&& System.currentTimeMillis() - lastStartTime >=
errorFrenquecyInMin * 60 * 1000) {
  lastStartTime = System.currentTimeMillis();
  throw new RuntimeException(
  "Trigger expected exception at: " + lastStartTime);
}
```
In this case, I found that phase 1 still takes about 30s and phase 2 dropped to
1s (because there is no need for container allocation).

Some logs:
```
```


On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann  wrote:

> A quick addition, I think with FLINK-23202 it should now also be possible
> to improve the heartbeat mechanism in the general case. We can leverage the
> unreachability exception thrown if a remote target is no longer reachable
> to mark an heartbeat target as no longer reachable [1]. This can then be
> considered as if the heartbeat timeout has been triggered. That way we
> should detect lost TaskExecutors as fast as our heartbeat interval is.
>
> [1] https://issues.apache.org/jira/browse/FLINK-23209
>
> Cheers,
> Till
>
> On Thu, Jul 1, 2021 at 1:46 PM Yang Wang  wrote:
>
>> Since you are deploying Flink workloads on Yarn, the Flink
>> ResourceManager should get the container
>> completion event after the heartbeat of Yarn NM->Yarn RM->Flink RM, which
>> is 8 seconds by default.
>> And Flink ResourceManager will release the dead TaskManager container
>> once received the completion event.
>> As a result, Flink will not deploy tasks onto the dead TaskManagers.
>>
>>
>> I think most of the time cost in Phase 1 might be cancelling the tasks on
>> the dead TaskManagers.
>>
>>
>> Best,
>> Yang
>>
>>
>> Till Rohrmann  于2021年7月1日周四 下午4:49写道:
>>
>>> The analysis of Gen is correct. Flink currently uses its heartbeat as
>>> the primary means to detect dead TaskManagers. This means that Flink will
>>> take at least `heartbeat.timeout` time before the system recovers. Even if
>>> the cancellation happens fast (e.g. by having configured a low
>>> akka.ask.timeout), then Flink will still try to deploy tasks onto the dead
>>> TaskManager until it is marked as dead and its slots are released (unless
>>> the ResourceManager does not get a signal from the underlying resource
>>> management system that a container/pod has died). One way to improve the
>>> situation is to introduce logic which can react to a ConnectionException
>>> and then black lists or releases a TaskManager, for example. This is
>>> currently not implemented in Flink, though.
>>>
>>> Concerning the cancellation operation: Flink currently does not listen
>>> to the dead letters of Akka. This means that the `akka.ask.timeout` is the
>>> primary means to fail the future result of a rpc which could not be sent.
>>> This is also an improvement we should add to Flink's RpcService. I've
>>> created a JIRA issue for this problem [1].
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-23202
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Jun 30, 2021 at 6:33 PM Lu Niu  wrote:
>>>
 Thanks Gen! cc flink-dev to collect more inputs.

 Best
 Lu

 On Wed, Jun 30, 2021 at 12:55 AM Gen Luo  wrote:

> I'm also wondering here.
>
> In my opinion, it's because the JM can not confirm whether the TM is
> lost or it's a temporary network trouble and will recover soon, since I 
> can
> see in the log that akka has got a Connection refused but JM still sends a
> heartbeat request to the lost TM until it reaches heartbeat timeout. But
> I'm not sure if it's indeed designed like this.
>
> I would really appreciate it if anyone who knows more details could
> answer. Thanks.
>



Re: Job Recovery Time on TM Lost

2021-07-01 Thread Lu Niu
Thanks Till and Yang for the help! Also thanks Till for the quick fix!

I did another test yesterday. In this test, I intentionally throw an exception
from the source operator:
```
if (runtimeContext.getIndexOfThisSubtask() == 1
&& errorFrenquecyInMin > 0
&& System.currentTimeMillis() - lastStartTime >=
errorFrenquecyInMin * 60 * 1000) {
  lastStartTime = System.currentTimeMillis();
  throw new RuntimeException(
  "Trigger expected exception at: " + lastStartTime);
}
```
In this case, I found that phase 1 still takes about 30s and phase 2 dropped to
1s (because there is no need for container allocation). Why does phase 1 still take 30s
even though no TM is lost?

Related logs:
```
2021-06-30 00:55:07,463 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source -> ...
java.lang.RuntimeException: Trigger expected exception at: 1625014507446
2021-06-30 00:55:07,509 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
(35c95ee7141334845cfd49b642fa9f98) switched from state RUNNING to
RESTARTING.
2021-06-30 00:55:37,596 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
(35c95ee7141334845cfd49b642fa9f98) switched from state RESTARTING to
RUNNING.
2021-06-30 00:55:38,678 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph(time when
all tasks switch from CREATED to RUNNING)
```
Best
Lu


On Thu, Jul 1, 2021 at 12:06 PM Lu Niu  wrote:

> Thanks TIll and Yang for help! Also Thanks Till for a quick fix!
>
> I did another test yesterday. In this test, I intentionally throw
> exception from the source operator:
> ```
> if (runtimeContext.getIndexOfThisSubtask() == 1
> && errorFrenquecyInMin > 0
> && System.currentTimeMillis() - lastStartTime >=
> errorFrenquecyInMin * 60 * 1000) {
>   lastStartTime = System.currentTimeMillis();
>   throw new RuntimeException(
>   "Trigger expected exception at: " + lastStartTime);
> }
> ```
> In this case, I found phase 1 still takes about 30s and Phase 2 dropped to
> 1s (because no need for container allocation).
>
> Some logs:
> ```
> ```
>
>
> On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann  wrote:
>
>> A quick addition, I think with FLINK-23202 it should now also be possible
>> to improve the heartbeat mechanism in the general case. We can leverage the
>> unreachability exception thrown if a remote target is no longer reachable
>> to mark an heartbeat target as no longer reachable [1]. This can then be
>> considered as if the heartbeat timeout has been triggered. That way we
>> should detect lost TaskExecutors as fast as our heartbeat interval is.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-23209
>>
>> Cheers,
>> Till
>>
>> On Thu, Jul 1, 2021 at 1:46 PM Yang Wang  wrote:
>>
>>> Since you are deploying Flink workloads on Yarn, the Flink
>>> ResourceManager should get the container
>>> completion event after the heartbeat of Yarn NM->Yarn RM->Flink RM,
>>> which is 8 seconds by default.
>>> And Flink ResourceManager will release the dead TaskManager container
>>> once received the completion event.
>>> As a result, Flink will not deploy tasks onto the dead TaskManagers.
>>>
>>>
>>> I think most of the time cost in Phase 1 might be cancelling the tasks
>>> on the dead TaskManagers.
>>>
>>>
>>> Best,
>>> Yang
>>>
>>>
>>> Till Rohrmann  于2021年7月1日周四 下午4:49写道:
>>>
 The analysis of Gen is correct. Flink currently uses its heartbeat as
 the primary means to detect dead TaskManagers. This means that Flink will
 take at least `heartbeat.timeout` time before the system recovers. Even if
 the cancellation happens fast (e.g. by having configured a low
 akka.ask.timeout), then Flink will still try to deploy tasks onto the dead
 TaskManager until it is marked as dead and its slots are released (unless
 the ResourceManager does not get a signal from the underlying resource
 management system that a container/pod has died). One way to improve the
 situation is to introduce logic which can react to a ConnectionException
 and then black lists or releases a TaskManager, for example. This is
 currently not implemented in Flink, though.

 Concerning the cancellation operation: Flink currently does not listen
 to the dead letters of Akka. This means that the `akka.ask.timeout` is the
 primary means to fail the future result of a rpc which could not be sent.
 This is also an improvement we should add to Flink's RpcService. I've
 created a JIRA issue for this problem [1].

 [1] https://issues.apache.org/jira/browse/FLINK-23202

 Cheers,
 Till

 On Wed, Jun 30, 2021 at 6:33 PM Lu Niu  wrote:

> Thanks Gen! cc flink-dev to collect more inputs.
>
> Best
> Lu
>
> On Wed, Jun 30, 

Re: Job Recovery Time on TM Lost

2021-07-01 Thread Lu Niu
Another side question: shall we add a metric to cover the complete restarting
time (phase 1 + phase 2)? The current metric jm.restartingTime only covers
phase 1. Thanks!

Best
Lu

On Thu, Jul 1, 2021 at 12:09 PM Lu Niu  wrote:

> Thanks TIll and Yang for help! Also Thanks Till for a quick fix!
>
> I did another test yesterday. In this test, I intentionally throw
> exception from the source operator:
> ```
> if (runtimeContext.getIndexOfThisSubtask() == 1
> && errorFrenquecyInMin > 0
> && System.currentTimeMillis() - lastStartTime >=
> errorFrenquecyInMin * 60 * 1000) {
>   lastStartTime = System.currentTimeMillis();
>   throw new RuntimeException(
>   "Trigger expected exception at: " + lastStartTime);
> }
> ```
> In this case, I found phase 1 still takes about 30s and Phase 2 dropped to
> 1s (because no need for container allocation). Why phase 1 still takes 30s
> even though no TM is lost?
>
> Related logs:
> ```
> 2021-06-30 00:55:07,463 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
> USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source -> ...
> java.lang.RuntimeException: Trigger expected exception at: 1625014507446
> 2021-06-30 00:55:07,509 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
> (35c95ee7141334845cfd49b642fa9f98) switched from state RUNNING to
> RESTARTING.
> 2021-06-30 00:55:37,596 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
> (35c95ee7141334845cfd49b642fa9f98) switched from state RESTARTING to
> RUNNING.
> 2021-06-30 00:55:38,678 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph(time when
> all tasks switch from CREATED to RUNNING)
> ```
> Best
> Lu
>
>
> On Thu, Jul 1, 2021 at 12:06 PM Lu Niu  wrote:
>
>> Thanks TIll and Yang for help! Also Thanks Till for a quick fix!
>>
>> I did another test yesterday. In this test, I intentionally throw
>> exception from the source operator:
>> ```
>> if (runtimeContext.getIndexOfThisSubtask() == 1
>> && errorFrenquecyInMin > 0
>> && System.currentTimeMillis() - lastStartTime >=
>> errorFrenquecyInMin * 60 * 1000) {
>>   lastStartTime = System.currentTimeMillis();
>>   throw new RuntimeException(
>>   "Trigger expected exception at: " + lastStartTime);
>> }
>> ```
>> In this case, I found phase 1 still takes about 30s and Phase 2 dropped
>> to 1s (because no need for container allocation).
>>
>> Some logs:
>> ```
>> ```
>>
>>
>> On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann 
>> wrote:
>>
>>> A quick addition, I think with FLINK-23202 it should now also be
>>> possible to improve the heartbeat mechanism in the general case. We can
>>> leverage the unreachability exception thrown if a remote target is no
>>> longer reachable to mark an heartbeat target as no longer reachable [1].
>>> This can then be considered as if the heartbeat timeout has been triggered.
>>> That way we should detect lost TaskExecutors as fast as our heartbeat
>>> interval is.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-23209
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Jul 1, 2021 at 1:46 PM Yang Wang  wrote:
>>>
 Since you are deploying Flink workloads on Yarn, the Flink
 ResourceManager should get the container
 completion event after the heartbeat of Yarn NM->Yarn RM->Flink RM,
 which is 8 seconds by default.
 And Flink ResourceManager will release the dead TaskManager container
 once received the completion event.
 As a result, Flink will not deploy tasks onto the dead TaskManagers.


 I think most of the time cost in Phase 1 might be cancelling the tasks
 on the dead TaskManagers.


 Best,
 Yang


 Till Rohrmann  于2021年7月1日周四 下午4:49写道:

> The analysis of Gen is correct. Flink currently uses its heartbeat as
> the primary means to detect dead TaskManagers. This means that Flink will
> take at least `heartbeat.timeout` time before the system recovers. Even if
> the cancellation happens fast (e.g. by having configured a low
> akka.ask.timeout), then Flink will still try to deploy tasks onto the dead
> TaskManager until it is marked as dead and its slots are released (unless
> the ResourceManager does not get a signal from the underlying resource
> management system that a container/pod has died). One way to improve the
> situation is to introduce logic which can react to a ConnectionException
> and then black lists or releases a TaskManager, for example. This is
> currently not implemented in Flink, though.
>
> Concerning the cancellation operation: Flink currently does not listen
> to the dead letters of Akka. This means that the `akka.ask.timeout` is the
> primary means to fail the future result of a rpc which could not be sent.
>>

Re: Re: [VOTE] FLIP-150: Introduce Hybrid Source

2021-07-01 Thread Shawn


+1 (non-binding)
On 2021/07/01 14:32:58 Steven Wu wrote:
> +1 (non-binding)
> 
> On Thu, Jul 1, 2021 at 4:59 AM Thomas Weise  wrote:
> 
> > +1 (binding)
> >
> >
> > On Thu, Jul 1, 2021 at 8:13 AM Arvid Heise  wrote:
> >
> > > +1 (binding)
> > >
> > > Thank you and Thomas for driving this
> > >
> > > On Thu, Jul 1, 2021 at 7:50 AM 蒋晓峰  wrote:
> > >
> > > > Hi everyone,
> > > >
> > > >
> > > >
> > > >
> > > > Thanks for all the feedback to Hybrid Source so far. Based on the
> > > > discussion[1] we seem to have consensus, so I would like to start a
> > vote
> > > on
> > > > FLIP-150 for which the FLIP has also been updated[2].
> > > >
> > > >
> > > >
> > > >
> > > > The vote will last for at least 72 hours (Sun, Jul 4th 12:00 GMT)
> > unless
> > > > there is an objection or insufficient votes.
> > > >
> > > >
> > > >
> > > >
> > > > Thanks,
> > > >
> > > > Nicholas Jiang
> > > >
> > > >
> > > >
> > > >
> > > > [1]
> > > >
> > >
> > https://lists.apache.org/thread.html/r94057d19f0df2a211695820375502d60cddeeab5ad27057c1ca988d6%40%3Cdev.flink.apache.org%3E
> > > >
> > > > [2]
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source
> > >
> >
> 


Re: [VOTE] FLIP-150: Introduce Hybrid Source

2021-07-01 Thread Elkhan Dadashov
+1 (non-binding)

On 2021/07/01 05:49:44 蒋晓峰 wrote:
> Hi everyone,
> 
> 
> 
> 
> Thanks for all the feedback to Hybrid Source so far. Based on the 
> discussion[1] we seem to have consensus, so I would like to start a vote on 
> FLIP-150 for which the FLIP has also been updated[2].
> 
> 
> 
> 
> The vote will last for at least 72 hours (Sun, Jul 4th 12:00 GMT) unless 
> there is an objection or insufficient votes.
> 
> 
> 
> 
> Thanks,
> 
> Nicholas Jiang
> 
> 
> 
> 
> [1] 
> https://lists.apache.org/thread.html/r94057d19f0df2a211695820375502d60cddeeab5ad27057c1ca988d6%40%3Cdev.flink.apache.org%3E
> 
> [2] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source


[jira] [Created] (FLINK-23211) An old interface method is used in this section of [Passing Options Factory to RocksDB].

2021-07-01 Thread Carl (Jira)
Carl created FLINK-23211:


 Summary: An old interface method is used in this section of 
[Passing Options Factory to RocksDB].
 Key: FLINK-23211
 URL: https://issues.apache.org/jira/browse/FLINK-23211
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.13.0
Reporter: Carl
 Attachments: image-2021-07-02-09-19-30-547.png, 
image-2021-07-02-09-20-03-344.png

[https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/]

!image-2021-07-02-09-19-30-547.png!

 

*In version 1.13 of Flink, this method has been replaced by the following 
interface:*

 

!image-2021-07-02-09-20-03-344.png!
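
For readers of this digest (the attached screenshots are not reproduced here), a
sketch of the replacement interface in Flink 1.13, to the best of my knowledge
org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory:

{code}
import java.util.Collection;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class MyOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(
            DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions.setIncreaseParallelism(4);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }

    public static void main(String[] args) {
        EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();
        backend.setRocksDBOptions(new MyOptionsFactory());
    }
}
{code}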

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] FLIP-147: Support Checkpoint After Tasks Finished

2021-07-01 Thread JING ZHANG
+1 (binding)


Arvid Heise  wrote on Thu, Jul 1, 2021 at 3:10 PM:

> Looks good: +1 (binding)
>
> On Tue, Jun 29, 2021 at 5:06 AM 刘建刚  wrote:
>
> > +1 (binding)
> >
> > Best
> > liujiangang
> >
> > Piotr Nowojski  于2021年6月29日周二 上午2:05写道:
> >
> > > +1 (binding)
> > >
> > > Piotrek
> > >
> > > pon., 28 cze 2021 o 12:48 Dawid Wysakowicz 
> > > napisał(a):
> > >
> > > > +1 (binding)
> > > >
> > > > Best,
> > > >
> > > > Dawid
> > > >
> > > > On 28/06/2021 10:45, Yun Gao wrote:
> > > > > Hi all,
> > > > >
> > > > > For FLIP-147[1] which targets at supports checkpoints after tasks
> > > > finished and modify operator
> > > > > API and implementation to ensures the commit of last piece of data,
> > > > since after the last vote
> > > > > we have more discussions[2][3] and a few updates, including changes
> > to
> > > > PublicEvolving API,
> > > > > I'd like to have another VOTE on the current state of the FLIP.
> > > > >
> > > > > The vote will last at least 72 hours (Jul 1st), following the
> > consensus
> > > > > voting process.
> > > > >
> > > > > thanks,
> > > > >  Yun
> > > > >
> > > > >
> > > > > [1] https://cwiki.apache.org/confluence/x/mw-ZCQ
> > > > > [2]
> > > >
> > >
> >
> https://lists.apache.org/thread.html/r400da9898ff66fd613c25efea15de440a86f14758ceeae4950ea25cf%40%3Cdev.flink.apache.org
> > > > > [3]
> > > >
> > >
> >
> https://lists.apache.org/thread.html/r3953df796ef5ac67d5be9f2251a95ad72efbca31f1d1555d13e71197%40%3Cdev.flink.apache.org%3E
> > > >
> > > >
> > >
> >
>


Re: [VOTE] FLIP-150: Introduce Hybrid Source

2021-07-01 Thread Israel Ekpo
+1 (non-binding)

On Thu, Jul 1, 2021 at 6:45 PM Elkhan Dadashov 
wrote:

> +1 (non-binding)
>
> On 2021/07/01 05:49:44 蒋晓峰 wrote:
> > Hi everyone,
> >
> >
> >
> >
> > Thanks for all the feedback to Hybrid Source so far. Based on the
> discussion[1] we seem to have consensus, so I would like to start a vote on
> FLIP-150 for which the FLIP has also been updated[2].
> >
> >
> >
> >
> > The vote will last for at least 72 hours (Sun, Jul 4th 12:00 GMT) unless
> there is an objection or insufficient votes.
> >
> >
> >
> >
> > Thanks,
> >
> > Nicholas Jiang
> >
> >
> >
> >
> > [1]
> https://lists.apache.org/thread.html/r94057d19f0df2a211695820375502d60cddeeab5ad27057c1ca988d6%40%3Cdev.flink.apache.org%3E
> >
> > [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source
>


Re: [DISCUSS] Better user experience in the WindowAggregate upon Changelog (contains update message)

2021-07-01 Thread Jark Wu
Sorry, I made a typo above. I mean I prefer proposal (1) that
only needs to set `table.exec.emit.allow-lateness` to handle late events.
`table.exec.emit.late-fire.delay` can be optional which is 0s by default.
`table.exec.state.ttl` will not affect window state anymore, so window state
is still cleaned accurately by watermark.

We don't need to expose `table.exec.emit.late-fire.enabled` on docs and
can remove it in the next version.

Best,
Jark

On Thu, 1 Jul 2021 at 21:20, Jark Wu  wrote:

> Thanks Jing for bringing up this topic,
>
> The emit strategy configs are annotated as Experimental and are not public in
> the documentation.
> However, I see this is a very useful feature which many users are looking
> for.
> I have posted these configs for many questions like "how to handle late
> events in SQL".
> Thus, I think it's time to make the configuration public and explicitly
> document it. In the long
> term, we would like to propose an EMIT syntax for SQL, but until then we
> can get more
> valuable feedback from users when they are using the configs.
>
> Regarding the exposed configuration, I prefer proposal (2).
> But it would be better not to expose `table.exec.emit.late-fire.enabled`
> on docs and we can
> remove it in the next version.
>
> Best,
> Jark
>
>
> On Tue, 29 Jun 2021 at 11:09, JING ZHANG  wrote:
>
>> When WindowAggregate works upon Changelog which contains update messages,
>> UPDATE BEFORE message may be dropped as a late message. [1]
>>
>> In order to handle late UB message, user needs to set *all* the
>> following 3 parameters:
>>
>> (1) enable late fire by setting
>>
>> table.exec.emit.late-fire.enabled : true
>>
>> (2) set per record emit behavior for late records by setting
>>
>> table.exec.emit.late-fire.delay : 0 s
>>
>> (3) keep window state for extra time after window is fired by setting
>>
>> table.exec.emit.allow-lateness : 1 h   // or: table.exec.state.ttl: 1h
>>
>>
>> The solution has two disadvantages:
>>
>> (1) Users may not realize that UB messages may be dropped as a late
>> event, so they will not set related parameters.
>>
>> (2) When users look for a solution to solve the dropped UB messages
>> problem, the current solution is a bit inconvenient for them because they
>> need to set all the 3 parameters. Besides, some configurations have overlap
>> ability.
>>
>>
>> Now there are two proposals to simplify the 3 parameters a little.
>>
>> (1) Users only need set table.exec.emit.allow-lateness (just like the
>> behavior on Datastream, user only need set allow-lateness), framework could
>> atom set `table.exec.emit.late-fire.enabled` to true and set
>> `table.exec.emit.late-fire.delay` to 0s.
>>
>> And in the later version, we deprecate `table.exec.emit.late-fire.delay`
>> and `table.exec.emit.late-fire.enabled`.
>>
>>
>> (2) Users need set `table.exec.emit.late-fire.enabled` to true and set
>> `table.exec.state.ttl`, framework  could atom set
>> `table.exec.emit.late-fire.delay` to 0s.
>>
>> And in the later version, we deprecate `table.exec.emit.late-fire.delay`
>> and `table.exec.emit.allow-lateness `.
>>
>>
>> Please let me know what you think about the issue.
>>
>> Thank you.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-22781
>>
>>
>> Best regards,
>> JING ZHANG
>>
>>
>>
>>


Re: [DISCUSS] Better user experience in the WindowAggregate upon Changelog (contains update message)

2021-07-01 Thread 刘建刚
Thanks for the discussion, JING ZHANG. I like the first proposal since it
is simple and consistent with the DataStream API. It would be helpful to add
more docs about this special late-event case in WindowAggregate. Also, I look
forward to more flexible emit strategies later.
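
For comparison, the DataStream behavior referenced above looks roughly like the
following sketch (the toy input, window size and lateness value are made up for
illustration; the allowedLateness call is the point here):

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class AllowedLatenessSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // toy event-time input: (key, timestamp used as the event time)
            DataStream<Tuple2<String, Long>> events = env
                    .fromElements(Tuple2.of("k", 1_000L), Tuple2.of("k", 2_000L))
                    .assignTimestampsAndWatermarks(
                            WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                                    .withTimestampAssigner((event, ts) -> event.f1));

            events.keyBy(event -> event.f0)
                    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
                    // the single DataStream knob that keeps window state for late records
                    .allowedLateness(Time.hours(1))
                    .sum(1)
                    .print();

            env.execute("allowed-lateness sketch");
        }
    }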

Jark Wu  于2021年7月2日周五 上午10:33写道:

> Sorry, I made a typo above. I mean I prefer proposal (1) that
> only needs to set `table.exec.emit.allow-lateness` to handle late events.
> `table.exec.emit.late-fire.delay` can be optional which is 0s by default.
> `table.exec.state.ttl` will not affect window state anymore, so window
> state
> is still cleaned accurately by watermark.
>
> We don't need to expose `table.exec.emit.late-fire.enabled` on docs and
> can remove it in the next version.
>
> Best,
> Jark
>
> On Thu, 1 Jul 2021 at 21:20, Jark Wu  wrote:
>
> > Thanks Jing for bringing up this topic,
> >
> > The emit strategy configs are annotated as Experiential and not public on
> > documentations.
> > However, I see this is a very useful feature which many users are looking
> > for.
> > I have posted these configs for many questions like "how to handle late
> > events in SQL".
> > Thus, I think it's time to make the configuration public and explicitly
> > document it. In the long
> > term, we would like to propose an EMIT syntax for SQL, but until then we
> > can get more
> > valuable feedback from users when they are using the configs.
> >
> > Regarding the exposed configuration, I prefer proposal (2).
> > But it would be better not to expose `table.exec.emit.late-fire.enabled`
> > on docs and we can
> > remove it in the next version.
> >
> > Best,
> > Jark
> >
> >
> > On Tue, 29 Jun 2021 at 11:09, JING ZHANG  wrote:
> >
> >> When WindowAggregate works upon Changelog which contains update
> messages,
> >> UPDATE BEFORE message may be dropped as a late message. [1]
> >>
> >> In order to handle late UB message, user needs to set *all* the
> >> following 3 parameters:
> >>
> >> (1) enable late fire by setting
> >>
> >> table.exec.emit.late-fire.enabled : true
> >>
> >> (2) set per record emit behavior for late records by setting
> >>
> >> table.exec.emit.late-fire.delay : 0 s
> >>
> >> (3) keep window state for extra time after window is fired by setting
> >>
> >> table.exec.emit.allow-lateness : 1 h   // or: table.exec.state.ttl: 1h
> >>
> >>
> >> The solution has two disadvantages:
> >>
> >> (1) Users may not realize that UB messages may be dropped as a late
> >> event, so they will not set related parameters.
> >>
> >> (2) When users look for a solution to solve the dropped UB messages
> >> problem, the current solution is a bit inconvenient for them because
> they
> >> need to set all the 3 parameters. Besides, some configurations have
> overlap
> >> ability.
> >>
> >>
> >> Now there are two proposals to simplify the 3 parameters a little.
> >>
> >> (1) Users only need set table.exec.emit.allow-lateness (just like the
> >> behavior on Datastream, user only need set allow-lateness), framework
> could
> >> atom set `table.exec.emit.late-fire.enabled` to true and set
> >> `table.exec.emit.late-fire.delay` to 0s.
> >>
> >> And in the later version, we deprecate `table.exec.emit.late-fire.delay`
> >> and `table.exec.emit.late-fire.enabled`.
> >>
> >>
> >> (2) Users need set `table.exec.emit.late-fire.enabled` to true and set
> >> `table.exec.state.ttl`, framework  could atom set
> >> `table.exec.emit.late-fire.delay` to 0s.
> >>
> >> And in the later version, we deprecate `table.exec.emit.late-fire.delay`
> >> and `table.exec.emit.allow-lateness `.
> >>
> >>
> >> Please let me know what you think about the issue.
> >>
> >> Thank you.
> >>
> >> [1] https://issues.apache.org/jira/browse/FLINK-22781
> >>
> >>
> >> Best regards,
> >> JING ZHANG
> >>
> >>
> >>
> >>
>


[jira] [Created] (FLINK-23212) Skip code-wise tests for pure documentation changing PRs

2021-07-01 Thread Xintong Song (Jira)
Xintong Song created FLINK-23212:


 Summary: Skip code-wise tests for pure documentation changing PRs
 Key: FLINK-23212
 URL: https://issues.apache.org/jira/browse/FLINK-23212
 Project: Flink
  Issue Type: Improvement
  Components: Build System / CI
Reporter: Xintong Song


It's brought up in [this 
thread|https://lists.apache.org/thread.html/r275493c169a585c9472beaf6796fb650b1493a9bea9d8daf2101c2df%40%3Cdev.flink.apache.org%3E]
 whether we should skip code-wise tests for pure documentation-changing PRs, to 
avoid blocking documentation efforts on build instabilities and to reduce the 
workload on CI workers.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] FLIP-147: Support Checkpoint After Tasks Finished

2021-07-01 Thread Leonard Xu
+1,

This may help https://issues.apache.org/jira/browse/FLINK-22764 


Best,
Leonard

> 在 2021年7月2日,10:17,JING ZHANG  写道:
> 
> +1 (binding)
> 
> 
> Arvid Heise  于2021年7月1日周四 下午3:10写道:
> 
>> Looks good: +1 (binding)
>> 
>> On Tue, Jun 29, 2021 at 5:06 AM 刘建刚  wrote:
>> 
>>> +1 (binding)
>>> 
>>> Best
>>> liujiangang
>>> 
>>> Piotr Nowojski  于2021年6月29日周二 上午2:05写道:
>>> 
 +1 (binding)
 
 Piotrek
 
 pon., 28 cze 2021 o 12:48 Dawid Wysakowicz 
 napisał(a):
 
> +1 (binding)
> 
> Best,
> 
> Dawid
> 
> On 28/06/2021 10:45, Yun Gao wrote:
>> Hi all,
>> 
>> For FLIP-147[1] which targets at supports checkpoints after tasks
> finished and modify operator
>> API and implementation to ensures the commit of last piece of data,
> since after the last vote
>> we have more discussions[2][3] and a few updates, including changes
>>> to
> PublicEvolving API,
>> I'd like to have another VOTE on the current state of the FLIP.
>> 
>> The vote will last at least 72 hours (Jul 1st), following the
>>> consensus
>> voting process.
>> 
>> thanks,
>> Yun
>> 
>> 
>> [1] https://cwiki.apache.org/confluence/x/mw-ZCQ
>> [2]
> 
 
>>> 
>> https://lists.apache.org/thread.html/r400da9898ff66fd613c25efea15de440a86f14758ceeae4950ea25cf%40%3Cdev.flink.apache.org
>> [3]
> 
 
>>> 
>> https://lists.apache.org/thread.html/r3953df796ef5ac67d5be9f2251a95ad72efbca31f1d1555d13e71197%40%3Cdev.flink.apache.org%3E
> 
> 
 
>>> 
>> 



Re: [DISCUSS] Do not merge PRs with "unrelated" test failures.

2021-07-01 Thread Xintong Song
Thanks all for the positive feedback.

I have updated the wiki page [1], and will send an announcement in a
separate thread, to draw more committers' attention.

Moreover, I've opened FLINK-23212 where we can continue with the discussion
around pure documentation changing PRs.

Thank you~

Xintong Song


[1] https://cwiki.apache.org/confluence/display/FLINK/Merging+Pull+Requests

On Wed, Jun 30, 2021 at 5:26 PM Xintong Song  wrote:

> I second Tison's opinion.
>
> Similar to how we skip docs_404_check for PRs that do not touch the
> documentation, we can skip other stages for PRs that only contain
> documentation changes.
>
> In addition to making merging documentation PRs easier, we can also reduce
> the workload on CI workers. Especially during the last days of a release
> cycle, which is usually the most busy time for the CI workers, and is also
> where most documentation efforts take place.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Jun 30, 2021 at 3:56 PM Till Rohrmann 
> wrote:
>
>> I think you are right Tison that docs are a special case and they only
>> require flink-docs to pass. What I am wondering is how much of a problem
>> this will be (assuming that we have a decent build stability). The more
>> exceptions we add, the harder it will be to properly follow the
>> guidelines.
>> Maybe we can observe how many docs PRs get delayed/not merged because of
>> this and then revisit this discussion if needed.
>>
>> Cheers,
>> Till
>>
>> On Wed, Jun 30, 2021 at 8:30 AM tison  wrote:
>>
>> > Hi,
>> >
>> > There are a number of PRs modifying docs only, but we still require all
>> > tests passed on that.
>> >
>> > It is a good proposal we avoid merge PR with "unrelated" failure, but
>> can
>> > we improve the case where the contributor only works for docs?
>> >
>> > For example, base on the file change set, run doc tests only.
>> >
>> > Best,
>> > tison.
>> >
>> >
>> > godfrey he  于2021年6月30日周三 下午2:17写道:
>> >
>> > > +1 for the proposal. Thanks Xintong!
>> > >
>> > > Best,
>> > > Godfrey
>> > >
>> > >
>> > >
>> > > Rui Li  于2021年6月30日周三 上午11:36写道:
>> > >
>> > > > Thanks Xintong. +1 to the proposal.
>> > > >
>> > > > On Tue, Jun 29, 2021 at 11:05 AM 刘建刚 
>> > wrote:
>> > > >
>> > > > > +1 for the proposal. Since the test time is long and environment
>> may
>> > > > vary,
>> > > > > unstable tests are really annoying for developers. The solution is
>> > > > welcome.
>> > > > >
>> > > > > Best
>> > > > > liujiangang
>> > > > >
>> > > > > Jingsong Li  于2021年6月29日周二 上午10:31写道:
>> > > > >
>> > > > > > +1 Thanks Xintong for the update!
>> > > > > >
>> > > > > > Best,
>> > > > > > Jingsong
>> > > > > >
>> > > > > > On Mon, Jun 28, 2021 at 6:44 PM Till Rohrmann <
>> > trohrm...@apache.org>
>> > > > > > wrote:
>> > > > > >
>> > > > > > > +1, thanks for updating the guidelines Xintong!
>> > > > > > >
>> > > > > > > Cheers,
>> > > > > > > Till
>> > > > > > >
>> > > > > > > On Mon, Jun 28, 2021 at 11:49 AM Yangze Guo <
>> karma...@gmail.com>
>> > > > > wrote:
>> > > > > > >
>> > > > > > > > +1
>> > > > > > > >
>> > > > > > > > Thanks Xintong for drafting this doc.
>> > > > > > > >
>> > > > > > > > Best,
>> > > > > > > > Yangze Guo
>> > > > > > > >
>> > > > > > > > On Mon, Jun 28, 2021 at 5:42 PM JING ZHANG <
>> > beyond1...@gmail.com
>> > > >
>> > > > > > wrote:
>> > > > > > > > >
>> > > > > > > > > Thanks Xintong for giving detailed documentation.
>> > > > > > > > >
>> > > > > > > > > The best practice for handling test failure is very
>> detailed,
>> > > > it's
>> > > > > a
>> > > > > > > good
>> > > > > > > > > guidelines document with clear action steps.
>> > > > > > > > >
>> > > > > > > > > +1 to Xintong's proposal.
>> > > > > > > > >
>> > > > > > > > > Xintong Song  于2021年6月28日周一
>> 下午4:07写道:
>> > > > > > > > >
>> > > > > > > > > > Thanks all for the discussion.
>> > > > > > > > > >
>> > > > > > > > > > Based on the opinions so far, I've drafted the new
>> > guidelines
>> > > > > [1],
>> > > > > > > as a
>> > > > > > > > > > potential replacement of the original wiki page [2].
>> > > > > > > > > >
>> > > > > > > > > > Hopefully this draft has covered the most opinions
>> > discussed
>> > > > and
>> > > > > > > > consensus
>> > > > > > > > > > made in this discussion thread.
>> > > > > > > > > >
>> > > > > > > > > > Looking forward to your feedback.
>> > > > > > > > > >
>> > > > > > > > > > Thank you~
>> > > > > > > > > >
>> > > > > > > > > > Xintong Song
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > [1]
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://docs.google.com/document/d/1uUbxbgbGErBXtmEjhwVhBWG3i6nhQ0LXs96OlntEYnU/edit?usp=sharing
>> > > > > > > > > >
>> > > > > > > > > > [2]
>> > > > > > > > > >
>> > > > > > > >
>> > > > > >
>> > > >
>> > https://cwiki.apache.org/confluence/display/FLINK/Merging+Pull+Requests
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > On Fri, Jun 2

[jira] [Created] (FLINK-23213) Remove ProcessFunctionOperation

2021-07-01 Thread Dian Fu (Jira)
Dian Fu created FLINK-23213:
---

 Summary: Remove ProcessFunctionOperation
 Key: FLINK-23213
 URL: https://issues.apache.org/jira/browse/FLINK-23213
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.14.0


Currently there are three operations supporting the Python DataStream API: 
ProcessFunctionOperation, DataStreamKeyedStatefulOperation and 
DataStreamStatelessFunctionOperation. We could refactor this a bit to merge 
ProcessFunctionOperation and DataStreamStatelessFunctionOperation into one.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] FLIP-147: Support Checkpoint After Tasks Finished

2021-07-01 Thread Guowei Ma
+1 (binding)
Best,
Guowei


On Fri, Jul 2, 2021 at 11:56 AM Leonard Xu  wrote:

> +1,
>
> This may help https://issues.apache.org/jira/browse/FLINK-22764 <
> https://issues.apache.org/jira/browse/FLINK-22764>
>
> Best,
> Leonard
>
> > 在 2021年7月2日,10:17,JING ZHANG  写道:
> >
> > +1 (binding)
> >
> >
> > Arvid Heise  于2021年7月1日周四 下午3:10写道:
> >
> >> Looks good: +1 (binding)
> >>
> >> On Tue, Jun 29, 2021 at 5:06 AM 刘建刚  wrote:
> >>
> >>> +1 (binding)
> >>>
> >>> Best
> >>> liujiangang
> >>>
> >>> Piotr Nowojski  于2021年6月29日周二 上午2:05写道:
> >>>
>  +1 (binding)
> 
>  Piotrek
> 
>  pon., 28 cze 2021 o 12:48 Dawid Wysakowicz 
>  napisał(a):
> 
> > +1 (binding)
> >
> > Best,
> >
> > Dawid
> >
> > On 28/06/2021 10:45, Yun Gao wrote:
> >> Hi all,
> >>
> >> For FLIP-147[1] which targets at supports checkpoints after tasks
> > finished and modify operator
> >> API and implementation to ensures the commit of last piece of data,
> > since after the last vote
> >> we have more discussions[2][3] and a few updates, including changes
> >>> to
> > PublicEvolving API,
> >> I'd like to have another VOTE on the current state of the FLIP.
> >>
> >> The vote will last at least 72 hours (Jul 1st), following the
> >>> consensus
> >> voting process.
> >>
> >> thanks,
> >> Yun
> >>
> >>
> >> [1] https://cwiki.apache.org/confluence/x/mw-ZCQ
> >> [2]
> >
> 
> >>>
> >>
> https://lists.apache.org/thread.html/r400da9898ff66fd613c25efea15de440a86f14758ceeae4950ea25cf%40%3Cdev.flink.apache.org
> >> [3]
> >
> 
> >>>
> >>
> https://lists.apache.org/thread.html/r3953df796ef5ac67d5be9f2251a95ad72efbca31f1d1555d13e71197%40%3Cdev.flink.apache.org%3E
> >
> >
> 
> >>>
> >>
>
>


[ANNOUNCE] Criteria for merging pull requests is updated

2021-07-01 Thread Xintong Song
Hi Flink committers,

As previously discussed [1], the criteria for merging pull requests has
been updated.

A full version of guidelines can be found on the project wiki [2]. The
following are some of the highlights.
- MUST make sure passing the CI tests before merging PRs
- SHOULD NOT use the GitHub UI to merge PRs
- For frequent test instabilities that are temporarily disabled, the
corresponding JIRA tickets must be made BLOCKER

I'd like to kindly ask all Flink committers to please read through the new
guidelines and merge PRs accordingly.

Thank you~

Xintong Song


[1]
https://lists.apache.org/thread.html/r136028559a23e21edf16ff9eba6c481f68b4154c6454990ee89af6e2%40%3Cdev.flink.apache.org%3E

[2] https://cwiki.apache.org/confluence/display/FLINK/Merging+Pull+Requests


Re: [ANNOUNCE] Criteria for merging pull requests is updated

2021-07-01 Thread Chesnay Schepler

- SHOULD NOT use the GitHub UI to merge PRs


Where was this discussed?


On 7/2/2021 6:59 AM, Xintong Song wrote:

Hi Flink committers,

As previously discussed [1], the criteria for merging pull requests has
been updated.

A full version of guidelines can be found on the project wiki [2]. The
following are some of the highlights.
- MUST make sure passing the CI tests before merging PRs
- SHOULD NOT use the GitHub UI to merge PRs
- For frequent test instabilities that are temporarily disabled, the
corresponding JIRA tickets must be made BLOCKER

I'd like to kindly ask all Flink committers to please read through the new
guidelines and merge PRs accordingly.

Thank you~

Xintong Song


[1]
https://lists.apache.org/thread.html/r136028559a23e21edf16ff9eba6c481f68b4154c6454990ee89af6e2%40%3Cdev.flink.apache.org%3E

[2] https://cwiki.apache.org/confluence/display/FLINK/Merging+Pull+Requests





[jira] [Created] (FLINK-23214) Make ShuffleMaster a cluster level shared service

2021-07-01 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-23214:
---

 Summary: Make ShuffleMaster a cluster level shared service
 Key: FLINK-23214
 URL: https://issues.apache.org/jira/browse/FLINK-23214
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.14.0
Reporter: Yingjie Cao


This ticket aims to make ShuffleMaster a cluster-level shared service, which 
makes it consistent with the ShuffleEnvironment on the TM side.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23215) flink-table-code-splitter: NOTICE should in META-INF

2021-07-01 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-23215:


 Summary: flink-table-code-splitter: NOTICE should in META-INF
 Key: FLINK-23215
 URL: https://issues.apache.org/jira/browse/FLINK-23215
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Runtime
Reporter: Jingsong Lee
Assignee: Caizhi Weng
 Fix For: 1.14.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished

2021-07-01 Thread Yun Gao
Hi there,

Since the voting time of FLIP-147[1] has passed, I'm closing the vote now.

There were seven +1 votes (6 of the 7 binding) and no -1 votes:

- Dawid Wysakowicz (binding)
- Piotr Nowojski(binding)
- Jiangang Liu (binding)
- Arvid Heise (binding)
- Jing Zhang (binding)
- Leonard Xu (non-binding)
- Guowei Ma (binding)

Thus I'm happy to announce that the update to the FLIP-147 is accepted.

Very thanks everyone!

Best,
Yun

[1]  https://cwiki.apache.org/confluence/x/mw-ZCQ

[jira] [Created] (FLINK-23216) RM keeps allocating and freeing slots after a TM lost until its heartbeat timeout

2021-07-01 Thread Gen Luo (Jira)
Gen Luo created FLINK-23216:
---

 Summary: RM keeps allocating and freeing slots after a TM lost 
until its heartbeat timeout
 Key: FLINK-23216
 URL: https://issues.apache.org/jira/browse/FLINK-23216
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.13.1
Reporter: Gen Luo


In Flink 1.13, it's observed that the ResourceManager keeps allocating and 
freeing slots with a new TM when it is notified by YARN that a TM is lost. The 
behavior continues until the JM marks the TM as FAILED when its heartbeat 
timeout is reached. It can be easily reproduced by enlarging 
akka.ask.timeout and heartbeat.timeout, for example to 10 min.
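
A minimal sketch of the enlarged timeouts used for the reproduction (the two 
config keys are as named above; the values and the helper class are 
illustrative, and putting the same entries into flink-conf.yaml is equivalent):

    import org.apache.flink.configuration.Configuration;

    public class TimeoutReproSketch {
        // builds the configuration that delays the TM-lost detection
        public static Configuration enlargedTimeouts() {
            Configuration conf = new Configuration();
            conf.setString("akka.ask.timeout", "10 min");
            conf.setString("heartbeat.timeout", "600000"); // 10 min, in milliseconds
            return conf;
        }
    }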

 

After tracking, we find the procedure is like this:

1. When a TM is killed, YARN first receives the event and notifies the RM.

2. In Flink 1.13, the RM uses declarative resource management to manage the 
slots. It finds a lack of resources when receiving the notification, and then 
requests a new TM from YARN.

3. The RM then requires the new TM to connect and offer slots to the JM.

4. From the JM's point of view, however, all slots are fulfilled, since the 
lost TM is not considered disconnected until its heartbeat timeout is reached, 
so the JM rejects all slot offers.

5. The new TM finds no slot serving the JM, and then disconnects from the JM.

6. The RM then finds a lack of resources again and goes back to step 3, 
requiring the new TM to connect and offer slots to the JM, but it won't request 
another new TM from YARN.

 

The original log is lost, but it looked like this:

o.a.f.r.r.s.DefaultSlotStatusSyncer - Freeing slot xxx.

...(repeat serval lines for different slots)...

o.a.f.r.r.s.DefaultSlotStatusSyncer - Starting allocation of slot xxx from 
container_xxx for job xxx.

...(repeat serval lines for different slots)...

 

This could be fixed in several ways, such as notifying the JM as well when the 
RM receives a TM-lost notification, or having TMs not offer slots until 
required, etc. But all these approaches have side effects, so they may need 
further discussion.

Besides, this should no longer be an issue after FLINK-23209 is done.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)