Hi Ewen,

Thanks a lot for your reply. So it means we cannot parallelize ingestion of a single table with multiple processes. Is that a limitation of Kafka Connect itself, or of the JDBC connector?

Have a nice day.

Best regards,
Yang

2017-01-03 20:55 GMT+01:00 Ewen Cheslack-Postava <e...@confluent.io>:

> The unit of parallelism in Connect is a task. It's only listing one task,
> so you only have one process copying data. The connector can consume data
> from within a single *database* in parallel, but each *table* must be
> handled by a single task. Since your table whitelist only includes a single
> table, the connector will only generate a single task. If you add more
> tables to the whitelist then you'll see more tasks in the status API
> output.
>
> -Ewen
>
> On Tue, Jan 3, 2017 at 4:03 AM, Yuanzhe Yang <yyz1...@gmail.com> wrote:
>
> > Hi all,
> >
> > I am trying to run a Kafka Connect cluster to ingest data from a
> > relational database with the JDBC connector.
> >
> > I investigated many other solutions, including Spark, Flink and Flume,
> > before turning to Kafka Connect, but none of them can ingest a relational
> > database in a clustered way. By "cluster" I mean ingesting one database
> > with several distributed processes in parallel, instead of each process in
> > the cluster ingesting a different database. Kafka Connect is the option I
> > am currently investigating. After reading the documentation, I have not
> > found any clear statement about whether my use case is supported, so I
> > have to run a test to figure it out.
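[Editor's note: Ewen's point above — one task per table, bounded by tasks.max — can be illustrated with a hypothetical multi-table variant of the connector config used later in this thread. The extra table names are borrowed from pgbench's standard schema and are illustrative only; note that in incrementing mode the configured column must exist in every whitelisted table, so a real config would need a column common to all of them. A sketch:]

```json
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "name": "test-multi",
  "tasks.max": 3,
  "connection.url": "jdbc:postgresql://postgres:5432/postgres?user=postgres&password=postgres",
  "table.whitelist": "pgbench_accounts,pgbench_branches,pgbench_tellers",
  "topic.prefix": "test",
  "mode": "incrementing",
  "incrementing.column.name": "aid"
}
```

[With three whitelisted tables and tasks.max=3, the status API should list up to three tasks instead of one.]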
> >
> > I created a cluster with the following docker container configuration:
> >
> > ---
> > version: '2'
> > services:
> >   zookeeper:
> >     image: confluentinc/cp-zookeeper
> >     hostname: zookeeper
> >     ports:
> >       - "2181"
> >     environment:
> >       ZOOKEEPER_CLIENT_PORT: 2181
> >       ZOOKEEPER_TICK_TIME: 2000
> >
> >   broker1:
> >     image: confluentinc/cp-kafka
> >     hostname: broker1
> >     depends_on:
> >       - zookeeper
> >     ports:
> >       - '9092'
> >     environment:
> >       KAFKA_BROKER_ID: 1
> >       KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> >       KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker1:9092'
> >
> >   broker2:
> >     image: confluentinc/cp-kafka
> >     hostname: broker2
> >     depends_on:
> >       - zookeeper
> >     ports:
> >       - '9092'
> >     environment:
> >       KAFKA_BROKER_ID: 2
> >       KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> >       KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker2:9092'
> >
> >   broker3:
> >     image: confluentinc/cp-kafka
> >     hostname: broker3
> >     depends_on:
> >       - zookeeper
> >     ports:
> >       - '9092'
> >     environment:
> >       KAFKA_BROKER_ID: 3
> >       KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> >       KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker3:9092'
> >
> >   schema_registry:
> >     image: confluentinc/cp-schema-registry
> >     hostname: schema_registry
> >     depends_on:
> >       - zookeeper
> >       - broker1
> >       - broker2
> >       - broker3
> >     ports:
> >       - '8081'
> >     environment:
> >       SCHEMA_REGISTRY_HOST_NAME: schema_registry
> >       SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
> >
> >   connect1:
> >     image: confluentinc/cp-kafka-connect
> >     hostname: connect1
> >     depends_on:
> >       - zookeeper
> >       - broker1
> >       - broker2
> >       - broker3
> >       - schema_registry
> >     ports:
> >       - "8083"
> >     environment:
> >       CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
> >       CONNECT_REST_ADVERTISED_HOST_NAME: connect1
> >       CONNECT_REST_PORT: 8083
> >       CONNECT_GROUP_ID: compose-connect-group
> >       CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
> >       CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
> >       CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
> >       CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
> >       CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
> >       CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
> >       CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
> >       CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
> >       CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
> >       CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> >
> >   connect2:
> >     image: confluentinc/cp-kafka-connect
> >     hostname: connect2
> >     depends_on:
> >       - zookeeper
> >       - broker1
> >       - broker2
> >       - broker3
> >       - schema_registry
> >     ports:
> >       - "8083"
> >     environment:
> >       CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
> >       CONNECT_REST_ADVERTISED_HOST_NAME: connect2
> >       CONNECT_REST_PORT: 8083
> >       CONNECT_GROUP_ID: compose-connect-group
> >       CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
> >       CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
> >       CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
> >       CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
> >       CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
> >       CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
> >       CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
> >       CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
> >       CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
> >       CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> >
> >   connect3:
> >     image: confluentinc/cp-kafka-connect
> >     hostname: connect3
> >     depends_on:
> >       - zookeeper
> >       - broker1
> >       - broker2
> >       - broker3
> >       - schema_registry
> >     ports:
> >       - "8083"
> >     environment:
> >       CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
> >       CONNECT_REST_ADVERTISED_HOST_NAME: connect3
> >       CONNECT_REST_PORT: 8083
> >       CONNECT_GROUP_ID: compose-connect-group
> >       CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
> >       CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
> >       CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
> >       CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
> >       CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
> >       CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
> >       CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
> >       CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
> >       CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
> >       CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> >
> >   control-center:
> >     image: confluentinc/cp-enterprise-control-center
> >     depends_on:
> >       - zookeeper
> >       - broker1
> >       - broker2
> >       - broker3
> >       - schema_registry
> >       - connect1
> >       - connect2
> >       - connect3
> >     ports:
> >       - "9021:9021"
> >     environment:
> >       CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
> >       CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> >       CONTROL_CENTER_CONNECT_CLUSTER: 'connect1:8083,connect2:8083,connect3:8083'
> >       CONTROL_CENTER_REPLICATION_FACTOR: 1
> >       CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
> >       CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
> >       PORT: 9021
> >
> >   postgres:
> >     image: postgres
> >     hostname: postgres
> >     ports:
> >       - "5432"
> >     environment:
> >       POSTGRES_PASSWORD: postgres
> >
> > The Kafka cluster is running properly, but I don't know how to verify if
> > the Kafka Connect cluster is running properly. I prepared some test data
> > in the database, and created a source connector with the following
> > configuration:
> >
> > {
> >   "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
> >   "name": "test",
> >   "tasks.max": 3,
> >   "key.converter": "org.apache.kafka.connect.json.JsonConverter",
> >   "value.converter": "org.apache.kafka.connect.json.JsonConverter",
> >   "connection.url": "jdbc:postgresql://postgres:5432/postgres?user=postgres&password=postgres",
> >   "table.whitelist": "pgbench_accounts",
> >   "batch.max.rows": 1,
> >   "topic.prefix": "test",
> >   "mode": "incrementing",
> >   "incrementing.column.name": "aid"
> > }
> >
> > The ingestion process is correct and I can consume the produced messages.
> > But I still have no way to figure out if the ingestion is parallelized.
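[Editor's note: the single-task status output below follows from how source connectors carve work into tasks — the JDBC connector assigns whole tables to at most tasks.max tasks, so one whitelisted table can never yield more than one task. A rough sketch of that grouping logic; this is illustrative, not the connector's actual code:]

```python
def group_tables_into_tasks(tables, max_tasks):
    """Distribute whole tables round-robin across at most max_tasks tasks.

    Each table is owned by exactly one task, so the number of tasks
    created is min(len(tables), max_tasks).
    """
    num_tasks = min(len(tables), max_tasks)
    groups = [[] for _ in range(num_tasks)]
    for i, table in enumerate(tables):
        groups[i % num_tasks].append(table)
    return groups

# One whitelisted table with tasks.max=3 -> a single task owns it.
print(group_tables_into_tasks(["pgbench_accounts"], 3))  # [['pgbench_accounts']]
# Three tables -> three tasks, one table each.
print(group_tables_into_tasks(["t1", "t2", "t3"], 3))    # [['t1'], ['t2'], ['t3']]
```

[So with "table.whitelist": "pgbench_accounts" and "tasks.max": 3, only one task is ever generated, regardless of how many Connect workers join the group.]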
> > I called the status API and received the following result:
> >
> > {
> >   "name": "test",
> >   "connector": {
> >     "state": "RUNNING",
> >     "worker_id": "connect2:8083"
> >   },
> >   "tasks": [
> >     {
> >       "state": "RUNNING",
> >       "id": 0,
> >       "worker_id": "connect3:8083"
> >     }
> >   ]
> > }
> >
> > This result is the same for all instances. Does it mean the ingestion
> > tasks are not parallelized? Is there anything important I am missing, or
> > is this type of clustering simply not supported?
> >
> > Any comments and suggestions are highly appreciated. Have a nice day!
> >
> > Best regards,
> > Yang
> >
>
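[Editor's note: the "is it parallelized?" question can be answered programmatically by fetching the connector status from any worker's REST endpoint (e.g. GET http://connect1:8083/connectors/test/status, where the host name is from the compose file above) and counting the entries in its tasks array. A minimal sketch using the status document quoted in this thread:]

```python
import json

# Status document as returned by GET /connectors/<name>/status
# (copied verbatim from the thread above).
status = json.loads("""
{
  "name": "test",
  "connector": {"state": "RUNNING", "worker_id": "connect2:8083"},
  "tasks": [
    {"state": "RUNNING", "id": 0, "worker_id": "connect3:8083"}
  ]
}
""")

# Each entry in "tasks" is an independently scheduled unit of work.
running = [t for t in status["tasks"] if t["state"] == "RUNNING"]
print(f"{len(running)} running task(s)")  # prints "1 running task(s)": no parallelism
```

[One running task means exactly one worker is copying data for this connector, which matches Ewen's explanation: a single-table whitelist yields a single task no matter how many workers are in the group.]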