Re: Flink HA mode

2015-09-08 Thread Ufuk Celebi
> On 09 Sep 2015, at 04:48, Emmanuel wrote: > > my question is: how critical is the bootstrap IP list in masters? Hey Emmanuel, good questions. I read over the docs for this again [1] and you are right that we should make this clearer. The "masters" file is only relevant for the start/stop

Re: Performance Issue

2015-09-08 Thread Rico Bergmann
Yes. The keys are constantly changing. Indeed, each unique event has its own key (the event itself). The purpose was to do event deduplication ... > On 08.09.2015 at 20:05, Aljoscha Krettek wrote: > > Hi Rico, > I have a suspicion. What is the distribution of your keys? That is, are there
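Keying every event by itself for deduplication means one piece of state per distinct event ever seen, which never shrinks and can explain sustained memory pressure. A minimal plain-Java sketch of that idea (no Flink; class and method names are illustrative, not the actual program discussed here):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: deduplication by treating the event itself as the key.
// The "seen" set plays the role of the keyed state and grows with every
// distinct event, which is the potential performance problem being discussed.
public class Dedup {
    private final Set<String> seen = new HashSet<>();

    // Returns true on the first occurrence of an event, false on duplicates.
    public boolean accept(String event) {
        return seen.add(event);
    }

    public static List<String> dedup(List<String> events) {
        Dedup d = new Dedup();
        List<String> out = new ArrayList<>();
        for (String e : events) {
            if (d.accept(e)) {
                out.add(e);
            }
        }
        return out;
    }
}
```

In a real streaming job, the state would typically be bounded by a TTL or a time window rather than kept forever.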

RE: Flink HA mode

2015-09-08 Thread Emmanuel
my question is: how critical is the bootstrap IP list in masters? Does this get updated, or does it have to be updated by some other service? From: zhangruc...@huawei.com To: user@flink.apache.org Subject: re: Flink HA mode Date: Wed, 9 Sep 2015 00:48:42 + In order to discover new J

re: Flink HA mode

2015-09-08 Thread Zhangrucong
In order to discover a new JM, I think you must use ZK. ZK has the ability to detect a new node or a change in a node's content. First, the JM must create a node in ZK and write its IP and port into the node. TMs watch this node. When the TMs see the node content change, they reconnect to the new JM. Thanks. From: Emmanuel [mai

Flink HA mode

2015-09-08 Thread Emmanuel
Looking at Flink HA mode. Why do you need to have the list of masters in the config if ZooKeeper is used to keep track of them? In an environment like Google Cloud or Container Engine, the JM may come back up but will likely have another IP address. Is the masters config file only for bootstrappi

Re: Event time in Flink streaming

2015-09-08 Thread Gyula Fóra
This is actually simpler than you think, you can just use the Time.of(...) helper: ds.window(Time.of(long windowSize, Timestamp yourTimeStampExtractor, long startTime))... Gyula Martin Neumann wrote (8 Sept 2015, 20:20): > Hej, > > I want to give TimeTriggerPolicy a try and
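The core of windowing on an extracted event timestamp is assigning each timestamp to a window aligned to a start time. A rough plain-Java sketch of that arithmetic, under the assumption of tumbling windows; this is an illustration of the idea, not the actual Time.of implementation:

```java
// Hypothetical sketch of tumbling-window assignment by event time:
// an event timestamp (as a timestamp extractor would produce it) maps to
// the start of the window it falls into, aligned to startTime.
public class TimeWindows {
    public static long windowStart(long timestamp, long windowSize, long startTime) {
        // Integer division floors the offset to the enclosing window boundary.
        return startTime + ((timestamp - startTime) / windowSize) * windowSize;
    }
}
```

For example, with windowSize 100 and startTime 0, a timestamp of 1005 lands in the window starting at 1000.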

How can I handle this query?

2015-09-08 Thread hagersaleh
How can I handle this query? SELECT a.id, (SELECT MAX(created) FROM posts WHERE author_id = a.id) AS latest_post FROM authors a
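One common way to express this correlated subquery in a dataflow style is to aggregate the maximum "created" per author first and then join the result back to the authors. A plain-Java sketch of that shape (field layout and names are illustrative assumptions, not Flink API):

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of the query's logic: group posts by author_id, take MAX(created),
// then join back to the authors list. In a DataSet program this would be a
// groupBy(...).max(...) followed by a join on the author id.
public class LatestPost {
    // posts are {authorId, created} pairs; result maps author id -> latest post
    // timestamp (null for authors with no posts, like the SQL outer reference).
    public static Map<Integer, Long> latestPostPerAuthor(
            List<Integer> authorIds, List<long[]> posts) {
        Map<Integer, Long> maxCreated = new HashMap<>();
        for (long[] p : posts) {
            maxCreated.merge((int) p[0], p[1], Math::max);
        }
        Map<Integer, Long> result = new LinkedHashMap<>();
        for (int id : authorIds) {
            result.put(id, maxCreated.get(id));
        }
        return result;
    }
}
```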

Re: HadoopDataOutputStream maybe does not expose enough methods of org.apache.hadoop.fs.FSDataOutputStream

2015-09-08 Thread Stephan Ewen
Hi! I pushed a fix to the master to expose more methods. You can access the original Hadoop stream now, and you can also call "flush()" and "sync()" in the Flink stream, which get forwarded as "hflush()" and "hsync()" in Hadoop 2 (in Hadoop 1 these are not available). The fix is in the master an

Re: The main method caused an error. HDFS flink

2015-09-08 Thread Stephan Ewen
We'll try to fix the issue for the upcoming milestone release... On Tue, Sep 8, 2015 at 8:33 PM, Robert Metzger wrote: > As I said, the workaround is using the "bin/flink" tool from the command > line. > I think it should be possible to add a "student" account on the cluster to > access the Flin

Re: The main method caused an error. HDFS flink

2015-09-08 Thread Robert Metzger
As I said, the workaround is using the "bin/flink" tool from the command line. I think it should be possible to add a "student" account on the cluster to access the Flink installation? On Tue, Sep 8, 2015 at 12:36 PM, Florian Heyl wrote: > Ok I see, thank you. Do not have experience with that bu

Re: Event time in Flink streaming

2015-09-08 Thread Martin Neumann
Hej, I want to give TimeTriggerPolicy a try and see how much of a problem it will be in this use case. Is there any example on how to use it? I looked at the API descriptions but I'm confused now. cheers Martin On Fri, Aug 28, 2015 at 5:35 PM, Martin Neumann wrote: > The stream consists of log

Re: Performance Issue

2015-09-08 Thread Aljoscha Krettek
Hi Rico, I have a suspicion. What is the distribution of your keys? That is, are there many unique keys, do the keys keep evolving, i.e. is it always new and different keys? Cheers, Aljoscha On Tue, 8 Sep 2015 at 13:44 Rico Bergmann wrote: > I also see in the TM overview the CPU load is still a

Re: output writer

2015-09-08 Thread Michele Bertoni
Yes, you understood it right! But then, after that block, how can I partition data according to key1 (the output key) and preserve the order of key3, if that is possible? On 08 Sep 2015, at 18:39, Fabian Hueske <fhue...@gmail.com> wrote: I did not fully understand your last qu

Re: output writer

2015-09-08 Thread Fabian Hueske
I did not fully understand your last question, but I'll try to answer. If you do a myData.groupBy(key1, key2).sortGroup(key3).reduceGroup(myReduceFunction); Flink will do the grouping and sorting in a single sort over three fields. So the result will be sorted on key1, key2, and key3 (given that yo
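Fabian's point is that grouping on (key1, key2) and sorting each group on key3 collapses into one composite sort over all three fields. A small plain-Java sketch of that single sort, using int[] rows as stand-in tuples (an illustration, not Flink's internal sorter):

```java
import java.util.Comparator;
import java.util.List;

// Sketch: one sort over (key1, key2, key3) produces runs of equal (key1, key2)
// groups, each internally ordered by key3 - exactly what
// groupBy(key1, key2).sortGroup(key3) needs before the group reduce.
public class CompositeSort {
    public static void sortForGroupedReduce(List<int[]> rows) {
        rows.sort(Comparator
                .<int[]>comparingInt(r -> r[0])   // key1: primary grouping field
                .thenComparingInt(r -> r[1])      // key2: secondary grouping field
                .thenComparingInt(r -> r[2]));    // key3: order within each group
    }
}
```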

Re: output writer

2015-09-08 Thread Michele Bertoni
OK, I got -some of- the points :) I will do some tests and let you know. What scares me about using the sort is that in our program we may sort data before outputting them. If we don't sort, no problem at all; but if we sort, then: in one case sorting is done inside the group of the key (i.e. one sorted se

Re: output writer

2015-09-08 Thread Fabian Hueske
I think you should not extend the FileOutputFormat but implement a completely new OutputFormat. You can of course copy some of the FileOutputFormat code to your new format. Regarding the number of open files, I would make this a parameter. I guess you can have at least 64 files open per operator m

Re: output writer

2015-09-08 Thread Michele Bertoni
Thanks! Your answer is really helpful. Actually, I was just reading the FileOutputFormat, and my idea was to extend it and use the open function to open multiple streams, so it should be a mix of 1 and 4. But I have some questions: what is a good number of open files at the same time? (I mean, the s

Re: Memory management issue

2015-09-08 Thread Stephan Ewen
A quick fix would be to take the first join and give it a "JoinHint.REPARTITION_HASH_BUILD_SECOND" hint. The best thing would be to have batch exchanges for iterations. The second best thing would be to recognize in the optimizer that a batch exchange cannot happen (if inside an iteration) and i

Re: output writer

2015-09-08 Thread Fabian Hueske
Hi Michele, you need to directly use a FileSystem client (e.g., Hadoop's) to create and write to files. Have a look at the FileOutputFormat [1] which does this for a single file per operator instance / partition. Instead of creating a single file, you need to create one file for each key. However,
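The "one file per key" pattern Fabian describes, combined with the open-file limit suggested elsewhere in this thread, can be sketched with a bounded LRU cache of writers. This is a plain-Java illustration under assumed naming conventions (key + ".out") and a made-up limit parameter, not a Flink OutputFormat:

```java
import java.io.Closeable;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: one output file per key, with at most maxOpen writers open at once.
// When the limit is hit, the least-recently-used writer is closed; files are
// opened in append mode so an evicted key's file can be safely reopened later.
public class PerKeyWriter implements Closeable {
    private final File dir;
    private final int maxOpen;
    private final LinkedHashMap<String, Writer> open =
            new LinkedHashMap<>(16, 0.75f, true); // access order => LRU iteration

    public PerKeyWriter(File dir, int maxOpen) {
        this.dir = dir;
        this.maxOpen = maxOpen;
    }

    public void write(String key, String record) throws IOException {
        Writer w = open.get(key);
        if (w == null) {
            if (open.size() >= maxOpen) {
                Iterator<Map.Entry<String, Writer>> it = open.entrySet().iterator();
                it.next().getValue().close(); // evict the least-recently-used writer
                it.remove();
            }
            w = new FileWriter(new File(dir, key + ".out"), true);
            open.put(key, w);
        }
        w.write(record);
        w.write('\n');
    }

    @Override
    public void close() throws IOException {
        for (Writer w : open.values()) {
            w.close();
        }
        open.clear();
    }
}
```

A real implementation on HDFS would use the Hadoop FileSystem client instead of FileWriter, but the bookkeeping is the same.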

Re: Performance Issue

2015-09-08 Thread Rico Bergmann
I also see in the TM overview that the CPU load is still around 25%, although there has been no input to the program for minutes. The CPU load is degrading very slowly. The memory consumption is still fluctuating at a high level. It does not degrade. In my test I generated test input for 1 minute. Now

Re: The main method caused an error. HDFS flink

2015-09-08 Thread Florian Heyl
OK, I see, thank you. I don't have experience with that, but does a possible workaround exist? On 08.09.2015 at 13:13, Robert Metzger wrote: > That's the bug: https://issues.apache.org/jira/browse/FLINK-2632 > > On Tue, Sep 8, 2015 at 1:11 PM, Robert Metzger wrote: > There is a bug in

Re: Performance Issue

2015-09-08 Thread Rico Bergmann
The marksweep value is very high, the scavenge very low. If this helps ;-) > On 08.09.2015 at 11:27, Robert Metzger wrote: > > It is in the "Information" column: http://i.imgur.com/rzxxURR.png > In the screenshot, the two GCs only spend 84 and 25 ms. > >> On Tue, Sep 8, 2015 at 10:34 AM, Ri

Re: The main method caused an error. HDFS flink

2015-09-08 Thread Robert Metzger
That's the bug: https://issues.apache.org/jira/browse/FLINK-2632 On Tue, Sep 8, 2015 at 1:11 PM, Robert Metzger wrote: > There is a bug in the web client which sets the wrong class loader when > running the user code. > > On Tue, Sep 8, 2015 at 12:05 PM, Florian Heyl wrote: > >> Locally we are

Re: The main method caused an error. HDFS flink

2015-09-08 Thread Robert Metzger
There is a bug in the web client which sets the wrong class loader when running the user code. On Tue, Sep 8, 2015 at 12:05 PM, Florian Heyl wrote: > Locally we are using the 0.9-SNAPSHOT but the cluster should work with the > 0.10-SNAPSHOT. I have no direct control of the cluster because our pr

Re: The main method caused an error. HDFS flink

2015-09-08 Thread Florian Heyl
Locally we are using the 0.9-SNAPSHOT, but the cluster should work with the 0.10-SNAPSHOT. I have no direct control of the cluster because our prof is responsible for that. The students are using the Flink web submission client to upload their jar and run it on the cluster. On 08.09.2015 at 12

Re: The main method caused an error. HDFS flink

2015-09-08 Thread Robert Metzger
Which version of Flink are you using? Have you tried submitting the job using the "./bin/flink run" tool? On Tue, Sep 8, 2015 at 11:44 AM, Florian Heyl wrote: > Dear Sir or Madam, > Me and my colleague are developing a pipeline based on scala and java to > classify cancer stages. This pipeline

Re: Memory management issue

2015-09-08 Thread Ufuk Celebi
> On 08 Sep 2015, at 10:12, Schueler, Ricarda > wrote: > > Hi, > > we tested it with the version 0.9.1, but unfortunately the issue persists. Thanks for helping me out debugging this Ricarda! :) From what I can tell, this is not a deadlock in the network runtime, but a join deadlock within

The main method caused an error. HDFS flink

2015-09-08 Thread Florian Heyl
Dear Sir or Madam, my colleague and I are developing a pipeline based on Scala and Java to classify cancer stages. This pipeline should be uploaded to HDFS (Apache Flink). The pipeline works fine locally, but on HDFS it crashes with the following error (see below). The main method is sim

Re: output writer

2015-09-08 Thread Michele Bertoni
Hi guys, sorry for the late answer, but I am still working to get this done and I don't understand something. I do have my own writeRecord function, but that function is not able to open a new output stream or anything else, so I don't understand how to do that. At first I think I should at least partiti

Re: Case of possible join optimization

2015-09-08 Thread Flavio Pompermaier
Ah... Fortunately it seems to do what I need :) It efficiently filters the bigDataset, retaining only the needed elements and making the join feasible with little memory. :) So that's a bug? Which would be the right way to achieve that behaviour with Flink? On Tue, Sep 8, 2015 at 11:22 AM, Stephan Ewen w

Re: Performance Issue

2015-09-08 Thread Robert Metzger
It is in the "Information" column: http://i.imgur.com/rzxxURR.png In the screenshot, the two GCs only spend 84 and 25 ms. On Tue, Sep 8, 2015 at 10:34 AM, Rico Bergmann wrote: > Where can I find these information? I can see the memory usage and cpu > load. But where are the information on the GC

Re: Case of possible join optimization

2015-09-08 Thread Stephan Ewen
The problem is the "getInput2()" call. It takes the input to the join, not the result of the join. That way, the first join never happens. On Tue, Sep 8, 2015 at 11:10 AM, Flavio Pompermaier wrote: > Obviously when trying to simplify my code I didn't substitute correctly > the variable of the jo

Re: Case of possible join optimization

2015-09-08 Thread Flavio Pompermaier
Obviously, when trying to simplify my code I didn't substitute the variable of the join correctly; it should be: DataSet, List>> atomSubset = attrToExpand.join(*subset* ).where(0).equalTo(0).projectFirst(0,1).projectSecond(1); Do you think that a JoinHint to create a sort-merge join is equiv

Re: Memory management issue

2015-09-08 Thread Ufuk Celebi
> On 08 Sep 2015, at 10:41, Ufuk Celebi wrote: > > Hey Ricarda, > > I will try to reproduce this locally with the data sets in your repo. I just saw that the data is very small. Can you point me to a data set to reproduce?

Re: Case of possible join optimization

2015-09-08 Thread Stephan Ewen
Hi Flavio! No, Flink does not join keys before full values. That is very often very inefficient, as it results effectively in two joins where one is typically about as expensive as the original join. One can do "semi-join-reduction", in case the join filters out many values (many elements from on
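The "semi-join-reduction" Stephan names can be sketched in two steps: project the small side's join keys into a set, then filter the big side to rows whose key appears in that set before running the actual join. A plain-Java illustration over int-keyed rows (an assumed data layout, not Flink code):

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Sketch of semi-join reduction: shrink the big input to only the rows that
// can possibly find a join partner, so the subsequent join touches far less
// data. Rows are int[] with the join key in position 0.
public class SemiJoinReduction {
    public static List<int[]> reduceBigSide(List<int[]> big, List<int[]> small) {
        Set<Integer> keys = small.stream()
                .map(r -> r[0])
                .collect(Collectors.toSet());
        return big.stream()
                .filter(r -> keys.contains(r[0]))
                .collect(Collectors.toList());
    }
}
```

In a distributed setting the key set from the small side would be broadcast to all partitions of the big side, which pays off exactly when the join is highly selective.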

Re: Memory management issue

2015-09-08 Thread Ufuk Celebi
Hey Ricarda, I will try to reproduce this locally with the data sets in your repo. If you have any hints to reproduce this (available memory, which file you were using exactly), feel free to post it. :) – Ufuk > On 08 Sep 2015, at 10:12, Schueler, Ricarda > wrote: > > Hi, > > we tested it

Re: Performance Issue

2015-09-08 Thread Rico Bergmann
Where can I find this information? I can see the memory usage and CPU load. But where is the information on the GC? > On 08.09.2015 at 09:34, Robert Metzger wrote: > > The web interface of Flink has a tab for the TaskManagers. There, you can also > see how much time the JVM spent with garb

Re: Memory management issue

2015-09-08 Thread Schueler, Ricarda
Hi, we tested it with version 0.9.1, but unfortunately the issue persists. Best, Ricarda From: ewenstep...@gmail.com on behalf of Stephan Ewen Sent: Monday, 7 September 2015 00:39 To: user@flink.apache.org Subject: Re: Memory management issue Hi!

Case of possible join optimization

2015-09-08 Thread Flavio Pompermaier
Hi to all, I have a case where I don't understand why Flink is not able to optimize the join between 2 datasets. My initial code was basically this: DataSet>> bigDataset = ...; // 5,257,207 elements DataSet>> attrToExpand = ...; // 65,000 elements DataSet> tmp = attrToExpand.joinWithHuge(subset).wh

Re: Performance Issue

2015-09-08 Thread Robert Metzger
The web interface of Flink has a tab for the TaskManagers. There, you can also see how much time the JVM spent on garbage collection. Can you check whether the number of GC calls + the time spent goes up after 30 minutes? On Tue, Sep 8, 2015 at 8:37 AM, Rico Bergmann wrote: > Hi! > > I also thi