Hi, I started building a custom data source using data source v2 api to stream data incrementally from azure data lake (HDFS) in Spark 2.4.0. I used kafka as the reference to implement the DataSourceV2/MicroBatchReader/InputPartitionReader/InputPartition/OffsetV2 classes and goteverything working to the stage where I could perform the load operation to load all records from my source and output them to the console. But when I started doing aggregation such as .load.GroupBy("id").count() with my custom data source which involved keeping state data in HDFS, the job started failing with the following errors, is there anything I missed to implement in the data source v2 api in order for the GroupBy/count operation to work? And what is the best way to debug below further? Thanks.
*Error in executor node: * 19/09/04 19:01:02 INFO CheckpointFileManager: Writing atomically to wasbs://t...@test.blob.core.windows.net/teststreaming/checkpoint/test/state/0/2/1.delta using temp file wasbs://t...@test.blob.core.windows.net/teststreaming/checkpoint/test/state/0/2/.1.delta.0a0b6bc1-1ab5-4c43-8057-014790851cab.TID6.tmp 19/09/04 19:01:02 ERROR TaskContextImpl: Error in TaskCompletionListener java.lang.NullPointerException at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.write(NativeAzureFileSystem.java:1119) at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:57) at java.io.DataOutputStream.write(DataOutputStream.java:107) at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:57) at java.io.DataOutputStream.write(DataOutputStream.java:107) at net.jpountz.lz4.LZ4BlockOutputStream.finish(LZ4BlockOutputStream.java:258) at net.jpountz.lz4.LZ4BlockOutputStream.close(LZ4BlockOutputStream.java:190) at java.io.FilterOutputStream.close(FilterOutputStream.java:159) at org.apache.commons.io.IOUtils.closeQuietly(IOUtils.java:303) at org.apache.commons.io.IOUtils.closeQuietly(IOUtils.java:274) at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$cancelDeltaFile(HDFSBackedStateStoreProvider.scala:508) at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.abort(HDFSBackedStateStoreProvider.scala:150) at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1$$anonfun$apply$1.apply(package.scala:65) at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1$$anonfun$apply$1.apply(package.scala:64) at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:131) at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:117) at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:117) at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:130) at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:128) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:128) at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116) at org.apache.spark.scheduler.Task.run(Task.scala:137) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) *Error in the driver node:* Logical Plan: Aggregate [id#0], [id#0, count(1) AS count#4L] +- Filter NOT (id#0 = ) +- StreamingExecutionRelation org.apache.spark.sql.adls.AdlsRecordSourceMicroBatchReader@3f3b8886, [id#0] at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193) Caused by: org.apache.spark.SparkException: Writing job aborted. at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247) at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:296) at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383) at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2782) at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2782) at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364) at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363) at org.apache.spark.sql.Dataset.collect(Dataset.scala:2782) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:540) at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281) ... 1 more Thanks, Steve -- Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ --------------------------------------------------------------------- To unsubscribe e-mail: user-unsubscr...@spark.apache.org