[ https://issues.apache.org/jira/browse/FLINK-10644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17334427#comment-17334427 ]
wangwj commented on FLINK-10644: -------------------------------- [~trohrmann] Hi Till. I am come from the search and recommendation department of Alibaba in China. Our big data processing platform uses Flink Batch to process extremely huge data every day. There are many long-tail tasks every day, we can only manually go to the machine to kill the process and seriously affect the experience of the our users. So I made up my mind to solve this problem. First of all, I think that speculative execution means that two executions in a ExecutionVertex run at the same time. While failover means that two tasks run at different times. Based on this theory, I think this feature(speculative execution) is definitely achievable. Finally, the facts proved that my idea was right So I have implemented a speculative execution for batch job based on Blink, and it has a very significant effect in our product cluster. My approach is as follows, happy to discuss them. (1)Which kind of ExecutionJobVertex is suitable enable speculative execution feature in a batch job? Because of the speculative execution feature involves the implementation details of the region failover. After research, I have decided a ExecutionJobVertex will enable speculative execution feature only if all input edges and output edges of this ExecutionJobVertex are blocking. (2)How to distinguish long tail task? I distinguish long tail task based on current time and the execution first create/deploying time before it failover. For ExecutionJobVertex that meets the condition (1) When a configurable percentage of executions have been finished in an ExecutionJobVertex, the speculative execution thread starts to really work. In an ExecutionJobVertex, when the running time of a execution is a configurable multiple of the median running time of other finished executions, this execution is judged as long tail execution. (3)How to make the speculative execution algorithm more precise? Baesd on the speculative execution algorithm in (2), In our product cluster, I can completely solve the long tail problem. In the next step, we will add the throughput of the task to the speculative execution algorithm through the heartbeat of TaskManager with JobManager. (4)How schedule another Execution in an same ExecutionVertex? We have changed the currentExecution in ExecutionVertex to a list, which means that there can be multiple executions in an ExecutionVertex at the same time. Then we reuse the current scheduling logic to schedule the speculative execution execution. (5)How to make the speculative task runs on a different machine from the original task. We have implemented a machine-dimensional blacklist,and add the machine ip in the blacklist when a execution is a long tail execution base on speculative execution algorithm in (2). The blacklist has the ability of timed out. (6)How to avoid errors when multiple executions finish at the same time in an ExecutionVertex? In ExecutionVertex executionFinished() method, I have done multi-thread synchronization, to ensure that an ExecutionVertex will eventually have only one execution successfully finished, and other executions will all go to the cancellation logic. (7)How to deal with multiple sink files in one ExecutionVertex when the job is sink to files? When batch job will sink to file, we will add an executionAttemptID suffix to the file name. Finally in finalizeOnMaster() I will delete or rename them. Here we should pay attention to the situation of flink stream job processing bounded data sets. (8)In batch job with all-to-all shuffle, how do we let the downstream original execution and speculative execution know which ResultSubPartition to read of upstream task? Two executions of a upstream ExecutionVertex will produce two ResultPartition。When edge is blocking between upstream and down stream. When upstream ExecutionVertex have finished we will update the input channel of down stream execution to the fastest finished execution of upstream. Here we should pay attention to the situation that the down stream execution when meet DataConsumptionException. It will restarts with the upstream execution that has been finished. Here we should pay attention to the situation of flink stream job processing bounded data sets. (9)How to display information about speculative task on the Flink web ui. After I have implemented this feature. When speculative execution runs faster then original execution, the flink ui will show that this task has been cancelled. But the result of the job is correct, which is in full compliance with our expectations. I don’t know much about the web,I will ask my colleague for help. [~trohrmann] I am very interested in this issue, and my implementation has played a big role in our product cluster in Alibaba。 Happy to discuss it, and could you assign this issue to me? > Batch Job: Speculative execution > -------------------------------- > > Key: FLINK-10644 > URL: https://issues.apache.org/jira/browse/FLINK-10644 > Project: Flink > Issue Type: New Feature > Components: Runtime / Coordination > Reporter: JIN SUN > Assignee: BoWang > Priority: Major > Labels: stale-assigned > > Strugglers/outlier are tasks that run slower than most of the all tasks in a > Batch Job, this somehow impact job latency, as pretty much this straggler > will be in the critical path of the job and become as the bottleneck. > Tasks may be slow for various reasons, including hardware degradation, or > software mis-configuration, or noise neighboring. It's hard for JM to predict > the runtime. > To reduce the overhead of strugglers, other system such as Hadoop/Tez, Spark > has *_speculative execution_*. Speculative execution is a health-check > procedure that checks for tasks to be speculated, i.e. running slower in a > ExecutionJobVertex than the median of all successfully completed tasks in > that EJV, Such slow tasks will be re-submitted to another TM. It will not > stop the slow tasks, but run a new copy in parallel. And will kill the others > if one of them complete. > This JIRA is an umbrella to apply this kind of idea in FLINK. Details will be > append later. -- This message was sent by Atlassian Jira (v8.3.4#803005)