Re: finite subset of an infinite data stream

2015-11-20 Thread rss rss
Hello Aljoscha,

  many thanks. I tried to build your example but hit an obstacle with the
org.apache.flink.runtime.state.AbstractStateBackend class. Where can I get it?
I guess it is stored in your local branch only. Could you please send me
patches for the public branch, or share the branch with me?

Best regards,
Roman


2015-11-18 17:24 GMT+04:00 Aljoscha Krettek :

> Hi,
> I wrote a little example that could be what you are looking for:
> https://github.com/dataArtisans/query-window-example
>
> It basically implements a window operator with a modifiable window size
> that also allows querying the current accumulated window contents using a
> second input stream.
>
> There is a README file in the github repository, but please let me know if
> you need further explanations.
>
> Cheers,
> Aljoscha
>
> > On 18 Nov 2015, at 12:02, Robert Metzger  wrote:
> >
> > Hi Roman,
> >
> > I've updated the documentation. It seems that it got out of sync. Thank
> you for notifying us about this.
> >
> > My colleague Aljoscha has some experimental code that is probably doing
> what you are looking for: A standing window (your RT-buffer) that you can
> query using a secondary stream (your user's queries).
> > He'll post the code soon to this email thread.
> >
> > Regards,
> > Robert
> >
> >
> > On Wed, Nov 11, 2015 at 2:51 PM, rss rss  wrote:
> > Hello,
> >
> >   thanks, Stephan, but triggers are not what I was looking for. And BTW,
> the documentation is out of date: there is no Count class any more, I found
> only CountTrigger.
> >
> >   Thanks Robert, your example may be useful to me, but in a different
> respect. I meant "union" as an ordinary union of similar data, the same as
> "union" in the DataStream documentation.
> >
> >   The task is very simple. We have an infinite stream of data from
> sensors, a billing system, etc. It does not matter what it is, only that it
> is infinite. We have to store the data in some persistent storage to be able
> to run analytical queries later, and there is also a stream of users'
> analytical queries. But the input stream is big, the time needed to save it
> to persistent storage is long, and we do not have a very fast big-data OLTP
> store. That is, the data extracted from persistent storage for a user's
> request will probably not contain the most recent data. So we need a real
> time buffer (RT-Buffer in the schema) holding the recent data, and we have
> to union it with the results retrieved from persistent storage (I am not
> talking about deduplication and ordering yet). And of course the users'
> queries are unpredictable with regard to their filtering conditions.
> >
> >   The attached schema is an attempt to understand how this could be
> implemented with Flink. I tried to imagine how to build it with Flink's
> streaming API but ran into obstacles. This schema is not the first variant;
> it contains a separate driver program that configures new jobs from users'
> queries, because I did not find a way to link the stream of user queries
> with the subsequent data processing. But it is somewhat close to
> https://gist.github.com/fhueske/4ea5422edb5820915fa4
> >
> >
> > 
> >
> >   The main question is how to process each user's query by combining the
> recent data from the real-time buffer with a batch request against the
> persistent storage. Unfortunately I did not find a solution in the
> Streaming API alone.
> >
> > Regards,
> > Roman
> >
> > 2015-11-11 15:45 GMT+04:00 Robert Metzger :
> > I think what you call "union" is a "connected stream" in Flink. Have a
> look at this example: https://gist.github.com/fhueske/4ea5422edb5820915fa4
> > It shows how to dynamically update a list of filters by external
> requests.
> > Maybe that's what you are looking for?
> >
> >
> >
> > On Wed, Nov 11, 2015 at 12:15 PM, Stephan Ewen  wrote:
> > Hi!
> >
> > I do not really understand what exactly you want to do, especially the
> "union an infinite real time data stream with filtered persistent data
> where the condition of filtering is provided by external requests".
> >
> > If you want to work on substreams in general, there are two options:
> >
> > 1) Create the substream in a streaming window. You can "cut" the stream
> based on special records/events that signal that the subsequence is done.
> Have a look at the "Trigger" class for windows, it can react to elements
> and their contents:
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#windows-on-keyed-data-streams
> (section on Advanced Windowing).
> >
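Stephan's option (1) — cutting the infinite stream into finite subsequences on special punctuation records — can be sketched in a few lines. This is only a plain-Python illustration of the control flow, not Flink's Trigger API; in a real Flink job the same decision would live in a custom Trigger that fires when it sees the marker element:

```python
# Sketch of punctuation-based "cutting": an end marker closes the current
# finite window of an otherwise infinite stream.

def cut_on_punctuation(stream, is_end_marker):
    """Yield finished windows; an end-marker element closes the current one."""
    window = []
    for element in stream:
        if is_end_marker(element):
            yield window
            window = []
        else:
            window.append(element)

events = [1, 2, "END", 3, "END", 4]
windows = list(cut_on_punctuation(events, lambda e: e == "END"))
print(windows)  # [[1, 2], [3]] -- the trailing 4 stays in the still-open window
```

In a real pipeline the open trailing window would additionally be flushed by a timeout or an end-of-stream signal.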
> >
> > 2) You can trigger sequences of batch jobs. The batch job data source
> (input format) can decide when to stop consuming the stream, at which point
> the remainder of the transformations run, and the batch job finishes.
> > You can already run new transformation chains after each call to
> "env.execute()", once the execution has finished, to implement the sequence
> of batch jobs.
> >
> >
> > I would try and go for the windowing solution if that works, because
> that will give you better fault tolerance / high 
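Roman's main question — answering a query from the union of a real-time buffer and slower, possibly stale persistent storage — can be illustrated with a small self-contained sketch. This is plain Python, not Flink API; the record shape, the "newest timestamp wins" merge rule, and all names here are illustrative assumptions, not anything from the thread's code:

```python
# Sketch of the RT-Buffer idea: union the (stale) persistent-store result
# with the in-memory buffer of recent events, deduplicating by key so the
# freshest version of each record wins, then apply the query's filter.

from dataclasses import dataclass

@dataclass
class Record:
    key: str
    ts: int      # event timestamp; higher means fresher
    value: float

def answer_query(persistent_result, rt_buffer, predicate):
    merged = {}
    for rec in list(persistent_result) + list(rt_buffer):
        if not predicate(rec):
            continue
        old = merged.get(rec.key)
        if old is None or rec.ts > old.ts:
            merged[rec.key] = rec   # fresher buffered data overrides storage
    return sorted(merged.values(), key=lambda r: r.ts)

# Storage lags behind; the buffer holds a newer reading for sensor "a".
stored = [Record("a", 1, 10.0), Record("b", 2, 20.0)]
buffered = [Record("a", 3, 30.0), Record("c", 4, 5.0)]
result = answer_query(stored, buffered, lambda r: r.value >= 10.0)
print([(r.key, r.value) for r in result])  # [('b', 20.0), ('a', 30.0)]
```

Deduplication and ordering are exactly the parts Roman set aside, but the sketch shows why they have to be solved at merge time.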

Re: finite subset of an infinite data stream

2015-11-20 Thread Aljoscha Krettek
Hi,
I’m very sorry, yes you would need my custom branch: 
https://github.com/aljoscha/flink/commits/state-enhance

Cheers,
Aljoscha
> On 20 Nov 2015, at 10:13, rss rss  wrote:

Apache Flink on Hadoop YARN using a YARN Session

2015-11-20 Thread Ovidiu-Cristian MARCU
Hi,

I am currently interested in experimenting on Flink over Hadoop YARN.
I am working through the documentation here:
https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/yarn_setup.html

There is a subsection Start Flink Session which states the following: A session 
will start all required Flink services (JobManager and TaskManagers) so that 
you can submit programs to the cluster. Note that you can run multiple programs 
per session.

Can you be more precise regarding the multiple programs per session? If I
submit multiple programs concurrently, what will happen (can I at all)? Will
they run in a FIFO fashion, or what should I expect?

The internals section specifies that users can execute multiple Flink YARN
sessions in parallel. This is great; it invites static partitioning of
resources in order to run multiple applications concurrently. Do you support a
fair scheduler similar to what Spark claims to have?

The FAQ section linked there
(https://ci.apache.org/projects/flink/flink-docs-release-0.10/faq.html) is a
missing resource; can this be updated?

Thank you.

Best regards,
Ovidiu
 

Re: Apache Flink on Hadoop YARN using a YARN Session

2015-11-20 Thread Robert Metzger
Hi Ovidiu,

you can submit multiple programs to a running Flink cluster (or a YARN
session). Flink does currently not have any queuing mechanism.
The JobManager will reject a program if there are not enough free resources
for it. If there are enough resources for multiple programs, they'll run
concurrently.
Note that Flink is not starting separate JVMs for the programs, so if one
program is doing a System.exit(0), it is killing the entire JVM, including
other running programs.

You can start as many YARN sessions (or single jobs to YARN) as you have
resources available on the cluster. The resource allocation is up to the
scheduler you've configured in YARN.

In general, we recommend starting a YARN session per program. You can also
submit a Flink program directly to YARN.

Where did you find the link to the FAQ? The link on the front page is
working: http://flink.apache.org/faq.html
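Robert's description of the session behaviour — no queueing: a program is rejected when there are not enough free resources, otherwise it runs concurrently with the others — can be modelled as a toy slot counter. This is an illustrative plain-Python sketch, not Flink's actual JobManager logic; the slot numbers and job names are invented:

```python
# Toy admission model: a session has a fixed number of task slots; a submitted
# program is rejected if not enough free slots remain, otherwise it is
# admitted and runs concurrently with the programs already holding slots.

class Session:
    def __init__(self, total_slots):
        self.free = total_slots
        self.running = []

    def submit(self, name, slots_needed):
        if slots_needed > self.free:
            return False           # rejected: not enough free resources
        self.free -= slots_needed  # no queueing: admitted jobs run concurrently
        self.running.append(name)
        return True

session = Session(total_slots=4)
print(session.submit("job-a", 2))  # True
print(session.submit("job-b", 2))  # True  (runs concurrently with job-a)
print(session.submit("job-c", 1))  # False (rejected, no free slots left)
```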



On Fri, Nov 20, 2015 at 11:41 AM, Ovidiu-Cristian MARCU <
ovidiu-cristian.ma...@inria.fr> wrote:



Re: Compiler Exception

2015-11-20 Thread Till Rohrmann
Hi Kien Truong,

I found a solution to your problem. It's actually a bug in Flink's
optimizer. Thanks for spotting it :-)

I've opened a pull request to fix it (
https://github.com/apache/flink/pull/1388). The fix will also be included
in the upcoming `0.10.1` release. After the pull request has been merged
you can try it out by either checking the current master out and building
Flink yourself or wait until the SNAPSHOT binaries have been updated
(usually over night).

Cheers,
Till

On Thu, Nov 19, 2015 at 2:05 PM, Truong Duc Kien 
wrote:

> Hi Till,
> I have narrowed down a minimal test case, you will need flink-gelly-scala
> package to run this.
>
> import org.apache.flink.api.common.functions.MapFunction
> import org.apache.flink.api.scala._
> import org.apache.flink.graph._
> import org.apache.flink.graph.scala.Graph
> import org.apache.flink.types.NullValue
> import org.apache.flink.util.Collector
>
> object BulkIterationBug {
>   def main(args: Array[String]): Unit = {
>     val environment = ExecutionEnvironment.getExecutionEnvironment
>     val g = Graph.fromCsvReader[Long, Long, NullValue](
>       pathEdges = "edge.in",
>       vertexValueInitializer = new MapFunction[Long, Long] {
>         override def map(t: Long): Long = t
>       },
>       fieldDelimiterEdges = " ",
>       lineDelimiterEdges = "\n",
>       ignoreCommentsEdges = "%",
>       env = environment
>     )
>     val vertices = g.getVertices
>     val edges = g.getEdges
>     val data = vertices.iterate(1) {
>       (it) => {
>         it.coGroup(edges).where(0).equalTo(0) {
>           (first: Iterator[Vertex[Long, Long]],
>            second: Iterator[Edge[Long, NullValue]],
>            collector: Collector[Vertex[Long, Long]]) => {
>             if (first.hasNext) {
>               collector.collect(first.next)
>             }
>           }
>         }
>       }
>     }
>     println(data.collect())
>   }
> }
>
> The input file "edge.in" contains only 1 line
>
> 1 2
>
>
> Thanks,
> Kien Truong
>
>
>
> On 11/19/2015 09:36 AM, Till Rohrmann wrote:
>
> Hi Kien Truong,
>
> could you share the problematic code with us?
>
> Cheers,
> Till
> On Nov 18, 2015 9:54 PM, "Truong Duc Kien" 
> wrote:
>
>> Hi,
>>
>> I'm hitting Compiler Exception with some of my data set, but not all of
>> them.
>>
>> Exception in thread "main" org.apache.flink.optimizer.CompilerException:
>> No plan meeting the requirements could be created @ Bulk Iteration (Bulk
>> Iteration) (1:null). Most likely reason: Too restrictive plan hints.
>>
>> Can I have some hints on how to troubleshoot this ?
>>
>> Thanks,
>> Kien Truong
>>
>>
>


Re: Apache Flink on Hadoop YARN using a YARN Session

2015-11-20 Thread Ovidiu-Cristian MARCU
Hi,

The link to the FAQ
(https://ci.apache.org/projects/flink/flink-docs-release-0.10/faq.html) is on
the YARN setup 0.10 documentation page
(https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/yarn_setup.html),
in this sentence: If you have troubles using the Flink YARN client, have a
look in the FAQ section.

Are scheduling features considered for the next releases?

Thank you.
Best regards,
Ovidiu

> On 20 Nov 2015, at 11:59, Robert Metzger  wrote:



Re: Apache Flink on Hadoop YARN using a YARN Session

2015-11-20 Thread Robert Metzger
Hi,
I'll fix the link in the YARN documentation. Thank you for reporting the
issue.

I'm not aware of any discussions or implementations related to the
scheduling. From my experience working with users and also from the mailing
list, I don't think that such features are very important.
Since streaming jobs usually run permanently, there is no need to queue
jobs somehow.
For batch jobs, YARN is taking care of the resource allocation (in practice
this means that the job has to wait until the required resources are
available).

There are some discussions (and user requests) regarding resource
elasticity going on and I think we'll add features for dynamically changing
the size of a Flink cluster on YARN while a job is running.

Which features are you missing with respect to scheduling in Flink? Please
let me know if there is anything blocking you from using Flink in production
and we'll see what we can do.

Regards,
Robert



On Fri, Nov 20, 2015 at 1:24 PM, Ovidiu-Cristian MARCU <
ovidiu-cristian.ma...@inria.fr> wrote:



Re: Compiler Exception

2015-11-20 Thread Truong Duc Kien
Hi Till,

Thank you very much. Looking forward to trying the fix.

Best,
Kien

On Fri, Nov 20, 2015 at 12:38 PM, Till Rohrmann 
wrote:



Re: Apache Flink on Hadoop YARN using a YARN Session

2015-11-20 Thread Ovidiu-Cristian MARCU
Thank you, Robert!

My research interests include Flink (I am a PhD student, BigStorage EU
project, Inria Rennes), so I am currently preparing some experiments in order
to better understand how it works.

If I got it right:
- with standalone (cluster) mode you can run multiple workloads if you have
enough resources, else the job will be rejected;
- with a YARN session, YARN will accept the job but will only execute it when
there are enough resources.

My point on scheduling:
Suppose I have an installation (Flink over YARN, for example) and my cluster
has enough resources to serve multiple requests. Some jobs run permanently,
some do not. I want to be able to schedule jobs concurrently. My options
right now, if I understand correctly, are either to wait for the current job
to finish (assuming it has acquired all the available resources) or to stop
the current job in case I have other jobs with higher priorities. This could
also be related to the resource elasticity you mentioned.

Best regards,
Ovidiu

> On 20 Nov 2015, at 13:34, Robert Metzger  wrote:

Re: Apache Flink on Hadoop YARN using a YARN Session

2015-11-20 Thread Robert Metzger
Hi Ovidiu,

good choice on your research topic ;)

I think doing some hands on experiments will help you to understand much
better how Flink works and what you can do with it.

If I got it right:
> -with standalone (cluster) you can run multiple workloads if you have
> enough resources, else the job will be rejected.
> -with a yarn session, yarn will accept the job but will only execute it
> when there are enough resources.


That's not right. The YARN session and standalone cluster mode are basically
the same: both will run jobs in parallel if there are enough resources, and
both will reject jobs if there are not enough resources.


My point on *scheduling*:
> If I have an installation (Flink over Yarn for example) and in my cluster
> I have enough resources to serve multiple requests.
> Some jobs are running permanently, some are not. I want to be able to
> schedule jobs concurrently. My options right now, if I understand
> correctly, is to either wait for the current job to finish (assuming it has
> acquired all the available resources) or to stop the current job, in case I
> have other jobs with higher priorities. This could be related also to the
> resource elasticity you mentioned.


Yes, resource elasticity in Flink will mitigate such issues. We would be
able to respond to YARN's preemption requests if jobs with higher
priorities are requesting additional resources.
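The preemption idea can be made concrete with a toy model: when a higher-priority request arrives, containers are released from lower-priority jobs (shrinking them) rather than killing them outright. This is a hedged, plain-Python illustration of the concept only — it is not YARN's actual preemption protocol nor anything Flink implemented at the time; all names, priorities, and container counts are invented:

```python
# Toy preemption: free containers from the lowest-priority jobs until a
# higher-priority request fits, keeping at least one container per victim
# so shrunk jobs can keep running at reduced parallelism.

def preempt(jobs, request):
    """jobs maps name -> {"priority": int, "containers": int}.

    Returns the names of jobs that were shrunk, or [] if the request
    cannot be satisfied. Purely illustrative.
    """
    needed = request["containers"]
    shrunk = []
    for name in sorted(jobs, key=lambda n: jobs[n]["priority"]):
        if needed <= 0 or jobs[name]["priority"] >= request["priority"]:
            break
        take = min(jobs[name]["containers"] - 1, needed)  # leave 1 container
        if take > 0:
            jobs[name]["containers"] -= take
            needed -= take
            shrunk.append(name)
    return shrunk if needed <= 0 else []

jobs = {"batch": {"priority": 1, "containers": 6},
        "stream": {"priority": 5, "containers": 4}}
shrunk = preempt(jobs, {"priority": 9, "containers": 3})
print(shrunk)                       # ['batch']
print(jobs["batch"]["containers"])  # 3 -- batch shrank, stream untouched
```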

On Fri, Nov 20, 2015 at 2:07 PM, Ovidiu-Cristian MARCU <
ovidiu-cristian.ma...@inria.fr> wrote:

> we'll see what we can do.
>
> Regards,
> Robert
>
>
>
> On Fri, Nov 20, 2015 at 1:24 PM, Ovidiu-Cristian MARCU <
> ovidiu-cristian.ma...@inria.fr> wrote:
>
>> Hi,
>>
>> The link to FAQ (
>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/faq.html) is
>> on the yarn setup 0.10 documentation page (
>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/yarn_setup.html)
>> described in this sentence: *If you have troubles using the Flink YARN
>> client, have a look in the FAQ section.*
>>
>> Are the scheduling features considered for next releases?
>>
>> Thank you.
>> Best regards,
>> Ovidiu
>>
>> On 20 Nov 2015, at 11:59, Robert Metzger  wrote:
>>
>> Hi Ovidiu,
>>
>> you can submit multiple programs to a running Flink cluster (or a YARN
>> session). Flink currently does not have any queuing mechanism.
>> The JobManager will reject a program if there are not enough free
>> resources for it. If there are enough resources for multiple programs,
>> they'll run concurrently.
>> Note that Flink does not start separate JVMs for the programs, so if
>> one program calls System.exit(0), it kills the entire JVM,
>> including other running programs.
>>
>> You can start as many YARN sessions (or single jobs to YARN) as you have
>> resources available on the cluster. The resource allocation is up to the
>>

Re: Apache Flink on Hadoop YARN using a YARN Session

2015-11-20 Thread Ovidiu-Cristian MARCU
Hi Robert,

In this case, if both the standalone and YARN modes run jobs whenever they have 
resources, which one is it better to rely on?
I would be interested in a feature like the dynamic resource allocation with a 
fair scheduler that Spark has implemented.
If you guys consider this feature, I will be glad to join as a contributor 
as well.

Best regards,
Ovidiu
 


Re: Apache Flink on Hadoop YARN using a YARN Session

2015-11-20 Thread Robert Metzger
Hi,

most users don't have that choice, they have to use Flink on YARN.
Both modes have their advantages and disadvantages, but the decision is up
to you.
You can use a little bit more of your memory using the standalone mode, but
you'll have to install Flink manually on all machines.

Regarding the resource elasticity implementation, just follow the dev@ list
and our JIRAs and jump on the discussions as soon as they happen ;)



placement preferences for streaming jobs

2015-11-20 Thread Stefania Costache
Hi,

I have started using Flink and I am wondering if it is possible to specify 
placement preferences for the streaming jobs. More precisely, if I run Flink in 
stand-alone mode on a cluster and I submit a streaming job to it, is there a 
way to ask for the job or for some of its tasks to run on specific nodes?

Thank you in advance,
Stefania

Re: Published test artifacts for flink streaming

2015-11-20 Thread Nick Dimiduk
Very interesting Alex!

One other thing I find useful in building data flows is using "builder"
functions that hide the details of wiring up specific plumbing on generic
input parameters. For instance a void wireFoo(DataSource source,
SinkFunction sink) { ... }. It would be great to have test tools that allow
working with this kind of composition. For instance, I find invoking
setParallelism on the result of registering a SinkFunction on a flow is not
very convenient with the current APIs. In this case, I would want to set
the parallelism to something different in test than I'd like in production.

Any thoughts on how we might make testing compositions like this easier?

-n
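
A minimal sketch of the composition Nick describes: a builder function that
wires a generic source, transform, and sink, with the parallelism passed in so
a test can use 1 while production wiring uses more. The `Source` and `Sink`
interfaces below are hand-rolled stand-ins, not Flink's DataStream API, and
all names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hand-rolled stand-ins for a source and a sink; these are NOT Flink
// types, just enough structure to show the composition pattern.
interface Source<T> { List<T> emit(); }
interface Sink<T> { void accept(T value); }

public class WireDemo {

    /**
     * "Builder" function: wires source -> transform -> sink.
     * Parallelism is an ordinary parameter, so a test can pass 1
     * while the production wiring passes something larger.
     */
    public static <I, O> void wire(Source<I> source,
                                   Function<I, O> transform,
                                   Sink<O> sink,
                                   int parallelism) {
        // A real runtime would fan work out across `parallelism`
        // instances; this toy version just records the setting.
        System.out.println("wired with parallelism " + parallelism);
        for (I item : source.emit()) {
            sink.accept(transform.apply(item));
        }
    }

    public static void main(String[] args) {
        List<String> collected = new ArrayList<>();
        Source<Integer> testSource = () -> List.of(1, 2, 3);
        // Test wiring: in-memory source and sink, parallelism 1.
        wire(testSource, i -> "n=" + i, collected::add, 1);
        System.out.println(collected); // [n=1, n=2, n=3]
    }
}
```

Because parallelism is an argument rather than a call on the registered sink,
the test wiring and the production wiring differ only in the values they pass.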

On Thu, Nov 19, 2015 at 2:09 AM, lofifnc  wrote:

> Hi,
>
> I'm currently working on improving the testing process of flink streaming
> applications.
>
> I have written a test runtime that takes care of execution, collecting the
> output and applying a verifier to it. The runtime is able to provide test
> sources and sinks that run in parallel.
> On top of that it offers an API for defining JUnit tests. I have recreated
> your test using my API:
> https://gist.github.com/anonymous/8825ea83e8fe9afba19e
> I have a second example that shows some of the more advanced features of
> the
> API and runtime:
> https://gist.github.com/anonymous/fa5c77becae2e37d28eb
>
> Unfortunately It's still a work in progress. But I'm planning to have a
> first release candidate at the end of the month.
>
> Best, Alex
>
>
>
>