I think you have a point, Nick. OutputFormats on their own have the same fault-tolerance semantics as SinkFunctions: what kind of failure semantics they guarantee depends on the actual implementation. For instance, the RMQSource has exactly-once semantics but the RMQSink currently does not. If you care about exactly-once semantics, you have to look into the documentation and choose sources and sinks accordingly. It is not the case that OutputFormats are dangerous while all SinkFunctions are failure-proof.
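A toy illustration of "it depends on the implementation": the self-contained Java sketch below is plain Java, not Flink code, and the names AppendOnlyStore, UpsertStore and ReplayDemo are hypothetical. It emits three records, then simulates a failure and a restore to a checkpoint, so the records after the checkpoint are emitted a second time. The append-only store ends up with duplicates, while the keyed upsert store converges to the same contents, which is the kind of idempotence Nick describes for HBase, Cassandra or ES output.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Toy model (not Flink code): a sink that appends every record it receives.
final class AppendOnlyStore {
    final List<String> rows = new ArrayList<>();

    void write(String key, String value) {
        rows.add(key + "=" + value);
    }
}

// Toy model (not Flink code): a sink that overwrites by key, i.e. an idempotent upsert.
final class UpsertStore {
    final Map<String, String> rows = new LinkedHashMap<>();

    void write(String key, String value) {
        rows.put(key, value);
    }
}

final class ReplayDemo {
    // (key, value) records emitted by the job, in order.
    static final String[][] RECORDS = {{"a", "1"}, {"b", "2"}, {"c", "3"}};

    // Emits all records, then simulates a failure followed by a restore to a
    // checkpoint at checkpointOffset: every record from that offset on is emitted again.
    static void runWithReplay(BiConsumer<String, String> sink, int checkpointOffset) {
        for (String[] r : RECORDS) {
            sink.accept(r[0], r[1]);
        }
        for (int i = checkpointOffset; i < RECORDS.length; i++) {
            sink.accept(RECORDS[i][0], RECORDS[i][1]);
        }
    }
}
```

With a checkpoint offset of 1, the append-only store holds five rows (b and c twice) and the upsert store holds three. This only models replay; a real sink would also have to deal with partially written output.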
Consolidating the two interfaces would make sense. It might be a bit late for the 1.0 release, though, since we would first need to reach a consensus and there are many things in the backlog :)

On Tue, Feb 9, 2016 at 3:20 AM, Nick Dimiduk <ndimi...@apache.org> wrote:
> I think this depends on the implementation of the OutputFormat. For
> instance, an HBase, Cassandra or ES OF will handle most operations as
> idempotent when the scheme is designed appropriately.
>
> You are (rightly) focusing on FileOFs, which also depend on the semantics
> of their implementation. MR always required an atomic rename of the DFS,
> and only moved output files into place once the task commits its output.
>
> Also I think it unreasonable to bring exactly once considerations into the
> discussion because nothing provides this right now without a multi-stage
> commit protocol. Such a protocol would be provided at the framework level
> and to the best of my knowledge its semantic expectations on the output
> handler are undefined.
>
> My original question comes from wanting to use the LocalCollectionOF to
> test a streaming flow that sinks to Kafka, without rewriting the flow in
> test code. So in this case you're right that it does apply to tests. I
> don't think correctness of tests is a trivial concern though.
>
> As for RollingFileSink, I've not seen this conversation so I cannot
> comment. Per my earlier examples, I think it's not correct to assume all OF
> implementations are file-based.
>
>
> On Monday, February 8, 2016, Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Hi,
>> one problem that I see with OutputFormat is that they are not made for a
>> streaming world. By this, I mean that they don’t handle failure well and
>> don’t consider fault-tolerant streaming, i.e. exactly once semantics. For
>> example, what would be expected to happen if a job with a FileOutputFormat
>> fails and needs to recover?
>> Now, there might be some garbage left in the
>> files that would get emitted again after restoring to a checkpoint, thus
>> leading to duplicate results.
>>
>> Having OutputFormats in streaming programs can work well in toy
>> examples and tests but can be dangerous in real-world jobs. I once talked
>> with Robert about this and we came up with the idea (I think it was mostly
>> him) of generalizing the RollingFileSink (which is fault-tolerance aware)
>> so that it can easily be used with something akin to OutputFormats.
>>
>> What do you think?
>>
>> -Aljoscha
>> > On 08 Feb 2016, at 19:40, Nick Dimiduk <ndimi...@apache.org> wrote:
>> >
>> > On Mon, Feb 8, 2016 at 9:51 AM, Maximilian Michels <m...@apache.org> wrote:
>> > Changing the class hierarchy would break backwards-compatibility of the
>> > API. However, we could add another method to DataStream to easily use
>> > OutputFormats in streaming.
>> >
>> > Indeed, that's why I suggested deprecating one and moving toward a
>> > consolidated class hierarchy. It won't happen overnight, but this can be
>> > managed pretty easily with some adapter code like this and some additional
>> > overrides in the public APIs.
>> >
>> > How did you write your adapter? I came up with the one below.
>> >
>> > Our implementations are similar. This one is working fine with my test
>> > code.
>> >
>> > https://gist.github.com/ndimiduk/18820fcd78412c6b4fc3
>> >
>> > On Mon, Feb 8, 2016 at 6:07 PM, Nick Dimiduk <ndimi...@apache.org> wrote:
>> > In my case, I have my application code that is calling addSink, for
>> > which I'm writing a test that needs to use LocalCollectionOutputFormat.
>> > Having two separate class hierarchies is not helpful, hence the adapter.
>> > Much of this code already exists in the implementation of FileSinkFunction,
>> > so the project already supports it in a limited way.
>> >
>> > On Mon, Feb 8, 2016 at 4:16 AM, Maximilian Michels <m...@apache.org> wrote:
>> > Hi Nick,
>> >
>> > SinkFunction just implements user-defined functions on incoming
>> > elements. OutputFormat offers more lifecycle methods. Thus it is a
>> > more powerful interface. The OutputFormat originally comes from the
>> > batch API, whereas the SinkFunction originates from streaming. Those
>> > were more separate code paths in the past. Ultimately, it would make
>> > sense to have only the OutputFormat interface but I think we have to
>> > keep it to not break the API.
>> >
>> > If you need the lifecycle methods in streaming, there is
>> > RichSinkFunction, which implements RichFunction and SinkFunction. In
>> > addition, it gives you access to the RuntimeContext. You can pass this
>> > directly to the "addSink(sinkFunction)" API method.
>> >
>> > Cheers,
>> > Max
>> >
>> > On Mon, Feb 8, 2016 at 7:14 AM, Nick Dimiduk <ndimi...@apache.org> wrote:
>> > > Heya,
>> > >
>> > > Is there a plan to consolidate these two interfaces? They appear to provide
>> > > identical functionality, differing only in lifecycle management. I found
>> > > myself writing an adaptor so I can consume an OutputFormat where a
>> > > SinkFunction is expected; there's not much to it. This seems like code that
>> > > Flink should ship.
>> > >
>> > > Maybe one interface or the other can be deprecated for the 1.0 API?
>> > >
>> > > Thanks,
>> > > Nick
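For reference, a minimal sketch of the kind of OutputFormat-to-SinkFunction adapter discussed in this thread. It is not Nick's gist and not Flink code: to stay self-contained it uses simplified, hypothetical stand-ins (SimpleOutputFormat, SimpleSinkFunction, SimpleRichSink, ListOutputFormat) instead of Flink's OutputFormat and RichSinkFunction, whose real methods also take a Configuration and declare checked exceptions. The adapter simply forwards the sink's open/invoke/close lifecycle to the wrapped format.

```java
import java.util.List;

// Hypothetical, simplified stand-in for Flink's OutputFormat (batch-style lifecycle).
interface SimpleOutputFormat<T> {
    void open(int taskNumber, int numTasks);
    void writeRecord(T record);
    void close();
}

// Hypothetical, simplified stand-in for Flink's SinkFunction (one call per element).
interface SimpleSinkFunction<T> {
    void invoke(T value);
}

// Lifecycle-aware sink, loosely analogous to RichSinkFunction's open/close hooks.
abstract class SimpleRichSink<T> implements SimpleSinkFunction<T> {
    public void open() {}
    public void close() {}
}

// Adapter: drives the wrapped OutputFormat from the sink's lifecycle hooks.
final class OutputFormatSinkAdapter<T> extends SimpleRichSink<T> {
    private final SimpleOutputFormat<T> format;
    private final int taskNumber;
    private final int numTasks;

    OutputFormatSinkAdapter(SimpleOutputFormat<T> format, int taskNumber, int numTasks) {
        this.format = format;
        this.taskNumber = taskNumber;
        this.numTasks = numTasks;
    }

    @Override
    public void open() {
        format.open(taskNumber, numTasks);
    }

    @Override
    public void invoke(T value) {
        format.writeRecord(value);
    }

    @Override
    public void close() {
        format.close();
    }
}

// Test-friendly format that collects records into a caller-supplied list,
// in the spirit of LocalCollectionOutputFormat.
final class ListOutputFormat<T> implements SimpleOutputFormat<T> {
    private final List<T> target;
    private boolean opened;

    ListOutputFormat(List<T> target) {
        this.target = target;
    }

    @Override
    public void open(int taskNumber, int numTasks) {
        opened = true;
    }

    @Override
    public void writeRecord(T record) {
        if (!opened) {
            throw new IllegalStateException("writeRecord called before open");
        }
        target.add(record);
    }

    @Override
    public void close() {
        opened = false;
    }
}
```

Against the real API, the same idea would forward RichSinkFunction's open/close hooks to the wrapped OutputFormat, and the task index and parallelism would likely come from the RuntimeContext rather than from constructor arguments.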