Hi all, I am trying to run a Kafka Connect cluster to ingest data from a relational database with the JDBC source connector.
Before settling on Kafka Connect I investigated several other solutions, including Spark, Flink, and Flume, but none of them can ingest a relational database in a clustered way. By "cluster" I mean ingesting one database with several distributed processes working in parallel, rather than each process in the cluster ingesting a different database. Kafka Connect is the option I am currently investigating. After reading the documentation I could not find a clear statement on whether my use case is supported, so I set up a test to find out. I created a cluster with the following Docker Compose configuration:

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    ports:
      - "2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker1:
    image: confluentinc/cp-kafka
    hostname: broker1
    depends_on:
      - zookeeper
    ports:
      - '9092'
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker1:9092'

  broker2:
    image: confluentinc/cp-kafka
    hostname: broker2
    depends_on:
      - zookeeper
    ports:
      - '9092'
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker2:9092'

  broker3:
    image: confluentinc/cp-kafka
    hostname: broker3
    depends_on:
      - zookeeper
    ports:
      - '9092'
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker3:9092'

  schema_registry:
    image: confluentinc/cp-schema-registry
    hostname: schema_registry
    depends_on:
      - zookeeper
      - broker1
      - broker2
      - broker3
    ports:
      - '8081'
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema_registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

  connect1:
    image: confluentinc/cp-kafka-connect
    hostname: connect1
    depends_on:
      - zookeeper
      - broker1
      - broker2
      - broker3
      - schema_registry
    ports:
      - "8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect1
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'

  connect2:
    image: confluentinc/cp-kafka-connect
    hostname: connect2
    depends_on:
      - zookeeper
      - broker1
      - broker2
      - broker3
      - schema_registry
    ports:
      - "8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect2
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'

  connect3:
    image: confluentinc/cp-kafka-connect
    hostname: connect3
    depends_on:
      - zookeeper
      - broker1
      - broker2
      - broker3
      - schema_registry
    ports:
      - "8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect3
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'

  control-center:
    image: confluentinc/cp-enterprise-control-center
    depends_on:
      - zookeeper
      - broker1
      - broker2
      - broker3
      - schema_registry
      - connect1
      - connect2
      - connect3
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONTROL_CENTER_CONNECT_CLUSTER: 'connect1:8083,connect2:8083,connect3:8083'
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      PORT: 9021

  postgres:
    image: postgres
    hostname: postgres
    ports:
      - "5432"
    environment:
      POSTGRES_PASSWORD: postgres

The Kafka cluster itself is running properly, but I don't know how to verify that the Kafka Connect cluster is running properly.

I prepared some test data in the database and created a source connector with the following configuration:

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "name": "test",
  "tasks.max": 3,
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "connection.url": "jdbc:postgresql://postgres:5432/postgres?user=postgres&password=postgres",
  "table.whitelist": "pgbench_accounts",
  "batch.max.rows": 1,
  "topic.prefix": "test",
  "mode": "incrementing",
  "incrementing.column.name": "aid"
}
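The closest thing to a health check I know of is each worker's REST interface. A minimal sketch of what that looks like (assumption: the REST ports are published to the host, e.g. "8083:8083", or the commands run from a container on the same Compose network, since the file above only exposes them internally):

# Each worker should answer on port 8083 with its version info,
# which at least confirms that the individual worker processes are up.
curl http://connect1:8083/
curl http://connect2:8083/
curl http://connect3:8083/

# Every worker in the same group should report the same connector list,
# since the group shares its state through the docker-connect-* topics.
curl http://connect1:8083/connectors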
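For completeness, this is roughly how I registered the connector and queried its status (assuming the JSON above is saved as test-connector.json; any of the three workers can take the request, since the configuration is persisted in the shared docker-connect-configs topic):

# Create or update the connector named "test" from the flat config map above.
curl -X PUT -H "Content-Type: application/json" \
     --data @test-connector.json \
     http://connect1:8083/connectors/test/config

# Ask for the connector's status, including which worker runs each task.
curl http://connect1:8083/connectors/test/status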
The ingestion itself works and I can consume the produced messages, but I still have no way to tell whether the ingestion is parallelized. The status call returns the following result:

{
  "name": "test",
  "connector": {
    "state": "RUNNING",
    "worker_id": "connect2:8083"
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": "connect3:8083"
    }
  ]
}

The result is the same regardless of which instance I query. Does this mean the ingestion tasks are not parallelized? Is there something important I am missing, or is this type of clustering simply not supported?

Any comments and suggestions are highly appreciated. Have a nice day!

Best regards,
Yang