[ 
https://issues.apache.org/jira/browse/KAFKA-17062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-17062.
-------------------------------
    Resolution: Fixed

> RemoteLogManager - RemoteStorageException causes data loss
> ----------------------------------------------------------
>
>                 Key: KAFKA-17062
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17062
>             Project: Kafka
>          Issue Type: Bug
>          Components: Tiered-Storage
>    Affects Versions: 3.8.0, 3.7.1, 3.9.0
>            Reporter: Guillaume Mallet
>            Assignee: Luke Chen
>            Priority: Critical
>              Labels: tiered-storage
>             Fix For: 3.9.0
>
>
> When Tiered Storage is configured, retention.bytes defines the limit for the 
> amount of data stored in the filesystem and in remote storage. However, a 
> failure while offloading to remote storage can cause segments to be dropped 
> before the retention limit is met.
> What happens
> Assume a topic configured with {{retention.bytes=4294967296}} (4GB) and 
> {{local.retention.bytes=1073741824}} (1GB, equal to segment.bytes). We would 
> expect Kafka to keep up to 3 segments (3GB) in the remote store and 1 segment 
> locally, and possibly more local segments if the remote storage is offline. 
> That is, the RemoteLogMetadataManager (RLMM) holds segments in the following 
> RemoteLogSegmentStates:
>  * Segment 3 ({{{}COPY_SEGMENT_FINISHED{}}})
>  * Segment 2 ({{{}COPY_SEGMENT_FINISHED{}}})
>  * Segment 1 ({{{}COPY_SEGMENT_FINISHED{}}})
> Let's assume copies to remote storage start failing when Segment 4 rolls. At 
> the first iteration of an RLMTask, the following happens:
>  * 
> [{{copyLogSegmentsToRemote}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L773]
>  : is called first
>  ** RLMM becomes aware of Segment 4 and adds it to the metadata:
>  *** Segment 4 ({{{}COPY_SEGMENT_STARTED{}}}),
>  *** Segment 3 ({{{}COPY_SEGMENT_FINISHED{}}}),
>  *** Segment 2 ({{{}COPY_SEGMENT_FINISHED{}}}),
>  *** Segment 1 ({{{}COPY_SEGMENT_FINISHED{}}})
>  ** An exception is raised during the copy operation 
> ([{{copyLogSegmentData}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java#L93]
>  in RemoteStorageManager). It is caught and logged with the error message 
> “{{Error occurred while copying log segments of partition}}”, and no further 
> copy is attempted for the duration of this RLMTask run.
>  ** From that point on, this Segment 4 entry never moves to 
> {{COPY_SEGMENT_FINISHED}}. It eventually transitions to 
> {{DELETE_SEGMENT_STARTED}} and is cleaned up when the associated segment is 
> deleted.
>  * 
> [{{cleanupExpiredRemoteLogSegments}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1122]
>  is then called
>  ** Retention size is computed in 
> [{{buildRetentionSizeData}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1296]
>  as the sum of all segment sizes regardless of their state, so the computed 
> size of the topic is 1 (local) + 4 (remote) = 5 segments (5GB), which exceeds 
> the 4GB retention limit.
>  ** Segment 1, being the oldest, is dropped.
> At the second iteration after 
> [{{remote.log.manager.task.interval.ms}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java#L395]
>  (default: 30s), the same happens. The RLMM now holds 2 entries for Segment 4 
> in the {{COPY_SEGMENT_STARTED}} state, each with a different 
> {{RemoteLogSegmentId}}, and Segment 2 is dropped. The same happens to 
> Segment 3 after another iteration.
> At that point, the RLMM holds 4 copies of Segment 4 in the 
> {{COPY_SEGMENT_STARTED}} state. Segment 4 is then marked for deletion, which 
> increases the log start offset (LSO) and causes the UnifiedLog to delete the 
> local and remote data for Segment 4, including its metadata.
> Under those circumstances, Kafka can quickly delete segments that were not 
> meant for deletion, causing data loss.
> Steps to reproduce the problem:
> 1. Enable tiered storage
> {code:bash}
> mkdir -p /tmp/tieredStorage/kafka-tiered-storage/
> cat <<EOF >> config/kraft/server.properties
> remote.log.storage.system.enable=True
> remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage
> remote.log.manager.task.interval.ms=5000
> remote.log.metadata.manager.listener.name=PLAINTEXT
> rlmm.config.remote.log.metadata.topic.replication.factor=1
> rsm.config.dir=/tmp/tieredStorage
> EOF
> {code}
> 2. Start a Kafka server with the following classpath. This is needed so we 
> can use test class LocalTieredStorage as an implementation of 
> RemoteStorageManager.
> {code:bash}
> export CLASSPATH="$(pwd)/storage/build/libs/*:$(pwd)/clients/build/libs/*"
> export KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
> bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
> bin/kafka-server-start.sh config/kraft/server.properties
> {code}
> 3. In a separate shell, create the topic and produce enough records to fill 
> the remote log
> {code:bash}
> bin/kafka-topics.sh --create --topic bug-ts --bootstrap-server localhost:9092 \
>    --config retention.bytes=1000000000 --config segment.bytes=100000000 \
>    --config remote.storage.enable=true --config local.retention.bytes=1
> bin/kafka-producer-perf-test.sh --topic bug-ts --num-records=1000000 \
>    --throughput -1 --record-size 1000 \
>    --producer-props acks=1 batch.size=100000  bootstrap.servers=localhost:9092
> {code}
> 4. In a separate shell, watch the remote log directory content
> {code:bash}
> watch -n 1 ls -R /tmp/tieredStorage/kafka-tiered-storage/
> {code}
> 5. Once all segments have been sent to the remote storage (when the server 
> log output stops, which should take around 2 min), stop the Kafka server.
> 6. Edit {{copyLogSegmentData()}} in LocalTieredStorage (LocalTieredStorage#L309) 
> so that it throws a {{RemoteStorageException}}, which disables the ability to 
> store new remote segments.
> 7. Rebuild Kafka
> {code:bash}
>  ./gradlew testJar
> {code}
> 8. Restart the Kafka server
> {code:bash}
> bin/kafka-server-start.sh config/kraft/server.properties
> {code}
> 9. Send enough data to roll one segment
> {code:bash}
> bin/kafka-producer-perf-test.sh \
>   --topic bug-ts --num-records=10000 --throughput -1 --record-size 10000 \
>   --producer-props acks=1 batch.size=100000 bootstrap.servers=localhost:9092
> {code}
> All data in the remote directory starts getting deleted, whereas we would 
> expect only that no more writes happen to the remote storage.
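
The retention arithmetic in the description can be sketched as a small standalone simulation. This is only an illustration under stated assumptions: the class RetentionBugSketch, its methods, the 1GB-per-segment sizes, and the countOnlyFinished switch are hypothetical and are not Kafka APIs. The switch shows one possible way to avoid the miscount and is not necessarily the change that was merged for this issue.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the scenario in the description; not Kafka code.
class RetentionBugSketch {
    enum State { COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED }

    // Simplified stand-in for one remote segment metadata entry in the RLMM.
    static final class Entry {
        final int segment;
        final State state;
        final long sizeGb;

        Entry(int segment, State state, long sizeGb) {
            this.segment = segment;
            this.state = state;
            this.sizeGb = sizeGb;
        }
    }

    // Local size plus remote size; optionally ignores entries whose copy never finished.
    static long totalGb(List<Entry> remote, long localGb, boolean countOnlyFinished) {
        long sum = localGb;
        for (Entry e : remote) {
            if (!countOnlyFinished || e.state == State.COPY_SEGMENT_FINISHED) {
                sum += e.sizeGb;
            }
        }
        return sum;
    }

    // Runs the given number of task iterations with a failing copy and returns
    // how many COPY_SEGMENT_FINISHED entries are left afterwards.
    static int finishedLeftAfter(int iterations, boolean countOnlyFinished) {
        final long retentionGb = 4; // retention.bytes = 4GB
        final long localGb = 1;     // one 1GB segment kept locally
        List<Entry> remote = new ArrayList<>();
        for (int s = 1; s <= 3; s++) {
            remote.add(new Entry(s, State.COPY_SEGMENT_FINISHED, 1));
        }
        for (int i = 0; i < iterations; i++) {
            // Copy step: metadata for Segment 4 is recorded, then the upload fails.
            remote.add(new Entry(4, State.COPY_SEGMENT_STARTED, 1));
            // Cleanup step: drop the oldest entries while the computed size exceeds retention.
            while (!remote.isEmpty()
                    && totalGb(remote, localGb, countOnlyFinished) > retentionGb) {
                remote.remove(0);
            }
        }
        int finished = 0;
        for (Entry e : remote) {
            if (e.state == State.COPY_SEGMENT_FINISHED) {
                finished++;
            }
        }
        return finished;
    }

    public static void main(String[] args) {
        System.out.println("all states counted:    " + finishedLeftAfter(3, false));
        System.out.println("only finished counted: " + finishedLeftAfter(3, true));
    }
}
```

With every state counted, each iteration adds a 1GB COPY_SEGMENT_STARTED entry and pushes out one finished segment, so no finished segment is left after three iterations. When only finished copies are counted, the computed size stays at 4GB and nothing is dropped.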



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
