Hi,
Firstly want to say a big thanks to Cody for contributing the kafka direct
stream.
I have been using the receiver based approach for months but the direct
stream is a much better solution for my use case.
The job in question is now ported over to the direct stream doing
idempotent outputs to Cassandra and outputting to kafka.
I am also saving the offsets to Cassandra.
But unfortunately I am sporadically getting the error below.
It recovers and continues but gives a large spike in the processing delay.
And it can happen in every 3 or 4 batches.
I still have other receiver jobs running and they never throw these
exceptions.
I would be very appreciative for any direction and I can happily provide
more detail.
Thanks,
Conor
15/10/21 23:30:31 INFO consumer.SimpleConsumer: Reconnect due to
socket error: java.nio.channels.ClosedChannelException
15/10/21 23:31:01 ERROR executor.Executor: Exception in task 6.0 in
stage 66.0 (TID 406)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
at
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
at
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
at
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/10/21 23:31:01 INFO executor.CoarseGrainedExecutorBackend: Got
assigned task 407