I would have a look at the example programs in our code base: https://github.com/apache/flink/tree/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java
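Here is a minimal sketch of the single-job data flow described further down in this thread. One source feeds two mappers, each mapper has its own sink, and execute() is called once after all sinks exist. It assumes the Flink 1.x DataSet API in Java. The class name BranchingFlow, the Square/Negate functions, the input values, and the use of LocalCollectionOutputFormat as sinks are illustrative choices and are not part of the thread. LocalCollectionOutputFormat only works when the job runs in the same JVM (e.g. a local environment). On a cluster you would use a file sink such as writeAsText(...) instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;

public class BranchingFlow {

    // "Map 1": squares each value (illustrative only).
    public static class Square implements MapFunction<Integer, Integer> {
        @Override
        public Integer map(Integer value) {
            return value * value;
        }
    }

    // "Map 2": negates each value (illustrative only).
    public static class Negate implements MapFunction<Integer, Integer> {
        @Override
        public Integer map(Integer value) {
            return -value;
        }
    }

    // Builds Source -> {Map 1 -> Sink1, Map 2 -> Sink2} and runs it as one job.
    public static List<List<Integer>> run(ExecutionEnvironment env) throws Exception {
        // Stand-in for the expensive source; in the thread this would be
        // env.createInput(smif, BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO).
        DataSet<Integer> source = env.fromElements(1, 2, 3, 4);

        List<Integer> sink1 = new ArrayList<>();
        List<Integer> sink2 = new ArrayList<>();

        // Define every branch and its sink first; nothing is executed yet.
        source.map(new Square()).output(new LocalCollectionOutputFormat<>(sink1));
        source.map(new Negate()).output(new LocalCollectionOutputFormat<>(sink2));

        // A single execute() submits one job that contains both branches.
        env.execute("one source, two branches");
        return Arrays.asList(sink1, sink2);
    }

    public static void main(String[] args) throws Exception {
        // Local environment, so LocalCollectionOutputFormat can fill the lists.
        List<List<Integer>> results = run(ExecutionEnvironment.createLocalEnvironment());
        System.out.println("Sink1: " + results.get(0));
        System.out.println("Sink2: " + results.get(1));
    }
}
```

The maps may run in parallel, so the order of elements inside each sink is not guaranteed.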
Best, Fabian

2016-02-15 22:03 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:

> Thank you, Fabian.
>
> Any chance you might have an example of how to define a data flow with
> Flink?
>
> On Mon, Feb 15, 2016 at 3:58 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> It is not possible to "pin" data sets in memory, yet.
>> However, you can stream the same data set through two different mappers
>> at the same time.
>>
>> For instance, you can have a job like:
>>
>>           /---> Map 1 --> Sink1
>> Source --<
>>           \---> Map 2 --> Sink2
>>
>> and execute it at once.
>> For that, you define your data flow and call execute() once after all
>> sinks have been created.
>>
>> Best, Fabian
>>
>> 2016-02-15 21:32 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>
>>> Fabian,
>>>
>>> count() was just an example. What I would like to do is, say, run two map
>>> operations on the dataset (ds). Each map will have its own reduction, so
>>> is there a way to avoid creating two jobs for such a scenario?
>>>
>>> The reason is that reading these binary matrices is expensive. In our
>>> current MPI implementation, I am using memory maps for faster loading and
>>> reuse.
>>>
>>> Thank you,
>>> Saliya
>>>
>>> On Mon, Feb 15, 2016 at 3:15 PM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> it looks like you are executing two distinct Flink jobs.
>>>> DataSet.count() triggers the execution of a new job. If you also have an
>>>> execute() call in your program, this will lead to two Flink jobs being
>>>> executed.
>>>> It is not possible to share state among these jobs.
>>>>
>>>> Maybe you should add a custom count implementation (using a
>>>> ReduceFunction) which is executed in the same program as the other
>>>> ReduceFunction.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2016-02-15 21:05 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>>>
>>>>> Hi,
>>>>>
>>>>> I see that an InputFormat's open() and nextRecord() methods get called
>>>>> for each terminal operation on a given dataset using that particular
>>>>> InputFormat. Is it possible to avoid this, possibly using some caching
>>>>> technique in Flink?
>>>>>
>>>>> For example, I have some code like the one below, and I see that for
>>>>> both of the last two statements (reduce() and count()) the above methods
>>>>> in the input format get called. By the way, this is a custom input
>>>>> format I wrote to represent a binary matrix stored as Short values.
>>>>>
>>>>> ShortMatrixInputFormat smif = new ShortMatrixInputFormat();
>>>>>
>>>>> DataSet<Short[]> ds = env.createInput(smif,
>>>>>         BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO);
>>>>>
>>>>> MapOperator<Short[], DoubleStatistics> op = ds.map(...);
>>>>>
>>>>> op.reduce(...);
>>>>>
>>>>> op.count();
>>>>>
>>>>> Thank you,
>>>>> Saliya
>>>>> --
>>>>> Saliya Ekanayake
>>>>> Ph.D. Candidate | Research Assistant
>>>>> School of Informatics and Computing | Digital Science Center
>>>>> Indiana University, Bloomington
>>>>> Cell 812-391-4914
>>>>> http://saliya.org
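Fabian's suggestion of replacing count() with a ReduceFunction that runs in the same program could be sketched as follows, again assuming the Flink 1.x DataSet API. The sketch maps each record to 1L and sums the ones, alongside a second, unrelated reduction. Both results go to sinks, and a single execute() computes them in one job. The class and function names and the Double input are hypothetical stand-ins. The thread's DoubleStatistics type is not shown, so it is not used here. As in any sketch relying on LocalCollectionOutputFormat, this only works in a local (same-JVM) environment.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;

public class SumAndCountInOneJob {

    // Replaces count(): every record becomes a 1 ...
    public static class ToOne implements MapFunction<Double, Long> {
        @Override
        public Long map(Double value) {
            return 1L;
        }
    }

    // ... and the 1s are summed by an ordinary ReduceFunction.
    public static class SumLongs implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    }

    // Hypothetical stand-in for "the other ReduceFunction" in the program.
    public static class SumDoubles implements ReduceFunction<Double> {
        @Override
        public Double reduce(Double a, Double b) {
            return a + b;
        }
    }

    public static Tuple2<Double, Long> run(ExecutionEnvironment env) throws Exception {
        // Hypothetical input; in the thread this would be the mapped DataSet `op`.
        DataSet<Double> values = env.fromElements(1.5, 2.5, 3.0);

        List<Double> sumOut = new ArrayList<>();
        List<Long> countOut = new ArrayList<>();

        // Both reductions consume the same DataSet inside the same plan.
        values.reduce(new SumDoubles()).output(new LocalCollectionOutputFormat<>(sumOut));
        values.map(new ToOne()).reduce(new SumLongs())
              .output(new LocalCollectionOutputFormat<>(countOut));

        // One job computes both results; no separate count() job is triggered.
        env.execute("sum and count in one job");

        // A reduce over an empty input emits no record, so default the count to 0.
        long count = countOut.isEmpty() ? 0L : countOut.get(0);
        Double sum = sumOut.isEmpty() ? null : sumOut.get(0);
        return new Tuple2<>(sum, count);
    }

    public static void main(String[] args) throws Exception {
        Tuple2<Double, Long> result = run(ExecutionEnvironment.createLocalEnvironment());
        System.out.println("sum = " + result.f0 + ", count = " + result.f1);
    }
}
```

One difference from count() is that a reduce over an empty data set produces no record rather than 0. That is why the sketch falls back to 0 when the count sink is empty.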