Currently, ORC writers, like the Java collections API, don't lock themselves. You should synchronize on the writer before adding a row. I'm open to making the writers synchronized.
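For example, a minimal sketch against the test app quoted below, reusing its shared writer field and the BigRow taken off the queue:

    // Only one thread at a time may call addRow on a given writer, since the
    // underlying WriterImpl mutates shared dictionary state; take a lock on
    // the shared writer instance around each write.
    synchronized (writer) {
        writer.addRow(bigRow);
    }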
-- Owen

On Fri, May 24, 2013 at 11:39 AM, Andrew Psaltis
<andrew.psal...@webtrends.com> wrote:

> All,
> I have a test application that is attempting to add rows to an OrcFile
> from multiple threads; however, every time I do, I get exceptions with
> stack traces like the following:
>
> java.lang.IndexOutOfBoundsException: Index 4 is outside of 0..5
>     at org.apache.hadoop.hive.ql.io.orc.DynamicIntArray.get(DynamicIntArray.java:73)
>     at org.apache.hadoop.hive.ql.io.orc.StringRedBlackTree.compareValue(StringRedBlackTree.java:55)
>     at org.apache.hadoop.hive.ql.io.orc.RedBlackTree.add(RedBlackTree.java:192)
>     at org.apache.hadoop.hive.ql.io.orc.RedBlackTree.add(RedBlackTree.java:199)
>     at org.apache.hadoop.hive.ql.io.orc.RedBlackTree.add(RedBlackTree.java:300)
>     at org.apache.hadoop.hive.ql.io.orc.StringRedBlackTree.add(StringRedBlackTree.java:45)
>     at org.apache.hadoop.hive.ql.io.orc.WriterImpl$StringTreeWriter.write(WriterImpl.java:723)
>     at org.apache.hadoop.hive.ql.io.orc.WriterImpl$MapTreeWriter.write(WriterImpl.java:1093)
>     at org.apache.hadoop.hive.ql.io.orc.WriterImpl$StructTreeWriter.write(WriterImpl.java:996)
>     at org.apache.hadoop.hive.ql.io.orc.WriterImpl.addRow(WriterImpl.java:1450)
>     at OrcFileTester$BigRowWriter.run(OrcFileTester.java:129)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:722)
>
> Below is the source code for my sample app, which is heavily based on the
> TestOrcFile test case using BigRow. Is there something I am doing wrong
> here, or is this a legitimate bug in the ORC writing?
>
> Thanks in advance,
> Andrew
>
> ------------------------- Java app code follows ---------------------------------
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
> import org.apache.hadoop.hive.ql.io.orc.OrcFile;
> import org.apache.hadoop.hive.ql.io.orc.Writer;
> import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
> import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
> import org.apache.hadoop.io.BytesWritable;
> import org.apache.hadoop.io.Text;
>
> import java.io.File;
> import java.io.IOException;
> import java.util.HashMap;
> import java.util.Map;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.LinkedBlockingQueue;
>
> public class OrcFileTester {
>
>     private Writer writer;
>     private LinkedBlockingQueue<BigRow> bigRowQueue = new LinkedBlockingQueue<BigRow>();
>
>     public OrcFileTester() {
>         try {
>             Path workDir = new Path(System.getProperty("test.tmp.dir",
>                     "target" + File.separator + "test" + File.separator + "tmp"));
>
>             Configuration conf;
>             FileSystem fs;
>             Path testFilePath;
>
>             conf = new Configuration();
>             fs = FileSystem.getLocal(conf);
>             testFilePath = new Path(workDir, "TestOrcFile.OrcFileTester.orc");
>             fs.delete(testFilePath, false);
>
>             ObjectInspector inspector =
>                     ObjectInspectorFactory.getReflectionObjectInspector(BigRow.class,
>                             ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
>             writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
>                     100000, CompressionKind.ZLIB, 10000, 10000);
>
>             final ExecutorService bigRowWorkerPool = Executors.newFixedThreadPool(10);
>
>             // Changing this to more than 1 causes exceptions when writing rows.
>             for (int i = 0; i < 1; i++) {
>                 bigRowWorkerPool.submit(new BigRowWriter());
>             }
>             for (int i = 0; i < 100; i++) {
>                 if (0 == i % 2) {
>                     bigRowQueue.put(new BigRow(false, (byte) 1, (short) 1024, 65536,
>                             Long.MAX_VALUE, (float) 1.0, -15.0,
>                             bytes(0, 1, 2, 3, 4), "hi", map("hey", "orc")));
>                 } else {
>                     bigRowQueue.put(new BigRow(false, null, (short) 1024, 65536,
>                             Long.MAX_VALUE, (float) 1.0, -15.0,
>                             bytes(0, 1, 2, 3, 4), "hi", map("hey", "orc")));
>                 }
>             }
>
>             while (!bigRowQueue.isEmpty()) {
>                 Thread.sleep(2000);
>             }
>             bigRowWorkerPool.shutdownNow();
>         } catch (Exception ex) {
>             ex.printStackTrace();
>         }
>     }
>
>     public void WriteBigRow() {
>
>     }
>
>     private static Map<Text, Text> map(String... items) {
>         Map<Text, Text> result = new HashMap<Text, Text>();
>         for (String i : items) {
>             result.put(new Text(i), new Text(i));
>         }
>         return result;
>     }
>
>     private static BytesWritable bytes(int... items) {
>         BytesWritable result = new BytesWritable();
>         result.setSize(items.length);
>         for (int i = 0; i < items.length; ++i) {
>             result.getBytes()[i] = (byte) items[i];
>         }
>         return result;
>     }
>
>     public static class BigRow {
>         Boolean boolean1;
>         Byte byte1;
>         Short short1;
>         Integer int1;
>         Long long1;
>         Float float1;
>         Double double1;
>         BytesWritable bytes1;
>         Text string1;
>         Map<Text, Text> map = new HashMap<Text, Text>();
>
>         BigRow(Boolean b1, Byte b2, Short s1, Integer i1, Long l1, Float f1, Double d1,
>                BytesWritable b3, String s2, Map<Text, Text> m2) {
>             this.boolean1 = b1;
>             this.byte1 = b2;
>             this.short1 = s1;
>             this.int1 = i1;
>             this.long1 = l1;
>             this.float1 = f1;
>             this.double1 = d1;
>             this.bytes1 = b3;
>             if (s2 == null) {
>                 this.string1 = null;
>             } else {
>                 this.string1 = new Text(s2);
>             }
>             this.map = m2;
>         }
>     }
>
>     class BigRowWriter implements Runnable {
>
>         @Override
>         public void run() {
>             try {
>                 BigRow bigRow = bigRowQueue.take();
>                 writer.addRow(bigRow);
>             } catch (Exception e) {
>                 e.printStackTrace();
>             }
>         }
>     }
>
>     public static void main(String[] args) throws IOException {
>         OrcFileTester orcFileTester = new OrcFileTester();
>         orcFileTester.WriteBigRow();
>     }
> }
>
> ----------------------------- end of Java source ------------------------------
>
> ----------------------------- pom file start ----------------------------------
> <?xml version="1.0" encoding="UTF-8"?>
> <project xmlns="http://maven.apache.org/POM/4.0.0"
>          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
>                              http://maven.apache.org/xsd/maven-4.0.0.xsd">
>     <modelVersion>4.0.0</modelVersion>
>
>     <groupId>ORCTester</groupId>
>     <artifactId>ORCTester</artifactId>
>     <version>1.0-SNAPSHOT</version>
>     <dependencies>
>         <dependency>
>             <groupId>org.apache.hive</groupId>
>             <artifactId>hive-exec</artifactId>
>             <version>0.11.0</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.hadoop</groupId>
>             <artifactId>hadoop-core</artifactId>
>             <version>0.20.2</version>
>         </dependency>
>     </dependencies>
>     <build>
>         <plugins>
>             <plugin>
>                 <groupId>org.codehaus.mojo</groupId>
>                 <artifactId>exec-maven-plugin</artifactId>
>                 <version>1.1.1</version>
>                 <executions>
>                     <execution>
>                         <phase>test</phase>
>                         <goals>
>                             <goal>java</goal>
>                         </goals>
>                         <configuration>
>                             <mainClass>OrcFileTester</mainClass>
>                             <arguments/>
>                         </configuration>
>                     </execution>
>                 </executions>
>             </plugin>
>         </plugins>
>     </build>
> </project>
> ----------------------------- pom file end ----------------------------------