Hi Piotrek,

Thanks for insightful feedback and indeed you got most tricky parts and 
concerns.

> 1. Do we currently account state restore as “RUNNING”? If yes, this might be 
> incorrect from your perspective.

As Chesnay said, initializeState is called in StreamTask.invoke after 
transitioning to RUNNING. So, task state restore part is currently during 
RUNNING. I think accounting state restore as running seems fine, since state 
size is user's artifact, as long as we can detect service error during restore 
(indeed, DFS issue usually happens at createCheckpointStorage (e.g., S3 server 
error) and RocksDB issue happens at initializeState in StreamTask.invoke). We 
can discuss about the need to have separate state to track restore and running 
separately, but it seems to add too many messages in common paths just for 
tracking.

> 2a. This might be more tricky if various Tasks are in various stages. For 
> example in streaming, it should be safe to assume that state of the job, is 
> “minimum” of it’s Tasks’ states, so Job should be accounted as RUNNING only 
> if all of the Tasks are either RUNNING or COMPLETED.

Right. For RUNNING, all the tasks in the graph transitions to RUNNING. For 
others, when the first task transitions to SCHEDULED, SCHEDULING stage begins, 
and when the first task transitions to DEPLOYING, it starts DEPLOYING stage. 
This would be fine especially for eager scheduling and full-restart fail-over 
strategy. In the individual or partial restart, we may not need to specifically 
track SCHEDULING and DEPLOYING states while treating job as running relying on 
progress monitor.

>2b. However in batch - including DataStream jobs running against bounded data 
>streams, like Blink SQL - this might be more tricky, since there are ongoing 
>efforts to schedule part of the job graphs in stages. For example do not 
>schedule probe side of the join until build side is done/completed.

Exactly. I have roughly looked at batch side, but not in detail yet and am 
aware of ongoing scheduling work. Initial focus of breaking out to multiple 
states like scheduling/deploying would be only for streaming with eager 
scheduling. Need to give more thought how to deal with batching/lazy scheduling.

> What exactly would you like to report here? List of exception with downtime 
> caused by it, for example: exception X caused a job to be down for 13 
> minutes, 1 minute in scheduling, 1 minute deploying, 11 minutes state restore?

Basically, initial cause is traced back from each component of downtime, which 
is accounted to a certain type like user or system based on the classification. 
So you're right. Interesting part here is about secondary failure. For example, 
a user error causes a job to restart but then scheduling is failed by system 
issue. We need to account failing, restarting time to user, while scheduling 
time on restart (e.g,. 5min timeout) is to system. A further example is that a 
system error causes a job to be failing, but one of the user function is not 
reacting to cancellation (for full-restart), prolonged failing time (e.g., 
watchdog timeout 3min) shouldn’t be accounted to system (of course, the other 
way around has been seen -- e.g., FLINK-5463).

> Why do you think about implementing classifiers? Couldn’t we classify 
> exceptions by exception type, like `FlinkUserException`, 
> `FlinkNetworkException`, `FlinkStateBackendException` … and make sure that we 
> throw correct exception types + handle/wrap exceptions correctly when 
> crossing Flink system/user code border? This way we could know exactly 
> whether exception occurred in the user code or in Flink code. 

I think classifier here is complementary to exception type approach. In this 
context, classifier is "f(exception) -> type". Type is used as metric dimension 
to set alert on certain types or have downtime breakdown on each type (type is 
not just fixed to "user" or "system" but can be more specific and customizable 
like statebackend and network). If we do wrap exceptions perfectly as you said, 
f() is simple enough to look at Exception type and then return its 
corresponding type.

Initially we also thought complete wrapping would be ideal. However, even 
inside UDF, it can call in Flink framework like state update or call out 
dependent services, which service provider may want to classify separately. In 
addition, Flink allows user to use lower level API like streamoperator to make 
the border a little blurring. Those would make complete wrapping challenging. 
Besides, stack-based classification beyond exception type could still be needed 
for stuck progress classification.

