Thank you. I'll check this.

On Tue, Feb 16, 2016 at 4:01 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> Broadcasted DataSets are stored on the JVM heap of each task manager (but
> shared among multiple slots on the same TM), hence the size restriction.
>
> There are two ways to retrieve a DataSet (such as the result of a reduce).
> 1) if you want to fetch the result into your client program use
> DataSet.collect(). This immediately triggers an execution and fetches the
> result from the cluster.
> 2) if you want to use the result for a computation in the cluster use
> broadcast sets as described above.
>
> 2016-02-16 21:54 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>
>> Thank you, yes, this makes sense. The broadcasted data in my case would be
>> a large array of 3D coordinates.
>>
>> On a side note, how can I take the output from a reduce function? I can
>> see methods to write it to a given output, but is it possible to retrieve
>> the reduced result back to the program - like a double value representing
>> the average in the previous example.
>>
>> On Tue, Feb 16, 2016 at 3:47 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> You can use so-called BroadcastSets to send any sufficiently small
>>> DataSet (such as a computed average) to any other function and use it there.
>>> However, in your case you'll end up with a data flow that branches (at
>>> the source) and merges again (when the average is sent to the second map).
>>> Such patterns can cause deadlocks and can therefore not be pipelined
>>> which means that the data before the branch is written to disk and read
>>> again.
>>> In your case it might be even better to read the data twice instead of
>>> reading, writing, and reading it.
>>>
>>> Fabian
>>>
>>> 2016-02-16 21:15 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>>
>>>> I looked at the samples and I think what you meant is clear, but I
>>>> didn't find a solution for my need. In my case, I want to use the result
>>>> from the first map operation before I can apply the second map on the
>>>> *same* data set.
>>>> For simplicity, let's say I've a bunch of short
>>>> values represented as my data set. Then I need to find their average, so I
>>>> use a map and reduce. Then I want to map these short values with another
>>>> function, but it needs that average computed in the beginning to work
>>>> correctly.
>>>>
>>>> Is this possible without doing multiple reads of the input data to
>>>> create the same dataset?
>>>>
>>>> Thank you,
>>>> saliya
>>>>
>>>> On Tue, Feb 16, 2016 at 12:03 PM, Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> Yes, if you implement both maps in a single job, data is read once.
>>>>>
>>>>> 2016-02-16 15:53 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>>>>
>>>>>> Fabian,
>>>>>>
>>>>>> I've a quick follow-up question on what you suggested. When streaming
>>>>>> the same data through different maps, were you implying that everything
>>>>>> runs as a single job in Flink, so the data read happens only once?
>>>>>>
>>>>>> Thanks,
>>>>>> Saliya
>>>>>>
>>>>>> On Mon, Feb 15, 2016 at 3:58 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> It is not possible to "pin" data sets in memory, yet.
>>>>>>> However, you can stream the same data set through two different
>>>>>>> mappers at the same time.
>>>>>>>
>>>>>>> For instance you can have a job like:
>>>>>>>
>>>>>>>          /---> Map 1 --> Sink1
>>>>>>> Source --<
>>>>>>>          \---> Map 2 --> Sink2
>>>>>>>
>>>>>>> and execute it at once.
>>>>>>> For that you define your data flow and call execute once after all
>>>>>>> sinks have been created.
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> 2016-02-15 21:32 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>>>>>>
>>>>>>>> Fabian,
>>>>>>>>
>>>>>>>> count() was just an example. What I would like to do is, say, run two
>>>>>>>> map operations on the dataset (ds). Each map will have its own
>>>>>>>> reduction, so is there a way to avoid creating two jobs for such a
>>>>>>>> scenario?
>>>>>>>>
>>>>>>>> The reason is, reading these binary matrices is expensive.
>>>>>>>> In our current MPI implementation, I am using memory maps for faster
>>>>>>>> loading and reuse.
>>>>>>>>
>>>>>>>> Thank you,
>>>>>>>> Saliya
>>>>>>>>
>>>>>>>> On Mon, Feb 15, 2016 at 3:15 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> it looks like you are executing two distinct Flink jobs.
>>>>>>>>> DataSet.count() triggers the execution of a new job. If you have
>>>>>>>>> an execute() call in your program, this will lead to two Flink jobs
>>>>>>>>> being executed.
>>>>>>>>> It is not possible to share state among these jobs.
>>>>>>>>>
>>>>>>>>> Maybe you should add a custom count implementation (using a
>>>>>>>>> ReduceFunction) which is executed in the same program as the other
>>>>>>>>> ReduceFunction.
>>>>>>>>>
>>>>>>>>> Best, Fabian
>>>>>>>>>
>>>>>>>>> 2016-02-15 21:05 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I see that an InputFormat's open() and nextRecord() methods get
>>>>>>>>>> called for each terminal operation on a given dataset using that
>>>>>>>>>> particular InputFormat. Is it possible to avoid this - possibly
>>>>>>>>>> using some caching technique in Flink?
>>>>>>>>>>
>>>>>>>>>> For example, I've some code like below and I see for both the
>>>>>>>>>> last two statements (reduce() and count()) the above methods in the
>>>>>>>>>> input format get called. Btw. this is a custom input format I wrote
>>>>>>>>>> to represent a binary matrix stored as Short values.
>>>>>>>>>>
>>>>>>>>>> ShortMatrixInputFormat smif = new ShortMatrixInputFormat();
>>>>>>>>>>
>>>>>>>>>> DataSet<Short[]> ds = env.createInput(smif,
>>>>>>>>>> BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO);
>>>>>>>>>>
>>>>>>>>>> MapOperator<Short[], DoubleStatistics> op = ds.map(...)
>>>>>>>>>>
>>>>>>>>>> *op.reduce(...)*
>>>>>>>>>>
>>>>>>>>>> *op.count(...)*
>>>>>>>>>>
>>>>>>>>>> Thank you,
>>>>>>>>>> Saliya
>>>>>>>>>> --
>>>>>>>>>> Saliya Ekanayake
>>>>>>>>>> Ph.D. Candidate | Research Assistant
>>>>>>>>>> School of Informatics and Computing | Digital Science Center
>>>>>>>>>> Indiana University, Bloomington
>>>>>>>>>> Cell 812-391-4914
>>>>>>>>>> http://saliya.org

--
Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
Cell 812-391-4914
http://saliya.org
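
For reference, the custom-count idea from the thread (folding the count into the same reduce so that no separate count() job is triggered) might look roughly like the sketch below. It uses plain java.util.stream as a stand-in for Flink's map and reduce operators, so it is not Flink API code. The SumCount class and the reduceAll method are hypothetical names introduced only for illustration.

```java
import java.util.Arrays;
import java.util.List;

public class SingleReduceSketch {

    /** Hypothetical accumulator: carries the running sum and the record count together. */
    static final class SumCount {
        final double sum;
        final long count;

        SumCount(double sum, long count) {
            this.sum = sum;
            this.count = count;
        }

        /** Associative merge; this is the role a reduce function would play. */
        SumCount merge(SumCount other) {
            return new SumCount(sum + other.sum, count + other.count);
        }

        /** Average derived from the single reduced value; NaN for empty input. */
        double average() {
            return count == 0 ? Double.NaN : sum / count;
        }
    }

    /** Maps each record to (value, 1), then reduces once to obtain sum and count. */
    static SumCount reduceAll(List<Short> values) {
        return values.stream()
                .map(v -> new SumCount(v.doubleValue(), 1L))      // "map" step
                .reduce(new SumCount(0.0, 0L), SumCount::merge);  // "reduce" step
    }

    public static void main(String[] args) {
        List<Short> values = Arrays.asList((short) 1, (short) 2, (short) 3, (short) 4);
        SumCount result = reduceAll(values);
        System.out.println("count=" + result.count + " average=" + result.average());
    }
}
```

In an actual Flink program the merge step would presumably be written as a ReduceFunction applied to the mapped DataSet, so that the count and the statistic come out of one job. That mapping onto Flink is an assumption and is not exercised by this sketch.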