Hi Juan,

If you want to deduplicate, then you could group by the record, and use a (very 
simple) reduce function to only emit a record if the group contains one element.
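
As a rough sketch of that grouping idea applied to your minus use case (not tested against Flink): plain Scala collections stand in for DataSets here, and `MinusSketch` and `minusByGrouping` are hypothetical names. It reuses your origin marker, and adapts the "one element" check to "no element in the group came from `other`", which is my assumption about what you need.

```scala
// Plain Scala collections stand in for Flink DataSets (an assumption made to
// keep the sketch self-contained). All names here are hypothetical.
object MinusSketch {
  def minusByGrouping[T](self: Seq[T], other: Seq[T]): Seq[T] = {
    // Tag each record with its origin, as in the quoted code below.
    val marked: Seq[(T, Boolean)] = self.map((_, true)) ++ other.map((_, false))

    marked
      .groupBy(_._1) // one group per distinct record; relies on equals()/hashCode()
      .toSeq
      .collect {
        // Emit the record once, and only if every copy in the group came from `self`.
        case (record, group) if group.forall(_._2) => record
      }
  }

  def main(args: Array[String]): Unit = {
    val result = minusByGrouping(Seq(1, 2, 2, 3, 4), Seq(2, 4, 5))
    println(result.sorted.mkString(", ")) // prints "1, 3"
  }
}
```

Note that this emits each surviving record once, so duplicates within `self` are collapsed, unlike the sort-and-scan code quoted below.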

There will be performance issues, though - Flink will have to generate all 
groups first, which typically means spilling to disk if the data set has any 
significant size.

— Ken

PS - I assume that you’ve implemented a valid hashCode()/equals() for the 
record.
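
To illustrate why that matters, here is a small plain-Scala sketch (`Point` and `RawPoint` are made-up types, not from your code). It only shows ordinary JVM behaviour when grouping by value. How Flink itself compares keys for a given record type is not covered here.

```scala
object EqualitySketch {
  // Case classes get structural equals()/hashCode() generated by the compiler.
  case class Point(x: Int, y: Int)

  // A plain class keeps reference equality unless it overrides both methods.
  class RawPoint(val x: Int, val y: Int)

  def main(args: Array[String]): Unit = {
    val points = Seq(Point(1, 2), Point(1, 2))
    val rawPoints = Seq(new RawPoint(1, 2), new RawPoint(1, 2))

    // Equal case-class instances land in the same group.
    println(points.groupBy(identity).size) // prints 1

    // Distinct instances of the plain class are never equal, so each forms its own group.
    println(rawPoints.groupBy(identity).size) // prints 2
  }
}
```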


> On Jul 22, 2019, at 8:29 AM, Juan Rodríguez Hortalá 
> <juan.rodriguez.hort...@gmail.com> wrote:
> 
> Hi, 
> 
> I've been trying to write a function to compute the difference between 2 
> datasets. By that I mean computing a dataset that has all the elements of one 
> dataset that are not present in another dataset. I first tried using 
> coGroup, but it was very slow in a local execution environment, and often 
> crashed with OOM. Then I tried leftOuterJoin and got similar 
> results. I then tried the following:
> 
> private[this] def minussWithSortPartition(other: DataSet[T]): DataSet[T] = {
>   val selfMarked: DataSet[(T, Boolean)] = self.map((_, true))
>   val otherMarked: DataSet[(T, Boolean)] = other.map((_, false))
> 
>   val all = selfMarked.union(otherMarked)
>     .partitionByHash(0) // so occurrences of the same value in both datasets go to the same partition
>     .sortPartition[(T, Boolean)](identity, Order.ASCENDING)
>   all.mapPartition[T] { (partitionIter: Iterator[(T, Boolean)], collector: Collector[T]) =>
>     var latestOtherOpt: Option[T] = None
>     partitionIter.foreach {
>       case (otherElem, false) => latestOtherOpt = Some(otherElem)
>       case (selfElem, true) =>
>         if (latestOtherOpt != Some(selfElem)) collector.collect(selfElem)
>     }
>   }
> }
> 
> This is basically the idea of removing duplicates in a collection by first 
> sorting it, and then traversing it from beginning to end, removing the 
> elements that are consecutive to an element we just saw. That is extended 
> here to mark whether an element is coming from `self` or from `other`, 
> keeping only elements from `self` that are not following another occurrence 
> of the same element in `other`. That code is also really slow on a local 
> execution environment, and crashes a lot. But when I replace `sortPartition` 
> by sorting each partition in memory inside a mapPartition, it works ok with 
> the local execution environment.
> 
> private[this] def minusWithInMemoryPartition(other: DataSet[T]): DataSet[T] = {
>   val selfMarked: DataSet[(T, Boolean)] = self.map((_, true))
>   val otherMarked: DataSet[(T, Boolean)] = other.map((_, false))
>   val all = selfMarked.union(otherMarked)
>     .partitionByHash(0) // so occurrences of the same value in both datasets go to the same partition
>   all.mapPartition[T] { (partitionIter: Iterator[(T, Boolean)], collector: Collector[T]) =>
>     val sortedPartition = {
>       val partition = partitionIter.toArray
>       util.Sorting.quickSort(partition)
>       partition
>     }
>     var latestOtherOpt: Option[T] = None
>     sortedPartition.foreach {
>       case (otherElem, false) => latestOtherOpt = Some(otherElem)
>       case (selfElem, true) =>
>         if (latestOtherOpt != Some(selfElem)) collector.collect(selfElem)
>     }
>   }
> }
> 
> I'm surprised by such a big difference. This is my code 
> <https://github.com/demiourgoi/flink-check/blob/master/flink-check/src/main/scala/es/ucm/fdi/sscheck/matcher/specs2/flink/package.scala#L16>,
>  and a test 
> <https://github.com/demiourgoi/flink-check/blob/master/flink-check/src/test/scala/es/ucm/fdi/sscheck/matcher/specs2/flink/FlinkMatchersSpec.scala#L69>
>  I use for running this. I'm very surprised by these performance issues 
> with such small DataSet sizes, with fewer than 20 elements. Is this because 
> I'm running the program with a local execution environment? Are operations 
> like coGroup, leftOuterJoin or sortPartition implemented inefficiently in the 
> local environment? If that is the case, is there any alternative 
> environment recommended for development on a single machine, where I won't 
> experience these issues with those operations? Should I expect the function 
> `minussWithSortPartition` above to run efficiently on a cluster? Or maybe 
> there is something wrong with my code? Are there any plans to provide a 
> built-in minus operator in future versions of Flink?
> 
> Thanks, 
> 
> Juan 
> 
> 

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
