Where does Flink store intermediate state and triggering/scheduling a delayed event?

2016-02-02 Thread Soumya Simanta
I'm getting started with Flink and had a very fundamental doubt. 1) Where does Flink capture/store intermediate state? For example, two streams of data have a common key. The streams can lag in time (seconds, hours or even days). My understanding is that Flink somehow needs to store the data from

Understanding code of CountTrigger

2016-02-02 Thread Nirmalya Sengupta
Hello all, Here's a code comment from org.apache.flink.streaming.api.windowing.triggers.Trigger: /** * Result type for trigger methods. This determines what happens with the window. * * * On {@code FIRE} the pane is evaluated and results are emitted. * The con

Re: How to register aggregation convergence criterion to bulk iteration in scala API?

2016-02-02 Thread Stephan Ewen
You are right, that is currently missing in the Scala API. Would be good to add this, for feature completeness in the Scala API. As a workaround for now: Can you access the Java IterativeDataSet from the Scala data set, and register it there? Greetings, Stephan On Thu, Jan 28, 2016 at 11:05 PM,

Re: Left join with unbalanced dataset

2016-02-02 Thread Stephan Ewen
To make sure this discussion does not go in a wrong direction: There is no issue here with data size, or memory management. The MemoryManagement for sorting and hashing works, and Flink handles the spilling correctly, etc. The issue here is different - One possible reason is that the network s

Tumbling Windows with Processing Time

2016-02-02 Thread yutao sun
Hi Flink users, I have a question about tumbling windows using processing time in Flink 0.10.1: I want to measure the throughput of my application. The idea is that, at the last operator, I use a tumbling processing-time window with a size of 1 second to count the messages received. T

Re: Left join with unbalanced dataset

2016-02-02 Thread Gábor Gévay
Hello Arnaud, > Flink does not start the reduce operation until all lines have > been created (memory bottleneck is during the collection > of all lines) ; but theoretically it is possible. The problem that `S.groupBy(...).reduce(...)` needs to fully materialize S comes from the fact that the imple

RE: Left join with unbalanced dataset

2016-02-02 Thread LINZ, Arnaud
Thanks. Giving the batch an outrageous amount of memory with a 0.5 heap ratio leads to the success of the batch. I've figured out which dataset is consuming the most memory: I have a big join that multiplies the size of the input set before a group reduce. I am willing to optimize my code by

Re: Left join with unbalanced dataset

2016-02-02 Thread Ufuk Celebi
> On 02 Feb 2016, at 15:15, LINZ, Arnaud wrote: > > Hi, > > Running again with more RAM made the processing go further, but Yarn still > killed one container for memory consumption. I will experiment with various memory > parameters. OK, the killing of the container probably triggered the Remote

Re: Left join with unbalanced dataset

2016-02-02 Thread Robert Metzger
Hi Arnaud, you can retrieve the logs of a yarn application by calling "yarn logs -applicationId <applicationId>". It's going to output the logs of all Taskmanagers and the job manager in one stream. I would pipe the output into a file and then search for the position where the log for the failing taskmanager

RE: Left join with unbalanced dataset

2016-02-02 Thread LINZ, Arnaud
Hi, Running again with more RAM made the processing go further, but Yarn still killed one container for memory consumption. I will experiment with various memory parameters. How do I retrieve the log of a specific task manager post-mortem? I don't use a permanent Flink/Yarn container (it's killed u

Re: Left join with unbalanced dataset

2016-02-02 Thread Ufuk Celebi
> On 02 Feb 2016, at 14:31, LINZ, Arnaud wrote: > > Hi, > > Unfortunately, it still fails, but with a different error (see below). > Note that I did not rebuild & reinstall Flink, I just used a 0.10-SNAPSHOT > compiled jar submitted as a batch job using the "0.10.0" Flink installation. This m

Re: DataStreamUtils and Scala

2016-02-02 Thread Márton Balassi
Opened the PR. [1] If there are no objections, I will merge the commit that re-adds the "getJavaStream()" method as soon as Travis passes; the second approach can be discussed on GitHub. [1] https://github.com/apache/flink/pull/1574 Best, Marton On Mon, Feb 1, 2016 at 10:56 PM, Márton Balassi wrote: > I'll do the

RE: Left join with unbalanced dataset

2016-02-02 Thread LINZ, Arnaud
Hi, Unfortunately, it still fails, but with a different error (see below). Note that I did not rebuild & reinstall Flink, I just used a 0.10-SNAPSHOT compiled jar submitted as a batch job using the "0.10.0" Flink installation. org.apache.flink.runtime.io.network.netty.exception.RemoteTransportEx

Re: Left join with unbalanced dataset

2016-02-02 Thread Ufuk Celebi
> On 02 Feb 2016, at 13:28, LINZ, Arnaud wrote: > > Thanks, > I’m using the official 0.10 release. I will try to use the 0.10 snapshot. > > FYI, setting the heap cut-off ratio to 0.5 led to the following error: That’s the error Stephan was referring to. Does the snapshot version fix it for

RE: Left join with unbalanced dataset

2016-02-02 Thread LINZ, Arnaud
Thanks, I’m using the official 0.10 release. I will try to use the 0.10 snapshot. FYI, setting the heap cut-off ratio to 0.5 led to the following error: 12:20:17,313 INFO org.apache.flink.yarn.YarnJobManager - Status of job c55216ab9383fd14e1d287a69a6e0f7e (KUBERA-GEO

Re: Left join with unbalanced dataset

2016-02-02 Thread Stephan Ewen
Hi Arnaud! Which version of Flink are you using? In 0.10.1, the Netty library version that we use has changed behavior, and allocates a lot of off-heap memory. Would be my guess that this is the cause. In 1.0-SNAPSHOT, that should be fixed, also on 0.10-SNAPSHOT. If that turns out to be the cause

RE: Left join with unbalanced dataset

2016-02-02 Thread LINZ, Arnaud
Hi, Changing to an outer join did not change the error; nor did balancing the join with another dataset, dividing the parallelism level by 2, or increasing memory by 2. Heap size & thread number are OK under JVisualVM. So the problem is elsewhere. Does Flink use off-heap memory? How can I monit

Re: Change #TaskSlots in web interface

2016-02-02 Thread Till Rohrmann
Hi Sendoh, In order to change the configuration you have to modify `conf/flink-conf.yaml` and then restart the cluster. Cheers, Till On Tue, Feb 2, 2016 at 10:14 AM, HungChang wrote: > Hi, > > I remember there is a web interface (port: 6XXX) that can change > the configuration of the Job Manager. > e.

Change #TaskSlots in web interface

2016-02-02 Thread HungChang
Hi, I remember there is a web interface (port: 6XXX) that can change the configuration of the Job Manager, e.g. taskmanager.numberOfTaskSlots, taskmanager.heap.mb. But I can only find port 8081, which shows the configuration, and I cannot change it there. Did I miss anything? Best, Sendoh