Hi all,

I am trying to run a Kafka Connect cluster to ingest data from a relational
database with jdbc connector.

I have been investigating many other solutions including Spark, Flink and
Flume before using Kafka Connect, but none of them can be used to ingest
relational databases in a clusterable way. With "cluster" I mean ingesting
one database with several distributed processes in parallel, instead of
each process in the cluster ingesting different databases. Kafka Connect is
the option I am investigating currently. After reading the documentation, I
have not found any clear statement about if my use case can be supported,
so I have to make a test to figure it out.

I created a cluster with the following docker container configuration:

---
version: '2'
services:
 zookeeper:
   image: confluentinc/cp-zookeeper
   hostname: zookeeper
   ports:
     - "2181"
   environment:
     ZOOKEEPER_CLIENT_PORT: 2181
     ZOOKEEPER_TICK_TIME: 2000

  broker1:
   image: confluentinc/cp-kafka
   hostname: broker1
   depends_on:
     - zookeeper
   ports:
     - '9092'
   environment:
     KAFKA_BROKER_ID: 1
     KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
     KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker1:9092'

  broker2:
   image: confluentinc/cp-kafka
   hostname: broker2
   depends_on:
     - zookeeper
   ports:
     - '9092'
   environment:
     KAFKA_BROKER_ID: 2
     KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
     KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker2:9092'

  broker3:
   image: confluentinc/cp-kafka
   hostname: broker3
   depends_on:
     - zookeeper
   ports:
     - '9092'
   environment:
     KAFKA_BROKER_ID: 3
     KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
     KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker3:9092'

  schema_registry:
   image: confluentinc/cp-schema-registry
   hostname: schema_registry
   depends_on:
     - zookeeper
     - broker1
     - broker2
     - broker3
   ports:
     - '8081'
   environment:
     SCHEMA_REGISTRY_HOST_NAME: schema_registry
     SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

  connect1:
   image: confluentinc/cp-kafka-connect
   hostname: connect1
   depends_on:
     - zookeeper
     - broker1
     - broker2
     - broker3
     - schema_registry
   ports:
     - "8083"
   environment:
     CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
     CONNECT_REST_ADVERTISED_HOST_NAME: connect1
     CONNECT_REST_PORT: 8083
     CONNECT_GROUP_ID: compose-connect-group
     CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
     CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
     CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
     CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
     CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081
'
     CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
     CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: '
http://schema_registry:8081'
     CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.
JsonConverter
     CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.
JsonConverter
     CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'

  connect2:
   image: confluentinc/cp-kafka-connect
   hostname: connect2
   depends_on:
     - zookeeper
     - broker1
     - broker2
     - broker3
     - schema_registry
   ports:
     - "8083"
   environment:
     CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
     CONNECT_REST_ADVERTISED_HOST_NAME: connect2
     CONNECT_REST_PORT: 8083
     CONNECT_GROUP_ID: compose-connect-group
     CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
     CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
     CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
     CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
     CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081
'
     CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
     CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: '
http://schema_registry:8081'
     CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.
JsonConverter
     CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.
JsonConverter
     CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'

  connect3:
   image: confluentinc/cp-kafka-connect
   hostname: connect3
   depends_on:
     - zookeeper
     - broker1
     - broker2
     - broker3
     - schema_registry
   ports:
     - "8083"
   environment:
     CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
     CONNECT_REST_ADVERTISED_HOST_NAME: connect3
     CONNECT_REST_PORT: 8083
     CONNECT_GROUP_ID: compose-connect-group
     CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
     CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
     CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
     CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
     CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081
'
     CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
     CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: '
http://schema_registry:8081'
     CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.
JsonConverter
     CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.
JsonConverter
     CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'

  control-center:
   image: confluentinc/cp-enterprise-control-center
   depends_on:
     - zookeeper
     - broker1
     - broker2
     - broker3
     - schema_registry
     - connect1
     - connect2
     - connect3
   ports:
     - "9021:9021"
   environment:
     CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,bro
ker3:9092'
     CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
     CONTROL_CENTER_CONNECT_CLUSTER: 'connect1:8083,connect2:8083,c
onnect3:8083'
     CONTROL_CENTER_REPLICATION_FACTOR: 1
     CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
     CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
     PORT: 9021

  postgres:
   image: postgres
   hostname: postgres
   ports:
     - "5432"
   environment:
     POSTGRES_PASSWORD: postgres

The Kafka cluster is running properly, but I don't know how to verify if
the Kafka Connect cluster is running properly. I prepared some test data in
the database, and created a source connector with the following
configuration:

{
 "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
 "name": "test",
 "tasks.max": 3,
 "key.converter": "org.apache.kafka.connect.json.JsonConverter",
 "value.converter": "org.apache.kafka.connect.json.JsonConverter",
 "connection.url": "jdbc:postgresql://postgres:5432/postgres?user=postgres&
password=postgres",
 "table.whitelist": "pgbench_accounts",
 "batch.max.rows": 1,
 "topic.prefix": "test",
 "mode": "incrementing",
 "incrementing.column.name": "aid"
}

The ingestion process is correct and I can consume the produced messages.
But I still have no way to figure out if the ingestion is parallelized. I
called the status API and received the following result:

{
   "name":"test",
   "connector":{
      "state":"RUNNING",
      "worker_id":"connect2:8083"
   },
   "tasks":[
      {
         "state":"RUNNING",
         "id":0,
         "worker_id":"connect3:8083"
      }
   ]
}

This result is the same for all instances. Does it mean the ingestion tasks
are not parallelized? Is there anything important I am missing or this type
of clustering is simply not supported?

Any comments and suggestions are highly appreciated. Have a nice day!

Best regards,
Yang

Reply via email to