It is well known that wide tables are not the most efficient way to organize data. However, sometimes we have to deal with extremely wide tables featuring thousands of columns, for example when loading data from legacy systems.
*We have performed an investigation of how the number of columns affects the duration of Spark jobs.*

Two basic Spark (2.3.1) jobs are used for testing. The two jobs use distinct approaches to instantiate a DataFrame: each reads a .csv file into a DataFrame and performs a count. Each job is repeated with input files that have different numbers of columns, and the execution time is measured. 16 files with 100 - 20,000 columns are used. The files are generated in such a way that their size (rows * columns) is constant (200,000 cells, approx. 2 MB), which means the files with more columns have fewer rows (a sketch of how such files can be generated is given at the end of this message). Each job is repeated 7 times for each file in order to accumulate better statistics.

The results of the measurements are shown in the figure job_duration_VS_number_of_columns.jpg
<http://apache-spark-developers-list.1001551.n3.nabble.com/file/t3091/job_duration_VS_number_of_columns.jpg>

Significantly different complexity of DataFrame construction is observed for the two approaches:

*1. spark.read.format()*: similar results for
   a. csv and parquet formats (parquet created from the same csv): .format(<csv/parquet>)
   b. schema-on-read on/off: .option(inferSchema=<true/false>)
   c. a provided schema loaded from file (schema stored from a previous run): .schema(<schema>) - see the sketch at the end of this message
Polynomial complexity in the number of columns is observed.

import org.apache.spark.sql.SparkSession

// Get SparkSession
val spark = SparkSession
  .builder
  .appName(s"TestSparkReadFormat${runNo}")
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "file:///C:/temp") // on Windows
  .config("spark.debug.maxToStringFields", 20000)
  .getOrCreate()

// Read data
val df = spark.read.format("csv")
  .option("sep", ",")
  .option("inferSchema", "false")
  .option("header", "true")
  .load(inputPath)

// Count rows and columns
val nRows = df.count()
val nColumns = df.columns.length

spark.stop()

*2. spark.createDataFrame(rows, schema)*: rows and schema are constructed by splitting the lines of a text file. Linear complexity in the number of columns is observed.

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Get SparkSession
val spark = SparkSession
  .builder
  .appName(s"TestSparkCreateDataFrame${runNo}")
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "file:///C:/temp") // on Windows
  .config("spark.debug.maxToStringFields", 20000)
  .getOrCreate()

// Load the file
val sc = spark.sparkContext
val lines = sc.textFile(inputPath)

// Create the schema from the headers
val headers = lines.first
val fs = headers.split(",").map(f => StructField(f, StringType))
val schema = StructType(fs)

// Read the data, dropping the header line
val noheaders = lines.filter(_ != headers)
val rows = noheaders.map(_.split(",")).map(a => Row.fromSeq(a))

// Create the DataFrame
val df: DataFrame = spark.createDataFrame(rows, schema)

// Count rows and columns
val nRows = df.count()
val nColumns = df.columns.length

spark.stop()

Similar polynomial complexity in the total number of columns of a DataFrame is also observed in more complex testing jobs. Those jobs perform the following transformations on a fixed number of columns (a sketch of such a job is given at the end of this message):
• Filter
• GroupBy
• Sum
• withColumn

What could be the reason for the polynomial dependence of the job duration on the number of columns?
*What is an efficient way to address wide data using Spark?*
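For reference, here is a minimal sketch of how the test files could be generated, given that each file holds 200,000 cells. The file name pattern, the dummy cell values, and the particular column counts below are placeholders, not necessarily the ones actually used.

import java.io.PrintWriter

// Minimal sketch of a test-file generator: for a given number of columns,
// the number of rows is chosen so that rows * columns = 200,000 cells.
// File names, cell values and the column counts below are placeholders.
def writeTestCsv(nColumns: Int, totalCells: Int = 200000): Unit = {
  val nRows = totalCells / nColumns
  val out = new PrintWriter(s"test_${nColumns}_columns.csv")
  try {
    // header row: c0,c1,...
    out.println((0 until nColumns).map(i => s"c$i").mkString(","))
    // data rows with dummy values
    for (r <- 0 until nRows)
      out.println((0 until nColumns).map(c => (r + c) % 10).mkString(","))
  } finally out.close()
}

// e.g. a few column counts in the 100 - 20,000 range
Seq(100, 1000, 10000, 20000).foreach(n => writeTestCsv(n))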
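The provided-schema variant (1c) is exercised roughly as follows. This is a minimal sketch assuming the schema was stored as JSON (via df.schema.json) in a previous run; schemaPath and inputPath are placeholder paths.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DataType, StructType}

// Minimal sketch of variant 1c: read the csv with a schema stored from a previous run.
// schemaPath and inputPath are placeholder paths.
val spark = SparkSession.builder.appName("TestProvidedSchema").master("local[*]").getOrCreate()

// Load the schema that was saved as JSON in a previous run (e.g. via df.schema.json)
val schemaJson = scala.io.Source.fromFile(schemaPath).mkString
val schema = DataType.fromJson(schemaJson).asInstanceOf[StructType]

// Read the csv with the provided schema, so no schema inference pass is needed
val df = spark.read.format("csv")
  .option("sep", ",")
  .option("header", "true")
  .schema(schema)
  .load(inputPath)

val nRows = df.count()
spark.stop()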
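Finally, a minimal sketch of what the more complex testing jobs look like, assuming a DataFrame df loaded as in approach 1; the column names col0, col1, col2 stand in for the fixed set of columns actually used.

import org.apache.spark.sql.functions.{col, lit, sum}

// Minimal sketch of a more complex test job: Filter, GroupBy, Sum and withColumn
// applied to a fixed number of columns of a wide DataFrame df (loaded as above).
// The column names "col0", "col1", "col2" are placeholders.
val result = df
  .withColumn("flag", lit(1))                          // withColumn
  .filter(col("col0").isNotNull)                       // Filter
  .groupBy(col("col1"))                                // GroupBy
  .agg(sum(col("col2").cast("double")).as("sumCol2"))  // Sum

result.count()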