Yes (if I understand correctly what you aim for).
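
In case it helps, here is a minimal sketch of what such an external process could look like, assuming it only needs to feed "symbol,price" lines to the socket source. The object name StockFeedServer, the formatLine helper, the "ACME" symbol and the 500 ms interval are made up for illustration; port 9999 and the line format are taken from the socketTextStream call in the quoted job. It uses only the JDK socket classes and is not part of the Flink job.

```scala
import java.io.PrintWriter
import java.net.ServerSocket
import scala.util.Random

// Hypothetical standalone feed: run it on a dedicated cluster machine,
// outside of the Flink job, and point env.socketTextStream at that host.
object StockFeedServer {

  // Formats one record as "symbol,price", the layout the job's map() splits on.
  def formatLine(symbol: String, price: Double): String = s"$symbol,$price"

  def main(args: Array[String]): Unit = {
    // Port 9999 matches the socketTextStream call in the job; override via args.
    val port = if (args.nonEmpty) args(0).toInt else 9999
    val serverSocket = new ServerSocket(port)
    println(s"Waiting for a client to connect on port $port")

    while (true) {
      // Blocks until a client (e.g. the socket source task) connects.
      val socket = serverSocket.accept()
      val out = new PrintWriter(socket.getOutputStream, true) // autoflush on println
      try {
        // PrintWriter swallows IOExceptions, so poll checkError() to notice a disconnect.
        while (!out.checkError()) {
          out.println(formatLine("ACME", 100 + Random.nextGaussian()))
          Thread.sleep(500)
        }
      } finally {
        socket.close()
      }
    }
  }
}
```

You would start this on the dedicated machine itself (not from "main" of the job) and replace "localhost" in env.socketTextStream with that machine's host name.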

On 01/19/2016 05:57 PM, Saiph Kappa wrote:
> Thanks for your reply, Matthias. So it is not possible to open a socket
> server in the JobGraph and keep it open during the lifetime of the
> job, is that what you are saying? And an external process is required
> to open that socket server?
> 
> On Tue, Jan 19, 2016 at 5:38 PM, Matthias J. Sax <mj...@apache.org
> <mailto:mj...@apache.org>> wrote:
> 
>     Your "SocketWriter-Thread" code will run on your client. All code in
>     "main" runs on the client.
> 
>     execute() itself runs on the client, too. Of course, it triggers the job
>     submission to the cluster. In this step, the assembled job from the
>     previous calls is translated into the JobGraph which is submitted to the
>     JobManager for execution.
> 
>     You should start your SocketWriter-Thread manually on the cluster, i.e.,
>     if you use "localhost" in "env.socketTextStream", it must run on the
>     TaskManager machine that executes this SocketStream-source task.
> 
>     I guess it would be better not to use "localhost", but start your
>     SocketWriter-Thread on a dedicated machine in the cluster, and connect
>     your SocketStream-source to this machine via its host name.
> 
>     -Matthias
> 
> 
> 
>     On 01/19/2016 03:57 PM, Saiph Kappa wrote:
>     > Hi,
>     >
>     > This is a simple example that I found using Flink Stream. I changed it
>     > so the Flink client can be executed on a remote cluster, and so that it
>     > can open a socket server to ship its results to any other consumer
>     > machine. It seems to me that the socket server is not being opened in the
>     > remote cluster, but rather on my local machine (which I'm using to
>     > launch the app). How can I achieve that? I want to be able to ship
>     > results directly from the remote cluster, through a socket server
>     > that clients can use as a tap.
>     >
>     > Sorry about indentation:
>     >
>     > def main(args: Array[String]) {
>     >
>     >   val env = StreamExecutionEnvironment.createRemoteEnvironment(
>     >     "myhostname", DefaultFlinkMasterPort, "myapp-assembly-0.1-SNAPSHOT.jar")
>     >
>     >   // Read from a socket stream and map it to StockPrice objects
>     >   val socketStockStream = env.socketTextStream("localhost", 9999).map(x => {
>     >     val split = x.split(",")
>     >     StockPrice(split(0), split(1).toDouble)
>     >   })
>     >
>     >   // Generate other stock streams
>     >   val SPX_Stream = env.addSource(generateStock("SPX")(10) _)
>     >   val FTSE_Stream = env.addSource(generateStock("FTSE")(20) _)
>     >   val DJI_Stream = env.addSource(generateStock("DJI")(30) _)
>     >   val BUX_Stream = env.addSource(generateStock("BUX")(40) _)
>     >
>     >   // Merge all stock streams together
>     >   val stockStream = socketStockStream.merge(SPX_Stream,
>     >     FTSE_Stream, DJI_Stream, BUX_Stream)
>     >   stockStream.print()
>     >
>     >   // WHERE IS THE FOLLOWING CODE RUN?
>     >
>     >   var out: PrintWriter = null
>     >   new Thread {
>     >     override def run(): Unit = {
>     >       val serverSocket = new ServerSocket(12345)
>     >       while (true) {
>     >         val socket = serverSocket.accept()
>     >         val hostname = socket.getInetAddress.getHostName.split('.').head
>     >         println(s"Got a new connection from $hostname")
>     >         out = new PrintWriter(socket.getOutputStream)
>     >       }
>     >     }
>     >   }.start()
>     >
>     >   stockStream.addSink(record => {
>     >     if (out != null) {
>     >       out.write(record)
>     >       out.flush()
>     >     }
>     >   })
>     >
>     >   env.execute("Stock stream")
>     > }
>     >
>     > Thanks.
> 
> 
