Hi,

I started building a custom data source with the Data Source V2 API to stream
data incrementally from Azure Data Lake (HDFS) in Spark 2.4.0. Using the Kafka
source as a reference, I implemented the
DataSourceV2/MicroBatchReader/InputPartitionReader/InputPartition/OffsetV2
classes and got everything working to the point where I could load all records
from my source and write them to the console. But when I add an aggregation
such as .load.groupBy("id").count(), which keeps state data in HDFS, the job
fails with the errors below. Is there anything I still need to implement in the
Data Source V2 API for the groupBy/count operation to work? And what is the
best way to debug this further? Thanks.
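
For reference, here is roughly what the working and failing queries look like
(a minimal sketch only; the format name "com.example.adls", the option keys,
and the wasbs paths are placeholders, not the real values — the filter on empty
ids corresponds to the Filter NOT (id#0 = ) in the logical plan below):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder()
  .appName("adls-microbatch-test")
  .getOrCreate()

// Custom DataSourceV2 / MicroBatchReader; "com.example.adls" stands in for the
// real provider class name.
val records = spark.readStream
  .format("com.example.adls")
  .option("path", "wasbs://container@account.blob.core.windows.net/input")
  .load()

// This works: pass all records straight through to the console sink.
val passThrough = records.writeStream
  .format("console")
  .option("checkpointLocation", "wasbs://container@account.blob.core.windows.net/checkpoint/passthrough")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

// This fails: the stateful groupBy/count makes HDFSBackedStateStoreProvider
// write .delta files under the checkpoint, which is where the NPE shows up.
val counts = records
  .filter(col("id") =!= "")
  .groupBy("id")
  .count()

val aggregated = counts.writeStream
  .outputMode("complete")
  .format("console")
  .option("checkpointLocation", "wasbs://container@account.blob.core.windows.net/checkpoint/test")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

aggregated.awaitTermination()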


*Error in the executor node:*
19/09/04 19:01:02 INFO CheckpointFileManager: Writing atomically to wasbs://t...@test.blob.core.windows.net/teststreaming/checkpoint/test/state/0/2/1.delta using temp file wasbs://t...@test.blob.core.windows.net/teststreaming/checkpoint/test/state/0/2/.1.delta.0a0b6bc1-1ab5-4c43-8057-014790851cab.TID6.tmp
19/09/04 19:01:02 ERROR TaskContextImpl: Error in TaskCompletionListener
java.lang.NullPointerException
    at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.write(NativeAzureFileSystem.java:1119)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:57)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:57)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at net.jpountz.lz4.LZ4BlockOutputStream.finish(LZ4BlockOutputStream.java:258)
    at net.jpountz.lz4.LZ4BlockOutputStream.close(LZ4BlockOutputStream.java:190)
    at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
    at org.apache.commons.io.IOUtils.closeQuietly(IOUtils.java:303)
    at org.apache.commons.io.IOUtils.closeQuietly(IOUtils.java:274)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$cancelDeltaFile(HDFSBackedStateStoreProvider.scala:508)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.abort(HDFSBackedStateStoreProvider.scala:150)
    at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1$$anonfun$apply$1.apply(package.scala:65)
    at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1$$anonfun$apply$1.apply(package.scala:64)
    at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:131)
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:117)
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:117)
    at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:130)
    at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:128)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:128)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
    at org.apache.spark.scheduler.Task.run(Task.scala:137)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

*Error in the driver node:*
Logical Plan:
Aggregate [id#0], [id#0, count(1) AS count#4L]
+- Filter NOT (id#0 = )
   +- StreamingExecutionRelation org.apache.spark.sql.adls.AdlsRecordSourceMicroBatchReader@3f3b8886, [id#0]

        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.spark.SparkException: Writing job aborted.
        at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
        at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:296)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
        at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2782)
        at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2782)
        at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
        at org.apache.spark.sql.Dataset.collect(Dataset.scala:2782)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:540)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
        ... 1 more


Thanks,
Steve



