Btw., there is a set difference (minus) operator in the Table API [1] that might be helpful.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/tableApi.html#set-operations
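(A minimal sketch of how that could look, assuming Flink 1.9 with an ExecutionEnvironment `env` in scope, and that `self` and `other` are DataSets of a case class type `T` so the Table API can derive the schema. All value names here are illustrative, not part of the thread's code:)

import org.apache.flink.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._

// Convert both DataSets to Tables and use the Table API set difference.
val tableEnv = BatchTableEnvironment.create(env)
val left: Table  = tableEnv.fromDataSet(self)
val right: Table = tableEnv.fromDataSet(other)
// minus: records of `left` that do not exist in `right`, with duplicates
// removed; minusAll would preserve multiplicities instead.
val difference: Table = left.minus(right)
val result: DataSet[T] = tableEnv.toDataSet[T](difference)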
On Fri, Sep 20, 2019 at 15:30, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Juan,
>
> Both the local execution environment and the remote execution environment
> run the same code to execute the program.
> The implementation of the sortPartition operator was designed to scale to
> data sizes that exceed the memory.
> Internally, it serializes all records into byte arrays and sorts the
> serialized data. This is of course more expensive than keeping all objects
> on the heap and sorting them there. Hence, a certain performance
> difference is to be expected. However, something that should not happen
> is that the program fails.
>
> What's the magnitude of the performance difference?
> Can you post a stack trace of the error?
>
> Thanks,
> Fabian
>
> On Mon, Sep 16, 2019 at 13:51, Juan Rodríguez Hortalá
> <juan.rodriguez.hort...@gmail.com> wrote:
>
>> Hi Ken,
>>
>> Thanks for the suggestion. That idea should also work for implementing a
>> data set difference operation, which is what concerns me here. However, I
>> was also curious why there is such a big performance difference between
>> using sortPartition and sorting each partition in memory, for datasets as
>> small as 20 elements running in local mode. For data sets of that size I
>> would expect no relevant performance difference, but with sortPartition
>> the program crashes, so I must be doing something wrong here.
>>
>> Thanks in any case for the idea.
>>
>> Greetings,
>>
>> Juan
>>
>> On Mon, Jul 22, 2019 at 8:49 AM Ken Krugler <kkrugler_li...@transpac.com>
>> wrote:
>>
>>> Hi Juan,
>>>
>>> If you want to deduplicate, then you could group by the record, and use
>>> a (very simple) reduce function to only emit a record if the group
>>> contains one element.
>>>
>>> There will be performance issues, though - Flink will have to generate
>>> all groups first, which typically means spilling to disk if the data set
>>> has any significant size.
>>>
>>> — Ken
>>>
>>> PS - I assume that you’ve implemented a valid hashCode()/equals() for
>>> the record.
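(A minimal sketch of that grouping idea, extended to the set-difference case discussed below. This is illustrative only: `minusWithGrouping` is a hypothetical name, and it assumes `T` has a consistent `hashCode()`/`equals()` and that `TypeInformation`/`ClassTag` instances for `T` are in implicit scope. As Ken notes, Flink must materialize all groups first, so large inputs will spill to disk:)

import scala.reflect.ClassTag

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

def minusWithGrouping[T: TypeInformation: ClassTag](
    self: DataSet[T], other: DataSet[T]): DataSet[T] = {
  val selfMarked  = self.map((_, true))    // true  = record comes from `self`
  val otherMarked = other.map((_, false))  // false = record comes from `other`
  selfMarked.union(otherMarked)
    .groupBy(_._1) // all occurrences of the same value meet in one group
    .reduceGroup { (group: Iterator[(T, Boolean)], out: Collector[T]) =>
      val records = group.toSeq
      // emit the value once, and only if no occurrence came from `other`
      if (records.forall(_._2)) out.collect(records.head._1)
    }
}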
>>> On Jul 22, 2019, at 8:29 AM, Juan Rodríguez Hortalá
>>> <juan.rodriguez.hort...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I've been trying to write a function to compute the difference between
>>> 2 datasets. By that I mean computing a dataset that has all the elements
>>> of one dataset that are not present in another dataset. I first tried
>>> using coGroup, but it was very slow in a local execution environment,
>>> and often crashed with OOM. Then I tried with leftOuterJoin and got
>>> similar results. I then tried the following:
>>>
>>> private[this] def minussWithSortPartition(other: DataSet[T]): DataSet[T] = {
>>>   val selfMarked: DataSet[(T, Boolean)] = self.map((_, true))
>>>   val otherMarked: DataSet[(T, Boolean)] = other.map((_, false))
>>>
>>>   val all = selfMarked.union(otherMarked)
>>>     // so occurrences of the same value in both datasets go to the same partition
>>>     .partitionByHash(0)
>>>     .sortPartition[(T, Boolean)](identity, Order.ASCENDING)
>>>   all.mapPartition[T] { (partitionIter: Iterator[(T, Boolean)], collector: Collector[T]) =>
>>>     // within a sorted partition, (x, false) sorts right before (x, true),
>>>     // so a `self` element equal to the latest `other` element also occurs
>>>     // in `other` and must be dropped
>>>     var latestOtherOpt: Option[T] = None
>>>     partitionIter.foreach {
>>>       case (otherElem, false) => latestOtherOpt = Some(otherElem)
>>>       case (selfElem, true) =>
>>>         if (latestOtherOpt != Some(selfElem)) collector.collect(selfElem)
>>>     }
>>>   }
>>> }
>>>
>>> This is basically the idea of removing duplicates in a collection by first
>>> sorting it, and then traversing it from beginning to end, removing the
>>> elements that are consecutive to an element we just saw. That is extended
>>> here by marking whether an element comes from `self` or from `other`, and
>>> keeping only elements from `self` that do not follow an occurrence of the
>>> same element in `other`. That code is also really slow in a local execution
>>> environment, and crashes a lot. But when I replace `sortPartition` with
>>> sorting each partition in memory inside a mapPartition, it works fine in
>>> the local execution environment.
>>>
>>> private[this] def minusWithInMemoryPartition(other: DataSet[T]): DataSet[T] = {
>>>   val selfMarked: DataSet[(T, Boolean)] = self.map((_, true))
>>>   val otherMarked: DataSet[(T, Boolean)] = other.map((_, false))
>>>   val all = selfMarked.union(otherMarked)
>>>     // so occurrences of the same value in both datasets go to the same partition
>>>     .partitionByHash(0)
>>>   all.mapPartition[T] { (partitionIter: Iterator[(T, Boolean)], collector: Collector[T]) =>
>>>     // sort the whole partition in memory instead of using sortPartition
>>>     val sortedPartition = {
>>>       val partition = partitionIter.toArray
>>>       util.Sorting.quickSort(partition)
>>>       partition
>>>     }
>>>     var latestOtherOpt: Option[T] = None
>>>     sortedPartition.foreach {
>>>       case (otherElem, false) => latestOtherOpt = Some(otherElem)
>>>       case (selfElem, true) =>
>>>         if (latestOtherOpt != Some(selfElem)) collector.collect(selfElem)
>>>     }
>>>   }
>>> }
>>>
>>> I'm surprised by such a big difference. This is my code
>>> <https://github.com/demiourgoi/flink-check/blob/master/flink-check/src/main/scala/es/ucm/fdi/sscheck/matcher/specs2/flink/package.scala#L16>,
>>> and a test
>>> <https://github.com/demiourgoi/flink-check/blob/master/flink-check/src/test/scala/es/ucm/fdi/sscheck/matcher/specs2/flink/FlinkMatchersSpec.scala#L69>
>>> I use for running this. I'm very surprised by these performance issues
>>> with such small DataSet sizes, with fewer than 20 elements. Is this
>>> because I'm running the program in a local execution environment? Are
>>> operations like coGroup, leftOuterJoin or sortPartition implemented
>>> inefficiently in the local environment? If that is the case, is there any
>>> alternative environment recommended for development on a single machine,
>>> where I won't experience these issues with those operations? Should I
>>> expect the function `minussWithSortPartition` above to run efficiently on
>>> a cluster? Or maybe there is something wrong with my code? Are there any
>>> plans to provide a built-in minus operator in future versions of Flink?
>>> Thanks,
>>>
>>> Juan
>>>
>>>
>>> --------------------------
>>> Ken Krugler
>>> +1 530-210-6378
>>> http://www.scaleunlimited.com
>>> Custom big data solutions & training
>>> Flink, Solr, Hadoop, Cascading & Cassandra
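(Regarding the local-environment question above, a minimal self-contained harness like the following could help quantify the difference Fabian asks about. Purely illustrative: `minusWithGrouping` refers to the hypothetical sketch earlier in this thread, and the data sizes are arbitrary:)

import org.apache.flink.api.scala._

object MinusLocalCheck {
  def main(args: Array[String]): Unit = {
    // local environment, as in the setup described above
    val env = ExecutionEnvironment.createLocalEnvironment()

    val self  = env.fromCollection(1 to 20)
    val other = env.fromCollection((1 to 20).filter(_ % 2 == 0))

    // expected difference: the odd numbers 1, 3, ..., 19
    val start = System.nanoTime()
    val diff = minusWithGrouping(self, other).collect().sorted
    val elapsedMs = (System.nanoTime() - start) / 1e6

    println(s"difference = $diff, took $elapsedMs ms")
  }
}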