Hi Rico! Can you give us an update on your status here? We actually need something like this as well (and it's pretty urgent), so we would jump in and implement it, unless you have something already.
Stephan

On Thu, Aug 20, 2015 at 12:13 PM, Stephan Ewen <se...@apache.org> wrote:

> BTW: This is becoming a dev discussion, maybe we should move it to that list...
>
> On Thu, Aug 20, 2015 at 12:12 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Yes, one needs exactly a mechanism to seek the output stream back to the
>> last checkpointed position, in order to overwrite duplicates.
>>
>> I think HDFS is not going to make this easy; there is basically no seek
>> for write. Not sure how to solve this, other than writing to tmp files and
>> copying upon success.
>>
>> Apache Flume must have solved this issue in some way, it may be worth
>> looking into how they solved it.
>>
>> On Thu, Aug 20, 2015 at 11:58 AM, Rico Bergmann <i...@ricobergmann.de>
>> wrote:
>>
>>> My ideas for checkpointing:
>>>
>>> I think writing to the destination should not depend on the checkpoint
>>> mechanism (otherwise the output would never be written to the destination
>>> if checkpointing is disabled). Instead I would keep the offsets of written
>>> and checkpointed records. When recovering you would then somehow delete or
>>> overwrite the records after that offset. (But I don't really know whether
>>> this is as simple as I wrote it ;-) ).
>>>
>>> Regarding the rolling files, I would suggest making the values of the
>>> user-defined partitioning function part of the path or file name. Writing
>>> records is then basically: extract the partition to write to, then add the
>>> record to a queue for this partition. Each queue has an output format
>>> assigned to it. On flushing, the output file is opened, the content of the
>>> queue is written to it, and then closed.
>>>
>>> Does this sound reasonable?
>>>
>>> On 20.08.2015 at 10:40, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>> Yes, this seems like a good approach. We should probably not reuse the
>>> KeySelector for this but maybe a more use-case-specific type of function
>>> that can create a desired filename from an input object.
>>>
>>> This is only the first part, though. The hard bit would be implementing
>>> rolling files and also integrating it with Flink's checkpointing mechanism.
>>> For integration with checkpointing you could maybe use "staging files": all
>>> elements are put into a staging file. And then, when the notification about
>>> a completed checkpoint is received, the contents of this file would be moved
>>> (or appended) to the actual destination.
>>>
>>> Do you have any ideas about the rolling files/checkpointing?
>>>
>>> On Thu, 20 Aug 2015 at 09:44 Rico Bergmann <i...@ricobergmann.de> wrote:
>>>
>>>> I'm thinking about implementing this.
>>>>
>>>> After looking into the Flink code I would basically subclass
>>>> FileOutputFormat in, let's say, KeyedFileOutputFormat, which gets an
>>>> additional KeySelector object. The path in the file system is then
>>>> appended with the string the KeySelector returns.
>>>>
>>>> Do you think this is a good approach?
>>>>
>>>> Greets. Rico.
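For illustration, a minimal Java sketch of the kind of record-to-path function Aljoscha describes above (rather than reusing KeySelector), combined with the "key = date" case from the original question, could look like the following. All names here (PathSelector, DateBucketer, Event) are made up for this sketch and are not existing Flink API:

import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;

/** Hypothetical "use-case specific" function that derives a file path from a record. */
public interface PathSelector<T> extends Serializable {
    /** Returns the sub-directory / file-name fragment this record should be written to. */
    String selectPath(T record);
}

/** Example implementation: bucket records by event date, e.g. "2015/08/20". */
class DateBucketer implements PathSelector<Event> {
    @Override
    public String selectPath(Event event) {
        // SimpleDateFormat is not thread-safe; a real sink would keep one instance per task.
        return new SimpleDateFormat("yyyy/MM/dd").format(new Date(event.timestampMillis));
    }
}

/** Dummy record type, only so the sketch is self-contained. */
class Event {
    long timestampMillis;
    String payload;
}

A custom sink (or Rico's proposed KeyedFileOutputFormat) could then resolve the target as basePath + "/" + selector.selectPath(record) and route the record to the open file or per-partition queue for that path.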
>>>>
>>>> On 16.08.2015 at 19:56, Stephan Ewen <se...@apache.org> wrote:
>>>>
>>>> If you are up for it, this would be a very nice addition to Flink, a
>>>> great contribution :-)
>>>>
>>>> On Sun, Aug 16, 2015 at 7:56 PM, Stephan Ewen <se...@apache.org> wrote:
>>>>
>>>>> Hi!
>>>>>
>>>>> This should definitely be possible in Flink. Pretty much exactly like
>>>>> you describe it.
>>>>>
>>>>> You need a custom version of the HDFS sink with some logic for when to
>>>>> roll over to a new file.
>>>>>
>>>>> You can also make the sink "exactly once" by integrating it with the
>>>>> checkpointing. For that, you would probably need to keep the current path
>>>>> and output stream offsets as of the last checkpoint, so you can resume
>>>>> from that offset and overwrite records to avoid duplicates. If that is not
>>>>> possible, you would probably buffer records between checkpoints and only
>>>>> write on checkpoints.
>>>>>
>>>>> Greetings,
>>>>> Stephan
>>>>>
>>>>> On Sun, Aug 16, 2015 at 7:09 PM, Hans-Peter Zorn <hpz...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Did anybody think of (mis-)using Flink streaming as an alternative
>>>>>> to Apache Flume just for ingesting data from Kafka (or other streaming
>>>>>> sources) into HDFS? Knowing that Flink can read from Kafka and write to
>>>>>> HDFS, I assume it should be possible, but is this a good idea to do?
>>>>>>
>>>>>> Flume basically is about consuming data from somewhere, peeking into
>>>>>> each record and then directing it to a specific directory/file in HDFS
>>>>>> reliably. I've seen there is a FlumeSink, but would it be possible to get
>>>>>> the same functionality with Flink alone?
>>>>>>
>>>>>> I've skimmed through the documentation and found the option to split
>>>>>> the output by key and the possibility to add multiple sinks. As I
>>>>>> understand it, Flink programs are generally static, so it would not be
>>>>>> possible to add/remove sinks at runtime?
>>>>>> So you would need to implement a custom sink directing the records to
>>>>>> different files based on a key (e.g. date)? Would it be difficult to
>>>>>> implement things like rolling outputs etc.? Or better just use Flume?
>>>>>>
>>>>>> Best,
>>>>>> Hans-Peter
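To make the checkpoint integration discussed in the thread more concrete, below is a rough local-filesystem sketch (plain Java, not Flink API) that combines Stephan's "keep the output stream offset as of the last checkpoint" idea with Aljoscha's staging-file suggestion. The class and method names are invented for illustration; in a real sink they would be driven by Flink's checkpoint hooks, and the truncate-on-restore step would not work directly on HDFS (no seek for write), which is why the thread falls back to tmp files and renaming:

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public class StagingSinkSketch {

    private final Path stagingFile;   // data written since the last publish
    private final Path finalFile;     // the "actual destination"
    private RandomAccessFile out;
    private long lastCheckpointedOffset;

    public StagingSinkSketch(String dir) throws IOException {
        this.stagingFile = Paths.get(dir, "part-0.staging");
        this.finalFile = Paths.get(dir, "part-0");
        this.out = new RandomAccessFile(stagingFile.toFile(), "rw");
    }

    /** Called for every record (in Flink this would happen in the sink's invoke()). */
    public void write(String record) throws IOException {
        out.write((record + "\n").getBytes(StandardCharsets.UTF_8));
    }

    /** Called when a checkpoint is taken: remember how far the file was written. */
    public long snapshotOffset() throws IOException {
        lastCheckpointedOffset = out.getFilePointer();
        return lastCheckpointedOffset;
    }

    /** Called on recovery: drop everything written after the last checkpoint. */
    public void restore(long checkpointedOffset) throws IOException {
        out.setLength(checkpointedOffset);   // "seek back and overwrite duplicates"
        out.seek(checkpointedOffset);
        lastCheckpointedOffset = checkpointedOffset;
    }

    /** Called once the checkpoint is confirmed: publish the staged data. */
    public void publishOnCheckpointComplete() throws IOException {
        out.close();
        Files.move(stagingFile, finalFile, StandardCopyOption.REPLACE_EXISTING);
        // A real sink would now open a fresh staging file (and roll the final
        // file name) before accepting further records.
    }
}

If seeking back is not possible on the target file system, the simpler fallback Stephan mentions applies: buffer records between checkpoints and only write them out once the checkpoint completes.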