Without instrumentation, one of base classifiers that work for our environment 
in many cases is user-class-loader classifier, which can detect if an exception 
is thrown from the class loaded from user JAR/artifact (although this may be 
less desirable in an environment where user's artifacts can be installed 
directly in system lib/, but service providers would be opting in 
self-contained jar submission keeping system environment for system-only).
  
>    One thing that might be tricky is if error in Flink code is caused by 
> user’s mistake.

Right, this is the trickiest part. Based on our analysis with real data, the 
most ambiguous ones are custom serialization and out-of-resource errors.  The 
former is usually seen in Flink runtime code rather than in UDF. The latter is 
that Flink stack is just a victim by resource hog/leak of user code (OOM, too 
many open files). For the serialization issue, we've been looking at (and 
learning) various serialization errors seen in the field to get reasonable 
classification. For the out-of-resource, rather than user vs. system 
classification, we can tag the type as "resource" relying on dump (e.g., heap 
dump) and postmortem analysis as-needed basis.

> Hmmm, this might be tricky. We can quite easily detect which exact Task is 
> causing back pressure in at least couple of different ways. Tricky part would 
> be to determine whether this is caused by user or not, but probably some 
> simple stack trace probing on back pressured task once every N seconds should 
> solve this - similar how sampling profilers work.

Again you're right and like you said, this part would be mostly reusing the 
existing building blocks such as latency marker and backpressure samplings. If 
configured only with progress monitoring not latency distribution tracking, 
latency marker can be lightweight skipping histogram update part just updating 
latest timestamp with longer period not to adversely affect performance. Once 
stuck progress is detected, stack sampling can tell us more about the context 
that causes backpressure. 

> Luckily it seems like those four issues/proposals could be 
> implemented/discussed independently or in stages.
Agreed. Once some level of initial discussion clears things out at least high 
level, I can start out more independent threads.

Best,
Hwanju

On 5/16/19, 2:44 AM, "Piotr Nowojski" <pi...@ververica.com> wrote:

    Hi Hwanju,
    
    Thanks for starting the discussion. Definitely any improvement in this area 
would be very helpful and valuable. Generally speaking +1 from my side, as long 
as we make sure that either such changes do not add performance overhead (which 
I think they shouldn’t) or they are optional. 
    
    > Firstly, we need to account time for each stage of task execution such as 
scheduling, deploying, and running, to enable better visibility of how long a 
job takes in which stage while not running user functions.
    
    Couple of questions/remarks:
    1. Do we currently account state restore as “RUNNING”? If yes, this might 
be incorrect from your perspective.
    2a. This might be more tricky if various Tasks are in various stages. For 
example in streaming, it should be safe to assume that state of the job, is 
“minimum” of it’s Tasks’ states, so Job should be accounted as RUNNING only if 
all of the Tasks are either RUNNING or COMPLETED. 
    2b. However in batch - including DataStream jobs running against bounded 
data streams, like Blink SQL - this might be more tricky, since there are 
ongoing efforts to schedule part of the job graphs in stages. For example do 
not schedule probe side of the join until build side is done/completed.
    
    >  Secondly, any downtime in each stage can be associated with a failure 
cause, which could be identified by Java exception notified to job manager on 
task failure or unhealthy task manager (Flink already maintains a cause but it 
can be associated with an execution stage for causal tracking)
    
    What exactly would you like to report here? List of exception with downtime 
caused by it, for example: exception X caused a job to be down for 13 minutes, 
1 minute in scheduling, 1 minute deploying, 11 minutes state restore?
    
    >  Thirdly, downtime reason should be classified into user- or 
system-induced failure. This needs exception classifier by drawing the line 
between user-defined functions (or public API) and Flink runtime — This is 
particularly challenging to have 100% accuracy at one-shot due to empirical 
nature and custom logic injection like serialization, so pluggable classifier 
filters are must-have to enable incremental improvement. 
    
    Why do you think about implementing classifiers? Couldn’t we classify 
