Re: YARN High Availability

2016-04-05 Thread Ufuk Celebi
Hey Konstantin,

just looked at the logs and the cluster is started, but the job is
indeed never submitted.

I've forwarded this to Robert, because he is familiar with the YARN
client. I will look into how the client interacts with the ZooKeeper
root path.

– Ufuk


On Tue, Apr 5, 2016 at 9:18 AM, Konstantin Knauf
 wrote:
> Hi Ufuk, Hi Stephan,
>
> sorry for the late response. Attached are the client logs.
>
> Cheers,
>
> Konstantin
>
> On 04.04.2016 21:20, Stephan Ewen wrote:
>> This seems to be the critical part in the logs:
>>
>> 2016-03-31 09:01:52,234 INFO  org.apache.flink.yarn.YarnJobManager
>>- Re-submitting 0 job graphs.
>> 2016-03-31 09:02:51,182 INFO  org.apache.flink.yarn.YarnJobManager
>>- Stopping YARN JobManager with status FAILED and
>> diagnostic Flink YARN Client requested shutdown.
>>
>> The YarnJobManager starts up properly, but the Client never sends
>> anything, shuts down at some point, and tears down the YARN cluster.
>>
>> Client logs would help a lot there...
>>
>>
>>
>>
>> On Sat, Apr 2, 2016 at 12:43 PM, Ufuk Celebi > > wrote:
>>
>> Hey Konstantin,
>>
>> That's weird. Can you please log the client output on DEBUG level and
>> provide that as well? I'm wondering whether the client uses a
>> different root path.
>>
>> The following seems to happen:
>> - you use ledf_recovery as the root namespace
>> - the task managers are connecting (they resolve the JM address via
>> ZooKeeper in this case as well, which means they correctly use the
>> same namespace)
>> - but the client, which started the YARN session, does not ever submit
>> the job to the cluster.
>>
>> – Ufuk
>>
>> On Thu, Mar 31, 2016 at 9:23 AM, Konstantin Knauf
>> mailto:konstantin.kn...@tngtech.com>>
>> wrote:
>> > Hi everyone,
>> >
> > we are running into some problems with multiple per-job yarn
>> sessions, too.
>> >
> > When we are starting a per-job yarn session (Flink 1.0, Hadoop
>> 2.4)
>> > with recovery.zookeeper.path.root other than /flink, the yarn session
>> > starts but no job is submitted, and after 1 min or so the session
>> > crashes. I attached the jobmanager log.
>> >
>> > In Zookeeper the root-directory is created and child-nodes
>> >
>> > leaderlatch
>> > jobgraphs
>> >
>> > /flink does also exist, but does not have child nodes.
>> >
>> > Everything runs fine with the default recovery.zookeeper.path.root.
>> >
>> > Does anyone have an idea, what is going on?
>> >
>> > Cheers,
>> >
>> > Konstnatin
>> >
>> >
>> > On 23.11.2015 17:00, Gwenhael Pasquiers wrote:
>> >> We are not yet using HA in our cluster instances.
>> >>
>> >> But yes, we will have to change the zookeeper.path.root :-)
>> >>
>> >>
>> >>
>> >> We package our jobs with their own config folder (we don’t rely on
>> >> flink’s config folder); we can put the maven project name into this
>> >> property then they will have different values :-)
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> *From:*Till Rohrmann [mailto:trohrm...@apache.org
>> ]
>> >> *Sent:* lundi 23 novembre 2015 14:51
>> >> *To:* user@flink.apache.org 
>> >> *Subject:* Re: YARN High Availability
>> >>
>> >>
>> >>
>> >> The problem is the execution graph handle which is stored in
>> ZooKeeper.
>> >> You can manually remove it via the ZooKeeper shell by simply deleting
>> >> everything below your `recovery.zookeeper.path.root` ZNode. But you
>> >> should be sure that the cluster has been stopped before.
>> >>
>> >>
>> >>
>> >> Do you start the different clusters with different
>> >> `recovery.zookeeper.path.root` values? If not, then you should
>> run into
>> >> troubles when running multiple clusters at the same time. The
>> reason is
>> >> that then all clusters will think that they belong together.
>> >>
>> >>
>> >>
>> >> Cheers,
>> >>
>> >> Till
>> >>
>> >>
>> >>
>> >> On Mon, Nov 23, 2015 at 2:15 PM, Gwenhael Pasquiers
>> >> > 
>> >> > >> wrote:
>> >>
>> >> OK, I understand.
>> >>
>> >> Maybe we are not really using flink as you intended. The way we are
>> >> using it, one cluster equals one job. That way we are sure to isolate
>> >> the different jobs as much as possible and in case of crashes /
>> bugs /
>> >> (etc) can completely kill one cluster without interfering with
>> the other
>> >> jobs.
>> >>
>> >> That future behavior seems good :-)
>> >>
>> >> Instead of the manual flink commands, is there a way to manually delete
>> t

Re: building for Scala 2.11

2016-04-05 Thread Andrew Gaydenko
Chiwan, thanks, got it! - and the build finished with success.

I'm still a little confused by the method used: a tool from tools/
changes files that are under Git control.


Regards,
Andrew


Chiwan Park  writes:

> Hi Andrew,
>
> The method to build Flink with Scala 2.11 is described in Flink documentation 
> [1].
>
> I think this is not relevant but just FYI, to build your application based on 
> Flink 1.0 (or later) with Scala 2.11, you should be careful to set Flink 
> dependencies. There is a guide in Wiki [2].
>
> [1]: 
> https://ci.apache.org/projects/flink/flink-docs-master/setup/building.html#scala-versions
> [2]: 
> https://cwiki.apache.org/confluence/display/FLINK/Maven+artifact+names+suffixed+with+Scala+version
>
> Regards,
> Chiwan Park
>
>> On Apr 5, 2016, at 9:40 AM, Andrew Gaydenko  
>> wrote:
>> 
>> Hi!
>> 
>> How to build the project for Scala 2.11?
>> -- 
>> 
>> Regards,
>> Andrew


Re: building for Scala 2.11

2016-04-05 Thread Andrew Gaydenko
Balaji, now I see it is my mistake: I wasn't clear enough in my
question, sorry. By "the project" I meant the Flink project itself. The
question has already been answered.


Regards,
Andrew

Balaji Rajagopalan  writes:

> In your pom file you can specify which version of Scala you want to build
> against. Also remember to add the Scala version to the artifactId of all
> dependencies that take a Scala version; some libraries are Scala-agnostic,
> and for those you do not have to specify it.
>
> <properties>
>     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>     <slf4j.version>1.7.12</slf4j.version>
>     <flink.version>1.0.0</flink.version>
>     <scala.version>2.11</scala.version>
> </properties>
>
> <dependencies>
>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-streaming-java_${scala.version}</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-streaming-scala_${scala.version}</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>
> </dependencies>
>
>
>
> On Tue, Apr 5, 2016 at 6:10 AM, Andrew Gaydenko 
> wrote:
>
>> Hi!
>>
>> How to build the project for Scala 2.11?
>> --
>>
>> Regards,
>> Andrew
>>



Re: FYI: Updated Slides Section

2016-04-05 Thread Theodore Vasiloudis
Hello all,

you can find my slides on Large-Scale Machine Learning with FlinkML here
(from SICS Data Science day and FOSDEM 2016):
http://www.slideshare.net/TheodorosVasiloudis/flinkml-large-scale-machine-learning-with-apache-flink

Best,
Theodore

On Mon, Apr 4, 2016 at 3:19 PM, Rubén Casado 
wrote:

> Dear community,
>
> Just in case it is useful, please find below the links to the slides from
> the 1st Flink Madrid Meetup talks given by Fabien Hueske [1] and myself [2]
> (in spanish).
>
> Best
>
> [1]
> http://www.slideshare.net/fhueske/data-stream-processing-with-apache-flink
> [2]
> http://es.slideshare.net/Datadopter/dnde-encaja-apache-flink-en-el-ecosistema-actual-de-tecnologas-big-data
> __
>
> *Dr. Rubén Casado*
> Head of Big Data
> Treelogic
>  
> *ruben.casado.treelogic*
>
> +34 902 286 386 - +34 607 18 28 06
> Parque Tecnológico de Asturias · Parcela 30
> E33428 Llanera · Asturias [Spain]
> www.treelogic.com
> __
>
>
> - Mensaje original -
> De: "Ufuk Celebi" 
> Para: user@flink.apache.org
> CC: d...@flink.apache.org
> Enviados: Lunes, 4 de Abril 2016 11:33:59 GMT +01:00 Amsterdam / Berlín /
> Berna / Roma / Estocolmo / Viena
> Asunto: FYI: Updated Slides Section
>
> Dear Flink community,
>
> I have updated the Material section on the Flink project page and
> moved the slides section to a separate page.
>
> You can find links to slides and talks here now:
> http://flink.apache.org/slides.html
>
> I've added slides for talks from this year by Till Rohrmann, Vasia
Kalavri, Robert Metzger, Jamie Grier and Kostas Tzoumas. If you think
> that something is missing, feel free to ping in this thread.
>
> – Ufuk
>


Re: Integrate Flink with S3 on EMR cluster

2016-04-05 Thread Ufuk Celebi
Hey Timur,

which EMR version are you using?

– Ufuk

On Tue, Apr 5, 2016 at 1:43 AM, Timur Fayruzov  wrote:
> Thanks for the answer, Ken.
>
> My understanding is that file system selection is driven by the following
> sections in core-site.xml:
> <property>
>   <name>fs.s3.impl</name>
>   <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
> </property>
>
> <property>
>   <name>fs.s3n.impl</name>
>   <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>
> </property>
>
> If I run the program using the configuration above with s3n (and also modifying
> the credential keys to use s3n) it fails with the same error, but there are no
> "... opening key ..." log messages. S3a seems to be unsupported; it fails with the
> following:
> Caused by: java.io.IOException: No file system found with scheme s3a,
> referenced in file URI 's3a://'.
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:296)
> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
> at
> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
> at
> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
> at
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:156)
> ... 23 more
>
> I am puzzled by the fact that EMRFS is still apparently referenced somewhere
> as an implementation for S3 protocol, I'm not able to locate where this
> configuration is set.
>
>
> On Mon, Apr 4, 2016 at 4:07 PM, Ken Krugler 
> wrote:
>>
>> Normally in Hadoop jobs you’d want to use s3n:// as the protocol, not s3.
>>
>> Though EMR has some support for magically treating the s3 protocol as s3n
>> (or maybe s3a now, with Hadoop 2.6 or later)
>>
>> What happens if you use s3n:/// for the --input
>> parameter?
>>
>> — Ken
>>
>> On Apr 4, 2016, at 2:51pm, Timur Fayruzov 
>> wrote:
>>
>> Hello,
>>
>> I'm trying to run a Flink WordCount job on an AWS EMR cluster. I succeeded
>> with a three-step procedure: load data from S3 to cluster's HDFS, run Flink
>> Job, unload outputs from HDFS to S3.
>>
>> However, ideally I'd like to read/write data directly from/to S3. I
>> followed the instructions here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/connectors.html,
>> more specifically I:
>>   1. Modified flink-conf to point to /etc/hadoop/conf
>>   2. Modified core-site.xml per the link above (not clear why it is not
>> using IAM, I had to provide AWS keys explicitly).
>>
>> Run the following command:
>> HADOOP_CONF_DIR=/etc/hadoop/conf flink-1.0.0/bin/flink run -m yarn-cluster
>> -yn 1 -yjm 768 -ytm 768 flink-1.0.0/examples/batch/WordCount.jar --input
>> s3:// --output hdfs:///flink-output
>>
>> First, I see messages like that:
>> 2016-04-04 21:37:10,418 INFO
>> org.apache.hadoop.fs.s3native.NativeS3FileSystem  - Opening key
>> '' for reading at position '333000'
>>
>> Then, it fails with the following error:
>>
>> 
>>
>>  The program finished with the following exception:
>>
>>
>> org.apache.flink.client.program.ProgramInvocationException: The program
>> execution failed: Failed to submit job fc13373d993539e647f164e12d82bf90
>> (WordCount Example)
>>
>> at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>>
>> at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>>
>> at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>>
>> at
>> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>>
>> at
>> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:90)
>>
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>
>> at java.lang.reflect.Method.invoke(Method.java:606)
>>
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>
>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>
>> at
>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>
>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>
>> at
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>
>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed
>> to submit job fc13373d993539e647f164e12d82bf90 (WordCount Example)
>>
>> at
>> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1100)
>>
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:380)
>>
>> at
>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunctio

Re: YARN High Availability

2016-04-05 Thread Robert Metzger
I've tried reproducing the issue on a test cluster, but everything worked
fine.

Have you tried different values for "recovery.zookeeper.path.root" or only
one? Maybe the path you've put contains invalid data?

Regarding the client log you've sent: Did you manually stop the client or
did it stop after a few minutes?
The JobManager stops after a few minutes because the client requested a
shutdown. Usually, the client only shuts down on an exception or when the
user stops the yarn session.
There is no exception in the client log. Was there an exception printed to
the shell?

This log message:

2016-04-05 08:48:34,912 DEBUG org.apache.flink.yarn.FlinkYarnCluster
 - Received message option None

Should not be an issue.


On Tue, Apr 5, 2016 at 10:14 AM, Ufuk Celebi  wrote:

> Hey Konstantin,
>
> just looked at the logs and the cluster is started, but the job is
> indeed never submitted.
>
> I've forwarded this to Robert, because he is familiar with the YARN
> client. I will look into how the client interacts with the ZooKeeper
> root path.
>
> – Ufuk
>
>
> On Tue, Apr 5, 2016 at 9:18 AM, Konstantin Knauf
>  wrote:
> > Hi Ufuk, Hi Stephan,
> >
> > sorry for the late response Attached the client logs.
> >
> > Cheers,
> >
> > Konstantin
> >
> > On 04.04.2016 21:20, Stephan Ewen wrote:
> >> This seems to the the critical part in the logs:
> >>
> >> 2016-03-31 09:01:52,234 INFO  org.apache.flink.yarn.YarnJobManager
> >>- Re-submitting 0 job graphs.
> >> 2016-03-31 09:02:51,182 INFO  org.apache.flink.yarn.YarnJobManager
> >>- Stopping YARN JobManager with status FAILED and
> >> diagnostic Flink YARN Client requested shutdown.
> >>
> >> The YarnJobManager starts up properly, but the Client never sends
> >> anything, shuts down at some point, and tears down the YARN cluster.
> >>
> >> Client logs would help a lot there...
> >>
> >>
> >>
> >>
> >> On Sat, Apr 2, 2016 at 12:43 PM, Ufuk Celebi  >> > wrote:
> >>
> >> Hey Konstantin,
> >>
> >> That's weird. Can you please log the client output on DEBUG level
> and
> >> provide that as well? I'm wondering whether the client uses a
> >> different root path.
> >>
> >> The following seems to happen:
> >> - you use ledf_recovery as the root namespace
> >> - the task managers are connecting (they resolve the JM address via
> >> ZooKeeper in this case as well, which means they correctly use the
> >> same namespace)
> >> - but the client, which started the YARN session, does not ever
> submit
> >> the job to the cluster.
> >>
> >> – Ufuk
> >>
> >> On Thu, Mar 31, 2016 at 9:23 AM, Konstantin Knauf
> >> mailto:konstantin.kn...@tngtech.com
> >>
> >> wrote:
> >> > Hi everyone,
> >> >
> >> > we are running in some problems with multiple per-job yarn
> >> sessions, too.
> >> >
> >> > When we are are starting a per-job yarn session (Flink 1.0, Hadoop
> >> 2.4)
> >> > with recovery.zookeeper.path.root other than /flink, the yarn
> session
> >> > starts but no job is submitted, and after 1 min or so the session
> >> > crashes. I attached the jobmanager log.
> >> >
> >> > In Zookeeper the root-directory is created and child-nodes
> >> >
> >> > leaderlatch
> >> > jobgraphs
> >> >
> >> > /flink does also exist, but does not have child nodes.
> >> >
> >> > Everything runs fine, with the default
> recovery.zookeeper.root.path.
> >> >
> >> > Does anyone have an idea, what is going on?
> >> >
> >> > Cheers,
> >> >
> >> > Konstnatin
> >> >
> >> >
> >> > On 23.11.2015 17:00, Gwenhael Pasquiers wrote:
> >> >> We are not yet using HA in our cluster instances.
> >> >>
> >> >> But yes, we will have to change the zookeeper.path.root J
> >> >>
> >> >>
> >> >>
> >> >> We package our jobs with their own config folder (we don’t rely
> on
> >> >> flink’s config folder); we can put the maven project name into
> this
> >> >> property then they will have different values J
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> *From:*Till Rohrmann [mailto:trohrm...@apache.org
> >> ]
> >> >> *Sent:* lundi 23 novembre 2015 14:51
> >> >> *To:* user@flink.apache.org 
> >> >> *Subject:* Re: YARN High Availability
> >> >>
> >> >>
> >> >>
> >> >> The problem is the execution graph handle which is stored in
> >> ZooKeeper.
> >> >> You can manually remove it via the ZooKeeper shell by simply
> deleting
> >> >> everything below your `recovery.zookeeper.path.root` ZNode. But
> you
> >> >> should be sure that the cluster has been stopped before.
> >> >>
> >> >>
> >> >>
> >> >> Do you start the different clusters with different
> >> >> `recovery.zookeeper.path.root` values? If not, then you should
> 

Re: CEP API: Question on FollowedBy

2016-04-05 Thread Till Rohrmann
Hi Anwar,

yes, once we have published the introductory blog post about the CEP
library, we will also publish a more in-depth description of the approach
we have implemented. To spoil it a little bit: We have mainly followed the
paper “Efficient Pattern Matching over Event Streams” for the
implementation.

Concerning your questions:

1.) followedBy means that there can be an arbitrary sequence of events
between two matching events, as long as they occur in the specified time
interval. Thus, TemperatureEvent(40) will match together with
TemperatureEvent(50), TemperatureEvent(70), TemperatureEvent(65) and
TemperatureEvent(60).

2.) The same applies here for the next operator. It says that the second
matching event has to follow directly after the previous matched event.
However, a matching event can be part of multiple matching sequences. Thus,
you will get TemperatureEvent(70), TemperatureEvent(65) and
TemperatureEvent(65), TemperatureEvent(60) as two distinct matching sequences.

To make a long story short, there is currently no option to change the
sequence semantics so that events are only part of one matching sequence.
At the moment, events can participate in multiple matching sequences. I
hope that answers your question Anwar.
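
To make the two cases concrete, here is a rough Scala sketch written against the
Java CEP API (Flink 1.0 has no dedicated Scala CEP API as far as I know). The event
type and the > 35 threshold come from your example; class locations and method
details should be checked against the documentation linked below:

import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.cep.pattern.Pattern
import org.apache.flink.streaming.api.windowing.time.Time

object FollowedBySketch {

  case class TemperatureEvent(temperature: Double)

  // A TemperatureEvent > 35 followed (non-strictly) by another one > 35,
  // both within 20 minutes.
  val highThenHigh = Pattern.begin[TemperatureEvent]("first")
    .where(new FilterFunction[TemperatureEvent] {
      override def filter(e: TemperatureEvent): Boolean = e.temperature > 35
    })
    .followedBy("second")
    .where(new FilterFunction[TemperatureEvent] {
      override def filter(e: TemperatureEvent): Boolean = e.temperature > 35
    })
    .within(Time.minutes(20))

  // Applied with CEP.pattern(temperatureStream, highThenHigh), every qualifying
  // pair is reported, so e.g. TemperatureEvent(65) can appear in several matches.
}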

Cheers,
Till
​

On Mon, Apr 4, 2016 at 11:18 AM, Anwar Rizal  wrote:

> Hi All,
>
>
> I saw Till's blog preparation. It will be a very helpful blog. I hope that
> some other blogs that explain how it works will come soon :-)
>
> I have a question on followedBy pattern matching semantic.
>
>
> From the documentation
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/libs/cep.html
> ,
>
> 
>
> Non-strict contiguity means that other events are allowed to occur
> in-between two matching events. A non-strict contiguity pattern state can
> be created via the followedBy method.
>
> Pattern nonStrictNext = start.followedBy("middle");
>
>
> 
>
> I try to use Till's examples in the blog, to understand the semantic of
> followedBy
>
> --
> First question.
> Say, I have sequence of temperatures in a time window that corresponds to
> the within clause (say in 20 minutes).
>
> TemperatureEvent(40) , OtherEvent(...), TemperatureEvent(30),
> TemperatureEvent(50), OtherEvent(...), TemperatureEvent(70),
> TemperatureEvent(65), TemperatureEvent(60)
>
> say I want to match two TemperatureEvents whose temperatures > 35.
>
> What will be the matches in this case ?
>
>
>-  Will TemperatureEvent(40) , TemperatureEvent(50), match ? (because
>we have TemperatureEvent(30) at time 3 that does not match.
>- Will TemperatureEvent(40) , TemperatureEvent(70) match ? (because
>the pair matches also the specification of pattern , the difference is we
>have TemperatureEvent(50) which happened before TempertureEvent(70) ).
>Similar question for TemperatureEvent(40) - TemperatureEvent(65) and
>TemperatureEvent(50)-TemperatureEvent(65) etc. pairs.
>
>
>
> --
> Second question.
> For next (and also for followedBy) , I have also questions regarding
> example above:
> Will TemperatureEvent(70), TemperatureEvent(65) and TemperatureEvent(65),
> TemperatureEvent(60) be returned , or the second pair is no longer returned
> because TemperatureEvent(65) has been used in the first pair ?
>
>
>
> Is there a way to define the different sequence semantics  for the two
> questions I asked above ?
>
>
> Thanks,
> Anwar.
>
>
>
>
>
>
>
>


How to test serializability of a Flink job

2016-04-05 Thread Simone Robutti
Hello,

last week I got a problem where my job worked in local mode but could not
be serialized on the cluster. I assume that local mode does not really
serialize all the operators (the problem was with a custom map function)
and I need to enforce this behaviour in local mode or, better, be able to
write tests that verify that a class or a job could be successfully
serialized.
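
A minimal sketch of such a test using plain Java serialization, which is roughly
what happens when the job is shipped to a real cluster; the MapFunction below is a
hypothetical stand-in for the custom map function from the job:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

import org.apache.flink.api.common.functions.MapFunction

object SerializabilityCheck {

  // Throws NotSerializableException if obj, or anything it (transitively)
  // references, cannot be serialized.
  def ensureSerializable(obj: AnyRef): Unit = {
    val oos = new ObjectOutputStream(new ByteArrayOutputStream())
    try oos.writeObject(obj) finally oos.close()
  }

  def main(args: Array[String]): Unit = {
    // stand-in for the real custom map function
    val mapper = new MapFunction[String, Int] {
      override def map(value: String): Int = value.length
    }
    ensureSerializable(mapper) // fails fast if the function captures non-serializable state
  }
}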

Thanks,

Simone


[DISCUSS] Allowed Lateness in Flink

2016-04-05 Thread Aljoscha Krettek
Hi Folks,
as part of my effort to improve the windowing in Flink [1] I also thought
about lateness, accumulating/discarding and window cleanup. I have some
ideas on this but I would love to get feedback from the community as I
think that these things are important for everyone doing event-time
windowing on Flink.

The basic problem is this: Some elements can arrive behind the watermark if
the watermark is not 100 % correct (which it is not, in most cases, I would
assume). We need to provide API that allows to specify what happens when
these late elements arrive. There are two main knobs for the user here:

- Allowed Lateness: How late can an element be before it is completely
ignored, i.e. simply discarded

- Accumulating/Discarding Fired Windows: When we fire a window, do we purge
the contents or do we keep them around until the watermark passes the end of
the window plus the allowed lateness? If we keep the window, a late element
will be added to the window and the window will be emitted again. If we don't
keep the window, then the late element will essentially trigger emission of
a one-element window.

This is somewhat straightforward to implement: if accumulating, set a timer
for the end of the window plus the allowed lateness and clean up the window
when that fires (basically). All in event time, driven by watermarks.
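
As an illustration of that timer logic only (not of the planned WindowOperator
changes), here is a sketch of a custom trigger along those lines; the Trigger
signatures are written from memory of the 1.0 API, so treat the details as
assumptions rather than working code:

import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Fires at the end of the window, keeps the contents so late elements cause
// re-emission, and purges only once event time passes end-of-window + lateness.
class AccumulatingLatenessTrigger(allowedLateness: Long)
    extends Trigger[Any, TimeWindow] {

  override def onElement(element: Any, timestamp: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    ctx.registerEventTimeTimer(window.maxTimestamp)                   // regular firing
    ctx.registerEventTimeTimer(window.maxTimestamp + allowedLateness) // cleanup
    TriggerResult.CONTINUE
  }

  override def onEventTime(time: Long, window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult =
    if (time >= window.maxTimestamp + allowedLateness) TriggerResult.FIRE_AND_PURGE
    else TriggerResult.FIRE

  override def onProcessingTime(time: Long, window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE
}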

 My problem is only this: what should happen if the user specifies some
allowed lateness and/or accumulating mode but uses processing-time
windowing. For processing-time windows these don't make sense because
elements cannot be late by definition. The problem is that we cannot
figure out, by looking at a WindowAssigner or the Windows that it assigns
to elements whether these windows are in event-time or processing-time
domain. At the API level this is also not easily visible, since a user
might have set the "stream-time-characteristic" to event-time but still use
a processing-time window (plus trigger) in the program.

Any ideas for solving this are extremely welcome. :-)

Cheers,
Aljoscha

[1]
https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit#heading=h.psfzjlv68tp


Re: [DISCUSS] Allowed Lateness in Flink

2016-04-05 Thread Aljoscha Krettek
By the way, the way I see to fix this is extending WindowAssigner with
an "isEventTime()" method and then allowing accumulating/lateness in the
WindowOperator only if this is true.

But it seems a bit hacky because it special-cases event time. But then
again, maybe we need to special case it ...
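
A tiny sketch of what that could look like; the names are made up for illustration
and this is the proposal, not existing Flink API:

// Sketch only: a marker for the time domain of an assigner plus the check the
// WindowOperator could perform before accepting lateness-related settings.
trait TimeDomainAware {
  def isEventTime: Boolean
}

object LatenessValidation {
  def validate(assigner: TimeDomainAware, allowedLateness: Long): Unit =
    require(
      allowedLateness == 0 || assigner.isEventTime,
      "allowed lateness / accumulating mode only makes sense for event-time window assigners")
}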

On Tue, 5 Apr 2016 at 12:23 Aljoscha Krettek  wrote:

> Hi Folks,
> as part of my effort to improve the windowing in Flink [1] I also thought
> about lateness, accumulating/discarding and window cleanup. I have some
> ideas on this but I would love to get feedback from the community as I
> think that these things are important for everyone doing event-time
> windowing on Flink.
>
> The basic problem is this: Some elements can arrive behind the watermark
> if the watermark is not 100 % correct (which it is not, in most cases, I
> would assume). We need to provide API that allows to specify what happens
> when these late elements arrive. There are two main knobs for the user here:
>
> - Allowed Lateness: How late can an element be before it is completely
> ignored, i.e. simply discarded
>
> - Accumulating/Discarding Fired Windows: When we fire a window, do we
> purge the contents or do we keep it around until the watermark passes the
> end of end window plus the allowed lateness? If we keep the window a late
> element will be added to the window and the window will be emitted again.
> If don't keep the window then the late element will essentially trigger
> emission of a one-element window.
>
> This is somewhat straightforward to implement: If accumulating set a timer
> for the end of the window plus the allowed lateness. Cleanup the window
> when that fires (basically). All in event-time with watermarks.
>
>  My problem is only this: what should happen if the user specifies some
> allowed lateness and/or accumulating mode but uses processing-time
> windowing. For processing-time windows these don't make sense because
> elements cannot can be late by definition. The problem is, that we cannot
> figure out, by looking at a WindowAssigner or the Windows that it assigns
> to elements whether these windows are in event-time or processing-time
> domain. At the API level this is also not easily visible, since a user
> might have set the "stream-time-characteristic" to event-time but still use
> a processing-time window (plus trigger) in the program.
>
> Any ideas for solving this are extremely welcome. :-)
>
> Cheers,
> Aljoscha
>
> [1]
> https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit#heading=h.psfzjlv68tp
>


Re: YARN High Availability

2016-04-05 Thread Konstantin Knauf
Hi Robert,

I tried several paths and rmr before.

It stopped after 1-2 minutes. There was an exception on the shell.
Sorry, I should have attached it to the last mail.
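
In case it helps with checking the ZooKeeper side, here is a small Scala sketch
using Apache Curator (which Flink itself uses for HA, if I'm not mistaken); the
quorum address and root path below are placeholders for the values in
flink-conf.yaml. It lists what ended up under the configured
recovery.zookeeper.path.root and, commented out, wipes it:

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry

import scala.collection.JavaConverters._

object InspectRecoveryRoot {

  def main(args: Array[String]): Unit = {
    // placeholders: use the values from flink-conf.yaml
    val quorum = "zookeeper-host:2181" // recovery.zookeeper.quorum
    val root   = "/ledf_recovery"      // recovery.zookeeper.path.root

    val client = CuratorFrameworkFactory.newClient(quorum, new ExponentialBackoffRetry(1000, 3))
    client.start()
    try {
      val children = client.getChildren.forPath(root).asScala
      println(s"children of $root: ${children.mkString(", ")}")
      // wipe the namespace, but only while no Flink cluster is using it:
      // client.delete().deletingChildrenIfNeeded().forPath(root)
    } finally {
      client.close()
    }
  }
}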

Thanks,

Konstantin

On 05.04.2016 11:22, Robert Metzger wrote:
> I've tried reproducing the issue on a test cluster, but everything
> worked fine.
> 
> Have you tried different values for "recovery.zookeeper.path.root" or
> only one? Maybe the path you've put contains invalid data?
> 
> Regarding the client log you've send: Did you manually stop the client
> or did it stop after a few minutes?
> The JobManager stops after a few minutes because the client requested a
> shutdown. Usually, the client only shuts down on an exception or when
> the user stops the yarn session.
> There is no exception in the client log. Was there an exception printed
> to the shell? 
> 
> This log message: 
> 
> 2016-04-05 08:48:34,912 DEBUG org.apache.flink.yarn.FlinkYarnCluster
>- Received message option None
> 
> Should not be an issue.
> 
> 
> On Tue, Apr 5, 2016 at 10:14 AM, Ufuk Celebi  > wrote:
> 
> Hey Konstantin,
> 
> just looked at the logs and the cluster is started, but the job is
> indeed never submitted.
> 
> I've forwarded this to Robert, because he is familiar with the YARN
> client. I will look into how the client interacts with the ZooKeeper
> root path.
> 
> – Ufuk
> 
> 
> On Tue, Apr 5, 2016 at 9:18 AM, Konstantin Knauf
> mailto:konstantin.kn...@tngtech.com>>
> wrote:
> > Hi Ufuk, Hi Stephan,
> >
> > sorry for the late response Attached the client logs.
> >
> > Cheers,
> >
> > Konstantin
> >
> > On 04.04.2016 21 :20, Stephan Ewen wrote:
> >> This seems to the the critical part in the logs:
> >>
> >> 2016-03-31 09:01:52,234 INFO  org.apache.flink.yarn.YarnJobManager
> >>- Re-submitting 0 job graphs.
> >> 2016-03-31 09:02:51,182 INFO  org.apache.flink.yarn.YarnJobManager
> >>- Stopping YARN JobManager with status FAILED and
> >> diagnostic Flink YARN Client requested shutdown.
> >>
> >> The YarnJobManager starts up properly, but the Client never sends
> >> anything, shuts down at some point, and tears down the YARN cluster.
> >>
> >> Client logs would help a lot there...
> >>
> >>
> >>
> >>
> >> On Sat, Apr 2, 2016 at 12:43 PM, Ufuk Celebi  
> >> >> wrote:
> >>
> >> Hey Konstantin,
> >>
> >> That's weird. Can you please log the client output on DEBUG
> level and
> >> provide that as well? I'm wondering whether the client uses a
> >> different root path.
> >>
> >> The following seems to happen:
> >> - you use ledf_recovery as the root namespace
> >> - the task managers are connecting (they resolve the JM
> address via
> >> ZooKeeper in this case as well, which means they correctly
> use the
> >> same namespace)
> >> - but the client, which started the YARN session, does not
> ever submit
> >> the job to the cluster.
> >>
> >> – Ufuk
> >>
> >> On Thu, Mar 31, 2016 at 9:23 AM, Konstantin Knauf
> >>  
>  >>
> >> wrote:
> >> > Hi everyone,
> >> >
> >> > we are running in some problems with multiple per-job yarn
> >> sessions, too.
> >> >
> >> > When we are are starting a per-job yarn session (Flink 1.0,
> Hadoop
> >> 2.4)
> >> > with recovery.zookeeper.path.root other than /flink, the
> yarn session
> >> > starts but no job is submitted, and after 1 min or so the
> session
> >> > crashes. I attached the jobmanager log.
> >> >
> >> > In Zookeeper the root-directory is created and child-nodes
> >> >
> >> > leaderlatch
> >> > jobgraphs
> >> >
> >> > /flink does also exist, but does not have child nodes.
> >> >
> >> > Everything runs fine, with the default
> recovery.zookeeper.root.path.
> >> >
> >> > Does anyone have an idea, what is going on?
> >> >
> >> > Cheers,
> >> >
> >> > Konstnatin
> >> >
> >> >
> >> > On 23.11.2015 17:00, Gwenhael Pasquiers wrote:
> >> >> We are not yet using HA in our cluster instances.
> >> >>
> >> >> But yes, we will have to change the zookeeper.path.root J
> >> >>
> >> >>
> >> >>
> >> >> We package our jobs with their own config folder (we don’t
> rely on
> >> >> flink’s config folder); we can put the maven project name
> into this
> >> 

Re: CEP API: Question on FollowedBy

2016-04-05 Thread Anwar Rizal
Thanks Till.

The only way I can change the behavior would be to post filter the result
then.

Anwar.

On Tue, Apr 5, 2016 at 11:41 AM, Till Rohrmann  wrote:

> Hi Anwar,
>
> yes, once we have published the introductory blog post about the CEP
> library, we will also publish a more in-depth description of the approach
> we have implemented. To spoil it a little bit: We have mainly followed the
> paper “Efficient Pattern Matching over Event Streams” for the
> implementation.
>
> Concerning your questions:
>
> 1.) followedBy means that there can be an arbitrary sequence of events
> between two matching events, as long as they occur in the specified time
> interval. Thus, TemperatureEvent(40) will match together with
> TemperaturEvent(50), TemperaturEvent(70), TemperaturEvent(65) and
> TemperaturEvent(60).
>
> 2.) The same applies here for the next operator. It says that the second
> matching event has to follow directly after the previous matched event.
> However, a matching event can be part of multiple matching sequences. Thus,
> you will get TemperaturEvent(70), TemperaturEvent(65) and
> TemperaturEvent(65), TemperaturEvent(60) as two distinct matching sequences.
>
> To make a long story short, there is currently no option to change the
> sequence semantics so that events are only part of one matching sequence.
> At the moment, events can participate in multiple matching sequences. I
> hope that answers your question Anwar.
>
> Cheers,
> Till
> ​
>
> On Mon, Apr 4, 2016 at 11:18 AM, Anwar Rizal  wrote:
>
>> Hi All,
>>
>>
>> I saw Till's blog preparation. It will be a very helpful blog. I hope
>> that some other blogs that explain how it works will come soon :-)
>>
>> I have a question on followedBy pattern matching semantic.
>>
>>
>> From the documentation
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/libs/cep.html
>> ,
>>
>> 
>>
>> Non-strict contiguity means that other events are allowed to occur
>> in-between two matching events. A non-strict contiguity pattern state can
>> be created via the followedBy method.
>>
>> Pattern nonStrictNext = start.followedBy("middle");
>>
>>
>> 
>>
>> I try to use Till's examples in the blog, to understand the semantic of
>> followedBy
>>
>> --
>> First question.
>> Say, I have sequence of temperatures in a time window that corresponds to
>> the within clause (say in 20 minutes).
>>
>> TemperatureEvent(40) , OtherEvent(...), TemperatureEvent(30),
>> TemperatureEvent(50), OtherEvent(...), TemperatureEvent(70),
>> TemperatureEvent(65), TemperatureEvent(60)
>>
>> say I want to match two TemperatureEvents whose temperatures > 35.
>>
>> What will be the matches in this case ?
>>
>>
>>-  Will TemperatureEvent(40) , TemperatureEvent(50), match ? (because
>>we have TemperatureEvent(30) at time 3 that does not match.
>>- Will TemperatureEvent(40) , TemperatureEvent(70) match ? (because
>>the pair matches also the specification of pattern , the difference is we
>>have TemperatureEvent(50) which happened before TempertureEvent(70) ).
>>Similar question for TemperatureEvent(40) - TemperatureEvent(65) and
>>TemperatureEvent(50)-TemperatureEvent(65) etc. pairs.
>>
>>
>>
>> --
>> Second question.
>> For next (and also for followedBy) , I have also questions regarding
>> example above:
>> Will TemperatureEvent(70), TemperatureEvent(65) and TemperatureEvent(65),
>> TemperatureEvent(60) be returned , or the second pair is no longer returned
>> because TemperatureEvent(65) has been used in the first pair ?
>>
>>
>>
>> Is there a way to define the different sequence semantics  for the two
>> questions I asked above ?
>>
>>
>> Thanks,
>> Anwar.
>>
>>
>>
>>
>>
>>
>>
>>
>
>


Re: CEP API: Question on FollowedBy

2016-04-05 Thread Till Rohrmann
Yes exactly. This is a feature which we still have to add.

On Tue, Apr 5, 2016 at 1:07 PM, Anwar Rizal  wrote:

> Thanks Till.
>
> The only way I can change the behavior would be to post filter the result
> then.
>
> Anwar.
>
> On Tue, Apr 5, 2016 at 11:41 AM, Till Rohrmann 
> wrote:
>
>> Hi Anwar,
>>
>> yes, once we have published the introductory blog post about the CEP
>> library, we will also publish a more in-depth description of the approach
>> we have implemented. To spoil it a little bit: We have mainly followed the
>> paper “Efficient Pattern Matching over Event Streams” for the
>> implementation.
>>
>> Concerning your questions:
>>
>> 1.) followedBy means that there can be an arbitrary sequence of events
>> between two matching events, as long as they occur in the specified time
>> interval. Thus, TemperatureEvent(40) will match together with
>> TemperaturEvent(50), TemperaturEvent(70), TemperaturEvent(65) and
>> TemperaturEvent(60).
>>
>> 2.) The same applies here for the next operator. It says that the second
>> matching event has to follow directly after the previous matched event.
>> However, a matching event can be part of multiple matching sequences. Thus,
>> you will get TemperaturEvent(70), TemperaturEvent(65) and
>> TemperaturEvent(65), TemperaturEvent(60) as two distinct matching sequences.
>>
>> To make a long story short, there is currently no option to change the
>> sequence semantics so that events are only part of one matching sequence.
>> At the moment, events can participate in multiple matching sequences. I
>> hope that answers your question Anwar.
>>
>> Cheers,
>> Till
>> ​
>>
>> On Mon, Apr 4, 2016 at 11:18 AM, Anwar Rizal  wrote:
>>
>>> Hi All,
>>>
>>>
>>> I saw Till's blog preparation. It will be a very helpful blog. I hope
>>> that some other blogs that explain how it works will come soon :-)
>>>
>>> I have a question on followedBy pattern matching semantic.
>>>
>>>
>>> From the documentation
>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/libs/cep.html
>>> ,
>>>
>>> 
>>>
>>> Non-strict contiguity means that other events are allowed to occur
>>> in-between two matching events. A non-strict contiguity pattern state can
>>> be created via the followedBy method.
>>>
>>> Pattern nonStrictNext = start.followedBy("middle");
>>>
>>>
>>> 
>>>
>>> I try to use Till's examples in the blog, to understand the semantic of
>>> followedBy
>>>
>>> --
>>> First question.
>>> Say, I have sequence of temperatures in a time window that corresponds
>>> to the within clause (say in 20 minutes).
>>>
>>> TemperatureEvent(40) , OtherEvent(...), TemperatureEvent(30),
>>> TemperatureEvent(50), OtherEvent(...), TemperatureEvent(70),
>>> TemperatureEvent(65), TemperatureEvent(60)
>>>
>>> say I want to match two TemperatureEvents whose temperatures > 35.
>>>
>>> What will be the matches in this case ?
>>>
>>>
>>>-  Will TemperatureEvent(40) , TemperatureEvent(50), match ?
>>>(because we have TemperatureEvent(30) at time 3 that does not match.
>>>- Will TemperatureEvent(40) , TemperatureEvent(70) match ? (because
>>>the pair matches also the specification of pattern , the difference is we
>>>have TemperatureEvent(50) which happened before TempertureEvent(70) ).
>>>Similar question for TemperatureEvent(40) - TemperatureEvent(65) and
>>>TemperatureEvent(50)-TemperatureEvent(65) etc. pairs.
>>>
>>>
>>>
>>> --
>>> Second question.
>>> For next (and also for followedBy) , I have also questions regarding
>>> example above:
>>> Will TemperatureEvent(70), TemperatureEvent(65) and
>>> TemperatureEvent(65), TemperatureEvent(60) be returned , or the second pair
>>> is no longer returned because TemperatureEvent(65) has been used in the
>>> first pair ?
>>>
>>>
>>>
>>> Is there a way to define the different sequence semantics  for the two
>>> questions I asked above ?
>>>
>>>
>>> Thanks,
>>> Anwar.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>>
>


Handling large state (incremental snapshot?)

2016-04-05 Thread Hironori Ogibayashi
Hello,

I am trying to implement a windowed distinct count on a stream. In this
case, the state
has to hold all distinct values in the window, so it can be large.
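
A rough sketch of what I mean (the Click type and the key are made up for
illustration, and this is roughly the 1.0 Scala API, so details may differ):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowedDistinctCount {

  case class Click(userId: String, page: String)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val clicks: DataStream[Click] = env.fromElements(
      Click("u1", "a"), Click("u2", "a"), Click("u1", "b"))

    // The Set of userIds seen per page and window lives in window state,
    // so the state grows with the number of distinct values.
    val distinctUsersPerPage = clicks
      .keyBy(_.page)
      .timeWindow(Time.minutes(10))
      .fold(Set.empty[String]) { (seen, click) => seen + click.userId }
      .map(_.size)

    distinctUsersPerPage.print()
    env.execute("windowed distinct count")
  }
}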

In my test, if the state size becomes about 400MB, checkpointing takes
40 seconds and consumes most of the TaskManager's CPU.
Is there any good way to handle this situation?

The Flink documentation mentions incremental snapshots, and I am interested in them,
but I could not find how to enable them (not implemented yet?).
https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/stream_checkpointing.html

Regards,
Hironori


Re: Handling large state (incremental snapshot?)

2016-04-05 Thread Aljoscha Krettek
Hi,
I guess you are using the FsStateBackend, is that correct? You could try
using the RocksDB state backend:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state_backends.html#the-rocksdbstatebackend

With this, throughput will be lower but the overhead per checkpoint could
be lower. Also, with this most of the file copying necessary for the
checkpoint will be done while data processing keeps running (asynchronous
snapshot).
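
In case a concrete snippet helps, switching the backend should look roughly like
this; the class name and constructor are from my memory of the 1.0 RocksDB backend,
the checkpoint URI is a placeholder, and the flink-statebackend-rocksdb dependency
is required:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala._

object RocksDbBackendSetup {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // checkpoints go to a durable file system; working state lives in RocksDB on local disk
    env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"))
    env.enableCheckpointing(60000) // checkpoint every 60 seconds

    // ... define the windowed distinct-count job here ...
    // env.execute("distinct count with RocksDB state")
  }
}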

As to incremental snapshots. I'm afraid this feature is not yet implemented
but we're working on it.

Cheers,
Aljoscha

On Tue, 5 Apr 2016 at 14:06 Hironori Ogibayashi 
wrote:

> Hello,
>
> I am trying to implement windowed distinct count on a stream. In this
> case, the state
> have to hold all distinct value in the window, so can be large.
>
> In my test, if the state size become about 400MB, checkpointing takes
> 40sec and spends most of Taskmanager's CPU.
> Are there any good way to handle this situation?
>
> Flink document mentions about incremental snapshot, and I am interested in
> it,
> but could not find how to enable it. (not implemented yet?)
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/stream_checkpointing.html
>
> Regards,
> Hironori
>


Re: Handling large state (incremental snapshot?)

2016-04-05 Thread Hironori Ogibayashi
Aljoscha,

Thank you for your quick response.
Yes, I am using FsStateBackend, so I will try RocksDB backend.

Regards,
Hironori

2016-04-05 21:23 GMT+09:00 Aljoscha Krettek :
> Hi,
> I guess you are using the FsStateBackend, is that correct? You could try
> using the RocksDB state backend:
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state_backends.html#the-rocksdbstatebackend
>
> With this, throughput will be lower but the overhead per checkpoint could be
> lower. Also, with this most of the file copying necessary for the checkpoint
> will be done while data processing keeps running (asynchronous snapshot).
>
> As to incremental snapshots. I'm afraid this feature is not yet implemented
> but we're working on it.
>
> Cheers,
> Aljoscha
>
> On Tue, 5 Apr 2016 at 14:06 Hironori Ogibayashi 
> wrote:
>>
>> Hello,
>>
>> I am trying to implement windowed distinct count on a stream. In this
>> case, the state
>> have to hold all distinct value in the window, so can be large.
>>
>> In my test, if the state size become about 400MB, checkpointing takes
>> 40sec and spends most of Taskmanager's CPU.
>> Are there any good way to handle this situation?
>>
>> Flink document mentions about incremental snapshot, and I am interested in
>> it,
>> but could not find how to enable it. (not implemented yet?)
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/stream_checkpointing.html
>>
>> Regards,
>> Hironori


Flink Job History Dump

2016-04-05 Thread Robert Schmidtke
Hi everyone,

I'm using Flink 0.10.2 to run some benchmarks on my cluster and I would
like to compare it to Spark 1.6.0. Spark has an eventLog property that I
can use to have the history written to HDFS, and then later view it offline
on the History Server.

Does Flink have a similar Feature, especially for offline analysis of a
job's history/events? I know of the Web UI, but I would like to be able to
run my own analysis on top of the data. There is the Monitoring REST API
and I'm wondering if it's possible to gain access to the raw data this API
exposes, and possibly view it on a locally running web UI.

Thanks a lot in advance
Robert


-- 
My GPG Key ID: 336E2680


Re: Flink Job History Dump

2016-04-05 Thread Ufuk Celebi
Hey Robert!

This is currently not possible :-(, but this is a feature that is on
Flink's road map.

A very inconvenient workaround could be to manually query the REST
API [1], dump the responses somewhere, and query them there.
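
A small Scala sketch of that workaround; the JobManager address is a placeholder,
and the endpoint paths are the ones I remember from the monitoring REST API docs [1],
so please double-check them against your version:

import java.io.PrintWriter

import scala.io.Source

object DumpJobHistory {

  def fetch(url: String): String = {
    val src = Source.fromURL(url)
    try src.mkString finally src.close()
  }

  def save(path: String, body: String): Unit = {
    val out = new PrintWriter(path)
    try out.write(body) finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val jobManager = "http://jobmanager-host:8081" // placeholder

    // job overview, then per-job details for offline analysis
    save("joboverview.json", fetch(s"$jobManager/joboverview"))
    // for each job id found in the overview:
    // save(s"job-$id.json", fetch(s"$jobManager/jobs/$id"))
  }
}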

– Ufuk

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/monitoring_rest_api.html

On Tue, Apr 5, 2016 at 4:00 PM, Robert Schmidtke  wrote:
> Hi everyone,
>
> I'm using Flink 0.10.2 to run some benchmarks on my cluster and I would like
> to compare it to Spark 1.6.0. Spark has an eventLog property that I can use
> to have the history written to HDFS, and then later view it offline on the
> History Server.
>
> Does Flink have a similar Feature, especially for offline analysis of a
> job's history/events? I know of the Web UI, but I would like to be able to
> run my own analysis on top of the data. There is the Monitoring REST API and
> I'm wondering if it's possible to gain access to the raw data this API
> exposes, and possibly view it on a locally running web UI.
>
> Thanks a lot in advance
> Robert
>
>
> --
> My GPG Key ID: 336E2680


Re: Powered by Flink

2016-04-05 Thread Robert Metzger
Hi everyone,

I would like to bring the "Powered by Flink" wiki page [1] to the attention
of Flink users who recently joined the Flink community. The list tracks
which organizations are using Flink.
If your company / university / research institute / ... is using Flink but
the name is not yet listed there, let me know and I'll add the name.

Regards,
Robert

[1] https://cwiki.apache.org/confluence/display/FLINK/Powered+by+Flink


On Mon, Oct 19, 2015 at 4:10 PM, Matthias J. Sax  wrote:

> +1
>
> On 10/19/2015 04:05 PM, Maximilian Michels wrote:
> > +1 Let's collect in the Wiki for now. At some point in time, we might
> > want to have a dedicated page on the Flink homepage.
> >
> > On Mon, Oct 19, 2015 at 3:31 PM, Timo Walther 
> wrote:
> >> Ah ok, sorry. I think linking to the wiki is also ok.
> >>
> >>
> >> On 19.10.2015 15:18, Fabian Hueske wrote:
> >>>
> >>> @Timo: The proposal was to keep the list in the wiki (can be easily
> >>> extended) but link from the main website to the wiki page.
> >>>
> >>> 2015-10-19 15:16 GMT+02:00 Timo Walther :
> >>>
>  +1 for adding it to the website instead of wiki.
>  "Who is using Flink?" is always a question difficult to answer to
>  interested users.
> 
> 
>  On 19.10.2015 15:08, Suneel Marthi wrote:
> 
>  +1 to this.
> 
>  On Mon, Oct 19, 2015 at 3:00 PM, Fabian Hueske 
> wrote:
> 
> > Sounds good +1
> >
> > 2015-10-19 14:57 GMT+02:00 Márton Balassi < <
> balassi.mar...@gmail.com>
> > balassi.mar...@gmail.com>:
> >
> >> Thanks for starting and big +1 for making it more prominent.
> >>
> >> On Mon, Oct 19, 2015 at 2:53 PM, Fabian Hueske <  >
> >
> > fhue...@gmail.com> wrote:
> >>>
> >>> Thanks for starting this Kostas.
> >>>
> >>> I think the list is quite hidden in the wiki. Should we link from
> >>> flink.apache.org to that page?
> >>>
> >>> Cheers, Fabian
> >>>
> >>> 2015-10-19 14:50 GMT+02:00 Kostas Tzoumas < 
> >
> > ktzou...@apache.org>:
> 
>  Hi everyone,
> 
>  I started a "Powered by Flink" wiki page, listing some of the
>  organizations that are using Flink:
> 
> 
> https://cwiki.apache.org/confluence/display/FLINK/Powered+by+Flink
> 
>  If you would like to be added to the list, just send me a short
> email
>  with your organization's name and a description and I will add
> you to
> >
> > the
> 
>  wiki page.
> 
>  Best,
>  Kostas
> 
> >>>
> 
> 
> >>
>
>


Re: Integrate Flink with S3 on EMR cluster

2016-04-05 Thread Timur Fayruzov
Hello Ufuk,

I'm using EMR 4.4.0.

Thanks,
Timur

On Tue, Apr 5, 2016 at 2:18 AM, Ufuk Celebi  wrote:

> Hey Timur,
>
> which EMR version are you using?
>
> – Ufuk
>
> On Tue, Apr 5, 2016 at 1:43 AM, Timur Fayruzov 
> wrote:
> > Thanks for the answer, Ken.
> >
> > My understanding is that file system selection is driven by the following
> > sections in core-site.xml:
> > 
> >   fs.s3.impl
> >
> >   org.apache.hadoop.fs.s3native.NativeS3FileSystem
> > 
> >
> > 
> >   fs.s3n.impl
> >   com.amazon.ws.emr.hadoop.fs.EmrFileSystem
> > 
> >
> > If I run the program using configuration above with s3n (and also
> modifying
> > credential keys to use s3n) it fails with the same error, but there is no
> > "... opening key ..." logs. S3a seems to be not supported, it fails with
> the
> > following:
> > Caused by: java.io.IOException: No file system found with scheme s3a,
> > referenced in file URI 's3a://'.
> > at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:296)
> > at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
> > at
> >
> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
> > at
> >
> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
> > at
> >
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:156)
> > ... 23 more
> >
> > I am puzzled by the fact that EMRFS is still apparently referenced
> somewhere
> > as an implementation for S3 protocol, I'm not able to locate where this
> > configuration is set.
> >
> >
> > On Mon, Apr 4, 2016 at 4:07 PM, Ken Krugler  >
> > wrote:
> >>
> >> Normally in Hadoop jobs you’d want to use s3n:// as the protocol, not
> s3.
> >>
> >> Though EMR has some support for magically treating the s3 protocol as
> s3n
> >> (or maybe s3a now, with Hadoop 2.6 or later)
> >>
> >> What happens if you use s3n:/// for the --input
> >> parameter?
> >>
> >> — Ken
> >>
> >> On Apr 4, 2016, at 2:51pm, Timur Fayruzov 
> >> wrote:
> >>
> >> Hello,
> >>
> >> I'm trying to run a Flink WordCount job on an AWS EMR cluster. I
> succeeded
> >> with a three-step procedure: load data from S3 to cluster's HDFS, run
> Flink
> >> Job, unload outputs from HDFS to S3.
> >>
> >> However, ideally I'd like to read/write data directly from/to S3. I
> >> followed the instructions here:
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/connectors.html
> ,
> >> more specifically I:
> >>   1. Modified flink-conf to point to /etc/hadoop/conf
> >>   2. Modified core-site.xml per link above (not clear why why it is not
> >> using IAM, I had to provide AWS keys explicitly).
> >>
> >> Run the following command:
> >> HADOOP_CONF_DIR=/etc/hadoop/conf flink-1.0.0/bin/flink run -m
> yarn-cluster
> >> -yn 1 -yjm 768 -ytm 768 flink-1.0.0/examples/batch/WordCount.jar --input
> >> s3:// --output hdfs:///flink-output
> >>
> >> First, I see messages like that:
> >> 2016-04-04 21:37:10,418 INFO
> >> org.apache.hadoop.fs.s3native.NativeS3FileSystem  - Opening
> key
> >> '' for reading at position '333000'
> >>
> >> Then, it fails with the following error:
> >>
> >> 
> >>
> >>  The program finished with the following exception:
> >>
> >>
> >> org.apache.flink.client.program.ProgramInvocationException: The program
> >> execution failed: Failed to submit job fc13373d993539e647f164e12d82bf90
> >> (WordCount Example)
> >>
> >> at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
> >>
> >> at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
> >>
> >> at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
> >>
> >> at
> >>
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
> >>
> >> at
> >>
> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:90)
> >>
> >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>
> >> at
> >>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >>
> >> at
> >>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>
> >> at java.lang.reflect.Method.invoke(Method.java:606)
> >>
> >> at
> >>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> >>
> >> at
> >>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> >>
> >> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
> >>
> >> at
> >>
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
> >>
> >> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
> >>
> >> at
> >>
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
> >>
> >> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> >>
> >> Caused by: org.apache.flink.runtime.client.JobExecutionExcep

Re: Powered by Flink

2016-04-05 Thread Slim Baltagi
Hi 

The following are missing in the ‘Powered by Flink’ list: 
king.com 
https://blogs.apache.org/foundation/entry/the_apache_software_foundation_announces88
Otto Group  
http://data-artisans.com/how-we-selected-apache-flink-at-otto-group/ 

Eura Nova https://research.euranova.eu/flink-forward-2015-talk/ 

Big Data Europe http://www.big-data-europe.eu
Thanks 

Slim Baltagi


> On Apr 5, 2016, at 10:08 AM, Robert Metzger  wrote:
> 
> Hi everyone,
> 
> I would like to bring the "Powered by Flink" wiki page [1] to the attention 
> of Flink user's who recently joined the Flink community. The list tracks 
> which organizations are using Flink.
> If your company / university / research institute / ... is using Flink but 
> the name is not yet listed there, let me know and I'll add the name.
> 
> Regards,
> Robert
> 
> [1] https://cwiki.apache.org/confluence/display/FLINK/Powered+by+Flink 
> 
> 
> 
> On Mon, Oct 19, 2015 at 4:10 PM, Matthias J. Sax  > wrote:
> +1
> 
> On 10/19/2015 04:05 PM, Maximilian Michels wrote:
> > +1 Let's collect in the Wiki for now. At some point in time, we might
> > want to have a dedicated page on the Flink homepage.
> >
> > On Mon, Oct 19, 2015 at 3:31 PM, Timo Walther  > > wrote:
> >> Ah ok, sorry. I think linking to the wiki is also ok.
> >>
> >>
> >> On 19.10.2015 15:18, Fabian Hueske wrote:
> >>>
> >>> @Timo: The proposal was to keep the list in the wiki (can be easily
> >>> extended) but link from the main website to the wiki page.
> >>>
> >>> 2015-10-19 15:16 GMT+02:00 Timo Walther  >>> >:
> >>>
>  +1 for adding it to the website instead of wiki.
>  "Who is using Flink?" is always a question difficult to answer to
>  interested users.
> 
> 
>  On 19.10.2015 15:08, Suneel Marthi wrote:
> 
>  +1 to this.
> 
>  On Mon, Oct 19, 2015 at 3:00 PM, Fabian Hueske   > wrote:
> 
> > Sounds good +1
> >
> > 2015-10-19 14:57 GMT+02:00 Márton Balassi <  > >
> > balassi.mar...@gmail.com >:
> >
> >> Thanks for starting and big +1 for making it more prominent.
> >>
> >> On Mon, Oct 19, 2015 at 2:53 PM, Fabian Hueske <  >> >
> >
> > fhue...@gmail.com > wrote:
> >>>
> >>> Thanks for starting this Kostas.
> >>>
> >>> I think the list is quite hidden in the wiki. Should we link from
> >>> flink.apache.org  to that page?
> >>>
> >>> Cheers, Fabian
> >>>
> >>> 2015-10-19 14:50 GMT+02:00 Kostas Tzoumas <  >>> >
> >
> > ktzou...@apache.org >:
> 
>  Hi everyone,
> 
>  I started a "Powered by Flink" wiki page, listing some of the
>  organizations that are using Flink:
> 
>  https://cwiki.apache.org/confluence/display/FLINK/Powered+by+Flink 
>  
> 
>  If you would like to be added to the list, just send me a short email
>  with your organization's name and a description and I will add you to
> >
> > the
> 
>  wiki page.
> 
>  Best,
>  Kostas
> 
> >>>
> 
> 
> >>
> 
> 



Convert Scala DataStream to Java DataStream

2016-04-05 Thread Saiph Kappa
Hi,

I'm programming in Scala and using some extra libraries written in Java. My
question is: how can I easily convert an
"org.apache.flink.streaming.api.scala.DataStream" to an
"org.apache.flink.streaming.api.datastream.DataStream"?

Thanks.


Re: Integrate Flink with S3 on EMR cluster

2016-04-05 Thread Ufuk Celebi
Hey Timur,

if you are using EMR with IAM roles, Flink should work out of the box.
You don't need to change the Hadoop config and the IAM role takes care
of setting up all credentials at runtime. You don't need to hardcode
any keys in your application that way and this is the recommended way
to go in order to not worry about securely exchanging the keys and
then keeping them secure afterwards.

With EMR 4.4.0 you have to use a Flink binary version built against
Hadoop 2.7. Did you do that? Can you please retry with an
out-of-the-box Flink and just run it like this:

HADOOP_CONF_DIR=/etc/hadoop/conf bin/flink etc.

Hope this helps! Please report back. :-)

– Ufuk


On Tue, Apr 5, 2016 at 5:47 PM, Timur Fayruzov  wrote:
> Hello Ufuk,
>
> I'm using EMR 4.4.0.
>
> Thanks,
> Timur
>
> On Tue, Apr 5, 2016 at 2:18 AM, Ufuk Celebi  wrote:
>>
>> Hey Timur,
>>
>> which EMR version are you using?
>>
>> – Ufuk
>>
>> On Tue, Apr 5, 2016 at 1:43 AM, Timur Fayruzov 
>> wrote:
>> > Thanks for the answer, Ken.
>> >
>> > My understanding is that file system selection is driven by the
>> > following
>> > sections in core-site.xml:
>> > 
>> >   fs.s3.impl
>> >
>> >   org.apache.hadoop.fs.s3native.NativeS3FileSystem
>> > 
>> >
>> > 
>> >   fs.s3n.impl
>> >   com.amazon.ws.emr.hadoop.fs.EmrFileSystem
>> > 
>> >
>> > If I run the program using configuration above with s3n (and also
>> > modifying
>> > credential keys to use s3n) it fails with the same error, but there is
>> > no
>> > "... opening key ..." logs. S3a seems to be not supported, it fails with
>> > the
>> > following:
>> > Caused by: java.io.IOException: No file system found with scheme s3a,
>> > referenced in file URI 's3a://'.
>> > at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:296)
>> > at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
>> > at
>> >
>> > org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
>> > at
>> >
>> > org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
>> > at
>> >
>> > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:156)
>> > ... 23 more
>> >
>> > I am puzzled by the fact that EMRFS is still apparently referenced
>> > somewhere
>> > as an implementation for S3 protocol, I'm not able to locate where this
>> > configuration is set.
>> >
>> >
>> > On Mon, Apr 4, 2016 at 4:07 PM, Ken Krugler
>> > 
>> > wrote:
>> >>
>> >> Normally in Hadoop jobs you’d want to use s3n:// as the protocol, not
>> >> s3.
>> >>
>> >> Though EMR has some support for magically treating the s3 protocol as
>> >> s3n
>> >> (or maybe s3a now, with Hadoop 2.6 or later)
>> >>
>> >> What happens if you use s3n:/// for the --input
>> >> parameter?
>> >>
>> >> — Ken
>> >>
>> >> On Apr 4, 2016, at 2:51pm, Timur Fayruzov 
>> >> wrote:
>> >>
>> >> Hello,
>> >>
>> >> I'm trying to run a Flink WordCount job on an AWS EMR cluster. I
>> >> succeeded
>> >> with a three-step procedure: load data from S3 to cluster's HDFS, run
>> >> Flink
>> >> Job, unload outputs from HDFS to S3.
>> >>
>> >> However, ideally I'd like to read/write data directly from/to S3. I
>> >> followed the instructions here:
>> >>
>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/connectors.html,
>> >> more specifically I:
>> >>   1. Modified flink-conf to point to /etc/hadoop/conf
>> >>   2. Modified core-site.xml per link above (not clear why it is not
>> >> using IAM, I had to provide AWS keys explicitly).
>> >>
>> >> Run the following command:
>> >> HADOOP_CONF_DIR=/etc/hadoop/conf flink-1.0.0/bin/flink run -m
>> >> yarn-cluster
>> >> -yn 1 -yjm 768 -ytm 768 flink-1.0.0/examples/batch/WordCount.jar
>> >> --input
>> >> s3:// --output hdfs:///flink-output
>> >>
>> >> First, I see messages like that:
>> >> 2016-04-04 21:37:10,418 INFO
>> >> org.apache.hadoop.fs.s3native.NativeS3FileSystem  - Opening
>> >> key
>> >> '' for reading at position '333000'
>> >>
>> >> Then, it fails with the following error:
>> >>
>> >> 
>> >>
>> >>  The program finished with the following exception:
>> >>
>> >>
>> >> org.apache.flink.client.program.ProgramInvocationException: The program
>> >> execution failed: Failed to submit job fc13373d993539e647f164e12d82bf90
>> >> (WordCount Example)
>> >>
>> >> at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>> >>
>> >> at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>> >>
>> >> at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>> >>
>> >> at
>> >>
>> >> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:90)
>> >>
>> >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >>
>> >> at
>> >>
>> >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMet

Re: Integrate Flink with S3 on EMR cluster

2016-04-05 Thread Timur Fayruzov
Yes, the Hadoop version was the culprit. It turns out that EMRFS requires at
least Hadoop 2.4.0 (judging from the exception in the initial post; I was not
able to find the official requirements).

Rebuilding Flink with Hadoop 2.7.1 and with Scala 2.11 worked like a charm
and I was able to run WordCount using S3 both for inputs and outputs. I did
*not* need to change any configuration (as outlined
https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/connectors.html
).
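For reference, the rebuild was roughly along these lines, run from the Flink
source tree (the exact script and flags are from memory, so treat them as an
approximation of what the build docs describe):

    # switch the build to Scala 2.11, then build against Hadoop 2.7.1
    tools/change-scala-version.sh 2.11
    mvn clean install -DskipTests -Dhadoop.version=2.7.1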

Thanks for bearing with me, Ufuk!

While looking for the solution I found this issue:
https://issues.apache.org/jira/browse/FLINK-1337. I have a setup for an EMR
cluster now, so I can make a PR describing it, if it's still relevant. I,
for one, would have saved a couple of days if I had a guide like that.

Thanks,
Timur


On Tue, Apr 5, 2016 at 10:43 AM, Ufuk Celebi  wrote:

> Hey Timur,
>
> if you are using EMR with IAM roles, Flink should work out of the box.
> You don't need to change the Hadoop config; the IAM role takes care
> of setting up all credentials at runtime. That way you don't need to
> hardcode any keys in your application, which is the recommended approach:
> you don't have to worry about exchanging the keys securely and keeping
> them secure afterwards.
>
> With EMR 4.4.0 you have to use a Flink binary version built against
> Hadoop 2.7. Did you do that? Can you please retry with an
> out-of-the-box Flink and just run it like this:
>
> HADOOP_CONF_DIR=/etc/hadoop/conf bin/flink etc.
>
> Hope this helps! Please report back. :-)
>
> – Ufuk
>
>
> On Tue, Apr 5, 2016 at 5:47 PM, Timur Fayruzov 
> wrote:
> > Hello Ufuk,
> >
> > I'm using EMR 4.4.0.
> >
> > Thanks,
> > Timur
> >
> > On Tue, Apr 5, 2016 at 2:18 AM, Ufuk Celebi  wrote:
> >>
> >> Hey Timur,
> >>
> >> which EMR version are you using?
> >>
> >> – Ufuk
> >>
> >> On Tue, Apr 5, 2016 at 1:43 AM, Timur Fayruzov <
> timur.fairu...@gmail.com>
> >> wrote:
> >> > Thanks for the answer, Ken.
> >> >
> >> > My understanding is that file system selection is driven by the
> >> > following
> >> > sections in core-site.xml:
> >> > <property>
> >> >   <name>fs.s3.impl</name>
> >> >   <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
> >> > </property>
> >> >
> >> > <property>
> >> >   <name>fs.s3n.impl</name>
> >> >   <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>
> >> > </property>
> >> >
> >> > If I run the program using configuration above with s3n (and also
> >> > modifying
> >> > credential keys to use s3n) it fails with the same error, but there is
> >> > no
> >> > "... opening key ..." logs. S3a seems to be not supported, it fails
> with
> >> > the
> >> > following:
> >> > Caused by: java.io.IOException: No file system found with scheme s3a,
> >> > referenced in file URI 's3a://'.
> >> > at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:296)
> >> > at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
> >> > at
> >> >
> >> >
> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
> >> > at
> >> >
> >> >
> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
> >> > at
> >> >
> >> >
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
> >> > ... 23 more
> >> >
> >> > I am puzzled by the fact that EMRFS is still apparently referenced
> >> > somewhere
> >> > as an implementation for S3 protocol, I'm not able to locate where
> this
> >> > configuration is set.
> >> >
> >> >
> >> > On Mon, Apr 4, 2016 at 4:07 PM, Ken Krugler
> >> > 
> >> > wrote:
> >> >>
> >> >> Normally in Hadoop jobs you’d want to use s3n:// as the protocol, not
> >> >> s3.
> >> >>
> >> >> Though EMR has some support for magically treating the s3 protocol as
> >> >> s3n
> >> >> (or maybe s3a now, with Hadoop 2.6 or later)
> >> >>
> >> >> What happens if you use s3n:/// for the
> --input
> >> >> parameter?
> >> >>
> >> >> — Ken
> >> >>
> >> >> On Apr 4, 2016, at 2:51pm, Timur Fayruzov 
> >> >> wrote:
> >> >>
> >> >> Hello,
> >> >>
> >> >> I'm trying to run a Flink WordCount job on an AWS EMR cluster. I
> >> >> succeeded
> >> >> with a three-step procedure: load data from S3 to cluster's HDFS, run
> >> >> Flink
> >> >> Job, unload outputs from HDFS to S3.
> >> >>
> >> >> However, ideally I'd like to read/write data directly from/to S3. I
> >> >> followed the instructions here:
> >> >>
> >> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/connectors.html
> ,
> >> >> more specifically I:
> >> >>   1. Modified flink-conf to point to /etc/hadoop/conf
> >> >>   2. Modified core-site.xml per link above (not clear why it is
> not
> >> >> using IAM, I had to provide AWS keys explicitly).
> >> >>
> >> >> Run the following command:
> >> >> HADOOP_CONF_DIR=/etc/hadoop/conf flink-1.0.0/bin/flink run -m
> >> >> yarn-cluster
> >> >> -yn 1 -yjm 768 -ytm 768 flink-1.0.0/examples/batch/WordCount.jar
> >> >> --input
> >> >> s3:// --output hdfs:///flink-output
> >> >>
> >> >> First, I see messages like that:
> >> >> 2016-04-04 21:

Checkpoint state stored in backend, and deleting old checkpoint state

2016-04-05 Thread Zach Cox
Hi - I have some questions regarding Flink's checkpointing, specifically
related to storing state in the backends.

So let's say an operator in a streaming job is building up some state. When
it receives barriers from all of its input streams, does it store *all* of
its state to the backend? I think that is what the docs [1] and paper [2]
imply, but want to make sure. In other words, if the operator contains
100MB of state, and the backend is HDFS, does the operator copy all 100MB
of state to HDFS during the checkpoint?

Following on this example, say the operator is a global window and is
storing some state for each unique key observed in the stream of messages
(e.g. userId). Assume that over time, the number of observed unique keys
grows, so the size of the state also grows (the window state is never
purged). Is the entire operator state at the time of each checkpoint stored
to the backend? So that over time, the size of the state stored for each
checkpoint to the backend grows? Or is the state stored to the backend
somehow just the state that changed in some way since the last checkpoint?

Are old checkpoint states in the backend ever deleted / cleaned up? That
is, if all of the state for checkpoint n in the backend is all that is
needed to restore a failed job, then all state for all checkpoints m < n
should not be needed any more, right? Can all of those old checkpoints be
deleted from the backend? Does Flink do this?

Thanks,
Zach

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/stream_checkpointing.html
[2] http://arxiv.org/abs/1506.08603


Re: Checkpoint state stored in backend, and deleting old checkpoint state

2016-04-05 Thread Konstantin Knauf
Hi Zach,

some answers/comments inline.

Cheers

Konstantin

On 05.04.2016 20:39, Zach Cox wrote:
> Hi - I have some questions regarding Flink's checkpointing, specifically
> related to storing state in the backends.
> 
> So let's say an operator in a streaming job is building up some state.
> When it receives barriers from all of its input streams, does it store
> *all* of its state to the backend? I think that is what the docs [1] and
> paper [2] imply, but want to make sure. In other words, if the operator
> contains 100MB of state, and the backend is HDFS, does the operator copy
> all 100MB of state to HDFS during the checkpoint?

Yes. With the filesystem backend this happens synchronously; with the
RocksDB backend the transfer to HDFS is asynchronous.
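As a rough sketch (Scala, Flink 1.0; the checkpoint path is a placeholder, and
the RocksDB backend needs the flink-statebackend-rocksdb dependency), choosing
the backend looks like this:

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.runtime.state.filesystem.FsStateBackend
    // import org.apache.flink.contrib.streaming.state.RocksDBStateBackend

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000) // checkpoint every 5 seconds

    // Filesystem backend: state lives on the heap and snapshots are
    // written to HDFS synchronously on each checkpoint.
    env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"))

    // RocksDB backend: state lives in a local RocksDB instance and the
    // snapshot is transferred to HDFS asynchronously.
    // env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"))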

> Following on this example, say the operator is a global window and is
> storing some state for each unique key observed in the stream of
> messages (e.g. userId). Assume that over time, the number of observed
> unique keys grows, so the size of the state also grows (the window state
> is never purged). Is the entire operator state at the time of each
> checkpoint stored to the backend? So that over time, the size of the
> state stored for each checkpoint to the backend grows? Or is the state
> stored to the backend somehow just the state that changed in some way
> since the last checkpoint?

The complete state is checkpointed. Incremental backups are currently
not supported, but seem to be on the roadmap.

> Are old checkpoint states in the backend ever deleted / cleaned up? That
> is, if all of the state for checkpoint n in the backend is all that is
> needed to restore a failed job, then all state for all checkpoints m < n
> should not be needed any more, right? Can all of those old checkpoints
> be deleted from the backend? Does Flink do this?

To my knowledge Flink takes care of deleting old checkpoints (I think it
says so in the documentation about savepoints). In my experience,
though, if a job is cancelled or crashes, the checkpoint files are
usually not cleaned up, so some housekeeping might be necessary.
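For example (just a sketch; the directory and job ID are placeholders for
whatever your state.backend.fs.checkpointdir points to), the housekeeping can
be a plain HDFS delete of the leftover directory:

    hadoop fs -rm -r hdfs:///flink/checkpoints/<job-id>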

> Thanks,
> Zach
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/stream_checkpointing.html
> [2] http://arxiv.org/abs/1506.08603
> 



-- 
Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082


RE: Multiple operations on a WindowedStream

2016-04-05 Thread Kanak Biscuitwala
This worked when I ran my test code locally, but I'm seeing nothing reach my 
sink when I try to run this in YARN (previously, when I just echoed all sums 
to my sink, it would work).

Here's what my code looks like:

        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkKafkaConsumer09 consumer = new 
FlinkKafkaConsumer09<>(
                INPUT_TOPIC, new KafkaMessageDeserializer(), properties);
        env.enableCheckpointing(5000);

        // this (or event time) is required in order to do the double-windowing 
below
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        DataStream stream = env
                .addSource(consumer)
                .flatMap(new CountRequests())
                .keyBy(0, 1)
                .timeWindow(Time.of(1, TimeUnit.MINUTES), Time.of(5, 
TimeUnit.SECONDS))
                .sum(2)
                .timeWindowAll(Time.of(5, TimeUnit.SECONDS))
                .apply(new TopK(20))
                .map(new ToString>>());
        stream.addSink(new FlinkKafkaProducer09<>(OUTPUT_TOPIC, new 
SimpleStringSchema(),
                properties));
        env.execute(TASK_NAME);

Note that CountRequests produces Tuple3, TopK is an 
AllWindowFunction that produces List>, and 
ToString is a MapFunction that is just a wrapper on Object#toString().

Anything obvious that I'm doing wrong?

> From: aljos...@apache.org 
> Date: Fri, 1 Apr 2016 09:41:12 + 
> Subject: Re: Multiple operations on a WindowedStream 
> To: user@flink.apache.org 
> 
> Hi, 
> if you are using ingestion-time (or event-time) as your stream time 
> characteristic, i.e.: 
> 
> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) // or 
> TimeCharacteristic.EventTime 
> 
> you can apply several window transforms after another and they will 
> apply the same "time window" because they work on the element 
> timestamps. What you can then do is have a window that does the 
> aggregation and then another one (that has to be global) to select the 
> top elements: 
> 
> result = input 
> .keyBy() 
> .timeWindow(Time.minutes(1), Time.seconds(5)) 
> .sum(2) 
> .timeWindowAll(Time.seconds(5)) // notice how I put a non-sliding 
> window here, because the above will output a new window every 5 seconds 
> .apply() 
> 
> I hope this helps. 
> 
> Cheers, 
> Aljoscha 
> 
> On Fri, 1 Apr 2016 at 10:35 Balaji Rajagopalan 
> mailto:balaji.rajagopa...@olacabs.com>> 
> wrote: 
> I had a similar use case and ended writing the aggregation logic in the 
> apply function, could not find any better solution. 
> 
> On Fri, Apr 1, 2016 at 6:03 AM, Kanak Biscuitwala 
> mailto:kana...@hotmail.com>> wrote: 
> Hi, 
> 
> I would like to write something that does something like a word count, 
> and then emits only the 10 highest counts for that window. Logically, I 
> would want to do something like: 
> 
> stream.timeWindow(Time.of(1, TimeUnit.MINUTES), Time.of(5, 
> TimeUnit.SECONDS)).sum(2).apply(getTopK(10)) 
> 
> However, the window context is lost after I do the sum aggregation. Is 
> there a straightforward way to express this logic in Flink 1.0? One way 
> I can think of is to have a complex function in apply() that has state, 
> but I would like to know if there is something a little cleaner than 
> that. 
> 
> Thanks, 
> Kanak 
> 
  

Re: Checkpoint state stored in backend, and deleting old checkpoint state

2016-04-05 Thread Ufuk Celebi
Hey Zach and Konstantin,

Great questions and answers. We can try to make this more explicit in the docs.

On Tue, Apr 5, 2016 at 8:54 PM, Konstantin Knauf
 wrote:
> To my knowledge flink takes care of deleting old checkpoints (I think it
> says so in the documentation about savepoints.). In my experience
> though, if a job is cancelled or crashes, the checkpoint files are
> usually not cleaned up. So some housekeeping might be necessary.

Regarding cleanup: currently only the latest successful checkpoint is retained.

On graceful shutdown, all checkpoints should be cleaned up as far as I
know. Savepoints always have to be cleaned up manually.

On crashes, the checkpoint state has to be cleaned up manually (if the
JVM shutdown hooks did not run).

@Konstantin: did you have lingering state without crashes?

– Ufuk


Re: Checkpoint state stored in backend, and deleting old checkpoint state

2016-04-05 Thread Konstantin Knauf
Hi Ufuk,

I thought so, but I am not sure when and where ;) I will let you know,
if I come across it again.

Cheers,

Konstantin

On 05.04.2016 21:10, Ufuk Celebi wrote:
> Hey Zach and Konstantin,
> 
> Great questions and answers. We can try to make this more explicit in the 
> docs.
> 
> On Tue, Apr 5, 2016 at 8:54 PM, Konstantin Knauf
>  wrote:
>> To my knowledge flink takes care of deleting old checkpoints (I think it
>> says so in the documentation about savepoints.). In my experience
>> though, if a job is cancelled or crashes, the checkpoint files are
>> usually not cleaned up. So some housekeeping might be necessary.
> 
> Regarding cleanup: currently only the latest successful checkpoint is 
> retained.
> 
> On graceful shutdown, all checkpoints should be cleaned up as far as I
> know. Savepoints always have to be cleaned up manually.
> 
> On crashes, the checkpoint state has to be cleaned up manually (if the
> JVM shut down hooks did not run).
> 
> @Konstantin: did you have lingering state without crashes?
> 
> – Ufuk
> 

-- 
Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082


Kafka state backend?

2016-04-05 Thread Zach Cox
Hi - as clarified in another thread [1] stateful operators store all of
their current state in the backend on each checkpoint. Just curious if
Kafka topics with log compaction have ever been considered as a possible
state backend?

Samza [2] uses RocksDB as a local state store, with all writes also going
to a log-compacted Kafka topic for persistence. This seems like it might
also be a good alternative backend in Flink for jobs with large amounts of
long-lasting state. You would give up some throughput (due to Kafka
producer writes) but there would be almost nothing to do on checkpoints.

Just wanted to propose the idea and see if it has already been discussed,
or maybe I'm missing some reasons why it would be a bad idea.

Thanks,
Zach

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-state-stored-in-backend-and-deleting-old-checkpoint-state-td5927.html
[2]
http://samza.apache.org/learn/documentation/0.10/container/state-management.html#local-state-in-samza


Re: Checkpoint state stored in backend, and deleting old checkpoint state

2016-04-05 Thread Zach Cox
Thanks for the details Konstantin and Ufuk!


On Tue, Apr 5, 2016 at 2:39 PM Konstantin Knauf <
konstantin.kn...@tngtech.com> wrote:

> Hi Ufuk,
>
> I thought so, but I am not sure when and where ;) I will let you know,
> if I come across it again.
>
> Cheers,
>
> Konstantin
>
> On 05.04.2016 21:10, Ufuk Celebi wrote:
> > Hey Zach and Konstantin,
> >
> > Great questions and answers. We can try to make this more explicit in
> the docs.
> >
> > On Tue, Apr 5, 2016 at 8:54 PM, Konstantin Knauf
> >  wrote:
> >> To my knowledge flink takes care of deleting old checkpoints (I think it
> >> says so in the documentation about savepoints.). In my experience
> >> though, if a job is cancelled or crashes, the checkpoint files are
> >> usually not cleaned up. So some housekeeping might be necessary.
> >
> > Regarding cleanup: currently only the latest successful checkpoint is
> retained.
> >
> > On graceful shutdown, all checkpoints should be cleaned up as far as I
> > know. Savepoints always have to be cleaned up manually.
> >
> > On crashes, the checkpoint state has to be cleaned up manually (if the
> > JVM shut down hooks did not run).
> >
> > @Konstantin: did you have lingering state without crashes?
> >
> > – Ufuk
> >
>
> --
> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>


Back Pressure details

2016-04-05 Thread Zach Cox
Hi - I'm trying to identify bottlenecks in my Flink streaming job, and am
curious about the Back Pressure view in the job manager web UI. If there
are already docs for Back Pressure please feel free to just point me to
those. :)

When "Sampling in progress..." is displayed, what exactly is happening?

What do the values in the Ratio column for each Subtask mean exactly?

What does Status such as OK, High, etc mean? Are these determined from the
Ratio values?

If my job graph looks like Source => A => B => Sink, with Back Pressure OK
for Source and Sink, but High for A and B, what does that suggest?

Thanks,
Zach


Accessing RDF triples using Flink

2016-04-05 Thread Ritesh Kumar Singh
Hi,

I need some suggestions regarding accessing RDF triples from Flink. I'm
trying to integrate Flink into a pipeline where the input for Flink comes
from a SPARQL query on a Jena model. After modifying the triples using
Flink, I will perform a SPARQL update using Jena to save my changes.

   - Is there a recommended input format for loading the triples into
   Flink?
   - Will this use case be classified as a Flink streaming job or a batch
   processing job?
   - How will loading of the dataset vary with the input size?
   - Are there any recommended packages/projects for this type of
   project?

Any suggestion will be of great help.

Regards,
Ritesh
https://riteshtoday.wordpress.com/


Re: Convert Scala DataStream to Java DataStream

2016-04-05 Thread Stefano Baghino
Hi Saiph,

all you have to do is to invoke the `javaStream` method on your Scala
DataStream.
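For example (a minimal sketch; the element type and source are made up):

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val scalaStream: DataStream[String] = env.fromElements("a", "b", "c")

    // The Scala DataStream is a thin wrapper; javaStream returns the underlying
    // Java DataStream, which can then be handed to Java-only libraries.
    val jStream: JDataStream[String] = scalaStream.javaStream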

Hope I've been helpful. :)

On Tue, Apr 5, 2016 at 7:35 PM, Saiph Kappa  wrote:

> Hi,
>
> I'm programming in Scala and using some extra libraries written in Java. My
> question is: how can I easily convert
> "org.apache.flink.streaming.api.scala.DataStream" to
> "org.apache.flink.streaming.api.datastream.DataStream"?
>
> Thanks.
>



-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit


Re: Integrate Flink with S3 on EMR cluster

2016-04-05 Thread Chiwan Park
Hi Timur,

Great! A bootstrap action for Flink would be good for AWS users. I think the bootstrap 
action scripts would be placed in the `flink-contrib` directory.

If you want, one of the Flink PMC members can assign FLINK-1337 to you.

Regards,
Chiwan Park

> On Apr 6, 2016, at 3:36 AM, Timur Fayruzov  wrote:
> 
> I had a guide like that.
> 



RemoteTransportException when trying to redis in flink code

2016-04-05 Thread Balaji Rajagopalan
I am running my Flink code on an AWS EMR YARN cluster. In one of the apply
window functions I try to set some values in Redis, and it fails. I have
tried to access the same Redis instance with standalone (non-Flink) code and
get/set works, but from Flink I run into the exception below. Any inputs on
what might be going wrong?

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
Error at remote task manager 'some-ip'.

at
org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:241)

at
org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:164)

at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)

at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)

at
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)

at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)

at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)

at
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)

at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)

at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)

at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)

at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)

at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)

at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)

at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)

at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)

at java.lang.Thread.run(Thread.java:745)

Caused by:
org.apache.flink.runtime.io.network.partition.ProducerFailedException

at
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:164)

at
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:96)

at
io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)

at
io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)

at
io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)

at
io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)

at
io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)

at
io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)

at
io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)

at
io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)

at
io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)

at
io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)

at
io.netty.channel.AbstractChannelHandlerContext.access$500(AbstractChannelHandlerContext.java:32)

at
io.netty.channel.AbstractChannelHandlerContext$6.run(AbstractChannelHandlerContext.java:299)

at
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)

... 2 more


Caused by: java.lang.RuntimeException: java.net.ConnectException:
Connection refused

at com.redis.IO$class.connect(IO.scala:37)

at com.redis.RedisClient.connect(RedisClient.scala:94)

at com.redis.RedisCommand$class.initialize(RedisClient.scala:71)

at com.redis.RedisClient.initialize(RedisClient.scala:94)

at com.redis.RedisClient.<init>(RedisClient.scala:98)

at com.redis.RedisClientFactory.makeObject(Pool.scala:12)

at com.redis.RedisClientFactory.makeObject(Pool.scala:7)

at
org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:149)

at com.redis.RedisClientPool.withClient(Pool.scala:34)

at
com.olacabs.peakpricing.utils.DriverStreamHelper$.set(DriverStreamHelper.scala:57)

at
com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:35)

at
com.olacabs.peakpricing.datastream.TotalMappedFunction.join(Total

State in external db (dynamodb)

2016-04-05 Thread Shannon Carey
Hi, new Flink user here!

I found a discussion on user@flink.apache.org about using DynamoDB as a sink. 
However, as noted, sinks have an at-least-once guarantee, so your operations 
must be idempotent.

However, another way to go about this (and correct me if I'm wrong) is to write 
the state to the external store via a custom State Backend. Since the state 
participates in checkpointing, you don't have to worry about idempotency: every 
time state is checkpointed, overwrite the value of that key.

We are starting a project with Flink, and we are interested in evicting the 
state from memory once a TTL is reached during which no events have come in for 
that state. Subsequently, when an event is processed, we must be able to 
quickly load up any evicted state. Does this sound reasonable? We are 
considering using DynamoDB for our state backend because it seems like all we 
will need is a key-value store. The only weakness of this is that if state gets 
older than, say, 2 years we would like to get rid of it which might not be easy 
in DynamoDB. I don't suppose Flink has any behind-the-scenes features that deal 
with getting rid of old state (either evicting from memory or TTL/aging out 
entirely)?

Thanks for your time!
Shannon Carey


Re: State in external db (dynamodb)

2016-04-05 Thread Sanne de Roever
FYI Cassandra has a TTL on data:
https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_expire_t.html

On Wed, Apr 6, 2016 at 7:55 AM, Shannon Carey  wrote:

> Hi, new Flink user here!
>
> I found a discussion on user@flink.apache.org about using DynamoDB as a
> sink. However, as noted, sinks have an at-least-once guarantee so your
> operations must idempotent.
>
> However, another way to go about this (and correct me if I'm wrong) is to
> write the state to the external store via a custom State Backend. Since the
> state participates in checkpointing, you don't have to worry about
> idempotency: every time state is checkpointed, overwrite the value of that
> key.
>
> We are starting a project with Flink, and we are interested in evicting
> the state from memory once a TTL is reached during which no events have
> come in for that state. Subsequently, when an event is processed, we must
> be able to quickly load up any evicted state. Does this sound reasonable?
> We are considering using DynamoDB for our state backend because it seems
> like all we will need is a key-value store. The only weakness of this is
> that if state gets older than, say, 2 years we would like to get rid of it
> which might not be easy in DynamoDB. I don't suppose Flink has any
> behind-the-scenes features that deal with getting rid of old state (either
> evicting from memory or TTL/aging out entirely)?
>
> Thanks for your time!
> Shannon Carey
>


Re: Powered by Flink

2016-04-05 Thread Henry Saputra
Thanks, Slim. I have just updated the wiki page with this entries.

On Tue, Apr 5, 2016 at 10:20 AM, Slim Baltagi  wrote:

> Hi
>
> The following are missing in the ‘Powered by Flink’ list:
>
>- king.com
>  https://blogs.apache.org/foundation/entry/the_apache_software_foundation_announces88
>- Otto Group
>  http://data-artisans.com/how-we-selected-apache-flink-at-otto-group/
>- Eura Nova
>  https://research.euranova.eu/flink-forward-2015-talk/
>- Big Data Europe
>  http://www.big-data-europe.eu
>
> Thanks
>
> Slim Baltagi
>
>
> On Apr 5, 2016, at 10:08 AM, Robert Metzger  wrote:
>
> Hi everyone,
>
> I would like to bring the "Powered by Flink" wiki page [1] to the
> attention of Flink user's who recently joined the Flink community. The list
> tracks which organizations are using Flink.
> If your company / university / research institute / ... is using Flink but
> the name is not yet listed there, let me know and I'll add the name.
>
> Regards,
> Robert
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/Powered+by+Flink
>
>
> On Mon, Oct 19, 2015 at 4:10 PM, Matthias J. Sax  wrote:
>
>> +1
>>
>> On 10/19/2015 04:05 PM, Maximilian Michels wrote:
>> > +1 Let's collect in the Wiki for now. At some point in time, we might
>> > want to have a dedicated page on the Flink homepage.
>> >
>> > On Mon, Oct 19, 2015 at 3:31 PM, Timo Walther 
>> wrote:
>> >> Ah ok, sorry. I think linking to the wiki is also ok.
>> >>
>> >>
>> >> On 19.10.2015 15:18, Fabian Hueske wrote:
>> >>>
>> >>> @Timo: The proposal was to keep the list in the wiki (can be easily
>> >>> extended) but link from the main website to the wiki page.
>> >>>
>> >>> 2015-10-19 15:16 GMT+02:00 Timo Walther :
>> >>>
>>  +1 for adding it to the website instead of wiki.
>>  "Who is using Flink?" is always a question difficult to answer to
>>  interested users.
>> 
>> 
>>  On 19.10.2015 15:08, Suneel Marthi wrote:
>> 
>>  +1 to this.
>> 
>>  On Mon, Oct 19, 2015 at 3:00 PM, Fabian Hueske 
>> wrote:
>> 
>> > Sounds good +1
>> >
>> > 2015-10-19 14:57 GMT+02:00 Márton Balassi <balassi.mar...@gmail.com>:
>> >
>> >> Thanks for starting and big +1 for making it more prominent.
>> >>
>> >> On Mon, Oct 19, 2015 at 2:53 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>> >>>
>> >>> Thanks for starting this Kostas.
>> >>>
>> >>> I think the list is quite hidden in the wiki. Should we link from
>> >>> flink.apache.org to that page?
>> >>>
>> >>> Cheers, Fabian
>> >>>
>> >>> 2015-10-19 14:50 GMT+02:00 Kostas Tzoumas <ktzou...@apache.org>:
>> 
>>  Hi everyone,
>> 
>>  I started a "Powered by Flink" wiki page, listing some of the
>>  organizations that are using Flink:
>> 
>> 
>> https://cwiki.apache.org/confluence/display/FLINK/Powered+by+Flink
>> 
>>  If you would like to be added to the list, just send me a short email
>>  with your organization's name and a description and I will add you to the
>>  wiki page.
>> 
>>  Best,
>>  Kostas
>> 
>> >>>
>> 
>> 
>> >>
>>
>>
>
>