Hi Bharath,

Your theory is correct: InfluxDBSystemFactory was just returning null for the SystemAdmin. That is fixed now! But another problem has occurred; the stack trace is below:
2019-06-03 17:26:11.176 [main] KafkaSystemAdmin [INFO] Fetching SystemStreamMetadata for topics [__samza_coordinator_canal-metrics-test_1] on system kafka
2019-06-03 17:26:11.179 [main] KafkaSystemAdmin [ERROR] Fetching system stream metadata for: [__samza_coordinator_canal-metrics-test_1] threw an exception.
java.lang.NullPointerException
    at java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
    at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:75)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:253)
    at kafka.utils.Logging.logger(Logging.scala:43)
    at kafka.utils.Logging.debug(Logging.scala:62)
    at kafka.utils.Logging.debug$(Logging.scala:62)
    at org.apache.samza.system.kafka.KafkaSystemAdminUtilsScala$.debug(KafkaSystemAdminUtilsScala.scala:45)
    at org.apache.samza.system.kafka.KafkaSystemAdminUtilsScala$.assembleMetadata(KafkaSystemAdminUtilsScala.scala:74)
    at org.apache.samza.system.kafka.KafkaSystemAdminUtilsScala.assembleMetadata(KafkaSystemAdminUtilsScala.scala)
    at org.apache.samza.system.kafka.KafkaSystemAdmin.fetchSystemStreamMetadata(KafkaSystemAdmin.java:461)
    at org.apache.samza.system.kafka.KafkaSystemAdmin.access$400(KafkaSystemAdmin.java:76)
    at org.apache.samza.system.kafka.KafkaSystemAdmin$4.apply(KafkaSystemAdmin.java:340)
    at org.apache.samza.system.kafka.KafkaSystemAdmin$4.apply(KafkaSystemAdmin.java:337)
    at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:90)
    at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.java:374)
    at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.java:292)
    at org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer.register(CoordinatorStreamSystemConsumer.java:109)
    at org.apache.samza.job.JobRunner.run(JobRunner.scala:105)
    at org.apache.samza.runtime.RemoteApplicationRunner.lambda$run$0(RemoteApplicationRunner.java:76)
    at java.util.ArrayList.forEach(ArrayList.java:1255)
    at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:73)
    at org.apache.samza.runtime.ApplicationRunnerUtil.invoke(ApplicationRunnerUtil.java:49)
    at org.apache.samza.runtime.ApplicationRunnerMain.main(ApplicationRunnerMain.java:53)
Exception in thread "main" org.apache.samza.SamzaException: Failed to run application
    at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:79)
    at org.apache.samza.runtime.ApplicationRunnerUtil.invoke(ApplicationRunnerUtil.java:49)
    at org.apache.samza.runtime.ApplicationRunnerMain.main(ApplicationRunnerMain.java:53)
Caused by: org.apache.samza.SamzaException: java.lang.NullPointerException
    at org.apache.samza.system.kafka.KafkaSystemAdmin$5.apply(KafkaSystemAdmin.java:358)
    at org.apache.samza.system.kafka.KafkaSystemAdmin$5.apply(KafkaSystemAdmin.java:347)
    at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:97)
    at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.java:374)
    at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.java:292)
    at org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer.register(CoordinatorStreamSystemConsumer.java:109)
    at org.apache.samza.job.JobRunner.run(JobRunner.scala:105)
    at org.apache.samza.runtime.RemoteApplicationRunner.lambda$run$0(RemoteApplicationRunner.java:76)
    at java.util.ArrayList.forEach(ArrayList.java:1255)
    at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:73)
    ... 2 more
Caused by: java.lang.NullPointerException
    at java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
    at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:75)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:253)
    at kafka.utils.Logging.logger(Logging.scala:43)
    at kafka.utils.Logging.debug(Logging.scala:62)
    at kafka.utils.Logging.debug$(Logging.scala:62)
    at org.apache.samza.system.kafka.KafkaSystemAdminUtilsScala$.debug(KafkaSystemAdminUtilsScala.scala:45)
    at org.apache.samza.system.kafka.KafkaSystemAdminUtilsScala$.assembleMetadata(KafkaSystemAdminUtilsScala.scala:74)
    at org.apache.samza.system.kafka.KafkaSystemAdminUtilsScala.assembleMetadata(KafkaSystemAdminUtilsScala.scala)
    at org.apache.samza.system.kafka.KafkaSystemAdmin.fetchSystemStreamMetadata(KafkaSystemAdmin.java:461)
    at org.apache.samza.system.kafka.KafkaSystemAdmin.access$400(KafkaSystemAdmin.java:76)
    at org.apache.samza.system.kafka.KafkaSystemAdmin$4.apply(KafkaSystemAdmin.java:340)
    at org.apache.samza.system.kafka.KafkaSystemAdmin$4.apply(KafkaSystemAdmin.java:337)
    at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:90)
    ... 9 more

log4j.xml can be found in lib; its content is:

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
  <appender name="RollingAppender" class="org.apache.log4j.RollingFileAppender">
    <param name="File" value="${samza.log.dir}/${samza.container.name}.log"/>
    <param name="MaxFileSize" value="10MB"/>
    <param name="MaxBackupIndex" value="10"/>
    <layout class="org.apache.log4j.PatternLayout">
      <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n"/>
    </layout>
  </appender>
  <appender name="StartupAppender" class="org.apache.log4j.RollingFileAppender">
    <param name="File" value="${samza.log.dir}/${samza.container.name}-startup.log"/>
    <param name="MaxFileSize" value="10MB"/>
    <param name="MaxBackupIndex" value="10"/>
    <layout class="org.apache.log4j.PatternLayout">
      <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n"/>
    </layout>
  </appender>
  <logger name="STARTUP_LOGGER" additivity="false">
    <level value="info"/>
    <appender-ref ref="StartupAppender"/>
  </logger>
  <root>
    <priority value="info"/>
    <appender-ref ref="RollingAppender"/>
  </root>
</log4j:configuration>

Thanks for your help!

Qi Shu

> On May 31, 2019, at 4:59 AM, Bharath Kumara Subramanian <codin.mart...@gmail.com> wrote:
>
> Here is my theory based on the stack trace and logs.
> InfluxDBSystemFactory could not create the admin successfully and returned
> a null. Inside JavaSystemConfig, when it tries to get all the admins for
> the systems available, Collectors.toMap(..) throws an exception during
> reducing due to a null value.
>
> A few questions:
> 1. Can getAdmin(...) return a null?
> 2. Can you add some logs to your getAdmin API to see if they are printed
> right before the exception?
>
> I am guessing here since I don't have access to the source code of
> *InfluxDBSystemFactory*.
> If you can share the source code of InfluxDBSystemFactory, it will help me
> confirm my theory.
>
> Let me know how it goes.
>
> Thanks,
> Bharath
>
> On Wed, May 29, 2019 at 7:01 PM QiShu <sh...@eefung.com> wrote:
>
>> Hi Bharath,
>>
>> Below is the configuration:
>> # App
>> app.name=canal-metrics
>> app.id=test
>> app.class=com.antfact.datacenter.canal.task.metrics.MetricsHandlerTask
>> task.class=com.antfact.datacenter.canal.task.metrics.MetricsHandlerTask
>>
>> job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
>> job.default.system=kafka
>>
>> job.container.thread.pool.size=0
>> task.max.concurrency=1
>>
>> task.opts=-Xmx1843m
>>
>> task.checkpoint.replication.factor=3
>>
>> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>>
>> # Checkpointing
>> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
>>
>> # Kafka
>> systems.kafka.consumer.zookeeper.connect=p-zk1:7183,p-zk2:7183,p-zk3:7183
>> systems.kafka.producer.bootstrap.servers=10.20.1.87:9096,10.20.1.88:9096,10.20.1.89:9096,10.20.1.90:9096,10.20.1.91:9096,10.20.1.92:9096
>>
>> # Systems & Streams
>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>> systems.kafka.default.stream.samza.key.serde=string
>> systems.kafka.default.stream.samza.msg.serde=string
>>
>> systems.influxdb.samza.factory=com.antfact.datacenter.canal.system.InfluxDBSystemFactory
>> systems.hstore.default.stream.samza.key.serde=string
>> systems.hstore.default.stream.samza.msg.serde=string
>>
>> task.inputs=kafka.samza-metrics
>>
>> task.consumer.batch.size=100
>>
>> # Deployment
>> yarn.package.path=hdfs://p002132.antfact.com/rflow-apps/data/canal-metrics-3.0-dist.tar.gz
>> yarn.container.count=1
>> cluster-manager.container.memory.mb=2048
>>
>>
>> Below is the entire log:
>> added manifest
>> java version "1.8.0_151"
>> Java(TM) SE Runtime Environment (build 1.8.0_151-b12)
>> Java HotSpot(TM) 64-Bit Server VM (build 25.151-b12, mixed mode)
>> /usr/java/jdk1.8.0_151/bin/java -Dlog4j.configuration=file:bin/log4j-console.xml -DisThreadContextMapInheritable=true -Dsamza.log.dir=/home/ant/canal-test/canal-metrics/target/canal -Djava.io.tmpdir=/home/ant/canal-test/canal-metrics/target/canal/tmp -Xmx768M -XX:+PrintGCDateStamps -Xloggc:/home/ant/canal-test/canal-metrics/target/canal/gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=10241024 -d64 -cp /etc/hadoop/conf:pathing.jar org.apache.samza.runtime.ApplicationRunnerMain --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=/home/ant/canal-test/canal-metrics/target/canal/config/canal-metrics.properties
>> 2019-05-30 09:59:10.900 [main] TaskFactoryUtil [INFO] Got task class name: com.antfact.datacenter.canal.task.metrics.MetricsHandlerTask
>> 2019-05-30 09:59:10.914 [main] RemoteJobPlanner [INFO] The run id for this run is 1559181550904-ec4b9a69
>> 2019-05-30 09:59:10.958 [main] JobPlanner [INFO] app.name is defined, generating job.name equal to app.name value: canal-metrics
>> 2019-05-30 09:59:10.958 [main] JobPlanner [INFO] app.id is defined, generating job.id equal to app.name value: test
>> 2019-05-30 09:59:10.959 [main] JobPlanner [INFO] app.name is defined, generating job.name equal to app.name value: canal-metrics
>> 2019-05-30 09:59:10.959 [main] JobPlanner [INFO] app.id is defined, generating job.id equal to app.name value: test
>> 2019-05-30 09:59:10.960 [main] KafkaConsumerConfig [INFO] Auto offset reset value for KafkaConsumer for system upcoming converted from latest(samza) to {}
>> 2019-05-30 09:59:10.960 [main] KafkaConsumerConfig [INFO] setting auto.offset.reset for system kafka to latest
>> 2019-05-30 09:59:10.962 [main] KafkaConsumerConfig [INFO] setting key serialization for the consumer(for system kafka) to ByteArrayDeserializer
>> 2019-05-30 09:59:10.964 [main] KafkaConsumerConfig [INFO] setting value serialization for the consumer(for system kafka) to ByteArrayDeserializer
>> 2019-05-30 09:59:10.970 [main] KafkaSystemConsumer [INFO] Instantiating KafkaConsumer for systemName kafka with properties {key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer, value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer, enable.auto.commit=false, max.poll.records=100, zookeeper.connect=p-zk1:7183,p-zk2:7183,p-zk3:7183, group.id=canal-metrics-test, partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor, bootstrap.servers=10.20.1.87:9096,10.20.1.88:9096,10.20.1.89:9096,10.20.1.90:9096,10.20.1.91:9096,10.20.1.92:9096, auto.offset.reset=latest, client.id=kafka_admin_consumer-canal_metrics-test}
>> 2019-05-30 09:59:11.003 [main] ConsumerConfig [INFO] ConsumerConfig values:
>>     auto.commit.interval.ms = 5000
>>     auto.offset.reset = latest
>>     bootstrap.servers = [10.20.1.87:9096, 10.20.1.88:9096, 10.20.1.89:9096, 10.20.1.90:9096, 10.20.1.91:9096, 10.20.1.92:9096]
>>     check.crcs = true
>>     client.id = kafka_admin_consumer-canal_metrics-test
>>     connections.max.idle.ms = 540000
>>     enable.auto.commit = false
>>     exclude.internal.topics = true
>>     fetch.max.bytes = 52428800
>>     fetch.max.wait.ms = 500
>>     fetch.min.bytes = 1
>>     group.id = canal-metrics-test
>>     heartbeat.interval.ms = 3000
>>     interceptor.classes = []
>>     internal.leave.group.on.close = true
>>     isolation.level = read_uncommitted
>>     key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>>     max.partition.fetch.bytes = 1048576
>>     max.poll.interval.ms = 300000
>>     max.poll.records = 100
>>     metadata.max.age.ms = 300000
>>     metric.reporters = []
>>     metrics.num.samples = 2
>>     metrics.recording.level = INFO
>>     metrics.sample.window.ms = 30000
>>     partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
>>     receive.buffer.bytes = 65536
>>     reconnect.backoff.max.ms = 1000
>>     reconnect.backoff.ms = 50
>>     request.timeout.ms = 305000
>>     retry.backoff.ms = 100
>>     sasl.jaas.config = null
>>     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>     sasl.kerberos.min.time.before.relogin = 60000
>>     sasl.kerberos.service.name = null
>>     sasl.kerberos.ticket.renew.jitter = 0.05
>>     sasl.kerberos.ticket.renew.window.factor = 0.8
>>     sasl.mechanism = GSSAPI
>>     security.protocol = PLAINTEXT
>>     send.buffer.bytes = 131072
>>     session.timeout.ms = 10000
>>     ssl.cipher.suites = null
>>     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>     ssl.endpoint.identification.algorithm = null
>>     ssl.key.password = null
>>     ssl.keymanager.algorithm = SunX509
>>     ssl.keystore.location = null
>>     ssl.keystore.password = null
>>     ssl.keystore.type = JKS
>>     ssl.protocol = TLS
>>     ssl.provider = null
>>     ssl.secure.random.implementation = null
>>     ssl.trustmanager.algorithm = PKIX
>>     ssl.truststore.location = null
>>     ssl.truststore.password = null
>>     ssl.truststore.type = JKS
>>     value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>>
>> 2019-05-30 09:59:11.147 [main] ConsumerConfig [WARN] The configuration 'zookeeper.connect' was supplied but isn't a known config.
>> 2019-05-30 09:59:11.149 [main] AppInfoParser [INFO] Kafka version : 1.1.0
>> 2019-05-30 09:59:11.149 [main] AppInfoParser [INFO] Kafka commitId : fdcf75ea326b8e07
>> 2019-05-30 09:59:11.166 [main] KafkaSystemAdmin [INFO] New admin client with props:{bootstrap.servers=10.20.1.87:9096,10.20.1.88:9096,10.20.1.89:9096,10.20.1.90:9096,10.20.1.91:9096,10.20.1.92:9096, zookeeper.connect=p-zk1:7183,p-zk2:7183,p-zk3:7183}
>> 2019-05-30 09:59:11.170 [main] AdminClientConfig [INFO] AdminClientConfig values:
>>     bootstrap.servers = [10.20.1.87:9096, 10.20.1.88:9096, 10.20.1.89:9096, 10.20.1.90:9096, 10.20.1.91:9096, 10.20.1.92:9096]
>>     client.id =
>>     connections.max.idle.ms = 300000
>>     metadata.max.age.ms = 300000
>>     metric.reporters = []
>>     metrics.num.samples = 2
>>     metrics.recording.level = INFO
>>     metrics.sample.window.ms = 30000
>>     receive.buffer.bytes = 65536
>>     reconnect.backoff.max.ms = 1000
>>     reconnect.backoff.ms = 50
>>     request.timeout.ms = 120000
>>     retries = 5
>>     retry.backoff.ms = 100
>>     sasl.jaas.config = null
>>     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>     sasl.kerberos.min.time.before.relogin = 60000
>>     sasl.kerberos.service.name = null
>>     sasl.kerberos.ticket.renew.jitter = 0.05
>>     sasl.kerberos.ticket.renew.window.factor = 0.8
>>     sasl.mechanism = GSSAPI
>>     security.protocol = PLAINTEXT
>>     send.buffer.bytes = 131072
>>     ssl.cipher.suites = null
>>     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>     ssl.endpoint.identification.algorithm = null
>>     ssl.key.password = null
>>     ssl.keymanager.algorithm = SunX509
>>     ssl.keystore.location = null
>>     ssl.keystore.password = null
>>     ssl.keystore.type = JKS
>>     ssl.protocol = TLS
>>     ssl.provider = null
>>     ssl.secure.random.implementation = null
>>     ssl.trustmanager.algorithm = PKIX
>>     ssl.truststore.location = null
>>     ssl.truststore.password = null
>>     ssl.truststore.type = JKS
>>
>> 2019-05-30 09:59:11.191 [main] AdminClientConfig [WARN] The configuration 'zookeeper.connect' was supplied but isn't a known config.
>> 2019-05-30 09:59:11.191 [main] AppInfoParser [INFO] Kafka version : 1.1.0
>> 2019-05-30 09:59:11.191 [main] AppInfoParser [INFO] Kafka commitId : fdcf75ea326b8e07
>> 2019-05-30 09:59:11.208 [main] Log4jControllerRegistration$ [INFO] Registered kafka:type=kafka.Log4jController MBean
>> 2019-05-30 09:59:11.223 [main] KafkaSystemAdmin [INFO] Created KafkaSystemAdmin for system kafka
>> Exception in thread "main" org.apache.samza.SamzaException: Failed to run application
>>     at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:79)
>>     at org.apache.samza.runtime.ApplicationRunnerUtil.invoke(ApplicationRunnerUtil.java:49)
>>     at org.apache.samza.runtime.ApplicationRunnerMain.main(ApplicationRunnerMain.java:53)
>> Caused by: java.lang.NullPointerException
>>     at java.util.HashMap.merge(HashMap.java:1225)
>>     at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
>>     at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>>     at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1696)
>>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>>     at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>     at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>>     at org.apache.samza.config.JavaSystemConfig.getSystemAdmins(JavaSystemConfig.java:84)
>>     at org.apache.samza.system.SystemAdmins.<init>(SystemAdmins.java:38)
>>     at org.apache.samza.execution.StreamManager.<init>(StreamManager.java:55)
>>     at org.apache.samza.execution.JobPlanner.buildAndStartStreamManager(JobPlanner.java:64)
>>     at org.apache.samza.execution.JobPlanner.getExecutionPlan(JobPlanner.java:94)
>>     at
>>         org.apache.samza.execution.RemoteJobPlanner.prepareJobs(RemoteJobPlanner.java:57)
>>     at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:67)
>>     ... 2 more
>>
>>
>> Thanks.
>>
>> Qi Shu
>>
>>> On May 30, 2019, at 3:41 AM, Bharath Kumara Subramanian <codin.mart...@gmail.com> wrote:
>>>
>>> Hi Qi,
>>>
>>> Can you share your application configuration, especially the systems your
>>> application consumes from and produces to, and their related configuration?
>>> Also, would it be possible for you to attach the entire log?
>>>
>>> Thanks,
>>> Bharath
>>>
>>> On Tue, May 28, 2019 at 7:07 PM QiShu <sh...@eefung.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Below is the running environment:
>>>> Hadoop version 3.1.0
>>>> java version "1.8.0_151"
>>>> samza-api-1.1.0.jar
>>>> samza-core_2.12-1.1.0.jar
>>>> samza-kafka_2.12-1.1.0.jar
>>>> samza-kv_2.12-1.1.0.jar
>>>> samza-kv-inmemory_2.12-1.1.0.jar
>>>> samza-kv-rocksdb_2.12-1.1.0.jar
>>>> samza-log4j_2.12-1.1.0.jar
>>>> samza-shell-1.1.0-dist.tgz
>>>> samza-yarn_2.12-1.1.0.jar
>>>> scala-compiler-2.12.1.jar
>>>> scala-library-2.12.1.jar
>>>> scala-logging_2.12-3.7.2.jar
>>>> scala-parser-combinators_2.12-1.0.4.jar
>>>> scala-reflect-2.12.4.jar
>>>> scalate-core_2.12-1.8.0.jar
>>>> scalate-util_2.12-1.8.0.jar
>>>> scalatra_2.12-2.5.0.jar
>>>> scalatra-common_2.12-2.5.0.jar
>>>> scalatra-scalate_2.12-2.5.0.jar
>>>> scala-xml_2.12-1.0.6.jar
>>>> kafka_2.12-1.1.0.jar
>>>> kafka-clients-1.1.0.jar
>>>>
>>>> Below is the exception when starting the app in Yarn:
>>>> 2019-05-29 09:52:47.851 [main] AppInfoParser [INFO] Kafka version : 1.1.0
>>>> 2019-05-29 09:52:47.851 [main] AppInfoParser [INFO] Kafka commitId : fdcf75ea326b8e07
>>>> 2019-05-29 09:52:47.862 [main] Log4jControllerRegistration$ [INFO] Registered kafka:type=kafka.Log4jController MBean
>>>> 2019-05-29 09:52:47.877 [main] KafkaSystemAdmin [INFO] Created KafkaSystemAdmin for system kafka
>>>> Exception in thread "main" org.apache.samza.SamzaException: Failed to run application
>>>>     at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:79)
>>>>     at org.apache.samza.runtime.ApplicationRunnerUtil.invoke(ApplicationRunnerUtil.java:49)
>>>>     at org.apache.samza.runtime.ApplicationRunnerMain.main(ApplicationRunnerMain.java:53)
>>>> Caused by: java.lang.NullPointerException
>>>>     at java.util.HashMap.merge(HashMap.java:1225)
>>>>     at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
>>>>     at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>>>>     at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1696)
>>>>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>>>>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>>>>     at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>>>>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>>     at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>>>>     at org.apache.samza.config.JavaSystemConfig.getSystemAdmins(JavaSystemConfig.java:84)
>>>>     at org.apache.samza.system.SystemAdmins.<init>(SystemAdmins.java:38)
>>>>     at org.apache.samza.execution.StreamManager.<init>(StreamManager.java:55)
>>>>     at org.apache.samza.execution.JobPlanner.buildAndStartStreamManager(JobPlanner.java:64)
>>>>     at org.apache.samza.execution.JobPlanner.getExecutionPlan(JobPlanner.java:94)
>>>>     at org.apache.samza.execution.RemoteJobPlanner.prepareJobs(RemoteJobPlanner.java:57)
>>>>     at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:67)
>>>>     ... 2 more
>>>>
>>>>
>>>> Thanks for your help!
>>>>
>>>> Qi Shu
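
For anyone who finds this thread later: the first failure (the NullPointerException under Collectors.toMap(..) when a system factory hands back a null admin) can be reproduced with plain JDK code. The sketch below is not Samza code. The class name ToMapNullDemo, both method names, and the use of Object as a stand-in for a SystemAdmin are hypothetical and exist only for illustration. It shows that toMap rejects null values, and one possible way to skip such entries.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical demo class; Object stands in for a real admin instance.
public class ToMapNullDemo {

    // Mirrors the failing pattern: Collectors.toMap throws a
    // NullPointerException as soon as a mapped value is null.
    public static Map<String, Object> collectStrict(Map<String, Object> adminsBySystem) {
        return adminsBySystem.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    // One possible guard: drop entries whose value is null before collecting.
    public static Map<String, Object> collectSkippingNulls(Map<String, Object> adminsBySystem) {
        return adminsBySystem.entrySet().stream()
                .filter(e -> e.getValue() != null)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        Map<String, Object> admins = new HashMap<>();
        admins.put("kafka", new Object()); // stand-in for a working admin
        admins.put("influxdb", null);      // stand-in for a factory that returned null

        try {
            collectStrict(admins);
            System.out.println("no exception");
        } catch (NullPointerException e) {
            System.out.println("NullPointerException from toMap");
        }
        System.out.println(collectSkippingNulls(admins).keySet());
    }
}
```

Filtering is shown only to make the behavior visible. In this thread the problem was resolved on the factory side, so that it no longer returns null, and a filter like this would merely hide that kind of mistake.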