Currently, ORC writers, like the Java collections API, don't lock
themselves. You should synchronize on the writer before adding a row. I'm
open to making the writers synchronized.

-- Owen


On Fri, May 24, 2013 at 11:39 AM, Andrew Psaltis <
andrew.psal...@webtrends.com> wrote:

>  All,
> I have a test application that is attempting to add rows to an OrcFile
> from multiple threads; however, every time I do, I get exceptions with stack
> traces like the following:
>
>  java.lang.IndexOutOfBoundsException: Index 4 is outside of 0..5
> at
> org.apache.hadoop.hive.ql.io.orc.DynamicIntArray.get(DynamicIntArray.java:73)
> at
> org.apache.hadoop.hive.ql.io.orc.StringRedBlackTree.compareValue(StringRedBlackTree.java:55)
> at org.apache.hadoop.hive.ql.io.orc.RedBlackTree.add(RedBlackTree.java:192)
> at org.apache.hadoop.hive.ql.io.orc.RedBlackTree.add(RedBlackTree.java:199)
> at org.apache.hadoop.hive.ql.io.orc.RedBlackTree.add(RedBlackTree.java:300)
> at
> org.apache.hadoop.hive.ql.io.orc.StringRedBlackTree.add(StringRedBlackTree.java:45)
> at
> org.apache.hadoop.hive.ql.io.orc.WriterImpl$StringTreeWriter.write(WriterImpl.java:723)
> at
> org.apache.hadoop.hive.ql.io.orc.WriterImpl$MapTreeWriter.write(WriterImpl.java:1093)
> at
> org.apache.hadoop.hive.ql.io.orc.WriterImpl$StructTreeWriter.write(WriterImpl.java:996)
> at org.apache.hadoop.hive.ql.io.orc.WriterImpl.addRow(WriterImpl.java:1450)
> at OrcFileTester$BigRowWriter.run(OrcFileTester.java:129)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
> at java.util.concurrent.FutureTask.run(FutureTask.java:166)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:722)
>
>
>  Below is the source code for my sample app that is heavily based on the
> TestOrcFile test case using BigRow. Is there something I am doing wrong
> here, or is this a legitimate bug in the Orc writing?
>
>  Thanks in advance,
> Andrew
>
>
>  ------------------------- Java app code follows
> ---------------------------------
>  import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
> import org.apache.hadoop.hive.ql.io.orc.OrcFile;
> import org.apache.hadoop.hive.ql.io.orc.Writer;
> import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
> import
> org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
> import org.apache.hadoop.io.BytesWritable;
> import org.apache.hadoop.io.Text;
>
>  import java.io.File;
> import java.io.IOException;
> import java.util.HashMap;
> import java.util.Map;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.LinkedBlockingQueue;
>
>  public class OrcFileTester {
>
>      private Writer writer;
>     private LinkedBlockingQueue<BigRow> bigRowQueue = new
> LinkedBlockingQueue<BigRow>();
>     public OrcFileTester(){
>
>        try{
>         Path workDir = new Path(System.getProperty("test.tmp.dir",
>                 "target" + File.separator + "test" + File.separator +
> "tmp"));
>
>          Configuration conf;
>         FileSystem fs;
>         Path testFilePath;
>
>          conf = new Configuration();
>         fs = FileSystem.getLocal(conf);
>         testFilePath = new Path(workDir, "TestOrcFile.OrcFileTester.orc");
>         fs.delete(testFilePath, false);
>
>
>          ObjectInspector inspector =
> ObjectInspectorFactory.getReflectionObjectInspector
>                 (BigRow.class,
> ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
>         writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
>                 100000, CompressionKind.ZLIB, 10000, 10000);
>
>          final ExecutorService bigRowWorkerPool =
> Executors.newFixedThreadPool(10);
>
>          //Changing this to more than 1 causes exceptions when writing
> rows.
>         for (int i = 0; i < 1; i++) {
>             bigRowWorkerPool.submit(new BigRowWriter());
>         }
>           for(int i =0; i < 100; i++){
>               if(0 == i % 2){
>                  bigRowQueue.put(new BigRow(false, (byte) 1, (short) 1024,
> 65536,
>                          Long.MAX_VALUE, (float) 1.0, -15.0,
> bytes(0,1,2,3,4), "hi",map("hey","orc")));
>               } else{
>                    bigRowQueue.put(new BigRow(false, null, (short) 1024,
> 65536,
>                            Long.MAX_VALUE, (float) 1.0, -15.0,
> bytes(0,1,2,3,4), "hi",map("hey","orc")));
>               }
>           }
>
>            while (!bigRowQueue.isEmpty()) {
>               Thread.sleep(2000);
>           }
>           bigRowWorkerPool.shutdownNow();
>       }catch(Exception ex){
>           ex.printStackTrace();
>       }
>     }
>     public void WriteBigRow(){
>
>      }
>
>      private static Map<Text, Text> map(String... items)  {
>         Map<Text, Text> result = new HashMap<Text, Text>();
>         for(String i: items) {
>             result.put(new Text(i), new Text(i));
>         }
>         return result;
>     }
>     private static BytesWritable bytes(int... items) {
>         BytesWritable result = new BytesWritable();
>         result.setSize(items.length);
>         for(int i=0; i < items.length; ++i) {
>             result.getBytes()[i] = (byte) items[i];
>         }
>         return result;
>     }
>
>      public static class BigRow {
>         Boolean boolean1;
>         Byte byte1;
>         Short short1;
>         Integer int1;
>         Long long1;
>         Float float1;
>         Double double1;
>         BytesWritable bytes1;
>         Text string1;
>         Map<Text, Text> map = new HashMap<Text, Text>();
>
>          BigRow(Boolean b1, Byte b2, Short s1, Integer i1, Long l1, Float
> f1,
>                Double d1,
>                BytesWritable b3, String s2, Map<Text, Text> m2) {
>             this.boolean1 = b1;
>             this.byte1 = b2;
>             this.short1 = s1;
>             this.int1 = i1;
>             this.long1 = l1;
>             this.float1 = f1;
>             this.double1 = d1;
>             this.bytes1 = b3;
>             if (s2 == null) {
>                 this.string1 = null;
>             } else {
>                 this.string1 = new Text(s2);
>             }
>             this.map = m2;
>         }
>     }
>
>
>
>      class BigRowWriter implements Runnable{
>
>          @Override
>         public void run() {
>                 try {
>                     BigRow bigRow = bigRowQueue.take();
>                     writer.addRow(bigRow);
>                 } catch (Exception e) {
>                     e.printStackTrace();
>                 }
>
>          }
>     }
>
>      public static void main(String[] args) throws IOException {
>         OrcFileTester  orcFileTester = new OrcFileTester();
>         orcFileTester.WriteBigRow();
>     }
>
>
>
>  }
>
>  -----------------------------end of Java source
> ------------------------------
>
>  ----------------------------- pom file start
> ----------------------------------
>  <?xml version="1.0" encoding="UTF-8"?>
> <project xmlns="http://maven.apache.org/POM/4.0.0"
>          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
> http://maven.apache.org/xsd/maven-4.0.0.xsd">
>     <modelVersion>4.0.0</modelVersion>
>
>      <groupId>ORCTester</groupId>
>     <artifactId>ORCTester</artifactId>
>     <version>1.0-SNAPSHOT</version>
>   <dependencies>
>       <dependency>
>           <groupId>org.apache.hive</groupId>
>           <artifactId>hive-exec</artifactId>
>           <version>0.11.0</version>
>       </dependency>
>       <dependency>
>           <groupId>org.apache.hadoop</groupId>
>           <artifactId>hadoop-core</artifactId>
>           <version>0.20.2</version>
>       </dependency>
>   </dependencies>
>     <build>
>         <plugins>
>             <plugin>
>                 <groupId>org.codehaus.mojo</groupId>
>                 <artifactId>exec-maven-plugin</artifactId>
>                 <version>1.1.1</version>
>                 <executions>
>                     <execution>
>                         <phase>test</phase>
>                         <goals>
>                             <goal>java</goal>
>                         </goals>
>                         <configuration>
>                             <mainClass>OrcFileTester</mainClass>
>                             <arguments/>
>                         </configuration>
>                     </execution>
>                 </executions>
>             </plugin>
>         </plugins>
>     </build>
> </project>
>    ----------------------------- pom file end
> ----------------------------------
>

Reply via email to