You have to persist it and read it back for the subsequent operations. Take a look at the FlinkMLTools.persist methods.
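
For example, something along these lines (a rough sketch, assuming the two-argument persist(dataSet, path) variant; the path is just a placeholder and transposed refers to the DataSet from the program further down the thread):

import org.apache.flink.api.scala._
import org.apache.flink.ml.common.FlinkMLTools

// Writes 'transposed' to the given path, triggers an execution, and returns
// a new DataSet that reads the materialized data back from that path.
val persisted = FlinkMLTools.persist(transposed, "hdfs:///tmp/transposed")

// Later operations (even separate collect calls) now only re-read the
// persisted file instead of re-computing the topology that produced it.
val a = persisted.filter(_._1._1 == 10).collect()
val b = persisted.filter(_._1._1 == 12).collect()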
Cheers,
Till

On Thu, Apr 28, 2016 at 6:14 PM, Sergio Ramírez <srami...@correo.ugr.es> wrote:

> Hello,
>
> OK, now I understand everything. So if I want to re-use my DataSet in
> several different operations, what should I do? Is there any way to save
> the data and avoid re-computation? I am not only talking about iteration;
> I mean re-use of data.
>
> For example, imagine I want to filter some results and collect them, and
> then I want to apply another filter.
>
> Regards
>
>
> On 26/04/16 14:25, Till Rohrmann wrote:
>
>> Hi Sergio,
>>
>> sorry for the late reply. I figured out your problem. The reason why you
>> see apparently inconsistent results is that you execute your job multiple
>> times. Each collect call triggers an eager execution of your Flink job.
>> Since the intermediate results are not stored, the whole topology has to
>> be re-executed for every collect call. Since the input split assignment
>> of your libSVM file happens lazily, it can happen that the different sub
>> tasks get different splits of the input file assigned. Therefore, it
>> happens that you see different lengths for different features of the same
>> partition.
>>
>> If you replace the last 6 lines of your program with:
>>
>> transposed.filter(_._1._1 == (nFeatures - 1)).map(t => t._1 -> t._2.size)
>>   .reduceGroup(_.mkString(",")).output(new PrintingOutputFormat())
>> val b = transposed.filter(_._1._1 == 10).map(t => t._1 -> t._2.size)
>>   .reduceGroup(_.mkString(",")).output(new PrintingOutputFormat())
>> val c = transposed.filter(_._1._1 == 12).map(t => t._1 -> t._2.size)
>>   .reduceGroup(_.mkString(",")).output(new PrintingOutputFormat())
>>
>> env.execute()
>>
>> you should see the correct results. If you need a deterministic input
>> split assignment to the different sources, then you would have to
>> implement your own InputFormat which returns a special InputSplitAssigner
>> that does the deterministic input split assignment. Or simply try to
>> avoid collect, count and print, which trigger the eager execution of a
>> Flink job.
>>
>> Cheers,
>> Till
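
(As a side note, a standalone toy version of the pattern Till describes above; it is not the original program, just the two-sinks, one-execute idea with made-up data:)

import org.apache.flink.api.scala._
import org.apache.flink.api.java.io.PrintingOutputFormat

object SingleExecutionSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.fromElements(1, 2, 3, 4, 5)

    // Each collect() would trigger its own execution of the whole topology:
    //   val evens = data.filter(_ % 2 == 0).collect()
    //   val odds  = data.filter(_ % 2 != 0).collect()

    // Declaring both results as sinks and calling execute() once runs the
    // topology a single time, so both sinks read the same input.
    data.filter(_ % 2 == 0).output(new PrintingOutputFormat[Int]())
    data.filter(_ % 2 != 0).output(new PrintingOutputFormat[Int]())
    env.execute("single execution with two sinks")
  }
}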
>> On Wed, Apr 13, 2016 at 5:47 PM, Sergio Ramírez <srami...@correo.ugr.es> wrote:
>>
>>> Hello again:
>>>
>>> Any news about this problem with the rich MapPartition function?
>>>
>>> Thank you
>>>
>>>
>>> On 06/04/16 17:01, Sergio Ramírez wrote:
>>>
>>>> Hello,
>>>>
>>>> Ok, please find enclosed the test code and the input data.
>>>>
>>>> Cheers
>>>>
>>>> On 31/03/16 10:07, Till Rohrmann wrote:
>>>>
>>>>> Hi Sergio,
>>>>>
>>>>> could you please provide a complete example (including input data) to
>>>>> reproduce your problem? It is hard to tell what's going wrong when one
>>>>> only sees a fraction of the program.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Tue, Mar 29, 2016 at 5:58 PM, Sergio Ramírez <srami...@correo.ugr.es> wrote:
>>>>>
>>>>>> Hi again,
>>>>>>
>>>>>> I've not been able to solve the problem with the instruction you gave
>>>>>> me. I've tried with static variables (matrices), also unsuccessfully.
>>>>>> I've also tried this simpler code:
>>>>>>
>>>>>> def mapPartition(it: java.lang.Iterable[LabeledVector],
>>>>>>     out: Collector[((Int, Int), Int)]): Unit = {
>>>>>>   val index = getRuntimeContext().getIndexOfThisSubtask() // partition index
>>>>>>   var ninst = 0
>>>>>>   for (reg <- it.asScala) {
>>>>>>     requireByteValues(reg.vector)
>>>>>>     ninst += 1
>>>>>>   }
>>>>>>   for (i <- 0 until nFeatures) out.collect((i, index) -> ninst)
>>>>>> }
>>>>>>
>>>>>> The result is as follows:
>>>>>>
>>>>>> Attribute 10, first eight partitions:
>>>>>> ((10,0),201),((10,1),200),((10,2),201),((10,3),200),((10,4),200),((10,5),201),((10,6),201),((10,7),201)
>>>>>>
>>>>>> Attribute 12, first eight partitions:
>>>>>> ((12,0),201),((12,1),201),((12,2),201),((12,3),200),((12,4),201),((12,5),200),((12,6),200),((12,7),201)
>>>>>>
>>>>>> As you can see, for example, partition 6 shows a different number of
>>>>>> instances for the two attributes, which should be impossible.
>>>>>>
>>>>>>
>>>>>> On 24/03/16 22:39, Chesnay Schepler wrote:
>>>>>>
>>>>>>> Haven't looked too deeply into this, but this sounds like object
>>>>>>> reuse is enabled, at which point buffering values effectively causes
>>>>>>> you to store the same value multiple times.
>>>>>>>
>>>>>>> Can you try disabling object reuse using
>>>>>>> env.getConfig().disableObjectReuse()?
>>>>>>>
>>>>>>> On 22.03.2016 16:53, Sergio Ramírez wrote:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> I've been having some problems with RichMapPartitionFunction.
>>>>>>>> Firstly, I tried to convert the iterable into an array,
>>>>>>>> unsuccessfully. Then, I used some buffers to store the values per
>>>>>>>> column. I am trying to transpose the local matrix of LabeledVectors
>>>>>>>> that I have in each partition.
>>>>>>>>
>>>>>>>> None of these solutions has worked. For example, for partition 7 and
>>>>>>>> feature 10, the vector is empty, whereas for the same partition and
>>>>>>>> feature 11, the vector contains 200 elements. And this changes on
>>>>>>>> each execution, with different partitions and features.
>>>>>>>>
>>>>>>>> I think there is a problem with using the collect method outside the
>>>>>>>> iteration loop.
>>>>>>>>
>>>>>>>> new RichMapPartitionFunction[LabeledVector, ((Int, Int), Array[Byte])]() {
>>>>>>>>   def mapPartition(it: java.lang.Iterable[LabeledVector],
>>>>>>>>       out: Collector[((Int, Int), Array[Byte])]): Unit = {
>>>>>>>>     val index = getRuntimeContext().getIndexOfThisSubtask()
>>>>>>>>     val mat = for (i <- 0 until nFeatures)
>>>>>>>>       yield new scala.collection.mutable.ListBuffer[Byte]
>>>>>>>>     for (reg <- it.asScala) {
>>>>>>>>       for (i <- 0 until (nFeatures - 1)) mat(i) += reg.vector(i).toByte
>>>>>>>>       mat(nFeatures - 1) += classMap(reg.label)
>>>>>>>>     }
>>>>>>>>     for (i <- 0 until nFeatures) out.collect((i, index) -> mat(i).toArray) // numPartitions
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> Regards
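
(For completeness, a small sketch of the two options Chesnay mentions above, applied to a mapPartition that buffers its input; this is not the original program, and it assumes the FlinkML Vector type exposes a copy method:)

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.flink.api.common.functions.RichMapPartitionFunction
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.util.Collector

// Option 1: disable object reuse on the environment before building the job:
//   env.getConfig.disableObjectReuse()

// Option 2: keep object reuse enabled, but copy each record before buffering
// it, so the buffer does not end up holding many references to one reused object.
class CopyingPartitionFn extends RichMapPartitionFunction[LabeledVector, (Int, LabeledVector)] {
  override def mapPartition(it: java.lang.Iterable[LabeledVector],
                            out: Collector[(Int, LabeledVector)]): Unit = {
    val index = getRuntimeContext.getIndexOfThisSubtask
    val buffer = new ArrayBuffer[LabeledVector]()
    for (reg <- it.asScala) {
      // With object reuse on, 'reg' may be the same object on every iteration,
      // mutated in place by the runtime, so take a defensive copy.
      buffer += LabeledVector(reg.label, reg.vector.copy)
    }
    // Collecting outside the iteration loop is then safe, because the buffer
    // holds independent copies rather than references to one reused record.
    for (v <- buffer) out.collect(index -> v)
  }
}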