Thanks, Brian. Sorry for getting back to you so late. Your input made it work. Now I can pull data out of Oracle as well.
Thanks,
Amandeep

Amandeep Khurana
Computer Science Graduate Student
University of California, Santa Cruz

On Thu, Feb 12, 2009 at 9:05 AM, Brian MacKay <[email protected]> wrote:

> Amandeep,
>
> I spoke with one of our Oracle DBAs, and he suggested changing the query
> statement as follows:
>
> MySQL statement:
>     select * from <<TABLE>> limit <splitlength> offset <splitstart>
> ---------------------------------------------------------------
> Oracle statement:
>     select *
>     from (select a.*, rownum rno
>           from (your_query_here, which must contain an ORDER BY) a
>           where rownum <= splitstart + splitlength)
>     where rno >= splitstart;
>
> This could be put into a function, but that would require a type as well.
> -----------------------------------------------------------------
>
> If you edit getSelectQuery() in
> org.apache.hadoop.mapred.lib.db.DBInputFormat, it should work with Oracle:
>
>     protected String getSelectQuery() {
>         // ... edit to check for the driver and build the Oracle statement
>         return query.toString();
>     }
>
> Brian
>
> ==================================================================
>
>> On Feb 5, 2009, at 11:37 AM, Stefan Podkowinski wrote:
>>
>>> The 0.19 DBInputFormat implementation is, IMHO, only suitable for
>>> very simple queries working on small datasets. That's because it
>>> tries to create splits from the query by
>>> 1) getting a count of all rows using the specified count query (a huge
>>> performance impact on large tables), and
>>> 2) creating splits by issuing an individual query for each split, with
>>> "limit" and "offset" parameters appended to the input SQL query.
>>>
>>> Effectively, your input query "select * from orders" becomes
>>> "select * from orders limit <splitlength> offset <splitstart>",
>>> executed repeatedly until the count has been reached. I guess this is
>>> not valid SQL syntax for Oracle.
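Brian's Oracle rewrite can be sketched as a small helper that turns a base query plus a split's offset and length into the ROWNUM-paginated statement. This is a hedged illustration, not the actual DBInputFormat patch: the class and method names are made up, and I assume a 0-based offset, so the inner bound becomes `rno > splitStart` (ROWNUM itself is 1-based).

```java
// Sketch: build an Oracle-compatible paginated query for one split.
// Assumes baseQuery already contains an ORDER BY (needed for stable
// ROWNUM pagination) and that splitStart is a 0-based offset.
public class OracleSplitQuery {
    static String paginate(String baseQuery, long splitStart, long splitLength) {
        return "SELECT * FROM (SELECT a.*, ROWNUM rno FROM (" + baseQuery
                + ") a WHERE ROWNUM <= " + (splitStart + splitLength)
                + ") WHERE rno > " + splitStart;
    }

    public static void main(String[] args) {
        // One split of 5 rows starting at offset 10.
        System.out.println(paginate(
                "SELECT CONTRACT_NUMBER FROM OSE_EPR_CONTRACT ORDER BY CONTRACT_NUMBER",
                10, 5));
    }
}
```

The nested form matters: Oracle evaluates ROWNUM before ORDER BY in a flat query, so the base query must be ordered in the innermost subquery before row numbers are assigned.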
>>>
>>> Stefan
>>>
>>> 2009/2/4 Amandeep Khurana <[email protected]>:
>>>
>>>> Adding a semicolon gives me the error "ORA-00911: invalid character".
>>>>
>>>> Amandeep
>>>>
>>>> Amandeep Khurana
>>>> Computer Science Graduate Student
>>>> University of California, Santa Cruz
>>>>
>>>> On Wed, Feb 4, 2009 at 6:46 AM, Rasit OZDAS <[email protected]> wrote:
>>>>
>>>>> Amandeep,
>>>>> "SQL command not properly ended" --
>>>>> I get this error whenever I forget the semicolon at the end.
>>>>> I know it doesn't make sense, but I recommend giving it a try.
>>>>>
>>>>> Rasit
>>>>>
>>>>> 2009/2/4 Amandeep Khurana <[email protected]>:
>>>>>
>>>>>> The same query works if I write a simple JDBC client and query the
>>>>>> database, so I'm probably doing something wrong in the connection
>>>>>> settings. But the error looks to be on the query side more than the
>>>>>> connection side.
>>>>>>
>>>>>> Amandeep
>>>>>>
>>>>>> Amandeep Khurana
>>>>>> Computer Science Graduate Student
>>>>>> University of California, Santa Cruz
>>>>>>
>>>>>> On Tue, Feb 3, 2009 at 7:25 PM, Amandeep Khurana <[email protected]> wrote:
>>>>>>
>>>>>>> Thanks, Kevin.
>>>>>>>
>>>>>>> I couldn't get it to work.
>>>>>>> Here's the error I get:
>>>>>>>
>>>>>>> bin/hadoop jar ~/dbload.jar LoadTable1
>>>>>>> 09/02/03 19:21:17 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
>>>>>>> 09/02/03 19:21:20 INFO mapred.JobClient: Running job: job_local_0001
>>>>>>> 09/02/03 19:21:21 INFO mapred.JobClient: map 0% reduce 0%
>>>>>>> 09/02/03 19:21:22 INFO mapred.MapTask: numReduceTasks: 0
>>>>>>> 09/02/03 19:21:24 WARN mapred.LocalJobRunner: job_local_0001
>>>>>>> java.io.IOException: ORA-00933: SQL command not properly ended
>>>>>>>     at org.apache.hadoop.mapred.lib.db.DBInputFormat.getRecordReader(DBInputFormat.java:289)
>>>>>>>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:321)
>>>>>>>     at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:138)
>>>>>>> java.io.IOException: Job failed!
>>>>>>>     at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1217)
>>>>>>>     at LoadTable1.run(LoadTable1.java:130)
>>>>>>>     at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
>>>>>>>     at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
>>>>>>>     at LoadTable1.main(LoadTable1.java:107)
>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>>>>>>>     at java.lang.reflect.Method.invoke(Unknown Source)
>>>>>>>     at org.apache.hadoop.util.RunJar.main(RunJar.java:165)
>>>>>>>     at org.apache.hadoop.mapred.JobShell.run(JobShell.java:54)
>>>>>>>     at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
>>>>>>>     at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
>>>>>>>     at org.apache.hadoop.mapred.JobShell.main(JobShell.java:68)
>>>>>>>
>>>>>>> Exception closing file
>>>>>>> /user/amkhuran/contract_table/_temporary/_attempt_local_0001_m_000000_0/part-00000
>>>>>>> java.io.IOException: Filesystem closed
>>>>>>>     at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:198)
>>>>>>>     at org.apache.hadoop.hdfs.DFSClient.access$600(DFSClient.java:65)
>>>>>>>     at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.closeInternal(DFSClient.java:3084)
>>>>>>>     at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.close(DFSClient.java:3053)
>>>>>>>     at org.apache.hadoop.hdfs.DFSClient$LeaseChecker.close(DFSClient.java:942)
>>>>>>>     at org.apache.hadoop.hdfs.DFSClient.close(DFSClient.java:210)
>>>>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem.close(DistributedFileSystem.java:243)
>>>>>>>     at org.apache.hadoop.fs.FileSystem$Cache.closeAll(FileSystem.java:1413)
>>>>>>>     at org.apache.hadoop.fs.FileSystem.closeAll(FileSystem.java:236)
>>>>>>>     at org.apache.hadoop.fs.FileSystem$ClientFinalizer.run(FileSystem.java:221)
>>>>>>>
>>>>>>> Here's my code:
>>>>>>>
>>>>>>> public class LoadTable1 extends Configured implements Tool {
>>>>>>>
>>>>>>>     // data destination on HDFS
>>>>>>>     private static final String CONTRACT_OUTPUT_PATH = "contract_table";
>>>>>>>
>>>>>>>     // the JDBC connection URL and driver implementation class
>>>>>>>     private static final String CONNECT_URL = "jdbc:oracle:thin:@dbhost:1521:PSEDEV";
>>>>>>>     private static final String DB_USER = "user";
>>>>>>>     private static final String DB_PWD = "pass";
>>>>>>>     private static final String DATABASE_DRIVER_CLASS = "oracle.jdbc.driver.OracleDriver";
>>>>>>>
>>>>>>>     private static final String CONTRACT_INPUT_TABLE = "OSE_EPR_CONTRACT";
>>>>>>>
>>>>>>>     private static final String[] CONTRACT_INPUT_TABLE_FIELDS = {
>>>>>>>             "PORTFOLIO_NUMBER", "CONTRACT_NUMBER" };
>>>>>>>
>>>>>>>     private static final String ORDER_CONTRACT_BY_COL = "CONTRACT_NUMBER";
>>>>>>>
>>>>>>>     static class ose_epr_contract implements Writable, DBWritable {
>>>>>>>
>>>>>>>         String CONTRACT_NUMBER;
>>>>>>>
>>>>>>>         public void readFields(DataInput in) throws IOException {
>>>>>>>             this.CONTRACT_NUMBER = Text.readString(in);
>>>>>>>         }
>>>>>>>
>>>>>>>         public void write(DataOutput out) throws IOException {
>>>>>>>             Text.writeString(out, this.CONTRACT_NUMBER);
>>>>>>>         }
>>>>>>>
>>>>>>>         public void readFields(ResultSet in_set) throws SQLException {
>>>>>>>             this.CONTRACT_NUMBER = in_set.getString(1);
>>>>>>>         }
>>>>>>>
>>>>>>>         @Override
>>>>>>>         public void write(PreparedStatement prep_st) throws SQLException {
>>>>>>>             // TODO Auto-generated method stub
>>>>>>>         }
>>>>>>>     }
>>>>>>>
>>>>>>>     public static class LoadMapper extends MapReduceBase
>>>>>>>             implements Mapper<LongWritable, ose_epr_contract, Text, NullWritable> {
>>>>>>>         private static final char FIELD_SEPARATOR = 1;
>>>>>>>
>>>>>>>         public void map(LongWritable arg0, ose_epr_contract arg1,
>>>>>>>                 OutputCollector<Text, NullWritable> arg2, Reporter arg3)
>>>>>>>                 throws IOException {
>>>>>>>
>>>>>>>             StringBuilder sb = new StringBuilder();
>>>>>>>             sb.append(arg1.CONTRACT_NUMBER);
>>>>>>>
>>>>>>>             arg2.collect(new Text(sb.toString()), NullWritable.get());
>>>>>>>         }
>>>>>>>     }
>>>>>>>
>>>>>>>     public static void main(String[] args) throws Exception {
>>>>>>>         Class.forName("oracle.jdbc.driver.OracleDriver");
>>>>>>>         int exit = ToolRunner.run(new LoadTable1(), args);
>>>>>>>     }
>>>>>>>
>>>>>>>     public int run(String[] arg0) throws Exception {
>>>>>>>         JobConf conf = new JobConf(getConf(), LoadTable1.class);
>>>>>>>
>>>>>>>         conf.setInputFormat(DBInputFormat.class);
>>>>>>>         DBConfiguration.configureDB(conf, DATABASE_DRIVER_CLASS,
>>>>>>>                 CONNECT_URL, DB_USER, DB_PWD);
>>>>>>>
>>>>>>>         DBInputFormat.setInput(conf, ose_epr_contract.class,
>>>>>>>                 "select CONTRACT_NUMBER from OSE_EPR_CONTRACT",
>>>>>>>                 "select COUNT(CONTRACT_NUMBER) from OSE_EPR_CONTRACT");
>>>>>>>         FileOutputFormat.setOutputPath(conf, new Path(CONTRACT_OUTPUT_PATH));
>>>>>>>
>>>>>>>         conf.setMapperClass(LoadMapper.class);
>>>>>>>         conf.setNumReduceTasks(0);
>>>>>>>
>>>>>>>         conf.setOutputKeyClass(Text.class);
>>>>>>>         conf.setOutputValueClass(NullWritable.class);
>>>>>>>
>>>>>>>         JobClient.runJob(conf);
>>>>>>>
>>>>>>>         return 0;
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> -Amandeep
>>>>>>>
>>>>>>> Amandeep Khurana
>>>>>>> Computer Science Graduate Student
>>>>>>> University of California, Santa Cruz
>>>>>>>
>>>>>>> On Tue, Feb 3, 2009 at 6:51 PM, Kevin Peterson <[email protected]> wrote:
>>>>>>>
>>>>>>>> On Tue, Feb 3, 2009 at 5:49 PM, Amandeep Khurana <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> In the setInput(...) function in DBInputFormat, there are two sets
>>>>>>>>> of arguments that one can use.
>>>>>>>>>
>>>>>>>>> 1. public static void setInput(JobConf ...)
>>>>>>>>>
>>>>>>>>> a) In this, do we necessarily have to give all the fieldNames
>>>>>>>>> (which are the column names, right?) that the table has, or do we
>>>>>>>>> need to specify only the ones that we want to extract?
>>>>>>>>
>>>>>>>> You may specify only those columns that you are interested in.
>>>>>>>>
>>>>>>>>> b) Do we have to have an orderBy, or not necessarily? Does this
>>>>>>>>> relate to the primary key in the table in any way?
>>>>>>>>
>>>>>>>> Conditions and order by are not necessary.
>>>>>>>>
>>>>>>>>> a) Is there any restriction on the kind of queries that this
>>>>>>>>> function can take in the inputQuery string?
>>>>>>>>
>>>>>>>> I don't think so, but I don't use this method -- I just use the
>>>>>>>> fieldNames and tableName method.
>>>>>>>>
>>>>>>>>> I am facing issues in getting this to work with an Oracle database
>>>>>>>>> and have no idea how to debug it (an email sent earlier).
>>>>>>>>> Can anyone give me some input on this, please?
>>>>>>>>
>>>>>>>> Create a new table that has one column, put about five entries into
>>>>>>>> that table, then try to get a map job working that outputs the
>>>>>>>> values to a text file. If that doesn't work, post your code and
>>>>>>>> errors.
>>>>>
>>>>> --
>>>>> M. Raşit ÖZDAŞ
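For reference, the split mechanism Stefan described earlier in the thread -- run the count query once, then carve the row range into per-split (offset, length) chunks that get appended to the input query -- can be sketched roughly as below. This is a simplification for illustration, not the actual 0.19 DBInputFormat source; the class and method names are made up.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: divide totalRows into numSplits (offset, length) chunks, giving
// the last split the remainder, the way a limit/offset scheme would need.
public class SplitSketch {
    static long[][] computeSplits(long totalRows, int numSplits) {
        List<long[]> splits = new ArrayList<long[]>();
        long chunk = totalRows / numSplits;
        for (int i = 0; i < numSplits; i++) {
            long start = i * chunk;
            long length = (i == numSplits - 1) ? totalRows - start : chunk;
            splits.add(new long[] { start, length });
        }
        return splits.toArray(new long[0][]);
    }

    public static void main(String[] args) {
        // e.g. 10 rows over 3 splits -> offsets 0, 3, 6 with lengths 3, 3, 4
        for (long[] s : computeSplits(10, 3)) {
            System.out.println("offset=" + s[0] + " limit=" + s[1]);
        }
    }
}
```

Each (offset, length) pair then becomes one map task's query -- a `limit ... offset ...` clause on MySQL, or the nested ROWNUM form Brian suggested on Oracle -- which is also why the count query against a large table is the expensive part of this scheme.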
