+ Xinyu

> On Jan 4, 2019, at 9:58 PM, Deshpande, Omkar <omkar_deshpa...@intuit.com> 
> wrote:
> 
> Hello,
> 
> I am getting the following exception while running the Beam Samza Runner:
> 
> java.lang.UnsupportedOperationException: Cannot create a producer for an input system
>      at org.apache.beam.runners.samza.adapter.BoundedSourceSystem$Factory.getProducer(BoundedSourceSystem.java:411)
>      at org.apache.samza.container.SamzaContainer$$anonfun$13.apply(SamzaContainer.scala:223)
>      at org.apache.samza.container.SamzaContainer$$anonfun$13.apply(SamzaContainer.scala:220)
>      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>      at scala.collection.immutable.Map$Map3.foreach(Map.scala:161)
>      at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>      at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>      at org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:220)
>      at org.apache.samza.container.SamzaContainer.apply(SamzaContainer.scala)
>      at org.apache.samza.processor.StreamProcessor.createSamzaContainer(StreamProcessor.java:198)
>      at org.apache.samza.processor.StreamProcessor$1.onNewJobModel(StreamProcessor.java:290)
>      at org.apache.samza.zk.ZkJobCoordinator.onNewJobModelConfirmed(ZkJobCoordinator.java:304)
>      at org.apache.samza.zk.ZkJobCoordinator$ZkBarrierListenerImpl.lambda$onBarrierStateChanged$1(ZkJobCoordinator.java:394)
>      at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:163)
>      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>      at java.lang.Thread.run(Thread.java:748)
> 
> 2019-01-04 21:39:11 ERROR SamzaContainer$:86 - Failed to create a producer for 0-KafkaIO_Read_Read_KafkaUnboundedSource__out__PCollection_, so skipping.
> java.lang.UnsupportedOperationException: Cannot create a producer for an input system
>      at org.apache.beam.runners.samza.adapter.UnboundedSourceSystem$Factory.getProducer(UnboundedSourceSystem.java:452)
>      at org.apache.samza.container.SamzaContainer$$anonfun$13.apply(SamzaContainer.scala:223)
>      at org.apache.samza.container.SamzaContainer$$anonfun$13.apply(SamzaContainer.scala:220)
>      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>      at scala.collection.immutable.Map$Map3.foreach(Map.scala:161)
>      at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>      at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>      at org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:220)
>      at org.apache.samza.container.SamzaContainer.apply(SamzaContainer.scala)
>      at org.apache.samza.processor.StreamProcessor.createSamzaContainer(StreamProcessor.java:198)
>      at org.apache.samza.processor.StreamProcessor$1.onNewJobModel(StreamProcessor.java:290)
>      at org.apache.samza.zk.ZkJobCoordinator.onNewJobModelConfirmed(ZkJobCoordinator.java:304)
>      at org.apache.samza.zk.ZkJobCoordinator$ZkBarrierListenerImpl.lambda$onBarrierStateChanged$1(ZkJobCoordinator.java:394)
>      at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:163)
>      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>      at java.lang.Thread.run(Thread.java:748)
> 
> This exception does not stop execution; however, I would like to 
> understand the reason for it and a possible resolution.
> 
> Thanks,
> Omkar Deshpande
