[ https://issues.apache.org/jira/browse/KAFKA-18852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Raju Gupta updated KAFKA-18852:
-------------------------------
    Description:
### Analysis of Changes in `ApiVersions` Class

The `ApiVersions` class has been updated to enhance **thread safety** and **performance** using concurrent data structures.

#### Key Improvements

1. **`ConcurrentHashMap` for `nodeApiVersions`**
   - **Before**: Used `HashMap` with synchronized access.
   - **After**: Replaced with `ConcurrentHashMap`, eliminating explicit synchronization.
   - **Benefits**: Improved performance (concurrent reads/writes), reduced complexity.
2. **`AtomicLong` for `maxFinalizedFeaturesEpoch`**
   - **Before**: `long` with synchronized updates.
   - **After**: `AtomicLong` ensures atomic updates without locks.
   - **Benefits**: Faster updates using CPU-level atomic operations.
3. **Removed `synchronized` Blocks**
   - **Impact**: Reduces lock contention, improving scalability.
   - **Consideration**: Future modifications must ensure correct usage of concurrent structures.
4. **Handling `finalizedFeatures`**
   - **Issue**: Still uses `Map<String, Short>`, leading to potential race conditions.
   - **Fix**: Replaced with `AtomicReference<Map<String, Short>>` for atomic updates.
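One consideration of the kind item 3 mentions: separate atomic fields are each updated atomically, but not as a pair, so a reader can observe an epoch from one update next to a feature map from another. The following is a minimal sketch of one way to avoid that, not the code in the attached patch. The names `FeatureTracker` and `FeatureSnapshot` are hypothetical and do not exist in Kafka. It publishes the epoch and its feature map together as one immutable snapshot held in a single `AtomicReference`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration class; not part of Kafka.
public class FeatureTracker {

    // Immutable pair: an epoch and the feature map that belongs to it.
    static final class FeatureSnapshot {
        private final long epoch;
        private final Map<String, Short> features;

        FeatureSnapshot(long epoch, Map<String, Short> features) {
            this.epoch = epoch;
            // Defensive copy so later changes by the caller cannot leak in.
            this.features = Collections.unmodifiableMap(new HashMap<>(features));
        }

        long epoch() { return epoch; }
        Map<String, Short> features() { return features; }
    }

    private final AtomicReference<FeatureSnapshot> snapshot =
        new AtomicReference<>(new FeatureSnapshot(-1L, Collections.<String, Short>emptyMap()));

    // Installs the candidate only if its epoch is strictly newer;
    // otherwise the current snapshot is kept unchanged.
    public void update(long epoch, Map<String, Short> features) {
        FeatureSnapshot candidate = new FeatureSnapshot(epoch, features);
        snapshot.updateAndGet(cur -> candidate.epoch() > cur.epoch() ? candidate : cur);
    }

    // Readers always see an epoch and a map from the same update.
    public FeatureSnapshot current() {
        return snapshot.get();
    }

    public static void main(String[] args) {
        FeatureTracker tracker = new FeatureTracker();
        tracker.update(5L, Collections.singletonMap("metadata.version", (short) 20));
        tracker.update(3L, Collections.singletonMap("metadata.version", (short) 10)); // stale, ignored
        System.out.println(tracker.current().epoch()); // prints 5
    }
}
```

Because the function passed to `updateAndGet` may be re-applied under contention, it is kept free of side effects; the candidate snapshot is built once, outside the lambda.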
#### Updated Code

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

private final Map<String, NodeApiVersions> nodeApiVersions = new ConcurrentHashMap<>();
private final AtomicLong maxFinalizedFeaturesEpoch = new AtomicLong(-1);
private final AtomicReference<Map<String, Short>> finalizedFeatures =
    new AtomicReference<>(new ConcurrentHashMap<>());

public void update(String nodeId, NodeApiVersions nodeApiVersions) {
    this.nodeApiVersions.put(nodeId, nodeApiVersions);
    // Keep the highest finalized-features epoch seen so far.
    maxFinalizedFeaturesEpoch.updateAndGet(prev ->
        Math.max(prev, nodeApiVersions.finalizedFeaturesEpoch()));
    // Publish a copy of this node's finalized features.
    this.finalizedFeatures.set(new ConcurrentHashMap<>(nodeApiVersions.finalizedFeatures()));
}

public NodeApiVersions get(String nodeId) {
    // Look up by node id; Map.get() requires the key argument.
    return this.nodeApiVersions.get(nodeId);
}

public long getMaxFinalizedFeaturesEpoch() {
    return maxFinalizedFeaturesEpoch.get();
}

public FinalizedFeaturesInfo getFinalizedFeaturesInfo() {
    return new FinalizedFeaturesInfo(maxFinalizedFeaturesEpoch.get(), finalizedFeatures.get());
}
```

> ApiVersions should use Concurrent Collections instead of sychronised
> --------------------------------------------------------------------
>
>                 Key: KAFKA-18852
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18852
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>            Reporter: Raju Gupta
>            Priority: Minor
>         Attachments: KAFKA-18552.patch

--
This message was sent by Atlassian Jira
(v8.20.10#820010)