Hi, I’m running into a new problem trying to get streaming going. I have a test class that sets up my pipeline and runs it fine. The actual production implementation sets up the pipeline from within an actor. At first, I ran into a bunch of issues relating to the serialization of closures from within the actor, so I externalized the pipeline setup to a separate case class. The test class I mentioned invokes this case class to set up the pipeline. However, when the actor invokes the case class to set up the pipeline, I get the following exception when the pipeline is actually run. Anyone encountered this before?
Alan 14/07/24 22:31:10 [ERROR] Executor: Exception in task ID 1 java.lang.IllegalStateException: Trying to deserialize a serialized ActorRef without an ActorSystem in scope. Use 'akka.serialization.Serialization.currentSystem.withValue(system) { ... }' at akka.actor.SerializedActorRef.readResolve(ActorRef.scala:412) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1091) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1805) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1704) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1342) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1704) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1342) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989) at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:499) at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:75) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1891) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:147) at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1835) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1794) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:165) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:722) 14/07/24 22:31:10 [WARN] TaskSetManager: Lost TID 1 (task 0.0:1)