Hi All,

I am creating a Kafka Connect source connector for MongoDB. The connector
starts and connects to Kafka, but it is not committing any offsets.

This is the output after starting the connector:

[root@localhost kafka_2.11-0.10.1.1]# bin/connect-standalone.sh
config/connect-standalone.properties config/mongodb.properties
[2017-03-27 18:32:58,019] INFO StandaloneConfig values:
        rest.advertised.host.name = null
        task.shutdown.graceful.timeout.ms = 5000
        rest.host.name = null
        rest.advertised.port = null
        bootstrap.servers = [localhost:9092]
        offset.flush.timeout.ms = 5000
        offset.flush.interval.ms = 10000
        rest.port = 8083
        internal.key.converter = class
org.apache.kafka.connect.json.JsonConverter
        access.control.allow.methods =
        access.control.allow.origin =
        offset.storage.file.filename = /tmp/connect.offsets
        internal.value.converter = class
org.apache.kafka.connect.json.JsonConverter
        value.converter = class org.apache.kafka.connect.json.JsonConverter
        key.converter = class org.apache.kafka.connect.json.JsonConverter
 (org.apache.kafka.connect.runtime.standalone.StandaloneConfig:178)
[2017-03-27 18:32:58,162] INFO Logging initialized @609ms
(org.eclipse.jetty.util.log:186)
[2017-03-27 18:32:58,392] INFO Kafka Connect starting
(org.apache.kafka.connect.runtime.Connect:52)
[2017-03-27 18:32:58,392] INFO Herder starting
(org.apache.kafka.connect.runtime.standalone.StandaloneHerder:70)
[2017-03-27 18:32:58,393] INFO Worker starting
(org.apache.kafka.connect.runtime.Worker:113)
[2017-03-27 18:32:58,393] INFO Starting FileOffsetBackingStore with file
/tmp/connect.offsets
(org.apache.kafka.connect.storage.FileOffsetBackingStore:60)
[2017-03-27 18:32:58,398] INFO Worker started
(org.apache.kafka.connect.runtime.Worker:118)
[2017-03-27 18:32:58,398] INFO Herder started
(org.apache.kafka.connect.runtime.standalone.StandaloneHerder:72)
[2017-03-27 18:32:58,398] INFO Starting REST server
(org.apache.kafka.connect.runtime.rest.RestServer:98)
[2017-03-27 18:32:58,493] INFO jetty-9.2.15.v20160210
(org.eclipse.jetty.server.Server:327)
[2017-03-27 18:32:59,621] INFO HV000001: Hibernate Validator 5.1.2.Final
(org.hibernate.validator.internal.util.Version:27)
Mar 27, 2017 6:32:59 PM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The
(sub)resource method listConnectors in
org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains
empty path annotation.
WARNING: The (sub)resource method createConnector in
org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains
empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in
org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource
contains empty path annotation.
WARNING: The (sub)resource method serverInfo in
org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty
path annotation.

[2017-03-27 18:33:00,015] INFO Started
o.e.j.s.ServletContextHandler@44e3760b{/,null,AVAILABLE}
(org.eclipse.jetty.server.handler.ContextHandler:744)
[2017-03-27 18:33:00,042] INFO Started ServerConnector@7f58ad44{HTTP/1.1}{
0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:266)
[2017-03-27 18:33:00,043] INFO Started @2492ms
(org.eclipse.jetty.server.Server:379)
[2017-03-27 18:33:00,043] INFO REST server listening at
http://127.0.0.1:8083/, advertising URL http://127.0.0.1:8083/
(org.apache.kafka.connect.runtime.rest.RestServer:150)
[2017-03-27 18:33:00,043] INFO Kafka Connect started
(org.apache.kafka.connect.runtime.Connect:58)
[2017-03-27 18:33:00,048] INFO ConnectorConfig values:
        connector.class =
org.apache.kafka.connect.mongodb.MongodbSourceConnector
        tasks.max = 1
        name = mongodb
        value.converter = null
        key.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig:178)
[2017-03-27 18:33:00,048] INFO Creating connector mongodb of type
org.apache.kafka.connect.mongodb.MongodbSourceConnector
(org.apache.kafka.connect.runtime.Worker:159)
[2017-03-27 18:33:00,051] INFO Instantiated connector mongodb with version
0.10.0.1 of type class
org.apache.kafka.connect.mongodb.MongodbSourceConnector
(org.apache.kafka.connect.runtime.Worker:162)
[2017-03-27 18:33:00,053] INFO Finished creating connector mongodb
(org.apache.kafka.connect.runtime.Worker:173)
[2017-03-27 18:33:00,053] INFO SourceConnectorConfig values:
        connector.class =
org.apache.kafka.connect.mongodb.MongodbSourceConnector
        tasks.max = 1
        name = mongodb
        value.converter = null
        key.converter = null
 (org.apache.kafka.connect.runtime.SourceConnectorConfig:178)
[2017-03-27 18:33:00,056] INFO Creating task mongodb-0
(org.apache.kafka.connect.runtime.Worker:252)
[2017-03-27 18:33:00,056] INFO ConnectorConfig values:
        connector.class =
org.apache.kafka.connect.mongodb.MongodbSourceConnector
        tasks.max = 1
        name = mongodb
        value.converter = null
        key.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig:178)
[2017-03-27 18:33:00,057] INFO TaskConfig values:
        task.class = class
org.apache.kafka.connect.mongodb.MongodbSourceTask
 (org.apache.kafka.connect.runtime.TaskConfig:178)
[2017-03-27 18:33:00,057] INFO Instantiated task mongodb-0 with version
0.10.0.1 of type org.apache.kafka.connect.mongodb.MongodbSourceTask
(org.apache.kafka.connect.runtime.Worker:264)
[2017-03-27 18:33:00,066] INFO ProducerConfig values:
        metric.reporters = []
        metadata.max.age.ms = 300000
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        bootstrap.servers = [localhost:9092]
        ssl.keystore.type = JKS
        sasl.mechanism = GSSAPI
        max.block.ms = 9223372036854775807
        interceptor.classes = null
        ssl.truststore.password = null
        client.id =
        ssl.endpoint.identification.algorithm = null
        request.timeout.ms = 2147483647
        acks = all
        receive.buffer.bytes = 32768
        ssl.truststore.type = JKS
        retries = 2147483647
        ssl.truststore.location = null
        ssl.keystore.password = null
        send.buffer.bytes = 131072
        compression.type = none
        metadata.fetch.timeout.ms = 60000
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        buffer.memory = 33554432
        timeout.ms = 30000
        key.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.trustmanager.algorithm = PKIX
        block.on.buffer.full = false
        ssl.key.password = null
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        max.in.flight.requests.per.connection = 1
        metrics.num.samples = 2
        ssl.protocol = TLS
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        batch.size = 16384
        ssl.keystore.location = null
        ssl.cipher.suites = null
        security.protocol = PLAINTEXT
        max.request.size = 1048576
        value.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        partitioner.class = class
org.apache.kafka.clients.producer.internals.DefaultPartitioner
        linger.ms = 0
 (org.apache.kafka.clients.producer.ProducerConfig:178)
[2017-03-27 18:33:00,103] INFO ProducerConfig values:
        metric.reporters = []
        metadata.max.age.ms = 300000
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        bootstrap.servers = [localhost:9092]
        ssl.keystore.type = JKS
        sasl.mechanism = GSSAPI
        max.block.ms = 9223372036854775807
        interceptor.classes = null
        ssl.truststore.password = null
        client.id = producer-1
        ssl.endpoint.identification.algorithm = null
        request.timeout.ms = 2147483647
        acks = all
        receive.buffer.bytes = 32768
        ssl.truststore.type = JKS
        retries = 2147483647
        ssl.truststore.location = null
        ssl.keystore.password = null
        send.buffer.bytes = 131072
        compression.type = none
        metadata.fetch.timeout.ms = 60000
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        buffer.memory = 33554432
        timeout.ms = 30000
        key.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.trustmanager.algorithm = PKIX
        block.on.buffer.full = false
        ssl.key.password = null
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        max.in.flight.requests.per.connection = 1
        metrics.num.samples = 2
        ssl.protocol = TLS
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        batch.size = 16384
        ssl.keystore.location = null
        ssl.cipher.suites = null
        security.protocol = PLAINTEXT
        max.request.size = 1048576
        value.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        partitioner.class = class
org.apache.kafka.clients.producer.internals.DefaultPartitioner
        linger.ms = 0
 (org.apache.kafka.clients.producer.ProducerConfig:178)
[2017-03-27 18:33:00,104] INFO Kafka version : 0.10.0.1
(org.apache.kafka.common.utils.AppInfoParser:83)
[2017-03-27 18:33:00,104] INFO Kafka commitId : a7a17cdec9eaa6c5
(org.apache.kafka.common.utils.AppInfoParser:84)
[2017-03-27 18:33:00,121] INFO Created connector mongodb
(org.apache.kafka.connect.cli.ConnectStandalone:93)
[2017-03-27 18:33:00,319] INFO Cluster created with settings
{hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN,
serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
(org.mongodb.driver.cluster:71)
[2017-03-27 18:33:00,397] INFO Source task WorkerSourceTask{id=mongodb-0}
finished initialization and start
(org.apache.kafka.connect.runtime.WorkerSourceTask:138)
[2017-03-27 18:33:00,442] INFO No server chosen by
ReadPreferenceServerSelector{readPreference=primary} from cluster
description ClusterDescription{type=UNKNOWN, connectionMode=SINGLE,
all=[ServerDescription{address=localhost:27017, type=UNKNOWN,
state=CONNECTING}]}. Waiting for 30000 ms before timing out
(org.mongodb.driver.cluster:71)
[2017-03-27 18:33:00,455] INFO Opened connection
[connectionId{localValue:1, serverValue:4}] to localhost:27017
(org.mongodb.driver.connection:71)
[2017-03-27 18:33:00,457] INFO Monitor thread successfully connected to
server with description ServerDescription{address=localhost:27017,
type=STANDALONE, state=CONNECTED, ok=true,
version=ServerVersion{versionList=[3, 2, 12]}, minWireVersion=0,
maxWireVersion=4, maxDocumentSize=16777216, roundTripTimeNanos=536169}
(org.mongodb.driver.cluster:71)
[2017-03-27 18:33:00,491] INFO Opened connection
[connectionId{localValue:2, serverValue:5}] to localhost:27017
(org.mongodb.driver.connection:71)
[2017-03-27 18:33:02,162] WARN could not create Dir using directory from
url
file:/home/oracle/app/u01/app/oracle/product/12.2.0/dbhome_1/rdbms/jlib.
skipping. (org.reflections.Reflections:104)
java.lang.NullPointerException
        at org.reflections.vfs.Vfs$DefaultUrlTypes$3.matches(Vfs.java:239)
        at org.reflections.vfs.Vfs.fromURL(Vfs.java:98)
        at org.reflections.vfs.Vfs.fromURL(Vfs.java:91)
        at org.reflections.Reflections.scan(Reflections.java:237)
        at org.reflections.Reflections.scan(Reflections.java:204)
        at org.reflections.Reflections.<init>(Reflections.java:129)
        at
org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(AbstractHerder.java:275)
        at
org.apache.kafka.connect.runtime.AbstractHerder$1.run(AbstractHerder.java:384)
        at java.lang.Thread.run(Thread.java:745)
[2017-03-27 18:33:02,170] WARN could not create Vfs.Dir from url. ignoring
the exception and continuing (org.reflections.Reflections:208)
org.reflections.ReflectionsException: could not create Vfs.Dir from url, no
matching UrlType was found
[file:/home/oracle/app/u01/app/oracle/product/12.2.0/dbhome_1/rdbms/jlib]
either use fromURL(final URL url, final List<UrlType> urlTypes) or use the
static setDefaultURLTypes(final List<UrlType> urlTypes) or
addDefaultURLTypes(UrlType urlType) with your specialized UrlType.
        at org.reflections.vfs.Vfs.fromURL(Vfs.java:109)
        at org.reflections.vfs.Vfs.fromURL(Vfs.java:91)
        at org.reflections.Reflections.scan(Reflections.java:237)
        at org.reflections.Reflections.scan(Reflections.java:204)
        at org.reflections.Reflections.<init>(Reflections.java:129)
        at
org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(AbstractHerder.java:275)
        at
org.apache.kafka.connect.runtime.AbstractHerder$1.run(AbstractHerder.java:384)
        at java.lang.Thread.run(Thread.java:745)
[2017-03-27 18:33:02,471] WARN could not create Dir using directory from
url file:/home/oracle/app/u01/app/oracle/product/12.2.0/dbhome_1/jlib.
skipping. (org.reflections.Reflections:104)
java.lang.NullPointerException
        at org.reflections.vfs.Vfs$DefaultUrlTypes$3.matches(Vfs.java:239)
        at org.reflections.vfs.Vfs.fromURL(Vfs.java:98)
        at org.reflections.vfs.Vfs.fromURL(Vfs.java:91)
        at org.reflections.Reflections.scan(Reflections.java:237)
        at org.reflections.Reflections.scan(Reflections.java:204)
        at org.reflections.Reflections.<init>(Reflections.java:129)
        at
org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(AbstractHerder.java:275)
        at
org.apache.kafka.connect.runtime.AbstractHerder$1.run(AbstractHerder.java:384)
        at java.lang.Thread.run(Thread.java:745)
[2017-03-27 18:33:02,473] WARN could not create Vfs.Dir from url. ignoring
the exception and continuing (org.reflections.Reflections:208)
org.reflections.ReflectionsException: could not create Vfs.Dir from url, no
matching UrlType was found
[file:/home/oracle/app/u01/app/oracle/product/12.2.0/dbhome_1/jlib]
either use fromURL(final URL url, final List<UrlType> urlTypes) or use the
static setDefaultURLTypes(final List<UrlType> urlTypes) or
addDefaultURLTypes(UrlType urlType) with your specialized UrlType.
        at org.reflections.vfs.Vfs.fromURL(Vfs.java:109)
        at org.reflections.vfs.Vfs.fromURL(Vfs.java:91)
        at org.reflections.Reflections.scan(Reflections.java:237)
        at org.reflections.Reflections.scan(Reflections.java:204)
        at org.reflections.Reflections.<init>(Reflections.java:129)
        at
org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(AbstractHerder.java:275)
        at
org.apache.kafka.connect.runtime.AbstractHerder$1.run(AbstractHerder.java:384)
        at java.lang.Thread.run(Thread.java:745)
[2017-03-27 18:33:04,338] INFO Reflections took 5826 ms to scan 187 urls,
producing 6162 keys and 38674 values  (org.reflections.Reflections:229)



After that it does not do anything further.
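
To double-check that nothing is being committed, these are the checks I can
run against the Connect REST API and the offset storage file configured above
(standard Connect endpoints, so hopefully I am not missing anything here):

curl http://localhost:8083/connectors
curl http://localhost:8083/connectors/mongodb/status
ls -l /tmp/connect.offsets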

This is my connector configuration file:

name=mongodb
connector.class=org.apache.kafka.connect.mongodb.MongodbSourceConnector
tasks.max=1
host=localhost
port=27017
batch.size=100
schema.name=mongodbschema
topic.prefix=mongo-prefix
databases=sampledb.hero
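
As far as I understand, the connector should write records to a topic whose
name is derived from topic.prefix and the database/collection (I am assuming
something like mongo-prefix_sampledb_hero here; the exact naming convention of
this connector may differ). These are the commands I can use to see whether
anything reaches Kafka and whether the source collection actually has
documents:

bin/kafka-topics.sh --zookeeper localhost:2181 --list
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic mongo-prefix_sampledb_hero --from-beginning
mongo localhost:27017/sampledb --eval "db.hero.count()"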



Please suggest what might be causing this; I would appreciate a quick response.

Thanks
