Hi Ewen, OK. Thanks a lot for your feedback!
Best regards,
Yang

2017-01-03 22:42 GMT+01:00 Ewen Cheslack-Postava <e...@confluent.io>:

> It's an implementation detail of the JDBC connector. You could
> potentially write a connector that parallelizes at that level, but you
> lose other potentially useful properties (e.g. ordering). To split at
> this level you'd have to do something like have each task be
> responsible for a subset of rowids in the database.
>
> -Ewen
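A rough sketch of the rowid-splitting idea Ewen describes, for illustration only: the taskConfigs(int) hook is the real Kafka Connect API through which a connector fans work out into tasks, but the class name, the config keys and the hard-coded id range below are all hypothetical; the stock JDBC connector does nothing like this.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical connector that shards a single table across tasks by
    // primary-key range. Each task would poll only its own slice, e.g.
    //   SELECT * FROM pgbench_accounts WHERE aid BETWEEN <min> AND <max>
    public class RangeShardingConnector {

        public List<Map<String, String>> taskConfigs(int maxTasks) {
            long minId = 1L;          // in practice: SELECT min(aid) FROM the table
            long maxId = 1_000_000L;  // in practice: SELECT max(aid) FROM the table
            long chunk = (maxId - minId + maxTasks) / maxTasks;  // ceiling split

            List<Map<String, String>> configs = new ArrayList<>();
            for (int i = 0; i < maxTasks; i++) {
                Map<String, String> cfg = new HashMap<>();
                cfg.put("table", "pgbench_accounts");
                cfg.put("row.id.min", String.valueOf(minId + i * chunk));
                cfg.put("row.id.max",
                        String.valueOf(Math.min(minId + (i + 1) * chunk - 1, maxId)));
                configs.add(cfg);
            }
            return configs;
        }
    }

The ordering trade-off mentioned above shows up immediately: the slices are read concurrently, so rows are no longer produced in global key order.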
> On Tue, Jan 3, 2017 at 1:24 PM, Yuanzhe Yang <yyz1...@gmail.com> wrote:
>
> > Hi Ewen,
> >
> > Thanks a lot for your reply. So it means we cannot parallelize
> > ingestion of one table with multiple processes. Is that a limitation
> > of Kafka Connect itself or of the JDBC connector?
> >
> > Have a nice day.
> >
> > Best regards,
> > Yang
> >
> > 2017-01-03 20:55 GMT+01:00 Ewen Cheslack-Postava <e...@confluent.io>:
> >
> > > The unit of parallelism in Connect is a task. Your status output
> > > lists only one task, so you only have one process copying data. The
> > > connector can consume data from within a single *database* in
> > > parallel, but each *table* must be handled by a single task. Since
> > > your table whitelist only includes a single table, the connector
> > > will only generate a single task. If you add more tables to the
> > > whitelist, then you'll see more tasks in the status API output.
> > >
> > > -Ewen
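For example, assuming the standard pgbench tables are present, a whitelist like the one below would let the connector fan out into three tasks, one per table. It is sketched with mode=bulk only because each pgbench table has a different key column; all other values mirror the connector config further down.

    {
      "name": "test-multi",
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "tasks.max": 3,
      "connection.url": "jdbc:postgresql://postgres:5432/postgres?user=postgres&password=postgres",
      "table.whitelist": "pgbench_accounts,pgbench_branches,pgbench_tellers",
      "mode": "bulk",
      "topic.prefix": "test"
    }

With three tables and tasks.max set to 3, the status API should then report three RUNNING tasks, spread across the connect1/connect2/connect3 workers.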
> > > On Tue, Jan 3, 2017 at 4:03 AM, Yuanzhe Yang <yyz1...@gmail.com> wrote:
> > >
> > > > Hi all,
> > > >
> > > > I am trying to run a Kafka Connect cluster to ingest data from a
> > > > relational database with the JDBC connector.
> > > >
> > > > Before settling on Kafka Connect I investigated several other
> > > > solutions, including Spark, Flink and Flume, but none of them can
> > > > ingest a relational database in a clustered way. By "cluster" I
> > > > mean ingesting one database with several distributed processes in
> > > > parallel, rather than each process in the cluster ingesting a
> > > > different database. After reading the documentation I could not
> > > > find a clear statement about whether this use case is supported,
> > > > so I set up a test to figure it out.
> > > >
> > > > I created a cluster with the following Docker container
> > > > configuration:
> > > >
> > > > ---
> > > > version: '2'
> > > > services:
> > > >   zookeeper:
> > > >     image: confluentinc/cp-zookeeper
> > > >     hostname: zookeeper
> > > >     ports:
> > > >       - "2181"
> > > >     environment:
> > > >       ZOOKEEPER_CLIENT_PORT: 2181
> > > >       ZOOKEEPER_TICK_TIME: 2000
> > > >
> > > >   broker1:
> > > >     image: confluentinc/cp-kafka
> > > >     hostname: broker1
> > > >     depends_on:
> > > >       - zookeeper
> > > >     ports:
> > > >       - '9092'
> > > >     environment:
> > > >       KAFKA_BROKER_ID: 1
> > > >       KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> > > >       KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker1:9092'
> > > >
> > > >   broker2:
> > > >     image: confluentinc/cp-kafka
> > > >     hostname: broker2
> > > >     depends_on:
> > > >       - zookeeper
> > > >     ports:
> > > >       - '9092'
> > > >     environment:
> > > >       KAFKA_BROKER_ID: 2
> > > >       KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> > > >       KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker2:9092'
> > > >
> > > >   broker3:
> > > >     image: confluentinc/cp-kafka
> > > >     hostname: broker3
> > > >     depends_on:
> > > >       - zookeeper
> > > >     ports:
> > > >       - '9092'
> > > >     environment:
> > > >       KAFKA_BROKER_ID: 3
> > > >       KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> > > >       KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker3:9092'
> > > >
> > > >   schema_registry:
> > > >     image: confluentinc/cp-schema-registry
> > > >     hostname: schema_registry
> > > >     depends_on:
> > > >       - zookeeper
> > > >       - broker1
> > > >       - broker2
> > > >       - broker3
> > > >     ports:
> > > >       - '8081'
> > > >     environment:
> > > >       SCHEMA_REGISTRY_HOST_NAME: schema_registry
> > > >       SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
> > > >
> > > >   connect1:
> > > >     image: confluentinc/cp-kafka-connect
> > > >     hostname: connect1
> > > >     depends_on:
> > > >       - zookeeper
> > > >       - broker1
> > > >       - broker2
> > > >       - broker3
> > > >       - schema_registry
> > > >     ports:
> > > >       - "8083"
> > > >     environment:
> > > >       CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
> > > >       CONNECT_REST_ADVERTISED_HOST_NAME: connect1
> > > >       CONNECT_REST_PORT: 8083
> > > >       CONNECT_GROUP_ID: compose-connect-group
> > > >       CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
> > > >       CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
> > > >       CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
> > > >       CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
> > > >       CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
> > > >       CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
> > > >       CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
> > > >       CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
> > > >       CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
> > > >       CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> > > >
> > > >   connect2:
> > > >     image: confluentinc/cp-kafka-connect
> > > >     hostname: connect2
> > > >     depends_on:
> > > >       - zookeeper
> > > >       - broker1
> > > >       - broker2
> > > >       - broker3
> > > >       - schema_registry
> > > >     ports:
> > > >       - "8083"
> > > >     environment:
> > > >       CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
> > > >       CONNECT_REST_ADVERTISED_HOST_NAME: connect2
> > > >       CONNECT_REST_PORT: 8083
> > > >       CONNECT_GROUP_ID: compose-connect-group
> > > >       CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
> > > >       CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
> > > >       CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
> > > >       CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
> > > >       CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
> > > >       CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
> > > >       CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
> > > >       CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
> > > >       CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
> > > >       CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> > > >
> > > >   connect3:
> > > >     image: confluentinc/cp-kafka-connect
> > > >     hostname: connect3
> > > >     depends_on:
> > > >       - zookeeper
> > > >       - broker1
> > > >       - broker2
> > > >       - broker3
> > > >       - schema_registry
> > > >     ports:
> > > >       - "8083"
> > > >     environment:
> > > >       CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
> > > >       CONNECT_REST_ADVERTISED_HOST_NAME: connect3
> > > >       CONNECT_REST_PORT: 8083
> > > >       CONNECT_GROUP_ID: compose-connect-group
> > > >       CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
> > > >       CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
> > > >       CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
> > > >       CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
> > > >       CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
> > > >       CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
> > > >       CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
> > > >       CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
> > > >       CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
> > > >       CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> > > >
> > > >   control-center:
> > > >     image: confluentinc/cp-enterprise-control-center
> > > >     depends_on:
> > > >       - zookeeper
> > > >       - broker1
> > > >       - broker2
> > > >       - broker3
> > > >       - schema_registry
> > > >       - connect1
> > > >       - connect2
> > > >       - connect3
> > > >     ports:
> > > >       - "9021:9021"
> > > >     environment:
> > > >       CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
> > > >       CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> > > >       CONTROL_CENTER_CONNECT_CLUSTER: 'connect1:8083,connect2:8083,connect3:8083'
> > > >       CONTROL_CENTER_REPLICATION_FACTOR: 1
> > > >       CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
> > > >       CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
> > > >       PORT: 9021
> > > >
> > > >   postgres:
> > > >     image: postgres
> > > >     hostname: postgres
> > > >     ports:
> > > >       - "5432"
> > > >     environment:
> > > >       POSTGRES_PASSWORD: postgres
> > > >
> > > > The Kafka cluster itself is running properly, but I do not know
> > > > how to verify whether the Kafka Connect cluster is running
> > > > properly.
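One way to check the workers: every Connect worker serves a REST API on its CONNECT_REST_PORT. The commands below assume that port is reachable from wherever you run them (e.g. via docker exec or a published port, since the compose file above leaves 8083 unmapped); the paths themselves are standard Connect REST endpoints.

    # A healthy worker answers with its version info
    curl http://connect1:8083/

    # All three workers share one group.id, so any of them
    # returns the same list of connectors for the cluster
    curl http://connect1:8083/connectors

    # Per-connector and per-task state (the "status API" used below)
    curl http://connect1:8083/connectors/test/status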
> > > > I prepared some test data in the database and created a source
> > > > connector with the following configuration:
> > > >
> > > > {
> > > >   "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
> > > >   "name": "test",
> > > >   "tasks.max": 3,
> > > >   "key.converter": "org.apache.kafka.connect.json.JsonConverter",
> > > >   "value.converter": "org.apache.kafka.connect.json.JsonConverter",
> > > >   "connection.url": "jdbc:postgresql://postgres:5432/postgres?user=postgres&password=postgres",
> > > >   "table.whitelist": "pgbench_accounts",
> > > >   "batch.max.rows": 1,
> > > >   "topic.prefix": "test",
> > > >   "mode": "incrementing",
> > > >   "incrementing.column.name": "aid"
> > > > }
> > > >
> > > > The ingestion works and I can consume the produced messages, but
> > > > I still have no way to tell whether the ingestion is
> > > > parallelized. I called the status API and received the following
> > > > result:
> > > >
> > > > {
> > > >   "name": "test",
> > > >   "connector": {
> > > >     "state": "RUNNING",
> > > >     "worker_id": "connect2:8083"
> > > >   },
> > > >   "tasks": [
> > > >     {
> > > >       "state": "RUNNING",
> > > >       "id": 0,
> > > >       "worker_id": "connect3:8083"
> > > >     }
> > > >   ]
> > > > }
> > > >
> > > > The result is the same no matter which instance I query. Does
> > > > this mean the ingestion tasks are not parallelized? Is there
> > > > anything important I am missing, or is this type of clustering
> > > > simply not supported?
> > > >
> > > > Any comments and suggestions are highly appreciated. Have a nice
> > > > day!
> > > >
> > > > Best regards,
> > > > Yang