Shixiong,

Thanks, interesting point. So if we want to process only one batch and then
terminate the consumer, what's the best way to achieve that? Presumably the
listener could set a flag notifying the driver that it can terminate. But
the driver is not in a loop; it's basically blocked in awaitTermination. So
what would be a way to trigger the termination from the driver?

"context.awaitTermination() allows the current thread to wait for the
termination of a context by stop() or by an exception" - presumably, we
need to call stop() somewhere or perhaps throw.
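
One thought: instead of blocking on awaitTermination, the driver could block
on something the listener signals, and then call stop() itself from the main
thread. A rough sketch (untested; the CountDownLatch approach is just my
assumption of what would work here):

    final CountDownLatch firstBatchDone = new CountDownLatch(1);

    jssc.addStreamingListener(new StreamingListener() {
        @Override
        public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
            // Only signal here; never call jssc.stop() from the listener thread.
            firstBatchDone.countDown();
        }
        // ... remaining StreamingListener callbacks with empty bodies ...
    });

    jssc.start();
    firstBatchDone.await();   // replaces awaitTermination() as the blocking call
    jssc.stop(true, true);    // stopSparkContext = true, stopGracefully = true

(Uses java.util.concurrent.CountDownLatch; await() throws InterruptedException.)
Does that seem like a sane way to do it?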

Cheers,
- Dmitry

On Thu, Jun 4, 2015 at 3:55 AM, Shixiong Zhu <zsxw...@gmail.com> wrote:

> You should not call `jssc.stop(true);` in a StreamingListener. It will
> cause a deadlock: `jssc.stop` won't return until `listenerBus` exits, but
> since `jssc.stop` blocks the `StreamingListener`, `listenerBus` cannot exit.
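>
> If you need to trigger the stop from inside the listener, one way (just a
> sketch) is to hand it off to another thread so the listener callback can
> return:
>
>     new Thread(new Runnable() {
>         @Override
>         public void run() {
>             // Runs off the listener bus thread, so stop() can wait for
>             // listenerBus to exit without blocking it.
>             jssc.stop(true, true);
>         }
>     }).start();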
>
> Best Regards,
> Shixiong Zhu
>
> 2015-06-04 0:39 GMT+08:00 dgoldenberg <dgoldenberg...@gmail.com>:
>
>> Hi,
>>
>> I've implemented a Spark Streaming driver job, and in it I register a
>> streaming listener, like so:
>>
>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>>    Durations.milliseconds(params.getBatchDurationMillis()));
>> jssc.addStreamingListener(new JobListener(jssc));
>>
>> where JobListener is defined like so:
>>
>> private static class JobListener implements StreamingListener {
>>
>>     private JavaStreamingContext jssc;
>>
>>     JobListener(JavaStreamingContext jssc) {
>>         this.jssc = jssc;
>>     }
>>
>>     @Override
>>     public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
>>         System.out.println(">> Batch completed.");
>>         jssc.stop(true);
>>         System.out.println(">> The job has been stopped.");
>>     }
>> ........................
>>
>> I'm not seeing onBatchCompleted being triggered. Am I doing something
>> wrong?
>>
>> In this particular case, I was trying to implement bulk-ingest logic
>> where the first batch is all we're interested in (reading out of a
>> Kafka topic with the offset reset set to "smallest").
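>>
>> For reference, the stream is created roughly like this (the broker list
>> and topic name are placeholders):
>>
>>     Map<String, String> kafkaParams = new HashMap<String, String>();
>>     kafkaParams.put("metadata.broker.list", "localhost:9092");
>>     kafkaParams.put("auto.offset.reset", "smallest"); // re-read from the start
>>
>>     JavaPairInputDStream<String, String> messages =
>>         KafkaUtils.createDirectStream(jssc,
>>             String.class, String.class,
>>             StringDecoder.class, StringDecoder.class,
>>             kafkaParams,
>>             Collections.singleton("my-topic"));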
>>
