Shixiong, Thanks, interesting point. So if we want to only process one batch then terminate the consumer, what's the best way to achieve that? Presumably the listener could set a flag on the driver notifying it that it can terminate. But the driver is not in a loop, it's basically blocked in awaitTermination. So what would be a way to trigger the termination in the driver?
"context.awaitTermination() allows the current thread to wait for the termination of a context by stop() or by an exception" - presumably, we need to call stop() somewhere or perhaps throw. Cheers, - Dmitry On Thu, Jun 4, 2015 at 3:55 AM, Shixiong Zhu <zsxw...@gmail.com> wrote: > You should not call `jssc.stop(true);` in a StreamingListener. It will > cause a dead-lock: `jssc.stop` won't return until `listenerBus` exits. But > since `jssc.stop` blocks `StreamingListener`, `listenerBus` cannot exit. > > Best Regards, > Shixiong Zhu > > 2015-06-04 0:39 GMT+08:00 dgoldenberg <dgoldenberg...@gmail.com>: > >> Hi, >> >> I've got a Spark Streaming driver job implemented and in it, I register a >> streaming listener, like so: >> >> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, >> Durations.milliseconds(params.getBatchDurationMillis())); >> jssc.addStreamingListener(new JobListener(jssc)); >> >> where JobListener is defined like so >> private static class JobListener implements StreamingListener { >> >> private JavaStreamingContext jssc; >> >> JobListener(JavaStreamingContext jssc) { >> this.jssc = jssc; >> } >> >> @Override >> public void >> onBatchCompleted(StreamingListenerBatchCompleted >> batchCompleted) { >> System.out.println(">> Batch completed."); >> jssc.stop(true); >> System.out.println(">> The job has been >> stopped."); >> } >> ........................ >> >> I do not seem to be seeing onBatchCompleted being triggered. Am I doing >> something wrong? >> >> In this particular case, I was trying to implement a bulk ingest type of >> logic where the first batch is all we're interested in (reading out of a >> Kafka topic with offset reset set to "smallest"). >> >> >> >> >> -- >> View this message in context: >> http://apache-spark-user-list.1001560.n3.nabble.com/StreamingListener-anyone-tp23140.html >> Sent from the Apache Spark User List mailing list archive at Nabble.com. >> >> --------------------------------------------------------------------- >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >> For additional commands, e-mail: user-h...@spark.apache.org >> >> >