[ https://issues.apache.org/jira/browse/KAFKA-7874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Christopher S Lester resolved KAFKA-7874.
-----------------------------------------
    Resolution: Not A Bug

The commit interval was nil (the code looks up :auto.commit.interval.ms, but the config map defines :commit-interval), so resolving as not a bug.

> NPE thrown while instantiating a KafkaStreams. object
> -----------------------------------------------------
>
> Key: KAFKA-7874
> URL: https://issues.apache.org/jira/browse/KAFKA-7874
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.1.0
> Environment: Windows 10 Pro bld 1803
> Ryzen 7 2700 32GB RAM
> Clojure 1.10
> Reporter: Christopher S Lester
> Priority: Major
>
> I'm seeing an exception thrown when instantiating a KafkaStreams object in a
> Clojure stream processor and am not sure whether this is a config problem
> (it would be great if it is and someone can point out what config is missing).
>
> Kafka deps:
> {code:java}
> [org.apache.kafka/kafka-streams "2.1.0"] ;; https://search.maven.org/artifact/org.apache.kafka/kafka-streams/2.1.0/jar
> [org.apache.kafka/kafka-streams-test-utils "2.1.0"] ;; https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams-test-utils
> [org.apache.kafka/kafka-clients "2.1.0"] ;; https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
> [org.apache.kafka/kafka-clients "1.1.0" :classifier "test"]
> {code}
>
> Configuration:
> {code:java}
> ;; kafka configuration
> :calais-response-processor.config/kafka-configuration {
>   :applicationid "open-calais-tagging-microservice"
>   :bootstrap-servers "localhost:9092"
>   :commit-interval 10000 ;; milliseconds
>   :input-topic "db.draft.created"
>   :output-topic "streaming.calais.response"
> }
> {code}
>
> Fn:
> {code:java}
> (defn start->streams
>   []
>   (log/info "[start->streams] enter")
>   (when (not (instance? KafkaStreams @streams))
>     (let [kafka-config (get-in (config CONFIG_PATH) [:calais-response-processor.config/kafka-configuration])
>           stream-processing-props {StreamsConfig/APPLICATION_ID_CONFIG (get-in kafka-config [:applicationid])
>                                    StreamsConfig/COMMIT_INTERVAL_MS_CONFIG (get-in kafka-config [:auto.commit.interval.ms])
>                                    StreamsConfig/BOOTSTRAP_SERVERS_CONFIG (get-in kafka-config [:bootstrap-servers])
>                                    StreamsConfig/DEFAULT_KEY_SERDE_CLASS_CONFIG (.getName (.getClass (Serdes/String)))
>                                    StreamsConfig/DEFAULT_VALUE_SERDE_CLASS_CONFIG (.getName (.getClass (Serdes/String)))
>                                    StreamsConfig/PROCESSING_GUARANTEE_CONFIG StreamsConfig/EXACTLY_ONCE}]
>       (try
>         (let [kafka-streams (KafkaStreams. (calais-processor-topology) (StreamsConfig. stream-processing-props))]
>           (log/infof "[start->streams] created kafka stream with config: %s" stream-processing-props)
>           (swap! streams kafka-streams))
>         (catch Exception e (log/error e)))))
>   (.start @streams))
> {code}
> Things seem to be "ok" if I don't catch the exception (i.e. it attempts to
> connect and process), but since it's throwing an exception I'm unable to
> control its lifecycle right now unless I move this to a (def ..) instead of a
> (defn ..). However, the exception still happens, so I need that understood
> before putting it into production.
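
For anyone hitting the same NPE: the config map in the report defines `:commit-interval`, while `start->streams` reads `:auto.commit.interval.ms`, so the lookup returns nil and `commit.interval.ms` ends up null. One way to surface this earlier is to fail fast on missing keys before building the properties map. The following is only a minimal sketch under stated assumptions: `require-key` and `stream-props` are hypothetical helpers (not from the report or from Kafka), and plain string keys stand in for the `StreamsConfig` constants so the snippet runs without Kafka on the classpath.

```clojure
;; Config map as posted in the report (note the key is :commit-interval).
(def kafka-config
  {:applicationid     "open-calais-tagging-microservice"
   :bootstrap-servers "localhost:9092"
   :commit-interval   10000 ;; milliseconds
   :input-topic       "db.draft.created"
   :output-topic      "streaming.calais.response"})

;; The lookup used in start->streams: this key is not in the map, so it is nil.
(def reported-lookup (get-in kafka-config [:auto.commit.interval.ms]))

;; Hypothetical helper: like get, but throws a descriptive exception
;; instead of silently returning nil for a missing (or nil) value.
(defn require-key
  [m k]
  (let [v (get m k)]
    (if (nil? v)
      (throw (ex-info (str "Missing required config key " k)
                      {:key k :available (keys m)}))
      v)))

;; Hypothetical builder for the properties map. The string keys are
;; assumed stand-ins for StreamsConfig/APPLICATION_ID_CONFIG,
;; StreamsConfig/BOOTSTRAP_SERVERS_CONFIG and
;; StreamsConfig/COMMIT_INTERVAL_MS_CONFIG.
(defn stream-props
  [cfg]
  {"application.id"     (require-key cfg :applicationid)
   "bootstrap.servers"  (require-key cfg :bootstrap-servers)
   "commit.interval.ms" (require-key cfg :commit-interval)})
```

With the map from the report, `(stream-props kafka-config)` yields a map whose `"commit.interval.ms"` entry is 10000, whereas `(require-key kafka-config :auto.commit.interval.ms)` throws an `ExceptionInfo` naming the missing key. Separately, `(swap! streams kafka-streams)` passes the KafkaStreams instance where `swap!` expects a function; `reset!` is the usual way to store a value in an atom.
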
>
> Result:
> {code:java}
> [{:type java.lang.NullPointerException
> :message nil
> :at [org.apache.kafka.streams.processor.internals.StreamThread <init> StreamThread.java 719]}]
> :trace
> [[org.apache.kafka.streams.processor.internals.StreamThread <init> StreamThread.java 719]
> [org.apache.kafka.streams.processor.internals.StreamThread create StreamThread.java 671]{code}
>
> (full log of startup)
> {code:java}
> => (start-stream-processing)
> "[start-stream-processing] START: streams processing"
> INFO calais-response-processor.core: [start->streams] enter
> INFO calais-response-processor.config: loading config from resources/config.edn
> INFO calais-response-processor.core: [calais-processor-topology] Open Calais API Streaming Topology
> INFO org.apache.kafka.streams.StreamsConfig: StreamsConfig values:
> application.id = open-calais-tagging-microservice
> application.server =
> bootstrap.servers = [localhost:9092]
> buffered.records.per.partition = 1000
> cache.max.bytes.buffering = 10485760
> client.id =
> commit.interval.ms = null
> connections.max.idle.ms = 540000
> default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
> default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
> default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
> default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
> default.value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
> max.task.idle.ms = 0
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> num.standby.replicas = 0
> num.stream.threads = 1
> partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
> poll.ms = 100
> processing.guarantee = exactly_once
> receive.buffer.bytes = 32768
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> replication.factor = 1
> request.timeout.ms = 40000
> retries = 0
> retry.backoff.ms = 100
> rocksdb.config.setter = null
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> state.cleanup.delay.ms = 600000
> state.dir = /tmp/kafka-streams
> topology.optimization = none
> upgrade.from = null
> windowstore.changelog.additional.retention.ms = 86400000
> INFO org.apache.kafka.clients.admin.AdminClientConfig: AdminClientConfig values:
> bootstrap.servers = [localhost:9092]
> client.dns.lookup = default
> client.id = open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-admin
> connections.max.idle.ms = 300000
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 120000
> retries = 5
> retry.backoff.ms = 100
> sasl.client.callback.handler.class = null
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.login.callback.handler.class = null
> sasl.login.class = null
> sasl.login.refresh.buffer.seconds = 300
> sasl.login.refresh.min.period.seconds = 60
> sasl.login.refresh.window.factor = 0.8
> sasl.login.refresh.window.jitter = 0.05
> sasl.mechanism = GSSAPI
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = https
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> INFO org.apache.kafka.common.utils.AppInfoParser: Kafka version : 2.1.0
> INFO org.apache.kafka.common.utils.AppInfoParser: Kafka commitId : eec43959745f444f
> INFO org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-StreamThread-1] Creating restore consumer client
> INFO org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values:
> auto.commit.interval.ms = 5000
> auto.offset.reset = none
> bootstrap.servers = [localhost:9092]
> check.crcs = true
> client.dns.lookup = default
> client.id = open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-StreamThread-1-restore-consumer
> connections.max.idle.ms = 540000
> default.api.timeout.ms = 60000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id =
> heartbeat.interval.ms = 3000
> interceptor.classes = []
> internal.leave.group.on.close = false
> isolation.level = read_committed
> key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 2147483647
> max.poll.records = 1000
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 30000
> retry.backoff.ms = 100
> sasl.client.callback.handler.class = null
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.login.callback.handler.class = null
> sasl.login.class = null
> sasl.login.refresh.buffer.seconds = 300
> sasl.login.refresh.min.period.seconds = 60
> sasl.login.refresh.window.factor = 0.8
> sasl.login.refresh.window.jitter = 0.05
> sasl.mechanism = GSSAPI
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> session.timeout.ms = 10000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = https
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> INFO org.apache.kafka.common.utils.AppInfoParser: Kafka version : 2.1.0
> INFO org.apache.kafka.common.utils.AppInfoParser: Kafka commitId : eec43959745f444f
> INFO org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-StreamThread-1] Creating consumer client
> INFO org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values:
> auto.commit.interval.ms = 5000
> auto.offset.reset = earliest
> bootstrap.servers = [localhost:9092]
> check.crcs = true
> client.dns.lookup = default
> client.id = open-calais-tagging-microservice-95aca1d2-833c-4336-93d2-9c17dbc73e86-StreamThread-1-consumer
> connections.max.idle.ms = 540000
> default.api.timeout.ms = 60000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = open-calais-tagging-microservice
> heartbeat.interval.ms = 3000
> interceptor.classes = []
> internal.leave.group.on.close = false
> isolation.level = read_committed
> key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 2147483647
> max.poll.records = 1000
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 30000
> retry.backoff.ms = 100
> sasl.client.callback.handler.class = null
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.login.callback.handler.class = null
> sasl.login.class = null
> sasl.login.refresh.buffer.seconds = 300
> sasl.login.refresh.min.period.seconds = 60
> sasl.login.refresh.window.factor = 0.8
> sasl.login.refresh.window.jitter = 0.05
> sasl.mechanism = GSSAPI
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> session.timeout.ms = 10000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = https
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> WARN org.apache.kafka.clients.consumer.ConsumerConfig: The configuration 'admin.retries' was supplied but isn't a known config.
> INFO org.apache.kafka.common.utils.AppInfoParser: Kafka version : 2.1.0
> INFO org.apache.kafka.common.utils.AppInfoParser: Kafka commitId : eec43959745f444f
> ERROR calais-response-processor.core: #error {
> :cause nil
> :via
> [{:type java.lang.NullPointerException
> :message nil
> :at [org.apache.kafka.streams.processor.internals.StreamThread <init> StreamThread.java 719]}]
> :trace
> [[org.apache.kafka.streams.processor.internals.StreamThread <init> StreamThread.java 719]
> [org.apache.kafka.streams.processor.internals.StreamThread create StreamThread.java 671]
> [org.apache.kafka.streams.KafkaStreams <init> KafkaStreams.java 706]
> [org.apache.kafka.streams.KafkaStreams <init> KafkaStreams.java 624]
> [org.apache.kafka.streams.KafkaStreams <init> KafkaStreams.java 608]
> [org.apache.kafka.streams.KafkaStreams <init> KafkaStreams.java 598]
> [sun.reflect.NativeConstructorAccessorImpl newInstance0 nil -2]
> [sun.reflect.NativeConstructorAccessorImpl newInstance nil -1]
> [sun.reflect.DelegatingConstructorAccessorImpl newInstance nil -1]
> [java.lang.reflect.Constructor newInstance nil -1]
> [clojure.lang.Reflector invokeConstructor Reflector.java 180]
> [calais_response_processor.core$start__GT_streams$fn__258 invoke core.clj 148]
> [calais_response_processor.core$start__GT_streams invokeStatic core.clj 147]
> [calais_response_processor.core$start__GT_streams invoke core.clj 136]
> [calais_response_processor.core$start_stream_processing invokeStatic core.clj 165]
> [calais_response_processor.core$start_stream_processing invoke core.clj 162]
> [calais_response_processor.core$eval1539 invokeStatic form-init8971608817606635487.clj 1]
> [calais_response_processor.core$eval1539 invoke form-init8971608817606635487.clj 1]
> [clojure.lang.Compiler eval Compiler.java 7062]
> [clojure.lang.Compiler eval Compiler.java 7025]
> [clojure.core$eval invokeStatic core.clj 3206]
> [clojure.core$eval invoke core.clj 3202]
> [clojure.main$repl$read_eval_print__8572$fn__8575 invoke main.clj 243]
> [clojure.main$repl$read_eval_print__8572 invoke main.clj 243]
> [clojure.main$repl$fn__8581 invoke main.clj 261]
> [clojure.main$repl invokeStatic main.clj 261]
> [clojure.main$repl doInvoke main.clj 177]
> [clojure.lang.RestFn invoke RestFn.java 1523]
> [nrepl.middleware.interruptible_eval$evaluate$fn__912 invoke interruptible_eval.clj 83]
> [clojure.lang.AFn applyToHelper AFn.java 152]
> [clojure.lang.AFn applyTo AFn.java 144]
> [clojure.core$apply invokeStatic core.clj 657]
> [clojure.core$with_bindings_STAR_ invokeStatic core.clj 1965]
> [clojure.core$with_bindings_STAR_ doInvoke core.clj 1965]
> [clojure.lang.RestFn invoke RestFn.java 425]
> [nrepl.middleware.interruptible_eval$evaluate invokeStatic interruptible_eval.clj 81]
> [nrepl.middleware.interruptible_eval$evaluate invoke interruptible_eval.clj 50]
> [nrepl.middleware.interruptible_eval$interruptible_eval$fn__955$fn__958 invoke interruptible_eval.clj 221]
> [nrepl.middleware.interruptible_eval$run_next$fn__950 invoke interruptible_eval.clj 189]
> [clojure.lang.AFn run AFn.java 22]
> [java.util.concurrent.ThreadPoolExecutor runWorker nil -1]
> [java.util.concurrent.ThreadPoolExecutor$Worker run nil -1]
> [java.lang.Thread run nil -1]]}
> NullPointerException clojure.lang.Reflector.invokeNoArgInstanceMember (Reflector.java:301)
> {code}

--
This message was sent by Atlassian JIRA (v7.6.3#76005)