Changing the class hierarchy would break backwards-compatibility of the
API. However, we could add another method to DataStream to easily use
OutputFormats in streaming.

How did you write your adapter? I came up with the one below. Admittedly,
it is something of a hack, but it works fine. By the way, you can also use
the DataStream.write(OutputFormat format) method to use any OutputFormat.
The code below is only needed if you really want to stick with
DataStream.addSink(SinkFunction function).

Cheers,
Max

import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.Collection;

/**
 * Adapter that exposes a LocalCollectionOutputFormat as a SinkFunction,
 * so it can be passed to DataStream.addSink(...).
 */
public class OutputFormatAdapter<T> extends LocalCollectionOutputFormat<T>
      implements SinkFunction<T>, RichFunction {

   public OutputFormatAdapter(Collection<T> out) {
      super(out);
   }

   @Override
   public void invoke(T value) throws Exception {
      super.writeRecord(value);
   }

   @Override
   public void open(Configuration parameters) throws Exception {
      super.open(getRuntimeContext().getIndexOfThisSubtask(),
            getRuntimeContext().getNumberOfParallelSubtasks());
   }

   @Override
   public IterationRuntimeContext getIterationRuntimeContext() {
      throw new UnsupportedOperationException("This is not supported.");
   }

   /** Small test. */
   public static void main(String[] args) throws Exception {

      StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

      final DataStreamSource<Long> longDataStreamSource =
            env.generateSequence(0, 1000);

      final ArrayList<Long> longs = new ArrayList<>();

      longDataStreamSource.addSink(new OutputFormatAdapter<>(longs));

      env.execute();

      for (long l : longs) {
         System.out.println(l);
      }
   }
}
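
In case it helps to see the pattern without any Flink dependencies, here is
the same adapter idea in a minimal, self-contained form. Note that
SimpleOutputFormat, SimpleSinkFunction, and FormatToSinkAdapter are
simplified stand-in names for illustration, not Flink's actual API: the
sink-style invoke() simply forwards each record to the wrapped format's
writeRecord(), with open() and close() delegating the lifecycle.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for OutputFormat (hypothetical, not Flink's API).
interface SimpleOutputFormat<T> {
    void open(int taskIndex, int numTasks) throws Exception;
    void writeRecord(T record) throws Exception;
    void close() throws Exception;
}

// Simplified stand-in for SinkFunction (hypothetical, not Flink's API).
interface SimpleSinkFunction<T> {
    void invoke(T value) throws Exception;
}

// The adapter: forwards each sink call to the wrapped format.
class FormatToSinkAdapter<T> implements SimpleSinkFunction<T> {
    private final SimpleOutputFormat<T> format;

    FormatToSinkAdapter(SimpleOutputFormat<T> format) {
        this.format = format;
    }

    void open(int taskIndex, int numTasks) throws Exception {
        format.open(taskIndex, numTasks);
    }

    @Override
    public void invoke(T value) throws Exception {
        format.writeRecord(value);
    }

    void close() throws Exception {
        format.close();
    }
}

public class AdapterSketch {

    // Drives the adapter with a format that collects records into a list.
    static List<Long> demo() throws Exception {
        final List<Long> collected = new ArrayList<>();
        SimpleOutputFormat<Long> format = new SimpleOutputFormat<Long>() {
            public void open(int taskIndex, int numTasks) {}
            public void writeRecord(Long record) { collected.add(record); }
            public void close() {}
        };
        FormatToSinkAdapter<Long> sink = new FormatToSinkAdapter<>(format);
        sink.open(0, 1);
        for (long i = 0; i < 3; i++) {
            sink.invoke(i);
        }
        sink.close();
        return collected;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // prints [0, 1, 2]
    }
}
```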



On Mon, Feb 8, 2016 at 6:07 PM, Nick Dimiduk <ndimi...@apache.org> wrote:

> In my case, I have my application code that is calling addSink, for which
> I'm writing a test that needs to use LocalCollectionOutputFormat. Having
> two separate class hierarchies is not helpful, hence the adapter. Much of
> this code already exists in the implementation of FileSinkFunction, so the
> project already supports it in a limited way.
>
> On Mon, Feb 8, 2016 at 4:16 AM, Maximilian Michels <m...@apache.org> wrote:
>
>> Hi Nick,
>>
>> SinkFunction just implements user-defined functions on incoming
>> elements. OutputFormat offers more lifecycle methods. Thus it is a
>> more powerful interface. The OutputFormat originally comes from the
>> batch API, whereas the SinkFunction originates from streaming. Those
>> were more separate code paths in the past. Ultimately, it would make
>> sense to have only the OutputFormat interface but I think we have to
>> keep it to not break the API.
>>
>> If you need the lifecycle methods in streaming, there is
>> RichSinkFunction, which implements OutputFormat and SinkFunction. In
>> addition, it gives you access to the RuntimeContext. You can pass this
>> directly to the "addSink(sinkFunction)" API method.
>>
>> Cheers,
>> Max
>>
>> On Mon, Feb 8, 2016 at 7:14 AM, Nick Dimiduk <ndimi...@apache.org> wrote:
>> > Heya,
>> >
>> > Is there a plan to consolidate these two interfaces? They appear to
>> provide
>> > identical functionality, differing only in lifecycle management. I found
>> > myself writing an adaptor so I can consume an OutputFormat where a
>> > SinkFunction is expected; there's not much to it. This seems like code
>> that
>> > Flink should ship.
>> >
>> > Maybe one interface or the other can be deprecated for 1.0 API?
>> >
>> > Thanks,
>> > Nick
>>
>
>
