Hi Ewen,

Thanks a lot for your reply. So it means we cannot parallelize ingestion of
one table across multiple processes. Is that a limitation of Kafka Connect
itself or of the JDBC connector?
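
To check my understanding of the explanation quoted below, here is a minimal
sketch of how the number of tasks seems to follow from the whitelist and
tasks.max. It is only a model, not the connector's actual code: the function
name plan_tasks and the round-robin assignment are my own assumptions for
illustration. The one rule taken from the explanation is that a table is
never split across tasks.

```python
def plan_tasks(tables, tasks_max):
    """Model of task planning: whole tables are assigned to tasks.

    Hypothetical helper for illustration only. The number of tasks is
    capped by both tasks_max and the number of tables, because a single
    table is always handled by exactly one task.
    """
    if tasks_max < 1:
        raise ValueError("tasks_max must be at least 1")
    num_tasks = min(len(tables), tasks_max)
    groups = [[] for _ in range(num_tasks)]
    # Round-robin assignment is an assumption; the real distribution
    # of tables over tasks may differ.
    for i, table in enumerate(tables):
        groups[i % num_tasks].append(table)
    return groups


# One whitelisted table with tasks.max = 3 still yields a single task.
print(plan_tasks(["pgbench_accounts"], 3))
```

With my configuration (one table, tasks.max of 3) this model gives one task,
which matches the status API output I received.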

Have a nice day.

Best regards,
Yang


2017-01-03 20:55 GMT+01:00 Ewen Cheslack-Postava <e...@confluent.io>:

> The unit of parallelism in Connect is a task. The status output lists only
> one task, so you have only one process copying data. The connector can
> consume data from within a single *database* in parallel, but each *table*
> must be handled by a single task. Since your table whitelist includes only
> a single table, the connector generates only a single task. If you add
> more tables to the whitelist, you'll see more tasks in the status API
> output.
>
> -Ewen
>
> On Tue, Jan 3, 2017 at 4:03 AM, Yuanzhe Yang <yyz1...@gmail.com> wrote:
>
> > Hi all,
> >
> > I am trying to run a Kafka Connect cluster to ingest data from a
> > relational database with the JDBC connector.
> >
> > I investigated several other solutions, including Spark, Flink and
> > Flume, before trying Kafka Connect, but none of them can ingest a
> > relational database in a clustered way. By "clustered" I mean ingesting
> > one database with several distributed processes in parallel, rather than
> > each process in the cluster ingesting a different database. Kafka Connect
> > is the option I am investigating now. The documentation does not state
> > clearly whether my use case is supported, so I ran a test to find out.
> >
> > I created a cluster with the following docker container configuration:
> >
> > ---
> > version: '2'
> > services:
> >   zookeeper:
> >    image: confluentinc/cp-zookeeper
> >    hostname: zookeeper
> >    ports:
> >      - "2181"
> >    environment:
> >      ZOOKEEPER_CLIENT_PORT: 2181
> >      ZOOKEEPER_TICK_TIME: 2000
> >
> >   broker1:
> >    image: confluentinc/cp-kafka
> >    hostname: broker1
> >    depends_on:
> >      - zookeeper
> >    ports:
> >      - '9092'
> >    environment:
> >      KAFKA_BROKER_ID: 1
> >      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> >      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker1:9092'
> >
> >   broker2:
> >    image: confluentinc/cp-kafka
> >    hostname: broker2
> >    depends_on:
> >      - zookeeper
> >    ports:
> >      - '9092'
> >    environment:
> >      KAFKA_BROKER_ID: 2
> >      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> >      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker2:9092'
> >
> >   broker3:
> >    image: confluentinc/cp-kafka
> >    hostname: broker3
> >    depends_on:
> >      - zookeeper
> >    ports:
> >      - '9092'
> >    environment:
> >      KAFKA_BROKER_ID: 3
> >      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> >      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker3:9092'
> >
> >   schema_registry:
> >    image: confluentinc/cp-schema-registry
> >    hostname: schema_registry
> >    depends_on:
> >      - zookeeper
> >      - broker1
> >      - broker2
> >      - broker3
> >    ports:
> >      - '8081'
> >    environment:
> >      SCHEMA_REGISTRY_HOST_NAME: schema_registry
> >      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
> >
> >   connect1:
> >    image: confluentinc/cp-kafka-connect
> >    hostname: connect1
> >    depends_on:
> >      - zookeeper
> >      - broker1
> >      - broker2
> >      - broker3
> >      - schema_registry
> >    ports:
> >      - "8083"
> >    environment:
> >      CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
> >      CONNECT_REST_ADVERTISED_HOST_NAME: connect1
> >      CONNECT_REST_PORT: 8083
> >      CONNECT_GROUP_ID: compose-connect-group
> >      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
> >      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
> >      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
> >      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
> >      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
> >      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
> >      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
> >      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
> >      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
> >      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> >
> >   connect2:
> >    image: confluentinc/cp-kafka-connect
> >    hostname: connect2
> >    depends_on:
> >      - zookeeper
> >      - broker1
> >      - broker2
> >      - broker3
> >      - schema_registry
> >    ports:
> >      - "8083"
> >    environment:
> >      CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
> >      CONNECT_REST_ADVERTISED_HOST_NAME: connect2
> >      CONNECT_REST_PORT: 8083
> >      CONNECT_GROUP_ID: compose-connect-group
> >      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
> >      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
> >      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
> >      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
> >      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
> >      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
> >      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
> >      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
> >      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
> >      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> >
> >   connect3:
> >    image: confluentinc/cp-kafka-connect
> >    hostname: connect3
> >    depends_on:
> >      - zookeeper
> >      - broker1
> >      - broker2
> >      - broker3
> >      - schema_registry
> >    ports:
> >      - "8083"
> >    environment:
> >      CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
> >      CONNECT_REST_ADVERTISED_HOST_NAME: connect3
> >      CONNECT_REST_PORT: 8083
> >      CONNECT_GROUP_ID: compose-connect-group
> >      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
> >      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
> >      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
> >      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
> >      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
> >      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
> >      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
> >      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
> >      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
> >      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> >
> >   control-center:
> >    image: confluentinc/cp-enterprise-control-center
> >    depends_on:
> >      - zookeeper
> >      - broker1
> >      - broker2
> >      - broker3
> >      - schema_registry
> >      - connect1
> >      - connect2
> >      - connect3
> >    ports:
> >      - "9021:9021"
> >    environment:
> >      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,broker3:9092'
> >      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> >      CONTROL_CENTER_CONNECT_CLUSTER: 'connect1:8083,connect2:8083,connect3:8083'
> >      CONTROL_CENTER_REPLICATION_FACTOR: 1
> >      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
> >      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
> >      PORT: 9021
> >
> >   postgres:
> >    image: postgres
> >    hostname: postgres
> >    ports:
> >      - "5432"
> >    environment:
> >      POSTGRES_PASSWORD: postgres
> >
> > The Kafka cluster is running properly, but I don't know how to verify
> > whether the Kafka Connect cluster is running properly. I prepared some
> > test data in the database and created a source connector with the
> > following configuration:
> >
> > {
> >  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
> >  "name": "test",
> >  "tasks.max": 3,
> >  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
> >  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
> >  "connection.url": "jdbc:postgresql://postgres:5432/postgres?user=postgres&password=postgres",
> >  "table.whitelist": "pgbench_accounts",
> >  "batch.max.rows": 1,
> >  "topic.prefix": "test",
> >  "mode": "incrementing",
> >  "incrementing.column.name": "aid"
> > }
> >
> > The ingestion process is correct and I can consume the produced messages.
> > But I still have no way to figure out if the ingestion is parallelized. I
> > called the status API and received the following result:
> >
> > {
> >    "name":"test",
> >    "connector":{
> >       "state":"RUNNING",
> >       "worker_id":"connect2:8083"
> >    },
> >    "tasks":[
> >       {
> >          "state":"RUNNING",
> >          "id":0,
> >          "worker_id":"connect3:8083"
> >       }
> >    ]
> > }
> >
> > This result is the same on all instances. Does it mean the ingestion
> > tasks are not parallelized? Am I missing something important, or is this
> > type of clustering simply not supported?
> >
> > Any comments and suggestions are highly appreciated. Have a nice day!
> >
> > Best regards,
> > Yang
> >
>
