Another option is to just use plain JDBC (if you are in Java) in a foreachPartition call on the dataframe/dataset. That way you get full control of the INSERT statement, but you need to open the connection and manage the transaction yourself.
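A minimal PySpark sketch of the same idea (the rest of the thread is in Python). The python-oracledb driver, the connection details and the table/function names here are assumptions for illustration, not something from the thread:

    # Sketch only: assumes python-oracledb is installed on the executors and
    # a target table t(a, b, c) whose middle column is derived by the database
    # function get_function_value(), as in the question below.
    import oracledb

    def write_partition(rows):
        # one connection and one transaction per partition, opened on the executor
        conn = oracledb.connect(user="app_user", password="app_pwd",
                                dsn="orahost:1521/orclpdb1")
        cursor = conn.cursor()
        # the function call is embedded in the statement, so the database
        # evaluates get_function_value() at insert time
        sql = "insert into t (a, b, c) values (:1, get_function_value(:2), :3)"
        # batch the whole partition in one round trip; chunk this for very
        # large partitions to bound memory
        cursor.executemany(sql, [(r["a"], r["b"], r["c"]) for r in rows])
        conn.commit()
        conn.close()

    df.foreachPartition(write_partition)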
On Sat, 19 Jun 2021 at 19:33, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Hi,
>
> I did some research on this.
>
> The only way one can write to Oracle from Spark is through JDBC (excluding
> other options outside of Spark).
>
> The challenge here is that you have a column whose value comes from a
> database function get_function() that Spark needs to insert. Currently
> there is no way of inserting records from Spark using a handcrafted
> INSERT statement. For example, this does not work through Spark:
>
> scratch...@orasource.mich.LOCAL> insert into scratchpad.dummy6 (id) values (2);
>
> The batch insert option seems to be the fastest:
>
> # note: you cannot replace the "dbtable" option with a SQL insert!
> df.write. \
>     format("jdbc"). \
>     option("url", oracle_url). \
>     option("user", user). \
>     option("dbtable", "scratchpad.randomdata"). \
>     option("password", password). \
>     option("driver", driver). \
>     mode(mode). \
>     save()
>
> How about creating a cursor on the DF?
>
> for row in df.rdd.collect():
>     id = row[0]
>     clustered = row[1]
>     scattered = row[2]
>     randomised = row[3]
>     random_string = row[4]
>     small_vc = row[5]
>     padding = row[6]
>
> This gives you the individual column values row by row from the
> dataframe, but you cannot do much with them.
>
> The only option I can see here is to create a staging table EXCLUDING the
> derived column and write to that table.
>
> Next, go to Oracle itself and do an insert/select from the staging table to
> the target table. Let us create table dummy7 in the image of the one
> created by Spark:
>
> scratch...@orasource.mich.LOCAL> create table scratchpad.dummy7 as select * from scratchpad.randomdata where 1 = 2;
>
> Table created.
>
> Add a new derived column to it, call it derived_col:
>
> scratch...@orasource.mich.LOCAL> alter table scratchpad.dummy7 add derived_col float;
>
> Table altered.
>
> Now insert/select from scratchpad.randomdata to scratchpad.dummy7. Let us
> populate the newly added column with cos(id):
>
> scratch...@orasource.mich.LOCAL> insert into scratchpad.dummy7 (id, CLUSTERED, SCATTERED, RANDOMISED, RANDOM_STRING, SMALL_VC, PADDING, DERIVED_COL)
>   2  select id, CLUSTERED, SCATTERED, RANDOMISED, RANDOM_STRING, SMALL_VC, PADDING, cos(id) from randomdata;
>
> 10 rows created.
>
> This should work, unless there is a way of inserting columns directly from
> Spark.
>
> HTH
>
> On Fri, 18 Jun 2021 at 22:14, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
>> Well, the challenge is that Spark is best suited to inserting a dataframe
>> into the Oracle table, i.e. a bulk insert.
>>
>> That "insert into table (column list) values (..)" is a single-record
>> insert. Can you try creating a staging table in Oracle without the
>> get_function() column and do a bulk insert from the Spark dataframe to
>> that staging table?
>>
>> HTH
>>
>> Mich
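The two-step staging approach described above can also be scripted end to end from the Spark driver: write the staging table with the Spark JDBC writer, then run the insert/select through a plain Oracle connection. A minimal sketch, assuming the python-oracledb driver on the driver node and the table names shown earlier; the DSN is a placeholder:

    import oracledb

    # Step 1: bulk-insert the dataframe (without the derived column) into
    # the staging table through the Spark JDBC writer.
    df.write. \
        format("jdbc"). \
        option("url", oracle_url). \
        option("user", user). \
        option("dbtable", "scratchpad.randomdata"). \
        option("password", password). \
        option("driver", driver). \
        mode("overwrite"). \
        save()

    # Step 2: run the insert/select, including the derived column, inside Oracle.
    conn = oracledb.connect(user=user, password=password,
                            dsn="orasource:1521/mich")  # placeholder DSN
    cursor = conn.cursor()
    cursor.execute("""
        insert into scratchpad.dummy7
          (id, clustered, scattered, randomised, random_string, small_vc, padding, derived_col)
        select id, clustered, scattered, randomised, random_string, small_vc, padding, cos(id)
        from scratchpad.randomdata
    """)
    conn.commit()
    conn.close()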
>> On Fri, 18 Jun 2021 at 21:53, Anshul Kala <anshul.k...@gmail.com> wrote:
>>
>>> Hi Mich,
>>>
>>> Thanks for your reply. Please advise: the insert query that I need to
>>> substitute should be like below:
>>>
>>> Insert into table(a,b,c) values(?, get_function_value(?), ?)
>>>
>>> In the statement above:
>>>
>>> ? : refers to values from the dataframe columns
>>> get_function_value : refers to the database function to which one of
>>> the dataframe columns is passed as input
>>>
>>> Thanks
>>> Anshul
>>>
>>> On Fri, Jun 18, 2021 at 4:29 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>
>>>> I gather you mean using JDBC to write to the Oracle table?
>>>>
>>>> Spark provides a unified framework to write to any JDBC-compliant
>>>> database.
>>>>
>>>> import sys
>>>>
>>>> def writeTableWithJDBC(dataFrame, url, tableName, user, password, driver, mode):
>>>>     try:
>>>>         dataFrame. \
>>>>             write. \
>>>>             format("jdbc"). \
>>>>             option("url", url). \
>>>>             option("dbtable", tableName). \
>>>>             option("user", user). \
>>>>             option("password", password). \
>>>>             option("driver", driver). \
>>>>             mode(mode). \
>>>>             save()
>>>>     except Exception as e:
>>>>         print(f"""{e}, quitting""")
>>>>         sys.exit(1)
>>>>
>>>> and how to call it:
>>>>
>>>> def loadIntoOracleTable(self, df2):
>>>>     # write to Oracle table: all uppercase, not mixed case, and
>>>>     # column names <= 30 characters in version 12.1
>>>>     tableName = self.config['OracleVariables']['yearlyAveragePricesAllTable']
>>>>     fullyQualifiedTableName = self.config['OracleVariables']['dbschema'] + '.' + tableName
>>>>     user = self.config['OracleVariables']['oracle_user']
>>>>     password = self.config['OracleVariables']['oracle_password']
>>>>     driver = self.config['OracleVariables']['oracle_driver']
>>>>     mode = self.config['OracleVariables']['mode']
>>>>     # s is the helper object/module holding the JDBC functions, defined elsewhere
>>>>     s.writeTableWithJDBC(df2, oracle_url, fullyQualifiedTableName, user, password, driver, mode)
>>>>     print(f"""created {self.config['OracleVariables']['yearlyAveragePricesAllTable']}""")
>>>>     # read the data back to ensure it all loaded OK
>>>>     fetchsize = self.config['OracleVariables']['fetchsize']
>>>>     read_df = s.loadTableFromJDBC(self.spark, oracle_url, fullyQualifiedTableName, user, password, driver, fetchsize)
>>>>     # check that all rows are there
>>>>     if df2.subtract(read_df).count() == 0:
>>>>         print("Data has been loaded OK to Oracle table")
>>>>     else:
>>>>         print("Data could not be loaded to Oracle table, quitting")
>>>>         sys.exit(1)
>>>>
>>>> In the statement where it says
>>>>
>>>> option("dbtable", tableName). \
>>>>
>>>> you can replace tableName with the equivalent SQL insert statement.
>>>>
>>>> You will need the JDBC driver for Oracle, say ojdbc6.jar, on the class
>>>> path set in $SPARK_HOME/conf/spark-defaults.conf:
>>>>
>>>> spark.driver.extraClassPath /home/hduser/jars/jconn4.jar:/home/hduser/jars/ojdbc6.jar
>>>>
>>>> HTH
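loadTableFromJDBC is called above but its definition never appears in the thread; a plausible counterpart using the standard Spark JDBC reader, written in the same style as writeTableWithJDBC, would be:

    def loadTableFromJDBC(spark, url, tableName, user, password, driver, fetchsize):
        try:
            return spark.read. \
                format("jdbc"). \
                option("url", url). \
                option("dbtable", tableName). \
                option("user", user). \
                option("password", password). \
                option("driver", driver). \
                option("fetchsize", fetchsize). \
                load()
        except Exception as e:
            print(f"""{e}, quitting""")
            sys.exit(1)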
>>>> On Fri, 18 Jun 2021 at 20:49, Anshul Kala <anshul.k...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I am using Spark to ingest data from a file into an Oracle database
>>>>> table. For one of the fields, the value to be populated is generated
>>>>> by a function that is written in the database.
>>>>>
>>>>> The input to the function is one of the fields of the dataframe.
>>>>>
>>>>> I wanted to use Spark's JDBC write to perform the operation, which
>>>>> generates the insert query at the back end.
>>>>>
>>>>> For example, it could generate the insert query as:
>>>>>
>>>>> Insert into table values (?, ?, getfunctionvalue(?))
>>>>>
>>>>> Please advise if this is possible in Spark and, if yes, how it can
>>>>> be done.
>>>>>
>>>>> This is a little urgent for me, so any help is appreciated.
>>>>>
>>>>> Thanks
>>>>> Anshul