exceptions by exception type, like `FlinkUserException`, 
`FlinkNetworkException`, `FlinkStateBackendException` … and make sure that we 
throw correct exception types + handle/wrap exceptions correctly when crossing 
Flink system/user code border? This way we could know exactly whether exception 
occurred in the user code or in Flink code. 
    
    One thing that might be tricky is if error in Flink code is caused by 
user’s mistake.
    
    
    >  Fourthly, stuck progress
    
    Hmmm, this might be tricky. We can quite easily detect which exact Task is 
causing back pressure in at least couple of different ways. Tricky part would 
be to determine whether this is caused by user or not, but probably some simple 
stack trace probing on back pressured task once every N seconds should solve 
this - similar how sampling profilers work.
    
    Luckily it seems like those four issues/proposals could be 
implemented/discussed independently or in stages.
    
    Piotrek
    
    > On 11 May 2019, at 06:50, Kim, Hwanju <hwanj...@amazon.com.INVALID> wrote:
    > 
    > Hi,
    > 
    > I am Hwanju at AWS Kinesis Analytics. We would like to start a discussion 
thread about a project we consider for Flink operational improvement in 
production. We would like to start conversation early before detailed design, 
so any high-level feedback would welcome.
    > 
    > For service providers who operate Flink in a multi-tenant environment, 
such as AWS Kinesis Data Analytics, it is crucial to measure application health 
and clearly differentiate application unavailability issue caused by Flink 
framework or service environment from the ones caused by application code. The 
current metrics of Flink represent overall job availability in time, it still 
needs to be improved to give Flink operators better insight for the detailed 
application availability. The current availability metrics such as uptime and 
downtime measures the time based on the running state of a job, which does not 
necessarily represent actual running state of a job (after a job transitions to 
running, each task should still be scheduled/deployed in order to run 
user-defined functions). The detailed view should enable operators to have 
visibility on 1) how long each specific stage takes (e.g., task scheduling or 
deployment), 2) what failure is introduced in which stage leading to job 
downtime, 3) whether such failure is classified to user code error (e.g., 
uncaught exception from user-defined function) or platform/environmental errors 
(e.g., checkpointing issue, unhealthy nodes hosting job/task managers, Flink 
bug). The last one is particularly needed to allow Flink operators to define 
SLA where only a small fraction of downtime should be introduced by service 
fault. All of these visibility enhancements can help community detect and fix 
Flink runtime issues quickly, whereby Flink can become more robust operating 
system for hosting data analytics applications.
    > 
    > The current proposal is as follows. Firstly, we need to account time for 
each stage of task execution such as scheduling, deploying, and running, to 
enable better visibility of how long a job takes in which stage while not 
running user functions. Secondly, any downtime in each stage can be associated 
with a failure cause, which could be identified by Java exception notified to 
job manager on task failure or unhealthy task manager (Flink already maintains 
a cause but it can be associated with an execution stage for causal tracking). 
Thirdly, downtime reason should be classified into user- or system-induced 
failure. This needs exception classifier by drawing the line between 
user-defined functions (or public API) and Flink runtime — This is particularly 
challenging to have 100% accuracy at one-shot due to empirical nature and 
custom logic injection like serialization, so pluggable classifier filters are 
must-have to enable incremental improvement. Fourthly, stuck progress, where 
task is apparently running but not being able to process data generally 
manifesting itself as long backpressure, can be monitored as higher level job 
availability and the runtime can determine whether the reason to be stuck is 
caused by user (e.g., under-provisioned resource, user function bug) or system 
(deadlock or livelock in Flink runtime). Finally, all the detailed tracking 
information and metrics are exposed via REST and Flink metrics, so that Flink 
dashboard can have enhanced information about job execution/availability and 
operators can set alarm appropriately on metrics.
    > 
    > Best,
    > Hwanju
    > 
    
    

Reply via email to