Sergio Ramírez created FLINK-3784:
-------------------------------------

             Summary: Unexpected results using collect() in RichMapPartitionFunction
                 Key: FLINK-3784
                 URL: https://issues.apache.org/jira/browse/FLINK-3784
             Project: Flink
          Issue Type: Bug
          Components: DataSet API, Machine Learning Library, Scala API
    Affects Versions: 1.0.0
         Environment: Debian 8.3
            Reporter: Sergio Ramírez


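The following code (in Scala) outputs unexpected records when it tries to 
transpose a simple matrix formed by LabeledVector instances. For each new key 
(feature, partition), a different number of records is reported, even though all 
new pairs should yield the same number of records because the data is dense 
(please take a look at the results with a sample dataset below). For example, 
partition 1 reports 200 records for attribute 10 but 201 for attribute 12, 
although ninst is computed once per partition and emitted unchanged for every 
feature.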

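import scala.collection.JavaConverters._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.util.Collector

// Method of a RichMapPartitionFunction[LabeledVector, ((Int, Int), Int)];
// nFeatures and requireByteValues are defined elsewhere in the reporter's code.
def mapPartition(it: java.lang.Iterable[LabeledVector],
                 out: Collector[((Int, Int), Int)]): Unit = {
  val index = getRuntimeContext().getIndexOfThisSubtask() // Partition index
  var ninst = 0
  // Count the instances (records) in this partition
  for (reg <- it.asScala) {
    requireByteValues(reg.vector)
    ninst += 1
  }
  // Emit the partition's count once per feature, keyed by (feature, partition)
  for (i <- 0 until nFeatures) out.collect((i, index) -> ninst)
}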
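A minimal, Flink-free sketch of what the method above is expected to emit, assuming each partition can be modeled as a local Seq of dense records (plain Array[Double] stands in for LabeledVector; the object and method names are hypothetical). Because ninst is counted once per partition, every feature in a given partition should receive the same count.

```scala
// Hypothetical local model of the mapPartition logic; not Flink code.
object LocalTransposeCount {
  // partitions(p) holds the dense records assigned to partition p.
  def countPerFeature(partitions: Seq[Seq[Array[Double]]],
                      nFeatures: Int): Seq[((Int, Int), Int)] =
    for {
      (records, index) <- partitions.zipWithIndex
      ninst = records.size   // instances seen in this partition
      i <- 0 until nFeatures // one output pair per feature
    } yield (i, index) -> ninst
}
```

Under this model, all pairs sharing a partition index carry identical counts, which is the invariant the reported output violates.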

Result: 
Attribute 10, first eight partitions: 
((10,0),201),((10,1),200),((10,2),201),((10,3),200),((10,4),200),((10,5),201),((10,6),201),((10,7),201)

Attribute 12, first eight partitions: 
((12,0),201),((12,1),201),((12,2),201),((12,3),200),((12,4),201),((12,5),200),((12,6),200),((12,7),201)
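To make the inconsistency easier to spot, the collected pairs can be grouped by partition and checked for differing counts. This is a hypothetical helper, not part of Flink or FlinkML; it assumes the job output has already been collected locally as ((feature, partition), count) pairs.

```scala
// Hypothetical helper: find partitions whose count differs between features.
object CountConsistency {
  // Returns partition index -> distinct counts, only for inconsistent partitions.
  def inconsistentPartitions(pairs: Seq[((Int, Int), Int)]): Map[Int, Set[Int]] =
    pairs
      .groupBy { case ((_, partition), _) => partition }
      .map { case (partition, ps) => partition -> ps.map(_._2).toSet }
      .filter { case (_, counts) => counts.size > 1 }
}
```

Applied to the attribute 10 and attribute 12 rows above, it flags partitions 1, 4, 5 and 6, each with the counts 200 and 201.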
 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
