Hi,

This is a simple example that I found using Flink Stream. I changed it so
the flink client can be executed on a remote cluster, and so that it can
open a socket server to ship its results for any other consumer machine. It
seems to me that the socket server is not being open in the remote cluster,
but rather in my local machine (which I'm using to launch the app). How can
I achieve that? I want to be able to ship results directly from the remote
cluster, and through a socket server where clients can use as a tap.

Sorry about indentation:

def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.createRemoteEnvironment
("myhostname", DefaultFlinkMasterPort,

  "myapp-assembly-0.1-SNAPSHOT.jar");

  //Read from a socket stream at map it to StockPrice objects
  val socketStockStream = env.socketTextStream("localhost", 9999).map(x => {
    val split = x.split(",")
    StockPrice(split(0), split(1).toDouble)
  })

  //Generate other stock streams
  val SPX_Stream = env.addSource(generateStock("SPX")(10) _)
  val FTSE_Stream = env.addSource(generateStock("FTSE")(20) _)
  val DJI_Stream = env.addSource(generateStock("DJI")(30) _)
  val BUX_Stream = env.addSource(generateStock("BUX")(40) _)

  //Merge all stock streams together
  val stockStream = socketStockStream.merge(SPX_Stream, FTSE_Stream,
    DJI_Stream, BUX_Stream)

  stockStream.print()

// WHERE IS THE FOLLOWING CODE RUN?

var out: PrintWriter = null
new Thread {
  override def run(): Unit = {
    val serverSocket = new ServerSocket(12345)
    while (true) {
      val socket = serverSocket.accept()
      val hostname = socket.getInetAddress.getHostName.split('.').head
      println(s"Got a new connection from $hostname")
      out = new PrintWriter(socket.getOutputStream)
    }
  }
}.start()

stockStream.addSink(record => {
  if(out != null) {
    out.write(record)
    out.flush()
  }
})


  env.execute("Stock stream")}

Thanks.

Reply via email to