Hello all,

This is my code, just trying to make the code example in
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/
operators/process_function.html work

object ProcessFunctionTest {

  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    val text = env.socketTextStream("localhost", 9999)

    val text1 = text.map(s => (s,s)).keyBy(0).process(new
CountWithTimeoutFunction())

    text1.print()

    env.execute("CountWithTimeoutFunction")
  }

  case class CountWithTimestamp(key: String, count: Long, lastModified:
Long)

  class CountWithTimeoutFunction extends ProcessFunction[(String, String),
(String, Long)] {

    lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
      .getState(new ValueStateDescriptor[CountWithTimestamp]("myState",
classOf[CountWithTimestamp]))

    override def processElement(value: (String, String), ctx:
ProcessFunction[(String, String), (String, Long)]#Context, out:
Collector[(String, Long)]): Unit = {
    ......
    }

    override def onTimer(timestamp: Long, ctx: ProcessFunction[(String,
String), (String, Long)]#OnTimerContext, out: Collector[(String, Long)]):
Unit = {
    .......
    }
  }
}


Exception in thread "main"
org.apache.flink.runtime.client.JobExecutionException:
java.net.ConnectException: Operation timed out (Connection timed out)
at
org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:625)
at
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:121)
at
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at
com.whil.flink.streaming.CreateUserNotificationRequests$.main(CreateUserNotificationRequests.scala:42)
at
com.whil.flink.streaming.CreateUserNotificationRequests.main(CreateUserNotificationRequests.scala)
Caused by: java.net.ConnectException: Operation timed out (Connection timed
out)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:96)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:745)

On Thu, Jul 19, 2018 at 11:22 PM, vino yang <yanghua1...@gmail.com> wrote:

> Hi anna,
>
> Can you share your program and the exception stack trace and more details
> about what's your source and state backend?
>
> From the information you provided, it seems Flink started a network
> connect but timed out.
>
> Thanks, vino.
>
> 2018-07-20 14:14 GMT+08:00 anna stax <annasta...@gmail.com>:
>
>> Hi all,
>>
>> I am new to Flink. I am using the classes CountWithTimestamp and
>> CountWithTimeoutFunction from the examples found in
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/
>> dev/stream/operators/process_function.html
>>
>> I am getting the error Exception in thread "main"
>> org.apache.flink.runtime.client.JobExecutionException:
>> java.net.ConnectException: Operation timed out (Connection timed out)
>>
>> Looks like when timer’s time is reached I am getting this error. Any idea
>> why. Please help
>>
>> Thanks
>>
>
>

Reply via email to