Hi Everyone, 

First of all, I want to send thanks to Kafka team for the beautiful tool.
I just started with Kafka recently, and I am trying to deploy a Kafka cluster 
on AWS EC2 using Confluent's docker images, which consists of: (i) 3 Zookeeper 
hosts, (ii) 1 Kafka broker auto-scaling-group (ASG) of 3 servers, and (iii) 1 
client ASG on which I run kafka-rest-proxy and kafka-connect.
The default topic-replicas setting is 3.
The connector configuration is quoted at the end of this post.
The whole cluster has been running fine.

Today I needed to upgrade the brokers and the client ASGs. We used AWS' ASG 
rolling update method, which turns off one old server, adds a new one with the 
new configuration, waits 20 minutes, then the 2nd one,...

The problem came after the last old server was terminated and the last new 
server was added, my S3 connector started reading from the beginning of the 
topics and write the output to S3. So I ended up having two copies of my Kafka 
records in S3.

Is this the expected behaviour? As per this page 
"https://docs.confluent.io/current/connect/kafka-connect-s3/index.html"; it 
seems not.
If that behaviour is expected, then what's the error in my upgrade procedure? 
(I didn't have a chance to check the replicas of my topics during the upgrade)

Thanks and regards,
Averell

{
        "name": "s3-bytearray",
        "config": {
                "connector.class": "io.confluent.connect.s3.S3SinkConnector",
                "tasks.max": "3",
                "errors.log.enable": "true",
                "errors.log.include.messages": "true",
                "topics": "<my_topics>",
                "topics.dir": "<my_s3_prefix>",
                "rotate.interval.ms": "600000",
                "schema.compatibility": "NONE",
                "flush.size": "5000",
                "s3.bucket.name": "<my bucket>",
                "s3.part.size": "5242880",
                "s3.region": "ap-southeast-2",
                "s3.compression.type": "gzip",
                "key.converter": 
"org.apache.kafka.connect.converters.ByteArrayConverter",
                "value.converter": 
"org.apache.kafka.connect.converters.ByteArrayConverter",
                "key.converter.schemas.enable": "false",
                "value.converter.schemas.enable": "false",
                "storage.class": "io.confluent.connect.s3.storage.S3Storage",
                "format.class": 
"io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
                "format.bytearray.extension": ".json",
                "partitioner.class": 
"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
                "partition.duration.ms": "86400000",
                "errors.tolerance": "all",
                "errors.deadletterqueue.topic.name": 
"connect_S3_bytearray_error",
                "errors.deadletterqueue.topic.replication.factor": 2,
                "locale": "AU",
                "timezone": "UTC",
                "path.format": "'dt'=YYYY'-'MM'-'dd",
                "timestamp.extractor": "Record"
        }
}


Reply via email to