Programmatic: parquet file corruption error

2020-03-26 Thread Zahid Rahman
Hi,

When I run code for a user-defined data type Dataset (using a Scala case
class) in the interactive spark-shell against a parquet file, the results
are as expected.
However, when I run the same code programmatically from the IntelliJ IDE,
Spark gives a file corruption error.

Steps I have taken to determine the source of the error:

1. I tested the file permissions and ran chmod 777 on the file, just in case.
2. I tried a fresh copy of the same parquet file.
3. I ran both programs before and after taking the fresh copy.
4. I also rebooted, then ran the program against a fresh parquet file.

The corruption error was consistent in all cases.
I have copied and pasted the spark-shell session, the error message, the
code in the IDE, the pom.xml, and the IntelliJ java classpath command line.

Perhaps the libraries used when running programmatically are different
from the ones used by spark-shell.
I don't believe it is an error on my part.
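
For reference, a minimal sketch of the kind of programmatic setup described
above (editorial addition; the case class, path, and app name are
illustrative assumptions borrowed from later messages in this digest, not
the original code):

import org.apache.spark.sql.SparkSession

object ParquetReadExample {
  // illustrative user-defined type; the original case class is not shown in this message
  case class Flight(DEST_COUNTRY_NAME: String, ORIGIN_COUNTRY_NAME: String, count: BigInt)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("parquet-read")
      .getOrCreate()
    import spark.implicits._

    // read the parquet file and bind it to the case-class encoder
    val flights = spark.read
      .parquet("/data/flight-data/parquet/2010-summary.parquet/")
      .as[Flight]
    flights.show(5)

    spark.stop()
  }
}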

07:28:45 WARN  CorruptStatistics:117 - Ignoring statistics because created_by could not be parsed (see PARQUET-251): parquet-mr (build 32c46643845ea8a705c35d4ec8fc654cc8ff816d)
org.apache.parquet.VersionParser$VersionParseException: Could not parse created_by: parquet-mr (build 32c46643845ea8a705c35d4ec8fc654cc8ff816d) using format: (.*?)\s+version\s*(?:([^(]*?)\s*(?:\(\s*build\s*([^)]*?)\s*\))?)?
at org.apache.parquet.VersionParser.parse(VersionParser.java:112)
at org.apache.parquet.CorruptStatistics.shouldIgnoreStatistics(CorruptStatistics.java:72)
at org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatisticsInternal(ParquetMetadataConverter.java:435)
at org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(ParquetMetadataConverter.java:454)
at org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetMetadata(ParquetMetadataConverter.java:914)
at org.apache.parquet.format.converter.ParquetMetadataConverter.readParquetMetadata(ParquetMetadataConverter.java:885)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:532)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:505)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:499)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:448)
at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:105)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:131)
at org.apache.spark.sql.execution.datasources.v2.parquet.ParquetPartitionReaderFactory.buildReaderBase(ParquetPartitionReaderFactory.scala:174)
at org.apache.spark.sql.execution.datasources.v2.parquet.ParquetPartitionReaderFactory.createVectorizedReader(ParquetPartitionReaderFactory.scala:205)
at org.apache.spark.sql.execution.datasources.v2.parquet.ParquetPartitionReaderFactory.buildColumnarReader(ParquetPartitionReaderFactory.scala:103)
at org.apache.spark.sql.execution.datasources.v2.FilePartitionReaderFactory.$anonfun$createColumnarReader$1(FilePartitionReaderFactory.scala:38)
at org.apache.spark.sql.execution.datasources.v2.FilePartitionReaderFactory$$Lambda$2018/.apply(Unknown Source)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at org.apache.spark.sql.execution.datasources.v2.FilePartitionReader.getNextReader(FilePartitionReader.scala:109)
at org.apache.spark.sql.execution.datasources.v2.FilePartitionReader.next(FilePartitionReader.scala:42)
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:62)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:726)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:321)
at org.apache.spark.sql.execution.SparkPlan$$Lambda$1879/.apply(Unknown Source)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
at org.apache.spark.rdd.RDD$$Lambda$1875/.apply(Unknown Source)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at or

Need to order iterator values in spark dataframe

2020-03-26 Thread Ranjan, Abhinav

Hi,

I have a dataframe which has data like:

key | code | code_value
1   | c1   | 11
1   | c2   | 12
1   | c2   | 9
1   | c3   | 12
1   | c2   | 13
1   | c2   | 14
1   | c4   | 12
1   | c2   | 15
1   | c1   | 12


I need to group the data by key and then apply some custom logic to every
value I get in each group. So I did this:


Let's suppose it is in a dataframe df.

case class key_class(key: String, code: String, code_value: String)


df
  .as[key_class]
  .groupByKey(_.key)
  .mapGroups { (key, groupedValues) =>
    val status = groupedValues.map { row =>
      // do some custom logic on row
      "SUCCESS"
    }.toList
    status  // return the per-group result
  }
  .toDF("status")


The issue with the above approach is that the values I get after applying
groupByKey are not sorted/ordered. I want the values within each group to
be sorted by the column 'code'.


There are a few ways to do this:

1. Get them in a list and then sort ==> this will result in OOM if the
iterator is too big (a sketch of this variant follows the list below).


2. Somehow apply a secondary sort, but the problem with that approach is
that I have to keep track of the key change.


3. sortWithinPartitions cannot be applied on its own because the groupBy
shuffle will mess up the order.


4. Another approach is:

df
  .as[key_class]
  .sort("key", "code")  // sort by key, then by code within each key
  .map { row =>
    // do stuff here
    row
  }

but here too I have to keep track of the key change within the map
function, and sometimes this also overflows if the keys are skewed.
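
For option 1 above, a minimal sketch (editorial, hedged) of what the
buffer-and-sort variant looks like, with the OOM caveat it carries:

df
  .as[key_class]
  .groupByKey(_.key)
  .mapGroups { (key, groupedValues) =>
    // buffers the entire group in memory; risks OOM for very large groups
    val sorted = groupedValues.toList.sortBy(_.code)
    sorted.map { row =>
      // do some custom logic on row
      "SUCCESS"
    }
  }
  .toDF("status")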



So is there any way in which I can get the values sorted after grouping
them by a key?


Thanks,

Abhinav



Re: Need to order iterator values in spark dataframe

2020-03-26 Thread Enrico Minack

Abhinav,

you can repartition by your key, then sortWithinPartitions, and then 
groupByKey. Since the data are already hash-partitioned by key, Spark 
should not shuffle the data and hence should not change the sort within 
each partition:


ds.repartition($"key").sortWithinPartitions($"code").groupBy($"key")

Enrico
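
A fuller sketch of this suggestion (editorial addition), combined with the
mapGroups from the original question. Whether the within-partition order
actually survives the grouping should be verified on the Spark version in
use; this is a hedged sketch, not a guarantee:

import spark.implicits._

df.as[key_class]
  .repartition($"key")              // hash-partition by key
  .sortWithinPartitions($"code")    // sort each partition by code
  .groupByKey(_.key)                // group; ideally no further shuffle of the rows
  .mapGroups { (key, groupedValues) =>
    // groupedValues should arrive ordered by 'code' if the sort is preserved
    groupedValues.map(_ => "SUCCESS").toList
  }
  .toDF("status")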




Re: Need to order iterator values in spark dataframe

2020-03-26 Thread Zahid Rahman
I believe I logged an issue first and should get a response first.
I was ignored.

Regards

Did you know there are 8 million people in Kashmir locked up in their homes
by the Hindutva (Indians) for 8 months?
Now the whole planet is locked up in their homes.
You didn't take notice of them either.
You ignored them.

Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org





results of taken(3) not appearing in console window

2020-03-26 Thread Zahid Rahman
I am running the same code with the same libraries but am not getting the
same output.
scala> case class flight (DEST_COUNTRY_NAME: String,
     |                    ORIGIN_COUNTRY_NAME: String,
     |                    count: BigInt)
defined class flight

scala> val flightDf = spark.read.parquet("/data/flight-data/parquet/2010-summary.parquet/")
flightDf: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

scala> val flights = flightDf.as[flight]
flights: org.apache.spark.sql.Dataset[flight] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

scala> flights.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada").map(flight_row => flight_row).take(3)
res0: Array[flight] = Array(flight(United States,Romania,1), flight(United States,Ireland,264), flight(United States,India,69))


Re: results of taken(3) not appearing in console window

2020-03-26 Thread Reynold Xin
bcc dev, +user

You need to print out the result. Take itself doesn't print. You only got the 
results printed to the console because the Scala REPL automatically prints the 
returned value from take.
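
A short illustration of the point (editorial addition, hedged): in a compiled 
application the result must be printed explicitly, for example:

// take(3) returns a local Array[flight]; print it explicitly
flights.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
  .map(flight_row => flight_row)
  .take(3)
  .foreach(println)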

On Thu, Mar 26, 2020 at 12:15 PM, Zahid Rahman < zahidr1...@gmail.com > wrote:

> 20/03/26 19:09:00 INFO SparkContext: Running Spark version 3.0.0-preview2
> 20/03/26 19:09:00 INFO ResourceUtils:
> ==
> 20/03/26 19:09:00 INFO ResourceUtils: Resources for spark.driver:
> 
> 20/03/26 19:09:00 INFO ResourceUtils:
> ==
> 20/03/26 19:09:00 INFO SparkContext: Submitted application:  chapter2
> 20/03/26 19:09:00 INFO SecurityManager: Changing view acls to: kub19
> 20/03/26 19:09:00 INFO SecurityManager: Changing modify acls to: kub19
> 20/03/26 19:09:00 INFO SecurityManager: Changing view acls groups to:
> 20/03/26 19:09:00 INFO SecurityManager: Changing modify acls groups to:
> 20/03/26 19:09:00 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users  with view permissions: Set(kub19);
> groups with view permissions: Set(); users  with modify permissions:
> Set(kub19); groups with modify permissions: Set()
> 20/03/26 19:09:00 INFO Utils: Successfully started service 'sparkDriver'
> on port 46817.
> 20/03/26 19:09:00 INFO SparkEnv: Registering MapOutputTracker
> 20/03/26 19:09:00 INFO SparkEnv: Registering BlockManagerMaster
> 20/03/26 19:09:00 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
> 20/03/26 19:09:00 INFO BlockManagerMasterEndpoint:
> BlockManagerMasterEndpoint up
> 20/03/26 19:09:00 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
> 20/03/26 19:09:00 INFO DiskBlockManager: Created local directory at
> /tmp/blockmgr-0baf5097-2595-4542-99e2-a192d7baf37c
> 20/03/26 19:09:00 INFO MemoryStore: MemoryStore started with capacity
> 127.2 MiB
> 20/03/26 19:09:01 INFO SparkEnv: Registering OutputCommitCoordinator
> 20/03/26 19:09:01 WARN Utils: Service 'SparkUI' could not bind on port
> 4040. Attempting port 4041.
> 20/03/26 19:09:01 INFO Utils: Successfully started service 'SparkUI' on
> port 4041.
> 20/03/26 19:09:01 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://localhost:4041
> 20/03/26 19:09:01 INFO Executor: Starting executor ID driver on host
> localhost
> 20/03/26 19:09:01 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 38135.
> 20/03/26 19:09:01 INFO NettyBlockTransferService: Server created on
> localhost:38135
> 20/03/26 19:09:01 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
> 20/03/26 19:09:01 INFO BlockManagerMaster: Registering BlockManager
> BlockManagerId(driver, localhost, 38135, None)
> 20/03/26 19:09:01 INFO BlockManagerMasterEndpoint: Registering block
> manager localhost:38135 with 127.2 MiB RAM, BlockManagerId(driver,
> localhost, 38135, None)
> 20/03/26 19:09:01 INFO BlockManagerMaster: Registered BlockManager
> BlockManagerId(driver, localhost, 38135, None)
> 20/03/26 19:09:01 INFO BlockManager: Initialized BlockManager:
> BlockManagerId(driver, localhost, 38135, None)
> 20/03/26 19:09:01 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/home/kub19/spark-3.0.0-preview2-bin-hadoop2.7/projects/spark-warehouse').
> 20/03/26 19:09:01 INFO SharedState: Warehouse path is 'file:/home/kub19/spark-3.0.0-preview2-bin-hadoop2.7/projects/spark-warehouse'.
> 20/03/26 19:09:02 INFO SparkContext: Starting job: parquet at chapter2.sc

BUG: take with SparkSession.master[url]

2020-03-26 Thread Zahid Rahman
With the following SparkSession configuration:

val spark = SparkSession.builder().master("local[*]").appName("Spark
Session take").getOrCreate();

this line works

flights.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada").map(flight_row => flight_row).take(5)


However, if I change the master URL like so, to the IP address, then the
following error is produced depending on the position of .take(5):

val spark = SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark Session take").getOrCreate();


20/03/27 05:15:20 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, 192.168.0.38, executor 0): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD

BUT if I remove take(5), change the position of take(5), or insert an
extra take(5) as illustrated in the code, then it works. I don't see why
the position of take(5) should cause such an error, or why the error
should be caused by changing the master URL.

flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada").map(flight_row => flight_row).take(5)

flights.take(5)

flights
  .take(5)
  .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
  .map(fr => flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME, fr.count + 5))
flights.show(5)
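
One detail worth noting here (editorial addition, hedged): Dataset.take(n)
is an action that returns a local Array to the driver, so anything chained
after take(5) runs as plain Scala collection code on the driver rather than
as Spark tasks on executors. That may explain why the variants above avoid
the executor-side deserialization error:

// Dataset transformations: the filter lambda is shipped to executors.
flights.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada").take(5)

// Array operations after take(5): everything below runs locally on the driver.
flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")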


Complete code, if you wish to replicate it:

import org.apache.spark.sql.SparkSession

object sessiontest {

  // define specific data type class then manipulate it using the
  // filter and map functions; this is also known as an Encoder
  case class flight (DEST_COUNTRY_NAME: String,
                     ORIGIN_COUNTRY_NAME: String,
                     count: BigInt)

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark Session take").getOrCreate();

    import spark.implicits._
    val flightDf = spark.read.parquet("/data/flight-data/parquet/2010-summary.parquet/")
    val flights = flightDf.as[flight]

    flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada").map(flight_row => flight_row).take(5)

    flights.take(5)

    flights
      .take(5)
      .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
      .map(fr => flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME, fr.count + 5))
    flights.show(5)

  } // main
}





Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



Re: BUG: take with SparkSession.master[url]

2020-03-26 Thread Wenchen Fan
Which Spark/Scala version do you use?



Re: BUG: take with SparkSession.master[url]

2020-03-26 Thread Zahid Rahman
I have configured spark-3.0.0-preview2-bin-hadoop2.7/jar in IntelliJ as
external jars; I am not pulling anything from Maven.

Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org





Re: BUG: take with SparkSession.master[url]

2020-03-26 Thread Wenchen Fan
How is your Spark cluster, spark://192.168.0.38:7077, deployed if you just
include the Spark dependency in IntelliJ?



Re: BUG: take with SparkSession.master[url]

2020-03-26 Thread Zahid Rahman
sbin/start-master.sh
sbin/start-slave.sh spark://192.168.0.38:7077
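
As a hedged aside (editorial addition): when submitting to a standalone
master from an IDE, the application jar usually has to be shipped to the
executors explicitly, for example via the spark.jars config; otherwise
lambdas may fail to deserialize with SerializedLambda ClassCastExceptions
like the one above. The jar path below is an illustrative assumption:

// sketch only: ship the application jar so executors can load the lambda classes
val spark = SparkSession.builder()
  .master("spark://192.168.0.38:7077")
  .appName("Spark Session take")
  .config("spark.jars", "/path/to/sessiontest.jar")  // illustrative path
  .getOrCreate()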

Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org





Re: BUG: take with SparkSession.master[url]

2020-03-26 Thread Zahid Rahman
~/spark-3.0.0-preview2-bin-hadoop2.7$ sbin/start-slave.sh spark://192.168.0.38:7077
~/spark-3.0.0-preview2-bin-hadoop2.7$ sbin/start-master.sh

Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org


