Hi Bharath,

        Your theory is correct: InfluxDBSystemFactory was simply returning null for the SystemAdmin. That is fixed now! For reference, here is a minimal sketch of the shape of the fix (InfluxDBSystemAdmin and InfluxDBSystemProducer are placeholder names; our real classes do more):
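
import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;

public class InfluxDBSystemFactory implements SystemFactory {

    @Override
    public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
        // Our InfluxDB system is write-only, so consuming is unsupported.
        throw new UnsupportedOperationException("InfluxDB system is write-only");
    }

    @Override
    public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
        return new InfluxDBSystemProducer(systemName, config, registry); // placeholder
    }

    @Override
    public SystemAdmin getAdmin(String systemName, Config config) {
        // Before the fix this returned null, which made JavaSystemConfig.getSystemAdmins()
        // fail inside Collectors.toMap(..), since HashMap.merge rejects null values.
        // It now always returns a real SystemAdmin; Samza's
        // SinglePartitionWithoutOffsetsSystemAdmin is another option for a write-only system.
        return new InfluxDBSystemAdmin(systemName, config); // placeholder
    }
}

        With that fixed, though, another problem occurred; below is the stack trace: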

2019-06-03 17:26:11.176 [main] KafkaSystemAdmin [INFO] Fetching SystemStreamMetadata for topics [__samza_coordinator_canal-metrics-test_1] on system kafka
2019-06-03 17:26:11.179 [main] KafkaSystemAdmin [ERROR] Fetching system stream metadata for: [__samza_coordinator_canal-metrics-test_1] threw an exception.
java.lang.NullPointerException
        at java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
        at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:75)
        at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:253)
        at kafka.utils.Logging.logger(Logging.scala:43)
        at kafka.utils.Logging.debug(Logging.scala:62)
        at kafka.utils.Logging.debug$(Logging.scala:62)
        at org.apache.samza.system.kafka.KafkaSystemAdminUtilsScala$.debug(KafkaSystemAdminUtilsScala.scala:45)
        at org.apache.samza.system.kafka.KafkaSystemAdminUtilsScala$.assembleMetadata(KafkaSystemAdminUtilsScala.scala:74)
        at org.apache.samza.system.kafka.KafkaSystemAdminUtilsScala.assembleMetadata(KafkaSystemAdminUtilsScala.scala)
        at org.apache.samza.system.kafka.KafkaSystemAdmin.fetchSystemStreamMetadata(KafkaSystemAdmin.java:461)
        at org.apache.samza.system.kafka.KafkaSystemAdmin.access$400(KafkaSystemAdmin.java:76)
        at org.apache.samza.system.kafka.KafkaSystemAdmin$4.apply(KafkaSystemAdmin.java:340)
        at org.apache.samza.system.kafka.KafkaSystemAdmin$4.apply(KafkaSystemAdmin.java:337)
        at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:90)
        at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.java:374)
        at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.java:292)
        at org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer.register(CoordinatorStreamSystemConsumer.java:109)
        at org.apache.samza.job.JobRunner.run(JobRunner.scala:105)
        at org.apache.samza.runtime.RemoteApplicationRunner.lambda$run$0(RemoteApplicationRunner.java:76)
        at java.util.ArrayList.forEach(ArrayList.java:1255)
        at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:73)
        at org.apache.samza.runtime.ApplicationRunnerUtil.invoke(ApplicationRunnerUtil.java:49)
        at org.apache.samza.runtime.ApplicationRunnerMain.main(ApplicationRunnerMain.java:53)
Exception in thread "main" org.apache.samza.SamzaException: Failed to run application
        at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:79)
        at org.apache.samza.runtime.ApplicationRunnerUtil.invoke(ApplicationRunnerUtil.java:49)
        at org.apache.samza.runtime.ApplicationRunnerMain.main(ApplicationRunnerMain.java:53)
Caused by: org.apache.samza.SamzaException: java.lang.NullPointerException
        at org.apache.samza.system.kafka.KafkaSystemAdmin$5.apply(KafkaSystemAdmin.java:358)
        at org.apache.samza.system.kafka.KafkaSystemAdmin$5.apply(KafkaSystemAdmin.java:347)
        at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:97)
        at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.java:374)
        at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.java:292)
        at org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer.register(CoordinatorStreamSystemConsumer.java:109)
        at org.apache.samza.job.JobRunner.run(JobRunner.scala:105)
        at org.apache.samza.runtime.RemoteApplicationRunner.lambda$run$0(RemoteApplicationRunner.java:76)
        at java.util.ArrayList.forEach(ArrayList.java:1255)
        at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:73)
        ... 2 more
Caused by: java.lang.NullPointerException
        at java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
        at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:75)
        at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:253)
        at kafka.utils.Logging.logger(Logging.scala:43)
        at kafka.utils.Logging.debug(Logging.scala:62)
        at kafka.utils.Logging.debug$(Logging.scala:62)
        at org.apache.samza.system.kafka.KafkaSystemAdminUtilsScala$.debug(KafkaSystemAdminUtilsScala.scala:45)
        at org.apache.samza.system.kafka.KafkaSystemAdminUtilsScala$.assembleMetadata(KafkaSystemAdminUtilsScala.scala:74)
        at org.apache.samza.system.kafka.KafkaSystemAdminUtilsScala.assembleMetadata(KafkaSystemAdminUtilsScala.scala)
        at org.apache.samza.system.kafka.KafkaSystemAdmin.fetchSystemStreamMetadata(KafkaSystemAdmin.java:461)
        at org.apache.samza.system.kafka.KafkaSystemAdmin.access$400(KafkaSystemAdmin.java:76)
        at org.apache.samza.system.kafka.KafkaSystemAdmin$4.apply(KafkaSystemAdmin.java:340)
        at org.apache.samza.system.kafka.KafkaSystemAdmin$4.apply(KafkaSystemAdmin.java:337)
        at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:90)
        ... 9 more
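
        From the top frames, the NullPointerException comes out of ConcurrentHashMap.get inside org.slf4j.impl.Log4jLoggerFactory.getLogger, and ConcurrentHashMap does not allow null keys, so my guess is that getLogger is being handed a null logger name. A minimal probe that reproduces the same two top frames (assuming slf4j-log4j12 is the binding on our classpath):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggerNameProbe {
    public static void main(String[] args) {
        // Fine: Log4jLoggerFactory caches loggers in a ConcurrentHashMap keyed by name.
        Logger ok = LoggerFactory.getLogger("probe");
        ok.info("slf4j -> log4j binding is alive");

        // Throws java.lang.NullPointerException at ConcurrentHashMap.get(..),
        // matching the top frames above, because the map forbids null keys.
        Logger bad = LoggerFactory.getLogger((String) null);
    }
}

        I don't see yet what could make kafka.utils.Logging pass a null logger name, though.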



        log4j.xml can be found in lib; its content is:

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">

    <appender name="RollingAppender" class="org.apache.log4j.RollingFileAppender">
        <param name="File" value="${samza.log.dir}/${samza.container.name}.log"/>
        <param name="MaxFileSize" value="10MB"/>
        <param name="MaxBackupIndex" value="10"/>
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n"/>
        </layout>
    </appender>

    <appender name="StartupAppender" class="org.apache.log4j.RollingFileAppender">
        <param name="File" value="${samza.log.dir}/${samza.container.name}-startup.log"/>
        <param name="MaxFileSize" value="10MB"/>
        <param name="MaxBackupIndex" value="10"/>
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n"/>
        </layout>
    </appender>

    <logger name="STARTUP_LOGGER" additivity="false">
        <level value="info"/>
        <appender-ref ref="StartupAppender"/>
    </logger>

    <root>
        <priority value="info"/>
        <appender-ref ref="RollingAppender"/>
    </root>

</log4j:configuration>


Thanks for your help!
        

Qi Shu


> On May 31, 2019, at 4:59 AM, Bharath Kumara Subramanian <codin.mart...@gmail.com> wrote:
> 
> Here is my theory based on the stack trace and logs.
> InfluxDBSystemFactory could not create the admin successfully and returned
> a null. Inside JavaSystemConfig, when it tries to get all the admins for
> the systems available, Collectors.toMap(..) throws an exception during
> reducing due to a null value.
> 
> A few questions:
> 1. Can getAdmin(...) return a null?
> 2. Can you add some logs to your getAdmin API to see if they are printed
> right before the exception?
> 
> I am guessing here since I don't have access to the source code of
> *InfluxDBSystemFactory*. If you can share the source code of
> InfluxDBSystemFactory, it will help me confirm my theory.
> 
> Let me know how it goes.
> 
> Thanks,
> Bharath
> 
> On Wed, May 29, 2019 at 7:01 PM QiShu <sh...@eefung.com> wrote:
> 
>> Hi Bharath,
>> 
>>        Below is the configuration:
>> # App
>> app.name=canal-metrics
>> app.id=test
>> app.class=com.antfact.datacenter.canal.task.metrics.MetricsHandlerTask
>> task.class=com.antfact.datacenter.canal.task.metrics.MetricsHandlerTask
>> 
>> job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
>> job.default.system=kafka
>> 
>> job.container.thread.pool.size=0
>> task.max.concurrency=1
>> 
>> task.opts=-Xmx1843m
>> 
>> task.checkpoint.replication.factor=3
>> 
>> 
>> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>> 
>> # Checkpointing
>> 
>> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
>> 
>> # Kafka
>> systems.kafka.consumer.zookeeper.connect=p-zk1:7183,p-zk2:7183,p-zk3:7183
>> systems.kafka.producer.bootstrap.servers=10.20.1.87:9096,10.20.1.88:9096,
>> 10.20.1.89:9096,10.20.1.90:9096,10.20.1.91:9096,10.20.1.92:9096
>> 
>> # Systems & Streams
>> 
>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>> systems.kafka.default.stream.samza.key.serde=string
>> systems.kafka.default.stream.samza.msg.serde=string
>> 
>> 
>> systems.influxdb.samza.factory=com.antfact.datacenter.canal.system.InfluxDBSystemFactory
>> systems.hstore.default.stream.samza.key.serde=string
>> systems.hstore.default.stream.samza.msg.serde=string
>> 
>> task.inputs=kafka.samza-metrics
>> 
>> task.consumer.batch.size=100
>> 
>> # Deployment
>> yarn.package.path=hdfs://
>> p002132.antfact.com/rflow-apps/data/canal-metrics-3.0-dist.tar.gz
>> yarn.container.count=1
>> cluster-manager.container.memory.mb=2048
>> 
>> 
>>        Below is the entire log:
>> added manifest
>> java version "1.8.0_151"
>> Java(TM) SE Runtime Environment (build 1.8.0_151-b12)
>> Java HotSpot(TM) 64-Bit Server VM (build 25.151-b12, mixed mode)
>> /usr/java/jdk1.8.0_151/bin/java
>> -Dlog4j.configuration=file:bin/log4j-console.xml
>> -DisThreadContextMapInheritable=true
>> -Dsamza.log.dir=/home/ant/canal-test/canal-metrics/target/canal
>> -Djava.io.tmpdir=/home/ant/canal-test/canal-metrics/target/canal/tmp
>> -Xmx768M -XX:+PrintGCDateStamps
>> -Xloggc:/home/ant/canal-test/canal-metrics/target/canal/gc.log
>> -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10
>> -XX:GCLogFileSize=10241024 -d64 -cp /etc/hadoop/conf:pathing.jar
>> org.apache.samza.runtime.ApplicationRunnerMain
>> --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory
>> --config-path=/home/ant/canal-test/canal-metrics/target/canal/config/canal-metrics.properties
>> 2019-05-30 09:59:10.900 [main] TaskFactoryUtil [INFO] Got task class name:
>> com.antfact.datacenter.canal.task.metrics.MetricsHandlerTask
>> 2019-05-30 09:59:10.914 [main] RemoteJobPlanner [INFO] The run id for this
>> run is 1559181550904-ec4b9a69
>> 2019-05-30 09:59:10.958 [main] JobPlanner [INFO] app.name is defined,
>> generating job.name equal to app.name value: canal-metrics
>> 2019-05-30 09:59:10.958 [main] JobPlanner [INFO] app.id is defined,
>> generating job.id equal to app.name value: test
>> 2019-05-30 09:59:10.959 [main] JobPlanner [INFO] app.name is defined,
>> generating job.name equal to app.name value: canal-metrics
>> 2019-05-30 09:59:10.959 [main] JobPlanner [INFO] app.id is defined,
>> generating job.id equal to app.name value: test
>> 2019-05-30 09:59:10.960 [main] KafkaConsumerConfig [INFO] Auto offset
>> reset value for KafkaConsumer for system upcoming converted from
>> latest(samza) to {}
>> 2019-05-30 09:59:10.960 [main] KafkaConsumerConfig [INFO] setting
>> auto.offset.reset for system kafka to latest
>> 2019-05-30 09:59:10.962 [main] KafkaConsumerConfig [INFO] setting key
>> serialization for the consumer(for system kafka) to ByteArrayDeserializer
>> 2019-05-30 09:59:10.964 [main] KafkaConsumerConfig [INFO] setting value
>> serialization for the consumer(for system kafka) to ByteArrayDeserializer
>> 2019-05-30 09:59:10.970 [main] KafkaSystemConsumer [INFO] Instantiating
>> KafkaConsumer for systemName kafka with properties
>> {key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer,
>> value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer,
>> enable.auto.commit=false, max.poll.records=100,
>> zookeeper.connect=p-zk1:7183,p-zk2:7183,p-zk3:7183, 
>> group.id=canal-metrics-test,
>> partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor,
>> bootstrap.servers=10.20.1.87:9096,10.20.1.88:9096,10.20.1.89:9096,
>> 10.20.1.90:9096,10.20.1.91:9096,10.20.1.92:9096,
>> auto.offset.reset=latest, client.id
>> =kafka_admin_consumer-canal_metrics-test}
>> 2019-05-30 09:59:11.003 [main] ConsumerConfig [INFO] ConsumerConfig values:
>>        auto.commit.interval.ms = 5000
>>        auto.offset.reset = latest
>>        bootstrap.servers = [10.20.1.87:9096, 10.20.1.88:9096,
>> 10.20.1.89:9096, 10.20.1.90:9096, 10.20.1.91:9096, 10.20.1.92:9096]
>>        check.crcs = true
>>        client.id = kafka_admin_consumer-canal_metrics-test
>>        connections.max.idle.ms = 540000
>>        enable.auto.commit = false
>>        exclude.internal.topics = true
>>        fetch.max.bytes = 52428800
>>        fetch.max.wait.ms = 500
>>        fetch.min.bytes = 1
>>        group.id = canal-metrics-test
>>        heartbeat.interval.ms = 3000
>>        interceptor.classes = []
>>        internal.leave.group.on.close = true
>>        isolation.level = read_uncommitted
>>        key.deserializer = class
>> org.apache.kafka.common.serialization.ByteArrayDeserializer
>>        max.partition.fetch.bytes = 1048576
>>        max.poll.interval.ms = 300000
>>        max.poll.records = 100
>>        metadata.max.age.ms = 300000
>>        metric.reporters = []
>>        metrics.num.samples = 2
>>        metrics.recording.level = INFO
>>        metrics.sample.window.ms = 30000
>>        partition.assignment.strategy =
>> [org.apache.kafka.clients.consumer.RangeAssignor]
>>        receive.buffer.bytes = 65536
>>        reconnect.backoff.max.ms = 1000
>>        reconnect.backoff.ms = 50
>>        request.timeout.ms = 305000
>>        retry.backoff.ms = 100
>>        sasl.jaas.config = null
>>        sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>        sasl.kerberos.min.time.before.relogin = 60000
>>        sasl.kerberos.service.name = null
>>        sasl.kerberos.ticket.renew.jitter = 0.05
>>        sasl.kerberos.ticket.renew.window.factor = 0.8
>>        sasl.mechanism = GSSAPI
>>        security.protocol = PLAINTEXT
>>        send.buffer.bytes = 131072
>>        session.timeout.ms = 10000
>>        ssl.cipher.suites = null
>>        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>        ssl.endpoint.identification.algorithm = null
>>        ssl.key.password = null
>>        ssl.keymanager.algorithm = SunX509
>>        ssl.keystore.location = null
>>        ssl.keystore.password = null
>>        ssl.keystore.type = JKS
>>        ssl.protocol = TLS
>>        ssl.provider = null
>>        ssl.secure.random.implementation = null
>>        ssl.trustmanager.algorithm = PKIX
>>        ssl.truststore.location = null
>>        ssl.truststore.password = null
>>        ssl.truststore.type = JKS
>>        value.deserializer = class
>> org.apache.kafka.common.serialization.ByteArrayDeserializer
>> 
>> 2019-05-30 09:59:11.147 [main] ConsumerConfig [WARN] The configuration
>> 'zookeeper.connect' was supplied but isn't a known config.
>> 2019-05-30 09:59:11.149 [main] AppInfoParser [INFO] Kafka version : 1.1.0
>> 2019-05-30 09:59:11.149 [main] AppInfoParser [INFO] Kafka commitId :
>> fdcf75ea326b8e07
>> 2019-05-30 09:59:11.166 [main] KafkaSystemAdmin [INFO] New admin client
>> with props:{bootstrap.servers=10.20.1.87:9096,10.20.1.88:9096,
>> 10.20.1.89:9096,10.20.1.90:9096,10.20.1.91:9096,10.20.1.92:9096,
>> zookeeper.connect=p-zk1:7183,p-zk2:7183,p-zk3:7183}
>> 2019-05-30 09:59:11.170 [main] AdminClientConfig [INFO] AdminClientConfig
>> values:
>>        bootstrap.servers = [10.20.1.87:9096, 10.20.1.88:9096,
>> 10.20.1.89:9096, 10.20.1.90:9096, 10.20.1.91:9096, 10.20.1.92:9096]
>>        client.id =
>>        connections.max.idle.ms = 300000
>>        metadata.max.age.ms = 300000
>>        metric.reporters = []
>>        metrics.num.samples = 2
>>        metrics.recording.level = INFO
>>        metrics.sample.window.ms = 30000
>>        receive.buffer.bytes = 65536
>>        reconnect.backoff.max.ms = 1000
>>        reconnect.backoff.ms = 50
>>        request.timeout.ms = 120000
>>        retries = 5
>>        retry.backoff.ms = 100
>>        sasl.jaas.config = null
>>        sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>        sasl.kerberos.min.time.before.relogin = 60000
>>        sasl.kerberos.service.name = null
>>        sasl.kerberos.ticket.renew.jitter = 0.05
>>        sasl.kerberos.ticket.renew.window.factor = 0.8
>>        sasl.mechanism = GSSAPI
>>        security.protocol = PLAINTEXT
>>        send.buffer.bytes = 131072
>>        ssl.cipher.suites = null
>>        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>        ssl.endpoint.identification.algorithm = null
>>        ssl.key.password = null
>>        ssl.keymanager.algorithm = SunX509
>>        ssl.keystore.location = null
>>        ssl.keystore.password = null
>>        ssl.keystore.type = JKS
>>        ssl.protocol = TLS
>>        ssl.provider = null
>>        ssl.secure.random.implementation = null
>>        ssl.trustmanager.algorithm = PKIX
>>        ssl.truststore.location = null
>>        ssl.truststore.password = null
>>        ssl.truststore.type = JKS
>> 
>> 2019-05-30 09:59:11.191 [main] AdminClientConfig [WARN] The configuration
>> 'zookeeper.connect' was supplied but isn't a known config.
>> 2019-05-30 09:59:11.191 [main] AppInfoParser [INFO] Kafka version : 1.1.0
>> 2019-05-30 09:59:11.191 [main] AppInfoParser [INFO] Kafka commitId :
>> fdcf75ea326b8e07
>> 2019-05-30 09:59:11.208 [main] Log4jControllerRegistration$ [INFO]
>> Registered kafka:type=kafka.Log4jController MBean
>> 2019-05-30 09:59:11.223 [main] KafkaSystemAdmin [INFO] Created
>> KafkaSystemAdmin for system kafka
>> Exception in thread "main" org.apache.samza.SamzaException: Failed to run
>> application
>>        at
>> org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:79)
>>        at
>> org.apache.samza.runtime.ApplicationRunnerUtil.invoke(ApplicationRunnerUtil.java:49)
>>        at
>> org.apache.samza.runtime.ApplicationRunnerMain.main(ApplicationRunnerMain.java:53)
>> Caused by: java.lang.NullPointerException
>>        at java.util.HashMap.merge(HashMap.java:1225)
>>        at
>> java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
>>        at
>> java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>>        at
>> java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1696)
>>        at
>> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>>        at
>> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>>        at
>> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>>        at
>> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>        at
>> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>>        at
>> org.apache.samza.config.JavaSystemConfig.getSystemAdmins(JavaSystemConfig.java:84)
>>        at
>> org.apache.samza.system.SystemAdmins.<init>(SystemAdmins.java:38)
>>        at
>> org.apache.samza.execution.StreamManager.<init>(StreamManager.java:55)
>>        at
>> org.apache.samza.execution.JobPlanner.buildAndStartStreamManager(JobPlanner.java:64)
>>        at
>> org.apache.samza.execution.JobPlanner.getExecutionPlan(JobPlanner.java:94)
>>        at
>> org.apache.samza.execution.RemoteJobPlanner.prepareJobs(RemoteJobPlanner.java:57)
>>        at
>> org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:67)
>>        ... 2 more
>> 
>> 
>>        Thanks.
>> 
>> Qi Shu
>> 
>> On May 30, 2019, at 3:41 AM, Bharath Kumara Subramanian <codin.mart...@gmail.com> wrote:
>>> 
>>> Hi Qi,
>>> 
>>> Can you share your application configuration? Especially the systems your
>>> application consumes and produces to and its related configuration.
>>> Also, would it be possible for you to attach the entire log?
>>> 
>>> Thanks,
>>> Bharath
>>> 
>>> On Tue, May 28, 2019 at 7:07 PM QiShu <sh...@eefung.com> wrote:
>>> 
>>>> Hi,
>>>> 
>>>>       Below is the running environment:
>>>> Hadoop version 3.1.0
>>>> java version "1.8.0_151"
>>>> samza-api-1.1.0.jar
>>>> samza-core_2.12-1.1.0.jar
>>>> samza-kafka_2.12-1.1.0.jar
>>>> samza-kv_2.12-1.1.0.jar
>>>> samza-kv-inmemory_2.12-1.1.0.jar
>>>> samza-kv-rocksdb_2.12-1.1.0.jar
>>>> samza-log4j_2.12-1.1.0.jar
>>>> samza-shell-1.1.0-dist.tgz
>>>> samza-yarn_2.12-1.1.0.jar
>>>> scala-compiler-2.12.1.jar
>>>> scala-library-2.12.1.jar
>>>> scala-logging_2.12-3.7.2.jar
>>>> scala-parser-combinators_2.12-1.0.4.jar
>>>> scala-reflect-2.12.4.jar
>>>> scalate-core_2.12-1.8.0.jar
>>>> scalate-util_2.12-1.8.0.jar
>>>> scalatra_2.12-2.5.0.jar
>>>> scalatra-common_2.12-2.5.0.jar
>>>> scalatra-scalate_2.12-2.5.0.jar
>>>> scala-xml_2.12-1.0.6.jar
>>>> kafka_2.12-1.1.0.jar
>>>> kafka-clients-1.1.0.jar
>>>> 
>>>>       Below is the exception when starting app in Yarn:
>>>> 2019-05-29 09:52:47.851 [main] AppInfoParser [INFO] Kafka version :
>> 1.1.0
>>>> 2019-05-29 09:52:47.851 [main] AppInfoParser [INFO] Kafka commitId :
>>>> fdcf75ea326b8e07
>>>> 2019-05-29 09:52:47.862 [main] Log4jControllerRegistration$ [INFO]
>>>> Registered kafka:type=kafka.Log4jController MBean
>>>> 2019-05-29 09:52:47.877 [main] KafkaSystemAdmin [INFO] Created
>>>> KafkaSystemAdmin for system kafka
>>>> Exception in thread "main" org.apache.samza.SamzaException: Failed to
>> run
>>>> application
>>>>       at
>>>> 
>> org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:79)
>>>>       at
>>>> 
>> org.apache.samza.runtime.ApplicationRunnerUtil.invoke(ApplicationRunnerUtil.java:49)
>>>>       at
>>>> 
>> org.apache.samza.runtime.ApplicationRunnerMain.main(ApplicationRunnerMain.java:53)
>>>> Caused by: java.lang.NullPointerException
>>>>       at java.util.HashMap.merge(HashMap.java:1225)
>>>>       at
>>>> java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
>>>>       at
>>>> java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>>>>       at
>>>> java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1696)
>>>>       at
>>>> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>>>>       at
>>>> 
>> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>>>>       at
>>>> 
>> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>>>>       at
>>>> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>>       at
>>>> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>>>>       at
>>>> 
>> org.apache.samza.config.JavaSystemConfig.getSystemAdmins(JavaSystemConfig.java:84)
>>>>       at
>>>> org.apache.samza.system.SystemAdmins.<init>(SystemAdmins.java:38)
>>>>       at
>>>> org.apache.samza.execution.StreamManager.<init>(StreamManager.java:55)
>>>>       at
>>>> 
>> org.apache.samza.execution.JobPlanner.buildAndStartStreamManager(JobPlanner.java:64)
>>>>       at
>>>> 
>> org.apache.samza.execution.JobPlanner.getExecutionPlan(JobPlanner.java:94)
>>>>       at
>>>> 
>> org.apache.samza.execution.RemoteJobPlanner.prepareJobs(RemoteJobPlanner.java:57)
>>>>       at
>>>> 
>> org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:67)
>>>>       ... 2 more
>>>> 
>>>> 
>>>>       Thanks for your help!
>>>> 
>>>> Qi Shu
>>>> 
>>>> 
>>>> 
>> 
>> 
