The unit of parallelism in Connect is a task. Your status output lists only one task, so only one process is copying data. The JDBC connector can consume data from within a single *database* in parallel, but each *table* must be handled by a single task. Since your table whitelist includes only a single table, the connector generates only a single task, regardless of tasks.max. If you add more tables to the whitelist, you'll see more tasks in the status API output.
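For illustration, a sketch of a config that would produce multiple tasks. The extra pgbench tables are assumed to exist (as they do in a standard pgbench schema), and "bulk" mode is used here because "incrementing" mode would require the aid column to exist in every whitelisted table:

{
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "name": "test",
    "tasks.max": 3,
    "connection.url": "jdbc:postgresql://postgres:5432/postgres?user=postgres&password=postgres",
    "table.whitelist": "pgbench_accounts,pgbench_branches,pgbench_tellers",
    "topic.prefix": "test",
    "mode": "bulk"
}

With three whitelisted tables and tasks.max=3, the connector can generate up to min(tasks.max, number of tables) = 3 tasks, and the group rebalance spreads them across your three workers.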
-Ewen

On Tue, Jan 3, 2017 at 4:03 AM, Yuanzhe Yang <yyz1...@gmail.com> wrote:

> Hi all,
>
> I am trying to run a Kafka Connect cluster to ingest data from a
> relational database with the JDBC connector.
>
> I investigated several other solutions, including Spark, Flink, and
> Flume, before turning to Kafka Connect, but none of them can ingest a
> relational database in a clustered way. By "cluster" I mean ingesting
> one database with several distributed processes in parallel, rather
> than each process in the cluster ingesting a different database. Kafka
> Connect is the option I am currently investigating. After reading the
> documentation, I have not found any clear statement about whether my
> use case is supported, so I am running a test to figure it out.
>
> I created a cluster with the following Docker Compose configuration:
>
> ---
> version: '2'
> services:
>   zookeeper:
>     image: confluentinc/cp-zookeeper
>     hostname: zookeeper
>     ports:
>       - "2181"
>     environment:
>       ZOOKEEPER_CLIENT_PORT: 2181
>       ZOOKEEPER_TICK_TIME: 2000
>
>   broker1:
>     image: confluentinc/cp-kafka
>     hostname: broker1
>     depends_on:
>       - zookeeper
>     ports:
>       - '9092'
>     environment:
>       KAFKA_BROKER_ID: 1
>       KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
>       KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker1:9092'
>
>   broker2:
>     image: confluentinc/cp-kafka
>     hostname: broker2
>     depends_on:
>       - zookeeper
>     ports:
>       - '9092'
>     environment:
>       KAFKA_BROKER_ID: 2
>       KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
>       KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker2:9092'
>
>   broker3:
>     image: confluentinc/cp-kafka
>     hostname: broker3
>     depends_on:
>       - zookeeper
>     ports:
>       - '9092'
>     environment:
>       KAFKA_BROKER_ID: 3
>       KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
>       KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker3:9092'
>
>   schema_registry:
>     image: confluentinc/cp-schema-registry
>     hostname: schema_registry
>     depends_on:
>       - zookeeper
>       - broker1
>       - broker2
>       - broker3
>     ports:
>       - '8081'
>     environment:
>       SCHEMA_REGISTRY_HOST_NAME: schema_registry
>       SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
>
>   connect1:
>     image: confluentinc/cp-kafka-connect
>     hostname: connect1
>     depends_on:
>       - zookeeper
>       - broker1
>       - broker2
>       - broker3
>       - schema_registry
>     ports:
>       - "8083"
>     environment:
>       CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
>       CONNECT_REST_ADVERTISED_HOST_NAME: connect1
>       CONNECT_REST_PORT: 8083
>       CONNECT_GROUP_ID: compose-connect-group
>       CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
>       CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
>       CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
>       CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
>       CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
>       CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
>       CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
>       CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
>       CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
>       CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
>
>   connect2:
>     image: confluentinc/cp-kafka-connect
>     hostname: connect2
>     depends_on:
>       - zookeeper
>       - broker1
>       - broker2
>       - broker3
>       - schema_registry
>     ports:
>       - "8083"
>     environment:
>       CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
>       CONNECT_REST_ADVERTISED_HOST_NAME: connect2
>       CONNECT_REST_PORT: 8083
>       CONNECT_GROUP_ID: compose-connect-group
>       CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
>       CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
>       CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
>       CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
>       CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
>       CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
>       CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
>       CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
>       CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
>       CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
>
>   connect3:
>     image: confluentinc/cp-kafka-connect
>     hostname: connect3
>     depends_on:
>       - zookeeper
>       - broker1
>       - broker2
>       - broker3
>       - schema_registry
>     ports:
>       - "8083"
>     environment:
>       CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
>       CONNECT_REST_ADVERTISED_HOST_NAME: connect3
>       CONNECT_REST_PORT: 8083
>       CONNECT_GROUP_ID: compose-connect-group
>       CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
>       CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
>       CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
>       CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
>       CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
>       CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
>       CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
>       CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
>       CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
>       CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
>
>   control-center:
>     image: confluentinc/cp-enterprise-control-center
>     depends_on:
>       - zookeeper
>       - broker1
>       - broker2
>       - broker3
>       - schema_registry
>       - connect1
>       - connect2
>       - connect3
>     ports:
>       - "9021:9021"
>     environment:
>       CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
>       CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
>       CONTROL_CENTER_CONNECT_CLUSTER: 'connect1:8083,connect2:8083,connect3:8083'
>       CONTROL_CENTER_REPLICATION_FACTOR: 1
>       CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
>       CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
>       PORT: 9021
>
>   postgres:
>     image: postgres
>     hostname: postgres
>     ports:
>       - "5432"
>     environment:
>       POSTGRES_PASSWORD: postgres
>
> The Kafka cluster is running properly, but I don't know how to verify
> that the Kafka Connect cluster is running properly.
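[A minimal way to sanity-check the workers, sketched against the compose file above: each Connect worker exposes a REST API on its CONNECT_REST_PORT, so querying every worker by its service name (assuming a shell running on the same Docker network) should return version info and the, initially empty, connector list.]

for host in connect1 connect2 connect3; do
  # GET / returns the worker's version and commit hash;
  # GET /connectors lists the connectors the cluster knows about.
  curl -s "http://$host:8083/" ; echo
  curl -s "http://$host:8083/connectors" ; echo
done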
> I prepared some test data in the database, and created a source
> connector with the following configuration:
>
> {
>     "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
>     "name": "test",
>     "tasks.max": 3,
>     "key.converter": "org.apache.kafka.connect.json.JsonConverter",
>     "value.converter": "org.apache.kafka.connect.json.JsonConverter",
>     "connection.url": "jdbc:postgresql://postgres:5432/postgres?user=postgres&password=postgres",
>     "table.whitelist": "pgbench_accounts",
>     "batch.max.rows": 1,
>     "topic.prefix": "test",
>     "mode": "incrementing",
>     "incrementing.column.name": "aid"
> }
>
> The ingestion works correctly and I can consume the produced messages,
> but I still have no way to tell whether the ingestion is parallelized.
> I called the status API and received the following result:
>
> {
>     "name": "test",
>     "connector": {
>         "state": "RUNNING",
>         "worker_id": "connect2:8083"
>     },
>     "tasks": [
>         {
>             "state": "RUNNING",
>             "id": 0,
>             "worker_id": "connect3:8083"
>         }
>     ]
> }
>
> This result is the same on all instances. Does it mean the ingestion
> tasks are not parallelized? Is there anything important I am missing,
> or is this type of clustering simply not supported?
>
> Any comments and suggestions are highly appreciated. Have a nice day!
>
> Best regards,
> Yang
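[For reference, a sketch of the REST calls implied above, assuming the flat connector JSON is saved locally as connector.json and the commands run on the same Docker network: PUT /connectors/{name}/config creates or updates the connector, and GET /connectors/{name}/status returns the task assignment shown in the question.]

# Create (or update) the connector via any worker; the config is
# forwarded to the group leader and distributed via the config topic.
curl -s -X PUT -H "Content-Type: application/json" \
     --data @connector.json \
     "http://connect1:8083/connectors/test/config"

# Ask any worker for the connector's status; with N whitelisted tables
# the "tasks" array can hold up to min(tasks.max, N) entries, each with
# its own worker_id.
curl -s "http://connect1:8083/connectors/test/status"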