[ https://issues.apache.org/jira/browse/SPARK-16409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15408894#comment-15408894 ]
Sean Owen commented on SPARK-16409:
-----------------------------------
It's not quite my area, but I might know the answer. As to it occurring
"randomly": I suspect you mean that your example triggers it immediately,
whereas in a large program, where an action isn't invoked until much later,
the error only manifests when that later action finally executes this
statement. That's normal for Spark programs, given the lazy-evaluation
architecture.
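To illustrate the laziness (a minimal Scala sketch, assuming a SparkSession in
scope as spark; the PySpark example below behaves the same way):
{code}
import org.apache.spark.sql.functions.regexp_extract
import spark.implicits._

// select() only builds a logical plan; nothing is evaluated here,
// so this line raises no error even though group 2 can be absent.
val df = Seq("aaaac").toDF("s")
val extracted = df.select(regexp_extract($"s", "(a+)(b)?(c)", 2))

// The NPE only surfaces here, when an action finally runs the plan,
// possibly far away from the select() in a large program.
extracted.collect()
{code}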
In this case the matched group is absent: the optional (b)? group does not
participate in the match at all. Looking at the implementation, the problem
seems to be present already in
{code}
override def nullSafeEval(s: Any, p: Any, r: Any): Any = {
  if (!p.equals(lastRegex)) {
    // regex value changed
    lastRegex = p.asInstanceOf[UTF8String].clone()
    pattern = Pattern.compile(lastRegex.toString)
  }
  val m = pattern.matcher(s.toString)
  if (m.find) {
    val mr: MatchResult = m.toMatchResult
    UTF8String.fromString(mr.group(r.asInstanceOf[Int]))
  } else {
    UTF8String.EMPTY_UTF8
  }
}
{code}
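For reference, this is plain java.util.regex behavior: a group that does not
participate in the match yields null from Matcher.group(i), not an empty
string. A minimal sketch:
{code}
import java.util.regex.Pattern

val m = Pattern.compile("(a+)(b)?(c)").matcher("aaaac")
if (m.find()) {
  println(m.group(1))  // "aaaa"
  println(m.group(2))  // null: the optional (b)? group matched nothing
  println(m.group(3))  // "c"
}
{code}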
So mr.group(r) returns null in this case, UTF8String.fromString passes that
null through, and the whole method returns null. Seems like it's not supposed
to do that. I'll open a PR?
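If I do, one possible shape for the fix, purely as a sketch against the
if (m.find) branch of the snippet above (whether a non-participating group
should become an empty string or a null is exactly the question this issue
raises):
{code}
// Hypothetical null-safe variant of the group lookup (sketch only, not the
// actual change): fall back to the same EMPTY_UTF8 the no-match branch
// already returns.
val group = mr.group(r.asInstanceOf[Int])
if (group == null) {
  UTF8String.EMPTY_UTF8  // optional group did not participate in the match
} else {
  UTF8String.fromString(group)
}
{code}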
> regexp_extract with optional groups causes NPE
> ----------------------------------------------
>
> Key: SPARK-16409
> URL: https://issues.apache.org/jira/browse/SPARK-16409
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.0.0
> Reporter: Max Moroz
>
> df = sqlContext.createDataFrame([['aaaac']], ['s'])
> df.select(F.regexp_extract('s', r'(a+)(b)?(c)', 2)).collect()
> causes an NPE. Worse, in a large program it doesn't cause the NPE instantly;
> it actually works fine until some unpredictable (and inconsistent) moment in
> the future when (presumably) the invalid memory access occurs, and then it
> fails. For this reason, it took several hours to debug this.
> Suggestion: either fill the group with null, or raise an exception
> immediately after examining the argument, with a message that optional
> groups are not allowed.
> Traceback:
> ---------------------------------------------------------------------------
> Py4JJavaError Traceback (most recent call last)
> <ipython-input-8-825292b569fc> in <module>()
> ----> 1 df.select(F.regexp_extract('s', r'(a+)(b)?(c)', 2)).collect()
> C:\Users\me\Downloads\spark-2.0.0-preview-bin-hadoop2.7\python\pyspark\sql\dataframe.py in collect(self)
>     294         """
>     295         with SCCallSiteSync(self._sc) as css:
> --> 296             port = self._jdf.collectToPython()
>     297         return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
>     298
> C:\Users\me\Downloads\spark-2.0.0-preview-bin-hadoop2.7\python\lib\py4j-0.10.1-src.zip\py4j\java_gateway.py in __call__(self, *args)
>     931         answer = self.gateway_client.send_command(command)
>     932         return_value = get_return_value(
> --> 933             answer, self.gateway_client, self.target_id, self.name)
>     934
>     935         for temp_arg in temp_args:
> C:\Users\me\Downloads\spark-2.0.0-preview-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
>      55     def deco(*a, **kw):
>      56         try:
> ---> 57             return f(*a, **kw)
>      58         except py4j.protocol.Py4JJavaError as e:
>      59             s = e.java_exception.toString()
> C:\Users\me\Downloads\spark-2.0.0-preview-bin-hadoop2.7\python\lib\py4j-0.10.1-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
>     310                 raise Py4JJavaError(
>     311                     "An error occurred while calling {0}{1}{2}.\n".
> --> 312                     format(target_id, ".", name), value)
>     313             else:
>     314                 raise Py4JError(
> Py4JJavaError: An error occurred while calling o51.collectToPython.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NullPointerException
> at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:210)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$7$$anon$1.hasNext(WholeStageCodegenExec.scala:357)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:117)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
> at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
> at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:112)
> at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
> at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:112)
> at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
> at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:112)
> at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:883)
> at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:883)
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1889)
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1889)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:85)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
> at scala.Option.foreach(Option.scala:257)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1863)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1876)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1889)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
> at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:883)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:357)
> at org.apache.spark.rdd.RDD.collect(RDD.scala:882)
> at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)
> at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2417)
> at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2417)
> at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2417)
> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
> at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2436)
> at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2416)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at py4j.Gateway.invoke(Gateway.java:280)
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:211)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
> at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:210)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$7$$anon$1.hasNext(WholeStageCodegenExec.scala:357)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:117)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
> at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
> at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:112)
> at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
> at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:112)
> at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
> at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:112)
> at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:883)
> at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:883)
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1889)
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1889)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:85)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> ... 1 more