Another option is to just use plain jdbc (if in java) in a foreachPartition
call on the dataframe/dataset then you get full control of the insert
statement but need to open the connection/transaction yourself

On Sat, 19 Jun 2021 at 19:33, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hi,
>
> I did some research on this.
>
> The only way one can write to Oracle from Spark is through JDBC (excluding
> other options outside of Spark).
>
> The challenge here is that you have a column based on function
> get_function() column  that Spark needs to insert. Currently there is no
> way of inserting records from Park using the traditional INSERT SELECT
> statement. For example this does not work through Spark
>
> scratch...@orasource.mich.LOCAL> insert into scratchpad.dummy6 (id)
> values (2);
>
> The batch insert option seems to be fastest
>
>             df.write. \
>                 format("jdbc"). \
>                 option("url", oracle_url). \
>                 option("user", user). \
>                 option("dbtable", "scratchpad.randomdata"). \  # you
> cannot replace this with sql insert!
>                 option("password", password). \
>                 option("driver", driver). \
>                 mode(mode). \
>                 save()
>
> How about creating a cursor on DF
>
>      for row in df.rdd.collect():
>             id = row[0]
>             clustered = row[1]
>             scattered = row[2]
>             randomised = row[3]
>             random_string = row[4]
>             small_vc = row[5]
>             padding= row[6]
>
> This will print out the individual column values row by row from the
> dataframe but cannot do much about it
>
> The only option I can see here is to create a staging table EXCLUDING the
> derived column and write to that table.
>
> Next go to Oracle itself and do an insert/select from the staging table to
> the target table. Let us create table dumm7 in the image of the one
> created by spark
>
> scratch...@orasource.mich.LOCAL> create table scratchpad.dummy7 as select
> * from scratchpad.randomdata where 1 = 2;
>
> Table created.
>
> Add a new derived column to it, call it derived_col
>
> scratch...@orasource.mich.LOCAL> alter table scratchpad.dummy7 add
> derived_col float;
>
> Table altered.
>
> Now insert/select from scratchpad.randomdata to scratchpad.dummy7. Let us
> populate the new added column with cos(id)
>
> scratch...@orasource.mich.LOCAL> insert into scratchpad.dummy7 (id,
> CLUSTERED, SCATTERED, RANDOMISED, RANDOM_STRING, SMALL_VC, PADDING,
> DERIVED_COL)
>   2  select id, CLUSTERED, SCATTERED, RANDOMISED, RANDOM_STRING, SMALL_VC,
> PADDING, *cos(id)* from randomdata;
>
> 10 rows created.
>
> This should work, unless there is a way of inserting columns directly from
> Spark.
>
> HTH
>
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 18 Jun 2021 at 22:14, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> Well the challenge is that Spark is best suited to insert a dataframe
>> into the Oracle table, i.e. a bulk insert
>>
>> that  insert into table (column list) values (..) is a single record
>> insert .. Can you try creating a staging table in oracle without
>> get_function() column and do a bulk insert from Spark dataframe to that
>> staging table?
>>
>> HTH
>>
>> Mich
>>
>>
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Fri, 18 Jun 2021 at 21:53, Anshul Kala <anshul.k...@gmail.com> wrote:
>>
>>>
>>> Hi Mich,
>>>
>>> Thanks for your reply. Please advise the insert query that I need to
>>> substitute should be like below:
>>>
>>> Insert into table(a,b,c) values(?,get_function_value(?),?)
>>>
>>> In the statement above :
>>>
>>>  ?  : refers to value from dataframe column values
>>> get_function_value : refers to be the function where one of the data
>>> frame column is passed as input
>>>
>>>
>>> Thanks
>>> Anshul
>>>
>>>
>>> Thanks
>>> Anshul
>>>
>>> On Fri, Jun 18, 2021 at 4:29 PM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> I gather you mean using JDBC to write to the Oracle table?
>>>>
>>>> Spark provides a unified framework to write to any JDBC
>>>> compliant database.
>>>>
>>>> def writeTableWithJDBC(dataFrame, url, tableName, user, password,
>>>> driver, mode):
>>>>     try:
>>>>         dataFrame. \
>>>>             write. \
>>>>             format("jdbc"). \
>>>>             option("url", url). \
>>>>             option("dbtable", tableName). \
>>>>             option("user", user). \
>>>>             option("password", password). \
>>>>             option("driver", driver). \
>>>>             mode(mode). \
>>>>             save()
>>>>     except Exception as e:
>>>>         print(f"""{e}, quitting""")
>>>>         sys.exit(1)
>>>>
>>>> and how to write it
>>>>
>>>>  def loadIntoOracleTable(self, df2):
>>>>         # write to Oracle table, all uppercase not mixed case and
>>>> column names <= 30 characters in version 12.1
>>>>         tableName =
>>>> self.config['OracleVariables']['yearlyAveragePricesAllTable']
>>>>         fullyQualifiedTableName =
>>>> self.config['OracleVariables']['dbschema']+'.'+tableName
>>>>         user = self.config['OracleVariables']['oracle_user']
>>>>         password = self.config['OracleVariables']['oracle_password']
>>>>         driver = self.config['OracleVariables']['oracle_driver']
>>>>         mode = self.config['OracleVariables']['mode']
>>>>
>>>> s.writeTableWithJDBC(df2,oracle_url,fullyQualifiedTableName,user,password,driver,mode)
>>>>         print(f"""created
>>>> {config['OracleVariables']['yearlyAveragePricesAllTable']}""")
>>>>         # read data to ensure all loaded OK
>>>>         fetchsize = self.config['OracleVariables']['fetchsize']
>>>>         read_df =
>>>> s.loadTableFromJDBC(self.spark,oracle_url,fullyQualifiedTableName,user,password,driver,fetchsize)
>>>>         # check that all rows are there
>>>>         if df2.subtract(read_df).count() == 0:
>>>>             print("Data has been loaded OK to Oracle table")
>>>>         else:
>>>>             print("Data could not be loaded to Oracle table, quitting")
>>>>             sys.exit(1)
>>>>
>>>> in the statement where it says
>>>>
>>>>              option("dbtable", tableName). \
>>>>
>>>> You can replace *tableName* with the equivalent SQL insert statement
>>>>
>>>> You will need JDBC driver for Oracle say ojdbc6.jar in
>>>> $SPARK_HOME/conf/spark-defaults.conf
>>>>
>>>> spark.driver.extraClassPath
>>>>  /home/hduser/jars/jconn4.jar:/home/hduser/jars/ojdbc6.jar
>>>>
>>>> HTH
>>>>
>>>>
>>>>
>>>>    view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, 18 Jun 2021 at 20:49, Anshul Kala <anshul.k...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I am using spark to ingest data from file to database Oracle table .
>>>>> For one of the fields , the value to be populated is generated from a
>>>>> function that is written in database .
>>>>>
>>>>> The input to the function is one of the fields of data frame
>>>>>
>>>>> I wanted to use spark.dbc.write to perform the operation, which
>>>>> generates the insert query at back end .
>>>>>
>>>>> For example : It can generate the insert query as :
>>>>>
>>>>> Insert into table values (?,?, getfunctionvalue(?) )
>>>>>
>>>>> Please advise if it is possible in spark and if yes , how can it be
>>>>> done
>>>>>
>>>>> This is little urgent for me . So any help is appreciated
>>>>>
>>>>> Thanks
>>>>> Anshul
>>>>>
>>>>

Reply via email to