Hello all,

I am using Spark 2.3.0 and Hadoop 2.7.4. I have a Spark Structured Streaming application which listens to a Kafka topic, does some transformations, and writes to an Oracle database through a JDBC client.
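For context, the Oracle write goes through a ForeachWriter (the oForeachWriter passed to foreach() in the writer setup below). A minimal sketch of such a writer follows; the class name, table, columns, and connection details are hypothetical placeholders, not the actual application code:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;

// Hypothetical sketch only; names and the JDBC URL are placeholders.
public class OracleEventWriter extends ForeachWriter<Row> {
  private transient Connection m_oConnection;
  private transient PreparedStatement m_oStatement;

  @Override
  public boolean open(long partitionId, long version) {
    try {
      // One connection per partition and epoch.
      m_oConnection = DriverManager.getConnection(
          "jdbc:oracle:thin:@//db-host:1521/SERVICE", "user", "password");
      m_oStatement = m_oConnection.prepareStatement(
          "INSERT INTO instance_summary (opportunity_id, status) VALUES (?, ?)");
      return true;
    } catch (Exception oEx) {
      return false; // tells Spark to skip processing this partition
    }
  }

  @Override
  public void process(Row oRow) {
    try {
      m_oStatement.setString(1, oRow.getString(0));
      m_oStatement.setString(2, oRow.getString(1));
      m_oStatement.executeUpdate();
    } catch (Exception oEx) {
      throw new RuntimeException(oEx);
    }
  }

  @Override
  public void close(Throwable oErrorOrNull) {
    try {
      if (m_oStatement != null) m_oStatement.close();
      if (m_oConnection != null) m_oConnection.close();
    } catch (Exception oEx) {
      // best-effort cleanup
    }
  }
}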
Events are read from Kafka as shown below:

m_oKafkaEvents = getSparkSession().readStream().format("kafka")
    .option("kafka.bootstrap.servers", strKafkaAddress)
    .option("assign", strSubscription)
    .option("maxOffsetsPerTrigger", "100000")
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", false)
    .load()
    .filter(strFilter)
    .select(functions.from_json(functions.col("value").cast("string"), oSchema).alias("events"))
    .select("events.*");

The checkpoint is configured as shown below:

DataStreamWriter<Row> oMilestoneStream = oAggregation
    .writeStream()
    .queryName(strQueryName)
    .outputMode("update")
    .trigger(Trigger.ProcessingTime(getInsightDeployment().getInstanceSummary().getTriggerInterval()))
    .option("checkpointLocation", strCheckpointLocation)
    .foreach(oForeachWriter);

strCheckpointLocation is an HDFS location, something like /insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj.

With this setup, I get the exception below whenever I redeploy the Spark application. The only workaround I currently have is to delete the checkpoint location and recreate the topic. I also see a couple of JIRA issues that are marked RESOLVED, but the problem is still seen:

https://issues.apache.org/jira/browse/SPARK-20894
https://issues.apache.org/jira/browse/SPARK-22262

Can someone help me with the best solution for this?

thanks,
Robin Kuttaiah

Exception
-------
18/10/14 03:19:16 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.IllegalStateException: Error reading delta file hdfs://hdfs-name:9000/insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj/state/1/0/1.delta of HDFSStateStoreProvider[id = (op=1,part=0),dir = hdfs://hdfs-name:9000/insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj/state/1/0]: hdfs://hdfs-name:9000/insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj/state/1/0/1.delta does not exist
  at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:371)
  at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$loadMap$1.apply$mcVJ$sp(HDFSBackedStateStoreProvider.scala:333)
  at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:332)
  at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:332)
  at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
  at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.loadMap(HDFSBackedStateStoreProvider.scala:332)
  at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:196)
  at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:369)
  at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:74)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
Caused by: java.io.FileNotFoundException: File does not exist: /insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj/state/1/0/1.delta
  at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
  at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
  at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1836)
  at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1808)
  at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1723)
  at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
  at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:366)
  at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
  at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
  at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
  at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
  at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213)
  at java.security.AccessController.doPrivileged(Native Method)
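P.S. For completeness, the workaround mentioned above (deleting the checkpoint location before redeploy) is equivalent to the sketch below, using the Hadoop FileSystem API. The path is the one from this thread; the surrounding class is only illustrative, and recreating the Kafka topic remains a separate manual step:

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CheckpointCleanup {
  public static void main(String[] args) throws IOException {
    Configuration oConf = new Configuration();
    FileSystem oFs = FileSystem.get(URI.create("hdfs://hdfs-name:9000"), oConf);
    // Checkpoint location used by the streaming query above.
    Path oCheckpoint = new Path("/insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj");
    if (oFs.exists(oCheckpoint)) {
      // Recursive delete: all streaming state is lost and the query
      // restarts from startingOffsets ("latest").
      oFs.delete(oCheckpoint, true);
    }
    oFs.close();
  }
}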