My object name is CreateUserNotificationRequests, which is why you see CreateUserNotificationRequests in the error message. I edited the object name after pasting the code. I hope this causes no confusion and that I can get some help. Thanks
On Fri, Jul 20, 2018 at 10:10 AM, anna stax <annasta...@gmail.com> wrote:

> Hello all,
>
> This is my code, just trying to make the code example in
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/process_function.html
> work
>
> object ProcessFunctionTest {
>
>   def main(args: Array[String]) {
>
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>     val text = env.socketTextStream("localhost", 9999)
>
>     val text1 = text.map(s => (s,s)).keyBy(0).process(new CountWithTimeoutFunction())
>
>     text1.print()
>
>     env.execute("CountWithTimeoutFunction")
>   }
>
>   case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
>
>   class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String, Long)] {
>
>     lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
>       .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))
>
>     override def processElement(value: (String, String), ctx: ProcessFunction[(String, String), (String, Long)]#Context, out: Collector[(String, Long)]): Unit = {
>       ......
>     }
>
>     override def onTimer(timestamp: Long, ctx: ProcessFunction[(String, String), (String, Long)]#OnTimerContext, out: Collector[(String, Long)]): Unit = {
>       .......
>     }
>   }
> }
>
>
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.net.ConnectException: Operation timed out (Connection timed out)
>   at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:625)
>   at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:121)
>   at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>   at com.whil.flink.streaming.CreateUserNotificationRequests$.main(CreateUserNotificationRequests.scala:42)
>   at com.whil.flink.streaming.CreateUserNotificationRequests.main(CreateUserNotificationRequests.scala)
> Caused by: java.net.ConnectException: Operation timed out (Connection timed out)
>   at java.net.PlainSocketImpl.socketConnect(Native Method)
>   at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
>   at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
>   at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
>   at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>   at java.net.Socket.connect(Socket.java:589)
>   at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:96)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>   at java.lang.Thread.run(Thread.java:745)
>
> On Thu, Jul 19, 2018 at 11:22 PM, vino yang <yanghua1...@gmail.com> wrote:
>
>> Hi anna,
>>
>> Can you share your program and the exception stack trace and more details
>> about what's your source and state backend?
>>
>> From the information you provided, it seems Flink started a network
>> connect but timed out.
>>
>> Thanks, vino.
>>
>> 2018-07-20 14:14 GMT+08:00 anna stax <annasta...@gmail.com>:
>>
>>> Hi all,
>>>
>>> I am new to Flink. I am using the classes CountWithTimestamp and
>>> CountWithTimeoutFunction from the examples found in
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/process_function.html
>>>
>>> I am getting the error Exception in thread "main"
>>> org.apache.flink.runtime.client.JobExecutionException:
>>> java.net.ConnectException: Operation timed out (Connection timed out)
>>>
>>> Looks like when timer’s time is reached I am getting this error. Any
>>> idea why. Please help
>>>
>>> Thanks