It is good now. Sorry, my fault. I had multiple applications running and both were using the socket stream . Thanks.
On Sun, Jul 22, 2018 at 8:22 PM, vino yang <yanghua1...@gmail.com> wrote: > Hi anna, > > From the stack trace you provided, it's socket connect error not about > Flink. > > So, Have you start a socket server at "localhost:9999"? Using a program or > CLI tool, such as "nc -l 9999" > > There is a example you can have a look[1]. > > [1]: https://ci.apache.org/projects/flink/flink-docs- > release-1.5/quickstart/setup_quickstart.html#run-the-example > > Thanks, vino. > > 2018-07-21 4:30 GMT+08:00 anna stax <annasta...@gmail.com>: > >> It is not the code, but I don't know what the problem is. >> A simple word count with socketTextStream used to work but now gives >> the same error. >> Apps with kafka source which used to work is giving the same error. >> When I have a source generator within the app itself works good. >> >> So, with socketTextStream and kafka source gives me >> java.net.ConnectException: Operation timed out (Connection timed out) >> error >> >> On Fri, Jul 20, 2018 at 10:29 AM, anna stax <annasta...@gmail.com> wrote: >> >>> My object name is CreateUserNotificationRequests, thats why you see >>> CreateUserNotificationRequests in the Error message. >>> I edited the object name after pasting the code...Hope there is no >>> confusion and I get some help. >>> Thanks >>> >>> >>> >>> On Fri, Jul 20, 2018 at 10:10 AM, anna stax <annasta...@gmail.com> >>> wrote: >>> >>>> Hello all, >>>> >>>> This is my code, just trying to make the code example in >>>> https://ci.apache.org/projects/flink/flink-docs-release-1 >>>> .5/dev/stream/operators/process_function.html work >>>> >>>> object ProcessFunctionTest { >>>> >>>> def main(args: Array[String]) { >>>> >>>> val env = StreamExecutionEnvironment.getExecutionEnvironment >>>> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) >>>> val text = env.socketTextStream("localhost", 9999) >>>> >>>> val text1 = text.map(s => (s,s)).keyBy(0).process(new >>>> CountWithTimeoutFunction()) >>>> >>>> text1.print() >>>> >>>> env.execute("CountWithTimeoutFunction") >>>> } >>>> >>>> case class CountWithTimestamp(key: String, count: Long, lastModified: >>>> Long) >>>> >>>> class CountWithTimeoutFunction extends ProcessFunction[(String, >>>> String), (String, Long)] { >>>> >>>> lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext >>>> .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", >>>> classOf[CountWithTimestamp])) >>>> >>>> override def processElement(value: (String, String), ctx: >>>> ProcessFunction[(String, String), (String, Long)]#Context, out: >>>> Collector[(String, Long)]): Unit = { >>>> ...... >>>> } >>>> >>>> override def onTimer(timestamp: Long, ctx: ProcessFunction[(String, >>>> String), (String, Long)]#OnTimerContext, out: Collector[(String, Long)]): >>>> Unit = { >>>> ....... >>>> } >>>> } >>>> } >>>> >>>> >>>> Exception in thread "main" >>>> org.apache.flink.runtime.client.JobExecutionException: >>>> java.net.ConnectException: Operation timed out (Connection timed out) >>>> at org.apache.flink.runtime.minicluster.MiniCluster.executeJobB >>>> locking(MiniCluster.java:625) >>>> at org.apache.flink.streaming.api.environment.LocalStreamEnviro >>>> nment.execute(LocalStreamEnvironment.java:121) >>>> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironm >>>> ent.execute(StreamExecutionEnvironment.scala:654) >>>> at com.whil.flink.streaming.CreateUserNotificationRequests$.mai >>>> n(CreateUserNotificationRequests.scala:42) >>>> at com.whil.flink.streaming.CreateUserNotificationRequests.main >>>> (CreateUserNotificationRequests.scala) >>>> Caused by: java.net.ConnectException: Operation timed out (Connection >>>> timed out) >>>> at java.net.PlainSocketImpl.socketConnect(Native Method) >>>> at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSock >>>> etImpl.java:350) >>>> at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPl >>>> ainSocketImpl.java:206) >>>> at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocket >>>> Impl.java:188) >>>> at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) >>>> at java.net.Socket.connect(Socket.java:589) >>>> at org.apache.flink.streaming.api.functions.source.SocketTextSt >>>> reamFunction.run(SocketTextStreamFunction.java:96) >>>> at org.apache.flink.streaming.api.operators.StreamSource.run(St >>>> reamSource.java:87) >>>> at org.apache.flink.streaming.api.operators.StreamSource.run(St >>>> reamSource.java:56) >>>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.ru >>>> n(SourceStreamTask.java:99) >>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(S >>>> treamTask.java:306) >>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) >>>> at java.lang.Thread.run(Thread.java:745) >>>> >>>> On Thu, Jul 19, 2018 at 11:22 PM, vino yang <yanghua1...@gmail.com> >>>> wrote: >>>> >>>>> Hi anna, >>>>> >>>>> Can you share your program and the exception stack trace and more >>>>> details about what's your source and state backend? >>>>> >>>>> From the information you provided, it seems Flink started a network >>>>> connect but timed out. >>>>> >>>>> Thanks, vino. >>>>> >>>>> 2018-07-20 14:14 GMT+08:00 anna stax <annasta...@gmail.com>: >>>>> >>>>>> Hi all, >>>>>> >>>>>> I am new to Flink. I am using the classes CountWithTimestamp and >>>>>> CountWithTimeoutFunction from the examples found in >>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ >>>>>> dev/stream/operators/process_function.html >>>>>> >>>>>> I am getting the error Exception in thread "main" >>>>>> org.apache.flink.runtime.client.JobExecutionException: >>>>>> java.net.ConnectException: Operation timed out (Connection timed out) >>>>>> >>>>>> Looks like when timer’s time is reached I am getting this error. Any >>>>>> idea why. Please help >>>>>> >>>>>> Thanks >>>>>> >>>>> >>>>> >>>> >>> >> >