The open() method is defined in AbstractRichFunction [1]. RichSinkFunction extends AbstractRichFunction:

public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN>
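A minimal sketch of the pattern discussed in this thread: keep only serializable connection parameters in fields and build the live socket in open(), which runs on the worker. SimpleSocketSink is a hypothetical name, and the Flink base class is elided so the sketch compiles standalone; in real code this class would `extends RichSinkFunction<String>` with @Override on the lifecycle methods below.

```java
import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;

// Only the (serializable) host and port travel with the function object;
// the live socket is created in open() on the worker, never serialized.
public class SimpleSocketSink {

    private final String host;
    private final int port;

    // transient: excluded from serialization; rebuilt on the worker in open()
    private transient Socket socket;
    private transient PrintWriter out;

    public SimpleSocketSink(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // In Flink, called once on the worker node before the first record arrives.
    public void open() throws IOException {
        socket = new Socket(host, port);
        out = new PrintWriter(socket.getOutputStream(), true); // autoflush
    }

    // In Flink, called once per record.
    public void invoke(String value) {
        out.println(value);
    }

    // In Flink, called on shutdown; release the connection.
    public void close() throws IOException {
        if (out != null) out.close();
        if (socket != null) socket.close();
    }
}
```

This avoids the serialization problem: the constructor runs on the client and touches nothing unserializable, while the socket only ever exists on the worker.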
Best, Fabian

[1] https://github.com/apache/flink/blob/583c527fc3fc693dd40b908d969f1e510ff7dfb3/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java

2015-03-12 1:28 GMT+01:00 Emmanuel <ele...@msn.com>:

> I don't see an 'open()' function to override in the RichSinkFunction or
> the SinkFunction... so where is this open() function supposed to be?
>
> ------------------------------
> Date: Thu, 12 Mar 2015 01:17:34 +0100
> Subject: Re: Socket output stream
> From: fhue...@gmail.com
> To: user@flink.apache.org
>
> Hi Emmanuel,
>
> the open() method should be the right place for setting up the socket
> connection. It is called on the worker node before the first input arrives.
>
> Best, Fabian
>
> 2015-03-12 1:05 GMT+01:00 Emmanuel <ele...@msn.com>:
>
> Hi Marton,
>
> Thanks for the info.
>
> I've been trying to implement a socket sink, but I keep running into 'Not
> Serializable' kinds of issues.
> The Spark docs note that this is typically an issue: the socket should be
> created on the worker node, as it can't be serialized and shipped from the
> driver.
>
> http://spark.apache.org/docs/1.1.0/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
>
> So I'm not sure how this would be implemented in Flink...
> My attempt (maybe very naive) looked like this:
>
> public static final class SocketSink extends RichSinkFunction<String> {
>
>     private PrintWriter out;
>
>     public SocketSink(String host, Integer port) throws IOException {
>         Socket clientSocket = new Socket(host, port);
>         out = new PrintWriter(clientSocket.getOutputStream(), true);
>     }
>
>     @Override
>     public void invoke(String s) {
>         out.println(s);
>     }
> }
>
> Maybe I should just move to Kafka directly... ;/
>
> Thanks for the help
>
> Emmanuel
>
> ------------------------------
> From: mbala...@apache.org
> Date: Wed, 11 Mar 2015 16:37:41 +0100
> Subject: Fwd: Flink questions
> To: ele...@msn.com
> CC: rmetz...@apache.org; hsapu...@apache.org; user@flink.apache.org
>
> Dear Emmanuel,
>
> I'm Marton, one of the Flink Streaming developers - Robert forwarded your
> issue to me. Thanks for trying out our project.
>
> 1) Debugging: TaskManager logs are currently not forwarded to the UI, but
> you can find them on the TaskManager machines in the log folder of your
> Flink distribution. We have this issue on our agenda for the very near
> future - they need to be accessible from the UI.
>
> 2) Output to socket: Currently we do not have a pre-implemented sink for
> sockets (although we offer a socket source and sinks writing to Apache
> Kafka, Flume and RabbitMQ). You can easily implement a socket sink by
> extending the abstract RichSinkFunction class, though. [1]
>
> To use it, you can simply say dataStream.addSink(new MySinkFunction()) -
> in it you can bring up a socket or any other service. You would create the
> socket in the open() method and then write every value out to it in the
> invoke() method.
>
> I do agree that this is a nice tool to have, so I have opened a JIRA
> ticket for it. [2]
>
> 3) Internal data format: Robert was kind enough to offer a more detailed
> answer on this issue. In general, streaming sinks support any file output
> that batch Flink supports, including Avro. You can use this functionality
> via dataStream.addSink(new FileSinkFunction<>(OutputFormat)).
>
> [1] http://ci.apache.org/projects/flink/flink-docs-master/streaming_guide.html#connecting-to-the-outside-world
> [2] https://issues.apache.org/jira/browse/FLINK-1688
>
> Best,
>
> Marton
>
> From: Emmanuel <ele...@msn.com>
> Date: March 11, 2015, 14:59:31 CET
> To: Robert Metzger <rmetz...@apache.org>, Henry Saputra <hsapu...@apache.org>
> Subject: Flink questions
>
> Hello,
>
> Thanks again for the help yesterday: the simple things go a long way to
> get me moving...
>
> I have more questions I hope I can get your opinion and input on:
>
> Debugging:
> What's the preferred or recommended way to proceed?
> I have been using some System.out.println() statements in my simple test
> code, and the results are confusing:
> First, in the UI the logs shown are for jobmanager.out, but there is never
> anything there; whenever I see output in a log, it's in the taskmanager.out
> file.
> Also, even more confusing is the fact that oftentimes I just get no log at
> all... the UI says the topology is running, but nothing gets printed out...
> Is there a process you'd recommend following to debug properly with logs?
>
> Output to socket:
> Ideally I'd like to print out to a socket/stream and read from another
> machine, so as not to choke the node with disk I/O when testing
> performance. Not sure how to do that.
>
> Internal data format:
> Finally, a practical question about data format: we ingest JSON, which is
> not convenient and uses a lot of space. Internally Java/Scala prefers
> tuples, and we were thinking of using protobufs.
> There is also Avro, which could do this as I understand it... What would
> be the recommended way to format data internally?
>
> Thanks for your input.
>
> Cheers,
> Emmanuel
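The 'Not Serializable' failures Emmanuel describes can be reproduced without Flink at all: java.net.Socket does not implement Serializable, so any non-transient Socket field makes the enclosing function object impossible to ship to a worker. A standalone sketch (the class names EagerSink, LazySink, and SerializationDemo are made up for illustration):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.net.Socket;

// Holding a Socket in a non-transient field makes the whole object
// unserializable, because Socket does not implement Serializable.
class EagerSink implements Serializable {
    Socket socket = new Socket(); // unconnected socket, created eagerly
}

// Keeping only serializable connection parameters, and marking the live
// resource transient, lets the object be shipped; the socket is rebuilt
// later (in Flink, inside open() on the worker).
class LazySink implements Serializable {
    String host = "localhost";
    int port = 9999;
    transient Socket socket; // never serialized
}

public class SerializationDemo {
    // Returns true if the object survives Java serialization.
    static boolean serializes(Object o) {
        try (ObjectOutputStream oos =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new EagerSink())); // false
        System.out.println(serializes(new LazySink()));  // true
    }
}
```

This is exactly why the quoted SocketSink fails: the constructor runs before the function is serialized and shipped, so the Socket it creates has to travel with the object, and serialization rejects it.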