reduce error

2015-10-08 Thread Michele Bertoni
Hi everybody, I am facing a very strange problem. Sometimes when I run my program one line of the result is totally wrong, and if I repeat the execution with the same input it can change. The algorithm takes two datasets and executes something like a left outer join where, if there is a match - it incre
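The left-outer-join shape described in the question can be sketched with plain Scala collections. This is a semantics sketch only, not Flink's coGroup API, and the name `leftOuterJoin` is illustrative:

```scala
object LeftOuterJoinSketch {
  // Left outer join semantics: every left record is emitted; records with a
  // match on the right side carry the matching right value, others carry None.
  def leftOuterJoin[K, L, R](left: Seq[(K, L)], right: Seq[(K, R)]): Seq[(K, L, Option[R])] = {
    val rightByKey = right.groupBy(_._1)
    left.flatMap { case (k, l) =>
      rightByKey.get(k) match {
        case Some(matches) => matches.map { case (_, r) => (k, l, Some(r)) }
        case None          => Seq((k, l, None))
      }
    }
  }
}
```

If the per-key logic that produces the emitted value is itself non-deterministic (for example, depends on iteration order), repeated runs can legitimately differ, which is one direction to check for the bug above.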

Re: intermediate result reuse

2015-09-14 Thread Michele Bertoni
n call. The collector is used in functions that can return an arbitrary number of results (0 to n). Best, Fabian 2015-09-14 20:58 GMT+02:00 Michele Bertoni <michele1.bert...@mail.polimi.it>: Hi Stephan, I have one more question: what happens when I do collect inside a cogroup (i.

Re: intermediate result reuse

2015-09-14 Thread Michele Bertoni
will be resumed in the near future, definitely. Too many parts are already in place to not complete this feature... Greetings, Stephan On Sat, Sep 12, 2015 at 6:44 PM, Michele Bertoni <michele1.bert...@mail.polimi.it> wrote: ok, I think I got the point: I don’t have two execute b

Re: intermediate result reuse

2015-09-12 Thread Michele Bertoni
I suppose, you call execute() between defining b and c. If you remove that call, a will be computed once and both b and c are computed at the same time. Best, Fabian 2015-09-12 11:02 GMT+02:00 Michele Bertoni <michele1.bert...@mail.polimi.it>: Hi everybody, I have a question abou

Re: verbose console

2015-09-12 Thread Michele Bertoni
Hope that helps... Stephan On Fri, Sep 11, 2015 at 11:17 PM, Michele Bertoni <michele1.bert...@mail.polimi.it> wrote: Hi! Ok, I solved it partially, but is there a way to remove also all the lines referring to operators executed? i.e. Filter(Filter at package.class)(x/y) switc

intermediate result reuse

2015-09-12 Thread Michele Bertoni
Hi everybody, I have a question about internal optimization: is Flink able to reuse intermediate results that are used twice in the graph? i.e. a = readsource -> filter -> reduce -> something else even more complicated b = a filter(something) store b c = a filter(something else) store c what hap
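The intent of the question can be illustrated in plain Scala: with a lazily cached intermediate, both branches read the same computed value. This mirrors the desired behavior only; whether Flink actually shares the result depends on the plan and on where execute() is called, as the replies in this thread discuss:

```scala
object ReuseSketch {
  var evalCount = 0

  // `a` plays the role of the expensive shared intermediate:
  // a lazy val is computed at most once per program run.
  lazy val a: Seq[Int] = {
    evalCount += 1
    (1 to 10).filter(_ % 2 == 0)
  }

  // Two downstream branches, both reading the cached `a`.
  def b: Seq[Int] = a.filter(_ > 4)
  def c: Seq[Int] = a.filter(_ < 6)
}
```

Evaluating both `b` and `c` triggers the computation of `a` exactly once, which is the sharing behavior the question asks Flink to provide for the dataflow graph.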

Re: verbose console

2015-09-11 Thread Michele Bertoni
Hi! Ok, I solved it partially, but is there a way to remove also all the lines referring to operators executed? i.e. Filter(Filter at package.class)(x/y) switched to finish I don't really want them in my console output Thanks! On 2 Sep 2015, at 16:49, Michele Be

Re: output writer

2015-09-09 Thread Michele Bertoni
you could use it directly to partition your output files on key1. 2015-09-08 18:44 GMT+02:00 Michele Bertoni <michele1.bert...@mail.polimi.it>: yes you understood it right! but then, after that block, how can I partition data according to key1 (the output key) and save the ord

Re: output writer

2015-09-08 Thread Michele Bertoni
on does not change the values of key1, key2, and key3). However, be aware, that Flink might change the order of key1 and key2 (only grouping is required) and your data is partitioned on key1 AND key2, i.e., identical key2 values might be spread over all partitions. 2015-09-08 18:18 GMT+02:00 Mich

Re: output writer

2015-09-08 Thread Michele Bertoni
lelism, i.e., also that many LRUs spread over all machines. The number of LRUs per machine depends on the number of slots in your TaskManager configuration. 2015-09-08 17:39 GMT+02:00 Michele Bertoni <michele1.bert...@mail.polimi.it>: Thanks! your answer is really helpful actually

Re: output writer

2015-09-08 Thread Michele Bertoni
stance of a task finished. Let me know, if you need more information or if something is not clear. Cheers, Fabian [1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java 2015-09-08 12:33 GMT+02:00 Michele Bertoni mailto:mi

Re: output writer

2015-09-08 Thread Michele Bertoni
-Original Message- From: Michele Bertoni [mailto:michele1.ber

Re: verbose console

2015-09-02 Thread Michele Bertoni
} %-5p %-60c %x - %m%n # suppress the irrelevant (wrong) warnings from the netty channel handler log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console Cheers, Till On Wed, Sep 2, 2015 at 4:33 PM, Michele Bertoni <michele1.bert...@mail.polimi.it> wrote
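Pieced together from Till's snippet above, a minimal log4j.properties along these lines quiets the console. The appender setup and the `org.apache.flink` threshold are assumptions reconstructed around the quoted fragment, not the literal file from the thread:

```properties
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# raise the threshold for Flink's runtime so per-operator state changes
# ("Filter(...) switched to FINISHED") no longer reach the console
log4j.logger.org.apache.flink=WARN

# suppress the irrelevant (wrong) warnings from the netty channel handler
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
```

In the IDE, point log4j at this file via a VM argument such as `-Dlog4j.configuration=file:/path/to/log4j.properties`, which matches Max's advice below about adjusting VM arguments.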

Re: verbose console

2015-09-02 Thread Michele Bertoni
erties Your IDE should have an option to adjust VM arguments. Cheers, Max On Wed, Sep 2, 2015 at 9:10 AM, Michele Bertoni <michele1.bert...@mail.polimi.it> wrote: Hi everybody, I just found that in version 0.9.1 it is possible to disable that verbose console, can you please expla

verbose console

2015-09-02 Thread Michele Bertoni
Hi everybody, I just found that in version 0.9.1 it is possible to disable that verbose console; can you please explain how to do it both in the IDE and in the local environment? Especially in the IDE: I am able to set log4j properties for my own logger, but everything I try for Flink's internal one does not work

Re: problem with union

2015-08-27 Thread Michele Bertoni
: Hi Michele, Thanks for reporting the problem. It seems like we changed the way we compare generic types like your GValue type. I'm debugging that now. We can get a fix in for the 0.9.1 release. Cheers, Max On Tue, Jul 14, 2015 at 5:35 PM, Michele Bertoni mailto:michele1.bert...@mail.

output writer

2015-07-30 Thread Michele Bertoni
Hi everybody, I have a question about the writer. I have to save my dataset in different files according to a field of the tuples: let’s assume I have a groupId in the tuple, and I need to store each group in a different file, with a custom name. Any idea on how I can do that? thanks! Michele
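The "one file per group" idea can be sketched with plain collections before touching Flink's FileOutputFormat. The naming scheme (`group-<id>.txt`) is a hypothetical example, not something prescribed by the thread:

```scala
object PartitionedOutputSketch {
  // Group the records by the key field and derive a custom file name per
  // group; the resulting map says which records go into which file.
  def partitionByGroup[A](records: Seq[(Int, A)], dir: String): Map[String, Seq[A]] =
    records.groupBy(_._1).map { case (gid, rs) =>
      s"$dir/group-$gid.txt" -> rs.map(_._2)
    }
}
```

In Flink itself, the replies in this thread point to a custom FileOutputFormat (see the link to FileOutputFormat.java below); the sketch only captures the grouping-to-filename logic.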

Re: Yarn configuration

2015-07-27 Thread Michele Bertoni
r managed memory: 7680*0.7 = 5376 MB. The safety margin for YARN is very conservative. As Till already said, you can set a different value for the "yarn.heap-cutoff-ratio" (try 0.15) and see if your job still runs. On Mon, Jul 27, 2015 at 11:29 AM, Michele Bertoni mailto:michele1
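The arithmetic quoted in the reply above is simple enough to check directly; the 7680 MB heap and the 0.7 managed-memory fraction are the values from the thread, not general defaults:

```scala
object YarnMemorySketch {
  // Managed memory = TaskManager heap size times the managed-memory fraction.
  // With the thread's numbers: 7680 MB * 0.7 = 5376 MB.
  def managedMemoryMb(heapMb: Int, fraction: Double): Long =
    math.round(heapMb * fraction)
}
```

Lowering `yarn.heap-cutoff-ratio` (the reply suggests trying 0.15) leaves a larger heap inside the same YARN container, and the managed-memory figure scales accordingly.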

Re: Yarn configuration

2015-07-27 Thread Michele Bertoni
rpose of the warning is to tell the user, that the memory configuration might not be optimal. However, this depends of course on the setup environment and should probably be rephrased to make this more clear. Cheers, Fabian 2015-07-27 11:07 GMT+02:00 Michele Bertoni mailto:michele1.bert...@mai

Re: Yarn configuration

2015-07-27 Thread Michele Bertoni
On 27 Jul 2015, at 11:02, Michele Bertoni <michele1.bert...@mail.polimi.it> wrote: Hi Robert, thanks for answering, today I have been able to try again: no, in an EMR configuration with 1 master and 5 core nodes I have 5 active nodes in the resource manager…sounds strange

Re: Yarn configuration

2015-07-27 Thread Michele Bertoni
detects that specified containers will not fit in the cluster. Also, in the ResourceManager UI, you can see the status of the scheduler. This often helps to understand what's going on, resource-wise. On Fri, Jul 24, 2015 at 3:58 PM, Michele Bertoni <michele1.bert...@mail.polimi.it>

Yarn configuration

2015-07-24 Thread Michele Bertoni
Hi everybody, I need help on how to configure a YARN cluster. I tried a lot of configurations but none of them was correct. We have a cluster on Amazon EMR, let's say 1 manager + 5 workers, all of them m3.2xlarge, so 8 cores each and 30 GB of RAM each. What is a good configuration for such a cluster? I would l

sorted cogroup

2015-07-20 Thread Michele Bertoni
Hi everybody, I need to execute a cogroup on sorted groups. I explain it better: I have two datasets, i.e. (key, value); I want to cogroup on key and then have both iterators sorted by value. How can I get that? I know the iterator should be collected to be sorted, but I want to avoid it. what happens
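What the question asks for can be stated as a collections sketch. Note that this sketch materializes each group to sort it, which is exactly the cost the question hopes Flink can avoid; it shows the semantics, not Flink's API:

```scala
object SortedCoGroupSketch {
  // Cogroup two keyed datasets and hand each side back sorted by value.
  def coGroupSorted[K, V: Ordering](left: Seq[(K, V)],
                                    right: Seq[(K, V)]): Map[K, (Seq[V], Seq[V])] = {
    val l = left.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sorted }
    val r = right.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sorted }
    // Union of keys: keys present on only one side get an empty other side.
    (l.keySet ++ r.keySet).iterator.map { k =>
      k -> (l.getOrElse(k, Seq.empty), r.getOrElse(k, Seq.empty))
    }.toMap
  }
}
```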

Re: Flink Scala performance

2015-07-18 Thread Michele Bertoni
hi, actually the same happens to me on my macbook pro when not plugged into power but on battery, and twice as bad if I am using hdfs. in my case it seems like in power saving mode jvm commands have a very high latency, i.e. a simple "hdfs dfs -ls /" takes about 20 seconds when only on battery, so it is

Re: How can handles Exist ,not Exist query on flink

2015-07-16 Thread Michele Bertoni
ds1.filter(//here selection of query 1) ds2.filter(//here selection of query 2) exist ds1.join(ds2.distinct(id)).where(id).equalTo(id){ // join by your join key(s) - note the distinct operator, otherwise you will get many lines for each input line (left, right) => left //collect left } or ds1.cogr
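The EXISTS / NOT EXISTS semantics of the join-based formulation above can be captured with plain collections. The method names are illustrative stand-ins, and the set-based lookup plays the role of the `distinct` on the right side (it naturally deduplicates keys):

```scala
object ExistsSketch {
  // EXISTS: keep rows of ds1 whose key appears in ds2.
  def exists[A, K](ds1: Seq[A], ds2Keys: Seq[K])(key: A => K): Seq[A] = {
    val keys = ds2Keys.toSet   // deduplicates, like distinct(id) in the thread
    ds1.filter(a => keys.contains(key(a)))
  }

  // NOT EXISTS: keep rows of ds1 whose key does NOT appear in ds2.
  def notExists[A, K](ds1: Seq[A], ds2Keys: Seq[K])(key: A => K): Seq[A] = {
    val keys = ds2Keys.toSet
    ds1.filterNot(a => keys.contains(key(a)))
  }
}
```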

Re: open multiple file from list of uri

2015-07-15 Thread Michele Bertoni
critto: If you want to work without the placeholder, simply do: "env.createInput(new myDelimitedInputFormat(parser)(paths))" The "createInputSplits()" method looks good. Greetings, Stephan On Tue, Jul 14, 2015 at 11:42 PM, Michele Bertoni <michele1.bert...@mail.polimi.it>

Re: open multiple file from list of uri

2015-07-14 Thread Michele Bertoni
variable), and override the "createInputSplits()" method. Stephan On Tue, Jul 14, 2015 at 6:42 PM, Michele Bertoni <michele1.bert...@mail.polimi.it> wrote: Hi Stephan, I started working on this today, but I am having a problem Can you be a little more detailed in the pr

Re: open multiple file from list of uri

2015-07-14 Thread Michele Bertoni
takes a path from that variable Thanks, Michele On 26 Jun 2015, at 12:28, Michele Bertoni <michele1.bert...@mail.polimi.it> wrote: Right! later I will post the question and quote your answer with the solution :) On 26 Jun 2015, at 12:27, Stepha

problem with union

2015-07-14 Thread Michele Bertoni
Hi everybody, this discussion started in another thread about a problem in union, but you said it was a different error, so I am opening a new topic. I am doing the union of two datasets and I am getting this error: Exception in thread "main" org.apache.flink.api.common.InvalidProgramException

Re: local web-client error

2015-07-13 Thread Michele Bertoni
:00, Maximilian Michels <m...@apache.org> wrote: Hi Michele, Sorry to hear you are experiencing problems with the web client. Which version of Flink are you using? Could you paste the whole error message you see? Thank you. Best regards, Max On Sun, Jul 12, 2015 at 11:21 AM, M

local web-client error

2015-07-12 Thread Michele Bertoni
I think there is a problem with the web client. Quite often I can use it for a single run and then it crashes, especially if after seeing the graph I click back; on the second run I get a class not found exception. From the terminal I have to stop and restart it and it works again Michele

Re: why when use orders.aggregate(Aggregations.MAX, 2) not return one value but return more value

2015-07-08 Thread Michele Bertoni
hi, you are not printing the aggregation but the input val result = orders.aggregate(Aggregations.MAX, 2) result.print cheers michele > On 8 Jul 2015, at 02:00, hagersaleh > wrote: > > why when use orders.aggregate(Aggregations.MAX, 2) not return one value but > retu
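The fix in the reply is to print the aggregation result, not the input. Locally, what `aggregate(Aggregations.MAX, 2)` computes over field index 2 can be mimicked with a collections one-liner (the tuple shape is a hypothetical example):

```scala
object AggregateMaxSketch {
  // Stand-in for aggregate(Aggregations.MAX, 2): the maximum of field
  // index 2 (the third tuple field) over the whole dataset.
  def maxOfField2(orders: Seq[(String, String, Double)]): Double =
    orders.map(_._3).max
}
```

Printing this single value corresponds to `result.print`; printing `orders` instead would show every input row, which is the behavior the original question observed.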

Re: Execution graph

2015-06-30 Thread Michele Bertoni
the visualization. Best, Max On Tue, Jun 30, 2015 at 7:11 AM, Michele Bertoni <michele1.bert...@mail.polimi.it> wrote: Hi, I was trying to run my program in the flink web environment (the local one) when I run it I get the graph of the planned execution but in each node there

Re: Datasets union CompilerException

2015-06-30 Thread Michele Bertoni
Hi, yesterday on the union I faced another problem: at runtime it was saying something like “Union cannot work with datasets of two different types”, then it was showing the types and they were exactly the same (Tuple5 I solved it by changing one field of the tuple from a custom object (MyClass) that

Execution graph

2015-06-29 Thread Michele Bertoni
Hi, I was trying to run my program in the flink web environment (the local one) when I run it I get the graph of the planned execution but in each node there is a "parallelism = 1", instead I think it runs with par = 8 (8 cores, I always get 8 output) what does that mean? is that wrong or is it

Re: cogroup

2015-06-29 Thread Michele Bertoni
joins for Flink, but I don't know what the progress is. Best, Fabian 2015-06-29 18:05 GMT+02:00 Michele Bertoni <michele1.bert...@mail.polimi.it>: thanks both for answering, that’s what I expected I was using join at first but sadly I had to move from join to cogroup beca

Re: cogroup

2015-06-29 Thread Michele Bertoni
ere is no other way than collecting one side in memory. Best, Fabian 2015-06-29 17:42 GMT+02:00 Matthias J. Sax <mj...@informatik.hu-berlin.de>: Why do you not use a join? CoGroup seems not to be the right operator. -Matthias On 06/29/2015 05:40 PM, Michele Bertoni wrote:

cogroup

2015-06-29 Thread Michele Bertoni
Hi, I have a question on cogroup: when I cogroup two datasets, is there a way to compare each element on the left with each element on the right (inside a group) without collecting one side? right now I am doing left.cogroup(right).where(0,1,2).equalTo(0,1,2){ (leftIterator, rightIterator,
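Fabian's answer in this thread is that one side has to be collected in memory: an iterator can only be traversed once, so the pairwise comparison needs one side materialized. A collections sketch of what the user function does inside a single cogroup group (names are illustrative):

```scala
object CoGroupCompareSketch {
  // Compare every left element with every right element within one group.
  // The right side is buffered so it can be re-traversed for each left
  // element; the left side is streamed once.
  def compareAll[L, R, O](leftIt: Iterator[L], rightIt: Iterator[R])
                         (f: (L, R) => O): List[O] = {
    val rightBuf = rightIt.toVector   // collect one side in memory
    leftIt.flatMap(l => rightBuf.map(r => f(l, r))).toList
  }
}
```

Buffering the smaller side keeps the memory footprint down, which is the practical advice that follows from the thread.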

Re: open multiple file from list of uri

2015-06-26 Thread Michele Bertoni
... On Fri, Jun 26, 2015 at 12:25 PM, Michele Bertoni <michele1.bert...@mail.polimi.it> wrote: Got it! I will try thanks! :) What about writing a section about it in the programming guide? I found a couple of topics about the readers in the mailing list, it seems it may be helpful

Re: using value that are not passed as parameter

2015-06-26 Thread Michele Bertoni
/FLINK/Variables+Closures+vs.+Broadcast+Variables Greetings, Stephan On Wed, Jun 24, 2015 at 6:23 PM, Michele Bertoni <michele1.bert...@mail.polimi.it> wrote: Hi everybody, this question may sound stupid, but i would like to have it clear what happens if inside a dataset transforma

Re: open multiple file from list of uri

2015-06-26 Thread Michele Bertoni
erride the "createInputSplits()" method. Call for each of your file paths "super.createInputSplits()" and then combine the results into one array that you return. That should do it... On Fri, Jun 26, 2015 at 12:19 PM, Michele Bertoni <michele1.bert...@mail.polimi.it>

Re: open multiple file from list of uri

2015-06-26 Thread Michele Bertoni
Create multiple sources and union them. This is easy, but probably a bit less efficient. 2) Override the FileInputFormat's createInputSplits method to take a union of the paths to create a list of all files and file splits that will be read. Stephan On Fri, Jun 26, 2015 at 12:12 PM, Mich

open multiple file from list of uri

2015-06-26 Thread Michele Bertoni
Hi everybody, is there a way to specify a list of URIs ("hdfs://file1", "hdfs://file2", …) and open them as different files? I know I could open the entire directory, but I want to be able to select a subset of files in the directory thanks
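Stephan's option 2 in the replies (override createInputSplits and concatenate the per-path splits) reduces to a small piece of logic. In this sketch `splitsFor` is a stand-in for `super.createInputSplits()` on a single path, and the split type is left generic:

```scala
object MultiFileSketch {
  // Compute the input splits for each path separately and concatenate them
  // into the one list the source will read.
  def combinedSplits[S](paths: Seq[String])(splitsFor: String => Seq[S]): Seq[S] =
    paths.flatMap(splitsFor)
}
```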

using value that are not passed as parameter

2015-06-24 Thread Michele Bertoni
Hi everybody, this question may sound stupid, but I would like to have it clear what happens if inside a dataset transformation (e.g. a map) I use something that is declared somewhere else, like a variable or a dataset, and not passed as a broadcast dataset nor as a parameter in the constructor of a r
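The mechanism being asked about is closure capture: a value referenced inside the function but not passed explicitly travels with the function itself. In Flink that means it is serialized and shipped to each task, in contrast to a broadcast dataset (see the Variables+Closures+vs.+Broadcast+Variables wiki page linked in the reply). A plain Scala illustration:

```scala
object ClosureSketch {
  // `offset` is not a parameter of the lambda, yet the lambda uses it:
  // it is captured in the closure and carried along with the function.
  def addOffset(data: Seq[Int], offset: Int): Seq[Int] = {
    val f: Int => Int = x => x + offset
    data.map(f)
  }
}
```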

Re: sorting groups

2015-06-17 Thread Michele Bertoni
the documentation michele On 17 Jun 2015, at 08:35, Michele Bertoni <michele1.bert...@mail.polimi.it> wrote: Hi Fabian, My dataset is of this type RegionType (Long, String, Long, Long, Char, Array[GValue]) Where GValue is a case class implemented by GString(v:

Re: sorting groups

2015-06-16 Thread Michele Bertoni
and the input type, I can help you getting it right. Cheers, Fabian 2015-06-16 23:37 GMT+02:00 Michele Bertoni <michele1.bert...@mail.polimi.it>: Hi everybody, I am trying to sort a grouped dataset, but I am getting this error: Exception in thread "main" org.ap

sorting groups

2015-06-16 Thread Michele Bertoni
Hi everybody, I am trying to sort a grouped dataset, but I am getting this error: Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Sorting on KeySelector keys only works with KeySelector grouping. at org.apache.flink.api.scala.GroupedDataSet.sortGroup(Gr
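The error message says that a KeySelector-based sortGroup requires the grouping itself to use a KeySelector (a function over the record) rather than field positions. Semantically both steps are just functions of the record, as this collections sketch shows (plain Scala, not Flink's GroupedDataSet API):

```scala
object SortGroupSketch {
  // Group by one key function, then sort each group by another:
  // the analogue of groupBy(keySelector).sortGroup(keySelector, order).
  def groupAndSort[A, K, S: Ordering](data: Seq[A])
                                     (groupKey: A => K)
                                     (sortKey: A => S): Map[K, Seq[A]] =
    data.groupBy(groupKey).map { case (k, vs) => k -> vs.sortBy(sortKey) }
}
```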

Re: problem on yarn cluster

2015-05-20 Thread Michele Bertoni
local environment, because there everything is running in one JVM and the needed class is available there. In the distributed case, we have a special usercode classloader which can load classes from the user's jar. On Mon, May 18, 2015 at 9:16 PM, Michele Bertoni mailto:michele1.bert...@mail.polimi

Re: problem on yarn cluster

2015-05-18 Thread Michele Bertoni
hat job. Greetings, Stephan On Mon, May 18, 2015 at 5:30 PM, Michele Bertoni <michele1.bert...@mail.polimi.it> wrote: Hi, I have a problem running my app on a Yarn cluster I developed it on my computer and everything is working fine then we setup the environment on Amazon EMR

Re: problem on yarn cluster

2015-05-18 Thread Michele Bertoni
cated YARN session for that job. Greetings, Stephan On Mon, May 18, 2015 at 5:30 PM, Michele Bertoni <michele1.bert...@mail.polimi.it> wrote: Hi, I have a problem running my app on a Yarn cluster I developed it in my computer and everything is working fine then we setup the enviro

problem on yarn cluster

2015-05-18 Thread Michele Bertoni
Hi, I have a problem running my app on a YARN cluster. I developed it on my computer and everything is working fine; then we set up the environment on Amazon EMR, reading data from HDFS not S3. We run it with these commands ./yarn-session.sh -n 3 -s 1 -jm 1024 -tm 4096 ./flink run -m yarn-cluste

Re: inconsistency in count and print

2015-05-16 Thread Michele Bertoni
, at 12:20, Michele Bertoni <michele1.bert...@mail.polimi.it> wrote: The first time I hash my data is in the reading phase: each line gets one additional field that is the hash of its file name; I do this with a custom reader that extends the DelimitedInputFormat and overrides th

Re: inconsistency in count and print

2015-05-16 Thread Michele Bertoni
anything seems to work On 16 May 2015, at 12:00, Fabian Hueske <fhue...@gmail.com> wrote: Invalid hash values can certainly cause non-deterministic results. Can you provide a code snippet that shows how and where you used the Guava Hasher? 2015-05-16 11:52 GM

Re: inconsistency in count and print

2015-05-16 Thread Michele Bertoni
removed it everywhere and I started using the java hashcode; now it seems to work > On 16 May 2015, at 09:15, Michele Bertoni > wrote: > > Hi, > it is 2 days I am going mad with a problem: every time I run the code (on the > same dataset) I get a different r

inconsistency in count and print

2015-05-16 Thread Michele Bertoni
Hi, it is 2 days I am going mad with a problem: every time I run the code (on the same dataset) I get a different result. while I was trying to debug it I found this: i have this code val aggregationResult = //something that creates the dataset and uses join, group, reduce and map logger.error("r
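The resolution later in this thread was to replace the Guava hasher with the plain Java hashCode: a key used for joining or grouping must be a pure, stable function of the record's content, identical across JVMs and runs. A minimal sketch of the stable variant (the file-name tagging is the use case described in the thread; the method name is illustrative):

```scala
object HashSketch {
  // String.hashCode is specified by the JVM spec (s[0]*31^(n-1) + ... + s[n-1])
  // and therefore identical on every task manager and every run, so records
  // tagged with it group and join deterministically.
  def fileTag(fileName: String): Int = fileName.hashCode
}
```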

Re: Memory exception

2015-05-15 Thread Michele Bertoni
memory size. Try adding "-Xmx500m" or so to your "launch" options in the IDE. On Sun, May 10, 2015 at 9:19 PM, Michele Bertoni <michele1.bert...@mail.polimi.it> wrote: Hi, thank you for your fast answer next time it happens I will try Michele On 10 Ma

Re: Memory exception

2015-05-10 Thread Michele Bertoni
arameter [1]. Cheers, Fabian [1] http://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#jobmanager-amp-taskmanager 2015-05-10 19:18 GMT+02:00 Michele Bertoni <michele1.bert...@mail.polimi.it>: Hi everybody, I am running a flink instance on my IDE sometimes (totall

Memory exception

2015-05-10 Thread Michele Bertoni
Hi everybody, I am running a Flink instance in my IDE. Sometimes (totally at random) when I start it I get this error: Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithL