Re: [DISCUSS] Proposal of external shuffle service

2018-09-11 Thread Zhijiang(wangzhijiang999)
Many thanks Till!

I will create a JIRA for this feature and attach a design document to it.
I will let you know once it is ready! :)

Best,
Zhijiang


--
From: Till Rohrmann 
Sent: Friday, September 7, 2018 22:01
To: Zhijiang(wangzhijiang999) 
Cc: dev 
Subject: Re: [DISCUSS] Proposal of external shuffle service

The rough plan sounds good Zhijiang. I think we should continue with what
you've proposed: open a JIRA issue and create a design document which
outlines the required changes in a little more detail. Once this is
done, we should link the design document in the JIRA issue and post it here
for further discussion.

Cheers,
Till

On Wed, Aug 29, 2018 at 6:04 PM Zhijiang(wangzhijiang999) <
wangzhijiang...@aliyun.com> wrote:

> Glad to receive your positive feedback, Till!
>
> Actually, our motivation is to support batch jobs well, as you mentioned.
>
> For the output level, Flink already has the Subpartition abstraction (writer),
> and currently there are the PipelinedSubpartition (in-memory output) and
> SpillableSubpartition (one-subpartition-one-file output) implementations. We can
> extend this abstraction to realize other persistent outputs (e.g.
> sort-merge files).
>
> For the transport level (shuffle service), the current SubpartitionView
> abstraction (reader) acts as the bridge to the output level, so the view can
> understand and read the different output formats. The current
> NetworkEnvironment takes the role of the internal shuffle service in the
> TaskManager, and its transport server is realized with Netty internally. This
> component could also be started in other external containers, such as YARN's
> NodeManager, to take the role of an external shuffle service. Further, we
> could extend the shuffle service abstraction to transport outputs via HTTP or
> RDMA instead of the current Netty. This abstraction should provide a way to
> register outputs so that the results can be read correctly, similar to the
> current SubpartitionView. (A rough interface sketch is appended after this
> thread.)
>
> The above is still a rough idea. Next, I plan to create a feature JIRA to
> cover the related changes if possible. It would be great to get help from the
> related committers to review the detailed designs together.
>
> Best,
> Zhijiang
>
> --
> From: Till Rohrmann 
> Sent: Wednesday, August 29, 2018 17:36
> To: dev ; Zhijiang(wangzhijiang999) <
> wangzhijiang...@aliyun.com>
> Subject: Re: [DISCUSS] Proposal of external shuffle service
>
> Thanks for starting this design discussion Zhijiang!
>
> I really like the idea of introducing a ShuffleService abstraction which
> allows different implementations depending on the actual use case.
> Especially for batch jobs I can clearly see the benefits of persisting the
> results somewhere else.
>
> Do you already know which interfaces we need to extend and where to
> introduce new abstractions?
>
> Cheers,
> Till
>
> On Mon, Aug 27, 2018 at 1:57 PM Zhijiang(wangzhijiang999)
>  wrote:
> Hi all!
>
> The shuffle service is responsible for transporting the data produced
> upstream to the downstream side. In Flink, the NettyServer is used as the
> network transport service, and this component is started inside the
> TaskManager process. That means the TaskManager only supports an internal
> shuffle service, which raises some concerns:
> 1. If a task finishes, the ResultPartition of this task remains registered
> in the TaskManager, because the output buffers have to be transported by the
> internal shuffle service in the TaskManager. That means the TaskManager
> cannot be released by the ResourceManager until the ResultPartition is
> released. This may waste container resources and does not work well for
> dynamic resource scenarios.
> 2. If we want to add another shuffle service implementation, the current
> mechanism is not easy to extend, because the output level (result partition)
> and the transport level (shuffle service) are not clearly separated and there
> is no abstraction to extend.
>
> For the above considerations, we propose an external shuffle service which
> can be deployed in any other external container, e.g. a NodeManager container
> in YARN. Then the TaskManager can be released as soon as possible, if needed,
> once all of its internal tasks have finished. The persistent output files of
> these finished tasks can then be served by the external shuffle service on
> the same machine.
>
> Further, we can abstract both the output level and the transport level to
> support different implementations. For example, we have implemented merging
> the data of all subpartitions into a limited number of persistent local files
> instead of one file per subpartition, which improves disk behavior in some
> scenarios.
>
> I know this may be a big piece of work; I am just pointing out some ideas
> and would appreciate any feedback from you!
>
> Best,
> Zhijiang
>
>
>
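
To make the proposed split between the output level and the transport level a bit
more concrete, here is a minimal sketch of what such a shuffle service abstraction
could look like. The interface and method names are illustrative only and are not
part of Flink today; only ResultPartitionID, ResultPartitionWriter and
ResultSubpartitionView are existing Flink types.

import java.io.IOException;

import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;

public interface ShuffleService {

    // Register a produced result partition so its data can be served to
    // consumers independently of the producing TaskManager's lifetime.
    void registerResultPartition(ResultPartitionID partitionId, ResultPartitionWriter writer) throws IOException;

    // Create a reader for one subpartition of a registered result partition,
    // analogous to the ResultSubpartitionView served by the NetworkEnvironment today.
    ResultSubpartitionView createSubpartitionView(ResultPartitionID partitionId, int subpartitionIndex) throws IOException;

    // Release all resources held for a partition once it has been fully consumed.
    void releaseResultPartition(ResultPartitionID partitionId);
}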



Re: Elasticsearch InputFormat

2018-09-11 Thread Michael Gendelman
Hi Till,

Thanks for the great suggestion!
Seems like it does the job. Here is a sample of the code:

import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.mr.LinkedMapWritable;

public class FlinkMain {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Wrap the elasticsearch-hadoop EsInputFormat in Flink's Hadoop input format bridge.
        EsInputFormat<Text, LinkedMapWritable> kvEsInputFormat = new EsInputFormat<>();
        HadoopInputFormat<Text, LinkedMapWritable> hadoopInputFormat =
                new HadoopInputFormat<>(kvEsInputFormat, Text.class, LinkedMapWritable.class);

        // elasticsearch-hadoop settings: which index/type to read and the query to run.
        Configuration configuration = hadoopInputFormat.getConfiguration();
        configuration.set("es.resource", "flink-1/flink_t");
        configuration.set("es.query", "?q=*");

        DataSet<Tuple2<Text, LinkedMapWritable>> input = env.createInput(hadoopInputFormat);

        List<Tuple2<Text, LinkedMapWritable>> collect = input.collect();
        collect.forEach(e -> System.out.println(e));
    }
}


On Mon, Sep 10, 2018 at 9:47 AM Till Rohrmann  wrote:

> Hi Michael,
>
> have you considered trying out the EsInputFormat [1] with
> Flink's HadoopInputFormatBase? That way reading from ElasticSearch might
> already work out of the box. If not, then adding a dedicated ElasticSearch
> input format would definitely be helpful.
>
> [1] https://github.com/elastic/elasticsearch-hadoop
>
> On Sat, Sep 8, 2018 at 11:48 PM Michael Gendelman 
> wrote:
>
> > Hi all,
> >
> > I have a workload where I need to read and transform large amounts of
> > data from Elasticsearch. I'm currently using Flink only for streaming,
> > but I thought that it can also be a good fit for this kind of batch job.
> > However, I did not find a way to load data from Elasticsearch in parallel
> > into Flink.
> >
> > I'd like to propose *ElasticsearchInputFormat*, which will be able to load
> > data from Elasticsearch in parallel by leveraging the InputSplit mechanism
> > in Flink and the Elasticsearch scroll API.
> >
> > The API should look something like this:
> > ElasticsearchInputFormat<MessageObj> elasticsearchInputFormat =
> >     ElasticsearchInputFormat.builder(esHostList, query, esMapper, typeInfo)
> >         .setParametersProvider(paramsProvider)
> >         .setIndex("index-name")
> >         .setClusterName("type-name")
> >         .build();
> > DataSet<MessageObj> input = env.createInput(elasticsearchInputFormat);
> >
> > The *query* is a regular ES query specifying the data to fetch.
> > The *esMapper* maps the JSON data returned from Elasticsearch to some
> > object (in the example above, *MessageObj*).
> > In order for it to work in parallel, the InputFormat will work with an
> > InputSplit which gets parameters on how to split a certain range using
> > the *paramsProvider*.
> >
> > What do you think?
> >
> > Best,
> > Michael.
> >
>
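
To make the InputSplit-based parallelism in the quoted proposal concrete, here is a
very rough skeleton of such an input format. All method bodies are placeholders and
the builder/esMapper parts of the proposed API are left out; only Flink's
RichInputFormat, GenericInputSplit and DefaultInputSplitAssigner are existing types.

import java.io.IOException;

import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplitAssigner;

public class ElasticsearchInputFormat<T> extends RichInputFormat<T, GenericInputSplit> {

    private transient boolean endReached;

    @Override
    public void configure(Configuration parameters) {
        // connection settings (hosts, index, query, mapper) would be read here
    }

    @Override
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
        return cachedStatistics; // no cheap statistics available from ES
    }

    @Override
    public GenericInputSplit[] createInputSplits(int minNumSplits) {
        // one split per Elasticsearch "slice"; each split scrolls its slice independently
        GenericInputSplit[] splits = new GenericInputSplit[minNumSplits];
        for (int i = 0; i < minNumSplits; i++) {
            splits[i] = new GenericInputSplit(i, minNumSplits);
        }
        return splits;
    }

    @Override
    public InputSplitAssigner getInputSplitAssigner(GenericInputSplit[] splits) {
        return new DefaultInputSplitAssigner(splits);
    }

    @Override
    public void open(GenericInputSplit split) throws IOException {
        // open a sliced scroll for split.getSplitNumber() out of split.getTotalNumberOfSplits()
        endReached = false;
    }

    @Override
    public boolean reachedEnd() {
        return endReached;
    }

    @Override
    public T nextRecord(T reuse) throws IOException {
        // fetch the next hit from the scroll and map the JSON to T via the esMapper
        endReached = true; // placeholder: stop after one (empty) record
        return reuse;
    }

    @Override
    public void close() {
        // clear the scroll context
    }
}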


Re: Elasticsearch InputFormat

2018-09-11 Thread Till Rohrmann
Great to hear that it works :-)

On Tue, Sep 11, 2018 at 9:28 AM Michael Gendelman  wrote:

> Hi Till,
>
> Thanks for the great suggestion!
> Seems like it does the job. Here is a sample of the code:
>
> public class FlinkMain {
>     public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>         EsInputFormat<Text, LinkedMapWritable> kvEsInputFormat = new EsInputFormat<>();
>         HadoopInputFormat<Text, LinkedMapWritable> hadoopInputFormat =
>                 new HadoopInputFormat<>(kvEsInputFormat, Text.class, LinkedMapWritable.class);
>
>         Configuration configuration = hadoopInputFormat.getConfiguration();
>         configuration.set("es.resource", "flink-1/flink_t");
>         configuration.set("es.query", "?q=*");
>
>         DataSet<Tuple2<Text, LinkedMapWritable>> input = env.createInput(hadoopInputFormat);
>
>         List<Tuple2<Text, LinkedMapWritable>> collect = input.collect();
>         collect.forEach(e -> System.out.println(e));
>     }
> }
>
>
> On Mon, Sep 10, 2018 at 9:47 AM Till Rohrmann 
> wrote:
>
> > Hi Michael,
> >
> > have you considered trying out the EsInputFormat [1] with
> > Flink's HadoopInputFormatBase? That way reading from ElasticSearch might
> > already work out of the box. If not, then adding a dedicated
> ElasticSearch
> > input format would definitely be helpful.
> >
> > [1] https://github.com/elastic/elasticsearch-hadoop
> >
> > On Sat, Sep 8, 2018 at 11:48 PM Michael Gendelman 
> > wrote:
> >
> > > Hi all,
> > >
> > > I have a workload where I need to read and transform large amounts of
> > > data from Elasticsearch. I'm currently using Flink only for streaming,
> > > but I thought that it can also be a good fit for this kind of batch job.
> > > However, I did not find a way to load data from Elasticsearch in
> > > parallel into Flink.
> > >
> > > I'd like to propose *ElasticsearchInputFormat*, which will be able to
> > > load data from Elasticsearch in parallel by leveraging the InputSplit
> > > mechanism in Flink and the Elasticsearch scroll API.
> > >
> > > The API should look something like this:
> > > ElasticsearchInputFormat<MessageObj> elasticsearchInputFormat =
> > >     ElasticsearchInputFormat.builder(esHostList, query, esMapper, typeInfo)
> > >         .setParametersProvider(paramsProvider)
> > >         .setIndex("index-name")
> > >         .setClusterName("type-name")
> > >         .build();
> > > DataSet<MessageObj> input = env.createInput(elasticsearchInputFormat);
> > >
> > > The *query* is a regular ES query specifying the data to fetch.
> > > The *esMapper* maps the JSON data returned from Elasticsearch to some
> > > object (in the example above, *MessageObj*).
> > > In order for it to work in parallel, the InputFormat will work with an
> > > InputSplit which gets parameters on how to split a certain range using
> > > the *paramsProvider*.
> > >
> > > What do you think?
> > >
> > > Best,
> > > Michael.
> > >
> >
>


[jira] [Created] (FLINK-10317) Configure Metaspace size by default

2018-09-11 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-10317:


 Summary: Configure Metaspace size by default
 Key: FLINK-10317
 URL: https://issues.apache.org/jira/browse/FLINK-10317
 Project: Flink
  Issue Type: Bug
  Components: Startup Shell Scripts
Affects Versions: 1.5.3
Reporter: Stephan Ewen


We should set the size of the JVM Metaspace to a sane default, like 
{{-XX:MaxMetaspaceSize=256m}}.

If not set, the JVM off-heap memory will grow indefinitely with repeated 
class loading and JIT compilation, eventually exceeding the allowed memory on Docker/YARN 
or similar setups.

It is hard to come up with a good default; however, I believe the error 
messages one gets when the Metaspace is too small are easy to understand (and easy 
to act on), while it is very hard to figure out why the memory footprint 
keeps growing steadily and without bound.
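
For example, the default could be wired in through the standard JVM options setting 
(a sketch only; the exact value and where the default gets applied are still to be 
decided):
{code}
# flink-conf.yaml
env.java.opts: "-XX:MaxMetaspaceSize=256m"
{code}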



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10318) Add option to build Hadoop-free job image to build.sh

2018-09-11 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-10318:
---

 Summary: Add option to build Hadoop-free job image to build.sh
 Key: FLINK-10318
 URL: https://issues.apache.org/jira/browse/FLINK-10318
 Project: Flink
  Issue Type: Improvement
  Components: Docker
Affects Versions: 1.6.0
Reporter: Ufuk Celebi


When building a Job-specific image from a release via 
{{flink-container/docker/build.sh}}, one currently has to specify a Hadoop version:
{code}
./build.sh
  --job-jar flink-job.jar
  --from-release
  --flink-version 1.6.0
  --hadoop-version 2.8 # <- currently required
  --scala-version 2.11
  --image-name flink-job
{code}

I think for many users a Hadoop-free build is a good default. We should 
consider supporting this out of the box with the {{build.sh}} script.
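
A Hadoop-free invocation could then look like this (a sketch only; the exact flag 
handling is still to be decided):
{code}
./build.sh
  --job-jar flink-job.jar
  --from-release
  --flink-version 1.6.0
  --scala-version 2.11
  --image-name flink-job
{code}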

The current workaround would be to manually download the Hadoop-free release 
and build with the {{--from-archive}} flag.

Another alternative would be to drop the {{--from-release}} option and document 
how to build from an archive with links to the downloads.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Speakers needed for Apache DC Roadshow

2018-09-11 Thread Rich Bowen
We need your help to make the Apache Washington DC Roadshow on Dec 4th a 
success.


What do we need most? Speakers!

We're bringing a unique DC flavor to this event by mixing Open Source 
Software with talks about Apache projects as well as OSS CyberSecurity, 
OSS in Government and OSS Career advice.


Please take a look at: http://www.apachecon.com/usroadshow18/

(Note: You are receiving this message because you are subscribed to one 
or more mailing lists at The Apache Software Foundation.)


Rich, for the ApacheCon Planners

--
rbo...@apache.org
http://apachecon.com
@ApacheCon


[jira] [Created] (FLINK-10319) Avoid requestPartitionState from JM but always try retrigger

2018-09-11 Thread JIRA
陈梓立 created FLINK-10319:
---

 Summary: Avoid requestPartitionState from JM but always try 
retrigger
 Key: FLINK-10319
 URL: https://issues.apache.org/jira/browse/FLINK-10319
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination
Affects Versions: 1.7.0
Reporter: 陈梓立
Assignee: 陈梓立
 Fix For: 1.7.0


Do not request the partition state from the JM when a partition request fails, since 
this may generate too many RPC requests and block the JM.

We gain little benefit from checking which state the producer is in, while on the 
other hand the JM can be crashed by too many RPC requests. A task can always 
retriggerPartitionRequest from its InputGate; the request will fail if the producer 
is gone and succeed if the producer is alive. Either way, there is no need to ask the 
JM for help.
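
A rough sketch of the intended behavior on the task side (class, field and method 
names here are illustrative only and not the actual Flink code; only 
{{SingleInputGate#retriggerPartitionRequest}} and {{IntermediateResultPartitionID}} 
are existing Flink APIs):
{code}
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;

// Illustrative sketch only -- not an actual Flink class.
class PartitionRequestRetrigger {

    private final SingleInputGate inputGate;   // real Flink class
    private int remainingRetries = 10;         // hypothetical retry budget

    PartitionRequestRetrigger(SingleInputGate inputGate) {
        this.inputGate = inputGate;
    }

    void onPartitionRequestFailure(IntermediateResultPartitionID partitionId) throws Exception {
        // Instead of asking the JobMaster for the producer's state (one extra RPC
        // per failed request, which can flood the JM), simply retrigger the
        // request from the InputGate, with a bounded number of attempts.
        if (remainingRetries-- > 0) {
            inputGate.retriggerPartitionRequest(partitionId); // succeeds if the producer is alive
        } else {
            // the producer is most likely gone for good; fail the consuming task instead
            throw new IllegalStateException("Producer of partition " + partitionId + " is unreachable.");
        }
    }
}
{code}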



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10320) Introduce JobMaster schedule micro-benchmark

2018-09-11 Thread JIRA
陈梓立 created FLINK-10320:
---

 Summary: Introduce JobMaster schedule micro-benchmark
 Key: FLINK-10320
 URL: https://issues.apache.org/jira/browse/FLINK-10320
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Reporter: 陈梓立
Assignee: 陈梓立


Based on the {{org.apache.flink.streaming.runtime.io.benchmark}} code and the repo 
[flink-benchmarks|https://github.com/dataArtisans/flink-benchmarks], I propose to 
introduce another micro-benchmark which focuses on {{JobMaster}} scheduling 
performance.

h3. Target
Benchmark how long it takes from {{JobMaster}} startup (receiving the {{JobGraph}} 
and initializing) until all tasks are RUNNING. Technically we use a bounded stream 
and the TM finishes tasks as soon as they are deployed, so the interval we actually 
measure ends when all tasks are FINISHED.

h3. Cases
1. JobGraph that covers EAGER scheduling + PIPELINED edges
2. JobGraph that covers LAZY_FROM_SOURCES scheduling + PIPELINED edges
3. JobGraph that covers LAZY_FROM_SOURCES scheduling + BLOCKING edges
PS: maybe also benchmark the case where the sources read from {{InputSplit}}s?

h3. Implementation
As in the flink-benchmarks repo, we would run the benchmark using JMH, so the whole 
test suite is split across two repositories. The testing environment could be located 
in the main repo, maybe under 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/benchmark.
To measure the performance of {{JobMaster}} scheduling, we need to simulate an 
environment that:
1. has a real {{JobMaster}}
2. has a mock/testing {{ResourceManager}} that has infinite resources and reacts 
immediately
3. has one (or many?) mock/testing {{TaskExecutor}}s that deploy and finish tasks 
immediately

[~trohrm...@apache.org] [~GJL] [~pnowojski] could you please review this 
proposal to help clarify the goal and concrete details? Thanks in advance.

Any suggestions are welcome.
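
For illustration, a minimal JMH skeleton of what such a benchmark class could look 
like (only the JMH annotations and {{JobGraph}} are existing APIs; the class name 
and the harness method are hypothetical placeholders):
{code}
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

@State(Scope.Benchmark)
public class JobMasterSchedulingBenchmark {

    private JobGraph jobGraph;

    @Setup
    public void setUp() {
        // Case 1 above: EAGER schedule mode + PIPELINED edges (placeholder graph).
        jobGraph = new JobGraph("schedule-benchmark");
    }

    @Benchmark
    public void scheduleUntilAllTasksFinished() throws Exception {
        // Start a real JobMaster against a mock ResourceManager (infinite slots,
        // immediate reaction) and mock TaskExecutors that finish tasks on deployment,
        // then block until every task has reached FINISHED.
        runUntilAllTasksFinished(jobGraph);
    }

    private static void runUntilAllTasksFinished(JobGraph jobGraph) throws Exception {
        // Hypothetical harness implementing points 1-3 above; not sketched here.
        throw new UnsupportedOperationException("scheduling harness not implemented");
    }
}
{code}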



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10321) Make the condition of broadcast partitioner simple

2018-09-11 Thread zhijiang (JIRA)
zhijiang created FLINK-10321:


 Summary: Make the condition of broadcast partitioner simple
 Key: FLINK-10321
 URL: https://issues.apache.org/jira/browse/FLINK-10321
 Project: Flink
  Issue Type: Improvement
  Components: Network
Affects Versions: 1.7.0
Reporter: zhijiang
Assignee: zhijiang


The current {{BroadcastPartitioner}} uses the variables {{set}} and 
{{setNumber}} as the condition for returning the channel array.

Instead of using {{set}} and {{setNumber}}, we can simply check whether 
{{returnChannel.length == numberOfOutputChannels}} as the condition, which makes it 
simpler.
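
A sketch of the proposed simplification (surrounding class declaration and imports 
omitted; the field name is illustrative and the 1.6/1.7-era 
{{ChannelSelector#selectChannels}} signature is assumed):
{code}
private int[] returnChannels = new int[0];

@Override
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) {
    if (returnChannels.length != numberOfOutputChannels) {
        // Rebuild only when the channel count changed -- no separate 'set' /
        // 'setNumber' bookkeeping needed.
        returnChannels = new int[numberOfOutputChannels];
        for (int i = 0; i < numberOfOutputChannels; i++) {
            returnChannels[i] = i;
        }
    }
    return returnChannels;
}
{code}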



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10322) Unused instance variable MetricRegistry in ResourceManager

2018-09-11 Thread Shimin Yang (JIRA)
Shimin Yang created FLINK-10322:
---

 Summary: Unused instance variable MetricRegistry in ResourceManager
 Key: FLINK-10322
 URL: https://issues.apache.org/jira/browse/FLINK-10322
 Project: Flink
  Issue Type: Improvement
  Components: ResourceManager
Reporter: Shimin Yang
Assignee: Shimin Yang


Same as the title.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Jar availability after cluster restart

2018-09-11 Thread Ricardo.Costa
Hi all,

Just trying to make sure I haven't missed anything: after a cluster restart, none of 
the previously uploaded jars are available from the dashboard or the REST API. So in 
case we need to resubmit a job from an already uploaded jar after a cluster restart, 
it is my understanding that one needs to re-upload the jar first, is that correct?
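
If so, I assume the workaround is simply to upload the jar again through the REST 
API, e.g. something along these lines (host, port and file path are just examples):

curl -X POST -H "Expect:" -F "jarfile=@/path/to/my-job.jar" http://jobmanager-host:8081/jars/upload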

Thanks in advance,
Ricardo

