Hi Ewen,

OK. Thanks a lot for your feedback!

Best regards,
Yang

2017-01-03 22:42 GMT+01:00 Ewen Cheslack-Postava <e...@confluent.io>:

> It's an implementation detail of the JDBC connector. You could potentially
> write a connector that parallelizes at that level, but you lose other
> potentially useful properties (e.g. ordering). To split at this level you'd
> have to do something like have each task be responsible for a subset of
> rowids in the database.
>
> -Ewen
>
> On Tue, Jan 3, 2017 at 1:24 PM, Yuanzhe Yang <yyz1...@gmail.com> wrote:
>
> > Hi Ewen,
> >
> > Thanks a lot for your reply. So it means we cannot parallelize ingestion
> of
> > one table with multiple processes. Is it because of Kafka Connect or the
> > JDBC connector?
> >
> > Have a nice day.
> >
> > Best regards,
> > Yang
> >
> >
> > 2017-01-03 20:55 GMT+01:00 Ewen Cheslack-Postava <e...@confluent.io>:
> >
> > > The unit of parallelism in connect is a task. It's only listing one
> task,
> > > so you only have one process copying data. The connector can consume
> data
> > > from within a single *database* in parallel, but each *table* must be
> > > handled by a single task. Since your table whitelist only includes a
> > single
> > > table, the connector will only generate a single task. If you add more
> > > tables to the whitelist then you'll see more tasks in the status API
> > > output.
> > >
> > > -Ewen
> > >
> > > On Tue, Jan 3, 2017 at 4:03 AM, Yuanzhe Yang <yyz1...@gmail.com>
> wrote:
> > >
> > > > Hi all,
> > > >
> > > > I am trying to run a Kafka Connect cluster to ingest data from a
> > > relational
> > > > database with jdbc connector.
> > > >
> > > > I have been investigating many other solutions including Spark, Flink
> > and
> > > > Flume before using Kafka Connect, but none of them can be used to
> > ingest
> > > > relational databases in a clusterable way. With "cluster" I mean
> > > ingesting
> > > > one database with several distributed processes in parallel, instead
> of
> > > > each process in the cluster ingesting different databases. Kafka
> > Connect
> > > is
> > > > the option I am investigating currently. After reading the
> > > documentation, I
> > > > have not found any clear statement about if my use case can be
> > supported,
> > > > so I have to make a test to figure it out.
> > > >
> > > > I created a cluster with the following docker container
> configuration:
> > > >
> > > > ---
> > > > version: '2'
> > > > services:
> > > >  zookeeper:
> > > >    image: confluentinc/cp-zookeeper
> > > >    hostname: zookeeper
> > > >    ports:
> > > >      - "2181"
> > > >    environment:
> > > >      ZOOKEEPER_CLIENT_PORT: 2181
> > > >      ZOOKEEPER_TICK_TIME: 2000
> > > >
> > > >   broker1:
> > > >    image: confluentinc/cp-kafka
> > > >    hostname: broker1
> > > >    depends_on:
> > > >      - zookeeper
> > > >    ports:
> > > >      - '9092'
> > > >    environment:
> > > >      KAFKA_BROKER_ID: 1
> > > >      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> > > >      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker1:9092'
> > > >
> > > >   broker2:
> > > >    image: confluentinc/cp-kafka
> > > >    hostname: broker2
> > > >    depends_on:
> > > >      - zookeeper
> > > >    ports:
> > > >      - '9092'
> > > >    environment:
> > > >      KAFKA_BROKER_ID: 2
> > > >      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> > > >      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker2:9092'
> > > >
> > > >   broker3:
> > > >    image: confluentinc/cp-kafka
> > > >    hostname: broker3
> > > >    depends_on:
> > > >      - zookeeper
> > > >    ports:
> > > >      - '9092'
> > > >    environment:
> > > >      KAFKA_BROKER_ID: 3
> > > >      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> > > >      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker3:9092'
> > > >
> > > >   schema_registry:
> > > >    image: confluentinc/cp-schema-registry
> > > >    hostname: schema_registry
> > > >    depends_on:
> > > >      - zookeeper
> > > >      - broker1
> > > >      - broker2
> > > >      - broker3
> > > >    ports:
> > > >      - '8081'
> > > >    environment:
> > > >      SCHEMA_REGISTRY_HOST_NAME: schema_registry
> > > >      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
> > > >
> > > >   connect1:
> > > >    image: confluentinc/cp-kafka-connect
> > > >    hostname: connect1
> > > >    depends_on:
> > > >      - zookeeper
> > > >      - broker1
> > > >      - broker2
> > > >      - broker3
> > > >      - schema_registry
> > > >    ports:
> > > >      - "8083"
> > > >    environment:
> > > >      CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,
> > broker3:9092'
> > > >      CONNECT_REST_ADVERTISED_HOST_NAME: connect1
> > > >      CONNECT_REST_PORT: 8083
> > > >      CONNECT_GROUP_ID: compose-connect-group
> > > >      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
> > > >      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
> > > >      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
> > > >      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
> > > >      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: '
> > > > http://schema_registry:8081
> > > > '
> > > >      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.
> AvroConverter
> > > >      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: '
> > > > http://schema_registry:8081'
> > > >      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.
> > > > JsonConverter
> > > >      CONNECT_INTERNAL_VALUE_CONVERTER:
> org.apache.kafka.connect.json.
> > > > JsonConverter
> > > >      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> > > >
> > > >   connect2:
> > > >    image: confluentinc/cp-kafka-connect
> > > >    hostname: connect2
> > > >    depends_on:
> > > >      - zookeeper
> > > >      - broker1
> > > >      - broker2
> > > >      - broker3
> > > >      - schema_registry
> > > >    ports:
> > > >      - "8083"
> > > >    environment:
> > > >      CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,
> > broker3:9092'
> > > >      CONNECT_REST_ADVERTISED_HOST_NAME: connect2
> > > >      CONNECT_REST_PORT: 8083
> > > >      CONNECT_GROUP_ID: compose-connect-group
> > > >      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
> > > >      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
> > > >      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
> > > >      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
> > > >      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: '
> > > > http://schema_registry:8081
> > > > '
> > > >      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.
> AvroConverter
> > > >      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: '
> > > > http://schema_registry:8081'
> > > >      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.
> > > > JsonConverter
> > > >      CONNECT_INTERNAL_VALUE_CONVERTER:
> org.apache.kafka.connect.json.
> > > > JsonConverter
> > > >      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> > > >
> > > >   connect3:
> > > >    image: confluentinc/cp-kafka-connect
> > > >    hostname: connect3
> > > >    depends_on:
> > > >      - zookeeper
> > > >      - broker1
> > > >      - broker2
> > > >      - broker3
> > > >      - schema_registry
> > > >    ports:
> > > >      - "8083"
> > > >    environment:
> > > >      CONNECT_BOOTSTRAP_SERVERS: 'broker1:9092,broker2:9092,
> > broker3:9092'
> > > >      CONNECT_REST_ADVERTISED_HOST_NAME: connect3
> > > >      CONNECT_REST_PORT: 8083
> > > >      CONNECT_GROUP_ID: compose-connect-group
> > > >      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
> > > >      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
> > > >      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
> > > >      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
> > > >      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: '
> > > > http://schema_registry:8081
> > > > '
> > > >      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.
> AvroConverter
> > > >      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: '
> > > > http://schema_registry:8081'
> > > >      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.
> > > > JsonConverter
> > > >      CONNECT_INTERNAL_VALUE_CONVERTER:
> org.apache.kafka.connect.json.
> > > > JsonConverter
> > > >      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> > > >
> > > >   control-center:
> > > >    image: confluentinc/cp-enterprise-control-center
> > > >    depends_on:
> > > >      - zookeeper
> > > >      - broker1
> > > >      - broker2
> > > >      - broker3
> > > >      - schema_registry
> > > >      - connect1
> > > >      - connect2
> > > >      - connect3
> > > >    ports:
> > > >      - "9021:9021"
> > > >    environment:
> > > >      CONTROL_CENTER_BOOTSTRAP_SERVERS:
> 'broker1:9092,broker2:9092,bro
> > > > ker3:9092'
> > > >      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
> > > >      CONTROL_CENTER_CONNECT_CLUSTER: 'connect1:8083,connect2:8083,c
> > > > onnect3:8083'
> > > >      CONTROL_CENTER_REPLICATION_FACTOR: 1
> > > >      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
> > > >      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
> > > >      PORT: 9021
> > > >
> > > >   postgres:
> > > >    image: postgres
> > > >    hostname: postgres
> > > >    ports:
> > > >      - "5432"
> > > >    environment:
> > > >      POSTGRES_PASSWORD: postgres
> > > >
> > > > The Kafka cluster is running properly, but I don't know how to verify
> > if
> > > > the Kafka Connect cluster is running properly. I prepared some test
> > data
> > > in
> > > > the database, and created a source connector with the following
> > > > configuration:
> > > >
> > > > {
> > > >  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
> > > >  "name": "test",
> > > >  "tasks.max": 3,
> > > >  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
> > > >  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
> > > >  "connection.url": "jdbc:postgresql://postgres:
> > > > 5432/postgres?user=postgres&
> > > > password=postgres",
> > > >  "table.whitelist": "pgbench_accounts",
> > > >  "batch.max.rows": 1,
> > > >  "topic.prefix": "test",
> > > >  "mode": "incrementing",
> > > >  "incrementing.column.name": "aid"
> > > > }
> > > >
> > > > The ingestion process is correct and I can consume the produced
> > messages.
> > > > But I still have no way to figure out if the ingestion is
> > parallelized. I
> > > > called the status API and received the following result:
> > > >
> > > > {
> > > >    "name":"test",
> > > >    "connector":{
> > > >       "state":"RUNNING",
> > > >       "worker_id":"connect2:8083"
> > > >    },
> > > >    "tasks":[
> > > >       {
> > > >          "state":"RUNNING",
> > > >          "id":0,
> > > >          "worker_id":"connect3:8083"
> > > >       }
> > > >    ]
> > > > }
> > > >
> > > > This result is the same for all instances. Does it mean the ingestion
> > > tasks
> > > > are not parallelized? Is there anything important I am missing or
> this
> > > type
> > > > of clustering is simply not supported?
> > > >
> > > > Any comments and suggestions are highly appreciated. Have a nice day!
> > > >
> > > > Best regards,
> > > > Yang
> > > >
> > >
> >
>

Reply via email to