Re: [DISCUSS] Proposal of external shuffle service
Many thanks Till! I will create a JIRA for this feature with a design document attached to it. I will let you know once it is ready! :)

Best,
Zhijiang

--
From: Till Rohrmann
Sent: Friday, September 7, 2018 22:01
To: Zhijiang(wangzhijiang999)
Cc: dev
Subject: Re: [DISCUSS] Proposal of external shuffle service

The rough plan sounds good Zhijiang. I think we should continue with what you've proposed: opening a JIRA issue and creating a design document which outlines the required changes in a little more detail. Once this is done, we should link the design document in the JIRA issue and post it here for further discussion.

Cheers,
Till

On Wed, Aug 29, 2018 at 6:04 PM Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com> wrote:
> Glad to receive your positive feedback Till!
>
> Actually our motivation is to support batch jobs well, as you mentioned.
>
> For the output level, Flink already has the Subpartition abstraction (writer), and currently there are PipelinedSubpartition (memory output) and SpillableSubpartition (one-sp-one-file output) implementations. We can extend this abstraction to realize other persistent outputs (e.g. sort-merge-file).
>
> For the transport level (shuffle service), the current SubpartitionView abstraction (reader) acts as the bridge to the output level, so the view can understand and read the different output formats. The current NetworkEnvironment takes the role of the internal shuffle service in the TaskManager, and the transport server is realized by Netty inside. This component could also be started in other external containers, like the NodeManager of YARN, to take the role of an external shuffle service. Further, we can abstract and extend the shuffle service to transport outputs via HTTP or RDMA instead of the current Netty. This abstraction should provide a way to register outputs so the results can be read correctly, similar to the current SubpartitionView.
>
> The above is still a rough idea. Next I plan to create a feature JIRA to cover the related changes if possible. It would be better if the related committers could help review the detailed designs together.
>
> Best,
> Zhijiang
>
> --
> From: Till Rohrmann
> Sent: Wednesday, August 29, 2018 17:36
> To: dev; Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com>
> Subject: Re: [DISCUSS] Proposal of external shuffle service
>
> Thanks for starting this design discussion Zhijiang!
>
> I really like the idea to introduce a ShuffleService abstraction which allows having different implementations depending on the actual use case. Especially for batch jobs I can clearly see the benefits of persisting the results somewhere else.
>
> Do you already know which interfaces we need to extend and where to introduce new abstractions?
>
> Cheers,
> Till
>
> On Mon, Aug 27, 2018 at 1:57 PM Zhijiang(wangzhijiang999) wrote:
> Hi all!
>
> The shuffle service is responsible for transporting upstream produced data to the downstream side. In Flink, the NettyServer is used for the network transport service and this component is started in the TaskManager process. That means the TaskManager provides an internal shuffle service, which raises some concerns:
> 1. If a task finishes, the ResultPartition of this task remains registered in the TaskManager, because the output buffers have to be transported by the internal shuffle service in the TaskManager. That means the TaskManager cannot be released by the ResourceManager until the ResultPartition is released. It may waste container resources and does not support dynamic resource scenarios well.
> 2. If we want to add another shuffle service implementation, the current mechanism is not easy to extend, because the output level (result partition) and the transport level (shuffle service) are not divided clearly and there is no abstraction that can be extended.
>
> For the above considerations, we propose an external shuffle service which can be deployed in any other external container, e.g. a NodeManager container in YARN. Then the TaskManager can be released ASAP if needed once all its internal tasks have finished. The persistent output files of these finished tasks can then be served by the external shuffle service on the same machine.
>
> Further, we can abstract both the output level and the transport level to support different implementations. E.g. we realized merging the data of all the subpartitions into a limited number of persistent local files (instead of one-subpartition-one-file) as a disk improvement in some scenarios.
>
> I know this may be a big piece of work; I just want to point out some ideas and would appreciate any feedback from you!
>
> Best,
> Zhijiang
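To make the proposed separation of the output level (writer) and the transport level (shuffle service) more concrete, below is a minimal Java sketch of the kind of pluggable abstraction being discussed. All interface and method names (ShuffleService, ResultPartitionReader, registerPartition, createReader, releasePartition) are assumptions made for illustration, not Flink's actual API.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;

/** Hypothetical reader-side abstraction, playing the role of today's SubpartitionView. */
interface ResultPartitionReader extends AutoCloseable {
    /** Returns the next buffer of the subpartition, or null once it is fully consumed. */
    ByteBuffer nextBuffer() throws IOException;
}

/**
 * Hypothetical transport-level abstraction. An internal implementation could live inside the
 * TaskManager (Netty-based), an external one inside a YARN NodeManager; further implementations
 * could serve the registered outputs over HTTP or RDMA.
 */
interface ShuffleService {
    /**
     * Registers the persisted output of a finished producer so downstream consumers can still
     * read it after the producing TaskManager has been released.
     */
    void registerPartition(String resultPartitionId, Path persistedOutput) throws IOException;

    /** Opens a reader for one subpartition of a previously registered partition. */
    ResultPartitionReader createReader(String resultPartitionId, int subpartitionIndex)
            throws IOException;

    /** Releases the partition once all consumers are done. */
    void releasePartition(String resultPartitionId) throws IOException;
}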
Re: Elasticsearch InputFormat
Hi Till,

Thanks for the great suggestion! It seems to do the job. Here is a sample of the code:

public class FlinkMain {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        EsInputFormat<Text, LinkedMapWritable> kvEsInputFormat = new EsInputFormat<>();
        HadoopInputFormat<Text, LinkedMapWritable> hadoopInputFormat =
            new HadoopInputFormat<>(kvEsInputFormat, Text.class, LinkedMapWritable.class);

        Configuration configuration = hadoopInputFormat.getConfiguration();
        configuration.set("es.resource", "flink-1/flink_t");
        configuration.set("es.query", "?q=*");

        DataSet<Tuple2<Text, LinkedMapWritable>> input = env.createInput(hadoopInputFormat);

        List<Tuple2<Text, LinkedMapWritable>> collect = input.collect();
        collect.forEach(e -> System.out.println(e));
    }
}

On Mon, Sep 10, 2018 at 9:47 AM Till Rohrmann wrote:
> Hi Michael,
>
> have you considered trying out the EsInputFormat [1] with Flink's HadoopInputFormatBase? That way reading from Elasticsearch might already work out of the box. If not, then adding a dedicated Elasticsearch input format would definitely be helpful.
>
> [1] https://github.com/elastic/elasticsearch-hadoop
>
> On Sat, Sep 8, 2018 at 11:48 PM Michael Gendelman wrote:
> > Hi all,
> >
> > I have a workload where I need to read and transform large amounts of data from Elasticsearch. I'm currently using Flink only for streaming, but I thought that it can also be a good fit for this kind of batch job. However, I did not find a way to load data from Elasticsearch into Flink in parallel.
> >
> > I'd like to propose an *ElasticsearchInputFormat* which will be able to load data from Elasticsearch in parallel by leveraging the InputSplit mechanism in Flink and the Elasticsearch scroll API.
> >
> > The API should look something like this:
> > ElasticsearchInputFormat<MessageObj> elasticsearchInputFormat =
> >     ElasticsearchInputFormat.builder(esHostList, query, esMapper, typeInfo)
> >         .setParametersProvider(paramsProvider)
> >         .setIndex("index-name")
> >         .setClusterName("type-name")
> >         .build();
> > DataSet<MessageObj> input = env.createInput(elasticsearchInputFormat);
> >
> > The '*query*' is a regular ES query specifying the data to fetch.
> > The '*esMapper*' maps JSON data returned from Elasticsearch to some object (in the example above, *MessageObj*).
> > In order for it to work in parallel, the InputFormat will work with an InputSplit which gets parameters on how to split a certain range using the '*paramsProvider*'.
> >
> > What do you think?
> >
> > Best,
> > Michael.
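As a hedged follow-up sketch (not part of the original thread): the Tuple2<Text, LinkedMapWritable> records returned by the Hadoop-wrapped EsInputFormat can be mapped to a typed object, which is roughly the job the proposed 'esMapper' would do. The EsMapping class, the MessageObj POJO, and the "body" field name below are illustrative assumptions.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.Text;
import org.elasticsearch.hadoop.mr.LinkedMapWritable;

public class EsMapping {

    // Hypothetical target type for the example; "id" and "body" are made-up field names.
    public static class MessageObj {
        public String id;
        public String body;
        public MessageObj() {}
        public MessageObj(String id, String body) { this.id = id; this.body = body; }
    }

    // Maps the raw (id, _source) tuples produced by the Hadoop-wrapped EsInputFormat to a
    // typed object, which is roughly the role the proposed 'esMapper' would play.
    public static DataSet<MessageObj> toMessages(DataSet<Tuple2<Text, LinkedMapWritable>> input) {
        return input.map(new MapFunction<Tuple2<Text, LinkedMapWritable>, MessageObj>() {
            @Override
            public MessageObj map(Tuple2<Text, LinkedMapWritable> record) {
                Object body = record.f1.get(new Text("body")); // assumed document field
                return new MessageObj(record.f0.toString(), body == null ? null : body.toString());
            }
        });
    }
}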
Re: Elasticsearch InputFormat
Great to hear that it works :-)

On Tue, Sep 11, 2018 at 9:28 AM Michael Gendelman wrote:
> Hi Till,
>
> Thanks for the great suggestion! It seems to do the job. Here is a sample of the code:
>
> public class FlinkMain {
>     public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>         EsInputFormat<Text, LinkedMapWritable> kvEsInputFormat = new EsInputFormat<>();
>         HadoopInputFormat<Text, LinkedMapWritable> hadoopInputFormat =
>             new HadoopInputFormat<>(kvEsInputFormat, Text.class, LinkedMapWritable.class);
>
>         Configuration configuration = hadoopInputFormat.getConfiguration();
>         configuration.set("es.resource", "flink-1/flink_t");
>         configuration.set("es.query", "?q=*");
>
>         DataSet<Tuple2<Text, LinkedMapWritable>> input = env.createInput(hadoopInputFormat);
>
>         List<Tuple2<Text, LinkedMapWritable>> collect = input.collect();
>         collect.forEach(e -> System.out.println(e));
>     }
> }
>
> On Mon, Sep 10, 2018 at 9:47 AM Till Rohrmann wrote:
> > Hi Michael,
> >
> > have you considered trying out the EsInputFormat [1] with Flink's HadoopInputFormatBase? That way reading from Elasticsearch might already work out of the box. If not, then adding a dedicated Elasticsearch input format would definitely be helpful.
> >
> > [1] https://github.com/elastic/elasticsearch-hadoop
> >
> > On Sat, Sep 8, 2018 at 11:48 PM Michael Gendelman wrote:
> > > Hi all,
> > >
> > > I have a workload where I need to read and transform large amounts of data from Elasticsearch. I'm currently using Flink only for streaming, but I thought that it can also be a good fit for this kind of batch job. However, I did not find a way to load data from Elasticsearch into Flink in parallel.
> > >
> > > I'd like to propose an *ElasticsearchInputFormat* which will be able to load data from Elasticsearch in parallel by leveraging the InputSplit mechanism in Flink and the Elasticsearch scroll API.
> > >
> > > The API should look something like this:
> > > ElasticsearchInputFormat<MessageObj> elasticsearchInputFormat =
> > >     ElasticsearchInputFormat.builder(esHostList, query, esMapper, typeInfo)
> > >         .setParametersProvider(paramsProvider)
> > >         .setIndex("index-name")
> > >         .setClusterName("type-name")
> > >         .build();
> > > DataSet<MessageObj> input = env.createInput(elasticsearchInputFormat);
> > >
> > > The '*query*' is a regular ES query specifying the data to fetch.
> > > The '*esMapper*' maps JSON data returned from Elasticsearch to some object (in the example above, *MessageObj*).
> > > In order for it to work in parallel, the InputFormat will work with an InputSplit which gets parameters on how to split a certain range using the '*paramsProvider*'.
> > >
> > > What do you think?
> > >
> > > Best,
> > > Michael.
[jira] [Created] (FLINK-10317) Configure Metaspace size by default
Stephan Ewen created FLINK-10317:
------------------------------------

Summary: Configure Metaspace size by default
Key: FLINK-10317
URL: https://issues.apache.org/jira/browse/FLINK-10317
Project: Flink
Issue Type: Bug
Components: Startup Shell Scripts
Affects Versions: 1.5.3
Reporter: Stephan Ewen

We should set the size of the JVM Metaspace to a sane default, like {{-XX:MaxMetaspaceSize=256m}}.

If it is not set, the JVM off-heap memory will grow indefinitely with repeated classloading and JIT compilation, eventually exceeding the allowed memory on Docker/YARN or similar setups.

It is hard to come up with a good default; however, I believe the error messages one gets when the Metaspace is too small are easy to understand (and easy to act on), while it is very hard to figure out why the memory footprint keeps growing steadily and indefinitely.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
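As an illustration of why a bounded Metaspace is easier to reason about: the pool can be inspected with the standard JMX memory pool beans, and an unbounded Metaspace reports max = -1, so the growing off-heap footprint has no visible ceiling. The sketch below is illustrative only and assumes a HotSpot JVM where the pool is named "Metaspace".

{code}
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;

public class MetaspaceUsage {
    public static void main(String[] args) {
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            if ("Metaspace".equals(pool.getName())) {
                MemoryUsage usage = pool.getUsage();
                // With -XX:MaxMetaspaceSize=256m the "max" value is bounded and exhaustion
                // fails fast with an explicit OutOfMemoryError: Metaspace.
                System.out.printf("Metaspace used=%d, committed=%d, max=%d bytes%n",
                        usage.getUsed(), usage.getCommitted(), usage.getMax());
            }
        }
    }
}
{code}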
[jira] [Created] (FLINK-10318) Add option to build Hadoop-free job image to build.sh
Ufuk Celebi created FLINK-10318:
------------------------------------

Summary: Add option to build Hadoop-free job image to build.sh
Key: FLINK-10318
URL: https://issues.apache.org/jira/browse/FLINK-10318
Project: Flink
Issue Type: Improvement
Components: Docker
Affects Versions: 1.6.0
Reporter: Ufuk Celebi

When building a job-specific image from a release via {{flink-container/docker/build.sh}}, a Hadoop version currently has to be specified:

{code}
./build.sh
  --job-jar flink-job.jar
  --from-release
  --flink-version 1.6.0
  --hadoop-version 2.8 # <- currently required
  --scala-version 2.11
  --image-name flink-job
{code}

I think for many users a Hadoop-free build is a good default. We should consider supporting this out of the box with the {{build.sh}} script. The current workaround is to manually download the Hadoop-free release and build with the {{--from-archive}} flag.

Another alternative would be to drop the {{--from-release}} option and document how to build from an archive, with links to the downloads.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
Speakers needed for Apache DC Roadshow
We need your help to make the Apache Washington DC Roadshow on Dec 4th a success.

What do we need most? Speakers!

We're bringing a unique DC flavor to this event by mixing Open Source Software with talks about Apache projects as well as OSS CyberSecurity, OSS in Government and OSS Career advice.

Please take a look at: http://www.apachecon.com/usroadshow18/

(Note: You are receiving this message because you are subscribed to one or more mailing lists at The Apache Software Foundation.)

Rich, for the ApacheCon Planners

--
rbo...@apache.org
http://apachecon.com
@ApacheCon
[jira] [Created] (FLINK-10319) Avoid requestPartitionState from JM but always try retrigger
陈梓立 created FLINK-10319:
------------------------------------

Summary: Avoid requestPartitionState from JM but always try retrigger
Key: FLINK-10319
URL: https://issues.apache.org/jira/browse/FLINK-10319
Project: Flink
Issue Type: Improvement
Components: Distributed Coordination
Affects Versions: 1.7.0
Reporter: 陈梓立
Assignee: 陈梓立
Fix For: 1.7.0

Do not call requestPartitionState on the JM when a partition request fails, as this may generate too many RPC requests and block the JM. We gain little benefit from checking what state the producer is in, while on the other hand the JM can be overwhelmed by too many RPC requests.

A task can always retriggerPartitionRequest from its InputGate; the retry fails if the producer is gone and succeeds if the producer is alive. Either way, there is no need to ask the JM for help.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
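For illustration, here is a minimal, hypothetical sketch of the consumer-side behavior described above: on a failed partition request, retrigger the request with bounded retries and a backoff instead of asking the JM what state the producer is in. Class and method names are illustrative only, not Flink's actual InputGate API.

{code}
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class PartitionRequestRetry {

    interface PartitionConnection {
        /** Requests the remote partition; throws if the producer is (still) unavailable. */
        void requestPartition() throws IOException;
    }

    static void requestWithRetry(PartitionConnection connection, int maxAttempts, long backoffMs)
            throws IOException, InterruptedException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        IOException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                connection.requestPartition();
                return; // producer is alive and the request succeeded
            } catch (IOException e) {
                lastFailure = e; // producer not (yet) available: retrigger instead of querying the JM
                TimeUnit.MILLISECONDS.sleep(backoffMs);
            }
        }
        // Producer is assumed gone after exhausting the retries; fail the consuming task.
        throw lastFailure;
    }
}
{code}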
[jira] [Created] (FLINK-10320) Introduce JobMaster schedule micro-benchmark
陈梓立 created FLINK-10320:
------------------------------------

Summary: Introduce JobMaster schedule micro-benchmark
Key: FLINK-10320
URL: https://issues.apache.org/jira/browse/FLINK-10320
Project: Flink
Issue Type: Improvement
Components: Tests
Reporter: 陈梓立
Assignee: 陈梓立

Based on the {{org.apache.flink.streaming.runtime.io.benchmark}} stuff and the repo [flink-benchmark|https://github.com/dataArtisans/flink-benchmarks], I propose to introduce another micro-benchmark which focuses on {{JobMaster}} scheduling performance.

h3. Target

Benchmark how long it takes from {{JobMaster}} startup (receiving the {{JobGraph}} and initializing) until all tasks are RUNNING. Technically we use a bounded stream and the TM finishes tasks as soon as they arrive, so the interval we actually measure ends when all tasks are FINISHED.

h3. Cases

1. A JobGraph that covers EAGER + PIPELINED edges
2. A JobGraph that covers LAZY_FROM_SOURCES + PIPELINED edges
3. A JobGraph that covers LAZY_FROM_SOURCES + BLOCKING edges

ps: maybe also benchmark the case where the source reads from an {{InputSplit}}?

h3. Implementation

Based on the flink-benchmark repo, we finally run the benchmark using jmh, so the whole test suite is separated into two repos. The testing environment could be located in the main repo, maybe under flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/benchmark.

To measure the performance of {{JobMaster}} scheduling, we need to simulate an environment that:
1. has a real {{JobMaster}}
2. has a mock/testing {{ResourceManager}} that has infinite resources and reacts immediately
3. has one (or many?) mock/testing {{TaskExecutor}}(s) that deploy and finish tasks immediately

[~trohrm...@apache.org] [~GJL] [~pnowojski] could you please review this proposal to help clarify the goal and concrete details? Thanks in advance. Any suggestions are welcome.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
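To clarify how the jmh setup could be structured, here is a skeleton of such a benchmark. The {{SchedulingBenchmarkEnvironment}} helper and its methods are hypothetical placeholders for the environment described above (real JobMaster, mock ResourceManager with infinite resources, mock TaskExecutors that finish tasks immediately); only the jmh annotations are the standard ones from the org.openjdk.jmh dependency.

{code}
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class JobMasterSchedulingBenchmark {

    // Hypothetical placeholder for the testing environment described in the ticket.
    private SchedulingBenchmarkEnvironment environment;

    @Setup(Level.Iteration)
    public void setUp() throws Exception {
        // Builds the JobGraph under test, e.g. EAGER + PIPELINED edges.
        environment = SchedulingBenchmarkEnvironment.createWithEagerPipelinedJobGraph();
    }

    @Benchmark
    public void scheduleUntilAllTasksFinished() throws Exception {
        // Measured interval: JobMaster startup until all tasks reach FINISHED.
        environment.startJobMasterAndAwaitAllTasksFinished();
    }

    @TearDown(Level.Iteration)
    public void tearDown() throws Exception {
        environment.close();
    }
}
{code}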
[jira] [Created] (FLINK-10321) Make the condition of broadcast partitioner simple
zhijiang created FLINK-10321:
------------------------------------

Summary: Make the condition of broadcast partitioner simple
Key: FLINK-10321
URL: https://issues.apache.org/jira/browse/FLINK-10321
Project: Flink
Issue Type: Improvement
Components: Network
Affects Versions: 1.7.0
Reporter: zhijiang
Assignee: zhijiang

The current {{BroadcastPartitioner}} uses the {{set}} and {{setNumber}} variables as the condition for returning the channel array. Instead of using {{set}} and {{setNumber}}, we can simply check whether {{returnChannel.length == numberOfOutputChannels}} as the condition, which makes the logic simpler.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
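For illustration, a minimal sketch of the simplified condition, using a standalone hypothetical class rather than Flink's actual BroadcastPartitioner internals:

{code}
public class SimpleBroadcastPartitioner {

    private int[] returnChannels = new int[0];

    public int[] selectChannels(int numberOfOutputChannels) {
        // Rebuild the cached array only when the channel count has changed, using the array
        // length itself as the condition instead of separate "set"/"setNumber" bookkeeping.
        if (returnChannels.length != numberOfOutputChannels) {
            returnChannels = new int[numberOfOutputChannels];
            for (int i = 0; i < numberOfOutputChannels; i++) {
                returnChannels[i] = i;
            }
        }
        return returnChannels;
    }
}
{code}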
[jira] [Created] (FLINK-10322) Unused instance variable MetricRegistry in ResourceManager
Shimin Yang created FLINK-10322:
------------------------------------

Summary: Unused instance variable MetricRegistry in ResourceManager
Key: FLINK-10322
URL: https://issues.apache.org/jira/browse/FLINK-10322
Project: Flink
Issue Type: Improvement
Components: ResourceManager
Reporter: Shimin Yang
Assignee: Shimin Yang

Same as the title: the {{MetricRegistry}} instance variable in {{ResourceManager}} is unused.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
Jar availability after cluster restart
Hi all,

Just trying to make sure I haven't missed anything - after a cluster restart all the uploaded jars are not available from the dashboard or REST API. So in case we need to resubmit a new job from an already uploaded jar after a cluster restart, it is my understanding that one needs to re-upload the jar first, is that correct?

Thanks in advance,
Ricardo