Christopher S Lester created KAFKA-7874:
-------------------------------------------
Summary: NPE thrown while instantiating a KafkaStreams. object
Key: KAFKA-7874
URL: https://issues.apache.org/jira/browse/KAFKA-7874
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.1.0
Environment: Windows 10 Pro bld 1803
Ryzen 7 2700 32 GB RAM
Clojure 1.10
Reporter: Christopher S Lester

I'm seeing an exception thrown when instantiating a KafkaStreams object in a Clojure stream processor and am not sure whether this is a config problem (it would be great if it is and someone can point out what config is missing).

Kafka deps:
{code:java}
[org.apache.kafka/kafka-streams "2.1.0"] ;; https://search.maven.org/artifact/org.apache.kafka/kafka-streams/2.1.0/jar
[org.apache.kafka/kafka-streams-test-utils "2.1.0"] ;; https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams-test-utils
[org.apache.kafka/kafka-clients "2.1.0"] ;; https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
[org.apache.kafka/kafka-clients "1.1.0" :classifier "test"]{code}

Configuration:
{code:java}
;; kafka configuration
:calais-response-processor.config/kafka-configuration {
  :applicationid "open-calais-tagging-microservice"
  :bootstrap-servers "localhost:9092"
  :commit-interval 10000 ;; milliseconds
  :input-topic "db.draft.created"
  :output-topic "streaming.calais.response"
}
{code}

Fn:
{code:java}
(defn start->streams []
  (log/info "[start->streams] enter")
  (when (not (instance? KafkaStreams @streams))
    (let [kafka-config (get-in (config CONFIG_PATH) [:calais-response-processor.config/kafka-configuration])
          stream-processing-props {StreamsConfig/APPLICATION_ID_CONFIG (get-in kafka-config [:applicationid])
                                   StreamsConfig/COMMIT_INTERVAL_MS_CONFIG (get-in kafka-config [:auto.commit.interval.ms])
                                   StreamsConfig/BOOTSTRAP_SERVERS_CONFIG (get-in kafka-config [:bootstrap-servers])
                                   StreamsConfig/DEFAULT_KEY_SERDE_CLASS_CONFIG (.getName (.getClass (Serdes/String)))
                                   StreamsConfig/DEFAULT_VALUE_SERDE_CLASS_CONFIG (.getName (.getClass (Serdes/String)))
                                   StreamsConfig/PROCESSING_GUARANTEE_CONFIG StreamsConfig/EXACTLY_ONCE}]
      (try
        (let [kafka-streams (KafkaStreams. (calais-processor-topology) (StreamsConfig. stream-processing-props))]
          (log/infof "[start->streams] created kafka stream with config: %s" stream-processing-props)
          (swap! streams kafka-streams))
        (catch Exception e (log/error e)))))
  (.start @streams))
{code}

Things seem to be "ok" if I don't catch the exception (i.e. it attempts to connect and process), but since it's throwing an exception I'm unable to control its lifecycle right now unless I move this to a (def ..) instead of a (defn ..). The exception still happens either way, so I need it understood before putting this into production.
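One thing that may be worth ruling out, offered as a guess from the snippets above rather than a confirmed diagnosis: the props map reads :auto.commit.interval.ms from kafka-config, while the configuration defines :commit-interval, so that get-in would return nil for StreamsConfig/COMMIT_INTERVAL_MS_CONFIG (the log output below shows commit.interval.ms = null). A minimal sketch of a fail-fast lookup follows. The helpers require-key and streams-props are hypothetical names, and plain string keys stand in for the StreamsConfig constants so the snippet runs without Kafka on the classpath.

```clojure
;; Hypothetical helper: return the value stored under k, or throw a
;; descriptive error instead of silently handing nil to StreamsConfig.
(defn require-key
  [m k]
  (if-some [v (get m k)]
    v
    (throw (ex-info (str "Missing required config key: " k)
                    {:key k :available (keys m)}))))

;; Same shape as the kafka-configuration map shown above.
(def kafka-config
  {:applicationid     "open-calais-tagging-microservice"
   :bootstrap-servers "localhost:9092"
   :commit-interval   10000})

;; Assumption: string keys stand in for StreamsConfig/APPLICATION_ID_CONFIG,
;; StreamsConfig/BOOTSTRAP_SERVERS_CONFIG and StreamsConfig/COMMIT_INTERVAL_MS_CONFIG.
(defn streams-props
  [cfg]
  {"application.id"     (require-key cfg :applicationid)
   "bootstrap.servers"  (require-key cfg :bootstrap-servers)
   "commit.interval.ms" (require-key cfg :commit-interval)})

(streams-props kafka-config)
```

With this helper, (require-key kafka-config :auto.commit.interval.ms) throws an ExceptionInfo naming the missing key, which is easier to trace than an NPE raised inside the KafkaStreams constructor. Separately, (swap! streams kafka-streams) treats the KafkaStreams instance as the update function; (reset! streams kafka-streams) is the usual way to store a value in an atom.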
Result:
{code:java}
=> (start-stream-processing)
"[start-stream-processing] START: streams processing"
INFO calais-response-processor.core: [start->streams] enter
INFO calais-response-processor.config: loading config from resources/config.edn
INFO calais-response-processor.core: [calais-processor-topology] Open Calais API Streaming Topology
INFO org.apache.kafka.streams.StreamsConfig: StreamsConfig values:
    application.id = open-calais-tagging-microservice
    application.server =
    bootstrap.servers = [localhost:9092]
    buffered.records.per.partition = 1000
    cache.max.bytes.buffering = 10485760
    client.id =
    commit.interval.ms = null
    connections.max.idle.ms = 540000
    default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
    default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
    default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
    default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
    default.value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
    max.task.idle.ms = 0
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    num.standby.replicas = 0
    num.stream.threads = 1
    partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
    poll.ms = 100
    processing.guarantee = exactly_once
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    replication.factor = 1
    request.timeout.ms = 40000
    retries = 0
    retry.backoff.ms = 100
    rocksdb.config.setter = null
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    state.cleanup.delay.ms = 600000
    state.dir = /tmp/kafka-streams
    topology.optimization = none
    upgrade.from = null
    windowstore.changelog.additional.retention.ms = 86400000
INFO org.apache.kafka.clients.admin.AdminClientConfig: AdminClientConfig values:
    bootstrap.servers = [localhost:9092]
    client.dns.lookup = default
    client.id = open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-admin
    connections.max.idle.ms = 300000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 120000
    retries = 5
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
INFO org.apache.kafka.common.utils.AppInfoParser: Kafka version : 2.1.0
INFO org.apache.kafka.common.utils.AppInfoParser: Kafka commitId : eec43959745f444f
INFO org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-StreamThread-1] Creating restore consumer client
INFO org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = none
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.dns.lookup = default
    client.id = open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-StreamThread-1-restore-consumer
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id =
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = false
    isolation.level = read_committed
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 2147483647
    max.poll.records = 1000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
INFO org.apache.kafka.common.utils.AppInfoParser: Kafka version : 2.1.0
INFO org.apache.kafka.common.utils.AppInfoParser: Kafka commitId : eec43959745f444f
INFO org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-StreamThread-1] Creating consumer client
INFO org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.dns.lookup = default
    client.id = open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-StreamThread-1-consumer
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = open-calais-tagging-microservice
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = false
    isolation.level = read_committed
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 2147483647
    max.poll.records = 1000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
WARN org.apache.kafka.clients.consumer.ConsumerConfig: The configuration 'admin.retries' was supplied but isn't a known config.
INFO org.apache.kafka.common.utils.AppInfoParser: Kafka version : 2.1.0
INFO org.apache.kafka.common.utils.AppInfoParser: Kafka commitId : eec43959745f444f
ERROR calais-response-processor.core: #error {
 :cause nil
 :via
 [{:type java.lang.NullPointerException
   :message nil
   :at [org.apache.kafka.streams.processor.internals.StreamThread <init> StreamThread.java 719]}]
 :trace
 [[org.apache.kafka.streams.processor.internals.StreamThread <init> StreamThread.java 719]
  [org.apache.kafka.streams.processor.internals.StreamThread create StreamThread.java 671]
  [org.apache.kafka.streams.KafkaStreams <init> KafkaStreams.java 706]
  [org.apache.kafka.streams.KafkaStreams <init> KafkaStreams.java 624]
  [org.apache.kafka.streams.KafkaStreams <init> KafkaStreams.java 608]
  [org.apache.kafka.streams.KafkaStreams <init> KafkaStreams.java 598]
  [sun.reflect.NativeConstructorAccessorImpl newInstance0 nil -2]
  [sun.reflect.NativeConstructorAccessorImpl newInstance nil -1]
  [sun.reflect.DelegatingConstructorAccessorImpl newInstance nil -1]
  [java.lang.reflect.Constructor newInstance nil -1]
  [clojure.lang.Reflector invokeConstructor Reflector.java 180]
  [calais_response_processor.core$start__GT_streams$fn__258 invoke core.clj 148]
  [calais_response_processor.core$start__GT_streams invokeStatic core.clj 147]
  [calais_response_processor.core$start__GT_streams invoke core.clj 136]
  [calais_response_processor.core$start_stream_processing invokeStatic core.clj 165]
  [calais_response_processor.core$start_stream_processing invoke core.clj 162]
  [calais_response_processor.core$eval1539 invokeStatic form-init8971608817606635487.clj 1]
  [calais_response_processor.core$eval1539 invoke form-init8971608817606635487.clj 1]
  [clojure.lang.Compiler eval Compiler.java 7062]
  [clojure.lang.Compiler eval Compiler.java 7025]
  [clojure.core$eval invokeStatic core.clj 3206]
  [clojure.core$eval invoke core.clj 3202]
  [clojure.main$repl$read_eval_print__8572$fn__8575 invoke main.clj 243]
  [clojure.main$repl$read_eval_print__8572 invoke main.clj 243]
  [clojure.main$repl$fn__8581 invoke main.clj 261]
  [clojure.main$repl invokeStatic main.clj 261]
  [clojure.main$repl doInvoke main.clj 177]
  [clojure.lang.RestFn invoke RestFn.java 1523]
  [nrepl.middleware.interruptible_eval$evaluate$fn__912 invoke interruptible_eval.clj 83]
  [clojure.lang.AFn applyToHelper AFn.java 152]
  [clojure.lang.AFn applyTo AFn.java 144]
  [clojure.core$apply invokeStatic core.clj 657]
  [clojure.core$with_bindings_STAR_ invokeStatic core.clj 1965]
  [clojure.core$with_bindings_STAR_ doInvoke core.clj 1965]
  [clojure.lang.RestFn invoke RestFn.java 425]
  [nrepl.middleware.interruptible_eval$evaluate invokeStatic interruptible_eval.clj 81]
  [nrepl.middleware.interruptible_eval$evaluate invoke interruptible_eval.clj 50]
  [nrepl.middleware.interruptible_eval$interruptible_eval$fn__955$fn__958 invoke interruptible_eval.clj 221]
  [nrepl.middleware.interruptible_eval$run_next$fn__950 invoke interruptible_eval.clj 189]
  [clojure.lang.AFn run AFn.java 22]
  [java.util.concurrent.ThreadPoolExecutor runWorker nil -1]
  [java.util.concurrent.ThreadPoolExecutor$Worker run nil -1]
  [java.lang.Thread run nil -1]]}
NullPointerException   clojure.lang.Reflector.invokeNoArgInstanceMember (Reflector.java:301)
{code}

--
This message was sent by Atlassian JIRA (v7.6.3#76005)