It is not yet possible to "pin" data sets in memory. However, you can stream the same data set through two different mappers at the same time, so the source is read only once.
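A minimal sketch of that idea, assuming the Flink DataSet Java API discussed in this thread and the Flink dependency on the classpath. The class name, the in-memory stand-in source, and the two map/reduce bodies are hypothetical; a real job would read from env.createInput(...) with the custom input format instead. Sinks are attached with output(...) rather than count() or print(), since those would start a job on their own.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.PrintingOutputFormat;

public class SharedSourceJob {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the expensive source (assumption: the real job would use
        // env.createInput(...) with the custom input format).
        List<Short[]> rows = Arrays.asList(
                new Short[] {1, 2, 3},
                new Short[] {4, 5, 6});
        DataSet<Short[]> ds =
                env.fromCollection(rows, BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO);

        // Branch 1: map each row to the sum of its entries, then add the row sums.
        DataSet<Double> total = ds
                .map(new MapFunction<Short[], Double>() {
                    @Override
                    public Double map(Short[] row) {
                        double sum = 0;
                        for (Short v : row) {
                            sum += v;
                        }
                        return sum;
                    }
                })
                .reduce(new ReduceFunction<Double>() {
                    @Override
                    public Double reduce(Double a, Double b) {
                        return a + b;
                    }
                });

        // Branch 2: a custom count, written as map-to-1 plus a summing reduce,
        // so that it stays inside the same job instead of calling count().
        DataSet<Long> count = ds
                .map(new MapFunction<Short[], Long>() {
                    @Override
                    public Long map(Short[] row) {
                        return 1L;
                    }
                })
                .reduce(new ReduceFunction<Long>() {
                    @Override
                    public Long reduce(Long a, Long b) {
                        return a + b;
                    }
                });

        // Attach one sink per branch; nothing runs yet.
        total.output(new PrintingOutputFormat<Double>());
        count.output(new PrintingOutputFormat<Long>());

        // A single execute() call runs both branches as one job.
        env.execute("one source, two branches");
    }
}
```

Because both branches hang off the same ds and execute() is called only once, the plan contains a single source that feeds both mappers.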
For instance, you can have a job like:

         /---> Map 1 --> Sink1
Source --<
         \---> Map 2 --> Sink2

and execute it at once. To do that, define your data flow and call execute() once after all sinks have been created.

Best, Fabian

2016-02-15 21:32 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:

> Fabian,
>
> count() was just an example. What I would like to do is, say, run two map
> operations on the dataset (ds). Each map will have its own reduction, so
> is there a way to avoid creating two jobs for such a scenario?
>
> The reason is that reading these binary matrices is expensive. In our current
> MPI implementation, I am using memory maps for faster loading and reuse.
>
> Thank you,
> Saliya
>
> On Mon, Feb 15, 2016 at 3:15 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> it looks like you are executing two distinct Flink jobs.
>> DataSet.count() triggers the execution of a new job. If you have an
>> execute() call in your program, this will lead to two Flink jobs being
>> executed.
>> It is not possible to share state among these jobs.
>>
>> Maybe you should add a custom count implementation (using a
>> ReduceFunction) which is executed in the same program as the other
>> ReduceFunction.
>>
>> Best, Fabian
>>
>> 2016-02-15 21:05 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>
>>> Hi,
>>>
>>> I see that an InputFormat's open() and nextRecord() methods get called
>>> for each terminal operation on a given dataset using that particular
>>> InputFormat. Is it possible to avoid this, possibly using some caching
>>> technique in Flink?
>>>
>>> For example, I have some code like the one below, and I see that for both
>>> of the last two statements (reduce() and count()) the above methods in the
>>> input format get called. By the way, this is a custom input format I wrote
>>> to represent a binary matrix stored as Short values.
>>>
>>> ShortMatrixInputFormat smif = new ShortMatrixInputFormat();
>>>
>>> DataSet<Short[]> ds = env.createInput(smif,
>>> BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO);
>>>
>>> MapOperator<Short[], DoubleStatistics> op = ds.map(...)
>>>
>>> *op.reduce(...)*
>>>
>>> *op.count(...)*
>>>
>>>
>>> Thank you,
>>> Saliya
>>> --
>>> Saliya Ekanayake
>>> Ph.D. Candidate | Research Assistant
>>> School of Informatics and Computing | Digital Science Center
>>> Indiana University, Bloomington
>>> Cell 812-391-4914
>>> http://saliya.org