Thank you, yes, this makes sense. The broadcasted data in my case would be a
large array of 3D coordinates.

On a side note, how can I retrieve the output of a reduce function? I can see
methods to write it to a given output, but is it possible to get the
reduced result back into the program, for example as a double value
representing the average in the previous example?
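
To make the question concrete, here is a minimal plain-Java sketch of the flow
I have in mind. It does not use any Flink API; the class and method names
(AverageThenMap, average, subtractAverage) are made up for illustration, and
it assumes a non-empty input. The first step reduces the short values to an
average that comes back as a double, and the second step maps the same values
using that average.

```java
import java.util.Arrays;

// Hypothetical illustration only: plain Java, not Flink code.
class AverageThenMap {

    // Step 1: reduce all values to a single double (their average).
    // Assumes values.length > 0; an empty array would yield NaN.
    static double average(short[] values) {
        double sum = 0.0;
        for (short v : values) {
            sum += v;
        }
        return sum / values.length;
    }

    // Step 2: map over the same values, using the average from step 1.
    static double[] subtractAverage(short[] values, double avg) {
        double[] out = new double[values.length];
        for (int i = 0; i < values.length; i++) {
            out[i] = values[i] - avg;
        }
        return out;
    }

    public static void main(String[] args) {
        short[] data = {2, 4, 6, 8};
        double avg = average(data);                      // reduced result as a double
        double[] centered = subtractAverage(data, avg);  // second map needs avg
        System.out.println(avg);
        System.out.println(Arrays.toString(centered));
    }
}
```

Running main prints 5.0 followed by [-3.0, -1.0, 1.0, 3.0]. What I am after is
the equivalent of the "double avg = ..." line when the reduce runs in Flink.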


On Tue, Feb 16, 2016 at 3:47 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> You can use so-called BroadcastSets to send any sufficiently small DataSet
> (such as a computed average) to any other function and use it there.
> However, in your case you'll end up with a data flow that branches (at the
> source) and merges again (when the average is sent to the second map).
> Such patterns can cause deadlocks and therefore cannot be pipelined, which
> means that the data before the branch is written to disk and read again.
> In your case it might be even better to read the data twice instead of
> reading, writing, and reading it.
>
> Fabian
>
> 2016-02-16 21:15 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>
>> I looked at the samples and I think what you meant is clear, but I didn't
>> find a solution for my need. In my case, I want to use the result from
>> the first map operation before I can apply the second map on the *same* data
>> set. For simplicity, let's say I've a bunch of short values represented as
>> my data set. Then I need to find their average, so I use a map and reduce.
>> Then I want to map these short values with another function, but it needs
>> that average computed in the beginning to work correctly.
>>
>> Is this possible without doing multiple reads of the input data to create
>> the same dataset?
>>
>> Thank you,
>> saliya
>>
>> On Tue, Feb 16, 2016 at 12:03 PM, Fabian Hueske <fhue...@gmail.com>
>> wrote:
>>
>>> Yes, if you implement both maps in a single job, data is read once.
>>>
>>> 2016-02-16 15:53 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>>
>>>> Fabian,
>>>>
>>>> I've a quick follow-up question on what you suggested. When streaming
>>>> the same data through different maps, were you implying that everything
>>>> runs as a single job in Flink, so the data is read only once?
>>>>
>>>> Thanks,
>>>> Saliya
>>>>
>>>> On Mon, Feb 15, 2016 at 3:58 PM, Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> It is not possible to "pin" data sets in memory, yet.
>>>>> However, you can stream the same data set through two different
>>>>> mappers at the same time.
>>>>>
>>>>> For instance you can have a job like:
>>>>>
>>>>>                  /---> Map 1 --> Sink1
>>>>> Source --<
>>>>>                  \---> Map 2 --> Sink2
>>>>>
>>>>> and execute it at once.
>>>>> For that you define your data flow and call execute once after all
>>>>> sinks have been created.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2016-02-15 21:32 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>>>>
>>>>>> Fabian,
>>>>>>
>>>>>> count() was just an example. What I would like to do is, say, run two
>>>>>> map operations on the dataset (ds). Each map will have its own
>>>>>> reduction, so is there a way to avoid creating two jobs for such a
>>>>>> scenario?
>>>>>>
>>>>>> The reason is, reading these binary matrices is expensive. In our
>>>>>> current MPI implementation, I am using memory maps for faster loading and
>>>>>> reuse.
>>>>>>
>>>>>> Thank you,
>>>>>> Saliya
>>>>>>
>>>>>> On Mon, Feb 15, 2016 at 3:15 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> it looks like you are executing two distinct Flink jobs.
>>>>>>> DataSet.count() triggers the execution of a new job. If you have an
>>>>>>> execute() call in your program, this will lead to two Flink jobs being
>>>>>>> executed.
>>>>>>> It is not possible to share state among these jobs.
>>>>>>>
>>>>>>> Maybe you should add a custom count implementation (using a
>>>>>>> ReduceFunction) which is executed in the same program as the other
>>>>>>> ReduceFunction.
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> 2016-02-15 21:05 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I see that an InputFormat's open() and nextRecord() methods get
>>>>>>>> called for each terminal operation on a given dataset using that
>>>>>>>> particular InputFormat. Is it possible to avoid this - possibly using
>>>>>>>> some caching technique in Flink?
>>>>>>>>
>>>>>>>> For example, I have some code like below, and I see that for both of
>>>>>>>> the last two statements (reduce() and count()) the above methods in
>>>>>>>> the input format get called. By the way, this is a custom input format
>>>>>>>> I wrote to represent a binary matrix stored as Short values.
>>>>>>>>
>>>>>>>> ShortMatrixInputFormat smif = new ShortMatrixInputFormat();
>>>>>>>>
>>>>>>>> DataSet<Short[]> ds = env.createInput(smif,
>>>>>>>>     BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO);
>>>>>>>>
>>>>>>>> MapOperator<Short[], DoubleStatistics> op = ds.map(...);
>>>>>>>>
>>>>>>>> *op.reduce(...);*
>>>>>>>>
>>>>>>>> *op.count();*
>>>>>>>>
>>>>>>>>
>>>>>>>> Thank you,
>>>>>>>> Saliya
>>>>>>>> --
>>>>>>>> Saliya Ekanayake
>>>>>>>> Ph.D. Candidate | Research Assistant
>>>>>>>> School of Informatics and Computing | Digital Science Center
>>>>>>>> Indiana University, Bloomington
>>>>>>>> Cell 812-391-4914
>>>>>>>> http://saliya.org
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>
>


-- 
Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
Cell 812-391-4914
http://saliya.org
