[ 
https://issues.apache.org/jira/browse/KAFKA-18852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raju Gupta updated KAFKA-18852:
-------------------------------
    Description: 
### Analysis of Changes in `ApiVersions` Class  

The `ApiVersions` class has been updated to enhance **thread safety** and 
**performance** using concurrent data structures.  

#### Key Improvements  

1. **`ConcurrentHashMap` for `nodeApiVersions`**  
   - **Before**: Used `HashMap` with synchronized access.  
   - **After**: Replaced with `ConcurrentHashMap`, eliminating explicit 
synchronization.  
   - **Benefits**: Improved performance (concurrent reads/writes), reduced 
complexity.  

2. **`AtomicLong` for `maxFinalizedFeaturesEpoch`**  
   - **Before**: `long` with synchronized updates.  
   - **After**: `AtomicLong` ensures atomic updates without locks.  
   - **Benefits**: Lock-free updates based on compare-and-set (CAS) operations.  

3. **Removed `synchronized` Blocks**  
   - **Impact**: Reduces lock contention, improving scalability.  
   - **Consideration**: Operations that touch more than one field are no 
longer atomic as a group, so future modifications must use the concurrent 
structures with that in mind.  

4. **Handling `finalizedFeatures`**  
   - **Issue**: A plain `Map<String, Short>` field is no longer guarded once 
`synchronized` is removed, leading to potential race conditions.  
   - **Fix**: Replaced with `AtomicReference<Map<String, Short>>` so the map 
reference is swapped atomically.  
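
As a hedged illustration of item 4, the sketch below keeps the epoch and the feature map together in one immutable snapshot held by a single `AtomicReference`, so a reader never sees an epoch paired with the features of a different update. `FeatureSnapshot` and `FeatureTracker` are hypothetical names introduced only for this example; they are not part of Kafka or of the attached patch.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical immutable pair of epoch and features (not a Kafka class).
final class FeatureSnapshot {
    final long epoch;
    final Map<String, Short> features;

    FeatureSnapshot(long epoch, Map<String, Short> features) {
        this.epoch = epoch;
        // Defensive copy so a published snapshot can never change.
        this.features = Collections.unmodifiableMap(new HashMap<>(features));
    }
}

// Hypothetical holder that publishes snapshots without locks.
final class FeatureTracker {
    private final AtomicReference<FeatureSnapshot> latest =
        new AtomicReference<>(new FeatureSnapshot(-1L, Collections.emptyMap()));

    // Publish the candidate only if its epoch is newer than the current one.
    void update(long epoch, Map<String, Short> features) {
        FeatureSnapshot candidate = new FeatureSnapshot(epoch, features);
        latest.accumulateAndGet(candidate,
            (current, next) -> next.epoch > current.epoch ? next : current);
    }

    // Readers get the epoch and features from the same snapshot.
    FeatureSnapshot current() {
        return latest.get();
    }
}
```

With this shape, an update carrying an older epoch leaves the published snapshot untouched, and a single `current()` call returns both values from the same update.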

#### Updated Code  

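```java
// Imports needed by the fields and methods below.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

private final Map<String, NodeApiVersions> nodeApiVersions = new ConcurrentHashMap<>();
private final AtomicLong maxFinalizedFeaturesEpoch = new AtomicLong(-1);
private final AtomicReference<Map<String, Short>> finalizedFeatures =
    new AtomicReference<>(new ConcurrentHashMap<>());

public void update(String nodeId, NodeApiVersions nodeApiVersions) {
    this.nodeApiVersions.put(nodeId, nodeApiVersions);
    // Keep the highest finalized-features epoch seen so far.
    maxFinalizedFeaturesEpoch.updateAndGet(prev ->
        Math.max(prev, nodeApiVersions.finalizedFeaturesEpoch()));
    // Swap in a copy of this node's finalized features. This happens on every
    // call and is a separate step from the epoch update above.
    this.finalizedFeatures.set(
        new ConcurrentHashMap<>(nodeApiVersions.finalizedFeatures()));
}

public NodeApiVersions get(String nodeId) {
    // Look up the versions recorded for the given node.
    return this.nodeApiVersions.get(nodeId);
}

public long getMaxFinalizedFeaturesEpoch() {
    return maxFinalizedFeaturesEpoch.get();
}

public FinalizedFeaturesInfo getFinalizedFeaturesInfo() {
    return new FinalizedFeaturesInfo(maxFinalizedFeaturesEpoch.get(), finalizedFeatures.get());
}
```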


> ApiVersions should use Concurrent Collections instead of synchronized
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-18852
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18852
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>            Reporter: Raju Gupta
>            Priority: Minor
>         Attachments: KAFKA-18552.patch
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
