It is well known that wide tables are not the most efficient way to organize
data. However, sometimes we have to deal with extremely wide tables
featuring thousands of columns, for example when loading data from legacy
systems.

*We have performed an investigation of how the number of columns affects the
duration of Spark jobs.*

Two basic Spark (2.3.1) jobs are used for testing. The two jobs use distinct
approaches to instantiate a DataFrame: each reads a .csv file into a
DataFrame and performs a count. Each job is repeated with input files having
different numbers of columns, and the execution time is measured. 16 files
with 100 - 20,000 columns are used. The files are generated in such a way
that their size (rows * columns) is constant (200,000 cells, approx. 2 MB),
so the files with more columns have fewer rows. Each job is repeated 7 times
for each file in order to accumulate better statistics.
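
For reference, a minimal sketch of how such test files could be generated is
shown below. This is an assumption about the setup (the delimiter, cell
contents, output path, and the writeTestCsv helper are hypothetical), not the
generator actually used for the measurements.

// Sketch: generate a test .csv with a given number of columns, keeping
// rows * columns constant at 200,000 cells.
import java.io.PrintWriter

def writeTestCsv(path: String, nColumns: Int, totalCells: Int = 200000): Unit = {
  val nRows = totalCells / nColumns
  val writer = new PrintWriter(path)
  try {
    // Header line: c0,c1,...,c<nColumns-1>
    writer.println((0 until nColumns).map(i => s"c$i").mkString(","))
    // Data rows with simple string cells
    for (r <- 0 until nRows)
      writer.println((0 until nColumns).map(c => s"v${r}_$c").mkString(","))
  } finally {
    writer.close()
  }
}

// e.g. writeTestCsv("C:/temp/test_1000.csv", 1000) for each of the 16 column counts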

The results of the measurements are shown in the figure
job_duration_VS_number_of_columns.jpg:
<http://apache-spark-developers-list.1001551.n3.nabble.com/file/t3091/job_duration_VS_number_of_columns.jpg>
  
Significantly different complexity of DataFrame construction is observed for
the two approaches:

*1. spark.read.format()*: similar results are obtained for
          a.    csv and parquet formats (the parquet file created from the same csv): .format(<csv/parquet>)
          b.    schema-on-read on/off: .option("inferSchema", <true/false>)
          c.    a provided schema loaded from a file (schema stored from a previous run): .schema(<schema>)
          (variants b and c are sketched after the code below)
Polynomial complexity in the number of columns is observed.

// Imports
import org.apache.spark.sql.SparkSession

// Get SparkSession
val spark = SparkSession
  .builder
  .appName(s"TestSparkReadFormat${runNo}")
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "file:///C:/temp") // on Windows.
  .config("spark.debug.maxToStringFields", 20000)
  .getOrCreate()

// Read data  
val df = spark.read.format("csv")
  .option("sep", ",")
  .option("inferSchema", "false")
  .option("header", "true")
  .load(inputPath)

// Count rows and columns
val nRows = df.count()
val nColumns = df.columns.length
spark.stop()
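
Variants (b) and (c) above differ only in how the reader is configured. A
minimal sketch of both follows; the JSON serialization of the stored schema
and the schemaPath value are assumptions, not part of the measured jobs.

// Variant (b): schema-on-read enabled
val dfInferred = spark.read.format("csv")
  .option("sep", ",")
  .option("inferSchema", "true")
  .option("header", "true")
  .load(inputPath)

// Variant (c): schema stored from a previous run and passed explicitly
// (assumed here to be serialized as JSON via StructType.json / DataType.fromJson)
import org.apache.spark.sql.types.{DataType, StructType}
import scala.io.Source

val schemaJson = Source.fromFile(schemaPath).mkString // schemaPath is hypothetical
val storedSchema = DataType.fromJson(schemaJson).asInstanceOf[StructType]

val dfWithSchema = spark.read.format("csv")
  .option("sep", ",")
  .option("header", "true")
  .schema(storedSchema)
  .load(inputPath)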


*2. spark.createDataFrame(rows, schema)*: rows and schema are constructed by
splitting the lines of a text file.
Linear complexity in the number of columns is observed.

// Imports
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Get SparkSession
val spark = SparkSession
  .builder
  .appName(s"TestSparkCreateDataFrame${runNo}")
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "file:///C:/temp") // on Windows.
  .config("spark.debug.maxToStringFields", 20000)
  .getOrCreate()

// Load the file
val sc = spark.sparkContext
val lines = sc.textFile(inputPath)

// Create the schema from the header line
val headers = lines.first
val fs = headers.split(",").map(f => StructField(f, StringType))
val schema = StructType(fs)

// Read data rows (drop the header line)
val noheaders = lines.filter(_ != headers)
val rows = noheaders.map(_.split(",")).map(a => Row.fromSeq(a))

// Create the DataFrame
val df: DataFrame = spark.createDataFrame(rows, schema)

// count rows and columns
val nRows = df.count()
val nColumns = df.columns.length
spark.stop()

A similar polynomial complexity in the total number of columns of the
DataFrame is also observed in more complex testing jobs. Those jobs perform
the following transformations on a fixed number of columns (a sketch is
given after the list):
•       Filter
•       GroupBy
•       Sum
•       withColumn
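
A minimal sketch of such a job, assuming string columns named c0 and c1 as in
the test files (the concrete column names, predicate, and aggregation are
assumptions; the original jobs may differ):

import org.apache.spark.sql.functions.{col, length, sum}

// Transformations that touch a fixed number of columns, regardless of
// how wide the input DataFrame is.
val transformed = df
  .filter(col("c0") =!= "")                 // Filter
  .withColumn("c1_len", length(col("c1")))  // withColumn
  .groupBy(col("c0"))                       // GroupBy
  .agg(sum(col("c1_len")).as("total_len"))  // Sum

transformed.count()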

What could be the reason for the polynomial dependence of the job duration
on the number of columns? *What is an efficient way to address wide data
using Spark?*


