I have a Storm Trident bolt for writing ORC files. The files are created;
however, they are always zero length, and this code eventually causes an
OutOfMemoryError (OOME). I suspect I am missing some sort of flush call, but I
don’t see anything like that in the API.

My bolt follows. Any thoughts on what I’m doing wrong, or links to reference
uses of org.apache.hadoop.hive.ql.io.orc.Writer?


package com.cisco.tinderbox.burner.trident.functions;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

import com.cisco.tinderbox.burner.io.system.CurrentUnixTime;
import com.cisco.tinderbox.burner.trident.Topology;
import com.cisco.tinderbox.model.ConnectionEvent;
import com.google.common.base.Throwables;

import java.io.IOException;
import java.util.List;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Writer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hive.hcatalog.streaming.FlatTableColumn;
import org.apache.hive.hcatalog.streaming.FlatTableObjectInspector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hive.ql.io.orc.CompressionKind.*;

/**
 * Trident function that appends the {@code ConnectionEvent} carried in each tuple to an ORC file.
 * It rolls over to a new file whenever the value returned by
 * {@code CurrentUnixTime.getQuarterHour()} changes.
 *
 * <p>Files are written to {@code /data/diska/orc/<dbName>/<tableName>/<partition>/} on the local
 * filesystem through a {@link RawLocalFileSystem}. NOTE(review): {@code hdfsUrl} is stored but not
 * currently used, because the HDFS filesystem lookup was disabled in favour of the local one.
 *
 * <p>NOTE(review): the ORC {@link Writer} presumably buffers rows in memory. It would then only
 * write bytes when a stripe (up to {@link #STRIPE_SIZE} bytes) fills or when {@link Writer#close()}
 * is called. In this class a writer is only closed on partition rollover or in {@link #cleanup()}.
 * That would explain files staying zero length while open and heap usage growing. Confirm against
 * the ORC documentation and consider a smaller stripe size.
 *
 * <p>{@link #execute} and {@link #cleanup()} are both {@code synchronized}, so the writer cannot be
 * closed while a row is being added.
 */
public class OrcSink extends BaseFunction {
    private static final Logger logger = LoggerFactory.getLogger(OrcSink.class);
    private static final CurrentUnixTime currentUnixTime = CurrentUnixTime.getInstance();
    private static final long serialVersionUID = 7435558912956446385L;

    /** Root directory under which {@code <db>/<table>/<partition>/} subdirectories are created. */
    private static final String BASE_DIR = "/data/diska/orc/";
    /** ORC stripe size in bytes (250 MiB). */
    private static final int STRIPE_SIZE = 250 * 1024 * 1024;
    /** ORC compression buffer size in bytes (256 KiB). */
    private static final int COMPRESS_BUFFER_SIZE = 256 * 1024;
    /** Number of rows between ORC row-index entries. */
    private static final int ROW_INDEX_STRIDE = 10000;

    private final String dbName;
    private final String tableName;
    private final List<FlatTableColumn<?>> fields;
    /** Currently unused: output goes to the local filesystem (see class Javadoc). */
    private final String hdfsUrl;

    // Per-worker runtime state; transient because the function instance is serialized to workers.
    /** Quarter-hour partition the current writer belongs to. */
    private transient volatile int partition;
    /** Open ORC writer, or {@code null} if none is open. */
    private transient volatile Writer writer;
    /** Path of the file the current writer targets. */
    private transient volatile Path path;

    /**
     * @param hdfsUrl   filesystem URL (currently unused; files are written locally)
     * @param dbName    database name, used in the output path and the object-inspector name
     * @param tableName table name, used in the output path and the object-inspector name
     * @param fields    column definitions used to build the ORC object inspector
     */
    public OrcSink(String hdfsUrl, String dbName, String tableName,
            List<FlatTableColumn<?>> fields) {
        this.hdfsUrl = hdfsUrl;
        this.dbName = dbName;
        this.tableName = tableName;
        this.fields = fields;
    }

    /**
     * Closes the current writer, if any. Synchronized like {@link #execute} so the writer cannot be
     * closed while {@code addRow} is running.
     */
    @Override
    public synchronized void cleanup() {
        closeWriter();
    }

    /**
     * Appends the {@code ConnectionEvent} in field {@code Topology.FIELD_CORRELATED} to the current
     * ORC file. If the quarter-hour partition has changed, it first rolls over to a new file.
     *
     * <p>I/O errors are logged and the row is not written. Nothing is emitted to {@code collector}.
     */
    @Override
    public synchronized void execute(TridentTuple tuple, TridentCollector collector) {
        try {
            refreshWriterIfNeeded();
            ConnectionEvent connectionEvent =
                    (ConnectionEvent) tuple.getValueByField(Topology.FIELD_CORRELATED);
            writer.addRow(connectionEvent);
        } catch (IOException e) {
            logger.error("could not write to orc", e);
        }
    }

    /**
     * Closes and clears the current writer, if one is open. The field is cleared even if
     * {@code close()} fails, so the next call starts a fresh file.
     *
     * @throws RuntimeException wrapping the {@link IOException} from {@code close()}
     */
    private void closeWriter() {
        if (writer == null) {
            return;
        }
        try {
            writer.close();
        } catch (IOException e) {
            throw Throwables.propagate(e);
        } finally {
            writer = null;
        }
    }

    /**
     * Opens a new SNAPPY-compressed ORC writer on a uniquely named file in the current partition's
     * directory.
     *
     * @throws RuntimeException wrapping any {@link IOException} from creating the writer
     */
    private void createWriter() {
        try {
            FileSystem fs = new RawLocalFileSystem();
            // Timestamp plus random UUID keeps names unique across workers and restarts.
            String fileName = System.currentTimeMillis() + "-" + UUID.randomUUID().toString() + ".orc";
            path = new Path(BASE_DIR + dbName + "/" + tableName + "/" + partition + "/" + fileName);
            Configuration writerConf = new Configuration();
            ObjectInspector oi = new FlatTableObjectInspector(dbName + "." + tableName, fields);
            writer = OrcFile.createWriter(fs, path, writerConf, oi, STRIPE_SIZE, SNAPPY,
                    COMPRESS_BUFFER_SIZE, ROW_INDEX_STRIDE);
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Ensures an open writer exists for the current quarter-hour. If no writer is open, or the
     * partition has changed, closes the old writer and opens a new one.
     */
    private void refreshWriterIfNeeded() {
        // Read the clock once, so the partition recorded is the one that triggered the rollover.
        int currentPartition = currentUnixTime.getQuarterHour();
        if (writer != null && partition == currentPartition) {
            return;
        }
        partition = currentPartition;
        closeWriter();
        createWriter();
    }
}


[http://www.cisco.com/web/europe/images/email/signature/est2014/logo_06.png?ct=1398192119726]

Grant Overby
Software Engineer
Cisco.com<http://www.cisco.com/>
grove...@cisco.com<mailto:grove...@cisco.com>
Mobile: 865 724 4910






[http://www.cisco.com/assets/swa/img/thinkbeforeyouprint.gif] Think before you 
print.

This email may contain confidential and privileged material for the sole use of 
the intended recipient. Any review, use, distribution or disclosure by others 
is strictly prohibited. If you are not the intended recipient (or authorized to 
receive for the recipient), please contact the sender by reply email and delete 
all copies of this message.

Please click 
here<http://www.cisco.com/web/about/doing_business/legal/cri/index.html> for 
Company Registration Information.




Reply via email to