Hi Hwanju,

I looked through the document; however, I'm not the best person to 
review, judge, or discuss the implementation details here. I hope that 
Chesnay will be able to help in this regard.

Piotrek

> On 24 May 2019, at 09:09, Kim, Hwanju <hwanj...@amazon.com.INVALID> wrote:
> 
> Hi,
> 
> As suggested by Piotrek, the first part, execution state tracking, is now 
> split into a separate doc: 
> https://docs.google.com/document/d/1oLF3w1wYyr8vqoFoQZhw1QxTofmAtlD8IF694oPLjNI/edit?usp=sharing
> 
> We'd appreciate any feedback. I am still using the same email thread to 
> provide a full context, but please let me know if it's better to have a 
> separate email thread as well. We will be sharing the remaining parts once 
> ready.
> 
> Thanks,
> Hwanju
> 
> On 5/17/19, 12:59 AM, "Piotr Nowojski" <pi...@ververica.com> wrote:
> 
>    Hi Hwanju & Chesnay,
> 
>    Regarding the various things that both of you mentioned, like accounting 
> for state restoration separately or batch scheduling, we can always 
> acknowledge some limitations of the initial approach and maybe address them 
> later if we decide they are worth the effort.
> 
>    Generally speaking, all that you have written makes sense to me, so +1 
> from my side to split the discussion into separate threads.
> 
>    Piotrek
> 
>> On 17 May 2019, at 08:57, Kim, Hwanju <hwanj...@amazon.com.INVALID> wrote:
>> 
>> Hi Piotrek,
>> 
>> Thanks for the insightful feedback; you indeed identified the trickiest 
>> parts and concerns.
>> 
>>> 1. Do we currently account state restore as “RUNNING”? If yes, this might 
>>> be incorrect from your perspective.
>> 
>> As Chesnay said, initializeState is called in StreamTask.invoke after the 
>> task transitions to RUNNING, so task state restore currently happens during 
>> RUNNING. I think accounting state restore as running seems fine, since state 
>> size is a user artifact, as long as we can detect service errors during 
>> restore (indeed, DFS issues usually surface at createCheckpointStorage (e.g., 
>> S3 server errors) and RocksDB issues at initializeState in 
>> StreamTask.invoke). We can discuss whether we need a separate state to track 
>> restore and running separately, but it seems to add too many messages on 
>> common paths just for tracking.
>> 
>>> 2a. This might be more tricky if various Tasks are in various stages. For 
>>> example in streaming, it should be safe to assume that the state of the job 
>>> is the “minimum” of its Tasks’ states, so the Job should be accounted as 
>>> RUNNING only if all of the Tasks are either RUNNING or COMPLETED.
>> 
>> Right. The job is accounted as RUNNING once all the tasks in the graph have 
>> transitioned to RUNNING. For the other stages, the SCHEDULING stage begins 
>> when the first task transitions to SCHEDULED, and the DEPLOYING stage begins 
>> when the first task transitions to DEPLOYING. This works well especially for 
>> eager scheduling and the full-restart fail-over strategy. With individual or 
>> partial restarts, we may not need to specifically track the SCHEDULING and 
>> DEPLOYING states and can instead keep treating the job as running while 
>> relying on the progress monitor.
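>> 
>> For illustration, here is a minimal sketch of how a job-level stage could be 
>> derived from per-task states. The JobStage enum and the helper class are 
>> hypothetical, not existing Flink code; only ExecutionState is Flink's 
>> existing enum.
>> 
>>    import org.apache.flink.runtime.execution.ExecutionState;
>>    import java.util.Collection;
>> 
>>    // Hypothetical helper: the job counts as RUNNING only if every task is
>>    // RUNNING or FINISHED; otherwise report DEPLOYING as soon as any task is
>>    // deploying, else SCHEDULING.
>>    public class JobStageDerivation {
>> 
>>        public enum JobStage { SCHEDULING, DEPLOYING, RUNNING }
>> 
>>        public static JobStage deriveStage(Collection<ExecutionState> taskStates) {
>>            boolean allRunningOrDone = true;
>>            boolean anyDeploying = false;
>>            for (ExecutionState s : taskStates) {
>>                if (s != ExecutionState.RUNNING && s != ExecutionState.FINISHED) {
>>                    allRunningOrDone = false;
>>                }
>>                if (s == ExecutionState.DEPLOYING) {
>>                    anyDeploying = true;
>>                }
>>            }
>>            if (allRunningOrDone) {
>>                return JobStage.RUNNING;
>>            }
>>            return anyDeploying ? JobStage.DEPLOYING : JobStage.SCHEDULING;
>>        }
>>    }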
>> 
>>> 2b. However in batch - including DataStream jobs running against bounded 
>>> data streams, like Blink SQL - this might be more tricky, since there are 
>>> ongoing efforts to schedule parts of the job graph in stages. For example, 
>>> do not schedule the probe side of a join until the build side is 
>>> done/completed.
>> 
>> Exactly. I have roughly looked at the batch side, but not in detail yet, and 
>> I am aware of the ongoing scheduling work. The initial focus of breaking 
>> execution out into multiple states like scheduling/deploying would be only 
>> on streaming with eager scheduling. We need to give more thought to how to 
>> deal with batch/lazy scheduling.
>> 
>>> What exactly would you like to report here? A list of exceptions with the 
>>> downtime caused by each, for example: exception X caused the job to be down 
>>> for 13 minutes, 1 minute in scheduling, 1 minute deploying, 11 minutes in 
>>> state restore?
>> 
>> Basically, each component of downtime is traced back to its initial cause, 
>> which is accounted to a certain type like user or system based on the 
>> classification. So you're right. The interesting part here is secondary 
>> failures. For example, a user error causes a job to restart, but then 
>> scheduling fails due to a system issue. We need to account the failing and 
>> restarting time to the user, while the scheduling time on restart (e.g., a 
>> 5 min timeout) goes to the system. A further example: a system error causes 
>> a job to go into failing, but one of the user functions does not react to 
>> cancellation (for full-restart); the prolonged failing time (e.g., a 3 min 
>> watchdog timeout) shouldn't be accounted to the system (of course, the other 
>> way around has also been seen -- e.g., FLINK-5463).
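>> 
>> To make the accounting concrete, here is a minimal sketch of the kind of 
>> bookkeeping I have in mind. DowntimeAccounting, CauseType, and the interval 
>> recording are hypothetical illustrations, not existing Flink classes: each 
>> stage of an outage is recorded as its own interval with its own classified 
>> cause, so secondary failures are charged separately from the initial one.
>> 
>>    import java.time.Duration;
>>    import java.util.ArrayList;
>>    import java.util.EnumMap;
>>    import java.util.List;
>>    import java.util.Map;
>> 
>>    // Hypothetical bookkeeping: every stage of an outage (e.g. FAILING,
>>    // RESTARTING, SCHEDULING, RESTORING) is recorded with the cause
>>    // classified for that stage, so a user-caused restart followed by a
>>    // system-caused scheduling timeout is split between the two types.
>>    public class DowntimeAccounting {
>> 
>>        public enum CauseType { USER, SYSTEM, RESOURCE, UNKNOWN }
>> 
>>        private static final class Interval {
>>            final String stage;      // e.g. "FAILING", "SCHEDULING"
>>            final CauseType cause;   // classified cause of this interval
>>            final Duration duration;
>> 
>>            Interval(String stage, CauseType cause, Duration duration) {
>>                this.stage = stage;
>>                this.cause = cause;
>>                this.duration = duration;
>>            }
>>        }
>> 
>>        private final List<Interval> intervals = new ArrayList<>();
>> 
>>        public void record(String stage, CauseType cause, Duration duration) {
>>            intervals.add(new Interval(stage, cause, duration));
>>        }
>> 
>>        // Total downtime charged to each cause type, e.g. for per-type metrics.
>>        public Map<CauseType, Duration> downtimeByCause() {
>>            Map<CauseType, Duration> result = new EnumMap<>(CauseType.class);
>>            for (Interval i : intervals) {
>>                result.merge(i.cause, i.duration, Duration::plus);
>>            }
>>            return result;
>>        }
>>    }
>> 
>> For the first example above, the FAILING and RESTARTING intervals would be 
>> recorded with CauseType.USER, while the timed-out SCHEDULING interval would 
>> be recorded with CauseType.SYSTEM.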
>> 
>>> Why are you thinking about implementing classifiers? Couldn’t we classify 
>>> exceptions by exception type, like `FlinkUserException`, 
>>> `FlinkNetworkException`, `FlinkStateBackendException` … and make sure that 
>>> we throw the correct exception types + handle/wrap exceptions correctly when 
>>> crossing the Flink system/user code border? This way we could know exactly 
>>> whether an exception occurred in the user code or in Flink code. 
>> 
>> I think the classifier here is complementary to the exception-type approach. 
>> In this context, a classifier is "f(exception) -> type". The type is used as 
>> a metric dimension to set alerts on certain types or to get a downtime 
>> breakdown per type (the type is not fixed to just "user" or "system" but can 
>> be more specific and customizable, like statebackend or network). If we do 
>> wrap exceptions perfectly as you said, f() is simple enough to look at the 
>> exception type and return its corresponding classification.
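>> 
>> As a rough illustration (the interface and class names below are 
>> hypothetical, not existing Flink API), the pluggable classifiers could form 
>> a simple chain of filters, where the exception-type-based classifier you 
>> describe is just the first filter:
>> 
>>    import java.util.List;
>>    import java.util.Optional;
>> 
>>    // Hypothetical pluggable classifier: maps an exception to a type tag
>>    // (e.g. "user", "system", "statebackend", "network") or defers.
>>    public interface FailureClassifier {
>>        Optional<String> classify(Throwable failureCause);
>>    }
>> 
>>    // Example filter assuming well-wrapped exceptions: map known exception
>>    // class names directly to a tag. The FlinkUserException etc. names are
>>    // the illustrative ones from your proposal, matched by simple name here.
>>    class ExceptionTypeClassifier implements FailureClassifier {
>>        @Override
>>        public Optional<String> classify(Throwable failureCause) {
>>            for (Throwable t = failureCause; t != null; t = t.getCause()) {
>>                switch (t.getClass().getSimpleName()) {
>>                    case "FlinkUserException":
>>                        return Optional.of("user");
>>                    case "FlinkNetworkException":
>>                        return Optional.of("network");
>>                    case "FlinkStateBackendException":
>>                        return Optional.of("statebackend");
>>                    default:
>>                        // keep walking the cause chain
>>                }
>>            }
>>            return Optional.empty(); // defer to the next filter in the chain
>>        }
>>    }
>> 
>>    // The chain returns the first non-empty classification, else "unknown".
>>    class ClassifierChain {
>>        private final List<FailureClassifier> filters;
>> 
>>        ClassifierChain(List<FailureClassifier> filters) {
>>            this.filters = filters;
>>        }
>> 
>>        String classify(Throwable failureCause) {
>>            for (FailureClassifier f : filters) {
>>                Optional<String> type = f.classify(failureCause);
>>                if (type.isPresent()) {
>>                    return type.get();
>>                }
>>            }
>>            return "unknown";
>>        }
>>    }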
>> 
>> Initially we also thought complete wrapping would be ideal. However, even 
>> inside a UDF, user code can call into the Flink framework (e.g., a state 
>> update) or call out to dependent services, which a service provider may want 
>> to classify separately. In addition, Flink allows users to work with 
>> lower-level APIs like StreamOperator, which makes the border a little 
>> blurry. Those factors make complete wrapping challenging. Besides, 
>> stack-based classification beyond the exception type could still be needed 
>> for classifying stuck progress.
>> 
>> Without instrumentation, one of the base classifiers that works for our 
>> environment in many cases is a user-class-loader classifier, which detects 
>> whether an exception was thrown from a class loaded from the user 
>> JAR/artifact (this may be less desirable in an environment where user 
>> artifacts can be installed directly in the system lib/, but service 
>> providers would opt in to self-contained jar submission, keeping the system 
>> environment system-only).
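>> 
>> A minimal sketch of such a classifier, reusing the hypothetical 
>> FailureClassifier interface from above and assuming the classification runs 
>> where the job's user-code classloader is available: walk the stack traces of 
>> the exception chain and check whether any frame's declaring class was loaded 
>> by the user classloader.
>> 
>>    import java.util.Optional;
>> 
>>    // Hypothetical classifier: an exception is tagged "user" if any frame in
>>    // its stack trace belongs to a class loaded by the user-code classloader
>>    // (i.e. a class coming from the user JAR/artifact).
>>    class UserClassLoaderClassifier implements FailureClassifier {
>>        private final ClassLoader userCodeClassLoader;
>> 
>>        UserClassLoaderClassifier(ClassLoader userCodeClassLoader) {
>>            this.userCodeClassLoader = userCodeClassLoader;
>>        }
>> 
>>        @Override
>>        public Optional<String> classify(Throwable failureCause) {
>>            for (Throwable t = failureCause; t != null; t = t.getCause()) {
>>                for (StackTraceElement frame : t.getStackTrace()) {
>>                    if (isUserClass(frame.getClassName())) {
>>                        return Optional.of("user");
>>                    }
>>                }
>>            }
>>            return Optional.empty(); // not decidable by this filter; defer
>>        }
>> 
>>        private boolean isUserClass(String className) {
>>            try {
>>                // Resolve the class without initializing it and compare the
>>                // defining classloader against the user-code classloader.
>>                Class<?> clazz = Class.forName(className, false, userCodeClassLoader);
>>                return clazz.getClassLoader() == userCodeClassLoader;
>>            } catch (ClassNotFoundException | LinkageError e) {
>>                return false;
>>            }
>>        }
>>    }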
>> 
>>>  One thing that might be tricky is if an error in Flink code is caused by a 
>>> user’s mistake.
>> 
>> Right, this is the trickiest part. Based on our analysis of real data, the 
>> most ambiguous cases are custom serialization and out-of-resource errors. 
>> The former is usually seen in Flink runtime code rather than in a UDF. In 
>> the latter, the Flink stack is just a victim of a resource hog/leak in user 
>> code (OOM, too many open files). For the serialization issue, we've been 
>> looking at (and learning from) various serialization errors seen in the 
>> field to arrive at a reasonable classification. For out-of-resource errors, 
>> rather than a user vs. system classification, we can tag the type as 
>> "resource" and rely on dumps (e.g., heap dumps) and postmortem analysis on 
>> an as-needed basis.
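>> 
>> Following the same hypothetical filter-chain shape as above, the "resource" 
>> tag could come from a simple filter placed ahead of the user/system ones 
>> (the message matching below is purely illustrative):
>> 
>>    import java.util.Optional;
>> 
>>    // Hypothetical filter: tag out-of-resource failures as "resource" so
>>    // they are not mis-charged to user or system.
>>    class ResourceErrorClassifier implements FailureClassifier {
>>        @Override
>>        public Optional<String> classify(Throwable failureCause) {
>>            for (Throwable t = failureCause; t != null; t = t.getCause()) {
>>                if (t instanceof OutOfMemoryError) {
>>                    return Optional.of("resource");
>>                }
>>                String msg = t.getMessage();
>>                if (msg != null && msg.contains("Too many open files")) {
>>                    return Optional.of("resource");
>>                }
>>            }
>>            return Optional.empty();
>>        }
>>    }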
>> 
>>> Hmmm, this might be tricky. We can quite easily detect which exact Task is 
>>> causing back pressure in at least a couple of different ways. The tricky 
>>> part would be to determine whether this is caused by the user or not, but 
>>> probably some simple stack trace probing on the back-pressured task once 
>>> every N seconds should solve this - similar to how sampling profilers work.
>> 
>> Again you're right, and like you said, this part would mostly reuse existing 
>> building blocks such as latency markers and back-pressure sampling. If 
>> configured only for progress monitoring rather than latency-distribution 
>> tracking, the latency marker can be lightweight: it can skip the histogram 
>> update, only record the latest timestamp, and use a longer period so as not 
>> to adversely affect performance. Once stuck progress is detected, stack 
>> sampling can tell us more about the context causing the back pressure.
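>> 
>> For illustration only (the detector below is a hypothetical sketch, not 
>> existing Flink code), the stuck-progress check could compare the last 
>> timestamp observed from the lightweight markers against a threshold and, 
>> only when that check fires, hand the suspect task over to stack sampling:
>> 
>>    import java.util.concurrent.atomic.AtomicLong;
>> 
>>    // Hypothetical stuck-progress detector: a marker passing through the
>>    // task updates lastProgressMillis on the hot path (a single atomic
>>    // write); a periodic check off the hot path flags the task as stuck
>>    // when no marker has arrived within the threshold, which would then
>>    // trigger stack sampling of that task.
>>    class ProgressMonitor {
>>        private final AtomicLong lastProgressMillis =
>>                new AtomicLong(System.currentTimeMillis());
>>        private final long stuckThresholdMillis;
>> 
>>        ProgressMonitor(long stuckThresholdMillis) {
>>            this.stuckThresholdMillis = stuckThresholdMillis;
>>        }
>> 
>>        /** Called when a lightweight marker passes through the task. */
>>        void onMarker(long markerTimestampMillis) {
>>            lastProgressMillis.set(markerTimestampMillis);
>>        }
>> 
>>        /** Called periodically, e.g. by the back-pressure sampler thread. */
>>        boolean isStuck(long nowMillis) {
>>            return nowMillis - lastProgressMillis.get() > stuckThresholdMillis;
>>        }
>>    }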
>> 
>>> Luckily it seems like those four issues/proposals could be 
>>> implemented/discussed independently or in stages.
>> 
>> Agreed. Once some initial discussion clears things up at least at a high 
>> level, I can start more independent threads.
>> 
>> Best,
>> Hwanju
>> 
>> On 5/16/19, 2:44 AM, "Piotr Nowojski" <pi...@ververica.com> wrote:
>> 
>>   Hi Hwanju,
>> 
>>   Thanks for starting the discussion. Definitely any improvement in this 
>> area would be very helpful and valuable. Generally speaking, +1 from my 
>> side, as long as we make sure that either such changes do not add 
>> performance overhead (which I think they shouldn’t) or they are optional. 
>> 
>>> Firstly, we need to account for the time spent in each stage of task 
>>> execution, such as scheduling, deploying, and running, to enable better 
>>> visibility into how long a job spends in each stage while not yet running 
>>> user functions.
>> 
>>   A couple of questions/remarks:
>>   1. Do we currently account state restore as “RUNNING”? If yes, this might 
>> be incorrect from your perspective.
>>   2a. This might be more tricky if various Tasks are in various stages. For 
>> example in streaming, it should be safe to assume that the state of the job 
>> is the “minimum” of its Tasks’ states, so the Job should be accounted as 
>> RUNNING only if all of the Tasks are either RUNNING or COMPLETED. 
>>   2b. However in batch - including DataStream jobs running against bounded 
>> data streams, like Blink SQL - this might be more tricky, since there are 
>> ongoing efforts to schedule parts of the job graph in stages. For example, 
>> do not schedule the probe side of a join until the build side is 
>> done/completed.
>> 
>>> Secondly, any downtime in each stage can be associated with a failure 
>>> cause, which could be identified by the Java exception reported to the job 
>>> manager on task failure or by an unhealthy task manager (Flink already 
>>> maintains a cause, but it can additionally be associated with an execution 
>>> stage for causal tracking)
>> 
>>   What exactly would you like to report here? A list of exceptions with the 
>> downtime caused by each, for example: exception X caused the job to be down 
>> for 13 minutes, 1 minute in scheduling, 1 minute deploying, 11 minutes in 
>> state restore?
>> 
>>> Thirdly, the downtime reason should be classified as a user- or 
>>> system-induced failure. This needs an exception classifier that draws the 
>>> line between user-defined functions (or the public API) and the Flink 
>>> runtime. It is particularly challenging to achieve 100% accuracy in one 
>>> shot due to the empirical nature of the problem and custom logic injection 
>>> like serialization, so pluggable classifier filters are a must-have to 
>>> enable incremental improvement. 
>> 
>>   Why are you thinking about implementing classifiers? Couldn’t we classify 
>> exceptions by exception type, like `FlinkUserException`, 
>> `FlinkNetworkException`, `FlinkStateBackendException` … and make sure that 
>> we throw the correct exception types + handle/wrap exceptions correctly when 
>> crossing the Flink system/user code border? This way we could know exactly 
>> whether an exception occurred in the user code or in Flink code. 
>> 
>>   One thing that might be tricky is if an error in Flink code is caused by a 
>> user’s mistake.
>> 
>> 
>>> Fourthly, stuck progress
>> 
>>   Hmmm, this might be tricky. We can quite easily detect which exact Task is 
>> causing back pressure in at least a couple of different ways. The tricky 
>> part would be to determine whether this is caused by the user or not, but 
>> probably some simple stack trace probing on the back-pressured task once 
>> every N seconds should solve this - similar to how sampling profilers work.
>> 
>>   Luckily it seems like those four issues/proposals could be 
>> implemented/discussed independently or in stages.
>> 
>>   Piotrek
>> 
>>> On 11 May 2019, at 06:50, Kim, Hwanju <hwanj...@amazon.com.INVALID> wrote:
>>> 
>>> Hi,
>>> 
>>> I am Hwanju at AWS Kinesis Analytics. We would like to start a discussion 
>>> thread about a project we are considering for Flink operational improvement 
>>> in production. We would like to start the conversation early, before the 
>>> detailed design, so any high-level feedback is welcome.
>>> 
>>> For service providers who operate Flink in a multi-tenant environment, such 
>>> as AWS Kinesis Data Analytics, it is crucial to measure application health 
>>> and to clearly differentiate application unavailability caused by the Flink 
>>> framework or the service environment from unavailability caused by 
>>> application code. While the current metrics of Flink represent overall job 
>>> availability in time, they still need to be improved to give Flink 
>>> operators better insight into detailed application availability. The 
>>> current availability metrics, such as uptime and downtime, measure time 
>>> based on the running state of a job, which does not necessarily represent 
>>> the actual running state of the job (after a job transitions to running, 
>>> each task still has to be scheduled/deployed in order to run user-defined 
>>> functions). The detailed view should give operators visibility into 1) how 
>>> long each specific stage takes (e.g., task scheduling or deployment), 2) 
>>> what failure was introduced in which stage, leading to job downtime, and 3) 
>>> whether such a failure is classified as a user code error (e.g., an 
>>> uncaught exception from a user-defined function) or a 
>>> platform/environmental error (e.g., a checkpointing issue, unhealthy nodes 
>>> hosting job/task managers, a Flink bug). The last one is particularly 
>>> needed to allow Flink operators to define an SLA where only a small 
>>> fraction of downtime may be introduced by service faults. All of these 
>>> visibility enhancements can help the community detect and fix Flink runtime 
>>> issues quickly, whereby Flink can become a more robust system for hosting 
>>> data analytics applications.
>>> 
>>> The current proposal is as follows. Firstly, we need to account for the 
>>> time spent in each stage of task execution, such as scheduling, deploying, 
>>> and running, to enable better visibility into how long a job spends in each 
>>> stage while not yet running user functions. Secondly, any downtime in each 
>>> stage can be associated with a failure cause, which could be identified by 
>>> the Java exception reported to the job manager on task failure or by an 
>>> unhealthy task manager (Flink already maintains a cause, but it can 
>>> additionally be associated with an execution stage for causal tracking). 
>>> Thirdly, the downtime reason should be classified as a user- or 
>>> system-induced failure. This needs an exception classifier that draws the 
>>> line between user-defined functions (or the public API) and the Flink 
>>> runtime. It is particularly challenging to achieve 100% accuracy in one 
>>> shot due to the empirical nature of the problem and custom logic injection 
>>> like serialization, so pluggable classifier filters are a must-have to 
>>> enable incremental improvement. Fourthly, stuck progress, where a task is 
>>> apparently running but not able to process data, generally manifesting 
>>> itself as long back pressure, can be monitored as a higher-level job 
>>> availability signal, and the runtime can determine whether the cause of 
>>> being stuck is the user (e.g., under-provisioned resources, a user function 
>>> bug) or the system (a deadlock or livelock in the Flink runtime). Finally, 
>>> all the detailed tracking information and metrics are exposed via REST and 
>>> Flink metrics, so that the Flink dashboard can show enhanced information 
>>> about job execution/availability and operators can set alarms appropriately 
>>> on the metrics.
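>>> 
>>> As a sketch of the metrics side only (the metric names and the tracker 
>>> interface below are hypothetical placeholders; MetricGroup and Gauge are 
>>> Flink's existing metrics API), per-type downtime could be exposed as gauges 
>>> that operators can set alarms on:
>>> 
>>>    import org.apache.flink.metrics.Gauge;
>>>    import org.apache.flink.metrics.MetricGroup;
>>> 
>>>    // Hypothetical registration of per-cause downtime gauges;
>>>    // "downtimeMsUser"/"downtimeMsSystem" are illustrative metric names and
>>>    // AvailabilityTracker is a placeholder for the downtime bookkeeping.
>>>    public class AvailabilityMetrics {
>>> 
>>>        public interface AvailabilityTracker {
>>>            long userDowntimeMillis();
>>>            long systemDowntimeMillis();
>>>        }
>>> 
>>>        public static void register(MetricGroup jobGroup, AvailabilityTracker tracker) {
>>>            jobGroup.gauge("downtimeMsUser", (Gauge<Long>) tracker::userDowntimeMillis);
>>>            jobGroup.gauge("downtimeMsSystem", (Gauge<Long>) tracker::systemDowntimeMillis);
>>>        }
>>>    }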
>>> 
>>> Best,
>>> Hwanju
>>> 
>> 
>> 
>> 
> 
> 
> 
