It does not return a DataFrame; it returns a Dataset[Long].
You do not need to collect(). See my earlier email below.
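
For example, a minimal sketch of a fully distributed reduction (assuming a
SparkSession named spark with spark.implicits._ in scope, as in spark-shell;
only the final Long comes back to the driver):

  // map to scala.Long so the Scala-friendly reduce overload applies,
  // then the reduce itself runs on the executors
  spark.range(10).map(_.toLong).reduce(_ + _)   // 45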

On Sat, Jun 27, 2020, 11:33 AM Anwar AliKhan <anwaralikhan...@gmail.com>
wrote:

> So the range function actually returns bigint (the Spark SQL type),
> and the fact that Dataset[Long] and printSchema() display Long instead
> of bigint needs looking into.
>
> Putting that to one side
>
> My issue with using collect() to get around the casting of the elements
> returned by range is that, from what I have read, collect() returns all
> the data to the driver and so can cause an out-of-memory error.
>
> Question:
> Is it correct that collect() behaves that way and can cause an
> out-of-memory error?
>
> Obviously it would be better to use .map for the cast, because then the
> work is done by the workers:
> spark.range(10).map(_.toLong).reduce(_+_)
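>
> An alternative rough sketch that also stays distributed, without the cast,
> is the built-in sum aggregation; only the single summed value is returned
> to the driver:
>
>   import org.apache.spark.sql.functions.sum
>   spark.range(10).agg(sum("id")).first().getLong(0)   // 45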
> <http://www.backbutton.co.uk/>
>
>
> On Sat, 27 Jun 2020, 15:42 Sean Owen, <sro...@gmail.com> wrote:
>
>> There are several confusing things going on here. I think this is part
>> of the explanation, not 100% sure:
>>
>> 'bigint' is the Spark SQL type of an 8-byte long. 'long' is the type
>> of a JVM primitive. Both are the same, conceptually, but represented
>> differently internally as they are logically somewhat different ideas.
>>
>> The first thing I'm not sure about is why the toString of
>> Dataset[Long] reports a 'bigint' and printSchema() reports 'long'.
>> That might be a (cosmetic) bug.
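>>
>> One way to see where the two names might come from (a sketch, assuming the
>> underlying Catalyst type is LongType, whose simpleString and typeName
>> happen to differ):
>>
>>   import org.apache.spark.sql.types.LongType
>>   LongType.simpleString   // "bigint" -- the SQL-facing name
>>   LongType.typeName       // "long" -- matches what printSchema() shows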
>>
>> Second, Scala 2.12's SAM support makes calls to reduce() and other
>> methods that take an Object type ambiguous, because Spark has long had
>> Java-friendly overloads that expose a SAM interface for Java callers.
>> Those weren't removed, to avoid breakage, at the cost of sometimes
>> having to tell the compiler explicitly which overload you want.
>> (They are equivalent.)
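>>
>> For instance, a sketch of selecting the Java-friendly overload explicitly,
>> without any cast (an anonymous ReduceFunction cannot match the Scala
>> function overload, so there is nothing ambiguous left to resolve):
>>
>>   import org.apache.spark.api.java.function.ReduceFunction
>>   spark.range(10).reduce(new ReduceFunction[java.lang.Long] {
>>     override def call(a: java.lang.Long, b: java.lang.Long): java.lang.Long = a + b
>>   })   // 45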
>>
>> This is triggered because range() returns java.lang.Longs, not long
>> primitives (i.e. scala.Long). I assume that is to make it versatile
>> enough to use from Java too, and because it's hard to write an overload
>> (it would have to be renamed).
>>
>> But that means you trigger the SAM overload issue.
>>
>> Anything you do that makes this a Dataset[scala.Long] resolves it, as
>> the call is no longer ambiguous (the Java-friendly, Object-based
>> overload no longer applies). For example:
>>
>> spark.range(10).map(_.toLong).reduce(_+_)
>>
>> If you collect(), you still have an Array[java.lang.Long]. But Scala
>> implicits and conversions make .reduce(_+_) work fine on that; there
>> is no "Java-friendly" overload in the way.
>>
>> Normally all of this just works and you can ignore these differences.
>> This is a good example of a corner case in which it's inconvenient,
>> because of the old Java-friendly overloads. This is by design though.
>>
>> On Sat, Jun 27, 2020 at 8:29 AM Anwar AliKhan <anwaralikhan...@gmail.com>
>> wrote:
>> >
>> > As you know, I have been puzzling over this issue:
>> > how come spark.range(100).reduce(_+_)
>> > worked in earlier Spark versions but not in the most recent ones?
>> >
>> > well,
>> >
>> > When you first create a dataset this way, the column "id" has the data
>> > type bigint by default.
>> > It is a bit like a coin: Long on one side and bigint on the other.
>> >
>> > scala> val myrange = spark.range(1,100)
>> > myrange: org.apache.spark.sql.Dataset[Long] = [id: bigint]
>> >
>> > The Spark error message produced for the reduce(_+_) call confirms this,
>> > and moreover spells out its constraint that the parameter arguments be
>> > of type java.lang.Long.
>> >
>> > scala> myrange.reduce(_+_)
>> > <console>:26: error: overloaded method value reduce with alternatives:
>> >   (func: org.apache.spark.api.java.function.ReduceFunction[java.lang.Long])java.lang.Long <and>
>> >   (func: (java.lang.Long, java.lang.Long) => java.lang.Long)java.lang.Long
>> >  cannot be applied to ((java.lang.Long, java.lang.Long) => scala.Long)
>> >        myrange.reduce(_+_)
>> >                ^
>> >
>> > But if you ask the printSchema() method, it disagrees with both of the
>> > above and says the column "id" data type is long.
>> > scala> range100.printSchema()
>> > root
>> >  |-- id: long (nullable = false)
>> >
>> > If I ask the collect() method, it agrees with printSchema() that the
>> > data type of column "id" is Long and not bigint.
>> >
>> > scala> range100.collect()
>> > res10: Array[Long] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99)
>> >
>> > To settle the dispute between the methods and get collect() to "show me
>> > the money", I called collect() and passed its result to reduce(_+_).
>> >
>> > "Here is the money"
>> > scala> range100.collect().reduce(_+_)
>> > res11: Long = 4950
>> >
>> > The collect() and printSchema() methods could be implying that there is
>> > no difference between a Long and a bigint.
>> >
>> > Question: Are these return type differences by design, or an oversight/bug?
>> > Question: Why the change from earlier versions to later versions?
>> > Question: Will you be updating the reduce(_+_) method?
>> >
>> > When it comes to creating a dataset using toDS() there is no dispute:
>> > all the methods agree that it is neither a bigint nor a Long but an
>> > int (integer).
>> >
>> > scala> val dataset = Seq(1, 2, 3).toDS()
>> > dataset: org.apache.spark.sql.Dataset[Int] = [value: int]
>> >
>> > scala> dataset.collect()
>> > res29: Array[Int] = Array(1, 2, 3)
>> >
>> > scala> dataset.printSchema()
>> > root
>> >  |-- value: integer (nullable = false)
>> >
>> > scala> dataset.show()
>> > +-----+
>> > |value|
>> > +-----+
>> > |    1|
>> > |    2|
>> > |    3|
>> > +-----+
>> >
>> > scala> dataset.reduce(_+_)
>> > res7: Int = 6
>> >
>>
>
