Hey Jeremiah,

It looks like TransactionalStateTaskStorageManager.removeOldCheckpoints relies on an assumption that the store directory actually has some contents, which may not be true depending on the type of store being used.
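To illustrate the kind of failure I suspect: java.io.File.listFiles() returns null, not an empty array, when the path is not a directory or cannot be read, and filtering that result then throws. The snippet below is only a minimal sketch of that behaviour and of a null-safe guard. It is not the Samza source; the class name, the helper names and the "-" checkpoint suffix convention are made up for the example.

```java
import java.io.File;
import java.util.Arrays;

// Hypothetical illustration only; none of these names come from Samza.
class SafeListing {

    // File.listFiles() returns null when the path is not a directory
    // or an I/O error occurs, so normalise that case to an empty array.
    static File[] listOrEmpty(File dir) {
        File[] children = dir.listFiles();
        return children != null ? children : new File[0];
    }

    // Counts children whose names start with prefix + "-". The suffix
    // convention is an assumption made for this example.
    static long countWithPrefix(File dir, String prefix) {
        return Arrays.stream(listOrEmpty(dir))
                .filter(f -> f.getName().startsWith(prefix + "-"))
                .count();
    }

    public static void main(String[] args) {
        // A path that is assumed not to exist on disk.
        File missing = new File(System.getProperty("java.io.tmpdir"),
                "no-such-store-dir-" + System.nanoTime());

        // The raw call yields null for a missing directory ...
        System.out.println("raw listFiles() is null: " + (missing.listFiles() == null));
        // ... while the guarded helpers are safe to filter and count.
        System.out.println("guarded length: " + listOrEmpty(missing).length);
        System.out.println("matching dirs: " + countWithPrefix(missing, "store"));
    }
}
```

If the array being filtered at TransactionalStateTaskStorageManager.scala:94 does come from a directory listing like this, a store that never writes to disk would hit exactly this case.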
Can you let me know what kind of KeyValueStore you've configured for your state?

Brett

On Thu, Jan 23, 2020 at 2:31 PM Prateek Maheshwari <prateek...@gmail.com> wrote:

> Brett, can you take a look at this?
>
> - Prateek
>
> On Wed, Jan 15, 2020 at 9:41 AM Jeremiah Adams
> <jad...@helixeducation.com.invalid> wrote:
>
>> I am updating our jobs to use samza 1.3.0. I'm getting a null pointer
>> when manually committing via taskCoordinator.commit().
>>
>> Below is the stack trace - can anyone point me in the right direction?
>>
>> Thanks.
>>
>> 2020-01-15 10:33:35 RunLoop [ERROR] Task Partition 0 commit failed
>> java.lang.NullPointerException
>>     at scala.collection.mutable.ArrayOps$ofRef$.newBuilder$extension(ArrayOps.scala:190)
>>     at scala.collection.mutable.ArrayOps$ofRef.newBuilder(ArrayOps.scala:186)
>>     at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:246)
>>     at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
>>     at scala.collection.mutable.ArrayOps$ofRef.filter(ArrayOps.scala:186)
>>     at org.apache.samza.storage.TransactionalStateTaskStorageManager$$anonfun$removeOldCheckpoints$2.apply(TransactionalStateTaskStorageManager.scala:94)
>>     at org.apache.samza.storage.TransactionalStateTaskStorageManager$$anonfun$removeOldCheckpoints$2.apply(TransactionalStateTaskStorageManager.scala:86)
>>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>>     at org.apache.samza.storage.TransactionalStateTaskStorageManager.removeOldCheckpoints(TransactionalStateTaskStorageManager.scala:86)
>>     at org.apache.samza.container.TaskInstance.commit(TaskInstance.scala:277)
>>     at org.apache.samza.container.RunLoop$AsyncTaskWorker$5.run(RunLoop.java:547)
>>     at org.apache.samza.container.RunLoop$AsyncTaskWorker.commit(RunLoop.java:566)
>>     at org.apache.samza.container.RunLoop$AsyncTaskWorker.run(RunLoop.java:432)
>>     at org.apache.samza.container.RunLoop$AsyncTaskWorker.access$300(RunLoop.java:357)
>>     at org.apache.samza.container.RunLoop.runTasks(RunLoop.java:244)
>>     at org.apache.samza.container.RunLoop.run(RunLoop.java:176)
>>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:768)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> 2020-01-15 10:33:35 RunLoop [ERROR] Caught throwable and stopping run loop
>> java.lang.NullPointerException
>>     at scala.collection.mutable.ArrayOps$ofRef$.newBuilder$extension(ArrayOps.scala:190)
>>     at scala.collection.mutable.ArrayOps$ofRef.newBuilder(ArrayOps.scala:186)
>>     at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:246)
>>     at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
>>     at scala.collection.mutable.ArrayOps$ofRef.filter(ArrayOps.scala:186)
>>     at org.apache.samza.storage.TransactionalStateTaskStorageManager$$anonfun$removeOldCheckpoints$2.apply(TransactionalStateTaskStorageManager.scala:94)
>>     at org.apache.samza.storage.TransactionalStateTaskStorageManager$$anonfun$removeOldCheckpoints$2.apply(TransactionalStateTaskStorageManager.scala:86)
>>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>>     at org.apache.samza.storage.TransactionalStateTaskStorageManager.removeOldCheckpoints(TransactionalStateTaskStorageManager.scala:86)
>>     at org.apache.samza.container.TaskInstance.commit(TaskInstance.scala:277)
>>     at org.apache.samza.container.RunLoop$AsyncTaskWorker$5.run(RunLoop.java:547)
>>     at org.apache.samza.container.RunLoop$AsyncTaskWorker.commit(RunLoop.java:566)
>>     at org.apache.samza.container.RunLoop$AsyncTaskWorker.run(RunLoop.java:432)
>>     at org.apache.samza.container.RunLoop$AsyncTaskWorker.access$300(RunLoop.java:357)
>>     at org.apache.samza.container.RunLoop.runTasks(RunLoop.java:244)
>>     at org.apache.samza.container.RunLoop.run(RunLoop.java:176)
>>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:768)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>> Jeremiah Adams
>> Software Engineer
>> www.helixeducation.com <http://www.helixeducation.com/>
>> Blog <http://www.helixeducation.com/blog/> | Twitter <https://twitter.com/HelixEducation> | Facebook <https://www.facebook.com/HelixEducation> | LinkedIn <http://www.linkedin.com/company/3609946>