Hi,
I have an open Pull Request for a RollingFile sink. It is integrated with
checkpointing, so it can provide exactly-once behavior. If you're
interested, please check it out: https://github.com/apache/flink/pull/1084

Cheers,
Aljoscha

On Wed, 26 Aug 2015 at 10:31 Stephan Ewen <se...@apache.org> wrote:

> BTW: We are currently working on adding a rolling-file HDFS sink to Flink
> that will initially work very similarly to what Flume gives you. If I
> understand it correctly, Flume may produce duplicates in the output after
> incomplete flushes on failures.
>
> We actually have a design to later extend this into a proper "exactly once"
> sink, integrated with Flink's checkpointing, which discards duplicates
> properly by tracking offsets and truncating/compacting.
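>
> Roughly, the sink would record, on each completed checkpoint, up to which
> byte offset the current file is valid; after a failure it would truncate the
> file back to that offset before resuming, so replayed records do not end up
> as duplicates. A minimal sketch of just that mechanic (plain java.io,
> illustrative class and method names, not the actual Flink implementation):
>
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.RandomAccessFile;
>
> public class TruncatingFileSink {
>
>     private final File file;
>     private FileOutputStream out;
>     private long committedOffset; // bytes confirmed by the last completed checkpoint
>
>     public TruncatingFileSink(File file) throws IOException {
>         this.file = file;
>         this.out = new FileOutputStream(file, true);
>         this.committedOffset = file.length();
>     }
>
>     /** Appends one record; after a failure the same record may be written again. */
>     public void write(byte[] record) throws IOException {
>         out.write(record);
>     }
>
>     /** On a completed checkpoint: everything written so far is now committed. */
>     public long snapshotOffset() throws IOException {
>         out.flush();
>         committedOffset = file.length();
>         return committedOffset;
>     }
>
>     /** On recovery: drop the uncommitted tail so replayed records create no duplicates. */
>     public void restore(long offset) throws IOException {
>         out.close();
>         final RandomAccessFile raf = new RandomAccessFile(file, "rw");
>         raf.setLength(offset); // truncate the partially written tail
>         raf.close();
>         committedOffset = offset;
>         out = new FileOutputStream(file, true);
>     }
> }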
>
>
> On Wed, Aug 26, 2015 at 10:04 AM, Hans-Peter Zorn <hpz...@gmail.com>
> wrote:
>
>> Hi Stephan,
>>
>> even though I started the discussion, I was just trying to estimate the
>> effort. In the end, that project opted to use Flume with a Kafka channel.
>>
>> Best, Hans-Peter
>>
>> On Wed, Aug 26, 2015 at 9:52 AM, LINZ, Arnaud <al...@bouyguestelecom.fr>
>> wrote:
>>
>>> Hi Stephan,
>>>
>>> I do not have a Kafka->HDFS solution, but I do have a streaming sink that
>>> writes to HDFS (an external, text-format Hive table) with auto-partitioning
>>> and rolling files. However, it does not take care of checkpointing and may
>>> have flushing issues if some partitions are seldom seen.
>>>
>>> I'm not sure it will save you much time, especially since it has not really
>>> been used yet.
>>>
>>> Code is provided with no copyright and no warranty!
>>>
>>> import java.io.BufferedOutputStream;
>>> import java.io.IOException;
>>> import java.util.HashMap;
>>> import java.util.Map;
>>>
>>> import org.apache.commons.io.IOUtils;
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.configuration.Configuration;
>>> import org.apache.flink.core.fs.FileSystem;
>>> import org.apache.flink.core.fs.Path;
>>> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
>>> import org.apache.hive.hcatalog.data.DefaultHCatRecord;
>>> import org.apache.hive.hcatalog.data.schema.HCatSchema;
>>> import org.joda.time.DateTime;
>>>
>>> /**
>>>  * This sink streams data to an HDFS directory (Hive external table) with a size
>>>  * limit (rolling files) and automatic partitioning. To be able to read the file
>>>  * content while it is still being written, an idea is to add a char(1) field in
>>>  * the last position of the Hive line and to check that it has the proper value
>>>  * when reading (if not, the line is not complete).
>>>  *
>>>  * @author alinz
>>>  */
>>> public class HiveStreamOutput extends RichSinkFunction<Tuple2<String, DefaultHCatRecord>> {
>>>
>>>     /**
>>>      * The class StreamingFile encapsulates an open HDFS output file.
>>>      */
>>>     public static class StreamingFile {
>>>
>>>         /** base directory */
>>>         private final String rootPath;
>>>
>>>         /** file name prefix */
>>>         private final String prefix;
>>>
>>>         /** file path */
>>>         private Path path;
>>>
>>>         /** open output stream */
>>>         private BufferedOutputStream stream;
>>>
>>>         /** current size */
>>>         private long size = 0;
>>>
>>>         /** current file number */
>>>         private long nbFile = 0;
>>>
>>>         /** instant of the last write on this stream; if the interval is too long, flushes the content */
>>>         private long lastInvoke;
>>>
>>>         /**
>>>          * Instantiates a new streaming file.
>>>          * @param rootPath destination path
>>>          * @param prefix file name prefix
>>>          * @throws IOException cannot open file
>>>          */
>>>         public StreamingFile(String rootPath, String prefix) throws IOException {
>>>             super();
>>>             this.rootPath = rootPath;
>>>             this.prefix = prefix;
>>>             lastInvoke = 0; // always flushes the first record
>>>             open();
>>>         }
>>>
>>>         /**
>>>          * Creates the destination file on the file system.
>>>          * @throws IOException issue when opening the file
>>>          */
>>>         private void open() throws IOException {
>>>             this.path = new Path(rootPath, prefix + nbFile);
>>>             final FileSystem filesys = path.getFileSystem();
>>>             filesys.mkdirs(path.getParent());
>>>             stream = new BufferedOutputStream(filesys.create(path, true));
>>>         }
>>>
>>>         /**
>>>          * Closes the stream.
>>>          */
>>>         public void closeStream() {
>>>             IOUtils.closeQuietly(stream);
>>>             stream = null; // NOPMD
>>>         }
>>>
>>>         /**
>>>          * Writes data into the stream.
>>>          * @param data data to write
>>>          * @param maxSize max size of a file; splits the file if we reach it
>>>          * @throws IOException writing issue
>>>          */
>>>         public void writeStream(byte[] data, long maxSize) throws IOException {
>>>             stream.write(data);
>>>             // If the source is too slow, flush the data. With this method we do not
>>>             // always get the "last flush", especially for old partitions.
>>>             // TODO If that becomes an issue, implement a timeout thread (see the sketch after this class).
>>>             final long maxDelayFlush = 100;
>>>             final long invokeTime = System.currentTimeMillis();
>>>             if (invokeTime - lastInvoke > maxDelayFlush) {
>>>                 stream.flush();
>>>             }
>>>             lastInvoke = invokeTime;
>>>             if (incTaille(data.length) >= maxSize) {
>>>                 split();
>>>             }
>>>         }
>>>
>>>         /**
>>>          * Increments the file size.
>>>          * @param amount what to add
>>>          * @return the new size
>>>          */
>>>         private long incTaille(long amount) {
>>>             size += amount;
>>>             return size;
>>>         }
>>>
>>>         /**
>>>          * Closes the current file and opens a new one.
>>>          * @throws IOException issue when opening the file
>>>          */
>>>         private void split() throws IOException {
>>>             closeStream();
>>>             nbFile++;
>>>             open();
>>>             size = 0;
>>>         }
>>>
>>>         /**
>>>          * Flushes the stream.
>>>          * @throws IOException I/O issue
>>>          */
>>>         public void flushStream() throws IOException {
>>>             stream.flush();
>>>         }
>>>     }
>>>
>>>     /** SUID. */
>>>     private static final long serialVersionUID = 1L;
>>>
>>>     // Shared fields
>>>
>>>     /** output Hive table schema */
>>>     private final HCatSchema outputSchema;
>>>
>>>     /** field delimiter */
>>>     private final char delim;
>>>
>>>     /** HDFS root path */
>>>     private final String hdfsPath;
>>>
>>>     /** max file size */
>>>     private final long maxSize;
>>>
>>>     // Subtask fields
>>>
>>>     /** file name prefix for a subtask; prevents conflicts with another subtask or a previous run */
>>>     private transient String namePrefix;
>>>
>>>     /** map of streams, indexed by encountered partition */
>>>     private transient Map<String, StreamingFile> streams;
>>>
>>>     /** instant of the last periodic flush */
>>>     private transient long lastFlushAll;
>>>
>>>     /**
>>>      * Builds a streamer.
>>>      * @param outputSchema output record schema (without partition)
>>>      * @param delim field delimiter
>>>      * @param hdfsPath HDFS destination path
>>>      * @param maxSize max size of a file (rolls the file if reached)
>>>      */
>>>     public HiveStreamOutput(HCatSchema outputSchema, char delim, String hdfsPath, long maxSize) {
>>>         super();
>>>         this.outputSchema = outputSchema;
>>>         this.delim = delim;
>>>         this.hdfsPath = hdfsPath;
>>>         this.maxSize = maxSize;
>>>     }
>>>
>>>     /** {@inheritDoc} */
>>>     @Override
>>>     public void open(Configuration parameters) throws Exception { // NOPMD
>>>         // The prefix is unique per run and per subtask, to avoid conflicts
>>>         namePrefix = "S" + getRuntimeContext().getIndexOfThisSubtask() + "_" + (new DateTime().getMillis()) + "_";
>>>         streams = new HashMap<String, StreamingFile>();
>>>     }
>>>
>>>     /** {@inheritDoc} */
>>>     @Override
>>>     public void close() throws Exception { // NOPMD
>>>         for (final StreamingFile file : streams.values()) {
>>>             file.closeStream();
>>>         }
>>>     }
>>>
>>>     /** {@inheritDoc} */
>>>     @Override
>>>     public void invoke(Tuple2<String, DefaultHCatRecord> value) throws Exception { // NOPMD
>>>         final String partition = value.f0;
>>>         final String record = HiveFileOutputFormat.getRecordLine(value.f1, outputSchema, delim);
>>>         // Do we have an open data stream for this partition?
>>>         StreamingFile file = streams.get(partition);
>>>         if (file == null) {
>>>             file = new StreamingFile(hdfsPath + "/" + partition, namePrefix);
>>>             streams.put(partition, file);
>>>         }
>>>         file.writeStream(record.getBytes(), maxSize);
>>>
>>>         // Periodically flush all streams
>>>         final long invoke = System.currentTimeMillis();
>>>         final long flushPeriod = 10000;
>>>         if (invoke - lastFlushAll > flushPeriod) {
>>>             lastFlushAll = invoke;
>>>             for (final StreamingFile stream : streams.values()) {
>>>                 stream.flushStream();
>>>             }
>>>         }
>>>     }
>>> }
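>>>
>>> About the TODO above on the missing "last flush": one rough option (an
>>> untested sketch, not part of the sink above; class and method names are
>>> made up) would be a small background flusher driven by a
>>> ScheduledExecutorService. The streams map would then have to be made
>>> thread-safe (e.g. a ConcurrentHashMap), since it would be accessed from
>>> two threads.
>>>
>>> import java.io.IOException;
>>> import java.util.Map;
>>> import java.util.concurrent.Executors;
>>> import java.util.concurrent.ScheduledExecutorService;
>>> import java.util.concurrent.TimeUnit;
>>>
>>> /** Flushes all open streams at a fixed interval, even when no new record arrives. */
>>> public class PeriodicFlusher {
>>>
>>>     private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
>>>
>>>     /** Starts flushing the given streams every periodSeconds; would be called from open(). */
>>>     public void start(final Map<String, HiveStreamOutput.StreamingFile> streams, final long periodSeconds) {
>>>         executor.scheduleAtFixedRate(new Runnable() {
>>>             @Override
>>>             public void run() {
>>>                 for (final HiveStreamOutput.StreamingFile file : streams.values()) {
>>>                     try {
>>>                         file.flushStream();
>>>                     } catch (IOException e) {
>>>                         // ignore and retry at the next tick
>>>                     }
>>>                 }
>>>             }
>>>         }, periodSeconds, periodSeconds, TimeUnit.SECONDS);
>>>     }
>>>
>>>     /** Stops the flusher; would be called from close(). */
>>>     public void stop() {
>>>         executor.shutdownNow();
>>>     }
>>> }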
>>
