Apologies, I missed your two points. Taking your questions in turn:

#1 If there are 10 or more tables, do I need to read each table into memory, given that Spark computes in memory?

Every table will be read as I described above. The read is lazy in Spark; the computation happens only when an action is performed on the underlying read. Regardless of the database (BigQuery, Hive etc.), the example below applies:

rows = spark.sql(f"""SELECT COUNT(1) FROM {fullyQualifiedTableName}""").collect()[0][0]

rows returns the number of rows. You can see what is happening in the Spark UI under the Storage tab. If you do not have enough memory, the data will spill to disk. Below is an example of a large in-memory table:

[image: image.png]
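In other words, with 10 or more tables you just loop over the table names, do a lazy read of each one, register it as a temp view and let Spark SQL do the rest. A bare-bones sketch (the table names and the 'sourceDataset' config key are placeholders for illustration, and it relies on the loadTableFromBQ helper shown in my earlier mail below):

for tableName in ["tableB", "tableC"]:
    # lazy read from BigQuery; nothing is pulled into memory yet
    df = s.loadTableFromBQ(self.spark, config['GCPVariables']['sourceDataset'], tableName)
    # expose the DataFrame to Spark SQL under the table name
    df.createOrReplaceTempView(tableName)

# the join runs in Spark itself; work is only triggered by an action later on
df2 = self.spark.sql("select b.columnB1, c.columnC2 from tableB b, tableC c")

Nothing is materialised until you hit an action, for example the write back to BigQuery.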
#2 Is there a much easier way to deal with my scenario, for example where I just define the data source (BigQuery) and parse the SQL script file, and the rest is run by Spark?

What do you mean by parsing the SQL script file? Do you mean running a SQL file against spark-sql? That will not work, because you need to use the Spark-BigQuery API to access BigQuery from Spark, for example by supplying the connector jar spark-bigquery-latest_2.12.jar:

spark-submit --master local[4] --jars $HOME/jars/spark-bigquery-latest_2.12.jar

You read each table the way I described before. Writing the result set back to BigQuery and verifying the load looks like this:

# write to BigQuery table
s.writeTableToBQ(df2, "overwrite", config['GCPVariables']['targetDataset'], config['GCPVariables']['yearlyAveragePricesAllTable'])
print(f"""created {config['GCPVariables']['yearlyAveragePricesAllTable']}""")

# read the data back to ensure it all loaded OK
read_df = s.loadTableFromBQ(self.spark, config['GCPVariables']['targetDataset'], config['GCPVariables']['yearlyAveragePricesAllTable'])

# check that all rows are there
if df2.subtract(read_df).count() == 0:
    print("Data has been loaded OK to BQ table")
else:
    print("Data could not be loaded to BQ table, quitting")
    sys.exit(1)
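For reference, writeTableToBQ is the mirror image of loadTableFromBQ in my earlier mail below. I have not reproduced the exact helper here, but a simplified sketch, assuming the same spark-bigquery connector options and that setSparkConfBQ has already set temporaryGcsBucket, would look something like this:

def writeTableToBQ(df, mode, dataset, tableName):
    # sketch only: mirrors loadTableFromBQ; config and sys come from the surrounding module
    try:
        df.write. \
            format("bigquery"). \
            option("credentialsFile", config['GCPVariables']['jsonKeyFile']). \
            option("dataset", dataset). \
            option("table", tableName). \
            mode(mode). \
            save()
    except Exception as e:
        print(f"""{e}, quitting""")
        sys.exit(1)

The connector stages the write through the temporary GCS bucket before pushing the data into the BigQuery table.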
HTH

view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.


On Mon, 24 May 2021 at 20:51, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Well, the Spark to BigQuery API is very efficient at doing what it needs
> to do. Personally, I have never found a JDBC connection to BigQuery that
> works under all circumstances.
>
> In a typical environment you need to set up your connection variables to
> BigQuery from Spark. These are my recommended ones:
>
> def setSparkConfBQ(spark):
>     try:
>         spark.conf.set("GcpJsonKeyFile", config['GCPVariables']['jsonKeyFile'])
>         spark.conf.set("BigQueryProjectId", config['GCPVariables']['projectId'])
>         spark.conf.set("BigQueryDatasetLocation", config['GCPVariables']['datasetLocation'])
>         spark.conf.set("google.cloud.auth.service.account.enable", "true")
>         spark.conf.set("fs.gs.project.id", config['GCPVariables']['projectId'])
>         spark.conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
>         spark.conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
>         spark.conf.set("temporaryGcsBucket", config['GCPVariables']['tmp_bucket'])
>         spark.conf.set("spark.sql.streaming.checkpointLocation", config['GCPVariables']['tmp_bucket'])
>         return spark
>     except Exception as e:
>         print(f"""{e}, quitting""")
>         sys.exit(1)
>
> Note the setting for the GCP temporary bucket, which stages Spark writes
> before the data is pushed into the BigQuery table.
>
> The connection from Spark to BigQuery itself is pretty simple. For
> example, to read from a BQ table you can do:
>
> def loadTableFromBQ(spark, dataset, tableName):
>     try:
>         read_df = spark.read. \
>             format("bigquery"). \
>             option("credentialsFile", config['GCPVariables']['jsonKeyFile']). \
>             option("dataset", dataset). \
>             option("table", tableName). \
>             load()
>         return read_df
>     except Exception as e:
>         print(f"""{e}, quitting""")
>         sys.exit(1)
>
> and this is how you read it:
>
> read_df = s.loadTableFromBQ(self.spark, config['GCPVariables']['targetDataset'], config['GCPVariables']['ATable'])
>
> OK, each connection will be lazily evaluated, bar checking that the
> underlying table exists.
>
> The next stage is to create a read_df DataFrame for each table, and you
> do the joins etc. in Spark itself.
>
> At times it is more efficient for BigQuery to do the join itself and
> create a result set table in the BigQuery dataset that you can then
> import into Spark.
>
> Whatever the approach, there is a solution, and as usual your mileage
> varies, so to speak.
>
> HTH
>
>
> On Fri, 14 May 2021 at 01:50, bo zhao <zhaobo20082...@gmail.com> wrote:
>
>> Hi Team,
>>
>> I've followed the Spark community for several years. This is my first
>> time asking for help. I hope you guys can share some experience.
>>
>> I want to develop a Spark application that processes a SQL script file.
>> The data is on BigQuery. For example, the SQL script is:
>>
>> delete from tableA;
>> insert into tableA select b.columnB1, c.columnC2 from tableB b, tableC c;
>>
>> I can parse this file. In my opinion, after parsing the file, the steps
>> should be as follows:
>>
>> #step1: read tableB, tableC into memory (Spark)
>> #step2: register views for tableB's DataFrame and tableC's DataFrame
>> #step3: use spark.sql("select b.columnB1, c.columnC2 from tableB b, tableC c") to get a new DataFrame
>> #step4: new_dataframe.write()...() to tableA using mode of "OVERWRITE"
>>
>> My questions:
>>
>> #1 If there are 10 or more tables, do I need to read each table into
>> memory, even though Spark computes in memory?
>> #2 Is there a much easier way to deal with my scenario, for example
>> where I just define the data source (BigQuery) and just parse the SQL
>> script file, and the rest is run by Spark?
>>
>> Please share your experience or ideas.