It is not yet possible to "pin" data sets in memory.
However, you can stream the same data set through two different mappers at
the same time.

For instance, you can have a job like this:

           /---> Map 1 --> Sink1
Source ---<
           \---> Map 2 --> Sink2

and execute it as a single job.
To do that, define your data flow and call execute() once, after all sinks
have been created.
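As a rough illustration of the branching plan above, here is a plain-Java model (not Flink code). The source is opened once and every record is fed to two independent map-and-reduce branches. The source contents and the names `BranchingPlan`, `openSource` and `runOnce` are hypothetical. In Flink's DataSet API, the corresponding step would be to derive both branches from the same DataSet and call `env.execute()` a single time.

```java
import java.util.List;
import java.util.function.LongBinaryOperator;
import java.util.function.ToLongFunction;

/**
 * Plain-Java model (hypothetical, not Flink code) of a branching data flow:
 * one source is read once and each record feeds two map -> reduce branches.
 */
class BranchingPlan {
    // Counts how often the source is opened; stands in for InputFormat.open().
    static int opens = 0;

    // Hypothetical source standing in for a custom input format.
    static List<short[]> openSource() {
        opens++;
        return List.of(new short[] {1, 2, 3}, new short[] {4, 5, 6});
    }

    // Runs both branches in a single pass over the source.
    static long[] runOnce(ToLongFunction<short[]> map1, LongBinaryOperator reduce1, long init1,
                          ToLongFunction<short[]> map2, LongBinaryOperator reduce2, long init2) {
        long acc1 = init1;
        long acc2 = init2;
        for (short[] row : openSource()) {                        // source read exactly once
            acc1 = reduce1.applyAsLong(acc1, map1.applyAsLong(row)); // branch 1: Map 1 -> Sink1
            acc2 = reduce2.applyAsLong(acc2, map2.applyAsLong(row)); // branch 2: Map 2 -> Sink2
        }
        return new long[] {acc1, acc2};
    }

    public static void main(String[] args) {
        long[] r = runOnce(
                row -> { long s = 0; for (short v : row) s += v; return s; }, Long::sum, 0L, // sum of all values
                row -> 1L, Long::sum, 0L);                                               // number of rows
        System.out.println("sum=" + r[0] + " count=" + r[1] + " opens=" + opens);
        // prints: sum=21 count=2 opens=1
    }
}
```

Both results come out of one pass, so the counter records a single open of the source.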

Best, Fabian

2016-02-15 21:32 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:

> Fabian,
>
> count() was just an example. What I would like to do is, say, run two map
> operations on the dataset (ds). Each map will have its own reduction, so
> is there a way to avoid creating two jobs for such a scenario?
>
> The reason is that reading these binary matrices is expensive. In our current
> MPI implementation, I am using memory maps for faster loading and reuse.
>
> Thank you,
> Saliya
>
> On Mon, Feb 15, 2016 at 3:15 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> it looks like you are executing two distinct Flink jobs.
>> DataSet.count() triggers the execution of a new job. If you have an
>> execute() call in your program, this will lead to two Flink jobs being
>> executed.
>> It is not possible to share state among these jobs.
>>
>> Maybe you should add a custom count implementation (using a
>> ReduceFunction) which is executed in the same program as the other
>> ReduceFunction.
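A count built from a reduction, as suggested above, can be sketched in plain Java: map every record to `1L` and combine the ones pairwise with a sum. This uses `java.util.stream` as a stand-in for Flink, and the names `ReduceCount` and `countViaReduce` are hypothetical. In a Flink program the same shape would roughly be a map emitting `1L` followed by `reduce`, declared alongside the other reduction so that both run in the job started by a single `execute()` call.

```java
import java.util.List;

class ReduceCount {
    /**
     * Counts records with a reduction instead of a dedicated count():
     * each record becomes 1L, then the ones are summed pairwise.
     * A reduce without an initial value yields an empty Optional for
     * empty input, which is mapped to 0 here.
     */
    static <T> long countViaReduce(List<T> records) {
        return records.stream()
                .map(r -> 1L)              // "map" step: one per record
                .reduce((a, b) -> a + b)   // "reduce" step: associative sum
                .orElse(0L);               // no records -> count of 0
    }

    public static void main(String[] args) {
        List<short[]> rows = List.of(new short[] {1, 2}, new short[] {3, 4}, new short[] {5, 6});
        System.out.println(countViaReduce(rows)); // prints 3
    }
}
```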
>>
>> Best, Fabian
>>
>>
>>
>> 2016-02-15 21:05 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>
>>> Hi,
>>>
>>> I see that an InputFormat's open() and nextRecord() methods get called
>>> for each terminal operation on a given dataset using that particular
>>> InputFormat. Is it possible to avoid this, possibly by using some caching
>>> technique in Flink?
>>>
>>> For example, I have some code like the following, and I see that for both of
>>> the last two statements (reduce() and count()) the above methods in the input
>>> format get called. By the way, this is a custom input format I wrote to
>>> represent a binary matrix stored as Short values.
>>>
>>> ShortMatrixInputFormat smif = new ShortMatrixInputFormat();
>>>
>>> DataSet<Short[]> ds = env.createInput(smif,
>>>         BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO);
>>>
>>> MapOperator<Short[], DoubleStatistics> op = ds.map(...);
>>>
>>> op.reduce(...);
>>>
>>> op.count();
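The behaviour described here, where `open()` and `nextRecord()` run again for every terminal operation, can be modelled in plain Java: a data set behaves like a recipe, and each terminal call replays it from the source. This is an analogy, not Flink's implementation. `LazyDataSet` and `SOURCE` are hypothetical names, and the counter stands in for calls to the input format's `open()`.

```java
import java.util.function.Supplier;
import java.util.stream.Stream;

/**
 * Plain-Java analogy (not Flink internals): every terminal operation
 * rebuilds the pipeline from the source, so the source is opened again.
 */
class LazyDataSet {
    // Stands in for the number of InputFormat.open() calls.
    static int opens = 0;

    // Hypothetical source: each get() "opens" the input and streams its records.
    static final Supplier<Stream<Short>> SOURCE = () -> {
        opens++;
        return Stream.of((short) 1, (short) 2, (short) 3);
    };

    public static void main(String[] args) {
        // Two separate terminal operations, each starting from SOURCE again,
        // like reduce() and count() running as two separate jobs.
        int sum = SOURCE.get().mapToInt(Short::intValue).sum();
        long count = SOURCE.get().count();
        System.out.println("sum=" + sum + " count=" + count + " opens=" + opens);
        // prints: sum=6 count=3 opens=2
    }
}
```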
>>>
>>>
>>> Thank you,
>>> Saliya
>>> --
>>> Saliya Ekanayake
>>> Ph.D. Candidate | Research Assistant
>>> School of Informatics and Computing | Digital Science Center
>>> Indiana University, Bloomington
>>> Cell 812-391-4914
>>> http://saliya.org
>>>
>>
>>
>
