I'm struggling to debug some strange behavior in a simple Spark
Streaming + Java + Kafka example -- in fact, a simplified version of
JavaKafkaWordCount that just calls print() on a stream of
messages.
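
For context, here is a minimal sketch of the kind of driver I mean (the
topic name, consumer group, and ZooKeeper quorum below are placeholders,
not my actual settings; the 10-second batch interval just matches the
timestamps in the logs further down):

import java.util.HashMap;
import java.util.Map;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class SimpleKafkaPrint {
  public static void main(String[] args) throws Exception {
    // Master and deploy settings come from spark-submit; 10 s batches
    SparkConf conf = new SparkConf().setAppName("SimpleKafkaPrint");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(10000));

    // Placeholder topic name and receiver thread count
    Map<String, Integer> topics = new HashMap<String, Integer>();
    topics.put("test-topic", 1);

    // Placeholder ZooKeeper quorum and consumer group
    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jssc, "localhost:2181", "test-group", topics);

    // Just take the message values and print them -- no word counting
    JavaDStream<String> lines = messages.map(
        new Function<Tuple2<String, String>, String>() {
          public String call(Tuple2<String, String> tuple) {
            return tuple._2();
          }
        });
    lines.print();

    jssc.start();
    jssc.awaitTermination();
  }
}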

Data is flowing, but the job only appears to work for a few batch
intervals -- sometimes zero -- before it stops invoking any actions.
Apologies for the amount of log output, but it may illustrate what is
happening to someone who knows this better:



The key activity in the logs seems to be as follows -- it works for a few intervals:

...
2014-05-30 13:53:50 INFO  ReceiverTracker:58 - Stream 0 received 0 blocks
2014-05-30 13:53:50 INFO  JobScheduler:58 - Added jobs for time 1401454430000 ms
-------------------------------------------
Time: 1401454430000 ms
-------------------------------------------

2014-05-30 13:53:50 INFO  JobScheduler:58 - Starting job streaming job
1401454430000 ms.0 from job set of time 1401454430000 ms
2014-05-30 13:53:50 INFO  JobScheduler:58 - Finished job streaming job
1401454430000 ms.0 from job set of time 1401454430000 ms
2014-05-30 13:53:50 INFO  JobScheduler:58 - Total delay: 0.004 s for
time 1401454430000 ms (execution: 0.000 s)
2014-05-30 13:53:50 INFO  MappedRDD:58 - Removing RDD 2 from persistence list
2014-05-30 13:53:50 INFO  BlockManager:58 - Removing RDD 2
2014-05-30 13:53:50 INFO  BlockRDD:58 - Removing RDD 1 from persistence list
2014-05-30 13:53:50 INFO  BlockManager:58 - Removing RDD 1
2014-05-30 13:53:50 INFO  KafkaInputDStream:58 - Removing blocks of
RDD BlockRDD[1] at BlockRDD at ReceiverInputDStream.scala:69 of time
1401454430000 ms
2014-05-30 13:54:00 INFO  ReceiverTracker:58 - Stream 0 received 0 blocks
2014-05-30 13:54:00 INFO  JobScheduler:58 - Added jobs for time 1401454440000 ms
...


Then it works, with some additional, different output in the logs -- here
you can see that output is flowing too:

...
2014-05-30 13:54:20 INFO  ReceiverTracker:58 - Stream 0 received 2 blocks
2014-05-30 13:54:20 INFO  JobScheduler:58 - Added jobs for time 1401454460000 ms
2014-05-30 13:54:20 INFO  JobScheduler:58 - Starting job streaming job
1401454460000 ms.0 from job set of time 1401454460000 ms
2014-05-30 13:54:20 INFO  SparkContext:58 - Starting job: take at
DStream.scala:593
2014-05-30 13:54:20 INFO  DAGScheduler:58 - Got job 1 (take at
DStream.scala:593) with 1 output partitions (allowLocal=true)
2014-05-30 13:54:20 INFO  DAGScheduler:58 - Final stage: Stage 1(take
at DStream.scala:593)
2014-05-30 13:54:20 INFO  DAGScheduler:58 - Parents of final stage: List()
2014-05-30 13:54:20 INFO  DAGScheduler:58 - Missing parents: List()
2014-05-30 13:54:20 INFO  DAGScheduler:58 - Computing the requested
partition locally
2014-05-30 13:54:20 INFO  BlockManager:58 - Found block
input-0-1401454458400 locally
2014-05-30 13:54:20 INFO  SparkContext:58 - Job finished: take at
DStream.scala:593, took 0.007007 s
2014-05-30 13:54:20 INFO  SparkContext:58 - Starting job: take at
DStream.scala:593
2014-05-30 13:54:20 INFO  DAGScheduler:58 - Got job 2 (take at
DStream.scala:593) with 1 output partitions (allowLocal=true)
2014-05-30 13:54:20 INFO  DAGScheduler:58 - Final stage: Stage 2(take
at DStream.scala:593)
2014-05-30 13:54:20 INFO  DAGScheduler:58 - Parents of final stage: List()
2014-05-30 13:54:20 INFO  DAGScheduler:58 - Missing parents: List()
2014-05-30 13:54:20 INFO  DAGScheduler:58 - Computing the requested
partition locally
2014-05-30 13:54:20 INFO  BlockManager:58 - Found block
input-0-1401454459400 locally
2014-05-30 13:54:20 INFO  SparkContext:58 - Job finished: take at
DStream.scala:593, took 0.002217 s
-------------------------------------------
Time: 1401454460000 ms
-------------------------------------------
99,true,-0.11342268416043325
17,false,1.6732879882133793
...


Then it keeps repeating the following, with no further evidence that the
print() action is being called:

...
2014-05-30 13:54:20 INFO  JobScheduler:58 - Finished job streaming job
1401454460000 ms.0 from job set of time 1401454460000 ms
2014-05-30 13:54:20 INFO  MappedRDD:58 - Removing RDD 8 from persistence list
2014-05-30 13:54:20 INFO  JobScheduler:58 - Total delay: 0.019 s for
time 1401454460000 ms (execution: 0.015 s)
2014-05-30 13:54:20 INFO  BlockManager:58 - Removing RDD 8
2014-05-30 13:54:20 INFO  BlockRDD:58 - Removing RDD 7 from persistence list
2014-05-30 13:54:20 INFO  BlockManager:58 - Removing RDD 7
2014-05-30 13:54:20 INFO  KafkaInputDStream:58 - Removing blocks of
RDD BlockRDD[7] at BlockRDD at ReceiverInputDStream.scala:69 of time
1401454460000 ms
2014-05-30 13:54:20 INFO  MemoryStore:58 - ensureFreeSpace(100) called
with curMem=201, maxMem=2290719129
2014-05-30 13:54:20 INFO  MemoryStore:58 - Block input-0-1401454460400
stored as bytes to memory (size 100.0 B, free 2.1 GB)
2014-05-30 13:54:20 INFO  BlockManagerInfo:58 - Added
input-0-1401454460400 in memory on 192.168.1.10:60886 (size: 100.0 B,
free: 2.1 GB)
2014-05-30 13:54:20 INFO  BlockManagerMaster:58 - Updated info of
block input-0-1401454460400
2014-05-30 13:54:20 WARN  BlockManager:70 - Block
input-0-1401454460400 already exists on this machine; not re-adding it
2014-05-30 13:54:20 INFO  BlockGenerator:58 - Pushed block input-0-1401454460400
2014-05-30 13:54:21 INFO  MemoryStore:58 - ensureFreeSpace(100) called
with curMem=301, maxMem=2290719129
2014-05-30 13:54:21 INFO  MemoryStore:58 - Block input-0-1401454461400
stored as bytes to memory (size 100.0 B, free 2.1 GB)
2014-05-30 13:54:21 INFO  BlockManagerInfo:58 - Added
input-0-1401454461400 in memory on 192.168.1.10:60886 (size: 100.0 B,
free: 2.1 GB)
2014-05-30 13:54:21 INFO  BlockManagerMaster:58 - Updated info of
block input-0-1401454461400
2014-05-30 13:54:21 WARN  BlockManager:70 - Block
input-0-1401454461400 already exists on this machine; not re-adding it
2014-05-30 13:54:21 INFO  BlockGenerator:58 - Pushed block input-0-1401454461400
2014-05-30 13:54:22 INFO  MemoryStore:58 - ensureFreeSpace(99) called
with curMem=401, maxMem=2290719129
2014-05-30 13:54:22 INFO  MemoryStore:58 - Block input-0-1401454462400
stored as bytes to memory (size 99.0 B, free 2.1 GB)
2014-05-30 13:54:22 INFO  BlockManagerInfo:58 - Added
input-0-1401454462400 in memory on 192.168.1.10:60886 (size: 99.0 B,
free: 2.1 GB)
...


Occasionally it says:

...
2014-05-30 13:54:30 INFO  ReceiverTracker:58 - Stream 0 received 10 blocks
2014-05-30 13:54:30 INFO  JobScheduler:58 - Added jobs for time 1401454470000 ms
2014-05-30 13:54:30 INFO  JobScheduler:58 - Starting job streaming job
1401454470000 ms.0 from job set of time 1401454470000 ms
2014-05-30 13:54:30 INFO  SparkContext:58 - Starting job: take at
DStream.scala:593
2014-05-30 13:54:30 INFO  DAGScheduler:58 - Got job 3 (take at
DStream.scala:593) with 1 output partitions (allowLocal=true)
2014-05-30 13:54:30 INFO  DAGScheduler:58 - Final stage: Stage 3(take
at DStream.scala:593)
2014-05-30 13:54:30 INFO  DAGScheduler:58 - Parents of final stage: List()
2014-05-30 13:54:30 INFO  DAGScheduler:58 - Missing parents: List()
2014-05-30 13:54:30 INFO  DAGScheduler:58 - Computing the requested
partition locally
2014-05-30 13:54:30 INFO  BlockManager:58 - Found block
input-0-1401454460400 locally
2014-05-30 13:54:30 INFO  SparkContext:58 - Job finished: take at
DStream.scala:593, took 0.003993 s
2014-05-30 13:54:30 INFO  SparkContext:58 - Starting job: take at
DStream.scala:593
2014-05-30 13:54:30 INFO  DAGScheduler:58 - Got job 4 (take at
DStream.scala:593) with 9 output partitions (allowLocal=true)
2014-05-30 13:54:30 INFO  DAGScheduler:58 - Final stage: Stage 4(take
at DStream.scala:593)
2014-05-30 13:54:30 INFO  DAGScheduler:58 - Parents of final stage: List()
2014-05-30 13:54:30 INFO  DAGScheduler:58 - Missing parents: List()
2014-05-30 13:54:30 INFO  DAGScheduler:58 - Submitting Stage 4
(MappedRDD[12] at map at MappedDStream.scala:35), which has no missing
parents
2014-05-30 13:54:30 INFO  DAGScheduler:58 - Submitting 9 missing tasks
from Stage 4 (MappedRDD[12] at map at MappedDStream.scala:35)
2014-05-30 13:54:30 INFO  TaskSchedulerImpl:58 - Adding task set 4.0
with 9 tasks
...


Data is definitely still being written to Kafka, and you can even see in
the logs that the stream appears to be acknowledging that it is receiving
more data (blocks keep getting pushed).

The same thing happens with other output operations, like saving to a
file. It looks as if the action is simply no longer scheduled.
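
To be concrete, something along these lines (the path prefix is just a
placeholder, and saveAsTextFiles stands in for whatever file-writing
operation is used) behaves the same way after the first few intervals:

// Hypothetical alternative output operation on the same mapped stream
lines.dstream().saveAsTextFiles("/tmp/streaming-out", "txt");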

Does that ring any bells? I'm at a loss!
