Hi, I have an open Pull Request for a RollingFile sink. It is integrated with checkpointing, so it can provide exactly-once behavior. If you're interested, please check it out: https://github.com/apache/flink/pull/1084
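
A rough usage sketch (class and method names here are assumptions and may still change while the PR is open):

    DataStream<String> lines = ...; // any upstream stream of records

    RollingSink<String> sink = new RollingSink<String>("/base/output/path");
    sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm")); // one sub-directory per time bucket
    sink.setBatchSize(1024L * 1024 * 128);                      // start a new part file at ~128 MB
    lines.addSink(sink);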
Cheers,
Aljoscha

On Wed, 26 Aug 2015 at 10:31 Stephan Ewen <se...@apache.org> wrote:

> BTW: We are currently working on adding a rolling-file HDFS sink to Flink
> that will initially work very similarly to what Flume gives you. If I
> understand it correctly, Flume may have duplicates in the output from
> incomplete flushes on failures.
>
> We actually have a design to extend this later into a proper "exactly once"
> sink, integrated into Flink's checkpointing, which discards duplicates
> properly by offset tracking and truncating/compacting.
>
>
> On Wed, Aug 26, 2015 at 10:04 AM, Hans-Peter Zorn <hpz...@gmail.com>
> wrote:
>
>> Hi Stephan,
>>
>> even though I started the discussion, I was just trying to estimate the
>> effort. In that project they finally opted to use Flume with a Kafka
>> channel.
>>
>> Best, Hans-Peter
>>
>> On Wed, Aug 26, 2015 at 9:52 AM, LINZ, Arnaud <al...@bouyguestelecom.fr>
>> wrote:
>>
>>> Hi Stephan,
>>>
>>> I do not have a Kafka->HDFS solution, but I do have a streaming sink
>>> that writes to HDFS (an external text Hive table) with auto-partitioning
>>> and rolling files. However, it does not take care of checkpointing and
>>> may have flushing issues if some partitions are seldom seen.
>>>
>>> I'm not sure it will save you much time, especially given that it has
>>> not really been used yet.
>>>
>>> Code is provided with no copyright and no warranty!
>>>
>>> import java.io.BufferedOutputStream;
>>> import java.io.IOException;
>>> import java.util.HashMap;
>>> import java.util.Map;
>>>
>>> import org.apache.commons.io.IOUtils;
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.configuration.Configuration;
>>> import org.apache.flink.core.fs.FileSystem;
>>> import org.apache.flink.core.fs.Path;
>>> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
>>> import org.apache.hive.hcatalog.data.DefaultHCatRecord;
>>> import org.apache.hive.hcatalog.data.schema.HCatSchema;
>>> import org.joda.time.DateTime;
>>>
>>> /**
>>>  * This sink streams data to an HDFS directory (Hive external table) with
>>>  * a size limit (rolling files) and automatic partitioning. To be able to
>>>  * read the file content while it is still being written, an idea is to
>>>  * add a char(1) field in the last position of the Hive line and to check
>>>  * that it has the proper value when read (if not, the line is not
>>>  * complete).
>>>  *
>>>  * @author alinz
>>>  */
>>> public class HiveStreamOutput extends RichSinkFunction<Tuple2<String, DefaultHCatRecord>> {
>>>
>>>     /**
>>>      * The class StreamingFile encapsulates an open HDFS output file.
>>>      */
>>>     public static class StreamingFile {
>>>
>>>         /** Base directory. */
>>>         private final String rootPath;
>>>
>>>         /** File name prefix. */
>>>         private final String prefix;
>>>
>>>         /** File path. */
>>>         private Path path;
>>>
>>>         /** Open output stream. */
>>>         private BufferedOutputStream stream;
>>>
>>>         /** Current size. */
>>>         private long size = 0;
>>>
>>>         /** Current file number. */
>>>         private long nbFile = 0;
>>>
>>>         /** Instant of the last write on this stream. If the interval
>>>          *  since it is too long, the content is flushed. */
>>>         private long lastInvoke;
>>>
>>>         /**
>>>          * Instantiates a new streaming file.
>>>          * @param rootPath destination path
>>>          * @param prefix file name prefix
>>>          * @throws IOException cannot open file
>>>          */
>>>         public StreamingFile(String rootPath, String prefix) throws IOException {
>>>             super();
>>>             this.rootPath = rootPath;
>>>             this.prefix = prefix;
>>>             lastInvoke = 0; // always flush the first record
>>>             open();
>>>         }
>>>
>>>         /**
>>>          * Creates the destination file on the file system.
>>>          * @throws IOException issue when opening the file
>>>          */
>>>         private void open() throws IOException {
>>>             this.path = new Path(rootPath, prefix + nbFile);
>>>             final FileSystem filesys = path.getFileSystem();
>>>             filesys.mkdirs(path.getParent());
>>>             stream = new BufferedOutputStream(filesys.create(path, true));
>>>         }
>>>
>>>         /**
>>>          * Closes the stream.
>>>          */
>>>         public void closeStream() {
>>>             IOUtils.closeQuietly(stream);
>>>             stream = null; // NOPMD
>>>         }
>>>
>>>         /**
>>>          * Writes data into the stream.
>>>          * @param data data to write
>>>          * @param maxSize max file size; the file is split when it is reached
>>>          * @throws IOException writing issue
>>>          */
>>>         public void writeStream(byte[] data, long maxSize) throws IOException {
>>>             stream.write(data);
>>>             // If the source is too slow, flush the data. With this approach we do not
>>>             // always get the "last flush", especially for old partitions.
>>>             // TODO If this is an issue, implement a time-out thread.
>>>             final long maxDelayFlush = 100;
>>>             final long invokeTime = System.currentTimeMillis();
>>>             if (invokeTime - lastInvoke > maxDelayFlush) {
>>>                 stream.flush();
>>>             }
>>>             lastInvoke = invokeTime;
>>>             if (incTaille(data.length) >= maxSize) {
>>>                 split();
>>>             }
>>>         }
>>>
>>>         /**
>>>          * Increments the file size.
>>>          * @param amount what to add
>>>          * @return the new size
>>>          */
>>>         private long incTaille(long amount) {
>>>             size += amount;
>>>             return size;
>>>         }
>>>
>>>         /**
>>>          * Closes the current file and opens a new one.
>>>          * @throws IOException issue when opening the file
>>>          */
>>>         private void split() throws IOException {
>>>             closeStream();
>>>             nbFile++;
>>>             open();
>>>             size = 0;
>>>         }
>>>
>>>         /**
>>>          * Flushes the stream.
>>>          * @throws IOException I/O issue
>>>          */
>>>         public void flushStream() throws IOException {
>>>             stream.flush();
>>>         }
>>>     }
>>>
>>>     /** SUID. */
>>>     private static final long serialVersionUID = 1L;
>>>
>>>     // Shared fields
>>>
>>>     /** Output Hive table schema. */
>>>     private final HCatSchema outputSchema;
>>>
>>>     /** Field delimiter. */
>>>     private final char delim;
>>>
>>>     /** HDFS root path. */
>>>     private final String hdfsPath;
>>>
>>>     /** Max file size. */
>>>     private final long maxSize;
>>>
>>>     // Subtask fields
>>>
>>>     /** File name prefix for a subtask; prevents conflicts with another
>>>      *  subtask or a previous run. */
>>>     private transient String namePrefix;
>>>
>>>     /** Map of streams indexed by encountered partition. */
>>>     private transient Map<String, StreamingFile> streams;
>>>
>>>     /** Instant of the last periodic flush. */
>>>     private transient long lastFlushAll;
>>>
>>>     /**
>>>      * Builds a streamer.
>>>      * @param outputSchema output record schema (without partition)
>>>      * @param delim field delimiter
>>>      * @param hdfsPath HDFS destination path
>>>      * @param maxSize max size of a file (rolls the file if reached)
>>>      */
>>>     public HiveStreamOutput(HCatSchema outputSchema, char delim, String hdfsPath, long maxSize) {
>>>         super();
>>>         this.outputSchema = outputSchema;
>>>         this.delim = delim;
>>>         this.hdfsPath = hdfsPath;
>>>         this.maxSize = maxSize;
>>>     }
>>>
>>>     /** {@inheritDoc} */
>>>     @Override
>>>     public void open(Configuration parameters) throws Exception { // NOPMD
>>>         // The prefix is unique per run and per subtask, to avoid conflicts.
>>>         namePrefix = "S" + getRuntimeContext().getIndexOfThisSubtask()
>>>                 + "_" + (new DateTime().getMillis()) + "_";
>>>         streams = new HashMap<String, StreamingFile>();
>>>     }
>>>
>>>     /** {@inheritDoc} */
>>>     @Override
>>>     public void close() throws Exception { // NOPMD
>>>         for (final StreamingFile file : streams.values()) {
>>>             file.closeStream();
>>>         }
>>>     }
>>>
>>>     /** {@inheritDoc} */
>>>     @Override
>>>     public void invoke(Tuple2<String, DefaultHCatRecord> value) throws Exception { // NOPMD
>>>         final String partition = value.f0;
>>>         final String record = HiveFileOutputFormat.getRecordLine(value.f1, outputSchema, delim);
>>>
>>>         // Do we have an open data stream for this partition?
>>>         StreamingFile file = streams.get(partition);
>>>         if (file == null) {
>>>             file = new StreamingFile(hdfsPath + "/" + partition, namePrefix);
>>>             streams.put(partition, file);
>>>         }
>>>         file.writeStream(record.getBytes(), maxSize);
>>>
>>>         // Periodically flush all streams
>>>         final long invoke = System.currentTimeMillis();
>>>         final long flushPeriod = 10000;
>>>         if (invoke - lastFlushAll > flushPeriod) {
>>>             lastFlushAll = invoke;
>>>             for (final StreamingFile stream : streams.values()) {
>>>                 stream.flushStream();
>>>             }
>>>         }
>>>     }
>>> }
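
(For anyone who wants to try the class above: wiring it into a job would look roughly like the sketch below. The source, the schema construction, and the path are placeholders, and HiveFileOutputFormat.getRecordLine, used in invoke(), is a helper of Arnaud's that is not included in the thread.)

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Placeholder: some upstream operator producing (partition, record) pairs.
    DataStream<Tuple2<String, DefaultHCatRecord>> records = buildRecords(env);

    // Placeholder: HCatSchema describing the columns of the target Hive table.
    HCatSchema outputSchema = buildOutputSchema();

    // ';' as field delimiter, roll part files at roughly 64 MB.
    records.addSink(new HiveStreamOutput(outputSchema, ';', "hdfs:///warehouse/my_table", 64L * 1024 * 1024));

    env.execute("Stream to Hive external table");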