Hi,

First off, I want to say a big thanks to Cody for contributing the Kafka direct stream.
I have been using the receiver-based approach for months, but the direct stream is a much better solution for my use case. The job in question has now been ported over to the direct stream; it does idempotent writes to Cassandra and also outputs to Kafka. I am saving the offsets to Cassandra as well.
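For context, the stream is set up roughly like this (a simplified sketch, not the exact code; the app name, broker list, batch interval, and starting offsets below are placeholders, and in the real job the starting offsets are read back out of Cassandra):

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf().setAppName("events-direct-stream")
    val ssc = new StreamingContext(conf, Seconds(30))

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")

    // Placeholder: in the real job these come from the offsets table in Cassandra
    val fromOffsets: Map[TopicAndPartition, Long] =
      Map(TopicAndPartition("events", 0) -> 13630747L)

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder,
        StringDecoder, (String, String)](
      ssc, kafkaParams, fromOffsets,
      (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))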
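The offsets are saved per batch from inside foreachRDD, along these lines (again a sketch; saveOffsetsToCassandra stands in for the actual write to the offsets table):

    import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

    // Stand-in for the code that persists offsets to the offsets table
    def saveOffsetsToCassandra(ranges: Array[OffsetRange]): Unit = {
      // ... write (topic, partition, untilOffset) for each range ...
    }

    stream.foreachRDD { rdd =>
      // The direct stream exposes the exact offset range backing each batch
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // Idempotent writes to Cassandra and the Kafka output happen here,
      // then the offsets are stored so a restart picks up where it left off
      saveOffsetsToCassandra(offsetRanges)
    }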
Unfortunately, I am sporadically getting the error below. The job recovers and continues, but each occurrence causes a large spike in the processing delay, and it can happen as often as every 3 or 4 batches. I still have other receiver-based jobs running, and they never throw these exceptions. I would be very appreciative of any direction, and I can happily provide more detail.

Thanks,
Conor

15/10/22 13:30:00 INFO spark.CacheManager: Partition rdd_1528_0 not found, computing it
15/10/22 13:30:00 INFO kafka.KafkaRDD: Computing topic events, partition 0 offsets 13630747 -> 13633001
15/10/22 13:30:00 INFO utils.VerifiableProperties: Verifying properties
15/10/22 13:30:00 INFO utils.VerifiableProperties: Property group.id is overridden to
15/10/22 13:30:00 INFO utils.VerifiableProperties: Property zookeeper.connect is overridden to
15/10/22 13:30:30 INFO consumer.SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException
15/10/22 13:31:00 ERROR executor.Executor: Exception in task 0.0 in stage 654.0 (TID 5242)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:277)
        at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
15/10/22 13:31:00 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 5243
15/10/22 13:31:00 INFO executor.Executor: Running task 1.0 in stage 654.0 (TID 5243)