My object name is  CreateUserNotificationRequests, thats why  you see
CreateUserNotificationRequests
in the Error message.
I edited the object name after pasting the code...Hope there is no
confusion and I get some help.
Thanks



On Fri, Jul 20, 2018 at 10:10 AM, anna stax <annasta...@gmail.com> wrote:

> Hello all,
>
> This is my code, just trying to make the code example in
> https://ci.apache.org/projects/flink/flink-docs-release-
> 1.5/dev/stream/operators/process_function.html work
>
> object ProcessFunctionTest {
>
>   def main(args: Array[String]) {
>
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>     val text = env.socketTextStream("localhost", 9999)
>
>     val text1 = text.map(s => (s,s)).keyBy(0).process(new
> CountWithTimeoutFunction())
>
>     text1.print()
>
>     env.execute("CountWithTimeoutFunction")
>   }
>
>   case class CountWithTimestamp(key: String, count: Long, lastModified:
> Long)
>
>   class CountWithTimeoutFunction extends ProcessFunction[(String, String),
> (String, Long)] {
>
>     lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
>       .getState(new ValueStateDescriptor[CountWithTimestamp]("myState",
> classOf[CountWithTimestamp]))
>
>     override def processElement(value: (String, String), ctx:
> ProcessFunction[(String, String), (String, Long)]#Context, out:
> Collector[(String, Long)]): Unit = {
>     ......
>     }
>
>     override def onTimer(timestamp: Long, ctx: ProcessFunction[(String,
> String), (String, Long)]#OnTimerContext, out: Collector[(String, Long)]):
> Unit = {
>     .......
>     }
>   }
> }
>
>
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException:
> java.net.ConnectException: Operation timed out (Connection timed out)
> at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(
> MiniCluster.java:625)
> at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.
> execute(LocalStreamEnvironment.java:121)
> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.
> execute(StreamExecutionEnvironment.scala:654)
> at com.whil.flink.streaming.CreateUserNotificationRequests$.main(
> CreateUserNotificationRequests.scala:42)
> at com.whil.flink.streaming.CreateUserNotificationRequests.main(
> CreateUserNotificationRequests.scala)
> Caused by: java.net.ConnectException: Operation timed out (Connection
> timed out)
> at java.net.PlainSocketImpl.socketConnect(Native Method)
> at java.net.AbstractPlainSocketImpl.doConnect(
> AbstractPlainSocketImpl.java:350)
> at java.net.AbstractPlainSocketImpl.connectToAddress(
> AbstractPlainSocketImpl.java:206)
> at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:
> 188)
> at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
> at java.net.Socket.connect(Socket.java:589)
> at org.apache.flink.streaming.api.functions.source.
> SocketTextStreamFunction.run(SocketTextStreamFunction.java:96)
> at org.apache.flink.streaming.api.operators.StreamSource.
> run(StreamSource.java:87)
> at org.apache.flink.streaming.api.operators.StreamSource.
> run(StreamSource.java:56)
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(
> SourceStreamTask.java:99)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:745)
>
> On Thu, Jul 19, 2018 at 11:22 PM, vino yang <yanghua1...@gmail.com> wrote:
>
>> Hi anna,
>>
>> Can you share your program and the exception stack trace and more details
>> about what's your source and state backend?
>>
>> From the information you provided, it seems Flink started a network
>> connect but timed out.
>>
>> Thanks, vino.
>>
>> 2018-07-20 14:14 GMT+08:00 anna stax <annasta...@gmail.com>:
>>
>>> Hi all,
>>>
>>> I am new to Flink. I am using the classes CountWithTimestamp and
>>> CountWithTimeoutFunction from the examples found in
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/
>>> dev/stream/operators/process_function.html
>>>
>>> I am getting the error Exception in thread "main"
>>> org.apache.flink.runtime.client.JobExecutionException:
>>> java.net.ConnectException: Operation timed out (Connection timed out)
>>>
>>> Looks like when timer’s time is reached I am getting this error. Any
>>> idea why. Please help
>>>
>>> Thanks
>>>
>>
>>
>

Reply via email to