Hi Juan,

If you want to deduplicate, then you could group by the record and use a (very simple) reduce function that only emits a record if the group contains one element.
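For example, a minimal untested sketch with the batch Scala API (`dropDuplicated` is just a name I'm making up here, and it assumes the record type is usable as a grouping key):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

import scala.reflect.ClassTag

// Keeps only records that occur exactly once in the input. Grouping on the
// whole record relies on it having a valid hashCode()/equals() (see PS below).
def dropDuplicated[T: TypeInformation: ClassTag](input: DataSet[T]): DataSet[T] =
  input
    .groupBy(r => r) // all occurrences of the same record end up in one group
    .reduceGroup { (group: Iterator[T], out: Collector[T]) =>
      val first = group.next()
      if (!group.hasNext) out.collect(first) // singleton group => emit it
    }

For your minus you could do much the same on the union of the two marked data sets, only emitting a record when its group contains no elements coming from the other data set.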
There will be performance issues, though - Flink will have to generate all groups first, which typically means spilling to disk if the data set has any significant size.

— Ken

PS - I assume that you’ve implemented a valid hashCode()/equals() for the record.

> On Jul 22, 2019, at 8:29 AM, Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com> wrote:
>
> Hi,
>
> I've been trying to write a function to compute the difference between two datasets. By that I mean computing a dataset that has all the elements of one dataset that are not present in another. I first tried using coGroup, but it was very slow in a local execution environment, and often crashed with OOM. Then I tried leftOuterJoin and got similar results. I then tried the following:
>
> private[this] def minussWithSortPartition(other: DataSet[T]): DataSet[T] = {
>   val selfMarked: DataSet[(T, Boolean)] = self.map((_, true))
>   val otherMarked: DataSet[(T, Boolean)] = other.map((_, false))
>
>   val all = selfMarked.union(otherMarked)
>     .partitionByHash(0) // so occurrences of the same value in both datasets go to the same partition
>     .sortPartition[(T, Boolean)](identity, Order.ASCENDING)
>   all.mapPartition[T] { (partitionIter: Iterator[(T, Boolean)], collector: Collector[T]) =>
>     var latestOtherOpt: Option[T] = None
>     partitionIter.foreach {
>       case (otherElem, false) => latestOtherOpt = Some(otherElem)
>       case (selfElem, true) =>
>         if (latestOtherOpt != Some(selfElem)) collector.collect(selfElem)
>     }
>   }
> }
>
> This is basically the idea of removing duplicates in a collection by first sorting it and then traversing it from beginning to end, dropping elements that are equal to the element just seen. That is extended here to mark whether an element comes from `self` or from `other`, keeping only elements from `self` that do not follow an occurrence of the same element in `other`. That code is also really slow in a local execution environment, and crashes a lot. But when I replace `sortPartition` with sorting each partition in memory inside a mapPartition, it works OK in the local execution environment.
>
> private[this] def minusWithInMemoryPartition(other: DataSet[T]): DataSet[T] = {
>   val selfMarked: DataSet[(T, Boolean)] = self.map((_, true))
>   val otherMarked: DataSet[(T, Boolean)] = other.map((_, false))
>   val all = selfMarked.union(otherMarked)
>     .partitionByHash(0) // so occurrences of the same value in both datasets go to the same partition
>   all.mapPartition[T] { (partitionIter: Iterator[(T, Boolean)], collector: Collector[T]) =>
>     val sortedPartition = {
>       val partition = partitionIter.toArray
>       util.Sorting.quickSort(partition)
>       partition
>     }
>     var latestOtherOpt: Option[T] = None
>     sortedPartition.foreach {
>       case (otherElem, false) => latestOtherOpt = Some(otherElem)
>       case (selfElem, true) =>
>         if (latestOtherOpt != Some(selfElem)) collector.collect(selfElem)
>     }
>   }
> }
>
> I'm surprised by such a big difference. This is my code
> <https://github.com/demiourgoi/flink-check/blob/master/flink-check/src/main/scala/es/ucm/fdi/sscheck/matcher/specs2/flink/package.scala#L16>,
> and a test
> <https://github.com/demiourgoi/flink-check/blob/master/flink-check/src/test/scala/es/ucm/fdi/sscheck/matcher/specs2/flink/FlinkMatchersSpec.scala#L69>
> I use for running this. I'm very surprised by these performance issues at such small DataSet sizes, with fewer than 20 elements.
> Is this because I'm running the program with a local execution environment? Are operations like coGroup, leftOuterJoin or sortPartition implemented inefficiently in the local environment? If that is the case, is there any other environment recommended for development on a single machine, where I won't experience these issues with those operations? Should I expect the function `minussWithSortPartition` above to run efficiently on a cluster? Or maybe there is something wrong with my code? Are there any plans to provide a built-in minus operator in future versions of Flink?
>
> Thanks,
>
> Juan

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra