Thanks, I'll check this.

Saliya
On Mon, Feb 15, 2016 at 4:08 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> I would have a look at the example programs in our code base:
>
> https://github.com/apache/flink/tree/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java
>
> Best, Fabian
>
> 2016-02-15 22:03 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>
>> Thank you, Fabian.
>>
>> Any chance you might have an example of how to define a data flow with
>> Flink?
>>
>> On Mon, Feb 15, 2016 at 3:58 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> It is not possible to "pin" data sets in memory, yet.
>>> However, you can stream the same data set through two different mappers
>>> at the same time.
>>>
>>> For instance, you can have a job like:
>>>
>>>           /---> Map 1 --> Sink 1
>>> Source --<
>>>           \---> Map 2 --> Sink 2
>>>
>>> and execute it at once.
>>> For that, you define your data flow and call execute() once after all
>>> sinks have been created.
>>>
>>> Best, Fabian
>>>
>>> 2016-02-15 21:32 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>>
>>>> Fabian,
>>>>
>>>> count() was just an example. What I would like to do is, say, run two
>>>> map operations on the dataset (ds). Each map will have its own
>>>> reduction, so is there a way to avoid creating two jobs for such a
>>>> scenario?
>>>>
>>>> The reason is, reading these binary matrices is expensive. In our
>>>> current MPI implementation, I am using memory maps for faster loading
>>>> and reuse.
>>>>
>>>> Thank you,
>>>> Saliya
>>>>
>>>> On Mon, Feb 15, 2016 at 3:15 PM, Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> it looks like you are executing two distinct Flink jobs.
>>>>> DataSet.count() triggers the execution of a new job. If you have an
>>>>> execute() call in your program, this will lead to two Flink jobs
>>>>> being executed. It is not possible to share state among these jobs.
>>>>>
>>>>> Maybe you should add a custom count implementation (using a
>>>>> ReduceFunction) which is executed in the same program as the other
>>>>> ReduceFunction.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2016-02-15 21:05 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I see that an InputFormat's open() and nextRecord() methods get
>>>>>> called for each terminal operation on a given dataset using that
>>>>>> particular InputFormat. Is it possible to avoid this, possibly using
>>>>>> some caching technique in Flink?
>>>>>>
>>>>>> For example, I have some code like below, and I see that for both of
>>>>>> the last two statements (reduce() and count()) the above methods in
>>>>>> the input format get called. BTW, this is a custom input format I
>>>>>> wrote to represent a binary matrix stored as Short values.
>>>>>>
>>>>>> ShortMatrixInputFormat smif = new ShortMatrixInputFormat();
>>>>>>
>>>>>> DataSet<Short[]> ds = env.createInput(smif,
>>>>>> BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO);
>>>>>>
>>>>>> MapOperator<Short[], DoubleStatistics> op = ds.map(...)
>>>>>>
>>>>>> *op.reduce(...)*
>>>>>>
>>>>>> *op.count(...)*
>>>>>>
>>>>>> Thank you,
>>>>>> Saliya

--
Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
Cell 812-391-4914
http://saliya.org
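For reference, a minimal Java sketch of the branching job Fabian describes above: one source feeding two map/sink branches, with a single execute() call so both branches run as one job and the input is read once. The input path, output paths, and map logic are placeholders, not taken from the thread.

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class TwoBranchJob {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // The source is read once per job execution.
            DataSet<String> source = env.readTextFile("file:///tmp/input");   // placeholder path

            // Map 1 --> Sink 1
            source.map(new MapFunction<String, String>() {
                @Override
                public String map(String line) {
                    return line.toUpperCase();                                // placeholder logic
                }
            }).writeAsText("file:///tmp/out-upper");

            // Map 2 --> Sink 2
            source.map(new MapFunction<String, Integer>() {
                @Override
                public Integer map(String line) {
                    return line.length();                                     // placeholder logic
                }
            }).writeAsText("file:///tmp/out-length");

            // One execute() after all sinks are defined: both branches run in the same job.
            env.execute("Two mappers over one source");
        }
    }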
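And a sketch of how that pattern could apply to the original program: the count() call, which triggers a second job and a second read of the input format, is replaced by a map-to-1L plus a ReduceFunction that runs in the same plan as the existing reduction. ShortMatrixInputFormat and DoubleStatistics are the custom classes mentioned in the thread (not shown); StatsMapper, merge(), and the output paths are stand-ins for logic that is not in the thread. In addition to the imports above, this assumes org.apache.flink.api.common.functions.ReduceFunction and org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo.

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    ShortMatrixInputFormat smif = new ShortMatrixInputFormat();
    DataSet<Short[]> ds = env.createInput(smif, BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO);

    // The existing map step; StatsMapper stands in for the map(...) from the thread.
    DataSet<DoubleStatistics> stats = ds.map(new StatsMapper());

    // Branch 1: the real reduction, written to a sink instead of collected.
    stats.reduce(new ReduceFunction<DoubleStatistics>() {
        @Override
        public DoubleStatistics reduce(DoubleStatistics a, DoubleStatistics b) {
            return a.merge(b);                    // stand-in for the actual merge logic
        }
    }).writeAsText("file:///tmp/stats");

    // Branch 2: a manual count in the same plan, instead of DataSet.count(),
    // which would submit a separate job and call open()/nextRecord() again.
    stats.map(new MapFunction<DoubleStatistics, Long>() {
        @Override
        public Long map(DoubleStatistics value) {
            return 1L;
        }
    }).reduce(new ReduceFunction<Long>() {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    }).writeAsText("file:///tmp/count");

    env.execute("Reduce and count in one job");   // single submission; input read once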