I have a Storm Trident bolt for writing ORC files. The files are created; however, they are always zero length, and this code eventually causes an OutOfMemoryError (OOME). I suspect I am missing some sort of flushing action, but I don't see anything like that in the API.
My bolt follows. Any thoughts as to what I’m doing wrong or links to reference uses of org.apache.hadoop.hive.ql.io.orc.Writer ? package com.cisco.tinderbox.burner.trident.functions; import storm.trident.operation.BaseFunction; import storm.trident.operation.TridentCollector; import storm.trident.tuple.TridentTuple; import com.cisco.tinderbox.burner.io.system.CurrentUnixTime; import com.cisco.tinderbox.burner.trident.Topology; import com.cisco.tinderbox.model.ConnectionEvent; import com.google.common.base.Throwables; import java.io.IOException; import java.util.List; import java.util.UUID; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.hive.ql.io.orc.OrcFile; import org.apache.hadoop.hive.ql.io.orc.Writer; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hive.hcatalog.streaming.FlatTableColumn; import org.apache.hive.hcatalog.streaming.FlatTableObjectInspector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.apache.hadoop.hive.ql.io.orc.CompressionKind.*; public class OrcSink extends BaseFunction { private static final Logger logger = LoggerFactory.getLogger(OrcSink.class); private static final CurrentUnixTime currentUnixTime = CurrentUnixTime.getInstance(); private static final long serialVersionUID = 7435558912956446385L; private final String dbName; private final String tableName; private final List<FlatTableColumn<?>> fields; private final String hdfsUrl; private transient volatile int partition; private transient volatile Writer writer; private transient volatile Path path; public OrcSink(String hdfsUrl, String dbName, String tableName, List<FlatTableColumn<?>> fields) { this.hdfsUrl = hdfsUrl; this.dbName = dbName; this.tableName = tableName; this.fields = fields; } @Override public void cleanup() { closeWriter(); } @Override public synchronized void 
execute(TridentTuple tuple, TridentCollector collector) { try { refreshWriterIfNeeded(); ConnectionEvent connectionEvent = (ConnectionEvent) tuple.getValueByField(Topology.FIELD_CORRELATED); writer.addRow(connectionEvent); } catch (IOException e) { logger.error("could not write to orc", e); } } private void closeWriter() { if (writer != null) { try { writer.close(); } catch (IOException e) { Throwables.propagate(e); } finally { writer = null; } } } private void createWriter() { try { Configuration fsConf = new Configuration(); fsConf.set("fs.defaultFS", hdfsUrl); FileSystem fs = new RawLocalFileSystem(); //FileSystem.get(fsConf); String fileName = System.currentTimeMillis() + "-" + UUID.randomUUID().toString() + ".orc"; path = new Path("/data/diska/orc/" + dbName + "/" + tableName + "/" + partition + "/" + fileName); Configuration writerConf = new Configuration(); ObjectInspector oi = new FlatTableObjectInspector(dbName + "." + tableName, fields); int stripeSize = 250 * 1024 * 1024; int compressBufferSize = 256 * 1024; int rowIndexStride = 10000; writer = OrcFile.createWriter(fs, path, writerConf, oi, stripeSize, SNAPPY, compressBufferSize, rowIndexStride); } catch (IOException e) { throw Throwables.propagate(e); } } private void refreshWriter() { partition = currentUnixTime.getQuarterHour(); closeWriter(); createWriter(); } private void refreshWriterIfNeeded() { if (writer == null || partition != currentUnixTime.getQuarterHour()) { refreshWriter(); } } } [http://www.cisco.com/web/europe/images/email/signature/est2014/logo_06.png?ct=1398192119726] Grant Overby Software Engineer Cisco.com<http://www.cisco.com/> grove...@cisco.com<mailto:grove...@cisco.com> Mobile: 865 724 4910 [http://www.cisco.com/assets/swa/img/thinkbeforeyouprint.gif] Think before you print. This email may contain confidential and privileged material for the sole use of the intended recipient. Any review, use, distribution or disclosure by others is strictly prohibited. 
If you are not the intended recipient (or authorized to receive for the recipient), please contact the sender by reply email and delete all copies of this message. Please see <http://www.cisco.com/web/about/doing_business/legal/cri/index.html> for Company Registration Information.