Hi, I've been trying to write a function to compute the difference between two datasets. By that I mean computing a dataset that contains all the elements of one dataset that are not present in another dataset. I first tried using coGroup, but it was very slow in a local execution environment and often crashed with an OOM error. Then I tried leftOuterJoin and got similar results. I then tried the following:
    import org.apache.flink.api.common.operators.Order
    import org.apache.flink.api.scala._
    import org.apache.flink.util.Collector

    private[this] def minussWithSortPartition(other: DataSet[T]): DataSet[T] = {
      val selfMarked: DataSet[(T, Boolean)] = self.map((_, true))
      val otherMarked: DataSet[(T, Boolean)] = other.map((_, false))

      val all = selfMarked.union(otherMarked)
        .partitionByHash(0) // so occurrences of the same value in both datasets go to the same partition
        // sort by (value, fromSelf); false < true, so copies from other come before copies from self
        .sortPartition[(T, Boolean)](identity, Order.ASCENDING)

      all.mapPartition[T] { (partitionIter: Iterator[(T, Boolean)], collector: Collector[T]) =>
        var latestOtherOpt: Option[T] = None
        partitionIter.foreach {
          case (otherElem, false) => latestOtherOpt = Some(otherElem)
          case (selfElem, true) =>
            if (latestOtherOpt != Some(selfElem)) collector.collect(selfElem)
        }
      }
    }

This is basically the idea of removing duplicates from a collection by first sorting it and then traversing it from beginning to end, dropping every element that is equal to the one just seen. Here that idea is extended by marking whether each element comes from `self` or from `other`, and keeping only the elements from `self` that do not follow an occurrence of the same element in `other`. That code is also really slow in a local execution environment, and crashes a lot. But when I replace `sortPartition` with sorting each partition in memory inside a `mapPartition`, it works OK in the local execution environment.
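The mark, hash-partition, sort and scan idea described above can be checked on plain Scala collections, without Flink. This is a minimal sketch under stated assumptions: `SortScanMinus`, `scanPartition` and `numPartitions` are hypothetical names, and a plain `hashCode` modulo stands in for Flink's own hash partitioner. It only models the algorithm; it says nothing about Flink's performance.

```scala
object SortScanMinus {
  // Sort-and-scan over one partition of (value, fromSelf) pairs.
  private def scanPartition[T: Ordering](partition: Seq[(T, Boolean)]): Seq[T] = {
    // Sorting by (value, fromSelf) places, for each value, the copies from
    // `other` (false) before the copies from `self` (true), because false < true.
    val sorted = partition.sorted
    val kept = Seq.newBuilder[T]
    var latestOtherOpt: Option[T] = None
    sorted.foreach {
      case (otherElem, false) => latestOtherOpt = Some(otherElem)
      case (selfElem, true) =>
        // Drop a self element if the most recent element from other is equal to it.
        if (!latestOtherOpt.contains(selfElem)) kept += selfElem
    }
    kept.result()
  }

  // Hypothetical local model of the pipeline: mark, hash-partition, then
  // sort and scan each partition independently.
  def minus[T: Ordering](self: Seq[T], other: Seq[T], numPartitions: Int = 4): Seq[T] = {
    val marked = self.map((_, true)) ++ other.map((_, false))
    // Equal values have equal hash codes, so every copy of a value, from
    // either side, lands in the same partition (assumed stand-in for partitionByHash).
    val partitions = marked.groupBy { case (value, _) =>
      Math.floorMod(value.hashCode, numPartitions)
    }
    partitions.values.toSeq.flatMap(p => scanPartition(p))
  }
}
```

For example, `SortScanMinus.minus(Seq(3, 1, 2, 2, 4), Seq(2, 4, 5))` keeps 1 and 3 (in partition order, so sort the result before comparing), and repeated elements of `self` that are absent from `other` are all kept. Because each partition is scanned on its own, correctness relies only on equal values sharing a partition, which is what the `partitionByHash(0)` comment in the Flink code is about.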
This is the version that sorts each partition in memory:

    private[this] def minusWithInMemoryPartition(other: DataSet[T]): DataSet[T] = {
      val selfMarked: DataSet[(T, Boolean)] = self.map((_, true))
      val otherMarked: DataSet[(T, Boolean)] = other.map((_, false))

      val all = selfMarked.union(otherMarked)
        .partitionByHash(0) // so occurrences of the same value in both datasets go to the same partition

      all.mapPartition[T] { (partitionIter: Iterator[(T, Boolean)], collector: Collector[T]) =>
        // materialize and sort the whole partition in memory (needs an implicit Ordering[(T, Boolean)])
        val sortedPartition = {
          val partition = partitionIter.toArray
          util.Sorting.quickSort(partition)
          partition
        }
        var latestOtherOpt: Option[T] = None
        sortedPartition.foreach {
          case (otherElem, false) => latestOtherOpt = Some(otherElem)
          case (selfElem, true) =>
            if (latestOtherOpt != Some(selfElem)) collector.collect(selfElem)
        }
      }
    }

I'm surprised by such a big difference. This is my code <https://github.com/demiourgoi/flink-check/blob/master/flink-check/src/main/scala/es/ucm/fdi/sscheck/matcher/specs2/flink/package.scala#L16>, and a test <https://github.com/demiourgoi/flink-check/blob/master/flink-check/src/test/scala/es/ucm/fdi/sscheck/matcher/specs2/flink/FlinkMatchersSpec.scala#L69> I use for running it. These performance issues are especially surprising with such small DataSets, which have fewer than 20 elements.

Is this because I'm running the program in a local execution environment? Are operations like coGroup, leftOuterJoin or sortPartition implemented inefficiently in the local environment? If that is the case, is there another environment recommended for development on a single machine, where I won't run into these issues with those operations? Should I expect the function `minussWithSortPartition` above to run efficiently on a cluster? Or maybe there is something wrong with my code? Are there any plans to provide a built-in minus operator in future versions of Flink?

Thanks,
Juan