Hi Hwanju,

Thanks for starting the discussion. Any improvement in this area would definitely 
be helpful and valuable. Generally speaking, +1 from my side, as long as we make 
sure that such changes either do not add performance overhead (which I think they 
shouldn’t) or are optional. 

> Firstly, we need to account time for each stage of task execution such as 
> scheduling, deploying, and running, to enable better visibility of how long a 
> job takes in which stage while not running user functions.

Couple of questions/remarks:
1. Do we currently account state restore as “RUNNING”? If yes, this might be 
incorrect from your perspective.
2a. This might be more tricky if various Tasks are in various stages. For 
example in streaming, it should be safe to assume that the state of the job is 
the “minimum” of its Tasks’ states, so the Job should be accounted as RUNNING 
only if all of the Tasks are either RUNNING or COMPLETED (see the sketch below 
this list). 
2b. However in batch - including DataStream jobs running against bounded data 
streams, like Blink SQL - this might be more tricky, since there are ongoing 
efforts to schedule parts of the job graph in stages. For example, the probe 
side of a join would not be scheduled until the build side is done/completed.
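
For 2a, something along these lines is what I have in mind - just a sketch, 
where `TaskStage` and its ordering are made-up placeholders rather than 
existing Flink classes:

```java
import java.util.Collection;

// Hypothetical per-task stages, ordered from "earliest" to "latest".
enum TaskStage { SCHEDULED, DEPLOYING, RESTORING, RUNNING, COMPLETED }

final class JobStageAggregator {
    // The job-level stage is the "minimum" of its tasks' stages, so the job
    // only counts as RUNNING once every task is RUNNING or already COMPLETED.
    static TaskStage aggregate(Collection<TaskStage> taskStages) {
        TaskStage result = TaskStage.COMPLETED;
        for (TaskStage stage : taskStages) {
            if (stage.ordinal() < result.ordinal()) {
                result = stage;
            }
        }
        return result;
    }
}
```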

>  Secondly, any downtime in each stage can be associated with a failure cause, 
> which could be identified by Java exception notified to job manager on task 
> failure or unhealthy task manager (Flink already maintains a cause but it can 
> be associated with an execution stage for causal tracking)

What exactly would you like to report here? A list of exceptions with the 
downtime caused by each one, for example: exception X caused the job to be down 
for 13 minutes - 1 minute in scheduling, 1 minute in deploying, 11 minutes in 
state restore?

>  Thirdly, downtime reason should be classified into user- or system-induced 
> failure. This needs exception classifier by drawing the line between 
> user-defined functions (or public API) and Flink runtime — This is 
> particularly challenging to have 100% accuracy at one-shot due to empirical 
> nature and custom logic injection like serialization, so pluggable classifier 
> filters are must-have to enable incremental improvement. 

Why do you think pluggable classifiers are needed? Couldn’t we classify 
exceptions by exception type, like `FlinkUserException`, 
`FlinkNetworkException`, `FlinkStateBackendException` … and make sure that we 
throw the correct exception types and handle/wrap exceptions correctly when 
crossing the Flink system/user code border? This way we would know exactly 
whether an exception occurred in the user code or in Flink code. 

One thing that might be tricky is when an error in Flink code is caused by a 
user’s mistake.
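
To illustrate what I mean, a rough sketch - `FlinkUserException` and 
`FailureType` here are hypothetical marker types, not existing Flink classes:

```java
// Hypothetical marker type thrown whenever user code fails.
class FlinkUserException extends RuntimeException {
    FlinkUserException(String message, Throwable cause) {
        super(message, cause);
    }
}

enum FailureType { USER, SYSTEM }

final class ExceptionClassifier {
    // Walk the cause chain, so an exception wrapped by the runtime is still
    // attributed to user code if a FlinkUserException appears anywhere in it.
    static FailureType classify(Throwable t) {
        for (Throwable cause = t; cause != null; cause = cause.getCause()) {
            if (cause instanceof FlinkUserException) {
                return FailureType.USER;
            }
        }
        return FailureType.SYSTEM;
    }
}
```

At the border, the runtime would wrap anything thrown by a user-defined 
function into such a marker type, so the classification stays a plain 
instanceof check over the cause chain.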


>  Fourthly, stuck progress

Hmmm, this might be tricky. We can quite easily detect which exact Task is 
causing back pressure in at least a couple of different ways. The tricky part 
would be to determine whether this is caused by the user or not, but probably 
some simple stack trace probing on the back pressured task once every N seconds 
should solve this - similar to how sampling profilers work.
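
To make that concrete, a minimal sketch of such probing - the package-prefix 
check is only a stand-in for a proper user/system classification, and 
`BackPressureSampler` is not an existing class:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class BackPressureSampler {

    static void startSampling(Thread backPressuredTaskThread) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            boolean userCode = isStuckInUserCode(backPressuredTaskThread.getStackTrace());
            // Feed the sample into whatever aggregates "stuck in user vs system" stats.
            System.out.println(userCode ? "stuck in user code" : "stuck in Flink runtime");
        }, 0, 10, TimeUnit.SECONDS); // "once every N seconds"
    }

    static boolean isStuckInUserCode(StackTraceElement[] stack) {
        for (StackTraceElement frame : stack) {
            String cls = frame.getClassName();
            // Skip JDK frames (e.g. the thread blocked on a lock or buffer wait).
            if (cls.startsWith("java.") || cls.startsWith("sun.") || cls.startsWith("jdk.")) {
                continue;
            }
            // The first non-JDK frame decides: Flink runtime vs everything else.
            return !cls.startsWith("org.apache.flink.");
        }
        return false;
    }
}
```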

Luckily it seems like those four issues/proposals could be 
implemented/discussed independently or in stages.

Piotrek

> On 11 May 2019, at 06:50, Kim, Hwanju <hwanj...@amazon.com.INVALID> wrote:
> 
> Hi,
> 
> I am Hwanju at AWS Kinesis Analytics. We would like to start a discussion 
> thread about a project we consider for Flink operational improvement in 
> production. We would like to start conversation early before detailed design, 
> so any high-level feedback would welcome.
> 
> For service providers who operate Flink in a multi-tenant environment, such 
> as AWS Kinesis Data Analytics, it is crucial to measure application health 
> and clearly differentiate application unavailability issue caused by Flink 
> framework or service environment from the ones caused by application code. 
> The current metrics of Flink represent overall job availability in time, it 
> still needs to be improved to give Flink operators better insight for the 
> detailed application availability. The current availability metrics such as 
> uptime and downtime measures the time based on the running state of a job, 
> which does not necessarily represent actual running state of a job (after a 
> job transitions to running, each task should still be scheduled/deployed in 
> order to run user-defined functions). The detailed view should enable 
> operators to have visibility on 1) how long each specific stage takes (e.g., 
> task scheduling or deployment), 2) what failure is introduced in which stage 
> leading to job downtime, 3) whether such failure is classified to user code 
> error (e.g., uncaught exception from user-defined function) or 
> platform/environmental errors (e.g., checkpointing issue, unhealthy nodes 
> hosting job/task managers, Flink bug). The last one is particularly needed to 
> allow Flink operators to define SLA where only a small fraction of downtime 
> should be introduced by service fault. All of these visibility enhancements 
> can help community detect and fix Flink runtime issues quickly, whereby Flink 
> can become more robust operating system for hosting data analytics 
> applications.
> 
> The current proposal is as follows. Firstly, we need to account time for each 
> stage of task execution such as scheduling, deploying, and running, to enable 
> better visibility of how long a job takes in which stage while not running 
> user functions. Secondly, any downtime in each stage can be associated with a 
> failure cause, which could be identified by Java exception notified to job 
> manager on task failure or unhealthy task manager (Flink already maintains a 
> cause but it can be associated with an execution stage for causal tracking). 
> Thirdly, downtime reason should be classified into user- or system-induced 
> failure. This needs exception classifier by drawing the line between 
> user-defined functions (or public API) and Flink runtime — This is 
> particularly challenging to have 100% accuracy at one-shot due to empirical 
> nature and custom logic injection like serialization, so pluggable classifier 
> filters are must-have to enable incremental improvement. Fourthly, stuck 
> progress, where task is apparently running but not being able to process data 
> generally manifesting itself as long backpressure, can be monitored as higher 
> level job availability and the runtime can determine whether the reason to be 
> stuck is caused by user (e.g., under-provisioned resource, user function bug) 
> or system (deadlock or livelock in Flink runtime). Finally, all the detailed 
> tracking information and metrics are exposed via REST and Flink metrics, so 
> that Flink dashboard can have enhanced information about job 
> execution/availability and operators can set alarm appropriately on metrics.
> 
> Best,
> Hwanju
> 
