Hi Hwanju,

I looked through the document; however, I'm not the best person to review or discuss the implementation details here. I hope that Chesnay will be able to help in this regard.
Piotrek

> On 24 May 2019, at 09:09, Kim, Hwanju <hwanj...@amazon.com.INVALID> wrote:
>
> Hi,
>
> As suggested by Piotrek, the first part, execution state tracking, is now split into a separate doc:
> https://docs.google.com/document/d/1oLF3w1wYyr8vqoFoQZhw1QxTofmAtlD8IF694oPLjNI/edit?usp=sharing
>
> We'd appreciate any feedback. I am still using the same email thread to provide full context, but please let me know if it would be better to start a separate email thread as well. We will share the remaining parts once they are ready.
>
> Thanks,
> Hwanju
>
> On 5/17/19, 12:59 AM, "Piotr Nowojski" <pi...@ververica.com> wrote:
>
> Hi Hwanju & Chesnay,
>
> Regarding the various things that both of you mentioned, like accounting for state restoration separately or batch scheduling, we can always acknowledge some limitations of the initial approach and maybe address them later if we decide they are worth the effort.
>
> Generally speaking, everything you have written makes sense to me, so +1 from my side to splitting the discussion into separate threads.
>
> Piotrek
>
>> On 17 May 2019, at 08:57, Kim, Hwanju <hwanj...@amazon.com.INVALID> wrote:
>>
>> Hi Piotrek,
>>
>> Thanks for the insightful feedback -- you have indeed identified the trickiest parts and concerns.
>>
>>> 1. Do we currently account state restore as "RUNNING"? If yes, this might be incorrect from your perspective.
>>
>> As Chesnay said, initializeState is called in StreamTask.invoke after the transition to RUNNING, so task state restore currently happens during RUNNING. I think accounting state restore as running seems fine, since state size is the user's artifact, as long as we can detect service errors during restore (indeed, DFS issues usually surface at createCheckpointStorage (e.g., S3 server error) and RocksDB issues at initializeState in StreamTask.invoke). We can discuss the need to track restore and running separately, but it seems to add too many messages on common paths just for tracking.
>>
>>> 2a. This might be more tricky if various Tasks are in various stages. For example in streaming, it should be safe to assume that the state of the job is the "minimum" of its Tasks' states, so the Job should be accounted as RUNNING only if all of the Tasks are either RUNNING or COMPLETED.
>>
>> Right. For RUNNING, all the tasks in the graph transition to RUNNING. For the other stages, when the first task transitions to SCHEDULED, the SCHEDULING stage begins, and when the first task transitions to DEPLOYING, the DEPLOYING stage starts (a rough sketch of this aggregation follows below). This would be fine especially for eager scheduling and the full-restart fail-over strategy. With individual or partial restart, we may not need to specifically track the SCHEDULING and DEPLOYING states and can keep treating the job as running, relying on the progress monitor.
>>
>>> 2b. However in batch - including DataStream jobs running against bounded data streams, like Blink SQL - this might be more tricky, since there are ongoing efforts to schedule parts of the job graph in stages. For example, do not schedule the probe side of the join until the build side is done/completed.
>>
>> Exactly. I have roughly looked at the batch side, but not in detail yet, and I am aware of the ongoing scheduling work. The initial focus of breaking execution out into multiple states like scheduling/deploying would be only for streaming with eager scheduling. We need to give more thought to how to deal with batch/lazy scheduling.
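>>
>> Here is the rough sketch mentioned above: a minimal illustration of deriving a job-level stage from the per-task states under eager scheduling (JobStage, JobStageTracker, and deriveJobStage are made-up names for this example; ExecutionState is the existing per-task state enum):
>>
>>     import java.util.Collection;
>>     import org.apache.flink.runtime.execution.ExecutionState;
>>
>>     class JobStageTracker {
>>
>>         enum JobStage { SCHEDULING, DEPLOYING, RUNNING }
>>
>>         static JobStage deriveJobStage(Collection<ExecutionState> taskStates) {
>>             // RUNNING only once every task is RUNNING (or already FINISHED).
>>             boolean allRunning = taskStates.stream()
>>                     .allMatch(s -> s == ExecutionState.RUNNING || s == ExecutionState.FINISHED);
>>             if (allRunning) {
>>                 return JobStage.RUNNING;
>>             }
>>             // DEPLOYING as soon as the first task starts deploying (or has already raced ahead to RUNNING).
>>             boolean anyDeploying = taskStates.stream()
>>                     .anyMatch(s -> s == ExecutionState.DEPLOYING || s == ExecutionState.RUNNING);
>>             if (anyDeploying) {
>>                 return JobStage.DEPLOYING;
>>             }
>>             // Otherwise the job is still being scheduled.
>>             return JobStage.SCHEDULING;
>>         }
>>     }
>>
>> Time spent in each stage could then be accumulated whenever the derived stage changes, which gives the per-stage breakdown without touching the per-task state machine.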
>>
>>> What exactly would you like to report here? A list of exceptions with the downtime caused by each, for example: exception X caused the job to be down for 13 minutes, 1 minute in scheduling, 1 minute deploying, 11 minutes in state restore?
>>
>> Basically, the initial cause is traced back from each component of downtime, and that component is accounted to a certain type, like user or system, based on the classification. So you're right. The interesting part here is secondary failures. For example, a user error causes a job to restart, but then scheduling fails because of a system issue. We need to account the failing and restarting time to the user, while the scheduling time on restart (e.g., a 5-minute timeout) goes to the system. A further example: a system error causes a job to be failing, but one of the user functions does not react to cancellation (for a full restart); the prolonged failing time (e.g., a 3-minute watchdog timeout) shouldn't be accounted to the system (of course, the other way around has been seen as well -- e.g., FLINK-5463).
>>
>>> Why do you think about implementing classifiers? Couldn't we classify exceptions by exception type, like `FlinkUserException`, `FlinkNetworkException`, `FlinkStateBackendException` … and make sure that we throw correct exception types + handle/wrap exceptions correctly when crossing the Flink system/user code border? This way we could know exactly whether an exception occurred in the user code or in Flink code.
>>
>> I think the classifier here is complementary to the exception-type approach. In this context, a classifier is "f(exception) -> type". The type is used as a metric dimension, so that alerts can be set on certain types or downtime can be broken down per type (the type is not fixed to just "user" or "system" but can be more specific and customizable, like statebackend or network). If we wrap exceptions perfectly as you said, f() simply looks at the exception type and returns its corresponding type.
>>
>> Initially we also thought complete wrapping would be ideal. However, even inside a UDF, user code can call into the Flink framework (e.g., a state update) or call out to dependent services, which a service provider may want to classify separately. In addition, Flink lets users work with lower-level APIs like StreamOperator, which makes the border a little blurry. Those issues make complete wrapping challenging. Besides, stack-based classification beyond the exception type could still be needed to classify stuck progress.
>>
>> Without instrumentation, one of the base classifiers that works for our environment in many cases is a user-class-loader classifier, which detects whether an exception was thrown from a class loaded from the user JAR/artifact (this may be less desirable in an environment where users' artifacts can be installed directly into the system lib/, but service providers would opt in to self-contained jar submission, keeping the system environment system-only). A rough sketch of such a classifier follows below.
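>>
>> For illustration, here is what a pluggable classifier along these lines could roughly look like (the interface and class names are made up for this example, not existing Flink API):
>>
>>     import java.util.Optional;
>>
>>     /** Maps a failure cause to a type used as a metric dimension, e.g. "user", "statebackend", "network". */
>>     interface FailureClassifier {
>>         Optional<String> classify(Throwable cause);
>>     }
>>
>>     /** Classifies a failure as "user" if any frame in the cause chain comes from a class defined by the user code class loader. */
>>     class UserClassLoaderClassifier implements FailureClassifier {
>>
>>         private final ClassLoader userCodeClassLoader;
>>
>>         UserClassLoaderClassifier(ClassLoader userCodeClassLoader) {
>>             this.userCodeClassLoader = userCodeClassLoader;
>>         }
>>
>>         @Override
>>         public Optional<String> classify(Throwable cause) {
>>             for (Throwable t = cause; t != null; t = t.getCause()) {
>>                 for (StackTraceElement frame : t.getStackTrace()) {
>>                     if (isUserClass(frame.getClassName())) {
>>                         return Optional.of("user");
>>                     }
>>                 }
>>             }
>>             // Unknown to this classifier; a later classifier in the chain may still decide.
>>             return Optional.empty();
>>         }
>>
>>         private boolean isUserClass(String className) {
>>             try {
>>                 // A class resolved through the user class loader that was actually defined by it came from the user artifact.
>>                 return Class.forName(className, false, userCodeClassLoader).getClassLoader() == userCodeClassLoader;
>>             } catch (ClassNotFoundException | NoClassDefFoundError e) {
>>                 return false;
>>             }
>>         }
>>     }
>>
>> Several such classifiers could then be chained, with the first non-empty result determining the type used for the downtime metric.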
>>
>>> One thing that might be tricky is if an error in Flink code is caused by a user's mistake.
>>
>> Right, this is the trickiest part. Based on our analysis of real data, the most ambiguous cases are custom serialization and out-of-resource errors. The former is usually seen in Flink runtime code rather than in the UDF. In the latter, the Flink stack is just a victim of a resource hog/leak in user code (OOM, too many open files). For the serialization issue, we've been looking at (and learning from) various serialization errors seen in the field to arrive at a reasonable classification. For out-of-resource errors, rather than a user-vs-system classification, we can tag the type as "resource", relying on dumps (e.g., heap dumps) and postmortem analysis on an as-needed basis.
>>
>>> Hmmm, this might be tricky. We can quite easily detect which exact Task is causing back pressure in at least a couple of different ways. The tricky part would be to determine whether this is caused by the user or not, but probably some simple stack trace probing on the back pressured task once every N seconds should solve this - similar to how sampling profilers work.
>>
>> Again you're right, and like you said, this part would mostly reuse existing building blocks such as the latency marker and back pressure sampling. If configured only for progress monitoring rather than latency distribution tracking, the latency marker can be lightweight: it can skip the histogram update and just update the latest timestamp, with a longer period so as not to adversely affect performance. Once stuck progress is detected, stack sampling can tell us more about the context that causes the back pressure.
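>>
>> As a rough illustration of that sampling step (StuckProgressSampler is a hypothetical name, and it reuses the FailureClassifier sketch from earlier for the user/system decision):
>>
>>     import java.util.Optional;
>>
>>     /** Samples the stack of a back-pressured task thread to attribute a stall to user or system code. */
>>     class StuckProgressSampler {
>>
>>         private final FailureClassifier userFrameClassifier; // e.g. the UserClassLoaderClassifier above
>>
>>         StuckProgressSampler(FailureClassifier userFrameClassifier) {
>>             this.userFrameClassifier = userFrameClassifier;
>>         }
>>
>>         /** Returns "user" if the sampled stack contains user-code frames, otherwise "system". */
>>         String classifySample(Thread taskThread) {
>>             // Wrap the live stack trace in a Throwable so the same classifier logic can inspect it.
>>             Throwable snapshot = new Throwable("stack sample");
>>             snapshot.setStackTrace(taskThread.getStackTrace());
>>             Optional<String> type = userFrameClassifier.classify(snapshot);
>>             return type.orElse("system");
>>         }
>>     }
>>
>> Running classifySample once every N seconds while back pressure persists, and attributing the stalled time to whichever type dominates the samples, would mirror how sampling profilers attribute CPU time.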
>>
>>> Luckily it seems like those four issues/proposals could be implemented/discussed independently or in stages.
>>
>> Agreed. Once some initial discussion clears things up at least at a high level, I can start more independent threads.
>>
>> Best,
>> Hwanju
>>
>> On 5/16/19, 2:44 AM, "Piotr Nowojski" <pi...@ververica.com> wrote:
>>
>> Hi Hwanju,
>>
>> Thanks for starting the discussion. Definitely, any improvement in this area would be very helpful and valuable. Generally speaking, +1 from my side, as long as we make sure that such changes either do not add performance overhead (which I think they shouldn't) or are optional.
>>
>>> Firstly, we need to account time for each stage of task execution, such as scheduling, deploying, and running, to enable better visibility of how long a job spends in which stage while not running user functions.
>>
>> A couple of questions/remarks:
>> 1. Do we currently account state restore as "RUNNING"? If yes, this might be incorrect from your perspective.
>> 2a. This might be more tricky if various Tasks are in various stages. For example in streaming, it should be safe to assume that the state of the job is the "minimum" of its Tasks' states, so the Job should be accounted as RUNNING only if all of the Tasks are either RUNNING or COMPLETED.
>> 2b. However in batch - including DataStream jobs running against bounded data streams, like Blink SQL - this might be more tricky, since there are ongoing efforts to schedule parts of the job graph in stages. For example, do not schedule the probe side of the join until the build side is done/completed.
>>
>>> Secondly, any downtime in each stage can be associated with a failure cause, which could be identified by the Java exception notified to the job manager on task failure or an unhealthy task manager (Flink already maintains a cause, but it can be associated with an execution stage for causal tracking).
>>
>> What exactly would you like to report here? A list of exceptions with the downtime caused by each, for example: exception X caused the job to be down for 13 minutes, 1 minute in scheduling, 1 minute deploying, 11 minutes in state restore?
>>
>>> Thirdly, the downtime reason should be classified as user- or system-induced failure. This needs an exception classifier drawing the line between user-defined functions (or the public API) and the Flink runtime - this is particularly challenging to get 100% accurate in one shot due to its empirical nature and custom logic injection like serialization, so pluggable classifier filters are a must-have to enable incremental improvement.
>>
>> Why do you think about implementing classifiers? Couldn't we classify exceptions by exception type, like `FlinkUserException`, `FlinkNetworkException`, `FlinkStateBackendException` … and make sure that we throw correct exception types + handle/wrap exceptions correctly when crossing the Flink system/user code border? This way we could know exactly whether an exception occurred in the user code or in Flink code.
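>>
>> (To sketch that idea: FlinkUserCodeException and UserCodeBorder below are hypothetical names, not existing classes; the point is just that every call across the user-code border gets wrapped so the origin is unambiguous.)
>>
>>     import java.util.concurrent.Callable;
>>
>>     /** Hypothetical marker type for failures thrown out of user code. */
>>     class FlinkUserCodeException extends RuntimeException {
>>         FlinkUserCodeException(Throwable cause) {
>>             super(cause);
>>         }
>>     }
>>
>>     class UserCodeBorder {
>>         /** Runs a user function so that anything it throws is tagged as user-induced. */
>>         static <T> T callUserCode(Callable<T> userCall) {
>>             try {
>>                 return userCall.call();
>>             } catch (Exception e) {
>>                 throw new FlinkUserCodeException(e);
>>             }
>>         }
>>     }
>>
>> A classifier could then map FlinkUserCodeException (and similar wrapper types) straight to a downtime type without any stack inspection.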
>>
>> One thing that might be tricky is if an error in Flink code is caused by a user's mistake.
>>
>>> Fourthly, stuck progress
>>
>> Hmmm, this might be tricky. We can quite easily detect which exact Task is causing back pressure in at least a couple of different ways. The tricky part would be to determine whether this is caused by the user or not, but probably some simple stack trace probing on the back pressured task once every N seconds should solve this - similar to how sampling profilers work.
>>
>> Luckily it seems like those four issues/proposals could be implemented/discussed independently or in stages.
>>
>> Piotrek
>>
>>> On 11 May 2019, at 06:50, Kim, Hwanju <hwanj...@amazon.com.INVALID> wrote:
>>>
>>> Hi,
>>>
>>> I am Hwanju at AWS Kinesis Analytics. We would like to start a discussion thread about a project we are considering for Flink operational improvement in production. We would like to start the conversation early, before the detailed design, so any high-level feedback is welcome.
>>>
>>> For service providers who operate Flink in a multi-tenant environment, such as AWS Kinesis Data Analytics, it is crucial to measure application health and clearly differentiate application unavailability caused by the Flink framework or the service environment from that caused by application code. While the current metrics of Flink represent overall job availability in time, they still need to be improved to give Flink operators better insight into detailed application availability. The current availability metrics such as uptime and downtime measure time based on the running state of a job, which does not necessarily represent the actual running state of a job (after a job transitions to running, each task still has to be scheduled/deployed in order to run user-defined functions). The detailed view should enable operators to have visibility into 1) how long each specific stage takes (e.g., task scheduling or deployment), 2) what failure is introduced in which stage, leading to job downtime, and 3) whether such a failure is classified as a user code error (e.g., an uncaught exception from a user-defined function) or a platform/environmental error (e.g., a checkpointing issue, unhealthy nodes hosting job/task managers, a Flink bug). The last one is particularly needed to allow Flink operators to define an SLA where only a small fraction of downtime may be introduced by service faults. All of these visibility enhancements can help the community detect and fix Flink runtime issues quickly, whereby Flink can become a more robust operating system for hosting data analytics applications.
>>>
>>> The current proposal is as follows. Firstly, we need to account time for each stage of task execution, such as scheduling, deploying, and running, to enable better visibility of how long a job spends in which stage while not running user functions. Secondly, any downtime in each stage can be associated with a failure cause, which could be identified by the Java exception notified to the job manager on task failure or an unhealthy task manager (Flink already maintains a cause, but it can be associated with an execution stage for causal tracking). Thirdly, the downtime reason should be classified as user- or system-induced failure. This needs an exception classifier drawing the line between user-defined functions (or the public API) and the Flink runtime - this is particularly challenging to get 100% accurate in one shot due to its empirical nature and custom logic injection like serialization, so pluggable classifier filters are a must-have to enable incremental improvement. Fourthly, stuck progress, where a task is apparently running but not able to process data, generally manifesting itself as long back pressure, can be monitored as higher-level job availability, and the runtime can determine whether the reason for being stuck is caused by the user (e.g., under-provisioned resources, a user function bug) or the system (a deadlock or livelock in the Flink runtime). Finally, all the detailed tracking information and metrics are exposed via REST and Flink metrics, so that the Flink dashboard can show enhanced information about job execution/availability and operators can set alarms appropriately on metrics.
>>>
>>> Best,
>>> Hwanju