[ https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860514#comment-17860514 ]
yazgoo commented on KAFKA-10413:
--------------------------------

Here is yet another, simpler version of the script, with fewer workers, which does not try to restart any worker:

{code:bash}
#!/bin/bash
set -xe

dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}

write_topic() {
  # write 200 messages to the topic
  json='{"name": "test"}'
  docker exec -i kafka bash -c "(for i in {1..200}; do echo '$json'; done) | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 --topic test_topic$1"
}

launch_minio() {
  # Launch Minio (fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
  docker exec -it minio mkdir /data/my-minio-bucket
}

launch_kafka_connect() {
  # Start a Kafka Connect worker with the S3 connector
  docker run --network host -d --name "kafka-connect$1" \
    -e AWS_ACCESS_KEY_ID=minioadmin \
    -e AWS_SECRET_ACCESS_KEY=minioadmin \
    -e CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e CONNECT_LISTENERS="http://localhost:808$1" \
    -e CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e CONNECT_REST_PORT="808$1" \
    -e CONNECT_GROUP_ID="connect-cluster" \
    -e CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
}

cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i"; done) kafka minio
  do
    dkill "$container"
  done
}

launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d --name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic and fill it
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 120 --topic "test_topic$i"
    write_topic "$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic $topic --config cleanup.policy=compact
  done
}

cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1

while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done

sleep 10

for i in {1..2}
do
  # Set up a connector
  curl -X POST -H "Content-Type: application/json" --data '{
    "name": "s3-connector'"$i"'",
    "config": {
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "tasks.max": "120",
      "topics": "test_topic'"$i"'",
      "s3.region": "us-east-1",
      "store.url": "http://0.0.0.0:9000",
      "s3.bucket.name": "my-minio-bucket",
      "s3.part.size": "5242880",
      "flush.size": "3",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
      "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
      "schema.compatibility": "NONE"
    }
  }' http://localhost:8081/connectors
done

launch_kafka_connect 2
launch_kafka_connect 3
launch_kafka_connect 4
launch_kafka_connect 5
{code}

When the script ends, I have two loaded workers: one with connector #1's tasks, the other with connector #2's tasks.
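For reference, the per-worker tally used below can be reproduced offline from a captured status payload, without a live cluster or jq. This is a minimal sketch: the payload shape follows the Connect REST `/connectors/<name>/status` response, but the worker IDs and task list are made up for illustration.

```shell
#!/bin/bash
set -e

# Tally tasks per worker from a Connect status payload.
# In the live setup the JSON would come from:
#   curl -s http://localhost:8081/connectors/s3-connector1/status
# The payload below is a made-up example. Note that a real response also
# carries a top-level "connector" worker_id, which `jq .tasks` excludes;
# it is omitted here so a bare grep counts only task assignments.
status='{"name":"s3-connector1","tasks":[
  {"id":0,"state":"RUNNING","worker_id":"k1:8081"},
  {"id":1,"state":"RUNNING","worker_id":"k1:8081"},
  {"id":2,"state":"RUNNING","worker_id":"k2:8082"}]}'

# Extract each task's worker_id and count occurrences per worker.
echo "$status" | grep -o '"worker_id":"[^"]*"' | cut -d'"' -f4 | sort | uniq -c
```

A balanced assignment shows roughly equal counts per worker; a skewed one, like the output below, does not.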
Then I wait 3 minutes and get the final state:

{code:bash}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
     30 "worker_id": "k2:8082"
     30 "worker_id": "k3:8083"
     30 "worker_id": "k4:8084"
     30 "worker_id": "k5:8085"
{code}

{code:bash}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks | grep worker_id | sort | uniq -c
     48 "worker_id": "k1:8081"
     18 "worker_id": "k2:8082"
     18 "worker_id": "k3:8083"
     18 "worker_id": "k4:8084"
     18 "worker_id": "k5:8085"
{code}

> rebalancing leads to unevenly balanced connectors
> -------------------------------------------------
>
>                 Key: KAFKA-10413
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10413
>             Project: Kafka
>          Issue Type: Bug
>          Components: connect
>    Affects Versions: 2.5.1
>            Reporter: yazgoo
>            Assignee: rameshkrishnan muthusamy
>            Priority: Major
>             Fix For: 2.4.2, 2.5.2, 2.8.0, 2.7.1, 2.6.2
>         Attachments: connect_worker_balanced.png, rebalance.sh
>
> Hi,
> With CP 5.5, running the Kafka Connect S3 sink on EC2 with autoscaling enabled,
> if a connect instance disappears, or a new one appears, we're seeing unbalanced
> consumption, much like mentioned in this post:
> [https://stackoverflow.com/questions/58644622/incremental-cooperative-rebalancing-leads-to-unevenly-balanced-connectors]
> This usually leads to one Kafka Connect instance taking most of the load and
> consumption not being able to keep up.
> Currently, we're "fixing" this by deleting the connector and re-creating it,
> but this is far from ideal.
> Any suggestion on what we could do to mitigate this?

--
This message was sent by Atlassian Jira
(v8.20.10#820010)