Here is a snippet from the file header comment of WriterImpl for ORC:

/**
 * …
 * This class is synchronized so that multi-threaded access is ok. In
 * particular, because the MemoryManager is shared between writers, this class
 * assumes that checkMemory may be called from a separate thread.
 */
And then addRow looks like this:

  public void addRow(Object row) throws IOException {
    synchronized (this) {
      treeWriter.write(row);
      rowsInStripe += 1;
      if (buildIndex) {
        rowsInIndex += 1;
        if (rowsInIndex >= rowIndexStride) {
          createRowIndexEntry();
        }
      }
    }
    memoryManager.addedRow();
  }

Am I missing something here about the synchronized(this)? Perhaps I am looking in the wrong place.

Thanks,
agp

From: Owen O'Malley <omal...@apache.org>
Reply-To: "user@hive.apache.org" <user@hive.apache.org>
Date: Friday, May 24, 2013 2:15 PM
To: "user@hive.apache.org" <user@hive.apache.org>
Subject: Re: OrcFile writing failing with multiple threads

Currently, ORC writers, like the Java collections API, don't lock themselves. You should synchronize on the writer before adding a row. I'm open to making the writers synchronized.

-- Owen

On Fri, May 24, 2013 at 11:39 AM, Andrew Psaltis <andrew.psal...@webtrends.com> wrote:

All,
I have a test application that is attempting to add rows to an OrcFile from multiple threads; however, every time I do, I get exceptions with stack traces like the following:

java.lang.IndexOutOfBoundsException: Index 4 is outside of 0..5
        at org.apache.hadoop.hive.ql.io.orc.DynamicIntArray.get(DynamicIntArray.java:73)
        at org.apache.hadoop.hive.ql.io.orc.StringRedBlackTree.compareValue(StringRedBlackTree.java:55)
        at org.apache.hadoop.hive.ql.io.orc.RedBlackTree.add(RedBlackTree.java:192)
        at org.apache.hadoop.hive.ql.io.orc.RedBlackTree.add(RedBlackTree.java:199)
        at org.apache.hadoop.hive.ql.io.orc.RedBlackTree.add(RedBlackTree.java:300)
        at org.apache.hadoop.hive.ql.io.orc.StringRedBlackTree.add(StringRedBlackTree.java:45)
        at org.apache.hadoop.hive.ql.io.orc.WriterImpl$StringTreeWriter.write(WriterImpl.java:723)
        at org.apache.hadoop.hive.ql.io.orc.WriterImpl$MapTreeWriter.write(WriterImpl.java:1093)
        at org.apache.hadoop.hive.ql.io.orc.WriterImpl$StructTreeWriter.write(WriterImpl.java:996)
        at org.apache.hadoop.hive.ql.io.orc.WriterImpl.addRow(WriterImpl.java:1450)
        at OrcFileTester$BigRowWriter.run(OrcFileTester.java:129)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
        at java.util.concurrent.FutureTask.run(FutureTask.java:166)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:722)

Below is the source code for my sample app, which is heavily based on the TestOrcFile test case using BigRow. Is there something I am doing wrong here, or is this a legitimate bug in the ORC writing?
Thanks in advance,
Andrew

------------------------- Java app code follows ---------------------------------

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Writer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class OrcFileTester {

  private Writer writer;
  private LinkedBlockingQueue<BigRow> bigRowQueue = new LinkedBlockingQueue<BigRow>();

  public OrcFileTester() {
    try {
      Path workDir = new Path(System.getProperty("test.tmp.dir",
          "target" + File.separator + "test" + File.separator + "tmp"));
      Configuration conf;
      FileSystem fs;
      Path testFilePath;
      conf = new Configuration();
      fs = FileSystem.getLocal(conf);
      testFilePath = new Path(workDir, "TestOrcFile.OrcFileTester.orc");
      fs.delete(testFilePath, false);

      ObjectInspector inspector = ObjectInspectorFactory.getReflectionObjectInspector(
          BigRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
      writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
          100000, CompressionKind.ZLIB, 10000, 10000);

      final ExecutorService bigRowWorkerPool = Executors.newFixedThreadPool(10);
      // Changing this to more than 1 causes exceptions when writing rows.
      for (int i = 0; i < 1; i++) {
        bigRowWorkerPool.submit(new BigRowWriter());
      }

      for (int i = 0; i < 100; i++) {
        if (0 == i % 2) {
          bigRowQueue.put(new BigRow(false, (byte) 1, (short) 1024, 65536,
              Long.MAX_VALUE, (float) 1.0, -15.0, bytes(0, 1, 2, 3, 4),
              "hi", map("hey", "orc")));
        } else {
          bigRowQueue.put(new BigRow(false, null, (short) 1024, 65536,
              Long.MAX_VALUE, (float) 1.0, -15.0, bytes(0, 1, 2, 3, 4),
              "hi", map("hey", "orc")));
        }
      }

      while (!bigRowQueue.isEmpty()) {
        Thread.sleep(2000);
      }
      bigRowWorkerPool.shutdownNow();
    } catch (Exception ex) {
      ex.printStackTrace();
    }
  }

  public void WriteBigRow() {
  }

  private static Map<Text, Text> map(String... items) {
    Map<Text, Text> result = new HashMap<Text, Text>();
    for (String i : items) {
      result.put(new Text(i), new Text(i));
    }
    return result;
  }

  private static BytesWritable bytes(int... items) {
    BytesWritable result = new BytesWritable();
    result.setSize(items.length);
    for (int i = 0; i < items.length; ++i) {
      result.getBytes()[i] = (byte) items[i];
    }
    return result;
  }

  public static class BigRow {
    Boolean boolean1;
    Byte byte1;
    Short short1;
    Integer int1;
    Long long1;
    Float float1;
    Double double1;
    BytesWritable bytes1;
    Text string1;
    Map<Text, Text> map = new HashMap<Text, Text>();

    BigRow(Boolean b1, Byte b2, Short s1, Integer i1, Long l1, Float f1,
           Double d1, BytesWritable b3, String s2, Map<Text, Text> m2) {
      this.boolean1 = b1;
      this.byte1 = b2;
      this.short1 = s1;
      this.int1 = i1;
      this.long1 = l1;
      this.float1 = f1;
      this.double1 = d1;
      this.bytes1 = b3;
      if (s2 == null) {
        this.string1 = null;
      } else {
        this.string1 = new Text(s2);
      }
      this.map = m2;
    }
  }

  class BigRowWriter implements Runnable {
    @Override
    public void run() {
      try {
        BigRow bigRow = bigRowQueue.take();
        writer.addRow(bigRow);
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }

  public static void main(String[] args) throws IOException {
    OrcFileTester orcFileTester = new OrcFileTester();
    orcFileTester.WriteBigRow();
  }
}

----------------------------- end of Java source ------------------------------

----------------------------- pom file start ----------------------------------

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>ORCTester</groupId>
  <artifactId>ORCTester</artifactId>
  <version>1.0-SNAPSHOT</version>
  <dependencies>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>0.11.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>0.20.2</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.1.1</version>
        <executions>
          <execution>
            <phase>test</phase>
            <goals>
              <goal>java</goal>
            </goals>
            <configuration>
              <mainClass>OrcFileTester</mainClass>
              <arguments/>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

----------------------------- pom file end ----------------------------------
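
For reference, a minimal sketch of the external locking Owen describes above, applied to the BigRowWriter in this test program, could look like the following. The writer and bigRowQueue fields are the ones already declared in OrcFileTester; the only change is the synchronized block around addRow.

  class BigRowWriter implements Runnable {
    @Override
    public void run() {
      try {
        BigRow bigRow = bigRowQueue.take();
        // Lock on the shared Writer instance so that only one worker thread
        // calls addRow at a time; per the reply above, the writer in this
        // version does no locking of its own.
        synchronized (writer) {
          writer.addRow(bigRow);
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }

The point of locking on the writer object itself, rather than on the enclosing test class, is that every caller sharing that writer then serializes on the same monitor.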