Hi there, I'm trying to test a couple of things by having my stream write to a socket, but it keeps failing to connect (I'm trying to have a stream write to a socket, and have another stream read from that socket).
Caused by: java.net.ConnectException: Connection refused (Connection refused) at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:589) at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:96) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642) at java.lang.Thread.run(Thread.java:745) I tried writeToSocket(parameterTool.get("localhost"), parameterTool.getInt(9000), new SimpleStringSchema), and even a custom sink: addSink(w => { val ia = InetAddress.getByName("localhost") val socket = new Socket(ia, 9000) val outStream = socket.getOutputStream val out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(outStream))) out.println(w) out.flush() out.close() }) But none of this seem to work. I'm fairly sure I setup the server correctly since I can connect it to via a telnet and other dummy echoclients I wrote. I can also have my data stream read from that same socket without any issues, but I can't seem to tell my stream to write to this socket without the above connection refused error showing up. Is there some nuance here I'm missing? Thanks!