You have to persist it and read from it for the subsequent operations. Take
a look at the FlinkMLTools.persist methods.
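
A minimal sketch of that approach, assuming the single-DataSet overload of
FlinkMLTools.persist takes the data set plus a target path, writes the data
there, and returns a data set that reads it back. The object name, the toy
data and the path are placeholders, not taken from your program:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.FlinkMLTools

object PersistSketch {
  type Entry = ((Int, Int), Int)

  // `path` is a placeholder location for the persisted intermediate result.
  def run(path: String): (Seq[Entry], Seq[Entry]) = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Hypothetical stand-in for the expensive `transposed` data set:
    // 3 features x 4 partitions, keyed by (feature, partition).
    val transposed: DataSet[Entry] =
      env.fromCollection(for (f <- 0 until 3; p <- 0 until 4) yield ((f, p), 200 + p))

    // Assumption: persist runs the job once, writes the result to `path`,
    // and hands back a data set backed by that file.
    val persisted = FlinkMLTools.persist(transposed, path)

    // Each collect still triggers a job, but it starts from the persisted
    // file instead of re-running the upstream topology.
    val first = persisted.filter(_._1._1 == 0).collect()
    val second = persisted.filter(_._1._1 == 1).collect()
    (first, second)
  }

  def main(args: Array[String]): Unit = {
    val (first, second) = run("/tmp/transposed.bin")
    println(first.mkString(","))
    println(second.mkString(","))
  }
}
```

Because both filters read the same persisted file, they see the same
partition contents, whichever splits the original source happened to get.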


On Thu, Apr 28, 2016 at 6:14 PM, Sergio Ramírez wrote:

> Hello,
> OK, now I understand everything. So if I want to re-use my DataSet in
> several different operations, what should I do? Is there any way to
> keep the data and save it from re-computation? I am not only talking about
> iteration; I mean re-use of data in general.
> For example, imagine I want to filter some results and collect, and then I
> want to apply another filter.
> Regards
> On 26/04/16 14:25, Till Rohrmann wrote:
>> Hi Sergio,
>> sorry for the late reply. I figured out your problem. The reason why you
>> see apparently inconsistent results is that you execute your job multiple
>> times. Each collect call triggers an eager execution of your Flink job.
>> Since the intermediate results are not stored, the whole topology has to be
>> re-executed for every collect call. Since the input split assignment of
>> your libSVM file happens lazily, the different subtasks can get different
>> splits of the input file assigned from one execution to the next.
>> Therefore, you see different lengths for different features of the same
>> partition.
>> If you replace the last 6 lines of your program with:
>> transposed.filter(_._1._1 == (nFeatures - 1))
>>   .map(t => t._1 -> t._2.size)
>>   .reduceGroup(_.mkString(","))
>>   .output(new PrintingOutputFormat())
>> val b = transposed.filter(_._1._1 == 10)
>>   .map(t => t._1 -> t._2.size)
>>   .reduceGroup(_.mkString(","))
>>   .output(new PrintingOutputFormat())
>> val c = transposed.filter(_._1._1 == 12)
>>   .map(t => t._1 -> t._2.size)
>>   .reduceGroup(_.mkString(","))
>>   .output(new PrintingOutputFormat())
>> env.execute()
>> you should see the correct results. If you need a deterministic input split
>> assignment to the different sources, then you would have to implement your
>> own InputFormat which returns a special InputSplitAssigner which does the
>> deterministic input split assignment. Or simply try to avoid collect, count
>> and print, which trigger the eager execution of a Flink job.
>> Cheers,
>> Till
>> On Wed, Apr 13, 2016 at 5:47 PM, Sergio Ramírez wrote:
>> Hello again:
>>> Any news about this problem with enriched MapPartition function?
>>> Thank you
>>> On 06/04/16 17:01, Sergio Ramírez wrote:
>>> Hello,
>>>> Ok, please find enclosed the test code and the input data.
>>>> Cheers
>>>> On 31/03/16 10:07, Till Rohrmann wrote:
>>>> Hi Sergio,
>>>>> could you please provide a complete example (including input data) to
>>>>> reproduce your problem? It is hard to tell what's going wrong when one
>>>>> only sees a fraction of the program.
>>>>> Cheers,
>>>>> Till
>>>>> On Tue, Mar 29, 2016 at 5:58 PM, Sergio Ramírez wrote:
>>>>> Hi again,
>>>>>> I've not been able to solve the problem with the instruction you gave
>>>>>> me. I've also tried static variables (matrices), unsuccessfully. I've
>>>>>> also tried this simpler code:
>>>>>> def mapPartition(it: java.lang.Iterable[LabeledVector],
>>>>>>                  out: Collector[((Int, Int), Int)]): Unit = {
>>>>>>   // Partition index
>>>>>>   val index = getRuntimeContext().getIndexOfThisSubtask()
>>>>>>   var ninst = 0
>>>>>>   for (reg <- it.asScala) {
>>>>>>     requireByteValues(reg.vector)
>>>>>>     ninst += 1
>>>>>>   }
>>>>>>   for (i <- 0 until nFeatures) out.collect((i, index) -> ninst)
>>>>>> }
>>>>>> The result is as follows:
>>>>>> Attribute 10, first eight partitions:
>>>>>> ((10,0),201),((10,1),200),((10,2),201),((10,3),200),((10,4),200),((10,5),201),((10,6),201),((10,7),201)
>>>>>> Attribute 12, first eight partitions:
>>>>>> ((12,0),201),((12,1),201),((12,2),201),((12,3),200),((12,4),201),((12,5),200),((12,6),200),((12,7),201)
>>>>>> As you can see, for block 6, for example, different numbers of instances
>>>>>> are shown, which should be impossible.
>>>>>> On 24/03/16 22:39, Chesnay Schepler wrote:
>>>>>>> Haven't looked too deeply into this, but this sounds like object reuse
>>>>>>> is enabled, at which point buffering values effectively causes you to
>>>>>>> store the same value multiple times.
>>>>>>> Can you try disabling object reuse using
>>>>>>> env.getConfig().disableObjectReuse()?
>>>>>>> On 22.03.2016 16:53, Sergio Ramírez wrote:
>>>>>>> Hi all,
>>>>>>>> I've been having some problems with RichMapPartitionFunction. Firstly,
>>>>>>>> I tried to convert the iterable into an array, unsuccessfully. Then I
>>>>>>>> used some buffers to store the values per column. I am trying to
>>>>>>>> transpose the local matrix of LabeledVectors that I have in each
>>>>>>>> partition.
>>>>>>>> None of these solutions has worked. For example, for partition 7 and
>>>>>>>> feature 10, the vector is empty, whereas for the same partition and
>>>>>>>> feature 11, the vector contains 200 elements. And this changes on each
>>>>>>>> execution, affecting different partitions and features.
>>>>>>>> I think there is a problem with using the collect method outside the
>>>>>>>> iterable loop.
>>>>>>>> new RichMapPartitionFunction[LabeledVector, ((Int, Int), Array[Byte])]() {
>>>>>>>>   def mapPartition(it: java.lang.Iterable[LabeledVector],
>>>>>>>>                    out: Collector[((Int, Int), Array[Byte])]): Unit = {
>>>>>>>>     val index = getRuntimeContext().getIndexOfThisSubtask()
>>>>>>>>     val mat = for (i <- 0 until nFeatures)
>>>>>>>>       yield new scala.collection.mutable.ListBuffer[Byte]
>>>>>>>>     for (reg <- it.asScala) {
>>>>>>>>       for (i <- 0 until (nFeatures - 1)) mat(i) += reg.vector(i).toByte
>>>>>>>>       mat(nFeatures - 1) += classMap(reg.label)
>>>>>>>>     }
>>>>>>>>     // numPartitions
>>>>>>>>     for (i <- 0 until nFeatures) out.collect((i, index) -> mat(i).toArray)
>>>>>>>>   }
>>>>>>>> }
>>>>>>>> Regards
