could the problem be as simple as var active being never true?

On 04.03.2016 03:08, shikhar wrote:
I am trying to have my job also run a periodic action by using a custom
source that emits a dummy element periodically and a sink that executes the
callback, as shown in the code below. However as soon as I start the job and
check the state in the JobManager UI this particular Sink->Source combo is
in state 'FINISHED' I know based on logging that the sink never received any
elements. What am I doing wrong?

```scala
     env
       .addSource(PeriodicSource(1.minutes))
       .addSink { _ => foo() }
```

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction
import
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

import scala.concurrent.duration.FiniteDuration

case class PeriodicSource(interval: FiniteDuration) extends
SourceFunction[Unit] {
   @volatile private var active = false

   override def run(ctx: SourceContext[Unit]): Unit = {
     while (active) {
       sleep()
       if (active) {
         ctx.getCheckpointLock
         ctx.collect(Unit)
       }
     }
   }

   override def cancel(): Unit = {
     active = false
   }

   private def sleep(): Unit = {
     val startTimeMs = System.currentTimeMillis()
     val desiredSleepMs = interval.toMillis
     do {
       Thread.sleep(math.min(desiredSleepMs, 100))
     } while (active && (System.currentTimeMillis() - startTimeMs) <
desiredSleepMs)
   }
}
```



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Periodic-actions-tp5290.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Reply via email to