Hi,

I've been trying to write a function to compute the difference between two
datasets, that is, a dataset that has all the elements of one dataset that are
not present in another dataset. I first tried using coGroup, but it was very
slow in a local execution environment and often crashed with OOM. Then I tried
leftOuterJoin and got similar results.
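
Just to make the question concrete, this is roughly the coGroup-based version I
mean (a simplified sketch rather than the exact code in the repo linked below;
`minusWithCoGroup` is only a name I'm using here, and it assumes `T` can be used
directly as a grouping key):

import scala.reflect.ClassTag
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

// keep the elements of `self` whose value has no matching element in `other`
def minusWithCoGroup[T: TypeInformation: ClassTag](self: DataSet[T],
                                                   other: DataSet[T]): DataSet[T] =
  self.coGroup(other)
    .where(x => x)
    .equalTo(x => x)
    .apply { (selfGroup: Iterator[T], otherGroup: Iterator[T], out: Collector[T]) =>
      // emit the group from `self` only when `other` has no element with this value
      if (otherGroup.isEmpty) selfGroup.foreach(out.collect(_))
    }

That version, and an equivalent one based on leftOuterJoin, showed the problems
described above. I then tried the following: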

private[this] def minusWithSortPartition(other: DataSet[T]): DataSet[T] = {
  val selfMarked: DataSet[(T, Boolean)] = self.map((_, true))
  val otherMarked: DataSet[(T, Boolean)] = other.map((_, false))

  val all = selfMarked.union(otherMarked)
    // so occurrences of the same value in both datasets go to the same partition
    .partitionByHash(0)
    // ascending order puts each (x, false) coming from `other` right before the
    // (x, true) occurrences coming from `self`, because false sorts before true
    .sortPartition[(T, Boolean)](identity, Order.ASCENDING)
  all.mapPartition[T] {
    (partitionIter: Iterator[(T, Boolean)], collector: Collector[T]) =>
      var latestOtherOpt: Option[T] = None
      partitionIter.foreach {
        case (otherElem, false) => latestOtherOpt = Some(otherElem)
        case (selfElem, true) =>
          if (latestOtherOpt != Some(selfElem)) collector.collect(selfElem)
      }
  }
}


This is basically the usual idea for removing duplicates from a collection:
first sort it, then traverse it from beginning to end, dropping every element
that is equal to the one we just saw. Here that idea is extended by marking
whether an element comes from `self` or from `other`, and keeping only the
elements from `self` that do not follow an occurrence of the same value coming
from `other`. That code is also really slow in a local execution environment
and crashes a lot, but when I replace `sortPartition` with sorting each
partition in memory inside a mapPartition, it works fine in the local execution
environment.
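
To make that scan concrete outside of Flink, this is the kind of traversal I
mean on a plain, already sorted Scala collection (just an illustration with
hand-sorted data; `MinusScanSketch` is not part of my code):

object MinusScanSketch extends App {
  // `self` = {1, 2, 2} and `other` = {1, 3}; the pairs are already sorted, so each
  // (x, false) marker from `other` comes before any (x, true) occurrence from `self`
  val sorted = List((1, false), (1, true), (2, true), (2, true), (3, false))
  var latestOtherOpt: Option[Int] = None
  val kept = sorted.flatMap {
    case (otherElem, false) => latestOtherOpt = Some(otherElem); Nil
    case (selfElem, true) =>
      if (latestOtherOpt != Some(selfElem)) List(selfElem) else Nil
  }
  println(kept) // List(2, 2): 1 is dropped because it also appears in `other`
}

And this is that in-memory version: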

private[this] def minusWithInMemoryPartition(other: DataSet[T]): DataSet[T] = {
  val selfMarked: DataSet[(T, Boolean)] = self.map((_, true))
  val otherMarked: DataSet[(T, Boolean)] = other.map((_, false))
  val all = selfMarked.union(otherMarked)
    // so occurrences of the same value in both datasets go to the same partition
    .partitionByHash(0)
  all.mapPartition[T] {
    (partitionIter: Iterator[(T, Boolean)], collector: Collector[T]) =>
      val sortedPartition = {
        val partition = partitionIter.toArray
        // quickSort needs an Ordering[(T, Boolean)], derived from an Ordering[T] in scope
        util.Sorting.quickSort(partition)
        partition
      }
      var latestOtherOpt: Option[T] = None
      sortedPartition.foreach {
        case (otherElem, false) => latestOtherOpt = Some(otherElem)
        case (selfElem, true) =>
          if (latestOtherOpt != Some(selfElem)) collector.collect(selfElem)
      }
  }
}


I'm surprised by such a big difference. This is my code
<https://github.com/demiourgoi/flink-check/blob/master/flink-check/src/main/scala/es/ucm/fdi/sscheck/matcher/specs2/flink/package.scala#L16>,
and a test
<https://github.com/demiourgoi/flink-check/blob/master/flink-check/src/test/scala/es/ucm/fdi/sscheck/matcher/specs2/flink/FlinkMatchersSpec.scala#L69>
that I use for running it. I'm very surprised to see these performance issues
with such small DataSets, of fewer than 20 elements. Is this because I'm
running the program in a local execution environment? Are operations like
coGroup, leftOuterJoin or sortPartition implemented inefficiently in the local
environment? If that is the case, is there any alternative environment
recommended for development on a single machine, where I wouldn't run into
these issues with those operations? (I include below a sketch of the kind of
alternative I mean.) Should I expect the function `minusWithSortPartition`
above to run efficiently on a cluster? Or is there maybe something wrong with
my code? Are there any plans to provide a built-in minus operator in a future
version of Flink?
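
The alternative I had in mind is something like the collection-based execution
environment, along these lines (just a sketch of what I mean, assuming
`ExecutionEnvironment.createCollectionsEnvironment` is still available and
supports the operations above, which I haven't checked):

import org.apache.flink.api.scala._

object SmallLocalRun extends App {
  // run the DataSet program directly on local collections, at the scale I'm testing
  val env = ExecutionEnvironment.createCollectionsEnvironment
  val xs: DataSet[Int] = env.fromElements(1, 2, 2, 3)
  val ys: DataSet[Int] = env.fromElements(1, 3)
  // here I would apply the minus function above to xs and ys; with these inputs
  // I would expect a result containing only the two occurrences of 2
  xs.collect().foreach(println)
}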

Thanks,

Juan
