Hi all, I am creating a Kafka connector with MongoDB as a source. The connector starts and connects to Kafka, but it is not committing any offsets.
This is the output after starting the connector:

[root@localhost kafka_2.11-0.10.1.1]# bin/connect-standalone.sh config/connect-standalone.properties config/mongodb.properties
[2017-03-27 18:32:58,019] INFO StandaloneConfig values:
    rest.advertised.host.name = null
    task.shutdown.graceful.timeout.ms = 5000
    rest.host.name = null
    rest.advertised.port = null
    bootstrap.servers = [localhost:9092]
    offset.flush.timeout.ms = 5000
    offset.flush.interval.ms = 10000
    rest.port = 8083
    internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
    access.control.allow.methods =
    access.control.allow.origin =
    offset.storage.file.filename = /tmp/connect.offsets
    internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
    value.converter = class org.apache.kafka.connect.json.JsonConverter
    key.converter = class org.apache.kafka.connect.json.JsonConverter
    (org.apache.kafka.connect.runtime.standalone.StandaloneConfig:178)
[2017-03-27 18:32:58,162] INFO Logging initialized @609ms (org.eclipse.jetty.util.log:186)
[2017-03-27 18:32:58,392] INFO Kafka Connect starting (org.apache.kafka.connect.runtime.Connect:52)
[2017-03-27 18:32:58,392] INFO Herder starting (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:70)
[2017-03-27 18:32:58,393] INFO Worker starting (org.apache.kafka.connect.runtime.Worker:113)
[2017-03-27 18:32:58,393] INFO Starting FileOffsetBackingStore with file /tmp/connect.offsets (org.apache.kafka.connect.storage.FileOffsetBackingStore:60)
[2017-03-27 18:32:58,398] INFO Worker started (org.apache.kafka.connect.runtime.Worker:118)
[2017-03-27 18:32:58,398] INFO Herder started (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:72)
[2017-03-27 18:32:58,398] INFO Starting REST server (org.apache.kafka.connect.runtime.rest.RestServer:98)
[2017-03-27 18:32:58,493] INFO jetty-9.2.15.v20160210 (org.eclipse.jetty.server.Server:327)
[2017-03-27 18:32:59,621] INFO HV000001: Hibernate Validator 5.1.2.Final (org.hibernate.validator.internal.util.Version:27)
Mar 27, 2017 6:32:59 PM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected:
WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.
[2017-03-27 18:33:00,015] INFO Started o.e.j.s.ServletContextHandler@44e3760b{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
[2017-03-27 18:33:00,042] INFO Started ServerConnector@7f58ad44{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:266)
[2017-03-27 18:33:00,043] INFO Started @2492ms (org.eclipse.jetty.server.Server:379)
[2017-03-27 18:33:00,043] INFO REST server listening at http://127.0.0.1:8083/, advertising URL http://127.0.0.1:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:150)
[2017-03-27 18:33:00,043] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:58)
[2017-03-27 18:33:00,048] INFO ConnectorConfig values:
    connector.class = org.apache.kafka.connect.mongodb.MongodbSourceConnector
    tasks.max = 1
    name = mongodb
    value.converter = null
    key.converter = null
    (org.apache.kafka.connect.runtime.ConnectorConfig:178)
[2017-03-27 18:33:00,048] INFO Creating connector mongodb of type org.apache.kafka.connect.mongodb.MongodbSourceConnector (org.apache.kafka.connect.runtime.Worker:159)
[2017-03-27 18:33:00,051] INFO Instantiated connector mongodb with version 0.10.0.1 of type class org.apache.kafka.connect.mongodb.MongodbSourceConnector (org.apache.kafka.connect.runtime.Worker:162)
[2017-03-27 18:33:00,053] INFO Finished creating connector mongodb (org.apache.kafka.connect.runtime.Worker:173)
[2017-03-27 18:33:00,053] INFO SourceConnectorConfig values:
    connector.class = org.apache.kafka.connect.mongodb.MongodbSourceConnector
    tasks.max = 1
    name = mongodb
    value.converter = null
    key.converter = null
    (org.apache.kafka.connect.runtime.SourceConnectorConfig:178)
[2017-03-27 18:33:00,056] INFO Creating task mongodb-0 (org.apache.kafka.connect.runtime.Worker:252)
[2017-03-27 18:33:00,056] INFO ConnectorConfig values:
    connector.class = org.apache.kafka.connect.mongodb.MongodbSourceConnector
    tasks.max = 1
    name = mongodb
    value.converter = null
    key.converter = null
    (org.apache.kafka.connect.runtime.ConnectorConfig:178)
[2017-03-27 18:33:00,057] INFO TaskConfig values:
    task.class = class org.apache.kafka.connect.mongodb.MongodbSourceTask
    (org.apache.kafka.connect.runtime.TaskConfig:178)
[2017-03-27 18:33:00,057] INFO Instantiated task mongodb-0 with version 0.10.0.1 of type org.apache.kafka.connect.mongodb.MongodbSourceTask (org.apache.kafka.connect.runtime.Worker:264)
[2017-03-27 18:33:00,066] INFO ProducerConfig values:
    metric.reporters = []
    metadata.max.age.ms = 300000
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    bootstrap.servers = [localhost:9092]
    ssl.keystore.type = JKS
    sasl.mechanism = GSSAPI
    max.block.ms = 9223372036854775807
    interceptor.classes = null
    ssl.truststore.password = null
    client.id =
    ssl.endpoint.identification.algorithm = null
    request.timeout.ms = 2147483647
    acks = all
    receive.buffer.bytes = 32768
    ssl.truststore.type = JKS
    retries = 2147483647
    ssl.truststore.location = null
    ssl.keystore.password = null
    send.buffer.bytes = 131072
    compression.type = none
    metadata.fetch.timeout.ms = 60000
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    buffer.memory = 33554432
    timeout.ms = 30000
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.trustmanager.algorithm = PKIX
    block.on.buffer.full = false
    ssl.key.password = null
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    max.in.flight.requests.per.connection = 1
    metrics.num.samples = 2
    ssl.protocol = TLS
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    batch.size = 16384
    ssl.keystore.location = null
    ssl.cipher.suites = null
    security.protocol = PLAINTEXT
    max.request.size = 1048576
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    linger.ms = 0
    (org.apache.kafka.clients.producer.ProducerConfig:178)
[2017-03-27 18:33:00,103] INFO ProducerConfig values:
    metric.reporters = []
    metadata.max.age.ms = 300000
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    bootstrap.servers = [localhost:9092]
    ssl.keystore.type = JKS
    sasl.mechanism = GSSAPI
    max.block.ms = 9223372036854775807
    interceptor.classes = null
    ssl.truststore.password = null
    client.id = producer-1
    ssl.endpoint.identification.algorithm = null
    request.timeout.ms = 2147483647
    acks = all
    receive.buffer.bytes = 32768
    ssl.truststore.type = JKS
    retries = 2147483647
    ssl.truststore.location = null
    ssl.keystore.password = null
    send.buffer.bytes = 131072
    compression.type = none
    metadata.fetch.timeout.ms = 60000
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    buffer.memory = 33554432
    timeout.ms = 30000
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.trustmanager.algorithm = PKIX
    block.on.buffer.full = false
    ssl.key.password = null
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    max.in.flight.requests.per.connection = 1
    metrics.num.samples = 2
    ssl.protocol = TLS
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    batch.size = 16384
    ssl.keystore.location = null
    ssl.cipher.suites = null
    security.protocol = PLAINTEXT
    max.request.size = 1048576
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    linger.ms = 0
    (org.apache.kafka.clients.producer.ProducerConfig:178)
[2017-03-27 18:33:00,104] INFO Kafka version : 0.10.0.1 (org.apache.kafka.common.utils.AppInfoParser:83)
[2017-03-27 18:33:00,104] INFO Kafka commitId : a7a17cdec9eaa6c5 (org.apache.kafka.common.utils.AppInfoParser:84)
[2017-03-27 18:33:00,121] INFO Created connector mongodb (org.apache.kafka.connect.cli.ConnectStandalone:93)
[2017-03-27 18:33:00,319] INFO Cluster created with settings {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500} (org.mongodb.driver.cluster:71)
[2017-03-27 18:33:00,397] INFO Source task WorkerSourceTask{id=mongodb-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:138)
[2017-03-27 18:33:00,442] INFO No server chosen by ReadPreferenceServerSelector{readPreference=primary} from cluster description ClusterDescription{type=UNKNOWN, connectionMode=SINGLE, all=[ServerDescription{address=localhost:27017, type=UNKNOWN, state=CONNECTING}]}. Waiting for 30000 ms before timing out (org.mongodb.driver.cluster:71)
[2017-03-27 18:33:00,455] INFO Opened connection [connectionId{localValue:1, serverValue:4}] to localhost:27017 (org.mongodb.driver.connection:71)
[2017-03-27 18:33:00,457] INFO Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 2, 12]}, minWireVersion=0, maxWireVersion=4, maxDocumentSize=16777216, roundTripTimeNanos=536169} (org.mongodb.driver.cluster:71)
[2017-03-27 18:33:00,491] INFO Opened connection [connectionId{localValue:2, serverValue:5}] to localhost:27017 (org.mongodb.driver.connection:71)
[2017-03-27 18:33:02,162] WARN could not create Dir using directory from url file:/home/oracle/app/u01/app/oracle/product/12.2.0/dbhome_1/rdbms/jlib. skipping. (org.reflections.Reflections:104)
java.lang.NullPointerException
    at org.reflections.vfs.Vfs$DefaultUrlTypes$3.matches(Vfs.java:239)
    at org.reflections.vfs.Vfs.fromURL(Vfs.java:98)
    at org.reflections.vfs.Vfs.fromURL(Vfs.java:91)
    at org.reflections.Reflections.scan(Reflections.java:237)
    at org.reflections.Reflections.scan(Reflections.java:204)
    at org.reflections.Reflections.<init>(Reflections.java:129)
    at org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(AbstractHerder.java:275)
    at org.apache.kafka.connect.runtime.AbstractHerder$1.run(AbstractHerder.java:384)
    at java.lang.Thread.run(Thread.java:745)
[2017-03-27 18:33:02,170] WARN could not create Vfs.Dir from url. ignoring the exception and continuing (org.reflections.Reflections:208)
org.reflections.ReflectionsException: could not create Vfs.Dir from url, no matching UrlType was found [file:/home/oracle/app/u01/app/oracle/product/12.2.0/dbhome_1/rdbms/jlib] either use fromURL(final URL url, final List<UrlType> urlTypes) or use the static setDefaultURLTypes(final List<UrlType> urlTypes) or addDefaultURLTypes(UrlType urlType) with your specialized UrlType.
    at org.reflections.vfs.Vfs.fromURL(Vfs.java:109)
    at org.reflections.vfs.Vfs.fromURL(Vfs.java:91)
    at org.reflections.Reflections.scan(Reflections.java:237)
    at org.reflections.Reflections.scan(Reflections.java:204)
    at org.reflections.Reflections.<init>(Reflections.java:129)
    at org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(AbstractHerder.java:275)
    at org.apache.kafka.connect.runtime.AbstractHerder$1.run(AbstractHerder.java:384)
    at java.lang.Thread.run(Thread.java:745)
[2017-03-27 18:33:02,471] WARN could not create Dir using directory from url file:/home/oracle/app/u01/app/oracle/product/12.2.0/dbhome_1/jlib. skipping. (org.reflections.Reflections:104)
java.lang.NullPointerException
    at org.reflections.vfs.Vfs$DefaultUrlTypes$3.matches(Vfs.java:239)
    at org.reflections.vfs.Vfs.fromURL(Vfs.java:98)
    at org.reflections.vfs.Vfs.fromURL(Vfs.java:91)
    at org.reflections.Reflections.scan(Reflections.java:237)
    at org.reflections.Reflections.scan(Reflections.java:204)
    at org.reflections.Reflections.<init>(Reflections.java:129)
    at org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(AbstractHerder.java:275)
    at org.apache.kafka.connect.runtime.AbstractHerder$1.run(AbstractHerder.java:384)
    at java.lang.Thread.run(Thread.java:745)
[2017-03-27 18:33:02,473] WARN could not create Vfs.Dir from url. ignoring the exception and continuing (org.reflections.Reflections:208)
org.reflections.ReflectionsException: could not create Vfs.Dir from url, no matching UrlType was found [file:/home/oracle/app/u01/app/oracle/product/12.2.0/dbhome_1/jlib] either use fromURL(final URL url, final List<UrlType> urlTypes) or use the static setDefaultURLTypes(final List<UrlType> urlTypes) or addDefaultURLTypes(UrlType urlType) with your specialized UrlType.
    at org.reflections.vfs.Vfs.fromURL(Vfs.java:109)
    at org.reflections.vfs.Vfs.fromURL(Vfs.java:91)
    at org.reflections.Reflections.scan(Reflections.java:237)
    at org.reflections.Reflections.scan(Reflections.java:204)
    at org.reflections.Reflections.<init>(Reflections.java:129)
    at org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(AbstractHerder.java:275)
    at org.apache.kafka.connect.runtime.AbstractHerder$1.run(AbstractHerder.java:384)
    at java.lang.Thread.run(Thread.java:745)
[2017-03-27 18:33:04,338] INFO Reflections took 5826 ms to scan 187 urls, producing 6162 keys and 38674 values (org.reflections.Reflections:229)

After that, it does not do anything.

This is my config file:

name=mongodb
connector.class=org.apache.kafka.connect.mongodb.MongodbSourceConnector
tasks.max=1
host=localhost
port=27017
batch.size=100
schema.name=mongodbschema
topic.prefix=mongo-prefix
databases=sampledb.hero

Please let me know what the error might be as soon as possible. Thanks.
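
To help narrow this down, the task state could be checked over the Connect REST interface, which the log shows listening on port 8083. Below is a minimal Python sketch, assuming the worker exposes the standard `GET /connectors/<name>/status` endpoint. The helper names (`status_url`, `task_states`, `fetch_status`) and the sample payload are illustrative only and are not taken from a real run.

```python
import json
from urllib.request import urlopen


def status_url(connector, host="127.0.0.1", port=8083):
    """Build the status URL for a connector (host and port as shown in the log)."""
    return f"http://{host}:{port}/connectors/{connector}/status"


def task_states(status):
    """Return {task_id: state} from a parsed status response."""
    return {task["id"]: task["state"] for task in status.get("tasks", [])}


def fetch_status(connector):
    """Query a running Connect worker; only works while the worker is up."""
    with urlopen(status_url(connector)) as resp:
        return json.load(resp)


# Hypothetical payload in the shape the status endpoint is expected to return.
sample = {
    "name": "mongodb",
    "connector": {"state": "RUNNING", "worker_id": "127.0.0.1:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "127.0.0.1:8083"}],
}

print(status_url("mongodb"))
print(task_states(sample))
```

With the worker running, `task_states(fetch_status("mongodb"))` would show whether task 0 is RUNNING or FAILED. A task reported as RUNNING with no offsets committed would suggest that the task is simply not returning any records from `poll()`.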