Yes, I am using the orc.Writer interface directly. Unless I synchronize the
calls to addRow, it still fails.
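
Concretely, what works for me is wrapping the addRow call in my worker
(BigRowWriter.run() in the sample app below) in a block synchronized on the
shared writer:

  synchronized (writer) {
    writer.addRow(bigRow);
  }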

In good health,
Andrew

Sent from my GPU-powered iPhone

On Jun 11, 2013, at 11:00, "Owen O'Malley" <omal...@apache.org> wrote:

Sorry for dropping this thread for so long. Are you using the orc.Writer 
interface directly or going through the OrcOutputFormat? I forgot that I had 
updated the Writer to synchronize things. It looks like all of the methods in 
Writer are synchronized. I haven't had a chance to investigate this further yet.

-- Owen


On Fri, May 24, 2013 at 1:28 PM, Andrew Psaltis
<andrew.psal...@webtrends.com> wrote:
Here is a snippet from the file header comment of the WriterImpl for ORC:

/**
 …………
 * This class is synchronized so that multi-threaded access is ok. In
 * particular, because the MemoryManager is shared between writers, this class
 * assumes that checkMemory may be called from a separate thread.
 */

And then addRow looks like this:

 public void addRow(Object row) throws IOException {
    synchronized (this) {
      treeWriter.write(row);
      rowsInStripe += 1;
      if (buildIndex) {
        rowsInIndex += 1;

        if (rowsInIndex >= rowIndexStride) {
          createRowIndexEntry();
        }
      }
    }
    memoryManager.addedRow();
  }

Am I missing something here about the synchronized(this)? Perhaps I am
looking in the wrong place.

Thanks,
agp


From: Owen O'Malley <omal...@apache.org>
Reply-To: "user@hive.apache.org" <user@hive.apache.org>
Date: Friday, May 24, 2013 2:15 PM
To: "user@hive.apache.org" <user@hive.apache.org>
Subject: Re: OrcFile writing failing with multiple threads
Subject: Re: OrcFile writing failing with multiple threads

Currently, ORC writers, like the Java collections API, don't lock themselves.
You should synchronize on the writer before adding a row. I'm open to making
the writers synchronized.
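
For example, a minimal sketch of that external locking (where writer is the
shared orc.Writer instance and row is the record being added):

  synchronized (writer) {
    writer.addRow(row);
  }

Any monitor that all producing threads share would work; locking on the
writer itself is simply the most convenient choice.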

-- Owen


On Fri, May 24, 2013 at 11:39 AM, Andrew Psaltis
<andrew.psal...@webtrends.com> wrote:
All,
I have a test application that is attempting to add rows to an OrcFile from
multiple threads; however, every time I do, I get exceptions with stack
traces like the following:

java.lang.IndexOutOfBoundsException: Index 4 is outside of 0..5
at org.apache.hadoop.hive.ql.io.orc.DynamicIntArray.get(DynamicIntArray.java:73)
at org.apache.hadoop.hive.ql.io.orc.StringRedBlackTree.compareValue(StringRedBlackTree.java:55)
at org.apache.hadoop.hive.ql.io.orc.RedBlackTree.add(RedBlackTree.java:192)
at org.apache.hadoop.hive.ql.io.orc.RedBlackTree.add(RedBlackTree.java:199)
at org.apache.hadoop.hive.ql.io.orc.RedBlackTree.add(RedBlackTree.java:300)
at org.apache.hadoop.hive.ql.io.orc.StringRedBlackTree.add(StringRedBlackTree.java:45)
at org.apache.hadoop.hive.ql.io.orc.WriterImpl$StringTreeWriter.write(WriterImpl.java:723)
at org.apache.hadoop.hive.ql.io.orc.WriterImpl$MapTreeWriter.write(WriterImpl.java:1093)
at org.apache.hadoop.hive.ql.io.orc.WriterImpl$StructTreeWriter.write(WriterImpl.java:996)
at org.apache.hadoop.hive.ql.io.orc.WriterImpl.addRow(WriterImpl.java:1450)
at OrcFileTester$BigRowWriter.run(OrcFileTester.java:129)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)


Below is the source code for my sample app, which is heavily based on the
TestOrcFile test case using BigRow. Is there something I am doing wrong here,
or is this a legitimate bug in the ORC writer?

Thanks in advance,
Andrew


------------------------- Java app code follows ---------------------------------
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Writer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class OrcFileTester {

    private Writer writer;
    private LinkedBlockingQueue<BigRow> bigRowQueue = new LinkedBlockingQueue<BigRow>();

    public OrcFileTester() {
      try {
        Path workDir = new Path(System.getProperty("test.tmp.dir",
                "target" + File.separator + "test" + File.separator + "tmp"));

        Configuration conf;
        FileSystem fs;
        Path testFilePath;

        conf = new Configuration();
        fs = FileSystem.getLocal(conf);
        testFilePath = new Path(workDir, "TestOrcFile.OrcFileTester.orc");
        fs.delete(testFilePath, false);


        ObjectInspector inspector = ObjectInspectorFactory.getReflectionObjectInspector(
                BigRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
        writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
                100000, CompressionKind.ZLIB, 10000, 10000);

        final ExecutorService bigRowWorkerPool = Executors.newFixedThreadPool(10);

        // Changing this to more than 1 worker causes exceptions when writing rows.
        for (int i = 0; i < 1; i++) {
            bigRowWorkerPool.submit(new BigRowWriter());
        }
        for (int i = 0; i < 100; i++) {
            if (0 == i % 2) {
                bigRowQueue.put(new BigRow(false, (byte) 1, (short) 1024, 65536,
                        Long.MAX_VALUE, (float) 1.0, -15.0, bytes(0, 1, 2, 3, 4),
                        "hi", map("hey", "orc")));
            } else {
                bigRowQueue.put(new BigRow(false, null, (short) 1024, 65536,
                        Long.MAX_VALUE, (float) 1.0, -15.0, bytes(0, 1, 2, 3, 4),
                        "hi", map("hey", "orc")));
            }
        }

        // Crude wait for the worker threads to drain the queue.
        while (!bigRowQueue.isEmpty()) {
            Thread.sleep(2000);
        }
        bigRowWorkerPool.shutdownNow();
      } catch (Exception ex) {
          ex.printStackTrace();
      }
    }
    public void WriteBigRow() {
        // Close the writer so the last stripe and the file footer get written.
        try {
            writer.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static Map<Text, Text> map(String... items)  {
        Map<Text, Text> result = new HashMap<Text, Text>();
        for(String i: items) {
            result.put(new Text(i), new Text(i));
        }
        return result;
    }
    private static BytesWritable bytes(int... items) {
        BytesWritable result = new BytesWritable();
        result.setSize(items.length);
        for(int i=0; i < items.length; ++i) {
            result.getBytes()[i] = (byte) items[i];
        }
        return result;
    }

    public static class BigRow {
        Boolean boolean1;
        Byte byte1;
        Short short1;
        Integer int1;
        Long long1;
        Float float1;
        Double double1;
        BytesWritable bytes1;
        Text string1;
        Map<Text, Text> map = new HashMap<Text, Text>();

        BigRow(Boolean b1, Byte b2, Short s1, Integer i1, Long l1, Float f1,
               Double d1,
               BytesWritable b3, String s2, Map<Text, Text> m2) {
            this.boolean1 = b1;
            this.byte1 = b2;
            this.short1 = s1;
            this.int1 = i1;
            this.long1 = l1;
            this.float1 = f1;
            this.double1 = d1;
            this.bytes1 = b3;
            if (s2 == null) {
                this.string1 = null;
            } else {
                this.string1 = new Text(s2);
            }
            this.map = m2;
        }
    }



    class BigRowWriter implements Runnable {

        @Override
        public void run() {
            try {
                // Each worker repeatedly takes rows off the queue and writes them
                // to the shared Writer without any external synchronization.
                while (!Thread.currentThread().isInterrupted()) {
                    BigRow bigRow = bigRowQueue.take();
                    writer.addRow(bigRow);
                }
            } catch (InterruptedException e) {
                // Pool was shut down; exit quietly.
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        OrcFileTester  orcFileTester = new OrcFileTester();
        orcFileTester.WriteBigRow();
    }



}

-----------------------------end of Java source ------------------------------

----------------------------- pom file start ----------------------------------
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>ORCTester</groupId>
    <artifactId>ORCTester</artifactId>
    <version>1.0-SNAPSHOT</version>
  <dependencies>
      <dependency>
          <groupId>org.apache.hive</groupId>
          <artifactId>hive-exec</artifactId>
          <version>0.11.0</version>
      </dependency>
      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-core</artifactId>
          <version>0.20.2</version>
      </dependency>
  </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.1.1</version>
                <executions>
                    <execution>
                        <phase>test</phase>
                        <goals>
                            <goal>java</goal>
                        </goals>
                        <configuration>
                            <mainClass>OrcFileTester</mainClass>
                            <arguments/>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
----------------------------- pom file end ----------------------------------

