Hi Ravikumar,

In short: no, you can't use closures to maintain a global state. If you need a single, always-consistent global state, you'll have to use a parallelism of 1 or keep that state in an external data store.
Is it possible to break up your global state into a set of local states which can be combined in the end? That way, you can take advantage of distributed parallel processing.

Cheers,
Max

On Fri, Jun 10, 2016 at 8:28 AM, Ravikumar Hawaldar
<ravikumar.hawal...@gmail.com> wrote:
> Hi Fabian, Thank you for your help.
>
> I want my Flink application to be distributed, and I also want the facility
> to support updating the variable [coefficients of linear regression].
>
> How would you do it in that case?
>
> The problem with iterations is that they expect a DataSet of the same type
> to be fed back, and my variable is just a double[]. Otherwise I would have
> to map every record with the double[] wrapped inside a Tuple2 and then try
> out iterations, but I am sure this won't work either.
>
> Can I use closures or lambdas to maintain global state?
>
> Regards,
> Ravikumar
>
> On 9 June 2016 at 20:17, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>> Hi,
>>
>> 1) Yes, that is correct. If you set the parallelism of an operator to 1,
>> it is only executed on a single node. It depends on your application
>> whether you need a global state or whether multiple local states are OK.
>> 2) Flink programs follow the concept of a data flow. There is no
>> communication between parallel instances of a task, i.e., all four tasks
>> of a MapOperator with parallelism 4 cannot talk to each other. You might
>> want to take a look at Flink's iteration operators. With these you can
>> feed data back into a previous operator [1].
>> 4) Yes, that should work.
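[Editor's note] Max's suggestion above — splitting the global state into local states that are combined at the end — is essentially a partial-aggregation pattern. The sketch below illustrates the idea in plain Java (not the Flink DataSet API; all class and method names here are illustrative): each parallel instance folds its partition into a local double[] accumulator, and a final element-wise reduce merges the local states into the global one.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of the "local states combined at the end" pattern.
// In Flink this would correspond to a per-partition fold followed by a
// reduce; the names and the sum-accumulator are illustrative assumptions.
public class PartialStateCombine {

    // Each parallel instance folds its own partition into a local state.
    static double[] localState(List<double[]> partition, int dim) {
        double[] state = new double[dim];
        for (double[] record : partition) {
            for (int i = 0; i < dim; i++) {
                state[i] += record[i]; // e.g. accumulate gradient contributions
            }
        }
        return state;
    }

    // A reduce step combines two local states element-wise.
    static double[] combine(double[] a, double[] b) {
        double[] out = new double[a.length];
        for (int i = 0; i < a.length; i++) {
            out[i] = a[i] + b[i];
        }
        return out;
    }

    public static void main(String[] args) {
        List<double[]> p1 = Arrays.asList(new double[]{1.0, 2.0}, new double[]{3.0, 4.0});
        List<double[]> p2 = Arrays.asList(new double[]{0.5, 0.5});
        double[] global = combine(localState(p1, 2), localState(p2, 2));
        System.out.println(Arrays.toString(global)); // [4.5, 6.5]
    }
}
```

This only works when the update is associative and commutative (sums, counts, min/max); a strictly sequential per-record update, as in the linear-regression question below, does not decompose this way.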
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/iterations.html
>>
>> 2016-06-09 15:01 GMT+02:00 Ravikumar Hawaldar
>> <ravikumar.hawal...@gmail.com>:
>>>
>>> Hi Fabian, Thank you for your answers.
>>>
>>> 1) If there is only a single instance of that function, doesn't that
>>> defeat the purpose of being distributed (correct me if I am wrong)? So if
>>> I run with parallelism 1 on a cluster, does that mean it will execute on
>>> only one node?
>>>
>>> 2) I mean to say: when a map operator returns a variable, is there any
>>> other function which takes that updated variable and returns it to all
>>> instances of the map?
>>>
>>> 3) Question cleared.
>>>
>>> 4) My question was: can I use the same ExecutionEnvironment for all Flink
>>> programs in a module?
>>>
>>> 5) Question cleared.
>>>
>>> Regards,
>>> Ravikumar
>>>
>>> On 9 June 2016 at 17:58, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>> Hi Ravikumar,
>>>>
>>>> I'll try to answer your questions:
>>>> 1) If you set the parallelism of a map function to 1, there will be only
>>>> a single instance of that function, regardless of whether it is executed
>>>> locally or remotely in a cluster.
>>>> 2) Flink also supports aggregations (reduce, groupReduce, combine, ...).
>>>> However, I do not see how this would help with a stateful map function.
>>>> 3) In Flink DataSet programs you usually construct the complete program
>>>> and call execute() after you have defined your sinks. There are two
>>>> exceptions: print() and collect(), which both add special sinks and
>>>> immediately execute your program. print() prints the result to the
>>>> stdout of the submitting client and collect() fetches a dataset as a
>>>> collection.
>>>> 4) I am not sure I understood your question. When you obtain an
>>>> ExecutionEnvironment with ExecutionEnvironment.getExecutionEnvironment(),
>>>> the type of the returned environment depends on the context in which the
>>>> program was executed.
>>>> It can be a local environment if it is executed from within an IDE, or a
>>>> RemoteExecutionEnvironment if the program is executed via the CLI client
>>>> and shipped to a remote cluster.
>>>> 5) A map operator processes records one after the other, i.e., as a
>>>> sequence. If you need a certain order, you can call
>>>> DataSet.sortPartition() to locally sort the partition.
>>>>
>>>> Hope that helps,
>>>> Fabian
>>>>
>>>> 2016-06-09 12:23 GMT+02:00 Ravikumar Hawaldar
>>>> <ravikumar.hawal...@gmail.com>:
>>>>>
>>>>> Hi Till, Thank you for your answer. I have a couple of questions.
>>>>>
>>>>> 1) Setting the parallelism of a single map function to 1 is fine
>>>>> locally, but in a distributed setting will it behave like local
>>>>> execution?
>>>>>
>>>>> 2) Is there any other way apart from setting the parallelism? Like
>>>>> Spark's aggregate function?
>>>>>
>>>>> 3) Is it necessary to call the execute function after the
>>>>> transformations? Or does execution start as soon as an action is
>>>>> encountered (similar to Spark)?
>>>>>
>>>>> 4) Can I create a global execution environment (either local or
>>>>> distributed) for different Flink programs in a module?
>>>>>
>>>>> 5) How can I make the records come in sequence for a map or any other
>>>>> operator?
>>>>>
>>>>> Regards,
>>>>> Ravikumar
>>>>>
>>>>> On 8 June 2016 at 21:14, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>
>>>>>> Hi Ravikumar,
>>>>>>
>>>>>> Flink's operators are stateful. So you can simply create a variable in
>>>>>> your mapper to keep the state around. But every mapper instance will
>>>>>> have its own state. This state is determined by the records which are
>>>>>> sent to that mapper instance. If you need a global state, then you
>>>>>> have to set the parallelism to 1.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Wed, Jun 8, 2016 at 5:08 PM, Ravikumar Hawaldar
>>>>>> <ravikumar.hawal...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I have a DataSet<UserDefinedType>, where each element is roughly a
>>>>>>> record in a DataSet or a file.
>>>>>>>
>>>>>>> Now I am using a map transformation on this DataSet to compute a
>>>>>>> variable (the coefficients of the linear regression parameters; the
>>>>>>> data structure used is a double[]).
>>>>>>>
>>>>>>> Now the issue is that the variable is updated per record, and I am
>>>>>>> struggling to maintain the state of this variable for the next
>>>>>>> record.
>>>>>>>
>>>>>>> In simple terms: for the first record the variable values will be
>>>>>>> 0.0; after the first record the variable will be updated, and I have
>>>>>>> to pass this updated variable on to the second record, and so on for
>>>>>>> all records in the DataSet.
>>>>>>>
>>>>>>> Any suggestions on how to maintain the state of a variable?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Ravikumar
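[Editor's note] For the original question — carrying a double[] of regression coefficients from one record to the next — Till's point is that a mapper instance can simply hold the state in a field, and with parallelism 1 there is exactly one instance, so the update is strictly sequential. The plain-Java sketch below shows that per-record update; the SGD-style update rule and the learning rate are illustrative assumptions, not taken from the thread.

```java
import java.util.Arrays;

// Plain-Java sketch of a stateful per-record update, as a single mapper
// instance (parallelism 1) would perform it. The update rule (one SGD step
// per record) and the learning rate are illustrative assumptions.
public class StatefulCoefficients {

    private final double[] coefficients; // starts at 0.0, as in the question
    private final double learningRate;

    StatefulCoefficients(int dim, double learningRate) {
        this.coefficients = new double[dim];
        this.learningRate = learningRate;
    }

    // Process one record: features x and target y update the coefficients,
    // and the updated state is what the next record sees.
    double[] map(double[] x, double y) {
        double prediction = 0.0;
        for (int i = 0; i < x.length; i++) {
            prediction += coefficients[i] * x[i];
        }
        double error = y - prediction;
        for (int i = 0; i < x.length; i++) {
            coefficients[i] += learningRate * error * x[i];
        }
        return coefficients.clone();
    }

    public static void main(String[] args) {
        StatefulCoefficients mapper = new StatefulCoefficients(2, 0.1);
        // The first record sees all-zero coefficients; later records see
        // the state left behind by the previous ones.
        System.out.println(Arrays.toString(mapper.map(new double[]{1.0, 0.0}, 1.0)));
        System.out.println(Arrays.toString(mapper.map(new double[]{0.0, 1.0}, 2.0)));
    }
}
```

With parallelism greater than 1, each instance would hold its own copy of `coefficients` built only from the records routed to it, which is exactly why the thread converges on either parallelism 1, an external store, or a combinable partial-state formulation.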