Hi anna,

From the stack trace you provided, this is a socket connection error, not a Flink problem.
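To rule Flink out, you can check whether anything is accepting connections on that port before you submit the job. Below is a minimal sketch in Scala, assuming the host and port from your code (localhost, 9999). The PortProbe object and the canConnect helper are names made up for illustration; they are not part of Flink.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

// Hypothetical helper, not a Flink API: probes a TCP endpoint with plain java.net.
object PortProbe {

  /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
  def canConnect(host: String, port: Int, timeoutMs: Int = 2000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // Covers refused connections, timeouts and unresolved hosts.
      case _: IOException => false
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Assumed values, taken from env.socketTextStream("localhost", 9999).
    val host = "localhost"
    val port = 9999
    if (canConnect(host, port)) println(s"$host:$port is accepting connections")
    else println(s"Nothing reachable at $host:$port, start a server first")
  }
}
```

If the probe fails as well, the problem is in the local network setup or the missing server, not in the job.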
So, have you started a socket server at "localhost:9999", using a program or a CLI tool such as "nc -l 9999"? There is an example you can have a look at [1].

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.5/quickstart/setup_quickstart.html#run-the-example

Thanks, vino.

2018-07-21 4:30 GMT+08:00 anna stax <annasta...@gmail.com>:

> It is not the code, but I don't know what the problem is.
> A simple word count with socketTextStream used to work but now gives the
> same error.
> Apps with a Kafka source, which used to work, are giving the same error.
> When I have a source generator within the app itself, it works fine.
>
> So, both socketTextStream and the Kafka source give me a
> java.net.ConnectException: Operation timed out (Connection timed out)
> error.
>
> On Fri, Jul 20, 2018 at 10:29 AM, anna stax <annasta...@gmail.com> wrote:
>
>> My object name is CreateUserNotificationRequests, that's why you see
>> CreateUserNotificationRequests in the error message.
>> I edited the object name after pasting the code... Hope there is no
>> confusion and I get some help.
>> Thanks
>>
>> On Fri, Jul 20, 2018 at 10:10 AM, anna stax <annasta...@gmail.com> wrote:
>>
>>> Hello all,
>>>
>>> This is my code, just trying to make the code example in
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/process_function.html work
>>>
>>> object ProcessFunctionTest {
>>>
>>>   def main(args: Array[String]) {
>>>
>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>>>     val text = env.socketTextStream("localhost", 9999)
>>>
>>>     val text1 = text.map(s => (s,s)).keyBy(0).process(new CountWithTimeoutFunction())
>>>
>>>     text1.print()
>>>
>>>     env.execute("CountWithTimeoutFunction")
>>>   }
>>>
>>>   case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
>>>
>>>   class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String, Long)] {
>>>
>>>     lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
>>>       .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))
>>>
>>>     override def processElement(value: (String, String), ctx: ProcessFunction[(String, String), (String, Long)]#Context, out: Collector[(String, Long)]): Unit = {
>>>       ......
>>>     }
>>>
>>>     override def onTimer(timestamp: Long, ctx: ProcessFunction[(String, String), (String, Long)]#OnTimerContext, out: Collector[(String, Long)]): Unit = {
>>>       .......
>>>     }
>>>   }
>>> }
>>>
>>>
>>> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.net.ConnectException: Operation timed out (Connection timed out)
>>>     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:625)
>>>     at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:121)
>>>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>     at com.whil.flink.streaming.CreateUserNotificationRequests$.main(CreateUserNotificationRequests.scala:42)
>>>     at com.whil.flink.streaming.CreateUserNotificationRequests.main(CreateUserNotificationRequests.scala)
>>> Caused by: java.net.ConnectException: Operation timed out (Connection timed out)
>>>     at java.net.PlainSocketImpl.socketConnect(Native Method)
>>>     at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
>>>     at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
>>>     at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
>>>     at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>>>     at java.net.Socket.connect(Socket.java:589)
>>>     at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:96)
>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>> On Thu, Jul 19, 2018 at 11:22 PM, vino yang <yanghua1...@gmail.com> wrote:
>>>
>>>> Hi anna,
>>>>
>>>> Can you share your program and the exception stack trace and more
>>>> details about what's your source and state backend?
>>>>
>>>> From the information you provided, it seems Flink started a network
>>>> connect but timed out.
>>>>
>>>> Thanks, vino.
>>>>
>>>> 2018-07-20 14:14 GMT+08:00 anna stax <annasta...@gmail.com>:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I am new to Flink. I am using the classes CountWithTimestamp and
>>>>> CountWithTimeoutFunction from the examples found in
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/process_function.html
>>>>>
>>>>> I am getting the error Exception in thread "main"
>>>>> org.apache.flink.runtime.client.JobExecutionException:
>>>>> java.net.ConnectException: Operation timed out (Connection timed out)
>>>>>
>>>>> Looks like when timer’s time is reached I am getting this error. Any
>>>>> idea why. Please help
>>>>>
>>>>> Thanks