This still seems to be broken. In 1.1.1, it errors immediately on this line (from the above repro script):
liveTweets.map(t => noop(t)).print()

The stack trace is:

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
    at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:27)
    at $iwC$$iwC$$iwC.<init>(<console>:32)
    at $iwC$$iwC.<init>(<console>:34)
    at $iwC.<init>(<console>:36)
    at <init>(<console>:38)
    at .<init>(<console>:42)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:823)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:868)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:780)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:625)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:633)
    at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:638)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:963)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:911)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1006)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 43 more

TD, any troubleshooting tips?

Nick

On Thu Jul 24 2014 at 8:55:07 PM Nicholas Chammas <nicholas.cham...@gmail.com> wrote:

> Yep, here goes!
>
> Here are my environment vitals:
>
> - Spark 1.0.0
> - EC2 cluster with 1 slave spun up using spark-ec2
> - twitter4j 3.0.3
> - spark-shell called with --jars argument to load spark-streaming-twitter_2.10-1.0.0.jar as well as all the twitter4j jars.
>
> Now, while I’m in the Spark shell, I enter the following:
>
> import twitter4j.auth.{Authorization, OAuthAuthorization}
> import twitter4j.conf.ConfigurationBuilder
> import org.apache.spark.streaming.{Seconds, Minutes, StreamingContext}
> import org.apache.spark.streaming.twitter.TwitterUtils
>
> def getAuth(): Option[Authorization] = {
>   System.setProperty("twitter4j.oauth.consumerKey", "consumerKey")
>   System.setProperty("twitter4j.oauth.consumerSecret", "consumerSecret")
>   System.setProperty("twitter4j.oauth.accessToken", "accessToken")
>   System.setProperty("twitter4j.oauth.accessTokenSecret", "accessTokenSecret")
>   Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
> }
>
> def noop(a: Any): Any = {
>   a
> }
>
> val ssc = new StreamingContext(sc, Seconds(5))
> val liveTweetObjects = TwitterUtils.createStream(ssc, getAuth())
> val liveTweets = liveTweetObjects.map(_.getText)
>
> liveTweets.map(t => noop(t)).print()
>
> ssc.start()
>
> So basically, I’m just printing Tweets as-is, but first I’m mapping them to themselves via noop(). The Tweets will start to flow just fine for a minute or so, and then, this:
>
> 14/07/24 23:13:30 ERROR JobScheduler: Error running job streaming job 1406243610000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
>     at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> The time-to-first-error is variable.
>
> This is the simplest repro I can show at this time. Doing more complex things with liveTweets that involve a KMeansModel, for example, will be interrupted even sooner by this java.io.NotSerializableException.
> I don’t know if the root cause is the same, but the error certainly is.
>
> By the way, trying to reproduce this on 1.0.1 doesn’t raise the same error, but I can’t dig deeper to make sure this is really resolved (e.g. by trying more complex things that need data) due to SPARK-2471 <https://issues.apache.org/jira/browse/SPARK-2471>. I see that that issue has been resolved, so I’ll try this whole process again using the latest from master and see how it goes.
>
> Nick
>
>
> On Tue, Jul 15, 2014 at 5:58 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>
>> I am very curious though. Can you post a concise code example which we can run to reproduce this problem?
>>
>> TD
>>
>>
>> On Tue, Jul 15, 2014 at 2:54 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>
>>> I am not entirely sure off the top of my head. But a possible (usually works) workaround is to define the function as a val instead of a def. For example,
>>>
>>> def func(i: Int): Boolean = { true }
>>>
>>> can be written as
>>>
>>> val func = (i: Int) => { true }
>>>
>>> Hope this helps for now.
>>>
>>> TD
>>>
>>>
>>> On Tue, Jul 15, 2014 at 9:21 AM, Nicholas Chammas <nicholas.cham...@gmail.com> wrote:
>>>
>>>> Hey Diana,
>>>>
>>>> Did you ever figure this out?
>>>>
>>>> I’m running into the same exception, except in my case the function I’m calling is a KMeans model.predict().
>>>>
>>>> In regular Spark it works, and Spark Streaming without the call to model.predict() also works, but when put together I get this serialization exception. I’m on 1.0.0.
>>>>
>>>> Nick
>>>>
>>>>
>>>> On Thu, May 8, 2014 at 6:37 AM, Diana Carroll <dcarr...@cloudera.com> wrote:
>>>>
>>>>> Hey all, trying to set up a pretty simple streaming app and getting some weird behavior.
>>>>>
>>>>> First, a non-streaming job that works fine: I'm trying to pull out lines of a log file that match a regex, for which I've set up a function:
>>>>>
>>>>> def getRequestDoc(s: String): String = { "KBDOC-[0-9]*".r.findFirstIn(s).orNull }
>>>>> logs = sc.textFile(logfiles)
>>>>> logs.map(getRequestDoc).take(10)
>>>>>
>>>>> That works, but I want to run that on the same data, but streaming, so I tried this:
>>>>>
>>>>> val logs = ssc.socketTextStream("localhost", 4444)
>>>>> logs.map(getRequestDoc).print()
>>>>> ssc.start()
>>>>>
>>>>> From this code, I get:
>>>>>
>>>>> 14/05/08 03:32:08 ERROR JobScheduler: Error running job streaming job 1399545128000 ms.0
>>>>> org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
>>>>>
>>>>> But if I do the map function inline instead of calling a separate function, it works:
>>>>>
>>>>> logs.map("KBDOC-[0-9]*".r.findFirstIn(_).orNull).print()
>>>>>
>>>>> So why is it able to serialize my little function in regular Spark, but not in streaming?
>>>>>
>>>>> Thanks,
>>>>> Diana
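For anyone landing on this thread later, here is a minimal sketch of the val-instead-of-def workaround TD describes above, applied to the repro script. It is only a sketch, not something verified here: it assumes the same spark-shell session, imports, and ssc / liveTweets / logs definitions from the quoted messages.

    // TD's workaround: bind the helper to a val holding a plain function value,
    // instead of declaring it as a def at the shell prompt.
    val noop = (a: Any) => a    // replaces: def noop(a: Any): Any = { a }

    // Same idea for the regex helper from Diana's example:
    val getRequestDoc = (s: String) => "KBDOC-[0-9]*".r.findFirstIn(s).orNull

    // The rest of the pipeline is unchanged:
    liveTweets.map(t => noop(t)).print()    // or simply: liveTweets.map(noop).print()
    ssc.start()

The likely mechanism (not confirmed anywhere in this thread) is that a def defined at the shell prompt is a method on the REPL's generated line object, so a closure like t => noop(t) has to capture that enclosing object, which can also hold the StreamingContext; a val holding a function value gives the closure a self-contained thing to capture instead.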