Yes (if I understand correctly what you aim for). On 01/19/2016 05:57 PM, Saiph Kappa wrote: > Thanks for your reply Mattias. So it is not possible to open a socket > server in the JobGraph and having it open during the lifetime of the > job, is that what you are saying? And it is required to have an external > process to open that socket server. > > On Tue, Jan 19, 2016 at 5:38 PM, Matthias J. Sax <mj...@apache.org > <mailto:mj...@apache.org>> wrote: > > Your "SocketWriter-Thread" code will run on your client. All code in > "main" runs on the client. > > execute() itself runs on the client, too. Of course, it triggers the job > submission to the cluster. In this step, the assembled job from the > previous calls is translated into the JobGraph which is submitted to the > JobManager for execution. > > You should start your SocketWriter-Thread manually on the cluster, ie, > if you use "localhost" in "env.socketTextStream", it must be the > TaskManager machine that executes this SocketStream-source task. > > I guess, it would be better not to use "localhost", but start your > SocketWriter-Thread on a dedicated machine in the cluster, and connect > your SocketStream-source to this machine via its host name. > > -Matthias > > > > On 01/19/2016 03:57 PM, Saiph Kappa wrote: > > Hi, > > > > This is a simple example that I found using Flink Stream. I changed it > > so the flink client can be executed on a remote cluster, and so > that it > > can open a socket server to ship its results for any other consumer > > machine. It seems to me that the socket server is not being open > in the > > remote cluster, but rather in my local machine (which I'm using to > > launch the app). How can I achieve that? I want to be able to ship > > results directly from the remote cluster, and through a socket server > > where clients can use as a tap. > > > > Sorry about indentation: > > > > |def main(args: Array[String]) { | > > > > val env = > > StreamExecutionEnvironment.createRemoteEnvironment("myhostname", > > DefaultFlinkMasterPort, > > > > ||"myapp-assembly-0.1-SNAPSHOT.jar"); | //Read from a socket stream at > > map it to StockPrice objects val socketStockStream = > > env.socketTextStream("localhost", 9999).map(x => { val split = > > x.split(",") StockPrice(split(0), split(1).toDouble) }) //Generate > other > > stock streams val SPX_Stream = > env.addSource(generateStock("SPX")(10) _) > > val FTSE_Stream = env.addSource(generateStock("FTSE")(20) _) val > > DJI_Stream = env.addSource(generateStock("DJI")(30) _) val > BUX_Stream = > > env.addSource(generateStock("BUX")(40) _) //Merge all stock streams > > together val stockStream = socketStockStream.merge(SPX_Stream, > > FTSE_Stream, DJI_Stream, BUX_Stream) stockStream.print() > > | > > > > // WHERE IS THE FOLLOWING CODE RUN? > > > > |var out: PrintWriter = null > > new Thread { > > override def run(): Unit = { > > val serverSocket = new ServerSocket(12345) > > while (true) { > > val socket = serverSocket.accept() > > val hostname = socket.getInetAddress.getHostName.split('.').head > > println(s"Got a new connection from $hostname") > > out = new PrintWriter(socket.getOutputStream) > > } > > } > > }.start() > > > > |||stockStream|.addSink(record => { > > if(out != null) { > > out.write(record) > > out.flush() > > } > > }) > > > > env.execute("Stock stream") }| > > > > Thanks. > >
signature.asc
Description: OpenPGP digital signature