It does not return a DataFrame. It returns Dataset[Long]. You do not need to collect(). See my email.
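For illustration, a minimal spark-shell sketch of the difference (assuming Spark 3.x with Scala 2.12; both lines compute the same sum of 0..9, and both expressions appear in the thread below):

// Distributed: the cast and the sum run on the executors; only the final
// Long comes back to the driver.
spark.range(10).map(_.toLong).reduce(_ + _)   // Long = 45

// Driver-side: collect() first ships every element to the driver as an
// Array[java.lang.Long]; harmless for 10 rows, risky for very large ranges.
spark.range(10).collect().reduce(_ + _)       // Long = 45
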
On Sat, Jun 27, 2020, 11:33 AM Anwar AliKhan <anwaralikhan...@gmail.com> wrote:

> So the range function actually returns BigInt (the Spark SQL type),
> and the fact that Dataset[Long] (the toString()) and printSchema() display
> Long instead of BigInt needs looking into.
>
> Putting that to one side:
>
> My issue with using collect() to get around the casting of the elements
> returned by range is that I read some literature which says collect()
> returns all the data to the driver and so can likely cause an
> out-of-memory error.
>
> Question:
> Is it correct that collect() behaves that way and can cause an
> out-of-memory error?
>
> Obviously it would be better to use .map for the casting, because then
> the work is being done by the workers:
> spark.range(10).map(_.toLong).reduce(_+_)
> <http://www.backbutton.co.uk/>
>
>
> On Sat, 27 Jun 2020, 15:42 Sean Owen, <sro...@gmail.com> wrote:
>
>> There are several confusing things going on here. I think this is part
>> of the explanation, not 100% sure:
>>
>> 'bigint' is the Spark SQL type of an 8-byte long. 'long' is the type
>> of a JVM primitive. Both are the same, conceptually, but represented
>> differently internally as they are logically somewhat different ideas.
>>
>> The first thing I'm not sure about is why the toString of
>> Dataset[Long] reports a 'bigint' and printSchema() reports 'long'.
>> That might be a (cosmetic) bug.
>>
>> Second, in Scala 2.12, SAM support causes calls to reduce() and other
>> methods that take an Object type to be ambiguous, because Spark has
>> long had Java-friendly overloads that support a SAM interface for Java
>> callers. Those weren't removed, to avoid breakage, at the cost of
>> having to tell it explicitly which overload you want. (They are
>> equivalent.)
>>
>> This is triggered because range() returns java.lang.Longs, not long
>> primitives (i.e. scala.Long). I assume that is to make it versatile
>> enough to use in Java too, and because it's hard to write an overload
>> (it would have to be renamed).
>>
>> But that means you trigger the SAM overload issue.
>>
>> Anything you do that makes this a Dataset[scala.Long] resolves it, as
>> it is no longer ambiguous (the Java-friendly, Object-friendly overload
>> does not apply). For example:
>>
>> spark.range(10).map(_.toLong).reduce(_+_)
>>
>> If you collect(), you still have an Array[java.lang.Long]. But Scala
>> implicits and conversions make .reduce(_+_) work fine on that; there
>> is no "Java-friendly" overload in the way.
>>
>> Normally all of this just works and you can ignore these differences.
>> This is a good example of a corner case in which it's inconvenient,
>> because of the old Java-friendly overloads. It is by design, though.
>>
>> On Sat, Jun 27, 2020 at 8:29 AM Anwar AliKhan <anwaralikhan...@gmail.com> wrote:
>> >
>> > As you know, I have been puzzling over this issue:
>> > how come spark.range(100).reduce(_+_)
>> > worked in an earlier Spark version but not in the most recent versions?
>> >
>> > Well,
>> >
>> > when you first create a dataset, by default the column "id" datatype
>> > is [BigInt].
>> > It is a bit like a coin: Long on one side and bigint on the other.
>> >
>> > scala> val myrange = spark.range(1,100)
>> > myrange: org.apache.spark.sql.Dataset[Long] = [id: bigint]
>> >
>> > The Spark framework error message after parsing the reduce(_+_) method
>> > confirms this, and moreover stresses its constraint of expecting data
>> > type long as parameter argument(s).
>> >
>> > scala> myrange.reduce(_+_)
>> > <console>:26: error: overloaded method value reduce with alternatives:
>> >   (func: org.apache.spark.api.java.function.ReduceFunction[java.lang.Long])java.lang.Long <and>
>> >   (func: (java.lang.Long, java.lang.Long) => java.lang.Long)java.lang.Long
>> >  cannot be applied to ((java.lang.Long, java.lang.Long) => scala.Long)
>> >        myrange.reduce(_+_)
>> >                ^
>> >
>> > But if you ask the printSchema method, it disagrees with both of the
>> > above and says the column "id" data type is long.
>> >
>> > scala> range100.printSchema()
>> > root
>> >  |-- id: long (nullable = false)
>> >
>> > If I ask the collect() method, it agrees with printSchema() that the
>> > datatype of column "id" is Long and not BigInt.
>> >
>> > scala> range100.collect()
>> > res10: Array[Long] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
>> > 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
>> > 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
>> > 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69,
>> > 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88,
>> > 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99)
>> >
>> > To settle the dispute between the methods and get collect() to "show
>> > me the money", I called collect() to pass its return value to reduce(_+_).
>> >
>> > "Here is the money":
>> > scala> range100.collect().reduce(_+_)
>> > res11: Long = 4950
>> >
>> > The collect() and printSchema methods could be implying there is no
>> > difference between a Long and a BigInt.
>> >
>> > Question: These return type differences, are they by design or an
>> > oversight (bug)?
>> > Question: Why the change from the earlier version to the later one?
>> > Question: Will you be updating the reduce(_+_) method?
>> >
>> > When it comes to creating a dataset using toDS there is no dispute:
>> > all the methods agree that it is neither a BigInt nor a Long but an
>> > Int (integer).
>> >
>> > scala> val dataset = Seq(1, 2, 3).toDS()
>> > dataset: org.apache.spark.sql.Dataset[Int] = [value: int]
>> >
>> > scala> dataset.collect()
>> > res29: Array[Int] = Array(1, 2, 3)
>> >
>> > scala> dataset.printSchema()
>> > root
>> >  |-- value: integer (nullable = false)
>> >
>> > scala> dataset.show()
>> > +-----+
>> > |value|
>> > +-----+
>> > |    1|
>> > |    2|
>> > |    3|
>> > +-----+
>> >
>> > scala> dataset.reduce(_+_)
>> > res7: Int = 6
>> >
>> >
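
For completeness, a minimal spark-shell sketch of the workarounds discussed above (assuming Spark 3.x with Scala 2.12 and spark.implicits._ in scope, as in spark-shell; the .as[Long] line is an extra suggestion not taken from the thread, so treat it as untested here):

// Ambiguous: spark.range() yields a Dataset[java.lang.Long], so reduce(_+_)
// hits the Java-friendly SAM overload and does not compile.
// spark.range(100).reduce(_ + _)

// Pinning the element type to scala.Long removes the ambiguity.
spark.range(100).map(_.toLong).reduce(_ + _)   // Long = 4950

// Assumed alternative: re-encode the same range with the scala.Long encoder.
spark.range(100).as[Long].reduce(_ + _)        // should also give Long = 4950

// Datasets built from Scala collections already carry Scala element types.
Seq(1, 2, 3).toDS().reduce(_ + _)              // Int = 6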