Yes I did open a socket with netcat. Turns out my first error was due to a stream without a sink triggering the socket connect and (I thought that without a sink the stream wouldn't affect anything so I didn't comment it out, and I didn't open the socket for that port). However
I did play with it some more and I think the real issue is that I'm trying to have two streams, one write to a port and another read from the same port. i.e. val y = executionEnvironment.socketTextStream("localhost", 9000) x.writeToSocket("localhost", 9000, new SimpleStringSchema()) Once I tested just write or just the read it worked, but combined I get this error: java.net.SocketException: Connection reset at java.net.SocketInputStream.read(SocketInputStream.java:210) at java.net.SocketInputStream.read(SocketInputStream.java:141) at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284) at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326) at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178) at java.io.InputStreamReader.read(InputStreamReader.java:184) at java.io.BufferedReader.read1(BufferedReader.java:210) at java.io.BufferedReader.read(BufferedReader.java:286) at java.io.Reader.read(Reader.java:140) at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:101) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642) at java.lang.Thread.run(Thread.java:745) Is this operation not allowed? And I'm mainly writing to the same socket in order to pass work back and forth between streams.