[ https://issues.apache.org/jira/browse/KAFKA-17062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Luke Chen resolved KAFKA-17062.
-------------------------------
Resolution: Fixed

RemoteLogManager - RemoteStorageException causes data loss
----------------------------------------------------------

Key: KAFKA-17062
URL: https://issues.apache.org/jira/browse/KAFKA-17062
Project: Kafka
Issue Type: Bug
Components: Tiered-Storage
Affects Versions: 3.8.0, 3.7.1, 3.9.0
Reporter: Guillaume Mallet
Assignee: Luke Chen
Priority: Critical
Labels: tiered-storage
Fix For: 3.9.0

When Tiered Storage is configured, {{retention.bytes}} defines the limit for the amount of data stored in the filesystem and in remote storage. However, a failure while offloading to remote storage can cause segments to be dropped before the retention limit is reached.

What happens

Assuming a topic configured with {{retention.bytes=4294967296}} (4 GB) and {{local.retention.bytes=1073741824}} (1 GB, equal to {{segment.bytes}}), we would expect Kafka to keep up to 3 segments (3 GB) in the remote store and 1 segment locally (the active segment), and possibly more if the remote storage is offline, i.e. segments in the following {{RemoteLogSegmentState}}s in the RemoteLogMetadataManager (RLMM):
* Segment 3 ({{COPY_SEGMENT_FINISHED}})
* Segment 2 ({{COPY_SEGMENT_FINISHED}})
* Segment 1 ({{COPY_SEGMENT_FINISHED}})

Let's assume the remote storage starts failing when segment 4 rolls. At the first iteration of an RLMTask:
* [{{copyLogSegmentsToRemote}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L773] is called first.
** The RLMM becomes aware of Segment 4 and adds it to the metadata:
*** Segment 4 ({{COPY_SEGMENT_STARTED}})
*** Segment 3 ({{COPY_SEGMENT_FINISHED}})
*** Segment 2 ({{COPY_SEGMENT_FINISHED}})
*** Segment 1 ({{COPY_SEGMENT_FINISHED}})
** An exception is raised during the copy operation ([{{copyLogSegmentData}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java#L93] in RemoteStorageManager). It is caught with the error message "{{Error occurred while copying log segments of partition}}", and no further copy is attempted for the duration of this RLMTask.
** At that point the segment will never move to {{COPY_SEGMENT_FINISHED}}; it will eventually transition to {{DELETE_SEGMENT_STARTED}} before being cleaned up when the associated segment is deleted.
* [{{cleanupExpiredRemoteLogSegments}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1122] is then called.
** Retention size is computed in [{{buildRetentionSizeData}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1296] as the sum of the sizes of all segments regardless of their state, so the computed size of the topic is 1 (local) + 4 (remote) segments (see the sketch after this list).
** Segment 1, being the oldest, is dropped.
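To make the accounting problem concrete, below is a minimal, self-contained sketch. It is not the actual {{buildRetentionSizeData}} code: the class, method names, enum and record are stand-ins for Kafka's {{RemoteLogSegmentMetadata}} and {{RemoteLogSegmentState}}. It contrasts the behaviour described above, where every remote segment counts toward the retention size regardless of its state, with an accounting that counts only fully copied segments, showing how repeated failed copies of Segment 4 push Segment 1 over the retention limit.

{code:java}
import java.util.List;

public class RetentionSizeSketch {

    // Stand-ins for Kafka's RemoteLogSegmentState and RemoteLogSegmentMetadata.
    enum SegmentState { COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED, DELETE_SEGMENT_STARTED }

    record SegmentMetadata(String name, long sizeInBytes, SegmentState state) { }

    // Accounting as described above: every remote segment counts toward retention,
    // including half-copied ones stuck in COPY_SEGMENT_STARTED.
    static long remoteSizeCountingAllStates(List<SegmentMetadata> remoteSegments) {
        return remoteSegments.stream().mapToLong(SegmentMetadata::sizeInBytes).sum();
    }

    // Accounting one would expect instead: only fully copied segments contribute.
    static long remoteSizeCountingFinishedOnly(List<SegmentMetadata> remoteSegments) {
        return remoteSegments.stream()
                .filter(s -> s.state() == SegmentState.COPY_SEGMENT_FINISHED)
                .mapToLong(SegmentMetadata::sizeInBytes)
                .sum();
    }

    public static void main(String[] args) {
        long oneGiB = 1L << 30;

        // Segments 1-3 were copied successfully; segment 4's copy failed and was retried
        // once, leaving two COPY_SEGMENT_STARTED entries with different ids.
        List<SegmentMetadata> remote = List.of(
                new SegmentMetadata("segment-1", oneGiB, SegmentState.COPY_SEGMENT_FINISHED),
                new SegmentMetadata("segment-2", oneGiB, SegmentState.COPY_SEGMENT_FINISHED),
                new SegmentMetadata("segment-3", oneGiB, SegmentState.COPY_SEGMENT_FINISHED),
                new SegmentMetadata("segment-4 (copy 1)", oneGiB, SegmentState.COPY_SEGMENT_STARTED),
                new SegmentMetadata("segment-4 (copy 2)", oneGiB, SegmentState.COPY_SEGMENT_STARTED));
        long localSize = oneGiB; // the active local segment

        // 6442450944 bytes (6 GiB): exceeds retention.bytes=4294967296, so the cleanup
        // described above drops Segment 1 even though nothing new reached remote storage.
        System.out.println("counting all states:    " + (localSize + remoteSizeCountingAllStates(remote)));

        // 4294967296 bytes (4 GiB): within the configured retention, Segments 1-3 survive.
        System.out.println("counting finished only: " + (localSize + remoteSizeCountingFinishedOnly(remote)));
    }
}
{code}

Running the sketch prints 6442450944 bytes for the state-agnostic accounting versus 4294967296 bytes when only {{COPY_SEGMENT_FINISHED}} segments are counted, which matches the scenario above where Segment 1 is dropped even though nothing new ever reached the remote store.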
At the second iteration, after [{{remote.log.manager.task.interval.ms}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java#L395] (default: 30s), the same thing happens: the RLMM now holds 2 copies of Segment 4 in {{COPY_SEGMENT_STARTED}} state, each with a different {{RemoteLogSegmentId}}, and Segment 2 is dropped. The same happens to Segment 3 after another iteration.

At that point the RLMM is composed of 4 copies of Segment 4 in {{COPY_SEGMENT_STARTED}} state. Segment 4 is then marked for deletion, increasing the LSO at the same time and causing the UnifiedLog to delete the local and remote data for Segment 4, including its metadata.

Under those circumstances Kafka can quickly delete segments that were not meant for deletion, causing data loss.

Steps to reproduce the problem:

1. Enable tiered storage
{code:bash}
mkdir -p /tmp/tieredStorage/kafka-tiered-storage/
cat <<EOF >> config/kraft/server.properties
remote.log.storage.system.enable=True
remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage
remote.log.manager.task.interval.ms=5000
remote.log.metadata.manager.listener.name=PLAINTEXT
rlmm.config.remote.log.metadata.topic.replication.factor=1
rsm.config.dir=/tmp/tieredStorage
EOF
{code}
2. Start a Kafka server with the following classpath. This is needed so we can use the test class LocalTieredStorage as an implementation of RemoteStorageManager.
{code:bash}
export CLASSPATH="$(pwd)/storage/build/libs/*:$(pwd)/clients/build/libs/*"
export KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
bin/kafka-server-start.sh config/kraft/server.properties
{code}
3. In a separate shell, create the topic and produce enough records to fill the remote log
{code:bash}
bin/kafka-topics.sh --create --topic bug-ts --bootstrap-server localhost:9092 \
  --config retention.bytes=1000000000 --config segment.bytes=100000000 \
  --config remote.storage.enable=true --config local.retention.bytes=1
bin/kafka-producer-perf-test.sh --topic bug-ts --num-records=1000000 \
  --throughput -1 --record-size 1000 \
  --producer-props acks=1 batch.size=100000 bootstrap.servers=localhost:9092
{code}
4. In a separate shell, watch the remote log directory content
{code:bash}
watch -n 1 -- ls -R /tmp/tieredStorage/kafka-tiered-storage/
{code}
5. Once all logs have been sent to the remote storage (when the server log output stops, which should take around 2 minutes), stop the Kafka server.
6. Edit {{copyLogSegmentData()}} in LocalTieredStorage (around LocalTieredStorage#L309) so that it throws a {{RemoteStorageException}}, disabling the ability to store new remote segments (a possible edit is sketched after these steps).
7. Rebuild Kafka
{code:bash}
./gradlew testJar
{code}
8. Restart the Kafka server
{code:bash}
bin/kafka-server-start.sh config/kraft/server.properties
{code}
9. Send enough data for one segment to roll
{code:bash}
bin/kafka-producer-perf-test.sh \
  --topic bug-ts --num-records=10000 --throughput -1 --record-size 10000 \
  --producer-props acks=1 batch.size=100000 bootstrap.servers=localhost:9092
{code}
All data in the remote directory starts getting deleted, when the expected behaviour would simply be that no new writes happen to the remote storage.
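For step 6, one possible way to make the copy path fail is to have {{copyLogSegmentData()}} throw unconditionally. Treat this as a hedged sketch rather than the exact patch the reporter used: the line number and the precise method signature may differ between versions, so adjust it to the code in your checkout.

{code:java}
// Hypothetical edit for step 6: make LocalTieredStorage#copyLogSegmentData fail on every
// call. Only the throw statement is the intended change; the signature shown here follows
// the RemoteStorageManager interface in the affected versions and should be checked against
// the actual file.
@Override
public Optional<RemoteLogSegmentMetadata.CustomMetadata> copyLogSegmentData(
        RemoteLogSegmentMetadata remoteLogSegmentMetadata,
        LogSegmentData logSegmentData) throws RemoteStorageException {
    // Simulate a permanently broken remote store: every upload attempt fails, so each
    // rolled segment stays in COPY_SEGMENT_STARTED while cleanupExpiredRemoteLogSegments
    // keeps counting it toward the retention size.
    throw new RemoteStorageException("KAFKA-17062 reproduction: simulated copy failure");
}
{code}

With this change in place, every RLMTask iteration registers a new {{COPY_SEGMENT_STARTED}} entry for the rolled segment, which is what drives the retention computation sketched earlier past {{retention.bytes}} and triggers the unexpected deletions.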